From 2d7434d77816e303003de1bbf7cbfa75b9951f98 Mon Sep 17 00:00:00 2001 From: Baobhan Sith <80159437+Heavrnl@users.noreply.github.com> Date: Thu, 1 May 2025 15:37:17 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0=E7=BB=88=E7=AB=AF?= =?UTF-8?q?=E6=94=AF=E6=8C=81=E5=A4=9A=E8=89=B2=E5=92=8C=E9=BC=A0=E6=A0=87?= =?UTF-8?q?=E9=94=AE=E7=9B=98=E6=94=AF=E6=8C=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Related to #1 --- packages/backend/src/websocket.ts | 334 +++++++++++++++--------------- 1 file changed, 170 insertions(+), 164 deletions(-) diff --git a/packages/backend/src/websocket.ts b/packages/backend/src/websocket.ts index 971504d..dda3673 100644 --- a/packages/backend/src/websocket.ts +++ b/packages/backend/src/websocket.ts @@ -36,6 +36,7 @@ export interface ClientState { // 导出以便 Service 可以导入 statusIntervalId?: NodeJS.Timeout; // 添加状态轮询 ID (由 StatusMonitorService 管理) dockerStatusIntervalId?: NodeJS.Timeout; // NEW: Docker 状态轮询 ID ipAddress?: string; // 添加 IP 地址字段 + isShellReady?: boolean; // 新增:标记 Shell 是否已准备好处理输入和调整大小 } @@ -524,7 +525,7 @@ export const initializeWebSocket = async (server: http.Server, sessionParser: Re // 根据宽高的简单 DPI 计算逻辑 (如果宽度 > 1920,则 DPI=120,否则 DPI=96) const calculatedDpi = rdpWidth > 1920 ? 120 : 96; console.log(`WebSocket: RDP Proxy calculated DPI for ${ws.username} based on width ${rdpWidth}: ${calculatedDpi}`); - // --- 结束新增 --- + // Determine RDP target URL based on deployment mode @@ -541,9 +542,8 @@ export const initializeWebSocket = async (server: http.Server, sessionParser: Re console.warn(`[WebSocket RDP Proxy] Unknown deployment mode '${deploymentMode}'. Defaulting to safe fallback RDP Target Base: ${rdpBaseUrl}`); } - // Ensure base URL doesn't end with a slash before appending query params const cleanRdpBaseUrl = rdpBaseUrl.endsWith('/') ? rdpBaseUrl.slice(0, -1) : rdpBaseUrl; - // Append ALL parameters to the target URL, using calculated DPI + const rdpTargetUrl = `${cleanRdpBaseUrl}/?token=${encodeURIComponent(rdpToken)}&width=${encodeURIComponent(rdpWidth)}&height=${encodeURIComponent(rdpHeight)}&dpi=${encodeURIComponent(calculatedDpi)}`; // 使用 calculatedDpi console.log(`WebSocket: RDP Proxy for ${ws.username} attempting to connect to ${rdpTargetUrl}`); @@ -555,10 +555,7 @@ export const initializeWebSocket = async (server: http.Server, sessionParser: Re // --- 消息转发: Client -> RDP --- ws.on('message', (message: RawData) => { if (rdpWs.readyState === WebSocket.OPEN) { - // --- 添加中文日志 --- const messageString = message.toString('utf-8'); // 尝试解码为 UTF-8 - // console.log(`[RDP 代理 C->S] 用户: ${ws.username}, 会话: ${ws.sessionId}, 转发消息 (前 100 字符): ${messageString.substring(0, 100)}`); - // --- 结束日志 --- rdpWs.send(message); } else { console.warn(`[RDP 代理 C->S] 用户: ${ws.username}, 会话: ${ws.sessionId}, RDP WS 未打开,丢弃消息。`); @@ -570,9 +567,6 @@ export const initializeWebSocket = async (server: http.Server, sessionParser: Re if (ws.readyState === WebSocket.OPEN) { // 将 RawData (可能是 Buffer) 转换为 UTF-8 字符串再发送 const messageString = message.toString('utf-8'); - // --- 添加中文日志 --- - // console.log(`[RDP 代理 S->C] 用户: ${ws.username}, 会话: ${ws.sessionId}, 转发消息 (前 100 字符): ${messageString.substring(0, 100)}`); - // --- 结束日志 --- ws.send(messageString); } else { console.warn(`[RDP 代理 S->C] 用户: ${ws.username}, 会话: ${ws.sessionId}, 客户端 WS 未打开,丢弃消息。`); @@ -581,9 +575,7 @@ export const initializeWebSocket = async (server: http.Server, sessionParser: Re // --- 错误处理 --- ws.on('error', (error) => { - // --- 添加中文日志 --- console.error(`[RDP 代理 客户端 WS 错误] 用户: ${ws.username}, 会话: ${ws.sessionId}, 错误:`, error); - // --- 结束日志 --- if (!rdpWsClosed && rdpWs.readyState !== WebSocket.CLOSED && rdpWs.readyState !== WebSocket.CLOSING) { console.log(`[RDP 代理] 因客户端 WS 错误关闭 RDP WS。会话: ${ws.sessionId}`); rdpWs.close(1011, 'Client WS Error'); @@ -592,9 +584,8 @@ export const initializeWebSocket = async (server: http.Server, sessionParser: Re clientWsClosed = true; }); rdpWs.on('error', (error) => { - // --- 添加中文日志 --- + console.error(`[RDP 代理 RDP WS 错误] 用户: ${ws.username}, 会话: ${ws.sessionId}, 连接到 ${rdpTargetUrl} 时出错:`, error); - // --- 结束日志 --- if (!clientWsClosed && ws.readyState !== WebSocket.CLOSED && ws.readyState !== WebSocket.CLOSING) { console.log(`[RDP 代理] 因 RDP WS 错误关闭客户端 WS。会话: ${ws.sessionId}`); ws.close(1011, `RDP WS Error: ${error.message}`); @@ -628,10 +619,7 @@ export const initializeWebSocket = async (server: http.Server, sessionParser: Re }); rdpWs.on('open', () => { - // --- 添加中文日志 --- console.log(`[RDP 代理 RDP WS 打开] 用户: ${ws.username}, 会话: ${ws.sessionId}, 到 ${rdpTargetUrl} 的连接已建立。开始转发消息。`); - // --- 结束日志 --- - // Do not send custom message, let Guacamole protocol flow directly }); // --- 标准 (SSH/SFTP/Docker) 连接处理 --- @@ -693,175 +681,185 @@ export const initializeWebSocket = async (server: http.Server, sessionParser: Re sshClient: sshClient, dbConnectionId: dbConnectionId, ipAddress: clientIp, // 存储 IP 地址 + isShellReady: false, // 初始化 Shell 状态为未就绪 }; clientStates.set(newSessionId, newState); console.log(`WebSocket: 为用户 ${ws.username} (IP: ${clientIp}) 创建新会话 ${newSessionId} (DB ID: ${dbConnectionId})`); - // 4. 打开 Shell + // 4. 立即打开 Shell (使用默认尺寸) ws.send(JSON.stringify({ type: 'ssh:status', payload: 'SSH 连接成功,正在打开 Shell...' })); try { - const shellStream = await SshService.openShell(sshClient); - newState.sshShellStream = shellStream; - - // 5. 设置 Shell 事件转发 - shellStream.on('data', (data: Buffer) => { - // --- 添加日志:打印收到的原始数据 --- - // console.log(`SSH Data (会话: ${newSessionId}, 原始): `, data.toString()); // 添加原始数据日志 (尝试 utf8) - // console.log(`SSH Data (会话: ${newSessionId}, Hex): `, data.toString('hex')); // 添加 Hex 日志 - - if (ws.readyState === WebSocket.OPEN) { - ws.send(JSON.stringify({ type: 'ssh:output', payload: data.toString('base64'), encoding: 'base64' })); + // 使用默认尺寸 80x24 打开 Shell + const defaultCols = 80; + const defaultRows = 24; + sshClient.shell({ term: 'xterm-256color', cols: defaultCols, rows: defaultRows }, (err, stream) => { + if (err) { + console.error(`SSH: 会话 ${newSessionId} 打开 Shell 失败:`, err); + // 记录审计日志:打开 Shell 失败 + auditLogService.logAction('SSH_SHELL_FAILURE', { + userId: ws.userId, + username: ws.username, + connectionId: dbConnectionId, + sessionId: newSessionId, + ip: newState.ipAddress, + reason: err.message + }); + notificationService.sendNotification('SSH_SHELL_FAILURE', { // 添加通知调用 + userId: ws.userId, + username: ws.username, + connectionId: dbConnectionId, + sessionId: newSessionId, + ip: newState.ipAddress, + reason: err.message + }); + if (ws.readyState === WebSocket.OPEN) { + ws.send(JSON.stringify({ type: 'ssh:error', payload: `打开 Shell 失败: ${err.message}` })); + } + cleanupClientConnection(newSessionId); // 清理连接 + return; } - }); - shellStream.stderr.on('data', (data: Buffer) => { - console.error(`SSH Stderr (会话: ${newSessionId}): ${data.toString('utf8').substring(0, 100)}...`); - if (ws.readyState === WebSocket.OPEN) { - ws.send(JSON.stringify({ type: 'ssh:output', payload: data.toString('base64'), encoding: 'base64' })); - } - }); - shellStream.on('close', () => { - console.log(`SSH: 会话 ${newSessionId} 的 Shell 通道已关闭。`); - ws.send(JSON.stringify({ type: 'ssh:disconnected', payload: 'Shell 通道已关闭。' })); - cleanupClientConnection(newSessionId); - }); - // 6. 发送 SSH 连接成功消息 - ws.send(JSON.stringify({ - type: 'ssh:connected', - payload: { + // Shell 打开成功 + console.log(`WebSocket: 会话 ${newSessionId} Shell 打开成功 (使用默认尺寸 ${defaultCols}x${defaultRows})。`); + newState.sshShellStream = stream; + newState.isShellReady = true; // 标记 Shell 已就绪 + + // 5. 立即设置 Shell 事件转发 (捕获初始输出) + stream.on('data', (data: Buffer) => { + if (ws.readyState === WebSocket.OPEN) { + ws.send(JSON.stringify({ type: 'ssh:output', payload: data.toString('base64'), encoding: 'base64' })); + } + }); + stream.stderr.on('data', (data: Buffer) => { + console.error(`SSH Stderr (会话: ${newSessionId}): ${data.toString('utf8').substring(0, 100)}...`); + if (ws.readyState === WebSocket.OPEN) { + ws.send(JSON.stringify({ type: 'ssh:output', payload: data.toString('base64'), encoding: 'base64' })); + } + }); + stream.on('close', () => { + console.log(`SSH: 会话 ${newSessionId} 的 Shell 通道已关闭。`); + if (ws.readyState === WebSocket.OPEN) { + ws.send(JSON.stringify({ type: 'ssh:disconnected', payload: 'Shell 通道已关闭。' })); + } + cleanupClientConnection(newSessionId); + }); + + // 6. 发送 SSH 连接成功消息 (现在 Shell 也已打开) + ws.send(JSON.stringify({ + type: 'ssh:connected', + payload: { + connectionId: dbConnectionId, + sessionId: newSessionId + } + })); + console.log(`WebSocket: 会话 ${newSessionId} SSH 连接和 Shell 建立成功。`); + // 记录审计日志:SSH 连接成功 + auditLogService.logAction('SSH_CONNECT_SUCCESS', { + userId: ws.userId, + username: ws.username, connectionId: dbConnectionId, - sessionId: newSessionId - } - })); - console.log(`WebSocket: 会话 ${newSessionId} SSH 连接和 Shell 建立成功。`); - // 记录审计日志:SSH 连接成功 - auditLogService.logAction('SSH_CONNECT_SUCCESS', { - userId: ws.userId, - username: ws.username, - connectionId: dbConnectionId, - sessionId: newSessionId, - ip: newState.ipAddress - }); - notificationService.sendNotification('SSH_CONNECT_SUCCESS', { // 添加通知调用 - userId: ws.userId, - username: ws.username, - connectionId: dbConnectionId, - sessionId: newSessionId, - ip: newState.ipAddress - }); + sessionId: newSessionId, + ip: newState.ipAddress + }); + notificationService.sendNotification('SSH_CONNECT_SUCCESS', { // 添加通知调用 + userId: ws.userId, + username: ws.username, + connectionId: dbConnectionId, + sessionId: newSessionId, + ip: newState.ipAddress + }); - // 7. 异步初始化 SFTP 和启动状态监控 - console.log(`WebSocket: 会话 ${newSessionId} 正在异步初始化 SFTP...`); - sftpService.initializeSftpSession(newSessionId) - .then(() => console.log(`SFTP: 会话 ${newSessionId} 异步初始化成功。`)) - .catch(sftpInitError => console.error(`WebSocket: 会话 ${newSessionId} 异步初始化 SFTP 失败:`, sftpInitError)); + // 7. 启动异步任务 (SFTP, Status Monitor, Docker) + console.log(`WebSocket: 会话 ${newSessionId} 正在异步初始化 SFTP...`); + sftpService.initializeSftpSession(newSessionId) + .then(() => console.log(`SFTP: 会话 ${newSessionId} 异步初始化成功。`)) + .catch(sftpInitError => console.error(`WebSocket: 会话 ${newSessionId} 异步初始化 SFTP 失败:`, sftpInitError)); - console.log(`WebSocket: 会话 ${newSessionId} 正在启动状态监控...`); - statusMonitorService.startStatusPolling(newSessionId); + console.log(`WebSocket: 会话 ${newSessionId} 正在启动状态监控...`); + statusMonitorService.startStatusPolling(newSessionId); - - console.log(`WebSocket: 会话 ${newSessionId} 正在启动 Docker 状态轮询...`); - - let dockerPollIntervalMs = 2000; - try { - const intervalSetting = await settingsService.getSetting('dockerStatusIntervalSeconds'); - if (intervalSetting) { - const intervalSeconds = parseInt(intervalSetting, 10); - if (!isNaN(intervalSeconds) && intervalSeconds >= 1) { - dockerPollIntervalMs = intervalSeconds * 1000; - console.log(`[Docker Polling] Using interval from settings: ${intervalSeconds}s (${dockerPollIntervalMs}ms) for session ${newSessionId}`); - } else { - console.warn(`[Docker Polling] Invalid interval setting '${intervalSetting}' found. Using default ${dockerPollIntervalMs}ms for session ${newSessionId}`); - } - } else { - console.log(`[Docker Polling] No interval setting found. Using default ${dockerPollIntervalMs}ms for session ${newSessionId}`); - } - } catch (settingError) { - console.error(`[Docker Polling] Error fetching interval setting for session ${newSessionId}. Using default ${dockerPollIntervalMs}ms:`, settingError); - } - - - const dockerIntervalId = setInterval(async () => { - const currentState = clientStates.get(newSessionId); - if (!currentState || currentState.ws.readyState !== WebSocket.OPEN) { - console.log(`[Docker Polling] Session ${newSessionId} no longer valid or WS closed. Stopping poll.`); - clearInterval(dockerIntervalId); - return; - } - try { - - const statusPayload = await fetchRemoteDockerStatus(currentState); - if (currentState.ws.readyState === WebSocket.OPEN) { - currentState.ws.send(JSON.stringify({ type: 'docker:status:update', payload: statusPayload })); - } - } catch (error: any) { - console.error(`[Docker Polling] Error fetching Docker status for session ${newSessionId}:`, error); - - - - - } - }, dockerPollIntervalMs); - newState.dockerStatusIntervalId = dockerIntervalId; - - - (async () => { - const currentState = clientStates.get(newSessionId); - if (currentState && currentState.ws.readyState === WebSocket.OPEN) { + console.log(`WebSocket: 会话 ${newSessionId} 正在启动 Docker 状态轮询...`); + let dockerPollIntervalMs = 2000; + (async () => { // 使用 IIFE 获取设置 try { - console.log(`[Docker Initial Fetch] Fetching status for session ${newSessionId}...`); - const statusPayload = await fetchRemoteDockerStatus(currentState); - if (currentState.ws.readyState === WebSocket.OPEN) { - currentState.ws.send(JSON.stringify({ type: 'docker:status:update', payload: statusPayload })); + const intervalSetting = await settingsService.getSetting('dockerStatusIntervalSeconds'); + if (intervalSetting) { + const intervalSeconds = parseInt(intervalSetting, 10); + if (!isNaN(intervalSeconds) && intervalSeconds >= 1) { + dockerPollIntervalMs = intervalSeconds * 1000; + console.log(`[Docker Polling] Using interval from settings: ${intervalSeconds}s (${dockerPollIntervalMs}ms) for session ${newSessionId}`); + } else { + console.warn(`[Docker Polling] Invalid interval setting '${intervalSetting}' found. Using default ${dockerPollIntervalMs}ms for session ${newSessionId}`); + } + } else { + console.log(`[Docker Polling] No interval setting found. Using default ${dockerPollIntervalMs}ms for session ${newSessionId}`); } - } catch (error: any) { - console.error(`[Docker Initial Fetch] Error fetching Docker status for session ${newSessionId}:`, error); - if (currentState.ws.readyState === WebSocket.OPEN) { - - const errorMessage = error.message || 'Unknown error during initial fetch'; - const isUnavailable = errorMessage.includes('command not found') || errorMessage.includes('Cannot connect to the Docker daemon'); - if (isUnavailable) { - currentState.ws.send(JSON.stringify({ type: 'docker:status:update', payload: { available: false, containers: [] } })); - } else { - currentState.ws.send(JSON.stringify({ type: 'docker:status:error', payload: { message: `Initial Docker status fetch failed: ${errorMessage}` } })); - } + } catch (settingError) { + console.error(`[Docker Polling] Error fetching interval setting for session ${newSessionId}. Using default ${dockerPollIntervalMs}ms:`, settingError); + } + + const dockerIntervalId = setInterval(async () => { + const currentState = clientStates.get(newSessionId); + if (!currentState || currentState.ws.readyState !== WebSocket.OPEN) { + console.log(`[Docker Polling] Session ${newSessionId} no longer valid or WS closed. Stopping poll.`); + clearInterval(dockerIntervalId); + return; + } + try { + const statusPayload = await fetchRemoteDockerStatus(currentState); + if (currentState.ws.readyState === WebSocket.OPEN) { + currentState.ws.send(JSON.stringify({ type: 'docker:status:update', payload: statusPayload })); + } + } catch (error: any) { + console.error(`[Docker Polling] Error fetching Docker status for session ${newSessionId}:`, error); + } + }, dockerPollIntervalMs); + if (newState) newState.dockerStatusIntervalId = dockerIntervalId; // 确保 newState 仍然存在 + + // 立即触发一次 Docker 状态获取 + const currentState = clientStates.get(newSessionId); + if (currentState && currentState.ws.readyState === WebSocket.OPEN) { + try { + console.log(`[Docker Initial Fetch] Fetching status for session ${newSessionId}...`); + const statusPayload = await fetchRemoteDockerStatus(currentState); + if (currentState.ws.readyState === WebSocket.OPEN) { + currentState.ws.send(JSON.stringify({ type: 'docker:status:update', payload: statusPayload })); + } + } catch (error: any) { + console.error(`[Docker Initial Fetch] Error fetching Docker status for session ${newSessionId}:`, error); + if (currentState.ws.readyState === WebSocket.OPEN) { + const errorMessage = error.message || 'Unknown error during initial fetch'; + const isUnavailable = errorMessage.includes('command not found') || errorMessage.includes('Cannot connect to the Docker daemon'); + if (isUnavailable) { + currentState.ws.send(JSON.stringify({ type: 'docker:status:update', payload: { available: false, containers: [] } })); + } else { + currentState.ws.send(JSON.stringify({ type: 'docker:status:error', payload: { message: `Initial Docker status fetch failed: ${errorMessage}` } })); + } + } } } - } - })(); - - + })(); + }); // End of sshClient.shell callback } catch (shellError: any) { - console.error(`SSH: 会话 ${newSessionId} 打开 Shell 失败:`, shellError); - // 记录审计日志:打开 Shell 失败 - auditLogService.logAction('SSH_SHELL_FAILURE', { - userId: ws.userId, - username: ws.username, - connectionId: dbConnectionId, - sessionId: newSessionId, - ip: newState.ipAddress, - reason: shellError.message - }); - notificationService.sendNotification('SSH_SHELL_FAILURE', { // 添加通知调用 - userId: ws.userId, - username: ws.username, - connectionId: dbConnectionId, - sessionId: newSessionId, - ip: newState.ipAddress, - reason: shellError.message - }); - ws.send(JSON.stringify({ type: 'ssh:error', payload: `打开 Shell 失败: ${shellError.message}` })); - cleanupClientConnection(newSessionId); + // 这个 catch 块理论上不会被触发,因为错误在 shell 回调中处理,但保留以防万一 + console.error(`SSH: 会话 ${newSessionId} 打开 Shell 时发生意外错误:`, shellError); + if (ws.readyState === WebSocket.OPEN) { + ws.send(JSON.stringify({ type: 'ssh:error', payload: `打开 Shell 时发生意外错误: ${shellError.message}` })); + } + cleanupClientConnection(newSessionId); } - // 10. 设置 SSH Client 的关闭和错误处理 (移到 Shell 成功打开之后) + // 8. 设置 SSH Client 的关闭和错误处理 sshClient.on('close', () => { console.log(`SSH: 会话 ${newSessionId} 的客户端连接已关闭。`); cleanupClientConnection(newSessionId); }); sshClient.on('error', (err: Error) => { console.error(`SSH: 会话 ${newSessionId} 的客户端连接错误:`, err); - ws.send(JSON.stringify({ type: 'ssh:error', payload: `SSH 连接错误: ${err.message}` })); + // 确保在发送错误消息前检查 WebSocket 是否仍然打开 + if (ws.readyState === WebSocket.OPEN) { + ws.send(JSON.stringify({ type: 'ssh:error', payload: `SSH 连接错误: ${err.message}` })); + } cleanupClientConnection(newSessionId); }); @@ -904,14 +902,22 @@ export const initializeWebSocket = async (server: http.Server, sessionParser: Re // --- SSH 终端大小调整 --- case 'ssh:resize': { - if (!state || !state.sshShellStream) { - console.warn(`WebSocket: 收到来自 ${ws.username} (会话: ${sessionId}) 的调整大小请求,但无活动 Shell。`); + if (!sessionId || !state || !state.sshClient) { + console.warn(`WebSocket: 收到来自 ${ws.username} 的调整大小请求,但无有效会话或 SSH 客户端。`); return; } + const { cols, rows } = payload || {}; - if (typeof cols === 'number' && typeof rows === 'number') { + if (typeof cols !== 'number' || typeof rows !== 'number' || cols <= 0 || rows <= 0) { + console.warn(`WebSocket: 收到来自 ${ws.username} (会话: ${sessionId}) 的无效调整大小请求:`, payload); + return; + } + + if (state.isShellReady && state.sshShellStream) { console.log(`SSH: 会话 ${sessionId} 调整终端大小: ${cols}x${rows}`); - state.sshShellStream.setWindow(rows, cols, 0, 0); + state.sshShellStream.setWindow(rows, cols, 0, 0); // ssh2 使用 (rows, cols) + } else { + console.warn(`WebSocket: 会话 ${sessionId} 收到调整大小请求,但 Shell 尚未就绪或流不存在 (isShellReady: ${state.isShellReady})。`); } break; }