update
This commit is contained in:
+110
-116
@@ -2,16 +2,14 @@ import WebSocket, { WebSocketServer } from 'ws';
|
||||
import http from 'http';
|
||||
import { Request, RequestHandler } from 'express';
|
||||
import { Client, ClientChannel } from 'ssh2';
|
||||
import { v4 as uuidv4 } from 'uuid'; // 用于生成唯一的会话 ID
|
||||
import { getDbInstance } from './database/connection'; // Updated import path, use getDbInstance
|
||||
import { decrypt } from './utils/crypto';
|
||||
import { v4 as uuidv4 } from 'uuid';
|
||||
import { getDbInstance } from './database/connection';
|
||||
import { SftpService } from './services/sftp.service';
|
||||
import { StatusMonitorService } from './services/status-monitor.service';
|
||||
import * as SshService from './services/ssh.service';
|
||||
import { DockerService } from './services/docker.service'; // 导入 DockerService
|
||||
import { AuditLogService } from './services/audit.service'; // 导入 AuditLogService
|
||||
import { AuditLogActionType } from './types/audit.types'; // 导入 AuditLogActionType
|
||||
import { settingsService } from './services/settings.service'; // +++ 修正导入路径 +++
|
||||
import { DockerService } from './services/docker.service';
|
||||
import { AuditLogService } from './services/audit.service';
|
||||
import { settingsService } from './services/settings.service';
|
||||
|
||||
// 扩展 WebSocket 类型以包含会话 ID
|
||||
interface AuthenticatedWebSocket extends WebSocket {
|
||||
@@ -35,15 +33,14 @@ export interface ClientState { // 导出以便 Service 可以导入
|
||||
ipAddress?: string; // 添加 IP 地址字段
|
||||
}
|
||||
|
||||
// --- Interfaces (保持与前端一致) ---
|
||||
// --- FIX: Move PortInfo definition before its usage ---
|
||||
|
||||
interface PortInfo {
|
||||
IP?: string;
|
||||
PrivatePort: number;
|
||||
PublicPort?: number;
|
||||
Type: 'tcp' | 'udp' | string;
|
||||
}
|
||||
// --- End FIX ---
|
||||
|
||||
|
||||
// --- Docker Interfaces (Ensure this matches frontend and DockerService) ---
|
||||
// Stats 接口
|
||||
@@ -72,16 +69,14 @@ interface DockerContainer {
|
||||
Labels: Record<string, string>;
|
||||
stats?: DockerStats | null; // 可选的 stats 字段
|
||||
}
|
||||
// --- End Docker Interfaces ---
|
||||
|
||||
|
||||
// --- 新增:解析 Ports 字符串的辅助函数 ---
|
||||
function parsePortsString(portsString: string | undefined | null): PortInfo[] { // Now PortInfo is defined
|
||||
function parsePortsString(portsString: string | undefined | null): PortInfo[] {
|
||||
if (!portsString) {
|
||||
return [];
|
||||
}
|
||||
const ports: PortInfo[] = []; // Now PortInfo is defined
|
||||
// 示例格式: "0.0.0.0:8080->80/tcp, :::8080->80/tcp", "127.0.0.1:5432->5432/tcp", "6379/tcp"
|
||||
const entries = portsString.split(', ');
|
||||
|
||||
for (const entry of entries) {
|
||||
@@ -89,17 +84,16 @@ function parsePortsString(portsString: string | undefined | null): PortInfo[] {
|
||||
let publicPart = '';
|
||||
let privatePart = '';
|
||||
|
||||
if (parts.length === 2) { // Format like "IP:PublicPort->PrivatePort/Type" or "PublicPort->PrivatePort/Type"
|
||||
if (parts.length === 2) {
|
||||
publicPart = parts[0];
|
||||
privatePart = parts[1];
|
||||
} else if (parts.length === 1) { // Format like "PrivatePort/Type"
|
||||
} else if (parts.length === 1) {
|
||||
privatePart = parts[0];
|
||||
} else {
|
||||
console.warn(`[WebSocket] Skipping unparsable port entry: ${entry}`);
|
||||
continue;
|
||||
}
|
||||
|
||||
// Parse Private Part (e.g., "80/tcp")
|
||||
const privateMatch = privatePart.match(/^(\d+)\/(tcp|udp|\w+)$/);
|
||||
if (!privateMatch) {
|
||||
console.warn(`[WebSocket] Skipping unparsable private port part: ${privatePart}`);
|
||||
@@ -111,15 +105,15 @@ function parsePortsString(portsString: string | undefined | null): PortInfo[] {
|
||||
let ip: string | undefined = undefined;
|
||||
let publicPort: number | undefined = undefined;
|
||||
|
||||
// Parse Public Part (e.g., "0.0.0.0:8080" or ":::8080" or just "8080")
|
||||
|
||||
if (publicPart) {
|
||||
const publicMatch = publicPart.match(/^(?:([\d.:a-fA-F]+):)?(\d+)$/); // Supports IPv4, IPv6, or just port
|
||||
const publicMatch = publicPart.match(/^(?:([\d.:a-fA-F]+):)?(\d+)$/);
|
||||
if (publicMatch) {
|
||||
ip = publicMatch[1] || undefined; // IP might be undefined if only port is specified
|
||||
ip = publicMatch[1] || undefined;
|
||||
publicPort = parseInt(publicMatch[2], 10);
|
||||
} else {
|
||||
console.warn(`[WebSocket] Skipping unparsable public port part: ${publicPart}`);
|
||||
// Continue processing with only private port info if public part is weird
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@@ -134,10 +128,10 @@ function parsePortsString(portsString: string | undefined | null): PortInfo[] {
|
||||
}
|
||||
return ports;
|
||||
}
|
||||
// --- 结束辅助函数 ---
|
||||
|
||||
|
||||
// 存储所有活动客户端的状态 (key: sessionId)
|
||||
export const clientStates = new Map<string, ClientState>(); // Export clientStates
|
||||
export const clientStates = new Map<string, ClientState>();
|
||||
|
||||
// --- 服务实例化 ---
|
||||
// 将 clientStates 传递给需要访问共享状态的服务
|
||||
@@ -183,11 +177,11 @@ const cleanupClientConnection = (sessionId: string | undefined) => {
|
||||
|
||||
console.log(`WebSocket: 会话 ${sessionId} 已清理。`);
|
||||
} else {
|
||||
// console.log(`WebSocket: 清理时未找到会话 ${sessionId} 的状态。`);
|
||||
|
||||
}
|
||||
};
|
||||
|
||||
// --- NEW: Reusable function to fetch remote Docker status with stats ---
|
||||
|
||||
const fetchRemoteDockerStatus = async (state: ClientState): Promise<{ available: boolean; containers: DockerContainer[] }> => {
|
||||
if (!state || !state.sshClient) {
|
||||
throw new Error('SSH client is not available in the current state.');
|
||||
@@ -195,9 +189,9 @@ const fetchRemoteDockerStatus = async (state: ClientState): Promise<{ available:
|
||||
|
||||
let allContainers: DockerContainer[] = [];
|
||||
const statsMap = new Map<string, DockerStats>();
|
||||
let isDockerCmdAvailable = false; // Start assuming unavailable until version check passes
|
||||
let isDockerCmdAvailable = false;
|
||||
|
||||
// --- 1. Check Docker Availability with 'docker version' ---
|
||||
|
||||
try {
|
||||
const versionCommand = "docker version --format '{{.Server.Version}}'";
|
||||
console.log(`[fetchRemoteDockerStatus] Executing: ${versionCommand} on session ${state.ws.sessionId}`);
|
||||
@@ -209,43 +203,43 @@ const fetchRemoteDockerStatus = async (state: ClientState): Promise<{ available:
|
||||
stream.on('data', (data: Buffer) => { stdout += data.toString(); });
|
||||
stream.stderr.on('data', (data: Buffer) => { stderr += data.toString(); });
|
||||
stream.on('close', (code: number | null) => {
|
||||
// Resolve even if code is non-zero, check stderr
|
||||
|
||||
resolve({ stdout, stderr });
|
||||
});
|
||||
stream.on('error', (execErr: Error) => reject(execErr));
|
||||
});
|
||||
});
|
||||
|
||||
// Check stderr for common errors indicating Docker is unavailable or inaccessible
|
||||
|
||||
if (versionStderr.includes('command not found') ||
|
||||
versionStderr.includes('permission denied') ||
|
||||
versionStderr.includes('Cannot connect to the Docker daemon')) {
|
||||
console.warn(`[fetchRemoteDockerStatus] Docker version check failed on session ${state.ws.sessionId}. Docker unavailable or inaccessible. Stderr: ${versionStderr.trim()}`);
|
||||
return { available: false, containers: [] }; // Docker not available
|
||||
return { available: false, containers: [] };
|
||||
} else if (versionStderr) {
|
||||
// Log other stderr outputs as warnings but proceed
|
||||
|
||||
console.warn(`[fetchRemoteDockerStatus] Docker version command stderr on session ${state.ws.sessionId}: ${versionStderr.trim()}`);
|
||||
}
|
||||
|
||||
// If stdout has content (version number), Docker is likely available
|
||||
|
||||
if (versionStdout.trim()) {
|
||||
console.log(`[fetchRemoteDockerStatus] Docker version check successful on session ${state.ws.sessionId}. Version: ${versionStdout.trim()}`);
|
||||
isDockerCmdAvailable = true;
|
||||
} else {
|
||||
// If stdout is empty but no critical error in stderr, still assume unavailable
|
||||
|
||||
console.warn(`[fetchRemoteDockerStatus] Docker version check on session ${state.ws.sessionId} produced no output, assuming Docker unavailable.`);
|
||||
return { available: false, containers: [] };
|
||||
}
|
||||
|
||||
} catch (error: any) {
|
||||
console.error(`[fetchRemoteDockerStatus] Error executing docker version for session ${state.ws.sessionId}:`, error);
|
||||
// Treat any error during version check as Docker being unavailable
|
||||
|
||||
return { available: false, containers: [] };
|
||||
}
|
||||
|
||||
// If version check failed, we already returned. If it passed, isDockerCmdAvailable is true.
|
||||
|
||||
|
||||
// --- 2. Get basic container info ---
|
||||
|
||||
try {
|
||||
const psCommand = "docker ps -a --no-trunc --format '{{json .}}'";
|
||||
console.log(`[fetchRemoteDockerStatus] Executing: ${psCommand} on session ${state.ws.sessionId}`);
|
||||
@@ -257,44 +251,44 @@ const fetchRemoteDockerStatus = async (state: ClientState): Promise<{ available:
|
||||
stream.on('data', (data: Buffer) => { stdout += data.toString(); });
|
||||
stream.stderr.on('data', (data: Buffer) => { stderr += data.toString(); });
|
||||
stream.on('close', (code: number | null) => {
|
||||
// Don't reject on non-zero code here, check stderr
|
||||
|
||||
resolve({ stdout, stderr });
|
||||
});
|
||||
stream.on('error', (execErr: Error) => reject(execErr));
|
||||
});
|
||||
});
|
||||
|
||||
// Although version check should catch most, double-check ps stderr
|
||||
if (psStderr.includes('command not found') || // Should not happen if version check passed
|
||||
psStderr.includes('permission denied') || // Could still happen if permissions differ
|
||||
psStderr.includes('Cannot connect to the Docker daemon')) { // Should not happen
|
||||
|
||||
if (psStderr.includes('command not found') ||
|
||||
psStderr.includes('permission denied') ||
|
||||
psStderr.includes('Cannot connect to the Docker daemon')) {
|
||||
console.warn(`[fetchRemoteDockerStatus] Docker ps command failed unexpectedly after version check on session ${state.ws.sessionId}. Stderr: ${psStderr.trim()}`);
|
||||
// Report as available=false, as ps failed critically
|
||||
|
||||
return { available: false, containers: [] };
|
||||
} else if (psStderr) {
|
||||
console.warn(`[fetchRemoteDockerStatus] Docker ps command stderr on session ${state.ws.sessionId}: ${psStderr.trim()}`);
|
||||
// Continue execution but log the warning
|
||||
|
||||
}
|
||||
|
||||
// If stdout is empty, there are no containers, which is valid
|
||||
|
||||
const lines = psStdout.trim() ? psStdout.trim().split('\n') : [];
|
||||
allContainers = lines
|
||||
.map(line => {
|
||||
try {
|
||||
const data = JSON.parse(line);
|
||||
// Map raw data to DockerContainer interface (lowercase id)
|
||||
|
||||
const container: DockerContainer = {
|
||||
id: data.ID, // Map ID to lowercase id
|
||||
id: data.ID,
|
||||
Names: typeof data.Names === 'string' ? data.Names.split(',') : (data.Names || []),
|
||||
Image: data.Image || '',
|
||||
ImageID: data.ImageID || '',
|
||||
Command: data.Command || '',
|
||||
Created: data.CreatedAt || 0, // Check if CreatedAt exists
|
||||
Created: data.CreatedAt || 0,
|
||||
State: data.State || 'unknown',
|
||||
Status: data.Status || '',
|
||||
Ports: parsePortsString(data.Ports),
|
||||
Labels: data.Labels || {},
|
||||
stats: null // Initialize stats as null
|
||||
stats: null
|
||||
};
|
||||
return container;
|
||||
} catch (parseError) {
|
||||
@@ -306,19 +300,19 @@ const fetchRemoteDockerStatus = async (state: ClientState): Promise<{ available:
|
||||
|
||||
} catch (error: any) {
|
||||
console.error(`[fetchRemoteDockerStatus] Error executing docker ps for session ${state.ws.sessionId}:`, error);
|
||||
// If ps command fails after version check, report as unavailable
|
||||
|
||||
return { available: false, containers: [] };
|
||||
// Rethrowing might be too aggressive here, better to report unavailability
|
||||
// throw new Error(`Failed to get remote Docker container list: ${error.message || error}`);
|
||||
|
||||
|
||||
}
|
||||
|
||||
// --- 3. Get stats for running containers (only if ps was successful) ---
|
||||
// Check if there are any containers before running stats
|
||||
|
||||
|
||||
const runningContainerIds = allContainers.filter(c => c.State === 'running').map(c => c.id);
|
||||
|
||||
if (runningContainerIds.length > 0) {
|
||||
try {
|
||||
// Construct command to get stats only for running containers
|
||||
|
||||
const statsCommand = `docker stats ${runningContainerIds.join(' ')} --no-stream --format '{{json .}}'`;
|
||||
console.log(`[fetchRemoteDockerStatus] Executing: ${statsCommand} on session ${state.ws.sessionId}`);
|
||||
const { stdout: statsStdout, stderr: statsStderr } = await new Promise<{ stdout: string; stderr: string }>((resolve, reject) => {
|
||||
@@ -329,7 +323,7 @@ const fetchRemoteDockerStatus = async (state: ClientState): Promise<{ available:
|
||||
stream.on('data', (data: Buffer) => { stdout += data.toString(); });
|
||||
stream.stderr.on('data', (data: Buffer) => { stderr += data.toString(); });
|
||||
stream.on('close', (code: number | null) => {
|
||||
// Don't reject on non-zero code, check stderr
|
||||
|
||||
resolve({ stdout, stderr });
|
||||
});
|
||||
stream.on('error', (execErr: Error) => reject(execErr));
|
||||
@@ -337,7 +331,7 @@ const fetchRemoteDockerStatus = async (state: ClientState): Promise<{ available:
|
||||
});
|
||||
|
||||
if (statsStderr) {
|
||||
// Log stats errors but don't necessarily fail the whole process
|
||||
|
||||
console.warn(`[fetchRemoteDockerStatus] Docker stats command stderr on session ${state.ws.sessionId}: ${statsStderr.trim()}`);
|
||||
}
|
||||
|
||||
@@ -346,7 +340,7 @@ const fetchRemoteDockerStatus = async (state: ClientState): Promise<{ available:
|
||||
try {
|
||||
const statsData = JSON.parse(line) as DockerStats;
|
||||
if (statsData.ID) {
|
||||
// Use the ID from stats data (usually short ID) as the key
|
||||
|
||||
statsMap.set(statsData.ID, statsData);
|
||||
}
|
||||
} catch (parseError) {
|
||||
@@ -354,32 +348,32 @@ const fetchRemoteDockerStatus = async (state: ClientState): Promise<{ available:
|
||||
}
|
||||
});
|
||||
} catch (error: any) {
|
||||
// Failure to get stats is not critical, just log and continue
|
||||
|
||||
console.warn(`[fetchRemoteDockerStatus] Error executing docker stats for session ${state.ws.sessionId}:`, error);
|
||||
}
|
||||
} else {
|
||||
console.log(`[fetchRemoteDockerStatus] No running containers found on session ${state.ws.sessionId}, skipping docker stats.`);
|
||||
}
|
||||
|
||||
// --- 4. Merge stats into containers ---
|
||||
|
||||
allContainers.forEach(container => {
|
||||
const shortId = container.id.substring(0, 12); // docker stats often uses short ID
|
||||
const stats = statsMap.get(container.id) || statsMap.get(shortId); // Try matching long and short ID
|
||||
const shortId = container.id.substring(0, 12);
|
||||
const stats = statsMap.get(container.id) || statsMap.get(shortId);
|
||||
if (stats) {
|
||||
container.stats = stats;
|
||||
}
|
||||
});
|
||||
|
||||
// If we reached here, Docker is considered available (version check passed)
|
||||
|
||||
return { available: true, containers: allContainers };
|
||||
};
|
||||
// --- End fetchRemoteDockerStatus function ---
|
||||
|
||||
|
||||
export const initializeWebSocket = async (server: http.Server, sessionParser: RequestHandler): Promise<WebSocketServer> => { // Make async
|
||||
|
||||
export const initializeWebSocket = async (server: http.Server, sessionParser: RequestHandler): Promise<WebSocketServer> => {
|
||||
const wss = new WebSocketServer({ noServer: true });
|
||||
const db = await getDbInstance(); // 获取数据库实例 (use await and getDbInstance)
|
||||
const DOCKER_STATUS_INTERVAL = 2000; // Poll Docker status every 2 seconds
|
||||
const db = await getDbInstance();
|
||||
const DOCKER_STATUS_INTERVAL = 2000;
|
||||
|
||||
// --- 心跳检测 ---
|
||||
const heartbeatInterval = setInterval(() => {
|
||||
@@ -499,7 +493,7 @@ export const initializeWebSocket = async (server: http.Server, sessionParser: Re
|
||||
// --- 添加日志:打印收到的原始数据 ---
|
||||
console.log(`SSH Data (会话: ${newSessionId}, 原始): `, data.toString()); // 添加原始数据日志 (尝试 utf8)
|
||||
console.log(`SSH Data (会话: ${newSessionId}, Hex): `, data.toString('hex')); // 添加 Hex 日志
|
||||
// ------------------------------------
|
||||
|
||||
if (ws.readyState === WebSocket.OPEN) {
|
||||
ws.send(JSON.stringify({ type: 'ssh:output', payload: data.toString('base64'), encoding: 'base64' }));
|
||||
}
|
||||
@@ -543,10 +537,10 @@ export const initializeWebSocket = async (server: http.Server, sessionParser: Re
|
||||
console.log(`WebSocket: 会话 ${newSessionId} 正在启动状态监控...`);
|
||||
statusMonitorService.startStatusPolling(newSessionId);
|
||||
|
||||
// 8. Start Docker status polling (using setting)
|
||||
|
||||
console.log(`WebSocket: 会话 ${newSessionId} 正在启动 Docker 状态轮询...`);
|
||||
// --- Get interval from settings ---
|
||||
let dockerPollIntervalMs = 2000; // Default interval
|
||||
|
||||
let dockerPollIntervalMs = 2000;
|
||||
try {
|
||||
const intervalSetting = await settingsService.getSetting('dockerStatusIntervalSeconds');
|
||||
if (intervalSetting) {
|
||||
@@ -563,32 +557,32 @@ export const initializeWebSocket = async (server: http.Server, sessionParser: Re
|
||||
} catch (settingError) {
|
||||
console.error(`[Docker Polling] Error fetching interval setting for session ${newSessionId}. Using default ${dockerPollIntervalMs}ms:`, settingError);
|
||||
}
|
||||
// --- End get interval ---
|
||||
|
||||
|
||||
const dockerIntervalId = setInterval(async () => {
|
||||
const currentState = clientStates.get(newSessionId); // Re-fetch state
|
||||
const currentState = clientStates.get(newSessionId);
|
||||
if (!currentState || currentState.ws.readyState !== WebSocket.OPEN) {
|
||||
console.log(`[Docker Polling] Session ${newSessionId} no longer valid or WS closed. Stopping poll.`);
|
||||
clearInterval(dockerIntervalId);
|
||||
return;
|
||||
}
|
||||
try {
|
||||
// console.log(`[Docker Polling] Fetching status for session ${newSessionId}...`);
|
||||
|
||||
const statusPayload = await fetchRemoteDockerStatus(currentState);
|
||||
if (currentState.ws.readyState === WebSocket.OPEN) {
|
||||
currentState.ws.send(JSON.stringify({ type: 'docker:status:update', payload: statusPayload }));
|
||||
}
|
||||
} catch (error: any) {
|
||||
console.error(`[Docker Polling] Error fetching Docker status for session ${newSessionId}:`, error);
|
||||
// Optionally send error to client, or just log
|
||||
// if (currentState.ws.readyState === WebSocket.OPEN) {
|
||||
// currentState.ws.send(JSON.stringify({ type: 'docker:status:error', payload: { message: `Polling failed: ${error.message}` } }));
|
||||
// }
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
||||
}, dockerPollIntervalMs); // <-- Use the determined interval
|
||||
}, dockerPollIntervalMs);
|
||||
newState.dockerStatusIntervalId = dockerIntervalId;
|
||||
|
||||
// 9. Trigger initial Docker status fetch immediately
|
||||
|
||||
(async () => {
|
||||
const currentState = clientStates.get(newSessionId);
|
||||
if (currentState && currentState.ws.readyState === WebSocket.OPEN) {
|
||||
@@ -601,7 +595,7 @@ export const initializeWebSocket = async (server: http.Server, sessionParser: Re
|
||||
} catch (error: any) {
|
||||
console.error(`[Docker Initial Fetch] Error fetching Docker status for session ${newSessionId}:`, error);
|
||||
if (currentState.ws.readyState === WebSocket.OPEN) {
|
||||
// Send specific error type for initial fetch failure
|
||||
|
||||
const errorMessage = error.message || 'Unknown error during initial fetch';
|
||||
const isUnavailable = errorMessage.includes('command not found') || errorMessage.includes('Cannot connect to the Docker daemon');
|
||||
if (isUnavailable) {
|
||||
@@ -654,7 +648,7 @@ export const initializeWebSocket = async (server: http.Server, sessionParser: Re
|
||||
ws.send(JSON.stringify({ type: 'ssh:error', payload: `连接失败: ${connectError.message}` }));
|
||||
}
|
||||
break;
|
||||
} // end case 'ssh:connect'
|
||||
}
|
||||
|
||||
// --- SSH 输入 ---
|
||||
case 'ssh:input': {
|
||||
@@ -683,7 +677,7 @@ export const initializeWebSocket = async (server: http.Server, sessionParser: Re
|
||||
break;
|
||||
}
|
||||
|
||||
// --- REFACTORED: Handle Docker Status Request ---
|
||||
|
||||
case 'docker:get_status': {
|
||||
if (!state) {
|
||||
console.warn(`WebSocket: 收到来自 ${ws.username} (会话: ${sessionId}) 的 ${type} 请求,但无活动会话状态。`);
|
||||
@@ -697,13 +691,13 @@ export const initializeWebSocket = async (server: http.Server, sessionParser: Re
|
||||
}
|
||||
console.log(`WebSocket: 处理来自 ${ws.username} (会话: ${sessionId}) 的 ${type} 请求 (手动触发)...`);
|
||||
try {
|
||||
// Call the reusable function
|
||||
|
||||
const statusPayload = await fetchRemoteDockerStatus(state);
|
||||
ws.send(JSON.stringify({ type: 'docker:status:update', payload: statusPayload }));
|
||||
} catch (error: any) {
|
||||
console.error(`WebSocket: 手动执行远程 Docker 状态命令失败 for session ${sessionId}:`, error);
|
||||
const errorMessage = error.message || 'Unknown error fetching status';
|
||||
// Send specific error if Docker unavailable, general error otherwise
|
||||
|
||||
const isUnavailable = errorMessage.includes('command not found') || errorMessage.includes('Cannot connect to the Docker daemon');
|
||||
if (isUnavailable) {
|
||||
ws.send(JSON.stringify({ type: 'docker:status:update', payload: { available: false, containers: [] } }));
|
||||
@@ -712,9 +706,9 @@ export const initializeWebSocket = async (server: http.Server, sessionParser: Re
|
||||
}
|
||||
}
|
||||
break;
|
||||
} // end case 'docker:get_status' (Refactored)
|
||||
}
|
||||
|
||||
// --- NEW: Handle Docker Command Execution ---
|
||||
|
||||
case 'docker:command': {
|
||||
if (!state || !state.sshClient) {
|
||||
console.warn(`WebSocket: 收到来自 ${ws.username} (会话: ${sessionId}) 的 ${type} 请求,但无活动 SSH 连接。`);
|
||||
@@ -743,7 +737,7 @@ export const initializeWebSocket = async (server: http.Server, sessionParser: Re
|
||||
console.log(`WebSocket: Validation PASSED for docker:command.`); // 增加成功日志
|
||||
console.log(`WebSocket: Processing command '${command}' for container '${containerId}' on session ${sessionId}...`);
|
||||
try {
|
||||
// Sanitize containerId (basic) - more robust validation might be needed
|
||||
|
||||
const cleanContainerId = containerId.replace(/[^a-zA-Z0-9_-]/g, '');
|
||||
if (!cleanContainerId) throw new Error('Invalid container ID format after sanitization.');
|
||||
|
||||
@@ -752,11 +746,11 @@ export const initializeWebSocket = async (server: http.Server, sessionParser: Re
|
||||
case 'start': dockerCliCommand = `docker start ${cleanContainerId}`; break;
|
||||
case 'stop': dockerCliCommand = `docker stop ${cleanContainerId}`; break;
|
||||
case 'restart': dockerCliCommand = `docker restart ${cleanContainerId}`; break;
|
||||
case 'remove': dockerCliCommand = `docker rm -f ${cleanContainerId}`; break; // Use -f for remove
|
||||
default: throw new Error(`Unsupported command: ${command}`); // Should be caught by earlier validation
|
||||
case 'remove': dockerCliCommand = `docker rm -f ${cleanContainerId}`; break;
|
||||
default: throw new Error(`Unsupported command: ${command}`);
|
||||
}
|
||||
|
||||
// Execute command remotely
|
||||
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
state.sshClient.exec(dockerCliCommand, { pty: false }, (err, stream) => {
|
||||
if (err) return reject(err);
|
||||
@@ -771,20 +765,20 @@ export const initializeWebSocket = async (server: http.Server, sessionParser: Re
|
||||
reject(new Error(`Command failed with code ${code}. ${stderr || 'No stderr output.'}`));
|
||||
}
|
||||
});
|
||||
// Add type annotation for execErr
|
||||
|
||||
stream.on('error', (execErr: Error) => reject(execErr));
|
||||
});
|
||||
});
|
||||
// Optionally send a success confirmation back? Not strictly needed if status updates quickly.
|
||||
// ws.send(JSON.stringify({ type: 'docker:command:success', payload: { command, containerId } }));
|
||||
|
||||
|
||||
|
||||
// Trigger a status update after command execution
|
||||
// Use a small delay to allow Docker daemon to potentially update state
|
||||
|
||||
|
||||
setTimeout(() => {
|
||||
if (clientStates.has(sessionId!)) { // Check if session still exists
|
||||
ws.send(JSON.stringify({ type: 'request_docker_status_update' })); // Ask frontend to re-request
|
||||
// Or directly trigger backend fetch and push:
|
||||
// handleDockerGetStatus(ws, state); // Need to refactor get_status logic into a reusable function
|
||||
if (clientStates.has(sessionId!)) {
|
||||
ws.send(JSON.stringify({ type: 'request_docker_status_update' }));
|
||||
|
||||
|
||||
}
|
||||
}, 500);
|
||||
|
||||
@@ -794,10 +788,10 @@ export const initializeWebSocket = async (server: http.Server, sessionParser: Re
|
||||
ws.send(JSON.stringify({ type: 'docker:command:error', payload: { command, containerId, message: `Failed to execute remote command: ${error.message}` } }));
|
||||
}
|
||||
break;
|
||||
} // end case 'docker:command'
|
||||
}
|
||||
|
||||
|
||||
// --- SFTP Cases ---
|
||||
|
||||
case 'sftp:readdir':
|
||||
case 'sftp:stat':
|
||||
case 'sftp:readfile':
|
||||
@@ -887,7 +881,7 @@ export const initializeWebSocket = async (server: http.Server, sessionParser: Re
|
||||
ws.send(JSON.stringify({ type: 'sftp:upload:error', payload: { uploadId: payload?.uploadId, message: '缺少 uploadId, remotePath 或 size' } }));
|
||||
return;
|
||||
}
|
||||
// TODO: Add audit log for SFTP upload start?
|
||||
|
||||
sftpService.startUpload(sessionId, payload.uploadId, payload.remotePath, payload.size);
|
||||
break;
|
||||
}
|
||||
@@ -907,17 +901,17 @@ export const initializeWebSocket = async (server: http.Server, sessionParser: Re
|
||||
ws.send(JSON.stringify({ type: 'sftp:upload:error', payload: { uploadId: payload?.uploadId, message: '缺少 uploadId' } }));
|
||||
return;
|
||||
}
|
||||
// TODO: Add audit log for SFTP upload cancel?
|
||||
|
||||
sftpService.cancelUpload(sessionId, payload.uploadId);
|
||||
break;
|
||||
}
|
||||
|
||||
// --- NEW CASE: Handle docker:get_stats ---
|
||||
|
||||
case 'docker:get_stats': {
|
||||
if (!state || !state.sshClient) { // Check state and sshClient
|
||||
if (!state || !state.sshClient) {
|
||||
console.warn(`WebSocket: 收到来自 ${ws.username} (会话: ${sessionId}) 的 ${type} 请求,但无活动 SSH 连接。`);
|
||||
ws.send(JSON.stringify({ type: 'docker:stats:error', payload: { containerId: payload?.containerId, message: 'SSH connection not active.' } }));
|
||||
return; // Use return instead of break inside switch
|
||||
return;
|
||||
}
|
||||
if (!payload || !payload.containerId) {
|
||||
console.warn(`WebSocket: Invalid payload for docker:get_stats in session ${sessionId}:`, payload);
|
||||
@@ -930,7 +924,7 @@ export const initializeWebSocket = async (server: http.Server, sessionParser: Re
|
||||
const command = `docker stats ${containerId} --no-stream --format '{{json .}}'`;
|
||||
|
||||
try {
|
||||
// --- FIX: Use sshClient.exec directly ---
|
||||
|
||||
const execResult = await new Promise<{ stdout: string; stderr: string }>((resolve, reject) => {
|
||||
let stdout = '';
|
||||
let stderr = '';
|
||||
@@ -939,33 +933,33 @@ export const initializeWebSocket = async (server: http.Server, sessionParser: Re
|
||||
stream.on('data', (data: Buffer) => { stdout += data.toString(); });
|
||||
stream.stderr.on('data', (data: Buffer) => { stderr += data.toString(); });
|
||||
stream.on('close', (code: number | null) => {
|
||||
// Don't reject on non-zero exit code here, stderr check is more reliable for docker stats
|
||||
|
||||
resolve({ stdout, stderr });
|
||||
});
|
||||
stream.on('error', (execErr: Error) => reject(execErr));
|
||||
});
|
||||
});
|
||||
// --- End FIX ---
|
||||
|
||||
|
||||
if (execResult.stderr) {
|
||||
// Handle cases like container not found or docker errors
|
||||
|
||||
console.error(`WebSocket: Docker stats stderr for ${containerId} in session ${sessionId}: ${execResult.stderr}`);
|
||||
ws.send(JSON.stringify({ type: 'docker:stats:error', payload: { containerId, message: execResult.stderr.trim() || 'Error executing stats command.' } }));
|
||||
return; // Use return after sending error
|
||||
return;
|
||||
}
|
||||
|
||||
if (!execResult.stdout) {
|
||||
console.warn(`WebSocket: No stats output for container ${containerId} in session ${sessionId}. Might be stopped or error occurred.`);
|
||||
// Check stderr again just in case, although previous check should catch most errors
|
||||
|
||||
if (!execResult.stderr) {
|
||||
ws.send(JSON.stringify({ type: 'docker:stats:error', payload: { containerId, message: 'No stats data received (container might be stopped).' } }));
|
||||
}
|
||||
return; // Use return after sending error or warning
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
const statsData = JSON.parse(execResult.stdout.trim());
|
||||
// Optional: Clean up or format statsData if needed before sending
|
||||
|
||||
ws.send(JSON.stringify({ type: 'docker:stats:update', payload: { containerId, stats: statsData } }));
|
||||
} catch (parseError) {
|
||||
console.error(`WebSocket: Failed to parse docker stats JSON for ${containerId} in session ${sessionId}: ${execResult.stdout}`, parseError);
|
||||
@@ -976,8 +970,8 @@ export const initializeWebSocket = async (server: http.Server, sessionParser: Re
|
||||
console.error(`WebSocket: Failed to execute docker stats for ${containerId} in session ${sessionId}:`, error);
|
||||
ws.send(JSON.stringify({ type: 'docker:stats:error', payload: { containerId, message: error.message || 'Failed to fetch Docker stats.' } }));
|
||||
}
|
||||
break; // Break after handling the case
|
||||
} // --- END CASE: docker:get_stats ---
|
||||
break;
|
||||
}
|
||||
|
||||
|
||||
default:
|
||||
@@ -1016,4 +1010,4 @@ export const initializeWebSocket = async (server: http.Server, sessionParser: Re
|
||||
return wss;
|
||||
};
|
||||
|
||||
// --- 移除旧的辅助函数 ---
|
||||
|
||||
|
||||
Reference in New Issue
Block a user