This commit is contained in:
Baobhan Sith
2025-04-20 19:32:51 +08:00
parent 7940044512
commit 1ebf05940a
3 changed files with 503 additions and 323 deletions
+251 -105
View File
@@ -8,6 +8,7 @@ import { decrypt } from './utils/crypto';
import { SftpService } from './services/sftp.service';
import { StatusMonitorService } from './services/status-monitor.service';
import * as SshService from './services/ssh.service';
import { DockerService } from './services/docker.service'; // 导入 DockerService
import { AuditLogService } from './services/audit.service'; // 导入 AuditLogService
import { AuditLogActionType } from './types/audit.types'; // 导入 AuditLogActionType
@@ -29,6 +30,7 @@ export interface ClientState { // 导出以便 Service 可以导入
dbConnectionId: number;
sftp?: SFTPWrapper; // 添加 sftp 实例 (由 SftpService 管理)
statusIntervalId?: NodeJS.Timeout; // 添加状态轮询 ID (由 StatusMonitorService 管理)
dockerStatusIntervalId?: NodeJS.Timeout; // NEW: Docker 状态轮询 ID
ipAddress?: string; // 添加 IP 地址字段
}
@@ -42,18 +44,36 @@ interface PortInfo {
}
// --- End FIX ---
// --- NEW: Stats Interface (Ensure this matches frontend) ---
// --- Docker Interfaces (Ensure this matches frontend and DockerService) ---
// Stats 接口
interface DockerStats {
ID: string;
Name: string;
CPUPerc: string;
MemUsage: string;
MemPerc: string;
NetIO: string;
BlockIO: string;
PIDs: string;
ID: string; // 来自 docker stats
Name: string; // 来自 docker stats
CPUPerc: string; // 来自 docker stats
MemUsage: string; // 来自 docker stats
MemPerc: string; // 来自 docker stats
NetIO: string; // 来自 docker stats
BlockIO: string; // 来自 docker stats
PIDs: string; // 来自 docker stats
}
// Container 接口 (包含 stats)
interface DockerContainer {
id: string; // 使用小写 id 以匹配前端期望
Names: string[];
Image: string;
ImageID: string;
Command: string;
Created: number;
State: string;
Status: string;
Ports: PortInfo[];
Labels: Record<string, string>;
stats?: DockerStats | null; // 可选的 stats 字段
}
// --- End Docker Interfaces ---
// --- 新增:解析 Ports 字符串的辅助函数 ---
function parsePortsString(portsString: string | undefined | null): PortInfo[] { // Now PortInfo is defined
if (!portsString) {
@@ -123,6 +143,7 @@ export const clientStates = new Map<string, ClientState>(); // Export clientStat
const sftpService = new SftpService(clientStates);
const statusMonitorService = new StatusMonitorService(clientStates);
const auditLogService = new AuditLogService(); // 实例化 AuditLogService
const dockerService = new DockerService(); // 实例化 DockerService (主要用于类型或未来可能的本地调用)
/**
* 清理指定会话 ID 关联的所有资源
@@ -145,10 +166,16 @@ const cleanupClientConnection = (sessionId: string | undefined) => {
state.sshShellStream?.end(); // 结束 shell 流
state.sshClient?.end(); // 结束 SSH 客户端
// 4. 从状态 Map 中移除
// 4. 清理 Docker 状态轮询定时器
if (state.dockerStatusIntervalId) {
clearInterval(state.dockerStatusIntervalId);
console.log(`WebSocket: Cleared Docker status interval for session ${sessionId}.`);
}
// 5. 从状态 Map 中移除
clientStates.delete(sessionId);
// 5. 清除 WebSocket 上的 sessionId 关联 (可选,因为 ws 可能已关闭)
// 6. 清除 WebSocket 上的 sessionId 关联 (可选,因为 ws 可能已关闭)
if (state.ws && state.ws.sessionId === sessionId) {
delete state.ws.sessionId;
}
@@ -159,9 +186,146 @@ const cleanupClientConnection = (sessionId: string | undefined) => {
}
};
// --- NEW: Reusable function to fetch remote Docker status with stats ---
const fetchRemoteDockerStatus = async (state: ClientState): Promise<{ available: boolean; containers: DockerContainer[] }> => {
if (!state || !state.sshClient) {
throw new Error('SSH client is not available in the current state.');
}
let allContainers: DockerContainer[] = [];
const statsMap = new Map<string, DockerStats>();
let isDockerCmdAvailable = true; // Assume available initially
// 1. Get basic container info
try {
const psCommand = "docker ps -a --no-trunc --format '{{json .}}'";
const { stdout: psStdout, stderr: psStderr } = await new Promise<{ stdout: string; stderr: string }>((resolve, reject) => {
let stdout = '';
let stderr = '';
state.sshClient.exec(psCommand, { pty: false }, (err, stream) => {
if (err) return reject(err);
stream.on('data', (data: Buffer) => { stdout += data.toString(); });
stream.stderr.on('data', (data: Buffer) => { stderr += data.toString(); });
stream.on('close', (code: number | null) => {
// Don't reject on non-zero code here, check stderr
resolve({ stdout, stderr });
});
stream.on('error', (execErr: Error) => reject(execErr));
});
});
if (psStderr.includes('command not found') || psStderr.includes('Cannot connect to the Docker daemon')) {
console.warn(`[fetchRemoteDockerStatus] Docker ps command failed on session ${state.ws.sessionId}. Docker unavailable. Stderr: ${psStderr}`);
isDockerCmdAvailable = false;
return { available: false, containers: [] };
} else if (psStderr) {
console.warn(`[fetchRemoteDockerStatus] Docker ps command stderr on session ${state.ws.sessionId}: ${psStderr}`);
// Continue execution but log the warning
}
const lines = psStdout.trim().split('\n');
allContainers = lines
.map(line => {
try {
const data = JSON.parse(line);
// Map raw data to DockerContainer interface (lowercase id)
const container: DockerContainer = {
id: data.ID, // Map ID to lowercase id
Names: typeof data.Names === 'string' ? data.Names.split(',') : (data.Names || []),
Image: data.Image || '',
ImageID: data.ImageID || '',
Command: data.Command || '',
Created: data.CreatedAt || 0, // Check if CreatedAt exists
State: data.State || 'unknown',
Status: data.Status || '',
Ports: parsePortsString(data.Ports),
Labels: data.Labels || {},
stats: null // Initialize stats as null
};
return container;
} catch (parseError) {
console.error(`[fetchRemoteDockerStatus] Failed to parse container JSON line for session ${state.ws.sessionId}: ${line}`, parseError);
return null;
}
})
.filter((container): container is DockerContainer => container !== null);
} catch (error: any) {
console.error(`[fetchRemoteDockerStatus] Error executing docker ps for session ${state.ws.sessionId}:`, error);
// Check if error indicates docker unavailable
const errorMessage = error.message || '';
if (errorMessage.includes('command not found') || errorMessage.includes('Cannot connect to the Docker daemon')) {
isDockerCmdAvailable = false;
return { available: false, containers: [] };
}
// Otherwise, throw to indicate a more general fetch error
throw new Error(`Failed to get remote Docker container list: ${errorMessage}`);
}
// If docker ps failed indicating unavailability, return early
if (!isDockerCmdAvailable) {
return { available: false, containers: [] };
}
// 2. Get stats for running containers
try {
const statsCommand = "docker stats --no-stream --format '{{json .}}'";
const { stdout: statsStdout, stderr: statsStderr } = await new Promise<{ stdout: string; stderr: string }>((resolve, reject) => {
let stdout = '';
let stderr = '';
state.sshClient.exec(statsCommand, { pty: false }, (err, stream) => {
if (err) return reject(err);
stream.on('data', (data: Buffer) => { stdout += data.toString(); });
stream.stderr.on('data', (data: Buffer) => { stderr += data.toString(); });
stream.on('close', (code: number | null) => {
// Don't reject on non-zero code, check stderr
resolve({ stdout, stderr });
});
stream.on('error', (execErr: Error) => reject(execErr));
});
});
if (statsStderr) {
// Log stats errors but don't necessarily fail the whole process
console.warn(`[fetchRemoteDockerStatus] Docker stats command stderr on session ${state.ws.sessionId}: ${statsStderr}`);
}
const statsLines = statsStdout.trim().split('\n');
statsLines.forEach(line => {
try {
const statsData = JSON.parse(line) as DockerStats;
if (statsData.ID) {
// Use the ID from stats data (usually short ID) as the key
statsMap.set(statsData.ID, statsData);
}
} catch (parseError) {
console.error(`[fetchRemoteDockerStatus] Failed to parse stats JSON line for session ${state.ws.sessionId}: ${line}`, parseError);
}
});
} catch (error: any) {
// Failure to get stats is not critical, just log and continue
console.warn(`[fetchRemoteDockerStatus] Error executing docker stats for session ${state.ws.sessionId}:`, error);
}
// 3. Merge stats into containers
allContainers.forEach(container => {
const shortId = container.id.substring(0, 12); // docker stats often uses short ID
const stats = statsMap.get(container.id) || statsMap.get(shortId); // Try matching long and short ID
if (stats) {
container.stats = stats;
}
});
return { available: true, containers: allContainers };
};
// --- End fetchRemoteDockerStatus function ---
export const initializeWebSocket = async (server: http.Server, sessionParser: RequestHandler): Promise<WebSocketServer> => { // Make async
const wss = new WebSocketServer({ noServer: true });
const db = await getDbInstance(); // 获取数据库实例 (use await and getDbInstance)
const DOCKER_STATUS_INTERVAL = 2000; // Poll Docker status every 2 seconds
// --- 心跳检测 ---
const heartbeatInterval = setInterval(() => {
@@ -325,6 +489,58 @@ export const initializeWebSocket = async (server: http.Server, sessionParser: Re
console.log(`WebSocket: 会话 ${newSessionId} 正在启动状态监控...`);
statusMonitorService.startStatusPolling(newSessionId);
// 8. Start Docker status polling
console.log(`WebSocket: 会话 ${newSessionId} 正在启动 Docker 状态轮询...`);
const dockerIntervalId = setInterval(async () => {
const currentState = clientStates.get(newSessionId); // Re-fetch state
if (!currentState || currentState.ws.readyState !== WebSocket.OPEN) {
console.log(`[Docker Polling] Session ${newSessionId} no longer valid or WS closed. Stopping poll.`);
clearInterval(dockerIntervalId);
return;
}
try {
// console.log(`[Docker Polling] Fetching status for session ${newSessionId}...`);
const statusPayload = await fetchRemoteDockerStatus(currentState);
if (currentState.ws.readyState === WebSocket.OPEN) {
currentState.ws.send(JSON.stringify({ type: 'docker:status:update', payload: statusPayload }));
}
} catch (error: any) {
console.error(`[Docker Polling] Error fetching Docker status for session ${newSessionId}:`, error);
// Optionally send error to client, or just log
// if (currentState.ws.readyState === WebSocket.OPEN) {
// currentState.ws.send(JSON.stringify({ type: 'docker:status:error', payload: { message: `Polling failed: ${error.message}` } }));
// }
}
}, DOCKER_STATUS_INTERVAL);
newState.dockerStatusIntervalId = dockerIntervalId;
// 9. Trigger initial Docker status fetch immediately
(async () => {
const currentState = clientStates.get(newSessionId);
if (currentState && currentState.ws.readyState === WebSocket.OPEN) {
try {
console.log(`[Docker Initial Fetch] Fetching status for session ${newSessionId}...`);
const statusPayload = await fetchRemoteDockerStatus(currentState);
if (currentState.ws.readyState === WebSocket.OPEN) {
currentState.ws.send(JSON.stringify({ type: 'docker:status:update', payload: statusPayload }));
}
} catch (error: any) {
console.error(`[Docker Initial Fetch] Error fetching Docker status for session ${newSessionId}:`, error);
if (currentState.ws.readyState === WebSocket.OPEN) {
// Send specific error type for initial fetch failure
const errorMessage = error.message || 'Unknown error during initial fetch';
const isUnavailable = errorMessage.includes('command not found') || errorMessage.includes('Cannot connect to the Docker daemon');
if (isUnavailable) {
currentState.ws.send(JSON.stringify({ type: 'docker:status:update', payload: { available: false, containers: [] } }));
} else {
currentState.ws.send(JSON.stringify({ type: 'docker:status:error', payload: { message: `Initial Docker status fetch failed: ${errorMessage}` } }));
}
}
}
}
})();
} catch (shellError: any) {
console.error(`SSH: 会话 ${newSessionId} 打开 Shell 失败:`, shellError);
// 记录审计日志:打开 Shell 失败
@@ -340,7 +556,7 @@ export const initializeWebSocket = async (server: http.Server, sessionParser: Re
cleanupClientConnection(newSessionId);
}
// 8. 设置 SSH Client 的关闭和错误处理 (移到 Shell 成功打开之后)
// 10. 设置 SSH Client 的关闭和错误处理 (移到 Shell 成功打开之后)
sshClient.on('close', () => {
console.log(`SSH: 会话 ${newSessionId} 的客户端连接已关闭。`);
cleanupClientConnection(newSessionId);
@@ -393,106 +609,36 @@ export const initializeWebSocket = async (server: http.Server, sessionParser: Re
break;
}
// --- NEW: Handle Docker Status Request ---
// --- REFACTORED: Handle Docker Status Request ---
case 'docker:get_status': {
if (!state || !state.sshClient) {
console.warn(`WebSocket: 收到来自 ${ws.username} (会话: ${sessionId}) 的 ${type} 请求,但无活动 SSH 连接`);
ws.send(JSON.stringify({ type: 'docker:status:error', payload: { message: 'SSH connection not active.' } }));
if (!state) {
console.warn(`WebSocket: 收到来自 ${ws.username} (会话: ${sessionId}) 的 ${type} 请求,但无活动会话状态`);
ws.send(JSON.stringify({ type: 'docker:status:error', payload: { message: 'Session state not found.' } }));
return;
}
console.log(`WebSocket: 处理来自 ${ws.username} (会话: ${sessionId}) 的 ${type} 请求...`);
if (!state.sshClient) {
console.warn(`WebSocket: 收到来自 ${ws.username} (会话: ${sessionId}) 的 ${type} 请求,但无活动 SSH 连接。`);
ws.send(JSON.stringify({ type: 'docker:status:error', payload: { message: 'SSH connection not active.' } }));
return;
}
console.log(`WebSocket: 处理来自 ${ws.username} (会话: ${sessionId}) 的 ${type} 请求 (手动触发)...`);
try {
// Execute docker ps command remotely
const command = "docker ps -a --no-trunc --format '{{json .}}'";
const execResult = await new Promise<{ stdout: string; stderr: string }>((resolve, reject) => {
let stdout = '';
let stderr = '';
state.sshClient.exec(command, { pty: false }, (err, stream) => { // pty: false might be better for non-interactive commands
if (err) return reject(err);
stream.on('data', (data: Buffer) => { stdout += data.toString(); });
stream.stderr.on('data', (data: Buffer) => { stderr += data.toString(); });
stream.on('close', (code: number | null, signal: string | null) => {
if (code === 0) {
resolve({ stdout, stderr });
} else {
// Check if stderr indicates docker not found or cannot connect
if (stderr.includes('command not found') || stderr.includes('Cannot connect to the Docker daemon')) {
console.warn(`WebSocket: 远程 Docker 命令 (${command}) 执行失败 (可能未安装或未运行) on session ${sessionId}. Stderr: ${stderr}`);
// Send specific 'unavailable' status back
ws.send(JSON.stringify({ type: 'docker:status:update', payload: { available: false, containers: [] } }));
// Resolve normally here as we handled the 'unavailable' case
resolve({ stdout: '', stderr });
} else {
reject(new Error(`Command failed with code ${code}. Stderr: ${stderr}`));
}
}
});
// Add type annotation for execErr
stream.on('error', (execErr: Error) => reject(execErr));
});
});
// If stdout is empty, it means the command failed in a way we handled (like docker unavailable)
if (!execResult.stdout.trim()) {
// Response already sent if docker was unavailable
if (!execResult.stderr.includes('command not found') && !execResult.stderr.includes('Cannot connect to the Docker daemon')) {
console.warn(`WebSocket: Docker ps command for session ${sessionId} produced no output, but no specific error detected. Assuming available but no containers.`);
ws.send(JSON.stringify({ type: 'docker:status:update', payload: { available: true, containers: [] } }));
}
return;
}
// Parse the multi-line JSON output
const lines = execResult.stdout.trim().split('\n');
const containers = lines.map(line => {
try {
const data = JSON.parse(line);
// --- FIX: Use parsePortsString ---
const containerData = {
id: data.ID, // Assume original field is uppercase ID
Names: typeof data.Names === 'string' ? data.Names.split(',') : (data.Names || []),
Image: data.Image || '',
ImageID: data.ImageID || '',
Command: data.Command || '',
Created: data.CreatedAt || 0, // Check if CreatedAt exists
State: data.State || 'unknown',
Status: data.Status || '',
Ports: parsePortsString(data.Ports), // <--- Use the parser here
Labels: data.Labels || {}
// Add other fields as needed, mapping from data.*
};
// --- End FIX ---
// --- Add Log to verify parsed container ---
// console.log(`Parsed Container Data (Session: ${sessionId}):`, containerData);
// --- End Log ---
return containerData;
} catch (parseError) {
console.error(`WebSocket: Failed to parse remote docker ps JSON line for session ${sessionId}: ${line}`, parseError);
return null;
}
}).filter(Boolean); // Filter out nulls from parse errors
// --- Add Log to verify final containers array ---
// console.log(`Final Containers Array to Send (Session: ${sessionId}):`, containers);
// --- End Log ---
ws.send(JSON.stringify({ type: 'docker:status:update', payload: { available: true, containers } }));
// Call the reusable function
const statusPayload = await fetchRemoteDockerStatus(state);
ws.send(JSON.stringify({ type: 'docker:status:update', payload: statusPayload }));
} catch (error: any) {
console.error(`WebSocket: 执行远程 Docker 状态命令失败 for session ${sessionId}:`, error);
// Check if error indicates docker not found or cannot connect
const errorMessage = error.message || '';
if (errorMessage.includes('command not found') || errorMessage.includes('Cannot connect to the Docker daemon')) {
ws.send(JSON.stringify({ type: 'docker:status:update', payload: { available: false, containers: [] } }));
} else {
ws.send(JSON.stringify({ type: 'docker:status:error', payload: { message: `Failed to get remote Docker status: ${errorMessage}` } }));
}
console.error(`WebSocket: 手动执行远程 Docker 状态命令失败 for session ${sessionId}:`, error);
const errorMessage = error.message || 'Unknown error fetching status';
// Send specific error if Docker unavailable, general error otherwise
const isUnavailable = errorMessage.includes('command not found') || errorMessage.includes('Cannot connect to the Docker daemon');
if (isUnavailable) {
ws.send(JSON.stringify({ type: 'docker:status:update', payload: { available: false, containers: [] } }));
} else {
ws.send(JSON.stringify({ type: 'docker:status:error', payload: { message: `Failed to get remote Docker status: ${errorMessage}` } }));
}
}
break;
} // end case 'docker:get_status'
} // end case 'docker:get_status' (Refactored)
// --- NEW: Handle Docker Command Execution ---
case 'docker:command': {