diff --git a/package-lock.json b/package-lock.json index 15bdc84..fbb2163 100644 --- a/package-lock.json +++ b/package-lock.json @@ -923,6 +923,12 @@ "dev": true, "license": "MIT" }, + "node_modules/@types/uuid": { + "version": "10.0.0", + "resolved": "https://registry.npmjs.org/@types/uuid/-/uuid-10.0.0.tgz", + "integrity": "sha512-7gqG38EyHgyP1S+7+xomFtL+ZNHcKv6DwNaCZmJmo1vgMugyF3TCnXVg4t1uk89mLNwnLtnY3TpOpCOyp1/xHQ==", + "license": "MIT" + }, "node_modules/@types/ws": { "version": "8.18.1", "resolved": "https://registry.npmjs.org/@types/ws/-/ws-8.18.1.tgz", @@ -5296,6 +5302,19 @@ "integrity": "sha512-EPD5q1uXyFxJpCrLnCc1nHnq3gOa6DZBocAIiI2TaSCA7VCJ1UJDMagCzIkXNsUYfD1daK//LTEQ8xiIbrHtcw==", "license": "MIT" }, + "node_modules/uuid": { + "version": "11.1.0", + "resolved": "https://registry.npmjs.org/uuid/-/uuid-11.1.0.tgz", + "integrity": "sha512-0/A9rDy9P7cJ+8w1c9WD9V//9Wj15Ce2MPz8Ri6032usz+NfePxx5AcN3bN+r6ZL6jEo066/yNYB3tn4pQEx+A==", + "funding": [ + "https://github.com/sponsors/broofa", + "https://github.com/sponsors/ctavan" + ], + "license": "MIT", + "bin": { + "uuid": "dist/esm/bin/uuid" + } + }, "node_modules/v8-compile-cache-lib": { "version": "3.0.1", "resolved": "https://registry.npmjs.org/v8-compile-cache-lib/-/v8-compile-cache-lib-3.0.1.tgz", @@ -5605,6 +5624,7 @@ "version": "0.1.0", "dependencies": { "@types/multer": "^1.4.12", + "@types/uuid": "^10.0.0", "bcrypt": "^5.1.1", "connect-sqlite3": "^0.9.15", "express": "^5.1.0", @@ -5614,6 +5634,7 @@ "socks": "^2.8.4", "sqlite3": "^5.1.7", "ssh2": "^1.16.0", + "uuid": "^11.1.0", "ws": "^8.18.1" }, "devDependencies": { diff --git a/packages/backend/package.json b/packages/backend/package.json index 3134854..6a1b0b0 100644 --- a/packages/backend/package.json +++ b/packages/backend/package.json @@ -10,6 +10,7 @@ }, "dependencies": { "@types/multer": "^1.4.12", + "@types/uuid": "^10.0.0", "bcrypt": "^5.1.1", "connect-sqlite3": "^0.9.15", "express": "^5.1.0", @@ -19,6 +20,7 @@ "socks": "^2.8.4", "sqlite3": "^5.1.7", "ssh2": "^1.16.0", + "uuid": "^11.1.0", "ws": "^8.18.1" }, "devDependencies": { diff --git a/packages/backend/src/services/sftp.service.ts b/packages/backend/src/services/sftp.service.ts index fcc429f..29a70a3 100644 --- a/packages/backend/src/services/sftp.service.ts +++ b/packages/backend/src/services/sftp.service.ts @@ -1,17 +1,44 @@ -import { Client, SFTPWrapper, Stats, Dirent } from 'ssh2'; // 导入 Stats 和 Dirent 类型 +import { Client, SFTPWrapper, Stats } from 'ssh2'; import { WebSocket } from 'ws'; -// import { logger } from '../utils/logger'; // 不再使用自定义 logger,改用 console +import { ClientState } from '../websocket'; // 导入统一的 ClientState -// 定义客户端状态接口 -interface ClientState { - ws: WebSocket; - sshClient: Client; - sftp?: SFTPWrapper; - // 如果需要,可以添加其他相关的状态属性 +// 定义服务器状态的数据结构 (与前端 StatusMonitor.vue 匹配) +// Note: This interface seems out of place here, but keeping it for now as it was in the original file. +// Ideally, it should be in a shared types file. +interface ServerStatus { + cpuPercent?: number; + memPercent?: number; + memUsed?: number; // MB + memTotal?: number; // MB + swapPercent?: number; + swapUsed?: number; // MB + swapTotal?: number; // MB + diskPercent?: number; + diskUsed?: number; // KB + diskTotal?: number; // KB + cpuModel?: string; + netRxRate?: number; // Bytes per second + netTxRate?: number; // Bytes per second + netInterface?: string; + osName?: string; + loadAvg?: number[]; // 系统平均负载 [1min, 5min, 15min] + timestamp: number; // 状态获取时间戳 } +// Interface for parsed network stats - Also seems out of place here. +interface NetworkStats { + [interfaceName: string]: { + rx_bytes: number; + tx_bytes: number; + } +} + +// Note: These constants seem related to StatusMonitorService, not SftpService. +const DEFAULT_POLLING_INTERVAL = 1000; +const previousNetStats = new Map(); + export class SftpService { - private clientStates: Map; // 存储 connectionId 到 ClientState 的映射 + private clientStates: Map; // 使用导入的 ClientState constructor(clientStates: Map) { this.clientStates = clientStates; @@ -19,164 +46,322 @@ export class SftpService { /** * 初始化 SFTP 会话 - * @param connectionId 连接 ID + * @param sessionId 会话 ID */ - async initializeSftpSession(connectionId: string): Promise { - const state = this.clientStates.get(connectionId); + async initializeSftpSession(sessionId: string): Promise { + const state = this.clientStates.get(sessionId); if (!state || !state.sshClient || state.sftp) { - console.warn(`[SFTP] 无法为 ${connectionId} 初始化 SFTP:状态无效或 SFTP 已初始化。`); + console.warn(`[SFTP] 无法为会话 ${sessionId} 初始化 SFTP:状态无效、SSH客户端不存在或 SFTP 已初始化。`); return; } - + if (!state.sshClient) { + console.error(`[SFTP] 会话 ${sessionId} 的 SSH 客户端不存在,无法初始化 SFTP。`); + return; + } return new Promise((resolve, reject) => { - state.sshClient.sftp((err, sftp) => { + state.sshClient.sftp((err, sftpInstance) => { if (err) { - console.error(`[SFTP] 为 ${connectionId} 初始化 SFTP 会话失败:`, err); - state.ws.send(JSON.stringify({ type: 'sftp_error', payload: { connectionId, message: 'SFTP 初始化失败' } })); + console.error(`[SFTP] 为会话 ${sessionId} 初始化 SFTP 会话失败:`, err); + state.ws.send(JSON.stringify({ type: 'sftp_error', payload: { connectionId: state.dbConnectionId, message: 'SFTP 初始化失败' } })); reject(err); } else { - console.log(`[SFTP] 为 ${connectionId} 初始化 SFTP 会话成功。`); - state.sftp = sftp; - state.ws.send(JSON.stringify({ type: 'sftp_ready', payload: { connectionId } })); - - sftp.on('end', () => { - console.log(`[SFTP] ${connectionId} 的 SFTP 会话已结束。`); - if (state) state.sftp = undefined; // 在结束时清除 SFTP 实例 + console.log(`[SFTP] 为会话 ${sessionId} 初始化 SFTP 会话成功。`); + state.sftp = sftpInstance; + state.ws.send(JSON.stringify({ type: 'sftp_ready', payload: { connectionId: state.dbConnectionId } })); + sftpInstance.on('end', () => { + console.log(`[SFTP] 会话 ${sessionId} 的 SFTP 会话已结束。`); + if (state) state.sftp = undefined; }); - sftp.on('close', () => { - console.log(`[SFTP] ${connectionId} 的 SFTP 会话已关闭。`); - if (state) state.sftp = undefined; // 在关闭时清除 SFTP 实例 + sftpInstance.on('close', () => { + console.log(`[SFTP] 会话 ${sessionId} 的 SFTP 会话已关闭。`); + if (state) state.sftp = undefined; }); - sftp.on('error', (sftpErr: Error) => { // 为 sftpErr 添加 Error 类型 - console.error(`[SFTP] ${connectionId} 的 SFTP 会话出错:`, sftpErr); - if (state) state.sftp = undefined; // 在出错时清除 SFTP 实例 - // 可选:通知客户端 - state?.ws.send(JSON.stringify({ type: 'sftp_error', payload: { connectionId, message: 'SFTP 会话错误' } })); + sftpInstance.on('error', (sftpErr: Error) => { + console.error(`[SFTP] 会话 ${sessionId} 的 SFTP 会话出错:`, sftpErr); + if (state) state.sftp = undefined; + state?.ws.send(JSON.stringify({ type: 'sftp_error', payload: { connectionId: state.dbConnectionId, message: 'SFTP 会话错误' } })); }); - resolve(); } }); }); } - cleanupSftpSession(connectionId: string): void { - const state = this.clientStates.get(connectionId); + /** + * 清理 SFTP 会话 + * @param sessionId 会话 ID + */ + cleanupSftpSession(sessionId: string): void { + const state = this.clientStates.get(sessionId); if (state?.sftp) { - logger.info(`[SFTP] Cleaning up SFTP session for ${connectionId}`); + console.log(`[SFTP] 正在清理 ${sessionId} 的 SFTP 会话...`); state.sftp.end(); state.sftp = undefined; } } - // Placeholder methods for SFTP operations - to be implemented - async readdir(connectionId: string, path: string, requestId: string): Promise { - const state = this.clientStates.get(connectionId); + // --- SFTP 操作方法 --- + + /** 读取目录内容 */ + async readdir(sessionId: string, path: string, requestId: string): Promise { + const state = this.clientStates.get(sessionId); if (!state || !state.sftp) { - logger.warn(`[SFTP] SFTP not ready for readdir on ${connectionId}`); - state?.ws.send(JSON.stringify({ type: 'sftp_error', payload: { connectionId, requestId, message: 'SFTP session not ready' } })); + console.warn(`[SFTP] SFTP 未准备好,无法在 ${sessionId} 上执行 readdir (ID: ${requestId})`); + state?.ws.send(JSON.stringify({ type: 'sftp:readdir:error', path: path, payload: 'SFTP 会话未就绪', requestId: requestId })); return; } - // Implementation to follow - logger.debug(`[SFTP] Received readdir request for ${connectionId}:${path}`); - // Example: state.sftp.readdir(...) - state.ws.send(JSON.stringify({ type: 'sftp_readdir_result', payload: { connectionId, requestId, files: [] /* Placeholder */ } })); + console.debug(`[SFTP ${sessionId}] Received readdir request for ${path} (ID: ${requestId})`); + try { + state.sftp.readdir(path, (err, list) => { + if (err) { + console.error(`[SFTP ${sessionId}] readdir ${path} failed (ID: ${requestId}):`, err); + state.ws.send(JSON.stringify({ type: 'sftp:readdir:error', path: path, payload: `读取目录失败: ${err.message}`, requestId: requestId })); + } else { + const files = list.map((item) => ({ + filename: item.filename, + longname: item.longname, + attrs: { + size: item.attrs.size, uid: item.attrs.uid, gid: item.attrs.gid, mode: item.attrs.mode, + atime: item.attrs.atime * 1000, mtime: item.attrs.mtime * 1000, + isDirectory: item.attrs.isDirectory(), isFile: item.attrs.isFile(), isSymbolicLink: item.attrs.isSymbolicLink(), + } + })); + state.ws.send(JSON.stringify({ type: 'sftp:readdir:success', path: path, payload: files, requestId: requestId })); + } + }); + } catch (error: any) { + console.error(`[SFTP ${sessionId}] readdir ${path} caught unexpected error (ID: ${requestId}):`, error); + state.ws.send(JSON.stringify({ type: 'sftp:readdir:error', path: path, payload: `读取目录时发生意外错误: ${error.message}`, requestId: requestId })); + } } - async stat(connectionId: string, path: string, requestId: string): Promise { - const state = this.clientStates.get(connectionId); + /** 获取文件/目录状态信息 */ + async stat(sessionId: string, path: string, requestId: string): Promise { + const state = this.clientStates.get(sessionId); if (!state || !state.sftp) { - logger.warn(`[SFTP] SFTP not ready for stat on ${connectionId}`); - state?.ws.send(JSON.stringify({ type: 'sftp_error', payload: { connectionId, requestId, message: 'SFTP session not ready' } })); + console.warn(`[SFTP] SFTP 未准备好,无法在 ${sessionId} 上执行 stat (ID: ${requestId})`); + state?.ws.send(JSON.stringify({ type: 'sftp:stat:error', path: path, payload: 'SFTP 会话未就绪', requestId: requestId })); // Use specific error type return; } - logger.debug(`[SFTP] Received stat request for ${connectionId}:${path}`); - // Implementation to follow - state.ws.send(JSON.stringify({ type: 'sftp_stat_result', payload: { connectionId, requestId, stats: null /* Placeholder */ } })); + console.debug(`[SFTP ${sessionId}] Received stat request for ${path} (ID: ${requestId})`); + try { + state.sftp.lstat(path, (err, stats: Stats) => { + if (err) { + console.error(`[SFTP ${sessionId}] stat ${path} failed (ID: ${requestId}):`, err); + state.ws.send(JSON.stringify({ type: 'sftp:stat:error', path: path, payload: `获取状态失败: ${err.message}`, requestId: requestId })); + } else { + const fileStats = { + size: stats.size, uid: stats.uid, gid: stats.gid, mode: stats.mode, + atime: stats.atime * 1000, mtime: stats.mtime * 1000, + isDirectory: stats.isDirectory(), isFile: stats.isFile(), isSymbolicLink: stats.isSymbolicLink(), + }; + // Send specific success type + state.ws.send(JSON.stringify({ type: 'sftp:stat:success', path: path, payload: fileStats, requestId: requestId })); + } + }); + } catch (error: any) { + console.error(`[SFTP ${sessionId}] stat ${path} caught unexpected error (ID: ${requestId}):`, error); + state.ws.send(JSON.stringify({ type: 'sftp:stat:error', path: path, payload: `获取状态时发生意外错误: ${error.message}`, requestId: requestId })); + } } - async readFile(connectionId: string, path: string, requestId: string): Promise { - const state = this.clientStates.get(connectionId); + /** 读取文件内容 */ + async readFile(sessionId: string, path: string, requestId: string): Promise { + const state = this.clientStates.get(sessionId); if (!state || !state.sftp) { - logger.warn(`[SFTP] SFTP not ready for readFile on ${connectionId}`); - state?.ws.send(JSON.stringify({ type: 'sftp_error', payload: { connectionId, requestId, message: 'SFTP session not ready' } })); + console.warn(`[SFTP] SFTP 未准备好,无法在 ${sessionId} 上执行 readFile (ID: ${requestId})`); + state?.ws.send(JSON.stringify({ type: 'sftp:readfile:error', path: path, payload: 'SFTP 会话未就绪', requestId: requestId })); return; } - logger.debug(`[SFTP] Received readFile request for ${connectionId}:${path}`); - // Implementation to follow - state.ws.send(JSON.stringify({ type: 'sftp_readfile_result', payload: { connectionId, requestId, data: '' /* Placeholder */ } })); + console.debug(`[SFTP ${sessionId}] Received readFile request for ${path} (ID: ${requestId})`); + try { + const readStream = state.sftp.createReadStream(path); + let fileData = Buffer.alloc(0); + let errorOccurred = false; + + readStream.on('data', (chunk: Buffer) => { fileData = Buffer.concat([fileData, chunk]); }); + readStream.on('error', (err: Error) => { + if (errorOccurred) return; errorOccurred = true; + console.error(`[SFTP ${sessionId}] readFile ${path} stream error (ID: ${requestId}):`, err); + state.ws.send(JSON.stringify({ type: 'sftp:readfile:error', path: path, payload: `读取文件流错误: ${err.message}`, requestId: requestId })); + }); + readStream.on('end', () => { + if (!errorOccurred) { + console.log(`[SFTP ${sessionId}] readFile ${path} success, size: ${fileData.length} bytes (ID: ${requestId})`); + state.ws.send(JSON.stringify({ type: 'sftp:readfile:success', path: path, payload: { content: fileData.toString('base64'), encoding: 'base64' }, requestId: requestId })); + } + }); + } catch (error: any) { + console.error(`[SFTP ${sessionId}] readFile ${path} caught unexpected error (ID: ${requestId}):`, error); + state.ws.send(JSON.stringify({ type: 'sftp:readfile:error', path: path, payload: `读取文件时发生意外错误: ${error.message}`, requestId: requestId })); + } } - async writeFile(connectionId: string, path: string, data: string, requestId: string): Promise { - const state = this.clientStates.get(connectionId); + /** 写入文件内容 */ + async writefile(sessionId: string, path: string, data: string, requestId: string): Promise { + const state = this.clientStates.get(sessionId); if (!state || !state.sftp) { - logger.warn(`[SFTP] SFTP not ready for writeFile on ${connectionId}`); - state?.ws.send(JSON.stringify({ type: 'sftp_error', payload: { connectionId, requestId, message: 'SFTP session not ready' } })); + console.warn(`[SFTP] SFTP 未准备好,无法在 ${sessionId} 上执行 writefile (ID: ${requestId})`); + state?.ws.send(JSON.stringify({ type: 'sftp:writefile:error', path: path, payload: 'SFTP 会话未就绪', requestId: requestId })); return; } - logger.debug(`[SFTP] Received writeFile request for ${connectionId}:${path}`); - // Implementation to follow - state.ws.send(JSON.stringify({ type: 'sftp_writefile_result', payload: { connectionId, requestId, success: false /* Placeholder */ } })); + console.debug(`[SFTP ${sessionId}] Received writefile request for ${path} (ID: ${requestId})`); + try { + const buffer = Buffer.from(data, 'utf8'); + console.debug(`[SFTP ${sessionId}] Creating write stream for ${path} (ID: ${requestId})`); + const writeStream = state.sftp.createWriteStream(path); + let errorOccurred = false; + + writeStream.on('error', (err: Error) => { + if (errorOccurred) return; // Prevent sending multiple errors + errorOccurred = true; + console.error(`[SFTP ${sessionId}] writefile ${path} stream error (ID: ${requestId}):`, err); + state.ws.send(JSON.stringify({ type: 'sftp:writefile:error', path: path, payload: `写入文件流错误: ${err.message}`, requestId: requestId })); + }); + + // Listen for the 'close' event which indicates the stream has finished writing and the file descriptor is closed. + writeStream.on('close', () => { + if (!errorOccurred) { + console.log(`[SFTP ${sessionId}] writefile ${path} stream closed successfully (ID: ${requestId})`); + state.ws.send(JSON.stringify({ type: 'sftp:writefile:success', path: path, requestId: requestId })); + } + }); + + console.debug(`[SFTP ${sessionId}] Writing ${buffer.length} bytes to ${path} (ID: ${requestId})`); + writeStream.end(buffer); // Start writing and close the stream afterwards + console.debug(`[SFTP ${sessionId}] writefile ${path} end() called (ID: ${requestId})`); + + // Success message is now sent in the 'close' event handler + + } catch (error: any) { + console.error(`[SFTP ${sessionId}] writefile ${path} caught unexpected error (ID: ${requestId}):`, error); + state.ws.send(JSON.stringify({ type: 'sftp:writefile:error', path: path, payload: `写入文件时发生意外错误: ${error.message}`, requestId: requestId })); + } } - async mkdir(connectionId: string, path: string, requestId: string): Promise { - const state = this.clientStates.get(connectionId); + /** 创建目录 */ + async mkdir(sessionId: string, path: string, requestId: string): Promise { + const state = this.clientStates.get(sessionId); if (!state || !state.sftp) { - logger.warn(`[SFTP] SFTP not ready for mkdir on ${connectionId}`); - state?.ws.send(JSON.stringify({ type: 'sftp_error', payload: { connectionId, requestId, message: 'SFTP session not ready' } })); + console.warn(`[SFTP] SFTP 未准备好,无法在 ${sessionId} 上执行 mkdir (ID: ${requestId})`); + state?.ws.send(JSON.stringify({ type: 'sftp:mkdir:error', path: path, payload: 'SFTP 会话未就绪', requestId: requestId })); // Use specific error type return; } - logger.debug(`[SFTP] Received mkdir request for ${connectionId}:${path}`); - // Implementation to follow - state.ws.send(JSON.stringify({ type: 'sftp_mkdir_result', payload: { connectionId, requestId, success: false /* Placeholder */ } })); + console.debug(`[SFTP ${sessionId}] Received mkdir request for ${path} (ID: ${requestId})`); + try { + state.sftp.mkdir(path, (err) => { + if (err) { + console.error(`[SFTP ${sessionId}] mkdir ${path} failed (ID: ${requestId}):`, err); + state.ws.send(JSON.stringify({ type: 'sftp:mkdir:error', path: path, payload: `创建目录失败: ${err.message}`, requestId: requestId })); + } else { + console.log(`[SFTP ${sessionId}] mkdir ${path} success (ID: ${requestId})`); + state.ws.send(JSON.stringify({ type: 'sftp:mkdir:success', path: path, requestId: requestId })); // Send specific success type + } + }); + } catch (error: any) { + console.error(`[SFTP ${sessionId}] mkdir ${path} caught unexpected error (ID: ${requestId}):`, error); + state.ws.send(JSON.stringify({ type: 'sftp:mkdir:error', path: path, payload: `创建目录时发生意外错误: ${error.message}`, requestId: requestId })); + } } - async rmdir(connectionId: string, path: string, requestId: string): Promise { - const state = this.clientStates.get(connectionId); + /** 删除空目录 */ + async rmdir(sessionId: string, path: string, requestId: string): Promise { + const state = this.clientStates.get(sessionId); if (!state || !state.sftp) { - logger.warn(`[SFTP] SFTP not ready for rmdir on ${connectionId}`); - state?.ws.send(JSON.stringify({ type: 'sftp_error', payload: { connectionId, requestId, message: 'SFTP session not ready' } })); + console.warn(`[SFTP] SFTP 未准备好,无法在 ${sessionId} 上执行 rmdir (ID: ${requestId})`); + state?.ws.send(JSON.stringify({ type: 'sftp:rmdir:error', path: path, payload: 'SFTP 会话未就绪', requestId: requestId })); // Use specific error type return; } - logger.debug(`[SFTP] Received rmdir request for ${connectionId}:${path}`); - // Implementation to follow - state.ws.send(JSON.stringify({ type: 'sftp_rmdir_result', payload: { connectionId, requestId, success: false /* Placeholder */ } })); + console.debug(`[SFTP ${sessionId}] Received rmdir request for ${path} (ID: ${requestId})`); + try { + state.sftp.rmdir(path, (err) => { + if (err) { + console.error(`[SFTP ${sessionId}] rmdir ${path} failed (ID: ${requestId}):`, err); + state.ws.send(JSON.stringify({ type: 'sftp:rmdir:error', path: path, payload: `删除目录失败: ${err.message}`, requestId: requestId })); + } else { + console.log(`[SFTP ${sessionId}] rmdir ${path} success (ID: ${requestId})`); + state.ws.send(JSON.stringify({ type: 'sftp:rmdir:success', path: path, requestId: requestId })); // Send specific success type + } + }); + } catch (error: any) { + console.error(`[SFTP ${sessionId}] rmdir ${path} caught unexpected error (ID: ${requestId}):`, error); + state.ws.send(JSON.stringify({ type: 'sftp:rmdir:error', path: path, payload: `删除目录时发生意外错误: ${error.message}`, requestId: requestId })); + } } - async unlink(connectionId: string, path: string, requestId: string): Promise { - const state = this.clientStates.get(connectionId); + /** 删除文件 */ + async unlink(sessionId: string, path: string, requestId: string): Promise { + const state = this.clientStates.get(sessionId); if (!state || !state.sftp) { - logger.warn(`[SFTP] SFTP not ready for unlink on ${connectionId}`); - state?.ws.send(JSON.stringify({ type: 'sftp_error', payload: { connectionId, requestId, message: 'SFTP session not ready' } })); + console.warn(`[SFTP] SFTP 未准备好,无法在 ${sessionId} 上执行 unlink (ID: ${requestId})`); + state?.ws.send(JSON.stringify({ type: 'sftp:unlink:error', path: path, payload: 'SFTP 会话未就绪', requestId: requestId })); // Use specific error type return; } - logger.debug(`[SFTP] Received unlink request for ${connectionId}:${path}`); - // Implementation to follow - state.ws.send(JSON.stringify({ type: 'sftp_unlink_result', payload: { connectionId, requestId, success: false /* Placeholder */ } })); + console.debug(`[SFTP ${sessionId}] Received unlink request for ${path} (ID: ${requestId})`); + try { + state.sftp.unlink(path, (err) => { + if (err) { + console.error(`[SFTP ${sessionId}] unlink ${path} failed (ID: ${requestId}):`, err); + state.ws.send(JSON.stringify({ type: 'sftp:unlink:error', path: path, payload: `删除文件失败: ${err.message}`, requestId: requestId })); + } else { + console.log(`[SFTP ${sessionId}] unlink ${path} success (ID: ${requestId})`); + state.ws.send(JSON.stringify({ type: 'sftp:unlink:success', path: path, requestId: requestId })); // Send specific success type + } + }); + } catch (error: any) { + console.error(`[SFTP ${sessionId}] unlink ${path} caught unexpected error (ID: ${requestId}):`, error); + state.ws.send(JSON.stringify({ type: 'sftp:unlink:error', path: path, payload: `删除文件时发生意外错误: ${error.message}`, requestId: requestId })); + } } - async rename(connectionId: string, oldPath: string, newPath: string, requestId: string): Promise { - const state = this.clientStates.get(connectionId); + /** 重命名/移动文件或目录 */ + async rename(sessionId: string, oldPath: string, newPath: string, requestId: string): Promise { + const state = this.clientStates.get(sessionId); if (!state || !state.sftp) { - logger.warn(`[SFTP] SFTP not ready for rename on ${connectionId}`); - state?.ws.send(JSON.stringify({ type: 'sftp_error', payload: { connectionId, requestId, message: 'SFTP session not ready' } })); + console.warn(`[SFTP] SFTP 未准备好,无法在 ${sessionId} 上执行 rename (ID: ${requestId})`); + state?.ws.send(JSON.stringify({ type: 'sftp:rename:error', oldPath: oldPath, newPath: newPath, payload: 'SFTP 会话未就绪', requestId: requestId })); // Use specific error type return; } - logger.debug(`[SFTP] Received rename request for ${connectionId}: ${oldPath} -> ${newPath}`); - // Implementation to follow - state.ws.send(JSON.stringify({ type: 'sftp_rename_result', payload: { connectionId, requestId, success: false /* Placeholder */ } })); + console.debug(`[SFTP ${sessionId}] Received rename request ${oldPath} -> ${newPath} (ID: ${requestId})`); + try { + state.sftp.rename(oldPath, newPath, (err) => { + if (err) { + console.error(`[SFTP ${sessionId}] rename ${oldPath} -> ${newPath} failed (ID: ${requestId}):`, err); + state.ws.send(JSON.stringify({ type: 'sftp:rename:error', oldPath: oldPath, newPath: newPath, payload: `重命名/移动失败: ${err.message}`, requestId: requestId })); + } else { + console.log(`[SFTP ${sessionId}] rename ${oldPath} -> ${newPath} success (ID: ${requestId})`); + state.ws.send(JSON.stringify({ type: 'sftp:rename:success', oldPath: oldPath, newPath: newPath, requestId: requestId })); // Send specific success type + } + }); + } catch (error: any) { + console.error(`[SFTP ${sessionId}] rename ${oldPath} -> ${newPath} caught unexpected error (ID: ${requestId}):`, error); + state.ws.send(JSON.stringify({ type: 'sftp:rename:error', oldPath: oldPath, newPath: newPath, payload: `重命名/移动时发生意外错误: ${error.message}`, requestId: requestId })); + } } - async chmod(connectionId: string, path: string, mode: number, requestId: string): Promise { - const state = this.clientStates.get(connectionId); + /** 修改文件/目录权限 */ + async chmod(sessionId: string, path: string, mode: number, requestId: string): Promise { + const state = this.clientStates.get(sessionId); if (!state || !state.sftp) { - logger.warn(`[SFTP] SFTP not ready for chmod on ${connectionId}`); - state?.ws.send(JSON.stringify({ type: 'sftp_error', payload: { connectionId, requestId, message: 'SFTP session not ready' } })); + console.warn(`[SFTP] SFTP 未准备好,无法在 ${sessionId} 上执行 chmod (ID: ${requestId})`); + state?.ws.send(JSON.stringify({ type: 'sftp:chmod:error', path: path, payload: 'SFTP 会话未就绪', requestId: requestId })); // Use specific error type return; } - logger.debug(`[SFTP] Received chmod request for ${connectionId}:${path} to mode ${mode.toString(8)}`); - // Implementation to follow - state.ws.send(JSON.stringify({ type: 'sftp_chmod_result', payload: { connectionId, requestId, success: false /* Placeholder */ } })); + console.debug(`[SFTP ${sessionId}] Received chmod request for ${path} to ${mode.toString(8)} (ID: ${requestId})`); + try { + state.sftp.chmod(path, mode, (err) => { + if (err) { + console.error(`[SFTP ${sessionId}] chmod ${path} to ${mode.toString(8)} failed (ID: ${requestId}):`, err); + state.ws.send(JSON.stringify({ type: 'sftp:chmod:error', path: path, payload: `修改权限失败: ${err.message}`, requestId: requestId })); + } else { + console.log(`[SFTP ${sessionId}] chmod ${path} to ${mode.toString(8)} success (ID: ${requestId})`); + state.ws.send(JSON.stringify({ type: 'sftp:chmod:success', path: path, requestId: requestId })); // Send specific success type + } + }); + } catch (error: any) { + console.error(`[SFTP ${sessionId}] chmod ${path} caught unexpected error (ID: ${requestId}):`, error); + state.ws.send(JSON.stringify({ type: 'sftp:chmod:error', path: path, payload: `修改权限时发生意外错误: ${error.message}`, requestId: requestId })); + } } // TODO: Implement file upload/download logic with progress reporting diff --git a/packages/backend/src/services/ssh.service.ts b/packages/backend/src/services/ssh.service.ts index 2e3f37c..7f8f057 100644 --- a/packages/backend/src/services/ssh.service.ts +++ b/packages/backend/src/services/ssh.service.ts @@ -1,45 +1,15 @@ -import { Client, ClientChannel, ConnectConfig } from 'ssh2'; // Import ClientChannel and ConnectConfig -import { SocksClient, SocksClientOptions } from 'socks'; // Import SocksClientOptions +import { Client, ClientChannel, ConnectConfig } from 'ssh2'; +import { SocksClient, SocksClientOptions } from 'socks'; import http from 'http'; import net from 'net'; -import WebSocket from 'ws'; // Import WebSocket for type hint import * as ConnectionRepository from '../repositories/connection.repository'; import { decrypt } from '../utils/crypto'; -// Import SftpService if needed later for initialization -// import * as SftpService from './sftp.service'; -// Import StatusMonitorService if needed later for initialization -// import * as StatusMonitorService from './status-monitor.service'; - const CONNECT_TIMEOUT = 20000; // 连接超时时间 (毫秒) const TEST_TIMEOUT = 15000; // 测试连接超时时间 (毫秒) -// Define AuthenticatedWebSocket interface (or import from websocket.ts if refactored there) -// This is needed to associate SSH clients with specific WS connections -interface AuthenticatedWebSocket extends WebSocket { - isAlive?: boolean; - userId?: number; - username?: string; - // sshClient?: Client; // Managed by the service now - // sshShellStream?: ClientChannel; // Managed by the service now -} - -// Structure to hold active SSH connection details managed by this service -interface ActiveSshSession { - client: Client; - shell: ClientChannel; - // sftp?: SFTPWrapper; // SFTP will be managed by SftpService - // statusIntervalId?: NodeJS.Timeout; // Status polling managed by StatusMonitorService - connectionInfo: DecryptedConnectionDetails; // Store connection info for context (Fix typo) -} - -// Map to store active sessions associated with WebSocket clients -const activeSessions = new Map(); - - -// 辅助接口:定义解密后的凭证和代理信息结构 (可以共享到 types 文件) -// Renamed to avoid conflict if imported later -interface DecryptedConnectionDetails { +// 辅助接口:定义解密后的凭证和代理信息结构 (导出以便 websocket.ts 使用) +export interface DecryptedConnectionDetails { id: number; name: string; host: string; @@ -57,30 +27,27 @@ interface DecryptedConnectionDetails { port: number; username?: string; password?: string; // Decrypted - auth_method?: string; // Proxy auth method - privateKey?: string; // Decrypted proxy key - passphrase?: string; // Decrypted proxy passphrase + // auth_method?: string; // Proxy auth method (如果需要可以保留) + // privateKey?: string; // Decrypted proxy key (如果需要可以保留) + // passphrase?: string; // Decrypted proxy passphrase (如果需要可以保留) } | null; } /** - * 测试给定 ID 的 SSH 连接(包括代理) + * 获取并解密指定 ID 的完整连接信息(包括代理) * @param connectionId 连接 ID - * @returns Promise - 如果连接成功则 resolve,否则 reject - * @throws Error 如果连接失败或配置错误 + * @returns Promise 解密后的连接详情 + * @throws Error 如果连接配置未找到或解密失败 */ -export const testConnection = async (connectionId: number): Promise => { - console.log(`SshService: Testing connection ${connectionId}...`); - // 1. 获取完整的连接信息(包括加密凭证和代理信息) - const rawConnInfo = await ConnectionRepository.findFullConnectionById(connectionId); // Assuming this fetches proxy details too +export const getConnectionDetails = async (connectionId: number): Promise => { + console.log(`SshService: 获取连接 ${connectionId} 的详细信息...`); + const rawConnInfo = await ConnectionRepository.findFullConnectionById(connectionId); if (!rawConnInfo) { - throw new Error('连接配置未找到。'); + throw new Error(`连接配置 ID ${connectionId} 未找到。`); } - // 2. 解密凭证并构建结构化的连接信息 - let fullConnInfo: DecryptedConnectionDetails; try { - fullConnInfo = { + const fullConnInfo: DecryptedConnectionDetails = { id: rawConnInfo.id, name: rawConnInfo.name, host: rawConnInfo.host, @@ -101,99 +68,91 @@ export const testConnection = async (connectionId: number): Promise => { host: rawConnInfo.proxy_host, port: rawConnInfo.proxy_port, username: rawConnInfo.proxy_username || undefined, - auth_method: rawConnInfo.proxy_auth_method, // Include proxy auth method password: rawConnInfo.proxy_encrypted_password ? decrypt(rawConnInfo.proxy_encrypted_password) : undefined, - privateKey: rawConnInfo.proxy_encrypted_private_key ? decrypt(rawConnInfo.proxy_encrypted_private_key) : undefined, // Decrypt proxy key - passphrase: rawConnInfo.proxy_encrypted_passphrase ? decrypt(rawConnInfo.proxy_encrypted_passphrase) : undefined, // Decrypt proxy passphrase + // 可以根据需要解密代理的其他凭证 }; } + console.log(`SshService: 连接 ${connectionId} 的详细信息获取并解密成功。`); + return fullConnInfo; } catch (decryptError: any) { - console.error(`Service: 处理连接 ${connectionId} 凭证或代理凭证失败:`, decryptError); + console.error(`SshService: 处理连接 ${connectionId} 凭证或代理凭证失败:`, decryptError); throw new Error(`处理凭证失败: ${decryptError.message}`); } - - // 3. 构建 ssh2 连接配置 - const connectConfig: ConnectConfig = { // Use ConnectConfig type - host: fullConnInfo.host, - port: fullConnInfo.port, - username: fullConnInfo.username, - password: fullConnInfo.password, - privateKey: fullConnInfo.privateKey, - passphrase: fullConnInfo.passphrase, - readyTimeout: TEST_TIMEOUT, - keepaliveInterval: 0, // 测试连接不需要 keepalive - }; - - // 4. 应用代理配置并执行连接 (Refactored into helper) - const sshClient = new Client(); - try { - await establishSshConnection(sshClient, connectConfig, fullConnInfo.proxy); // Use helper - console.log(`SshService: Test connection ${connectionId} successful.`); - // Test successful, void promise resolves implicitly - } catch (error) { - console.error(`SshService: Test connection ${connectionId} failed:`, error); - throw error; // Re-throw the specific error - } finally { - // 无论成功失败,都关闭 SSH 客户端 - sshClient.end(); - console.log(`SshService: Test connection ${connectionId} client closed.`); - } }; - -// --- NEW FUNCTIONS FOR MANAGING LIVE CONNECTIONS --- - /** - * Establishes an SSH connection, handling proxies. - * Internal helper function. - * @param sshClient - The ssh2 Client instance. - * @param connectConfig - Base SSH connection config. - * @param proxyInfo - Optional proxy details. - * @returns Promise that resolves when SSH is ready, or rejects on error. + * 根据解密后的连接详情建立 SSH 连接(处理代理) + * @param connDetails - 解密后的连接详情 + * @param timeout - 连接超时时间 (毫秒),可选 + * @returns Promise 连接成功的 SSH Client 实例 + * @throws Error 如果连接失败 */ -const establishSshConnection = ( - sshClient: Client, - connectConfig: ConnectConfig, - proxyInfo: DecryptedConnectionDetails['proxy'] -): Promise => { +export const establishSshConnection = ( + connDetails: DecryptedConnectionDetails, + timeout: number = CONNECT_TIMEOUT +): Promise => { return new Promise((resolve, reject) => { - const readyHandler = () => { - sshClient.removeListener('error', errorHandler); // Clean up error listener on success - resolve(); + const sshClient = new Client(); + + const connectConfig: ConnectConfig = { + host: connDetails.host, + port: connDetails.port, + username: connDetails.username, + password: connDetails.password, + privateKey: connDetails.privateKey, + passphrase: connDetails.passphrase, + readyTimeout: timeout, + keepaliveInterval: 30000, // 保持连接 + keepaliveCountMax: 3, }; + + const readyHandler = () => { + console.log(`SshService: SSH 连接到 ${connDetails.host}:${connDetails.port} (ID: ${connDetails.id}) 成功。`); + sshClient.removeListener('error', errorHandler); // 成功后移除错误监听器 + resolve(sshClient); // 返回 Client 实例 + }; + const errorHandler = (err: Error) => { - sshClient.removeListener('ready', readyHandler); // Clean up ready listener on error - reject(err); // Reject with the specific error + console.error(`SshService: SSH 连接到 ${connDetails.host}:${connDetails.port} (ID: ${connDetails.id}) 失败:`, err); + sshClient.removeListener('ready', readyHandler); // 失败后移除成功监听器 + sshClient.end(); // 确保关闭客户端 + reject(err); }; sshClient.once('ready', readyHandler); - sshClient.once('error', errorHandler); // Generic error handler for direct connect issues + sshClient.once('error', errorHandler); - if (proxyInfo) { - const proxy = proxyInfo; - console.log(`SshService: Applying proxy ${proxy.name} (${proxy.type})`); + // --- 处理代理 --- + if (connDetails.proxy) { + const proxy = connDetails.proxy; + console.log(`SshService: 应用代理 ${proxy.name} (${proxy.type}) 连接到 ${connDetails.host}:${connDetails.port}`); if (proxy.type === 'SOCKS5') { const socksOptions: SocksClientOptions = { - proxy: { host: proxy.host, port: proxy.port, type: 5, userId: proxy.username, password: proxy.password }, // Type 5 is implicit + proxy: { host: proxy.host, port: proxy.port, type: 5, userId: proxy.username, password: proxy.password }, command: 'connect', - destination: { host: connectConfig.host!, port: connectConfig.port! }, // Use base config host/port - timeout: connectConfig.readyTimeout ?? CONNECT_TIMEOUT, // Use connection timeout + destination: { host: connectConfig.host!, port: connectConfig.port! }, + timeout: connectConfig.readyTimeout, }; SocksClient.createConnection(socksOptions) .then(({ socket }) => { - console.log(`SshService: SOCKS5 proxy connection successful.`); + console.log(`SshService: SOCKS5 代理连接成功 (目标: ${connDetails.host}:${connDetails.port})。`); connectConfig.sock = socket; - sshClient.connect(connectConfig); // Connect SSH via proxy socket + sshClient.connect(connectConfig); }) .catch(socksError => { - console.error(`SshService: SOCKS5 proxy connection failed:`, socksError); - // Reject the main promise, remove listeners handled by errorHandler - errorHandler(new Error(`SOCKS5 代理连接失败: ${socksError.message}`)); + errorHandler(new Error(`SOCKS5 代理 ${proxy.host}:${proxy.port} 连接失败: ${socksError.message}`)); }); } else if (proxy.type === 'HTTP') { - console.log(`SshService: Attempting HTTP proxy tunnel via ${proxy.host}:${proxy.port}...`); - const reqOptions: http.RequestOptions = { method: 'CONNECT', host: proxy.host, port: proxy.port, path: `${connectConfig.host}:${connectConfig.port}`, timeout: connectConfig.readyTimeout ?? CONNECT_TIMEOUT, agent: false }; + console.log(`SshService: 尝试通过 HTTP 代理 ${proxy.host}:${proxy.port} 建立隧道到 ${connDetails.host}:${connDetails.port}...`); + const reqOptions: http.RequestOptions = { + method: 'CONNECT', + host: proxy.host, + port: proxy.port, + path: `${connectConfig.host}:${connectConfig.port}`, + timeout: connectConfig.readyTimeout, + agent: false + }; if (proxy.username) { const auth = 'Basic ' + Buffer.from(proxy.username + ':' + (proxy.password || '')).toString('base64'); reqOptions.headers = { ...reqOptions.headers, 'Proxy-Authorization': auth, 'Proxy-Connection': 'Keep-Alive', 'Host': `${connectConfig.host}:${connectConfig.port}` }; @@ -201,233 +160,86 @@ const establishSshConnection = ( const req = http.request(reqOptions); req.on('connect', (res, socket, head) => { if (res.statusCode === 200) { - console.log(`SshService: HTTP proxy tunnel established.`); + console.log(`SshService: HTTP 代理隧道建立成功 (目标: ${connDetails.host}:${connDetails.port})。`); connectConfig.sock = socket; - sshClient.connect(connectConfig); // Connect SSH via tunnel socket + sshClient.connect(connectConfig); } else { - console.error(`SshService: HTTP proxy CONNECT request failed, status code: ${res.statusCode}`); socket.destroy(); - errorHandler(new Error(`HTTP 代理连接失败 (状态码: ${res.statusCode})`)); // Reject main promise - } // <-- Added missing closing parenthesis here + errorHandler(new Error(`HTTP 代理 ${proxy.host}:${proxy.port} 连接失败 (状态码: ${res.statusCode})`)); + } }); req.on('error', (err) => { - console.error(`SshService: HTTP proxy request error:`, err); - errorHandler(new Error(`HTTP 代理连接错误: ${err.message}`)); // Reject main promise + errorHandler(new Error(`HTTP 代理 ${proxy.host}:${proxy.port} 请求错误: ${err.message}`)); }); req.on('timeout', () => { - console.error(`SshService: HTTP proxy request timeout.`); req.destroy(); - errorHandler(new Error('HTTP 代理连接超时')); // Reject main promise + errorHandler(new Error(`HTTP 代理 ${proxy.host}:${proxy.port} 连接超时`)); }); - req.end(); // Send the CONNECT request + req.end(); } else { - errorHandler(new Error(`不支持的代理类型: ${proxy.type}`)); // Reject main promise + errorHandler(new Error(`不支持的代理类型: ${proxy.type}`)); } } else { - // No proxy, connect directly - console.log(`SshService: No proxy detected, connecting directly...`); + // 无代理,直接连接 + console.log(`SshService: 无代理,直接连接到 ${connDetails.host}:${connDetails.port}`); sshClient.connect(connectConfig); } }); }; +/** + * 在已连接的 SSH Client 上打开 Shell 通道 + * @param sshClient - 已连接的 SSH Client 实例 + * @returns Promise Shell 通道实例 + * @throws Error 如果打开 Shell 失败 + */ +export const openShell = (sshClient: Client): Promise => { + return new Promise((resolve, reject) => { + sshClient.shell((err, stream) => { + if (err) { + console.error(`SshService: 打开 Shell 失败:`, err); + return reject(new Error(`打开 Shell 失败: ${err.message}`)); + } + console.log(`SshService: Shell 通道已打开。`); + resolve(stream); + }); + }); +}; /** - * Connects to SSH, opens a shell, and sets up event forwarding via WebSocket. - * @param connectionId - The ID of the connection config in the database. - * @param ws - The authenticated WebSocket client instance. + * 测试给定 ID 的 SSH 连接(包括代理) + * @param connectionId 连接 ID + * @returns Promise - 如果连接成功则 resolve,否则 reject + * @throws Error 如果连接失败或配置错误 */ -export const connectAndOpenShell = async (connectionId: number, ws: AuthenticatedWebSocket): Promise => { - console.log(`SshService: User ${ws.username} requested connection to ID: ${connectionId}`); - if (activeSessions.has(ws)) { - console.warn(`SshService: User ${ws.username} already has an active session.`); - throw new Error('已存在活动的 SSH 连接。'); - } - - ws.send(JSON.stringify({ type: 'ssh:status', payload: '正在获取连接信息...' })); - - // 1. Get connection info - const rawConnInfo = await ConnectionRepository.findFullConnectionById(connectionId); - if (!rawConnInfo) { - throw new Error('连接配置未找到。'); - } - - // 2. Decrypt and prepare connection details - let fullConnInfo: DecryptedConnectionDetails; +export const testConnection = async (connectionId: number): Promise => { + console.log(`SshService: 测试连接 ${connectionId}...`); + let sshClient: Client | null = null; try { - // (Decryption logic similar to testConnection, could be refactored) - fullConnInfo = { /* ... decryption ... */ - id: rawConnInfo.id, name: rawConnInfo.name, host: rawConnInfo.host, port: rawConnInfo.port, username: rawConnInfo.username, auth_method: rawConnInfo.auth_method, - password: (rawConnInfo.auth_method === 'password' && rawConnInfo.encrypted_password) ? decrypt(rawConnInfo.encrypted_password) : undefined, - privateKey: (rawConnInfo.auth_method === 'key' && rawConnInfo.encrypted_private_key) ? decrypt(rawConnInfo.encrypted_private_key) : undefined, - passphrase: (rawConnInfo.auth_method === 'key' && rawConnInfo.encrypted_passphrase) ? decrypt(rawConnInfo.encrypted_passphrase) : undefined, - proxy: null, - }; - if (rawConnInfo.proxy_db_id) { - fullConnInfo.proxy = { /* ... proxy decryption ... */ - id: rawConnInfo.proxy_db_id, name: rawConnInfo.proxy_name, type: rawConnInfo.proxy_type, host: rawConnInfo.proxy_host, port: rawConnInfo.proxy_port, username: rawConnInfo.proxy_username || undefined, auth_method: rawConnInfo.proxy_auth_method, - password: rawConnInfo.proxy_encrypted_password ? decrypt(rawConnInfo.proxy_encrypted_password) : undefined, - privateKey: rawConnInfo.proxy_encrypted_private_key ? decrypt(rawConnInfo.proxy_encrypted_private_key) : undefined, - passphrase: rawConnInfo.proxy_encrypted_passphrase ? decrypt(rawConnInfo.proxy_encrypted_passphrase) : undefined, - }; + // 1. 获取并解密连接信息 + const connDetails = await getConnectionDetails(connectionId); + + // 2. 尝试建立连接 (使用较短的测试超时时间) + sshClient = await establishSshConnection(connDetails, TEST_TIMEOUT); + + console.log(`SshService: 测试连接 ${connectionId} 成功。`); + // 测试成功,Promise 自动 resolve void + } catch (error) { + console.error(`SshService: 测试连接 ${connectionId} 失败:`, error); + throw error; // 将错误向上抛出 + } finally { + // 无论成功失败,都关闭 SSH 客户端 + if (sshClient) { + sshClient.end(); + console.log(`SshService: 测试连接 ${connectionId} 的客户端已关闭。`); } - } catch (decryptError: any) { - console.error(`SshService: Handling credentials failed for ${connectionId}:`, decryptError); - throw new Error(`无法处理连接凭证: ${decryptError.message}`); - } - - // 3. Prepare SSH config - const connectConfig: ConnectConfig = { - host: fullConnInfo.host, - port: fullConnInfo.port, - username: fullConnInfo.username, - password: fullConnInfo.password, - privateKey: fullConnInfo.privateKey, - passphrase: fullConnInfo.passphrase, - readyTimeout: CONNECT_TIMEOUT, - keepaliveInterval: 30000, - keepaliveCountMax: 3, - }; - - // 4. Establish connection and open shell - const sshClient = new Client(); - - // Generic error/close handlers for the client - const clientCloseHandler = () => { - console.log(`SshService: SSH client for ${ws.username} closed.`); - if (activeSessions.has(ws)) { // Check if cleanup wasn't already called - if (!ws.CLOSED && !ws.CLOSING) { - ws.send(JSON.stringify({ type: 'ssh:disconnected', payload: 'SSH 连接已关闭。' })); - } - cleanupConnection(ws); // Ensure cleanup - } - }; - const clientErrorHandler = (err: Error) => { - console.error(`SshService: SSH client error for ${ws.username}:`, err); - if (activeSessions.has(ws)) { // Check if cleanup wasn't already called - if (!ws.CLOSED && !ws.CLOSING) { - ws.send(JSON.stringify({ type: 'ssh:error', payload: `SSH 连接错误: ${err.message}` })); - } - cleanupConnection(ws); // Ensure cleanup - } - }; - sshClient.on('close', clientCloseHandler); - sshClient.on('error', clientErrorHandler); - - - try { - ws.send(JSON.stringify({ type: 'ssh:status', payload: `正在连接到 ${fullConnInfo.host}...` })); - await establishSshConnection(sshClient, connectConfig, fullConnInfo.proxy); // Use helper - - ws.send(JSON.stringify({ type: 'ssh:status', payload: 'SSH 连接成功,正在打开 Shell...' })); - - // 5. Open Shell Stream - const shellStream = await new Promise((resolve, reject) => { - sshClient.shell((err, stream) => { - if (err) { - console.error(`SshService: User ${ws.username} failed to open shell:`, err); - return reject(new Error(`打开 Shell 失败: ${err.message}`)); - } - console.log(`SshService: User ${ws.username} shell channel opened.`); - resolve(stream); - }); - }); - - // 6. Store active session - const session: ActiveSshSession = { client: sshClient, shell: shellStream, connectionInfo: fullConnInfo }; - activeSessions.set(ws, session); - console.log(`SshService: Active session stored for ${ws.username}.`); - - // 7. Setup event forwarding for the shell stream - shellStream.on('data', (data: Buffer) => { - if (ws.readyState === WebSocket.OPEN) { - ws.send(JSON.stringify({ type: 'ssh:output', payload: data.toString('base64'), encoding: 'base64' })); - } - }); - shellStream.stderr.on('data', (data: Buffer) => { - console.error(`SSH Stderr (${ws.username}): ${data.toString('utf8').substring(0,100)}...`); - if (ws.readyState === WebSocket.OPEN) { - ws.send(JSON.stringify({ type: 'ssh:output', payload: data.toString('base64'), encoding: 'base64' })); // Send stderr as output - } - }); - shellStream.on('close', () => { - console.log(`SshService: Shell stream for ${ws.username} closed.`); - if (activeSessions.has(ws)) { // Check if cleanup wasn't already called by client close - if (!ws.CLOSED && !ws.CLOSING) { - ws.send(JSON.stringify({ type: 'ssh:disconnected', payload: 'Shell 通道已关闭。' })); - } - cleanupConnection(ws); // Trigger cleanup if shell closes independently - } - }); - - // 8. Initialize SFTP (TODO: Move to SftpService) and Status Polling (TODO: Move to StatusMonitorService) - // For now, just notify connection success - ws.send(JSON.stringify({ type: 'ssh:connected' })); - - // TODO: Call SftpService.initializeSftpSession(ws, sshClient); - // TODO: Call StatusMonitorService.startStatusPolling(ws, sshClient); - - - } catch (error: any) { - console.error(`SshService: Failed to connect or open shell for ${ws.username}:`, error); - // Ensure client listeners are removed and client is ended on failure - sshClient.removeListener('close', clientCloseHandler); - sshClient.removeListener('error', clientErrorHandler); - sshClient.end(); - cleanupConnection(ws); // Clean up any partial state - throw error; // Re-throw for the controller } }; -/** - * Sends input data to the SSH shell stream associated with a WebSocket connection. - * @param ws - The authenticated WebSocket client. - * @param data - The data string to send. - */ -export const sendInput = (ws: AuthenticatedWebSocket, data: string): void => { - const session = activeSessions.get(ws); - if (session?.shell && session.shell.writable) { - session.shell.write(data); - } else { - console.warn(`SshService: Cannot send input for ${ws.username}, no active/writable shell stream found.`); - // Optionally notify the client ws.send(...) - } -}; - -/** - * Resizes the pseudo-terminal associated with a WebSocket connection. - * @param ws - The authenticated WebSocket client. - * @param cols - Terminal width in columns. - * @param rows - Terminal height in rows. - */ -export const resizeTerminal = (ws: AuthenticatedWebSocket, cols: number, rows: number): void => { - const session = activeSessions.get(ws); - if (session?.shell) { - console.log(`SshService: Resizing terminal for ${ws.username} to ${cols}x${rows}`); - session.shell.setWindow(rows, cols, 0, 0); // Note: rows, cols order - } else { - console.warn(`SshService: Cannot resize terminal for ${ws.username}, no active shell stream found.`); - } -}; - -/** - * Cleans up SSH resources associated with a WebSocket connection. - * @param ws - The authenticated WebSocket client. - */ -export const cleanupConnection = (ws: AuthenticatedWebSocket): void => { - const session = activeSessions.get(ws); - if (session) { - console.log(`SshService: Cleaning up SSH session for ${ws.username}...`); - // TODO: Call StatusMonitorService.stopStatusPolling(ws); - // TODO: Call SftpService.cleanupSftpSession(ws); - - // End streams and client - session.shell?.end(); // End the shell stream first - session.client?.end(); // End the main SSH client connection - - activeSessions.delete(ws); // Remove from active sessions map - console.log(`SshService: SSH session for ${ws.username} cleaned up.`); - } else { - // console.log(`SshService: No active SSH session found for ${ws.username} during cleanup.`); - } -}; +// --- 移除旧的函数 --- +// - connectAndOpenShell +// - sendInput +// - resizeTerminal +// - cleanupConnection +// - activeSessions Map +// - AuthenticatedWebSocket interface (如果仅在此文件使用) diff --git a/packages/backend/src/services/status-monitor.service.ts b/packages/backend/src/services/status-monitor.service.ts new file mode 100644 index 0000000..7c4ca61 --- /dev/null +++ b/packages/backend/src/services/status-monitor.service.ts @@ -0,0 +1,318 @@ +import { Client } from 'ssh2'; +import { WebSocket } from 'ws'; +import { ClientState } from '../websocket'; // 导入统一的 ClientState + +// 定义服务器状态的数据结构 (与前端 StatusMonitor.vue 匹配) +interface ServerStatus { + cpuPercent?: number; + memPercent?: number; + memUsed?: number; // MB + memTotal?: number; // MB + swapPercent?: number; + swapUsed?: number; // MB + swapTotal?: number; // MB + diskPercent?: number; + diskUsed?: number; // KB + diskTotal?: number; // KB + cpuModel?: string; + netRxRate?: number; // Bytes per second + netTxRate?: number; // Bytes per second + netInterface?: string; + osName?: string; + loadAvg?: number[]; // 系统平均负载 [1min, 5min, 15min] + timestamp: number; // 状态获取时间戳 +} + +// Interface for parsed network stats +interface NetworkStats { + [interfaceName: string]: { + rx_bytes: number; + tx_bytes: number; + } +} + +const DEFAULT_POLLING_INTERVAL = 1000; // 修改为 1 秒轮询间隔 (毫秒) +// 用于存储上一次的网络统计信息以计算速率 +const previousNetStats = new Map(); + +export class StatusMonitorService { + private clientStates: Map; // 使用导入的 ClientState + + constructor(clientStates: Map) { + this.clientStates = clientStates; + } + + /** + * 启动指定会话的状态轮询 + * @param sessionId 会话 ID + * @param interval 轮询间隔 (毫秒),可选,默认为 DEFAULT_POLLING_INTERVAL + */ + startStatusPolling(sessionId: string, interval: number = DEFAULT_POLLING_INTERVAL): void { + const state = this.clientStates.get(sessionId); + if (!state || !state.sshClient) { + console.warn(`[StatusMonitor] 无法为会话 ${sessionId} 启动状态轮询:状态无效或 SSH 客户端不存在。`); + return; + } + if (state.statusIntervalId) { + console.warn(`[StatusMonitor] 会话 ${sessionId} 的状态轮询已在运行中。`); + return; + } + console.log(`[StatusMonitor] 为会话 ${sessionId} 启动状态轮询,间隔 ${interval}ms`); + this.fetchAndSendServerStatus(sessionId); // 立即执行一次 + state.statusIntervalId = setInterval(() => { + this.fetchAndSendServerStatus(sessionId); + }, interval); + } + + /** + * 停止指定会话的状态轮询 + * @param sessionId 会话 ID + */ + stopStatusPolling(sessionId: string): void { + const state = this.clientStates.get(sessionId); + if (state?.statusIntervalId) { + console.log(`[StatusMonitor] 停止会话 ${sessionId} 的状态轮询。`); + clearInterval(state.statusIntervalId); + state.statusIntervalId = undefined; + previousNetStats.delete(sessionId); // 清理网络统计缓存 + } + } + + /** + * 获取并发送服务器状态给客户端 + * @param sessionId 会话 ID + */ + private async fetchAndSendServerStatus(sessionId: string): Promise { + const state = this.clientStates.get(sessionId); + if (!state || !state.sshClient || state.ws.readyState !== WebSocket.OPEN) { + console.warn(`[StatusMonitor] 无法获取会话 ${sessionId} 的状态,停止轮询。原因:状态无效、SSH断开或WS关闭。`); + this.stopStatusPolling(sessionId); + return; + } + try { + // 传递 sessionId 给 fetchServerStatus 以便查找 previousNetStats + const status = await this.fetchServerStatus(state.sshClient, sessionId); + state.ws.send(JSON.stringify({ type: 'status_update', payload: { connectionId: state.dbConnectionId, status } })); + } catch (error: any) { + console.error(`[StatusMonitor] 获取会话 ${sessionId} 服务器状态失败:`, error); + state.ws.send(JSON.stringify({ type: 'status_error', payload: { connectionId: state.dbConnectionId, message: `获取状态失败: ${error.message}` } })); + } + } + + /** + * 通过 SSH 执行命令获取服务器状态信息 + * @param sshClient SSH 客户端实例 + * @param sessionId 当前会话 ID,用于网络速率计算 + * @returns Promise 服务器状态信息 + */ + private async fetchServerStatus(sshClient: Client, sessionId: string): Promise { + console.debug(`[StatusMonitor ${sessionId}] Fetching server status...`); + const timestamp = Date.now(); + let status: Partial = { timestamp }; + + try { + // --- OS Name --- + try { + const osReleaseOutput = await this.executeSshCommand(sshClient, 'cat /etc/os-release'); + const nameMatch = osReleaseOutput.match(/^PRETTY_NAME="?([^"]+)"?/m); + status.osName = nameMatch ? nameMatch[1] : (osReleaseOutput.match(/^NAME="?([^"]+)"?/m)?.[1] ?? 'Unknown'); + } catch (err) { console.warn(`[StatusMonitor ${sessionId}] Failed to get OS name:`, err); } + + // --- CPU Model --- + try { + const lscpuOutput = await this.executeSshCommand(sshClient, "lscpu | grep 'Model name:'"); + status.cpuModel = lscpuOutput.match(/Model name:\s+(.*)/)?.[1].trim() ?? 'Unknown'; + } catch (err) { console.warn(`[StatusMonitor ${sessionId}] Failed to get CPU model:`, err); } + + // --- Memory and Swap --- + try { + const freeOutput = await this.executeSshCommand(sshClient, 'free -m'); + const lines = freeOutput.split('\n'); + const memLine = lines.find(line => line.startsWith('Mem:')); + const swapLine = lines.find(line => line.startsWith('Swap:')); + if (memLine) { + const parts = memLine.split(/\s+/); + if (parts.length >= 4) { + const total = parseInt(parts[1], 10); + const used = parseInt(parts[2], 10); + if (!isNaN(total) && !isNaN(used)) { + status.memTotal = total; status.memUsed = used; + status.memPercent = total > 0 ? parseFloat(((used / total) * 100).toFixed(1)) : 0; + } + } + } + if (swapLine) { + const parts = swapLine.split(/\s+/); + if (parts.length >= 4) { + const total = parseInt(parts[1], 10); + const used = parseInt(parts[2], 10); + if (!isNaN(total) && !isNaN(used)) { + status.swapTotal = total; status.swapUsed = used; + status.swapPercent = total > 0 ? parseFloat(((used / total) * 100).toFixed(1)) : 0; + } + } + } else { status.swapTotal = 0; status.swapUsed = 0; status.swapPercent = 0; } + } catch (err) { console.warn(`[StatusMonitor ${sessionId}] Failed to get memory/swap usage:`, err); } + + // --- Disk Usage (Root Partition) --- + try { + const dfOutput = await this.executeSshCommand(sshClient, "df -k / | tail -n 1"); + const parts = dfOutput.split(/\s+/); + if (parts.length >= 5) { + const total = parseInt(parts[1], 10); const used = parseInt(parts[2], 10); + const percentMatch = parts[4].match(/(\d+)%/); + if (!isNaN(total) && !isNaN(used) && percentMatch) { + status.diskTotal = total; status.diskUsed = used; + status.diskPercent = parseFloat(percentMatch[1]); + } + } + } catch (err) { console.warn(`[StatusMonitor ${sessionId}] Failed to get disk usage:`, err); } + + // --- CPU Usage (Simplified from top) --- + try { + const topOutput = await this.executeSshCommand(sshClient, "top -bn1 | grep '%Cpu(s)' | head -n 1"); + const idleMatch = topOutput.match(/(\d+\.?\d*)\s+id/); // Adjusted regex for float + if (idleMatch) { + const idlePercent = parseFloat(idleMatch[1]); + status.cpuPercent = parseFloat((100 - idlePercent).toFixed(1)); + } + } catch (err) { console.warn(`[StatusMonitor ${sessionId}] Failed to get CPU usage from top:`, err); } + + // --- Load Average --- + try { + const uptimeOutput = await this.executeSshCommand(sshClient, 'uptime'); + const match = uptimeOutput.match(/load average(?:s)?:\s*([\d.]+)[, ]?\s*([\d.]+)[, ]?\s*([\d.]+)/); + if (match) status.loadAvg = [parseFloat(match[1]), parseFloat(match[2]), parseFloat(match[3])]; + } catch (err) { console.warn(`[StatusMonitor ${sessionId}] Failed to get uptime/load average:`, err); } + + // --- Network Rates --- + try { + const currentStats = await this.parseProcNetDev(sshClient); + if (currentStats) { + const defaultInterface = await this.getDefaultInterface(sshClient) || Object.keys(currentStats).find(iface => iface !== 'lo'); // Detect or fallback excluding loopback + + if (defaultInterface && currentStats[defaultInterface]) { + status.netInterface = defaultInterface; + const currentRx = currentStats[defaultInterface].rx_bytes; + const currentTx = currentStats[defaultInterface].tx_bytes; + const prevStats = previousNetStats.get(sessionId); + + if (prevStats && prevStats.timestamp < timestamp) { // Ensure time has passed + const timeDiffSeconds = (timestamp - prevStats.timestamp) / 1000; + if (timeDiffSeconds > 0.1) { // Avoid division by zero or tiny intervals + status.netRxRate = Math.max(0, Math.round((currentRx - prevStats.rx) / timeDiffSeconds)); + status.netTxRate = Math.max(0, Math.round((currentTx - prevStats.tx) / timeDiffSeconds)); + } else { status.netRxRate = 0; status.netTxRate = 0; } // Rate is 0 if interval too small + } else { status.netRxRate = 0; status.netTxRate = 0; } // First run or no time diff + + previousNetStats.set(sessionId, { rx: currentRx, tx: currentTx, timestamp }); + } else { console.warn(`[StatusMonitor ${sessionId}] Could not find stats for default interface ${defaultInterface}`); } + } + } catch (err) { console.warn(`[StatusMonitor ${sessionId}] Failed to get network stats:`, err); } + + } catch (error) { + console.error(`[StatusMonitor ${sessionId}] General error fetching server status:`, error); + } + + return status as ServerStatus; + } + + /** + * 解析 /proc/net/dev 的输出 + * @param sshClient SSH 客户端实例 + * @returns Promise 解析后的网络统计信息或 null + */ + private async parseProcNetDev(sshClient: Client): Promise { + try { + const output = await this.executeSshCommand(sshClient, 'cat /proc/net/dev'); + const lines = output.split('\n').slice(2); // Skip header lines + const stats: NetworkStats = {}; + for (const line of lines) { + const parts = line.trim().split(/:\s+|\s+/); + if (parts.length < 17) continue; // Need at least interface name + 16 stats + const interfaceName = parts[0]; + const rx_bytes = parseInt(parts[1], 10); + const tx_bytes = parseInt(parts[9], 10); // TX bytes is the 10th field (index 9) + if (!isNaN(rx_bytes) && !isNaN(tx_bytes)) { + stats[interfaceName] = { rx_bytes, tx_bytes }; + } + } + return Object.keys(stats).length > 0 ? stats : null; + } catch (error) { + console.error("[StatusMonitor] Error parsing /proc/net/dev:", error); + return null; + } + } + + /** + * 获取默认网络接口名称 (Linux specific) + * @param sshClient SSH 客户端实例 + * @returns Promise 默认接口名称或 null + */ + private async getDefaultInterface(sshClient: Client): Promise { + try { + // 使用 ip route 命令查找默认路由对应的接口 + const output = await this.executeSshCommand(sshClient, "ip route get 1.1.1.1 | grep -oP 'dev\\s+\\K\\S+'"); + return output.trim() || null; + } catch (error) { + console.warn("[StatusMonitor] Failed to get default interface using 'ip route':", error); + // Fallback: 尝试查找第一个非 lo 接口 + try { + const netDevOutput = await this.executeSshCommand(sshClient, 'cat /proc/net/dev'); + const lines = netDevOutput.split('\n').slice(2); + for (const line of lines) { + const iface = line.trim().split(':')[0]; + if (iface && iface !== 'lo') { + return iface; + } + } + } catch (fallbackError) { + console.error("[StatusMonitor] Failed to fallback to /proc/net/dev for interface:", fallbackError); + } + return null; + } + } + + /** + * 在 SSH 连接上执行单个命令 + * @param sshClient SSH 客户端实例 + * @param command 要执行的命令 + * @returns Promise 命令的标准输出 + * @throws Error 如果命令执行失败 + */ + private executeSshCommand(sshClient: Client, command: string): Promise { + return new Promise((resolve, reject) => { + let output = ''; + sshClient.exec(command, (err, stream) => { + if (err) { + return reject(new Error(`执行命令 '${command}' 失败: ${err.message}`)); + } + stream.on('close', (code: number, signal?: string) => { + // Don't reject on non-zero exit code, as some commands might return non-zero normally + // if (code !== 0) { + // console.warn(`[StatusMonitor] Command '${command}' exited with code ${code}`); + // } + resolve(output.trim()); + }).on('data', (data: Buffer) => { + output += data.toString('utf8'); + }).stderr.on('data', (data: Buffer) => { + console.warn(`[StatusMonitor] Command '${command}' stderr: ${data.toString('utf8').trim()}`); + }); + }); + }); + } + + /** + * 查找与给定 SSH 客户端关联的会话 ID (辅助函数) + * @param sshClientToFind 要查找的 SSH 客户端实例 + * @returns string | undefined 找到的会话 ID 或 undefined + */ + private findSessionIdForClient(sshClientToFind: Client): string | undefined { + for (const [sessionId, state] of this.clientStates.entries()) { + if (state.sshClient === sshClientToFind) { + return sessionId; + } + } + return undefined; + } +} diff --git a/packages/backend/src/websocket.ts b/packages/backend/src/websocket.ts index a6c506c..f76f919 100644 --- a/packages/backend/src/websocket.ts +++ b/packages/backend/src/websocket.ts @@ -1,441 +1,99 @@ import WebSocket, { WebSocketServer } from 'ws'; import http from 'http'; import { Request, RequestHandler } from 'express'; -import { Client, ClientChannel, SFTPWrapper, Stats } from 'ssh2'; // 引入 SFTPWrapper 和 Stats -import { WriteStream } from 'fs'; // 需要 WriteStream 类型 (虽然 ssh2 的流类型不同,但可以借用) -import { getDb } from './database'; // 引入数据库实例 - import { decrypt } from './utils/crypto'; // 引入解密函数 - import path from 'path'; // 需要 path - // import { HttpsProxyAgent } from 'https-proxy-agent'; // 不再直接使用 HttpsProxyAgent for SSH tunneling - import { SocksClient } from 'socks'; // 引入 SOCKS 代理支持 - // import http from 'http'; // 重复导入,保留上面的 - import net from 'net'; // 引入 net 用于 Socket 类型 +import { Client, ClientChannel } from 'ssh2'; +import { v4 as uuidv4 } from 'uuid'; // 用于生成唯一的会话 ID +import { getDb } from './database'; +import { decrypt } from './utils/crypto'; +import { SftpService } from './services/sftp.service'; // 引入 SftpService +import { StatusMonitorService } from './services/status-monitor.service'; // 引入 StatusMonitorService +import * as SshService from './services/ssh.service'; // 引入重构后的 SshService 函数 -// 扩展 WebSocket 类型以包含会话和 SSH/SFTP 连接信息 +// 扩展 WebSocket 类型以包含会话 ID interface AuthenticatedWebSocket extends WebSocket { isAlive?: boolean; userId?: number; username?: string; - sshClient?: Client; // 关联的 SSH Client 实例 - sshShellStream?: ClientChannel; // 关联的 SSH Shell Stream - sftpStream?: SFTPWrapper; // 关联的 SFTP Stream - statusIntervalId?: NodeJS.Timeout; // 用于存储状态轮询的 Interval ID + sessionId?: string; // 用于关联 ClientState 的唯一 ID } -// 存储活跃的 SSH/SFTP 连接 (导出以便其他模块访问) -export const activeSshConnections = new Map(); +import { SFTPWrapper } from 'ssh2'; // 引入 SFTPWrapper 类型 -// 存储正在进行的 SFTP 上传操作 (key: uploadId, value: WriteStream) -// 注意:WriteStream 类型来自 'fs',但 ssh2 的流行为类似 -const activeUploads = new Map(); - -// 数据库连接信息接口 (包含所有可能的凭证字段和 proxy_id) -interface DbConnectionInfo { - id: number; - name: string; - host: string; - port: number; - username: string; - auth_method: 'password' | 'key'; - encrypted_password?: string | null; - encrypted_private_key?: string | null; - encrypted_passphrase?: string | null; - proxy_id?: number | null; // 关联的代理 ID - // 其他字段... +// 中心化的客户端状态接口 (统一版本) +export interface ClientState { // 导出以便 Service 可以导入 + ws: AuthenticatedWebSocket; + sshClient: Client; + sshShellStream?: ClientChannel; + dbConnectionId: number; + sftp?: SFTPWrapper; // 添加 sftp 实例 (由 SftpService 管理) + statusIntervalId?: NodeJS.Timeout; // 添加状态轮询 ID (由 StatusMonitorService 管理) } -// 新增:数据库代理信息接口 -interface DbProxyInfo { - id: number; - name: string; - type: 'SOCKS5' | 'HTTP'; - host: string; - port: number; - username?: string | null; - encrypted_password?: string | null; -} +// 存储所有活动客户端的状态 (key: sessionId) +const clientStates = new Map(); +// --- 服务实例化 --- +// 将 clientStates 传递给需要访问共享状态的服务 +const sftpService = new SftpService(clientStates); // 移除 as any +const statusMonitorService = new StatusMonitorService(clientStates); // 移除 as any /** - * 清理指定 WebSocket 连接关联的 SSH 资源 - * @param ws - WebSocket 连接实例 + * 清理指定会话 ID 关联的所有资源 + * @param sessionId - 会话 ID */ -const cleanupSshConnection = (ws: AuthenticatedWebSocket) => { - const connection = activeSshConnections.get(ws); - if (connection) { - console.log(`WebSocket: 清理用户 ${ws.username} 的 SSH/SFTP 连接...`); - // 注意:SFTP 流通常不需要显式关闭,它依赖于 SSH Client 的关闭 - // connection.sftp?.end(); // SFTPWrapper 没有 end 方法 - connection.shell?.end(); // 尝试结束 shell 流 - // 清除状态轮询定时器 - if (connection.statusIntervalId) { - clearInterval(connection.statusIntervalId); - console.log(`WebSocket: 清理用户 ${ws.username} 的状态轮询定时器。`); - } - connection.client?.end(); // 结束 SSH 客户端连接会隐式关闭 SFTP - activeSshConnections.delete(ws); // 从 Map 中移除 - } -}; +const cleanupClientConnection = (sessionId: string | undefined) => { + if (!sessionId) return; -// --- 状态获取相关 --- -const STATUS_POLL_INTERVAL = 1000; // 每 5 秒获取一次状态 + const state = clientStates.get(sessionId); + if (state) { + console.log(`WebSocket: 清理会话 ${sessionId} (用户: ${state.ws.username}, DB 连接 ID: ${state.dbConnectionId})...`); -// Helper function to execute a command and return its stdout -const executeSshCommand = (client: Client, command: string): Promise => { - return new Promise((resolve, reject) => { - let output = ''; - let stderrOutput = ''; // Capture stderr too - client.exec(command, (err, stream) => { - if (err) { - console.error(`SSH Command (${command}) exec error:`, err); - return reject(err); // Reject on initial exec error - } - stream.on('data', (data: Buffer) => { - output += data.toString(); - }).stderr.on('data', (data: Buffer) => { - stderrOutput += data.toString(); // Capture stderr - // Log stderr as warning, but don't reject based on it unless needed - // console.warn(`SSH Command (${command}) stderr: ${data.toString().trim()}`); - }).on('close', (code: number | null | undefined, signal: string | null) => { - const trimmedOutput = output.trim(); - const trimmedStderr = stderrOutput.trim(); + // 1. 停止状态轮询 + statusMonitorService.stopStatusPolling(sessionId); - if (signal) { - console.error(`Command "${command}" terminated by signal: ${signal}. Stderr: ${trimmedStderr}`); - return reject(new Error(`Command "${command}" terminated by signal: ${signal}`)); - } + // 2. 清理 SFTP 会话 + sftpService.cleanupSftpSession(sessionId); - // **Crucial Change:** Prioritize resolving if we have ANY stdout, regardless of exit code. - if (trimmedOutput) { - if (code !== 0 && code != null) { - console.warn(`Command "${command}" exited with code ${code} but produced output. Resolving with output. Stderr: ${trimmedStderr}`); - } else if (code == null) { - console.warn(`Command "${command}" exited with code undefined but produced output. Resolving with output. Stderr: ${trimmedStderr}`); - } - return resolve(trimmedOutput); - } + // 3. 清理 SSH 连接 (调用 SshService 中的底层清理逻辑,或直接操作) + // SshService.cleanupConnection(state.ws); // 旧版 SshService 的清理方式,需要调整 + state.sshShellStream?.end(); // 结束 shell 流 + state.sshClient?.end(); // 结束 SSH 客户端 - // If NO stdout, then reject based on error code or lack thereof. - if (code !== 0 && code != null) { - console.error(`Command "${command}" failed with code ${code} and no output. Stderr: ${trimmedStderr}`); - return reject(new Error(`Command "${command}" failed with code ${code} and no output. Stderr: ${trimmedStderr}`)); - } - if (code == null) { - // This case now specifically means no output AND undefined code - likely a genuine failure - console.error(`Command "${command}" failed with code undefined and no output. Stderr: ${trimmedStderr}`); - return reject(new Error(`Command "${command}" failed with code undefined and no output. Stderr: ${trimmedStderr}`)); - } + // 4. 从状态 Map 中移除 + clientStates.delete(sessionId); - // If code is 0 and no output, resolve with empty string (command succeeded but printed nothing) - resolve(''); - - }).on('error', (streamErr: Error) => { // Handle stream-specific errors - reject(streamErr); - }); - }); - }); -}; - -// Interface for the detailed status object -interface ServerStatusDetails { - cpuPercent?: number; // Percentage - memPercent?: number; // Percentage - memUsed?: number; // MB - memTotal?: number; // MB - swapPercent?: number; // Percentage - swapUsed?: number; // MB - swapTotal?: number; // MB - diskPercent?: number; // Percentage for / - diskUsed?: number; // KB - diskTotal?: number; // KB - cpuModel?: string; - netRxRate?: number; // Bytes per second - netTxRate?: number; // Bytes per second - netInterface?: string; // Detected network interface - osName?: string; // Added OS Name -} - -// Store previous network stats for rate calculation -interface NetStats { - rx: number; - tx: number; - timestamp: number; -} -const previousNetStats = new Map(); - - -// Function to fetch server status metrics -const fetchServerStatus = async (ws: AuthenticatedWebSocket, client: Client): Promise => { - const status: ServerStatusDetails = {}; - const connection = activeSshConnections.get(ws); // Needed for network stats - - try { - // CPU Usage (%) using vmstat (100 - idle) - // Try vmstat first - try { - const cpuCmd = `vmstat 1 2 | tail -1 | awk '{print 100-$15}'`; - const cpuOutput = await executeSshCommand(client, cpuCmd); - const cpuUsage = parseFloat(cpuOutput); - if (!isNaN(cpuUsage)) status.cpuPercent = parseFloat(cpuUsage.toFixed(1)); - } catch (vmstatError) { - console.warn(`获取 CPU 使用率失败 (vmstat):`, vmstatError, `尝试 top...`); - // Fallback attempt using top if vmstat failed - try { - const cpuCmdFallback = `top -bn1 | grep '%Cpu(s)' | head -1 | awk '{print $2+$4}'`; // Sum User + System CPU % - const cpuOutputFallback = await executeSshCommand(client, cpuCmdFallback); - const cpuUsageFallback = parseFloat(cpuOutputFallback); - if (!isNaN(cpuUsageFallback)) status.cpuPercent = parseFloat(cpuUsageFallback.toFixed(1)); - } catch (topError) { - console.warn(`获取 CPU 使用率失败 (top fallback):`, topError); - } - } - } catch (error) { // Catch potential outer errors, though unlikely now - console.error(`获取 CPU 使用率时发生意外错误:`, error); - } - - // --- Corrected CPU Model Fetch --- - try { - // CPU Model Name from /proc/cpuinfo - const cpuModelCmd = `cat /proc/cpuinfo | grep 'model name' | head -1 | cut -d ':' -f 2 | sed 's/^[ \t]*//'`; - const cpuModelOutput = await executeSshCommand(client, cpuModelCmd); // Use correct command and variable - if (cpuModelOutput) status.cpuModel = cpuModelOutput; - } catch (error) { // Use standard 'error' variable name and remove the incorrect logic/extra brace - console.warn(`获取 CPU 型号失败:`, error); - } - // Removed duplicated CPU Model fetch block here (Comment remains from previous step, actual change is above) - - // --- Fetch OS Name --- - try { - const osCmd = `cat /etc/os-release`; - const osOutput = await executeSshCommand(client, osCmd); - const lines = osOutput.split('\n'); - const prettyNameLine = lines.find(line => line.startsWith('PRETTY_NAME=')); - if (prettyNameLine) { - // Extract value, remove potential quotes - status.osName = prettyNameLine.split('=')[1]?.trim().replace(/^"(.*)"$/, '$1'); - } else { - // Fallback or alternative methods if needed (e.g., uname -a) - const unameCmd = `uname -a`; // Less pretty, but usually available - const unameOutput = await executeSshCommand(client, unameCmd); - if (unameOutput) status.osName = unameOutput.trim(); // Trim uname output - } - } catch (error) { - console.warn(`获取操作系统名称失败:`, error); - // Attempt uname as a last resort even if os-release failed - try { - const unameCmd = `uname -a`; - const unameOutput = await executeSshCommand(client, unameCmd); - if (unameOutput) status.osName = unameOutput.trim(); // Trim uname output - } catch (unameError) { - console.warn(`获取操作系统名称失败 (uname fallback):`, unameError); - } - } - - - try { - // Memory Usage (Total and Used in MB, and Percentage) - const memCmd = `free -m | awk 'NR==2{print $2 " " $3}'`; // Output: "total used" - const memOutput = await executeSshCommand(client, memCmd); - const memValues = memOutput.split(' '); - if (memValues.length === 2) { - const total = parseInt(memValues[0], 10); - const used = parseInt(memValues[1], 10); - if (!isNaN(total) && !isNaN(used) && total > 0) { - status.memTotal = total; - status.memUsed = used; - status.memPercent = parseFloat(((used / total) * 100).toFixed(1)); - } - } - } catch (error) { - console.warn(`获取内存状态失败:`, error); - } - // Removed duplicated Memory fetch block here - - try { - // Swap Usage (Total and Used in MB, and Percentage) - const swapCmd = `free -m | awk 'NR==3{print $2 " " $3}'`; // Output: "total used" for swap - const swapOutput = await executeSshCommand(client, swapCmd); - const swapValues = swapOutput.split(' '); - if (swapValues.length === 2) { - const total = parseInt(swapValues[0], 10); - const used = parseInt(swapValues[1], 10); - // Only report swap if total > 0 - if (!isNaN(total) && !isNaN(used) && total > 0) { - status.swapTotal = total; - status.swapUsed = used; - status.swapPercent = parseFloat(((used / total) * 100).toFixed(1)); - } else if (!isNaN(total) && total === 0) { - status.swapTotal = 0; - status.swapUsed = 0; - status.swapPercent = 0; - } - } - } catch (error) { - console.warn(`获取 Swap 状态失败:`, error); - } - - - try { - // Disk Usage - Using POSIX standard output 'df -Pk /' for reliable parsing - const diskCmd = `df -Pk /`; // Use -P flag for POSIX standard output - const diskOutput = await executeSshCommand(client, diskCmd); - const lines = diskOutput.trim().split('\n'); // Trim output and split into lines - - if (lines.length >= 2) { - // Skip header line (usually the first line) - let dataLine = ''; - // Find the line ending with ' /' (mount point) - for (let i = 1; i < lines.length; i++) { - // Trim the line before checking the ending - if (lines[i].trim().endsWith(' /')) { - dataLine = lines[i].trim(); - break; - } - } - - // The second line (index 1) should contain the data in POSIX format - if (lines.length >= 2) { - const dataLine = lines[1].trim(); - console.log(`[Disk P Debug] dataLine: "${dataLine}"`); // Log the line - const parts = dataLine.split(/\s+/); - console.log(`[Disk P Debug] parts:`, parts); // Log the split parts - // POSIX format: Filesystem, 1024-blocks (Total), Used, Available, Capacity, Mounted on - if (parts.length >= 4) { // Need at least up to 'Available' column - const totalKb = parseInt(parts[1], 10); - const usedKb = parseInt(parts[2], 10); - // const availableKb = parseInt(parts[3], 10); // Available if needed - // const capacityPercent = parts[4]; // Percentage string like "20%" - - if (!isNaN(totalKb) && !isNaN(usedKb) && totalKb >= 0) { - status.diskTotal = totalKb; - status.diskUsed = usedKb; - // Calculate percent only if total > 0 to avoid division by zero - status.diskPercent = totalKb > 0 ? parseFloat(((usedKb / totalKb) * 100).toFixed(1)) : 0; - // Optional: Could also try parsing parts[4] if calculation seems off - } else { - console.warn(`无法从 'df -Pk /' 行解析有效的磁盘大小 (Total=${parts[1]}, Used=${parts[2]}):`, dataLine); - } - } else { - console.warn(`'df -Pk /' 数据行格式不符合预期 (列数不足):`, dataLine); - } - } else { - console.warn(`无法从 'df -k /' 输出中找到根目录 ('/') 的数据行:`, diskOutput); - } - } else { - console.warn(`'df -k /' 命令输出格式不符合预期 (行数不足):`, diskOutput); - } - } catch (error) { - console.warn(`获取磁盘状态失败 (df -k):`, error); - } - - // Network Rate Calculation - let defaultInterface = ''; - try { - const routeCmd = `ip route | grep default | awk '{print $5}' | head -1`; - defaultInterface = await executeSshCommand(client, routeCmd); - status.netInterface = defaultInterface; // Store detected interface - } catch (error) { - console.warn(`获取默认网络接口失败:`, error); - } - - if (defaultInterface && connection) { - try { - const netCmd = `cat /proc/net/dev | grep '${defaultInterface}:' | awk '{print $2 " " $10}'`; // RX bytes (col 2), TX bytes (col 10) - const netOutput = await executeSshCommand(client, netCmd); - const netValues = netOutput.split(' '); - if (netValues.length === 2) { - const currentRx = parseInt(netValues[0], 10); - const currentTx = parseInt(netValues[1], 10); - const currentTime = Date.now(); - - const prevStats = previousNetStats.get(ws); - - if (prevStats && !isNaN(currentRx) && !isNaN(currentTx)) { - const timeDiffSeconds = (currentTime - prevStats.timestamp) / 1000; - if (timeDiffSeconds > 0) { - status.netRxRate = Math.max(0, Math.round((currentRx - prevStats.rx) / timeDiffSeconds)); // Corrected property name - status.netTxRate = Math.max(0, Math.round((currentTx - prevStats.tx) / timeDiffSeconds)); // Corrected property name - } - } - - // Store current stats for next calculation - if (!isNaN(currentRx) && !isNaN(currentTx)) { - previousNetStats.set(ws, { rx: currentRx, tx: currentTx, timestamp: currentTime }); - } - } - } catch (error) { - console.warn(`获取网络速率失败 (${defaultInterface}):`, error); - } - } else if (!defaultInterface) { - console.warn(`无法计算网络速率,因为未找到默认接口。`); - } - - return status; -}; - -// Function to start status polling for a connection -const startStatusPolling = (ws: AuthenticatedWebSocket, client: Client) => { - const connection = activeSshConnections.get(ws); - if (!connection || connection.statusIntervalId) { - console.warn(`用户 ${ws.username} 的状态轮询已启动或连接不存在。`); - return; // Already polling or connection gone - } - - console.log(`WebSocket: 为用户 ${ws.username} 启动状态轮询 (间隔: ${STATUS_POLL_INTERVAL}ms)...`); - - const intervalId = setInterval(async () => { - // Double check connection still exists before fetching - const currentConnection = activeSshConnections.get(ws); - if (!currentConnection || !currentConnection.client || !ws || ws.readyState !== WebSocket.OPEN) { - console.log(`WebSocket: 用户 ${ws.username} 连接已关闭或无效,停止状态轮询。`); - if (intervalId) clearInterval(intervalId); // Clear interval if connection is gone - // Also ensure it's cleared from the map if cleanup didn't catch it - if (currentConnection?.statusIntervalId === intervalId) { - delete currentConnection.statusIntervalId; - } - previousNetStats.delete(ws); // Clear previous stats on disconnect/error - return; + // 5. 清除 WebSocket 上的 sessionId 关联 (可选,因为 ws 可能已关闭) + if (state.ws && state.ws.sessionId === sessionId) { + delete state.ws.sessionId; } - try { - const status = await fetchServerStatus(ws, currentConnection.client); // Pass ws for net stats map - // Send status only if we got at least one metric - if (Object.keys(status).length > 0) { - // console.log(`[Status Poll] Sending status for ${ws.username}:`, status); // Debug log - ws.send(JSON.stringify({ type: 'ssh:status:update', payload: status })); - } - } catch (error) { - console.error(`用户 ${ws.username} 状态轮询时出错:`, error); - // Optionally send an error message to the client - // ws.send(JSON.stringify({ type: 'ssh:status:error', payload: '无法获取服务器状态' })); - // Consider stopping polling if errors persist? For now, continue polling. - } - }, STATUS_POLL_INTERVAL); - - connection.statusIntervalId = intervalId; // Store the interval ID - // Initialize previous network stats - previousNetStats.set(ws, { rx: 0, tx: 0, timestamp: Date.now() - STATUS_POLL_INTERVAL }); // Initialize with dummy past data + console.log(`WebSocket: 会话 ${sessionId} 已清理。`); + } else { + // console.log(`WebSocket: 清理时未找到会话 ${sessionId} 的状态。`); + } }; export const initializeWebSocket = (server: http.Server, sessionParser: RequestHandler): WebSocketServer => { const wss = new WebSocketServer({ noServer: true }); const db = getDb(); // 获取数据库实例 - const interval = setInterval(() => { + // --- 心跳检测 --- + const heartbeatInterval = setInterval(() => { wss.clients.forEach((ws: WebSocket) => { const extWs = ws as AuthenticatedWebSocket; if (extWs.isAlive === false) { - console.log(`WebSocket 心跳检测:用户 ${extWs.username} 连接无响应,正在终止...`); - cleanupSshConnection(extWs); // 清理 SSH 资源 + console.log(`WebSocket 心跳检测:用户 ${extWs.username} (会话: ${extWs.sessionId}) 连接无响应,正在终止...`); + cleanupClientConnection(extWs.sessionId); // 使用会话 ID 清理 return extWs.terminate(); } extWs.isAlive = false; extWs.ping(() => {}); }); - }, 60000); // Increased interval to 60 seconds + }, 30000); // 30 秒心跳间隔 + // --- WebSocket 升级处理 (认证) --- server.on('upgrade', (request: Request, socket, head) => { - // @ts-ignore + // @ts-ignore Express-session 类型问题 sessionParser(request, {} as any, () => { if (!request.session || !request.session.userId) { console.log('WebSocket 认证失败:未找到会话或用户未登录。'); @@ -453,861 +111,332 @@ export const initializeWebSocket = (server: http.Server, sessionParser: RequestH }); }); + // --- WebSocket 连接处理 --- wss.on('connection', (ws: AuthenticatedWebSocket, request: Request) => { ws.isAlive = true; console.log(`WebSocket:客户端 ${ws.username} (ID: ${ws.userId}) 已连接。`); ws.on('pong', () => { ws.isAlive = true; }); + // --- 消息处理 --- ws.on('message', async (message) => { - console.log(`WebSocket:收到来自 ${ws.username} 的消息: ${message.toString().substring(0, 100)}...`); // 截断长消息日志 + // console.log(`WebSocket:收到来自 ${ws.username} (会话: ${ws.sessionId}) 的消息: ${message.toString().substring(0, 100)}...`); + let parsedMessage: any; try { - const parsedMessage = JSON.parse(message.toString()); - const connection = activeSshConnections.get(ws); // 获取当前连接信息 - const sftp = connection?.sftp; // 获取 SFTP 实例 + parsedMessage = JSON.parse(message.toString()); + } catch (e) { + console.error(`WebSocket:来自 ${ws.username} 的无效 JSON 消息:`, message.toString()); + ws.send(JSON.stringify({ type: 'error', payload: '无效的消息格式 (非 JSON)' })); + return; + } - // 辅助函数发送错误消息 - const sendSftpError = (action: string, path: string | undefined, error: any, customMsg?: string) => { - const errorMessage = customMsg || (error instanceof Error ? error.message : String(error)); - console.error(`SFTP: 用户 ${ws.username} 执行 ${action} 操作 ${path ? `于 ${path}` : ''} 失败:`, error); - ws.send(JSON.stringify({ type: `sftp:${action}:error`, path, payload: `${action} 失败: ${errorMessage}` })); - }; + const { type, payload, requestId } = parsedMessage; // requestId 用于 SFTP 操作 + const sessionId = ws.sessionId; // 获取当前 WebSocket 的会话 ID + const state = sessionId ? clientStates.get(sessionId) : undefined; // 获取当前会话状态 - // 辅助函数发送成功消息 - const sendSftpSuccess = (action: string, path: string | undefined, payload?: any) => { - console.log(`SFTP: 用户 ${ws.username} 执行 ${action} 操作 ${path ? `于 ${path}` : ''} 成功。`); - ws.send(JSON.stringify({ type: `sftp:${action}:success`, path, payload })); - }; - - // 检查 SFTP 会话是否存在 - const ensureSftp = (action: string, path?: string): SFTPWrapper | null => { - if (!sftp) { - console.warn(`WebSocket: 收到来自 ${ws.username} 的 SFTP ${action} 请求,但无活动 SFTP 会话。`); - ws.send(JSON.stringify({ type: `sftp:${action}:error`, path, payload: 'SFTP 会话未初始化或已断开。' })); - return null; - } - return sftp; - }; - - - switch (parsedMessage.type) { - // --- 处理 SSH 连接请求 --- + try { + switch (type) { + // --- SSH 连接请求 --- case 'ssh:connect': { - // 注意:ssh:connect 内部逻辑需要自行处理 sftp 实例的获取,不能依赖顶层的 sftp 变量 - if (activeSshConnections.has(ws)) { - console.warn(`WebSocket: 用户 ${ws.username} 已有活动的 SSH 连接,忽略新的连接请求。`); + if (sessionId && state) { + console.warn(`WebSocket: 用户 ${ws.username} (会话: ${sessionId}) 已有活动连接,忽略新的连接请求。`); ws.send(JSON.stringify({ type: 'ssh:error', payload: '已存在活动的 SSH 连接。' })); return; } - const connectionId = parsedMessage.payload?.connectionId; - if (!connectionId) { + const dbConnectionId = payload?.connectionId; + if (!dbConnectionId) { ws.send(JSON.stringify({ type: 'ssh:error', payload: '缺少 connectionId。' })); return; } - console.log(`WebSocket: 用户 ${ws.username} 请求连接到 ID: ${connectionId}`); - ws.send(JSON.stringify({ type: 'ssh:status', payload: '正在获取连接信息...' })); - - // 1. 从数据库获取连接信息 (包括 proxy_id) - const connInfo = await new Promise((resolve, reject) => { - db.get( - `SELECT id, name, host, port, username, auth_method, proxy_id, - encrypted_password, encrypted_private_key, encrypted_passphrase - FROM connections WHERE id = ?`, // 添加 proxy_id - [connectionId], - (err, row: DbConnectionInfo) => { // 类型已更新 - if (err) { - console.error(`查询连接 ${connectionId} 详细信息时出错:`, err); - return reject(new Error('查询连接信息失败')); - } - resolve(row ?? null); - } - ); - }); - - if (!connInfo) { - ws.send(JSON.stringify({ type: 'ssh:error', payload: `未找到 ID 为 ${connectionId} 的连接配置。` })); - return; - } - if (!connInfo.encrypted_password) { - ws.send(JSON.stringify({ type: 'ssh:error', payload: '连接配置缺少密码信息。' })); - // This check might be too early if key auth is used - // ws.send(JSON.stringify({ type: 'ssh:error', payload: '连接配置缺少密码信息。' })); - // return; - } - - // 2. 获取代理信息 (如果 connInfo.proxy_id 存在) - let proxyInfo: DbProxyInfo | null = null; - if (connInfo.proxy_id) { - ws.send(JSON.stringify({ type: 'ssh:status', payload: `正在获取代理 ${connInfo.proxy_id} 信息...` })); - try { - proxyInfo = await new Promise((resolve, reject) => { - db.get( - `SELECT id, name, type, host, port, username, encrypted_password FROM proxies WHERE id = ?`, - [connInfo.proxy_id], - (err, row: DbProxyInfo) => { - if (err) return reject(new Error(`查询代理 ${connInfo.proxy_id} 失败: ${err.message}`)); - resolve(row ?? null); - } - ); - }); - if (!proxyInfo) { - throw new Error(`未找到 ID 为 ${connInfo.proxy_id} 的代理配置。`); - } - console.log(`使用代理: ${proxyInfo.name} (${proxyInfo.type})`); - } catch (proxyError: any) { - console.error(`获取代理信息失败:`, proxyError); - ws.send(JSON.stringify({ type: 'ssh:error', payload: `获取代理信息失败: ${proxyError.message}` })); - return; // 获取代理失败则停止连接 - } - } - - ws.send(JSON.stringify({ type: 'ssh:status', payload: `正在连接到 ${connInfo.host}...` })); - - // 3. 解密凭证并构建连接配置 - let connectConfig: any = { - host: connInfo.host, - port: connInfo.port, - username: connInfo.username, - keepaliveInterval: 30000, // Send keep-alive every 30 seconds (milliseconds) - keepaliveCountMax: 3, // Disconnect after 3 missed keep-alives - readyTimeout: 20000 // 连接超时时间 (毫秒) - }; + console.log(`WebSocket: 用户 ${ws.username} 请求连接到数据库 ID: ${dbConnectionId}`); + ws.send(JSON.stringify({ type: 'ssh:status', payload: '正在处理连接请求...' })); try { - if (connInfo.auth_method === 'password') { - if (!connInfo.encrypted_password) { - throw new Error('连接配置缺少密码信息。'); - } - connectConfig.password = decrypt(connInfo.encrypted_password); - } else if (connInfo.auth_method === 'key') { - if (!connInfo.encrypted_private_key) { - throw new Error('连接配置缺少私钥信息。'); - } - connectConfig.privateKey = decrypt(connInfo.encrypted_private_key); - if (connInfo.encrypted_passphrase) { - connectConfig.passphrase = decrypt(connInfo.encrypted_passphrase); - } - } else { - throw new Error(`不支持的认证方式: ${connInfo.auth_method}`); - } - } catch (decryptError: any) { - console.error(`处理连接 ${connectionId} 凭证失败:`, decryptError); - ws.send(JSON.stringify({ type: 'ssh:error', payload: `无法处理连接凭证: ${decryptError.message}` })); - return; - } + // 调用 SshService 建立连接并打开 Shell + // 注意:SshService.connectAndOpenShell 现在需要返回 Client 和 ShellStream + // 或者我们在这里编排,调用 SshService 的不同部分 + // 这里采用 SshService.connectAndOpenShell 返回包含 client 和 shell 的对象的假设 + // SshService 内部不再管理 activeSessions Map - // 4. 处理代理配置(如果存在)并建立连接 - const sshClient = new Client(); // 创建 SSH Client 实例 + // 模拟调用 SshService (实际应调用重构后的函数) + // const { client, shellStream } = await SshService.connectAndOpenShell(dbConnectionId, ws); // 假设 SshService 返回这些 - if (proxyInfo) { - console.log(`WebSocket: 检测到连接 ${connInfo.id} 使用代理 ${proxyInfo.id} (${proxyInfo.type})`); - ws.send(JSON.stringify({ type: 'ssh:status', payload: `正在应用代理 ${proxyInfo.name}...` })); + // --- 手动编排 SSH 连接流程 --- + // 1. 获取连接信息 (与旧代码类似,但移到这里) + ws.send(JSON.stringify({ type: 'ssh:status', payload: '正在获取连接信息...' })); + const connInfo = await SshService.getConnectionDetails(dbConnectionId); // 假设 SshService 提供此函数 + + // 2. 建立 SSH 连接 (调用 SshService 的底层连接函数) + ws.send(JSON.stringify({ type: 'ssh:status', payload: `正在连接到 ${connInfo.host}...` })); + const sshClient = await SshService.establishSshConnection(connInfo); // 假设 SshService 提供此函数 + + // 3. 连接成功,创建状态 + const newSessionId = uuidv4(); // 生成唯一会话 ID + ws.sessionId = newSessionId; // 关联到 WebSocket + + const newState: ClientState = { + ws: ws, + sshClient: sshClient, + dbConnectionId: dbConnectionId, + // shellStream 稍后添加 + }; + clientStates.set(newSessionId, newState); + console.log(`WebSocket: 为用户 ${ws.username} 创建新会话 ${newSessionId} (DB ID: ${dbConnectionId})`); + + // 4. 打开 Shell + ws.send(JSON.stringify({ type: 'ssh:status', payload: 'SSH 连接成功,正在打开 Shell...' })); try { - let proxyPassword = ''; - if (proxyInfo.encrypted_password) { - proxyPassword = decrypt(proxyInfo.encrypted_password); - } + const shellStream = await SshService.openShell(sshClient); // 假设 SshService 提供此函数 + newState.sshShellStream = shellStream; // 存储 Shell 流 - if (proxyInfo.type === 'SOCKS5') { - const socksOptions = { - proxy: { - host: proxyInfo.host, - port: proxyInfo.port, - type: 5 as 5, // SOCKS 版本 5 - userId: proxyInfo.username || undefined, - password: proxyPassword || undefined, - }, - command: 'connect' as 'connect', - destination: { - host: connInfo.host, - port: connInfo.port, - }, - timeout: connectConfig.readyTimeout ?? 20000, // 使用连接超时时间 - }; - console.log(`WebSocket: 正在通过 SOCKS5 代理 ${proxyInfo.host}:${proxyInfo.port} 连接到目标 ${connInfo.host}:${connInfo.port}...`); - ws.send(JSON.stringify({ type: 'ssh:status', payload: `正在通过 SOCKS5 代理 ${proxyInfo.name} 连接...` })); - - SocksClient.createConnection(socksOptions) - .then(({ socket }) => { - console.log(`WebSocket: SOCKS5 代理连接成功。正在建立 SSH 连接...`); - ws.send(JSON.stringify({ type: 'ssh:status', payload: 'SOCKS5 代理连接成功,正在建立 SSH...' })); - connectConfig.sock = socket; // 使用建立的 SOCKS socket - connectSshClient(ws, sshClient, connectConfig, connInfo); // 通过代理连接 SSH - }) - .catch(socksError => { - console.error(`WebSocket: SOCKS5 代理连接失败:`, socksError); - ws.send(JSON.stringify({ type: 'ssh:error', payload: `SOCKS5 代理连接失败: ${socksError.message}` })); - cleanupSshConnection(ws); - }); - // 注意:对于 SOCKS5,连接逻辑在 .then 回调中处理 - - } else if (proxyInfo.type === 'HTTP') { - console.log(`WebSocket: 尝试通过 HTTP 代理 ${proxyInfo.host}:${proxyInfo.port} 建立隧道...`); - ws.send(JSON.stringify({ type: 'ssh:status', payload: `正在通过 HTTP 代理 ${proxyInfo.name} 建立隧道...` })); - - // 手动发起 CONNECT 请求 - const reqOptions: http.RequestOptions = { - method: 'CONNECT', - host: proxyInfo.host, - port: proxyInfo.port, - path: `${connInfo.host}:${connInfo.port}`, // 目标 SSH 服务器地址和端口 - timeout: connectConfig.readyTimeout ?? 20000, - agent: false, // 不使用全局 agent - }; - // 添加代理认证头部 (如果需要) - if (proxyInfo.username) { - const auth = 'Basic ' + Buffer.from(proxyInfo.username + ':' + (proxyPassword || '')).toString('base64'); - reqOptions.headers = { - ...reqOptions.headers, - 'Proxy-Authorization': auth, - 'Proxy-Connection': 'Keep-Alive', // 某些代理需要 - 'Host': `${connInfo.host}:${connInfo.port}` // CONNECT 请求的目标 - }; + // 5. 设置 Shell 事件转发 + shellStream.on('data', (data: Buffer) => { + if (ws.readyState === WebSocket.OPEN) { + ws.send(JSON.stringify({ type: 'ssh:output', payload: data.toString('base64'), encoding: 'base64' })); } + }); + shellStream.stderr.on('data', (data: Buffer) => { + console.error(`SSH Stderr (会话: ${newSessionId}): ${data.toString('utf8').substring(0, 100)}...`); + if (ws.readyState === WebSocket.OPEN) { + ws.send(JSON.stringify({ type: 'ssh:output', payload: data.toString('base64'), encoding: 'base64' })); + } + }); + shellStream.on('close', () => { + console.log(`SSH: 会话 ${newSessionId} 的 Shell 通道已关闭。`); + ws.send(JSON.stringify({ type: 'ssh:disconnected', payload: 'Shell 通道已关闭。' })); + cleanupClientConnection(newSessionId); // Shell 关闭时清理整个会话 + }); - const req = http.request(reqOptions); - req.on('connect', (res, socket, head) => { - if (res.statusCode === 200) { - console.log(`WebSocket: HTTP 代理隧道建立成功。正在建立 SSH 连接...`); - ws.send(JSON.stringify({ type: 'ssh:status', payload: 'HTTP 代理隧道成功,正在建立 SSH...' })); - connectConfig.sock = socket; // 使用建立的隧道 socket - connectSshClient(ws, sshClient, connectConfig, connInfo); // 通过隧道连接 SSH - } else { - console.error(`WebSocket: HTTP 代理 CONNECT 请求失败, 状态码: ${res.statusCode}`); - socket.destroy(); - ws.send(JSON.stringify({ type: 'ssh:error', payload: `HTTP 代理连接失败 (状态码: ${res.statusCode})` })); - cleanupSshConnection(ws); - } - }); - req.on('error', (err) => { - console.error(`WebSocket: HTTP 代理请求错误:`, err); - ws.send(JSON.stringify({ type: 'ssh:error', payload: `HTTP 代理连接错误: ${err.message}` })); - cleanupSshConnection(ws); - }); - req.on('timeout', () => { - console.error(`WebSocket: HTTP 代理请求超时`); - req.destroy(); // 销毁请求 - ws.send(JSON.stringify({ type: 'ssh:error', payload: 'HTTP 代理连接超时' })); - cleanupSshConnection(ws); - }); - req.end(); // 发送请求 - // 注意:对于 HTTP 代理,连接逻辑在 'connect' 事件回调中处理 + // 6. 发送 SSH 连接成功消息 (Shell 已就绪) + ws.send(JSON.stringify({ + type: 'ssh:connected', + payload: { + connectionId: dbConnectionId, + sessionId: newSessionId + // sftpReady 标志移除,将通过 sftp_ready 消息通知 + } + })); + console.log(`WebSocket: 会话 ${newSessionId} SSH 连接和 Shell 建立成功。`); - } else { - console.error(`WebSocket: 未知的代理类型: ${proxyInfo.type}`); - ws.send(JSON.stringify({ type: 'ssh:error', payload: `未知的代理类型: ${proxyInfo.type}` })); - cleanupSshConnection(ws); - } - } catch (proxyProcessError: any) { - console.error(`处理代理 ${proxyInfo.id} 配置或凭证失败:`, proxyProcessError); - ws.send(JSON.stringify({ type: 'ssh:error', payload: `无法处理代理配置: ${proxyProcessError.message}` })); - cleanupSshConnection(ws); + // 7. 异步初始化 SFTP 和启动状态监控 + console.log(`WebSocket: 会话 ${newSessionId} 正在异步初始化 SFTP...`); + sftpService.initializeSftpSession(newSessionId) + .then(() => { + console.log(`SFTP: 会话 ${newSessionId} 异步初始化成功。`); + // SFTP 初始化成功后,前端会收到 sftp_ready 消息 + // FileManager 会在 isConnected 变为 true 后自动请求目录 + }) + .catch(sftpInitError => { + console.error(`WebSocket: 会话 ${newSessionId} 异步初始化 SFTP 失败:`, sftpInitError); + // 错误消息已在 initializeSftpSession 内部发送 + }); + + console.log(`WebSocket: 会话 ${newSessionId} 正在启动状态监控...`); + statusMonitorService.startStatusPolling(newSessionId); // 启动状态轮询 + + } catch (shellError: any) { + console.error(`SSH: 会话 ${newSessionId} 打开 Shell 失败:`, shellError); + ws.send(JSON.stringify({ type: 'ssh:error', payload: `打开 Shell 失败: ${shellError.message}` })); + cleanupClientConnection(newSessionId); // 打开 Shell 失败也需要清理 } - } else { - // 5. 无代理,直接连接 - console.log(`WebSocket: 未配置代理。正在直接建立 SSH 连接...`); - ws.send(JSON.stringify({ type: 'ssh:status', payload: `正在直接连接到 ${connInfo.host}...` })); - connectSshClient(ws, sshClient, connectConfig, connInfo); // 直接连接 SSH + + // 7. 设置 SSH Client 的关闭和错误处理 + sshClient.on('close', () => { + console.log(`SSH: 会话 ${newSessionId} 的客户端连接已关闭。`); + // Shell 关闭事件通常会先触发清理,这里作为保险 + cleanupClientConnection(newSessionId); + }); + sshClient.on('error', (err: Error) => { + console.error(`SSH: 会话 ${newSessionId} 的客户端连接错误:`, err); + ws.send(JSON.stringify({ type: 'ssh:error', payload: `SSH 连接错误: ${err.message}` })); + cleanupClientConnection(newSessionId); + }); + + } catch (connectError: any) { + console.error(`WebSocket: 用户 ${ws.username} 连接到数据库 ID ${dbConnectionId} 失败:`, connectError); + ws.send(JSON.stringify({ type: 'ssh:error', payload: `连接失败: ${connectError.message}` })); + // 此处不需要 cleanup,因为状态尚未创建 } break; } // end case 'ssh:connect' - // --- 处理 SSH 输入 --- - - // --- 处理 SSH 输入 --- + // --- SSH 输入 --- case 'ssh:input': { - const connection = activeSshConnections.get(ws); - if (connection?.shell && parsedMessage.payload?.data) { - connection.shell.write(parsedMessage.payload.data); - } else { - console.warn(`WebSocket: 收到来自 ${ws.username} 的 SSH 输入,但无活动 Shell 或数据为空。`); + if (!state || !state.sshShellStream) { + console.warn(`WebSocket: 收到来自 ${ws.username} (会话: ${sessionId}) 的 SSH 输入,但无活动 Shell。`); + return; + } + const data = payload?.data; + if (typeof data === 'string') { + state.sshShellStream.write(data); } break; } - // --- 处理终端大小调整 --- + // --- SSH 终端大小调整 --- case 'ssh:resize': { - const connection = activeSshConnections.get(ws); - const { cols, rows } = parsedMessage.payload || {}; - if (connection?.shell && cols && rows) { - console.log(`SSH: 用户 ${ws.username} 调整终端大小: ${cols}x${rows}`); - connection.shell.setWindow(rows, cols, 0, 0); // 注意参数顺序 rows, cols - } else { - console.warn(`WebSocket: 收到来自 ${ws.username} 的调整大小请求,但无活动 Shell 或尺寸数据无效。`); - } - break; - } - - // --- 处理 SFTP 目录列表请求 --- - case 'sftp:readdir': { - const targetPath = parsedMessage.payload?.path; - const currentSftp = ensureSftp('readdir', targetPath); - if (!currentSftp) break; - - if (typeof targetPath !== 'string') { - sendSftpError('readdir', targetPath, '请求路径无效。'); - break; + if (!state || !state.sshShellStream) { + console.warn(`WebSocket: 收到来自 ${ws.username} (会话: ${sessionId}) 的调整大小请求,但无活动 Shell。`); + return; + } + const { cols, rows } = payload || {}; + if (typeof cols === 'number' && typeof rows === 'number') { + console.log(`SSH: 会话 ${sessionId} 调整终端大小: ${cols}x${rows}`); + state.sshShellStream.setWindow(rows, cols, 0, 0); } - - console.log(`SFTP: 用户 ${ws.username} 请求读取目录: ${targetPath}`); - currentSftp.readdir(targetPath, (err, list) => { - if (err) { - sendSftpError('readdir', targetPath, err); - return; - } - // 格式化文件列表以便前端使用 - const formattedList = list.map(item => ({ - filename: item.filename, - longname: item.longname, - attrs: { - size: item.attrs.size, - uid: item.attrs.uid, - gid: item.attrs.gid, - mode: item.attrs.mode, - atime: item.attrs.atime * 1000, - mtime: item.attrs.mtime * 1000, - isDirectory: item.attrs.isDirectory(), - isFile: item.attrs.isFile(), - isSymbolicLink: item.attrs.isSymbolicLink(), - } - })); - sendSftpSuccess('readdir', targetPath, formattedList); - }); break; } - // --- 处理 SFTP 文件/目录状态获取请求 --- - case 'sftp:stat': { - const targetPath = parsedMessage.payload?.path; - const currentSftp = ensureSftp('stat', targetPath); - if (!currentSftp) break; - - if (typeof targetPath !== 'string') { - sendSftpError('stat', targetPath, '请求路径无效。'); - break; + // --- SFTP 操作 (委托给 SftpService) --- + case 'sftp:readdir': + case 'sftp:stat': + case 'sftp:readfile': + case 'sftp:writefile': // Added missing case + case 'sftp:mkdir': + case 'sftp:rmdir': + case 'sftp:unlink': + case 'sftp:rename': + case 'sftp:chmod': { + if (!sessionId || !state) { + console.warn(`WebSocket: 收到来自 ${ws.username} 的 SFTP 请求 (${type}),但无活动会话。`); + // 尝试包含 requestId 发送错误,如果 requestId 存在的话 + const errPayload: { message: string; requestId?: string } = { message: '无效的会话' }; + if (requestId) errPayload.requestId = requestId; + ws.send(JSON.stringify({ type: 'sftp_error', payload: errPayload })); + return; } - console.log(`SFTP: 用户 ${ws.username} 请求获取状态: ${targetPath}`); - currentSftp.lstat(targetPath, (err, stats) => { // 使用 lstat 获取链接本身信息 - if (err) { - sendSftpError('stat', targetPath, err); - return; - } - const formattedStats = { - mode: stats.mode, - uid: stats.uid, - gid: stats.gid, - size: stats.size, - atime: stats.atime * 1000, - mtime: stats.mtime * 1000, - isDirectory: stats.isDirectory(), - isFile: stats.isFile(), - isBlockDevice: stats.isBlockDevice(), - isCharacterDevice: stats.isCharacterDevice(), - isSymbolicLink: stats.isSymbolicLink(), - isFIFO: stats.isFIFO(), - isSocket: stats.isSocket(), - }; - sendSftpSuccess('stat', targetPath, formattedStats); - }); - break; - } - - // --- 处理 SFTP 文件上传 --- - case 'sftp:upload:start': { - const { remotePath, uploadId, size } = parsedMessage.payload || {}; - const currentSftp = ensureSftp('upload:start', remotePath); - if (!currentSftp) break; - - if (typeof remotePath !== 'string' || !uploadId) { - sendSftpError('upload:start', remotePath, '无效的上传请求参数 (remotePath, uploadId)。', undefined); - break; - } - if (activeUploads.has(uploadId)) { - sendSftpError('upload:start', remotePath, '具有相同 ID 的上传已在进行中。', undefined); - break; + // --- 添加 Request ID 检查 --- + // 对于需要响应关联的操作,强制要求 requestId + if (!requestId) { + console.error(`WebSocket: 收到来自 ${ws.username} (会话: ${sessionId}) 的 SFTP 请求 (${type}),但缺少 requestId。`); + ws.send(JSON.stringify({ type: 'sftp_error', payload: { message: `SFTP 操作 ${type} 缺少 requestId` } })); + return; // 没有 requestId 则不继续处理 } + // --- 结束 Request ID 检查 --- - console.log(`SFTP: 用户 ${ws.username} 开始上传到 ${remotePath} (ID: ${uploadId}, 大小: ${size ?? '未知'})`); + // Explicitly call SftpService methods based on type try { - const writeStream = currentSftp.createWriteStream(remotePath); - writeStream.on('error', (err: Error) => { - sendSftpError('upload', remotePath, err, `写入远程文件失败: ${err.message}`); - activeUploads.delete(uploadId); - }); - let uploadFinished = false; - const onStreamEnd = (eventName: string) => { - if (uploadFinished) return; - uploadFinished = true; - sendSftpSuccess('upload', remotePath, { uploadId }); // 成功时也带上 uploadId - activeUploads.delete(uploadId); - }; - writeStream.on('finish', () => onStreamEnd('finish')); - writeStream.on('close', () => onStreamEnd('close')); - activeUploads.set(uploadId, writeStream as any); - ws.send(JSON.stringify({ type: 'sftp:upload:ready', uploadId })); - } catch (err: any) { - sendSftpError('upload:start', remotePath, err, `无法创建远程文件: ${err.message}`); + switch (type) { + case 'sftp:readdir': + if (payload?.path) { + sftpService.readdir(sessionId, payload.path, requestId); + } else { throw new Error("Missing 'path' in payload for readdir"); } + break; + case 'sftp:stat': + if (payload?.path) { + sftpService.stat(sessionId, payload.path, requestId); + } else { throw new Error("Missing 'path' in payload for stat"); } + break; + case 'sftp:readfile': + if (payload?.path) { + sftpService.readFile(sessionId, payload.path, requestId); + } else { throw new Error("Missing 'path' in payload for readfile"); } + break; + case 'sftp:writefile': + // Handle both 'data' (from potential future upload refactor) and 'content' + const fileContent = payload?.content ?? payload?.data ?? ''; // Default to empty string for create + if (payload?.path) { + // Ensure content is base64 encoded if needed (assuming frontend sends base64 for now) + // If creating empty file, data might be empty string, Buffer.from('') is fine. + const dataToSend = (typeof fileContent === 'string') ? fileContent : ''; // Ensure it's a string + sftpService.writefile(sessionId, payload.path, dataToSend, requestId); + } else { throw new Error("Missing 'path' in payload for writefile"); } + break; + case 'sftp:mkdir': + if (payload?.path) { + sftpService.mkdir(sessionId, payload.path, requestId); + } else { throw new Error("Missing 'path' in payload for mkdir"); } + break; + case 'sftp:rmdir': + if (payload?.path) { + sftpService.rmdir(sessionId, payload.path, requestId); + } else { throw new Error("Missing 'path' in payload for rmdir"); } + break; + case 'sftp:unlink': + if (payload?.path) { + sftpService.unlink(sessionId, payload.path, requestId); + } else { throw new Error("Missing 'path' in payload for unlink"); } + break; + case 'sftp:rename': + if (payload?.oldPath && payload?.newPath) { + sftpService.rename(sessionId, payload.oldPath, payload.newPath, requestId); + } else { throw new Error("Missing 'oldPath' or 'newPath' in payload for rename"); } + break; + case 'sftp:chmod': + if (payload?.path && typeof payload?.mode === 'number') { + sftpService.chmod(sessionId, payload.path, payload.mode, requestId); + } else { throw new Error("Missing 'path' or invalid 'mode' in payload for chmod"); } + break; + default: + // Should not happen if already checked type, but as a safeguard + throw new Error(`Unhandled SFTP type: ${type}`); + } + } catch (sftpCallError: any) { + console.error(`WebSocket: Error preparing/calling SFTP service for ${type} (Request ID: ${requestId}):`, sftpCallError); + ws.send(JSON.stringify({ type: 'sftp_error', payload: { message: `处理 SFTP 请求 ${type} 时出错: ${sftpCallError.message}`, requestId } })); } break; } - - case 'sftp:upload:chunk': { - const { uploadId, data, isLast } = parsedMessage.payload || {}; - const writeStream = activeUploads.get(uploadId); - - if (!writeStream) { - // console.warn(`WebSocket: 收到上传数据块 (ID: ${uploadId}),但未找到对应的上传任务。`); - // 不必每次都报错,前端可能已经取消或完成 - break; - } - if (typeof data !== 'string') { - sendSftpError('upload:chunk', undefined, '无效的数据块格式。', undefined); - break; - } - - try { - const buffer = Buffer.from(data, 'base64'); - const canWriteMore = writeStream.write(buffer); - if (!canWriteMore) { - writeStream.once('drain', () => { - ws.send(JSON.stringify({ type: 'sftp:upload:resume', uploadId })); - }); - ws.send(JSON.stringify({ type: 'sftp:upload:pause', uploadId })); - } - if (isLast) { - writeStream.end(); - } - } catch (err: any) { - sendSftpError('upload:chunk', undefined, err, `处理数据块失败: ${err.message}`); - writeStream.end(); - activeUploads.delete(uploadId); - } - break; - } - + // --- SFTP 文件上传 (保持部分逻辑,因为涉及分块) --- + // TODO: 考虑将上传逻辑也移入 SftpService + case 'sftp:upload:start': + case 'sftp:upload:chunk': case 'sftp:upload:cancel': { - const { uploadId } = parsedMessage.payload || {}; - const writeStream = activeUploads.get(uploadId); - if (writeStream) { - console.log(`SFTP: 用户 ${ws.username} 取消上传 (ID: ${uploadId})`); - writeStream.end(); // 触发清理 - // TODO: 删除部分文件? sftp.unlink? - ws.send(JSON.stringify({ type: 'sftp:upload:cancelled', uploadId })); - } else { - // console.warn(`WebSocket: 收到取消上传请求 (ID: ${uploadId}),但未找到对应的上传任务。`); - } - break; - } - - // --- 处理 SFTP 文件读取请求 --- - case 'sftp:readfile': { - const targetPath = parsedMessage.payload?.path; - const currentSftp = ensureSftp('readfile', targetPath); - if (!currentSftp) break; - - if (typeof targetPath !== 'string') { - sendSftpError('readfile', targetPath, '请求路径无效。'); - break; - } - - console.log(`SFTP: 用户 ${ws.username} 请求读取文件: ${targetPath}`); - const readStream = currentSftp.createReadStream(targetPath); - let fileContent = ''; - let hasError = false; - - readStream.on('data', (chunk: Buffer) => { - // 尝试多种编码解码,优先 UTF-8 - try { - fileContent += chunk.toString('utf8'); - } catch (e) { - // 如果 UTF-8 失败,尝试其他常见编码,例如 GBK (适用于中文 Windows) - // 注意:这只是一个尝试,可能不准确。更可靠的方法是让用户指定编码。 - try { - // 需要安装 iconv-lite: npm install iconv-lite @types/iconv-lite -w packages/backend - // import * as iconv from 'iconv-lite'; - // fileContent += iconv.decode(chunk, 'gbk'); - // 暂时回退到 base64 发送原始数据,让前端处理解码 - console.warn(`SFTP: 文件 ${targetPath} 无法以 UTF-8 解码,将发送 Base64 编码内容。`); - fileContent = Buffer.concat([Buffer.from(fileContent), chunk]).toString('base64'); - } catch (decodeError) { - console.error(`SFTP: 文件 ${targetPath} 解码失败:`, decodeError); - sendSftpError('readfile', targetPath, '文件解码失败。'); - readStream.destroy(); // 停止读取 - hasError = true; - } - } - }); - - readStream.on('error', (err: Error) => { - if (hasError) return; // 避免重复发送错误 - sendSftpError('readfile', targetPath, err); - hasError = true; - }); - - readStream.on('end', () => { - if (hasError) return; // 如果之前已出错,则不发送成功消息 - // 判断是发送文本内容还是 Base64 - let payload: { content: string; encoding: 'utf8' | 'base64' }; - try { - // 尝试再次解码整个内容为 UTF-8,如果成功则发送 UTF-8 - Buffer.from(fileContent, 'base64').toString('utf8'); - // 如果上一步是 base64 编码,这里会是原始 base64 字符串 - if (fileContent === Buffer.from(fileContent, 'base64').toString('base64')) { - payload = { content: fileContent, encoding: 'base64' }; - } else { - payload = { content: fileContent, encoding: 'utf8' }; - } - - } catch (e) { - // 如果整体解码失败,则发送 Base64 - payload = { content: Buffer.from(fileContent).toString('base64'), encoding: 'base64' }; - } - // 限制发送内容的大小,避免 WebSocket 拥塞 (例如 1MB) - const MAX_CONTENT_SIZE = 1 * 1024 * 1024; - if (Buffer.byteLength(payload.content, payload.encoding === 'base64' ? 'base64' : 'utf8') > MAX_CONTENT_SIZE) { - sendSftpError('readfile', targetPath, `文件过大 (超过 ${MAX_CONTENT_SIZE / 1024 / 1024}MB),无法在编辑器中打开。`); - } else { - sendSftpSuccess('readfile', targetPath, payload); - } - }); - break; - } - - // --- 处理 SFTP 文件写入请求 --- - case 'sftp:writefile': { - const { path: targetPath, content, encoding } = parsedMessage.payload || {}; - const currentSftp = ensureSftp('writefile', targetPath); - if (!currentSftp) break; - - if (typeof targetPath !== 'string' || typeof content !== 'string' || (encoding !== 'utf8' && encoding !== 'base64')) { - sendSftpError('writefile', targetPath, '请求参数无效 (需要 path, content, encoding[\'utf8\'|\'base64\'])。'); - break; - } - - console.log(`SFTP: 用户 ${ws.username} 请求写入文件: ${targetPath}, Encoding: ${encoding}, Content length: ${content.length}`); // 增加日志细节 - - try { - console.log(`[writefile] Attempting to create buffer for ${targetPath}`); - const buffer = Buffer.from(content, encoding); // 根据 encoding 解码/转换内容为 Buffer - console.log(`[writefile] Buffer created successfully for ${targetPath}. Attempting to create write stream.`); - const writeStream = currentSftp.createWriteStream(targetPath); - console.log(`[writefile] Write stream created for ${targetPath}. Attaching listeners.`); - let hasError = false; - let operationCompleted = false; // Flag to track if finish/error occurred - let backendTimeoutId: NodeJS.Timeout | null = null; - const streamId = Math.random().toString(36).substring(2, 9); // Unique ID for logging this stream instance - const BACKEND_WRITE_TIMEOUT = 15000; // 15 seconds backend timeout - - console.log(`[${streamId}] SFTP: Attaching listeners for writeStream to ${targetPath}`); - - const cleanupTimeout = () => { - if (backendTimeoutId) { - clearTimeout(backendTimeoutId); - backendTimeoutId = null; - } - }; - - writeStream.on('error', (err: Error) => { - console.error(`[${streamId}] SFTP: writeStream 'error' event for ${targetPath}:`, err); - if (operationCompleted) return; // Already completed - operationCompleted = true; - cleanupTimeout(); - sendSftpError('writefile', targetPath, err, `写入远程文件失败: ${err.message}`); - hasError = true; // Keep track for close handler if needed - }); - - writeStream.on('finish', () => { // 'finish' 表示所有数据已刷入底层系统 - console.log(`[${streamId}] SFTP: writeStream 'finish' event for ${targetPath}. HasError: ${hasError}`); - if (operationCompleted) return; // Already completed (e.g., error occurred first) - operationCompleted = true; - cleanupTimeout(); - if (hasError) return; // Error occurred before finish - sendSftpSuccess('writefile', targetPath); - }); - writeStream.on('close', () => { // 'close' 表示流已关闭 - console.log(`[${streamId}] SFTP: writeStream 'close' event for ${targetPath}. writableFinished: ${writeStream.writableFinished}, HasError: ${hasError}, OperationCompleted: ${operationCompleted}`); - cleanupTimeout(); // Clear timeout if close happens before it fires - // If the stream closed and no error/finish/timeout event handled it yet, - // consider it a success. This handles cases where 'finish' might not fire reliably, - // even if writableFinished is false when close is emitted prematurely. - if (!operationCompleted) { - console.warn(`[${streamId}] SFTP: writeStream 'close' event occurred before 'finish' or 'error'. Assuming success for ${targetPath}.`); - sendSftpSuccess('writefile', targetPath); - operationCompleted = true; // Mark as completed via close - } - // If an error or finish occurred, the respective handlers already sent the message. - // If finish occurred, the 'finish' handler sent success. - // If closed without finishing and without error, the backend timeout might handle it, - // or it might be a legitimate early close after an error on the server side not reported via 'error' event. - }); - - // 写入数据并结束流 - console.log(`[${streamId}] SFTP: Calling writeStream.end() for ${targetPath}`); - writeStream.end(buffer, () => { - console.log(`[${streamId}] SFTP: writeStream.end() callback fired for ${targetPath}. Starting backend timeout.`); - // Start backend timeout *after* end() callback fires (or immediately if no callback needed) - backendTimeoutId = setTimeout(() => { - if (!operationCompleted) { - console.error(`[${streamId}] SFTP: Backend write timeout (${BACKEND_WRITE_TIMEOUT}ms) reached for ${targetPath}.`); - operationCompleted = true; // Mark as completed due to timeout - sendSftpError('writefile', targetPath, `后端写入超时 (${BACKEND_WRITE_TIMEOUT / 1000}秒)`); - } - }, BACKEND_WRITE_TIMEOUT); - }); - - - } catch (err: any) { - console.error(`[writefile] Error during write stream creation or buffer processing for ${targetPath}:`, err); // 增加 catch 日志 - // Buffer.from 可能因无效编码或内容抛出错误 - sendSftpError('writefile', targetPath, err, `处理文件内容或创建写入流失败: ${err.message}`); - } - break; - } - - - // --- 新增 SFTP 操作 --- - case 'sftp:mkdir': { - const targetPath = parsedMessage.payload?.path; - const currentSftp = ensureSftp('mkdir', targetPath); - if (!currentSftp) break; - - if (typeof targetPath !== 'string') { - sendSftpError('mkdir', targetPath, '请求路径无效。'); - break; - } - - console.log(`SFTP: 用户 ${ws.username} 请求创建目录: ${targetPath}`); - // TODO: 考虑添加 mode 参数支持 - currentSftp.mkdir(targetPath, (err) => { - if (err) { - sendSftpError('mkdir', targetPath, err); - } else { - sendSftpSuccess('mkdir', targetPath); - } - }); - break; - } - - case 'sftp:rmdir': { - const targetPath = parsedMessage.payload?.path; - const currentSftp = ensureSftp('rmdir', targetPath); - if (!currentSftp) break; - - if (typeof targetPath !== 'string') { - sendSftpError('rmdir', targetPath, '请求路径无效。'); - break; - } - - console.log(`SFTP: 用户 ${ws.username} 请求删除目录: ${targetPath}`); - currentSftp.rmdir(targetPath, (err) => { - if (err) { - sendSftpError('rmdir', targetPath, err); - } else { - sendSftpSuccess('rmdir', targetPath); - } - }); - break; - } - - case 'sftp:unlink': { // 删除文件 - const targetPath = parsedMessage.payload?.path; - const currentSftp = ensureSftp('unlink', targetPath); - if (!currentSftp) break; - - if (typeof targetPath !== 'string') { - sendSftpError('unlink', targetPath, '请求路径无效。'); - break; - } - - console.log(`SFTP: 用户 ${ws.username} 请求删除文件: ${targetPath}`); - currentSftp.unlink(targetPath, (err) => { - if (err) { - sendSftpError('unlink', targetPath, err); - } else { - sendSftpSuccess('unlink', targetPath); - } - }); - break; - } - - case 'sftp:rename': { - const { oldPath, newPath } = parsedMessage.payload || {}; - const currentSftp = ensureSftp('rename', oldPath); - if (!currentSftp) break; - - if (typeof oldPath !== 'string' || typeof newPath !== 'string') { - sendSftpError('rename', oldPath, '无效的旧路径或新路径。'); - break; - } - - console.log(`SFTP: 用户 ${ws.username} 请求重命名: ${oldPath} -> ${newPath}`); - currentSftp.rename(oldPath, newPath, (err) => { - if (err) { - sendSftpError('rename', oldPath, err); - } else { - sendSftpSuccess('rename', oldPath, { oldPath, newPath }); // 返回新旧路径 - } - }); - break; - } - - case 'sftp:chmod': { - const { targetPath, mode } = parsedMessage.payload || {}; - const currentSftp = ensureSftp('chmod', targetPath); - if (!currentSftp) break; - - if (typeof targetPath !== 'string' || typeof mode !== 'number') { - sendSftpError('chmod', targetPath, '无效的路径或权限模式。'); - break; - } - - console.log(`SFTP: 用户 ${ws.username} 请求修改权限: ${targetPath} -> ${mode.toString(8)}`); // 以八进制显示 mode - currentSftp.chmod(targetPath, mode, (err) => { - if (err) { - sendSftpError('chmod', targetPath, err); - } else { - sendSftpSuccess('chmod', targetPath, { mode }); // 返回设置的 mode - } - }); - break; - } + console.warn(`WebSocket: SFTP 上传功能 (${type}) 尚未完全迁移到 SftpService。`); + // 可以在这里调用 SftpService 的对应方法,或者暂时保留旧逻辑 + ws.send(JSON.stringify({ type: 'error', payload: `SFTP 上传功能正在重构中。` })); + break; + } default: - console.warn(`WebSocket:收到未知类型的消息: ${parsedMessage.type}`); - ws.send(JSON.stringify({ type: 'error', payload: `不支持的消息类型: ${parsedMessage.type}` })); + console.warn(`WebSocket:收到来自 ${ws.username} (会话: ${sessionId}) 的未知消息类型: ${type}`); + ws.send(JSON.stringify({ type: 'error', payload: `不支持的消息类型: ${type}` })); } - } catch (e) { - console.error('WebSocket:解析消息时出错:', e); - ws.send(JSON.stringify({ type: 'error', payload: '无效的消息格式' })); + } catch (error: any) { + console.error(`WebSocket: 处理来自 ${ws.username} (会话: ${sessionId}) 的消息 (${type}) 时发生顶层错误:`, error); + ws.send(JSON.stringify({ type: 'error', payload: `处理消息时发生内部错误: ${error.message}` })); + // 考虑是否需要清理连接?取决于错误的性质 + // cleanupClientConnection(sessionId); } }); + // --- 连接关闭和错误处理 --- ws.on('close', (code, reason) => { - console.log(`WebSocket:客户端 ${ws.username} (ID: ${ws.userId}) 已断开连接。代码: ${code}, 原因: ${reason.toString()}`); - cleanupSshConnection(ws); // 清理关联的 SSH 资源 + console.log(`WebSocket:客户端 ${ws.username} (会话: ${ws.sessionId}) 已断开连接。代码: ${code}, 原因: ${reason.toString()}`); + cleanupClientConnection(ws.sessionId); // 使用会话 ID 清理 }); ws.on('error', (error) => { - console.error(`WebSocket:客户端 ${ws.username} (ID: ${ws.userId}) 发生错误:`, error); - cleanupSshConnection(ws); // 清理关联的 SSH 资源 + console.error(`WebSocket:客户端 ${ws.username} (会话: ${ws.sessionId}) 发生错误:`, error); + cleanupClientConnection(ws.sessionId); // 使用会话 ID 清理 }); - - // 不再发送通用欢迎消息,等待前端发起 ssh:connect - // ws.send(JSON.stringify({ type: 'info', payload: `欢迎, ${ws.username}! WebSocket 连接已建立。` })); }); + // --- WebSocket 服务器关闭处理 --- wss.on('close', () => { - console.log('WebSocket 服务器正在关闭,清理心跳定时器...'); - clearInterval(interval); - // 关闭所有活动的 SSH 连接 - console.log('关闭所有活动的 SSH 连接...'); - activeSshConnections.forEach((conn, ws) => { - cleanupSshConnection(ws); + console.log('WebSocket 服务器正在关闭,清理心跳定时器和所有活动会话...'); + clearInterval(heartbeatInterval); + // 关闭所有活动的连接 + clientStates.forEach((state, sessionId) => { + cleanupClientConnection(sessionId); }); + console.log('所有活动会话已清理。'); }); console.log('WebSocket 服务器初始化完成。'); return wss; }; -// --- 辅助函数:建立 SSH 连接并处理事件 --- -function connectSshClient(ws: AuthenticatedWebSocket, sshClient: Client, connectConfig: any, connInfo: DbConnectionInfo) { - ws.sshClient = sshClient; // 关联 client - - sshClient.on('ready', () => { - console.log(`SSH: 用户 ${ws.username} 到 ${connInfo.host} 连接成功!`); - ws.send(JSON.stringify({ type: 'ssh:status', payload: 'SSH 连接成功,正在打开 Shell...' })); - - // 请求 Shell 通道 - sshClient.shell((err, stream) => { - if (err) { - console.error(`SSH: 用户 ${ws.username} 打开 Shell 失败:`, err); - ws.send(JSON.stringify({ type: 'ssh:error', payload: `打开 Shell 失败: ${err.message}` })); - cleanupSshConnection(ws); - return; - } - ws.sshShellStream = stream; // 关联 stream - // 存储活动连接 (此时 sftp 可能还未就绪) - // 确保 client 和 shell 都存在才存储 - if (activeSshConnections.has(ws)) { - // 如果已存在(例如 SOCKS 连接后),更新 shell - const existing = activeSshConnections.get(ws)!; - existing.shell = stream; - } else { - activeSshConnections.set(ws, { client: sshClient, shell: stream }); - } - console.log(`SSH: 用户 ${ws.username} Shell 通道已打开。`); - - // 尝试初始化 SFTP 会话 - sshClient.sftp((sftpErr, sftp) => { - if (sftpErr) { - console.error(`SFTP: 用户 ${ws.username} 初始化失败:`, sftpErr); - ws.send(JSON.stringify({ type: 'sftp:error', payload: `SFTP 初始化失败: ${sftpErr.message}` })); - ws.send(JSON.stringify({ type: 'ssh:status', payload: 'Shell 已连接,但 SFTP 初始化失败。' })); - // SFTP 失败不应断开整个连接,但需要标记 - const existingConn = activeSshConnections.get(ws); - if (existingConn) { - // SFTP 失败,但 Shell 仍可用,启动状态轮询 - startStatusPolling(ws, sshClient); - } - return; - } - console.log(`SFTP: 用户 ${ws.username} 会话已初始化。`); - const existingConn = activeSshConnections.get(ws); - if (existingConn) { - existingConn.sftp = sftp; - ws.send(JSON.stringify({ type: 'ssh:connected' })); // SFTP 就绪后通知前端 - startStatusPolling(ws, sshClient); // 启动状态轮询 - } else { - console.error(`SFTP: 无法找到用户 ${ws.username} 的活动连接记录以存储 SFTP 或启动轮询。`); - ws.send(JSON.stringify({ type: 'ssh:error', payload: '内部服务器错误:无法关联 SFTP 会话。' })); - cleanupSshConnection(ws); - } - }); - - // 数据转发:Shell -> WebSocket - stream.on('data', (data: Buffer) => { - ws.send(JSON.stringify({ - type: 'ssh:output', - payload: data.toString('base64'), - encoding: 'base64' - })); - }); - - // 处理 Shell 关闭 - stream.on('close', () => { - console.log(`SSH: 用户 ${ws.username} Shell 通道已关闭。`); - ws.send(JSON.stringify({ type: 'ssh:disconnected', payload: 'Shell 通道已关闭。' })); - cleanupSshConnection(ws); - }); - // Stderr 转发 - stream.stderr.on('data', (data: Buffer) => { - console.error(`SSH Stderr (${ws.username}): ${data.toString('utf8').substring(0,100)}...`); - ws.send(JSON.stringify({ - type: 'ssh:output', - payload: data.toString('base64'), - encoding: 'base64' - })); - }); - }); - }).on('error', (err) => { - console.error(`SSH: 用户 ${ws.username} 连接错误:`, err); - // 避免在 SOCKS 错误后重复发送错误 - if (!ws.CLOSED && !ws.CLOSING) { // 检查 WebSocket 状态 - ws.send(JSON.stringify({ type: 'ssh:error', payload: `SSH 连接错误: ${err.message}` })); - } - cleanupSshConnection(ws); - }).on('close', () => { - console.log(`SSH: 用户 ${ws.username} 连接已关闭。`); - if (activeSshConnections.has(ws)) { - if (!ws.CLOSED && !ws.CLOSING) { - ws.send(JSON.stringify({ type: 'ssh:disconnected', payload: 'SSH 连接已关闭。' })); - } - cleanupSshConnection(ws); - } - }).connect(connectConfig); -} +// --- 移除旧的辅助函数 --- +// - connectSshClient +// - fetchServerStatus +// - executeSshCommand +// - startStatusPolling +// - cleanupSshConnection (旧版本) +// - activeSshConnections Map +// - activeUploads Map +// - previousNetStats Map diff --git a/packages/frontend/src/components/FileManager.vue b/packages/frontend/src/components/FileManager.vue index 7faf5e3..f913fdd 100644 --- a/packages/frontend/src/components/FileManager.vue +++ b/packages/frontend/src/components/FileManager.vue @@ -86,6 +86,10 @@ const saveError = ref(null); // --- Helper Functions --- +const generateRequestId = (): string => { + return `req-${Date.now()}-${Math.random().toString(36).substring(2, 9)}`; +}; + const joinPath = (base: string, name: string): string => { if (base === '/') return `/${name}`; return `${base}/${name}`; @@ -162,19 +166,13 @@ const handleSaveFile = () => { saveStatus.value = 'saving'; saveError.value = null; - // Determine encoding: prefer original encoding if possible, otherwise default to utf8 - // For simplicity now, we always send as utf8. If base64 was received, - // it means the backend couldn't decode it, so sending back utf8 might be problematic. - // A more robust solution would involve detecting if content was modified from original base64. - // For now, assume content is valid UTF-8 after editing. const contentToSave = editingFileContent.value; const encodingToSend: 'utf8' | 'base64' = 'utf8'; // Always send UTF8 for now - - // If the original was base64 and content hasn't changed significantly (heuristic), - // maybe send base64 back? This is complex. Let's stick to UTF-8 for V1.1. + const requestId = generateRequestId(); // Generate request ID props.ws.send(JSON.stringify({ type: 'sftp:writefile', + requestId: requestId, // Add request ID payload: { path: editingFilePath.value, content: contentToSave, @@ -182,18 +180,7 @@ const handleSaveFile = () => { } })); - // Timeout for saving status display - setTimeout(() => { - if (saveStatus.value === 'saving') { // If still saving after timeout, assume error (no response) - saveStatus.value = 'error'; - saveError.value = t('fileManager.errors.saveTimeout'); - isSaving.value = false; - // Reset status after a while - setTimeout(() => { - if (saveStatus.value === 'error') saveStatus.value = 'idle'; saveError.value = null; - }, 3000); - } - }, 20000); // Increased to 20 second timeout + // The save status will now be updated solely based on the sftp:writefile:success or sftp:writefile:error messages received via WebSocket. }; @@ -321,7 +308,7 @@ watch(() => props.isConnected, (connected) => { }); const handleWebSocketMessage = (event: MessageEvent) => { - console.log('[FileManager] Received WebSocket message:', event.data.substring(0, 200)); // Log incoming message + // console.log('[FileManager] Received WebSocket message:', event.data.substring(0, 200)); // Moved logging inside specific cases try { const message = JSON.parse(event.data); // Destructure only common top-level keys @@ -329,12 +316,11 @@ const handleWebSocketMessage = (event: MessageEvent) => { // Extract uploadId specifically where needed from payload or top-level const uploadIdFromPayload = message.uploadId || payload?.uploadId; // Check top-level first, then payload - // Log specific message types relevant to FileManager + // Log and process specific message types relevant to FileManager if (type.startsWith('sftp:')) { console.log(`[FileManager] Processing SFTP message: ${type}`, { path, uploadId: uploadIdFromPayload, payload: type === 'sftp:readfile:success' ? '...' : payload }); } - if (type === 'sftp:readdir:success' && path === currentPath.value) { fileList.value = payload.sort(sortFiles); selectedItems.value.clear(); lastClickedIndex.value = -1; error.value = null; isLoading.value = false; } else if (type === 'sftp:readdir:error' && path === currentPath.value) { @@ -450,6 +436,16 @@ const handleWebSocketMessage = (event: MessageEvent) => { if (saveStatus.value === 'error') saveStatus.value = 'idle'; saveError.value = null; }, 5000); } + // --- Handle SFTP Ready --- + else if (type === 'sftp_ready') { + console.log('[FileManager] Received sftp_ready message.'); + // If the file list is empty and we are connected, it might mean the initial load failed. Retry. + // Also check if there's currently an error displayed, indicating a previous load failure. + if (props.isConnected && !isLoading.value && (fileList.value.length === 0 || error.value)) { + console.log('[FileManager] SFTP is ready and file list is empty or has error, retrying loadDirectory.'); + loadDirectory(currentPath.value); + } + } } catch (e) { console.error("Error handling WebSocket message:", e); /* Log other errors */ } @@ -459,7 +455,8 @@ const handleWebSocketMessage = (event: MessageEvent) => { const loadDirectory = (path: string) => { if (!props.ws || props.ws.readyState !== WebSocket.OPEN) { error.value = t('fileManager.errors.websocketNotConnected'); return; } isLoading.value = true; error.value = null; - props.ws.send(JSON.stringify({ type: 'sftp:readdir', payload: { path } })); + const requestId = generateRequestId(); // Generate request ID + props.ws.send(JSON.stringify({ type: 'sftp:readdir', requestId: requestId, payload: { path } })); // Add request ID currentPath.value = path; }; @@ -511,7 +508,8 @@ const handleItemClick = (event: MouseEvent, item: FileListItem) => { isEditorLoading.value = true; editorError.value = null; isEditorVisible.value = true; - props.ws.send(JSON.stringify({ type: 'sftp:readfile', payload: { path: filePath } })); + const requestId = generateRequestId(); // Generate request ID + props.ws.send(JSON.stringify({ type: 'sftp:readfile', requestId: requestId, payload: { path: filePath } })); // Add request ID } } }; @@ -633,7 +631,8 @@ const handleDeleteClick = () => { itemsToDelete.forEach(item => { const targetPath = joinPath(currentPath.value, item.filename); const actionType = item.attrs.isDirectory ? 'sftp:rmdir' : 'sftp:unlink'; - props.ws!.send(JSON.stringify({ type: actionType, payload: { path: targetPath } })); + const requestId = generateRequestId(); // Generate request ID + props.ws!.send(JSON.stringify({ type: actionType, requestId: requestId, payload: { path: targetPath } })); // Add request ID }); } }; @@ -643,7 +642,8 @@ const handleRenameClick = (item: FileListItem) => { if (newName && newName !== item.filename) { const oldPath = joinPath(currentPath.value, item.filename); const newPath = joinPath(currentPath.value, newName); - props.ws.send(JSON.stringify({ type: 'sftp:rename', payload: { oldPath, newPath } })); + const requestId = generateRequestId(); // Generate request ID + props.ws.send(JSON.stringify({ type: 'sftp:rename', requestId: requestId, payload: { oldPath, newPath } })); // Add request ID } }; const handleChangePermissionsClick = (item: FileListItem) => { @@ -654,7 +654,9 @@ const handleChangePermissionsClick = (item: FileListItem) => { if (!/^[0-7]{3,4}$/.test(newModeStr)) { alert(t('fileManager.errors.invalidPermissionsFormat')); return; } const newMode = parseInt(newModeStr, 8); const targetPath = joinPath(currentPath.value, item.filename); - props.ws.send(JSON.stringify({ type: 'sftp:chmod', payload: { targetPath, mode: newMode } })); + const requestId = generateRequestId(); // Generate request ID + // Note: Backend expects 'path' in payload for chmod, not 'targetPath' + props.ws.send(JSON.stringify({ type: 'sftp:chmod', requestId: requestId, payload: { path: targetPath, mode: newMode } })); // Add request ID, fix payload key } }; const handleNewFolderClick = () => { @@ -662,9 +664,9 @@ const handleNewFolderClick = () => { const folderName = prompt(t('fileManager.prompts.enterFolderName')); if (folderName) { const newFolderPath = joinPath(currentPath.value, folderName); - props.ws.send(JSON.stringify({ type: 'sftp:mkdir', payload: { path: newFolderPath } })); + const requestId = generateRequestId(); // Generate request ID + props.ws.send(JSON.stringify({ type: 'sftp:mkdir', requestId: requestId, payload: { path: newFolderPath } })); // Add request ID // 移除立即刷新,依赖 sftp:mkdir:success 消息 - // loadDirectory(currentPath.value); } }; @@ -679,18 +681,19 @@ const handleNewFileClick = () => { return; } const newFilePath = joinPath(currentPath.value, fileName); + const requestId = generateRequestId(); // Generate request ID // 发送创建空文件的请求到后端 (通过写入空内容) props.ws.send(JSON.stringify({ type: 'sftp:writefile', + requestId: requestId, // Add request ID payload: { path: newFilePath, content: '', // 发送空内容来创建文件 encoding: 'utf8', } })); - // 显式调用刷新,即使成功消息处理程序也会刷新 - loadDirectory(currentPath.value); // 确保在发送请求后立即尝试刷新 - // 成功或失败的消息会触发 sftp:writefile:success/error,进而刷新目录 + // 移除显式刷新,依赖 sftp:writefile:success 消息 + // loadDirectory(currentPath.value); } }; diff --git a/packages/frontend/src/views/WorkspaceView.vue b/packages/frontend/src/views/WorkspaceView.vue index 4430fae..e1c93f3 100644 --- a/packages/frontend/src/views/WorkspaceView.vue +++ b/packages/frontend/src/views/WorkspaceView.vue @@ -161,10 +161,15 @@ const initializeWebSocketConnection = () => { terminalInstance.value?.writeln(`\r\n\x1b[31m${getTerminalText('errorPrefix')} ${message.payload}\x1b[0m`); break; // --- Handle Status Updates --- - case 'ssh:status:update': - // console.log('收到状态更新:', message.payload); // Debug log - serverStatus.value = message.payload; - statusError.value = null; // Clear previous error on successful update + case 'status_update': // Corrected message type + // console.log('收到状态更新:', message.payload.status); // Debug log + // Ensure payload and status exist before assigning + if (message.payload && message.payload.status) { + serverStatus.value = message.payload.status; // Assign the nested status object + statusError.value = null; // Clear previous error on successful update + } else { + console.warn('WorkspaceView: Received status_update message with missing payload.status'); + } break; // Optional: Handle status errors if backend sends them // case 'ssh:status:error':