This commit is contained in:
Baobhan Sith
2025-04-15 18:59:56 +08:00
parent 7649a7b69d
commit c026a42d06
43 changed files with 3479 additions and 169 deletions
+95 -115
View File
@@ -5,9 +5,11 @@ import { Client, ClientChannel } from 'ssh2';
import { v4 as uuidv4 } from 'uuid'; // 用于生成唯一的会话 ID
import { getDb } from './database';
import { decrypt } from './utils/crypto';
import { SftpService } from './services/sftp.service'; // 引入 SftpService
import { StatusMonitorService } from './services/status-monitor.service'; // 引入 StatusMonitorService
import * as SshService from './services/ssh.service'; // 引入重构后的 SshService 函数
import { SftpService } from './services/sftp.service';
import { StatusMonitorService } from './services/status-monitor.service';
import * as SshService from './services/ssh.service';
import { AuditLogService } from './services/audit.service'; // 导入 AuditLogService
import { AuditLogActionType } from './types/audit.types'; // 导入 AuditLogActionType
// 扩展 WebSocket 类型以包含会话 ID
interface AuthenticatedWebSocket extends WebSocket {
@@ -27,6 +29,7 @@ export interface ClientState { // 导出以便 Service 可以导入
dbConnectionId: number;
sftp?: SFTPWrapper; // 添加 sftp 实例 (由 SftpService 管理)
statusIntervalId?: NodeJS.Timeout; // 添加状态轮询 ID (由 StatusMonitorService 管理)
ipAddress?: string; // 添加 IP 地址字段
}
// 存储所有活动客户端的状态 (key: sessionId)
@@ -34,8 +37,9 @@ const clientStates = new Map<string, ClientState>();
// --- 服务实例化 ---
// 将 clientStates 传递给需要访问共享状态的服务
const sftpService = new SftpService(clientStates); // 移除 as any
const statusMonitorService = new StatusMonitorService(clientStates); // 移除 as any
const sftpService = new SftpService(clientStates);
const statusMonitorService = new StatusMonitorService(clientStates);
const auditLogService = new AuditLogService(); // 实例化 AuditLogService
/**
* 清理指定会话 ID 关联的所有资源
@@ -55,7 +59,6 @@ const cleanupClientConnection = (sessionId: string | undefined) => {
sftpService.cleanupSftpSession(sessionId);
// 3. 清理 SSH 连接 (调用 SshService 中的底层清理逻辑,或直接操作)
// SshService.cleanupConnection(state.ws); // 旧版 SshService 的清理方式,需要调整
state.sshShellStream?.end(); // 结束 shell 流
state.sshClient?.end(); // 结束 SSH 客户端
@@ -102,10 +105,16 @@ export const initializeWebSocket = (server: http.Server, sessionParser: RequestH
return;
}
console.log(`WebSocket 认证成功:用户 ${request.session.username} (ID: ${request.session.userId})`);
// 获取客户端 IP 地址
const ipAddress = request.ip;
console.log(`WebSocket: 升级请求来自 IP: ${ipAddress}`);
wss.handleUpgrade(request, socket, head, (ws) => {
const extWs = ws as AuthenticatedWebSocket;
extWs.userId = request.session.userId;
extWs.username = request.session.username;
// 将 IP 地址附加到 request 对象上传递给 connection 事件处理器,以便后续使用
(request as any).clientIpAddress = ipAddress;
wss.emit('connection', extWs, request);
});
});
@@ -152,44 +161,37 @@ export const initializeWebSocket = (server: http.Server, sessionParser: RequestH
console.log(`WebSocket: 用户 ${ws.username} 请求连接到数据库 ID: ${dbConnectionId}`);
ws.send(JSON.stringify({ type: 'ssh:status', payload: '正在处理连接请求...' }));
// 从传递过来的 request 对象获取 IP 地址 (在 catch 块中也需要访问)
const clientIp = (request as any).clientIpAddress || 'unknown';
try {
// 调用 SshService 建立连接并打开 Shell
// 注意:SshService.connectAndOpenShell 现在需要返回 Client 和 ShellStream
// 或者我们在这里编排,调用 SshService 的不同部分
// 这里采用 SshService.connectAndOpenShell 返回包含 client 和 shell 的对象的假设
// SshService 内部不再管理 activeSessions Map
// 模拟调用 SshService (实际应调用重构后的函数)
// const { client, shellStream } = await SshService.connectAndOpenShell(dbConnectionId, ws); // 假设 SshService 返回这些
// --- 手动编排 SSH 连接流程 ---
// 1. 获取连接信息 (与旧代码类似,但移到这里)
// 1. 获取连接信息
ws.send(JSON.stringify({ type: 'ssh:status', payload: '正在获取连接信息...' }));
const connInfo = await SshService.getConnectionDetails(dbConnectionId); // 假设 SshService 提供此函数
const connInfo = await SshService.getConnectionDetails(dbConnectionId);
// 2. 建立 SSH 连接 (调用 SshService 的底层连接函数)
// 2. 建立 SSH 连接
ws.send(JSON.stringify({ type: 'ssh:status', payload: `正在连接到 ${connInfo.host}...` }));
const sshClient = await SshService.establishSshConnection(connInfo); // 假设 SshService 提供此函数
const sshClient = await SshService.establishSshConnection(connInfo);
// 3. 连接成功,创建状态
const newSessionId = uuidv4(); // 生成唯一会话 ID
ws.sessionId = newSessionId; // 关联到 WebSocket
const newSessionId = uuidv4();
ws.sessionId = newSessionId;
const newState: ClientState = {
ws: ws,
sshClient: sshClient,
dbConnectionId: dbConnectionId,
// shellStream 稍后添加
ipAddress: clientIp, // 存储 IP 地址
};
clientStates.set(newSessionId, newState);
console.log(`WebSocket: 为用户 ${ws.username} 创建新会话 ${newSessionId} (DB ID: ${dbConnectionId})`);
console.log(`WebSocket: 为用户 ${ws.username} (IP: ${clientIp}) 创建新会话 ${newSessionId} (DB ID: ${dbConnectionId})`);
// 4. 打开 Shell
ws.send(JSON.stringify({ type: 'ssh:status', payload: 'SSH 连接成功,正在打开 Shell...' }));
try {
const shellStream = await SshService.openShell(sshClient); // 假设 SshService 提供此函数
newState.sshShellStream = shellStream; // 存储 Shell 流
const shellStream = await SshService.openShell(sshClient);
newState.sshShellStream = shellStream;
// 5. 设置 Shell 事件转发
shellStream.on('data', (data: Buffer) => {
@@ -206,46 +208,54 @@ export const initializeWebSocket = (server: http.Server, sessionParser: RequestH
shellStream.on('close', () => {
console.log(`SSH: 会话 ${newSessionId} 的 Shell 通道已关闭。`);
ws.send(JSON.stringify({ type: 'ssh:disconnected', payload: 'Shell 通道已关闭。' }));
cleanupClientConnection(newSessionId); // Shell 关闭时清理整个会话
cleanupClientConnection(newSessionId);
});
// 6. 发送 SSH 连接成功消息 (Shell 已就绪)
// 6. 发送 SSH 连接成功消息
ws.send(JSON.stringify({
type: 'ssh:connected',
payload: {
connectionId: dbConnectionId,
sessionId: newSessionId
// sftpReady 标志移除,将通过 sftp_ready 消息通知
}
}));
console.log(`WebSocket: 会话 ${newSessionId} SSH 连接和 Shell 建立成功。`);
// 记录审计日志:SSH 连接成功
auditLogService.logAction('SSH_CONNECT_SUCCESS', {
userId: ws.userId,
username: ws.username,
connectionId: dbConnectionId,
sessionId: newSessionId,
ip: newState.ipAddress
});
// 7. 异步初始化 SFTP 和启动状态监控
console.log(`WebSocket: 会话 ${newSessionId} 正在异步初始化 SFTP...`);
sftpService.initializeSftpSession(newSessionId)
.then(() => {
console.log(`SFTP: 会话 ${newSessionId} 异步初始化成功。`);
// SFTP 初始化成功后,前端会收到 sftp_ready 消息
// FileManager 会在 isConnected 变为 true 后自动请求目录
})
.catch(sftpInitError => {
console.error(`WebSocket: 会话 ${newSessionId} 异步初始化 SFTP 失败:`, sftpInitError);
// 错误消息已在 initializeSftpSession 内部发送
});
.then(() => console.log(`SFTP: 会话 ${newSessionId} 异步初始化成功。`))
.catch(sftpInitError => console.error(`WebSocket: 会话 ${newSessionId} 异步初始化 SFTP 失败:`, sftpInitError));
console.log(`WebSocket: 会话 ${newSessionId} 正在启动状态监控...`);
statusMonitorService.startStatusPolling(newSessionId); // 启动状态轮询
statusMonitorService.startStatusPolling(newSessionId);
} catch (shellError: any) {
console.error(`SSH: 会话 ${newSessionId} 打开 Shell 失败:`, shellError);
// 记录审计日志:打开 Shell 失败
auditLogService.logAction('SSH_SHELL_FAILURE', {
userId: ws.userId,
username: ws.username,
connectionId: dbConnectionId,
sessionId: newSessionId,
ip: newState.ipAddress,
reason: shellError.message
});
ws.send(JSON.stringify({ type: 'ssh:error', payload: `打开 Shell 失败: ${shellError.message}` }));
cleanupClientConnection(newSessionId); // 打开 Shell 失败也需要清理
cleanupClientConnection(newSessionId);
}
// 7. 设置 SSH Client 的关闭和错误处理
// 8. 设置 SSH Client 的关闭和错误处理 (移到 Shell 成功打开之后)
sshClient.on('close', () => {
console.log(`SSH: 会话 ${newSessionId} 的客户端连接已关闭。`);
// Shell 关闭事件通常会先触发清理,这里作为保险
cleanupClientConnection(newSessionId);
});
sshClient.on('error', (err: Error) => {
@@ -255,9 +265,16 @@ export const initializeWebSocket = (server: http.Server, sessionParser: RequestH
});
} catch (connectError: any) {
console.error(`WebSocket: 用户 ${ws.username} 连接到数据库 ID ${dbConnectionId} 失败:`, connectError);
console.error(`WebSocket: 用户 ${ws.username} (IP: ${clientIp}) 连接到数据库 ID ${dbConnectionId} 失败:`, connectError);
// 记录审计日志:SSH 连接失败
auditLogService.logAction('SSH_CONNECT_FAILURE', {
userId: ws.userId,
username: ws.username,
connectionId: dbConnectionId,
ip: clientIp,
reason: connectError.message
});
ws.send(JSON.stringify({ type: 'ssh:error', payload: `连接失败: ${connectError.message}` }));
// 此处不需要 cleanup,因为状态尚未创建
}
break;
} // end case 'ssh:connect'
@@ -293,92 +310,73 @@ export const initializeWebSocket = (server: http.Server, sessionParser: RequestH
case 'sftp:readdir':
case 'sftp:stat':
case 'sftp:readfile':
case 'sftp:writefile': // Added missing case
case 'sftp:writefile':
case 'sftp:mkdir':
case 'sftp:rmdir':
case 'sftp:unlink':
case 'sftp:rename':
case 'sftp:chmod':
case 'sftp:realpath': { // Add realpath case
case 'sftp:realpath': {
if (!sessionId || !state) {
console.warn(`WebSocket: 收到来自 ${ws.username} 的 SFTP 请求 (${type}),但无活动会话。`);
// 尝试包含 requestId 发送错误,如果 requestId 存在的话
const errPayload: { message: string; requestId?: string } = { message: '无效的会话' };
if (requestId) errPayload.requestId = requestId;
ws.send(JSON.stringify({ type: 'sftp_error', payload: errPayload }));
return;
}
// --- 添加 Request ID 检查 ---
// 对于需要响应关联的操作,强制要求 requestId
if (!requestId) {
console.error(`WebSocket: 收到来自 ${ws.username} (会话: ${sessionId}) 的 SFTP 请求 (${type}),但缺少 requestId。`);
ws.send(JSON.stringify({ type: 'sftp_error', payload: { message: `SFTP 操作 ${type} 缺少 requestId` } }));
return; // 没有 requestId 则不继续处理
return;
}
// --- 结束 Request ID 检查 ---
// Explicitly call SftpService methods based on type
// TODO: 在这里或 SftpService 内部添加 SFTP 操作的审计日志记录 (可选)
// 例如: auditLogService.logAction('SFTP_ACTION', { type, path: payload?.path, userId: ws.userId, ip: state.ipAddress });
try {
switch (type) {
case 'sftp:readdir':
if (payload?.path) {
sftpService.readdir(sessionId, payload.path, requestId);
} else { throw new Error("Missing 'path' in payload for readdir"); }
if (payload?.path) sftpService.readdir(sessionId, payload.path, requestId);
else throw new Error("Missing 'path' in payload for readdir");
break;
case 'sftp:stat':
if (payload?.path) {
sftpService.stat(sessionId, payload.path, requestId);
} else { throw new Error("Missing 'path' in payload for stat"); }
if (payload?.path) sftpService.stat(sessionId, payload.path, requestId);
else throw new Error("Missing 'path' in payload for stat");
break;
case 'sftp:readfile':
if (payload?.path) {
sftpService.readFile(sessionId, payload.path, requestId);
} else { throw new Error("Missing 'path' in payload for readfile"); }
if (payload?.path) sftpService.readFile(sessionId, payload.path, requestId);
else throw new Error("Missing 'path' in payload for readfile");
break;
case 'sftp:writefile':
// Handle both 'data' (from potential future upload refactor) and 'content'
const fileContent = payload?.content ?? payload?.data ?? ''; // Default to empty string for create
const fileContent = payload?.content ?? payload?.data ?? '';
if (payload?.path) {
// Ensure content is base64 encoded if needed (assuming frontend sends base64 for now)
// If creating empty file, data might be empty string, Buffer.from('') is fine.
const dataToSend = (typeof fileContent === 'string') ? fileContent : ''; // Ensure it's a string
const dataToSend = (typeof fileContent === 'string') ? fileContent : '';
sftpService.writefile(sessionId, payload.path, dataToSend, requestId);
} else { throw new Error("Missing 'path' in payload for writefile"); }
} else throw new Error("Missing 'path' in payload for writefile");
break;
case 'sftp:mkdir':
if (payload?.path) {
sftpService.mkdir(sessionId, payload.path, requestId);
} else { throw new Error("Missing 'path' in payload for mkdir"); }
if (payload?.path) sftpService.mkdir(sessionId, payload.path, requestId);
else throw new Error("Missing 'path' in payload for mkdir");
break;
case 'sftp:rmdir':
if (payload?.path) {
sftpService.rmdir(sessionId, payload.path, requestId);
} else { throw new Error("Missing 'path' in payload for rmdir"); }
if (payload?.path) sftpService.rmdir(sessionId, payload.path, requestId);
else throw new Error("Missing 'path' in payload for rmdir");
break;
case 'sftp:unlink':
if (payload?.path) {
sftpService.unlink(sessionId, payload.path, requestId);
} else { throw new Error("Missing 'path' in payload for unlink"); }
if (payload?.path) sftpService.unlink(sessionId, payload.path, requestId);
else throw new Error("Missing 'path' in payload for unlink");
break;
case 'sftp:rename':
if (payload?.oldPath && payload?.newPath) {
sftpService.rename(sessionId, payload.oldPath, payload.newPath, requestId);
} else { throw new Error("Missing 'oldPath' or 'newPath' in payload for rename"); }
if (payload?.oldPath && payload?.newPath) sftpService.rename(sessionId, payload.oldPath, payload.newPath, requestId);
else throw new Error("Missing 'oldPath' or 'newPath' in payload for rename");
break;
case 'sftp:chmod':
if (payload?.path && typeof payload?.mode === 'number') {
sftpService.chmod(sessionId, payload.path, payload.mode, requestId);
} else { throw new Error("Missing 'path' or invalid 'mode' in payload for chmod"); }
if (payload?.path && typeof payload?.mode === 'number') sftpService.chmod(sessionId, payload.path, payload.mode, requestId);
else throw new Error("Missing 'path' or invalid 'mode' in payload for chmod");
break;
case 'sftp:realpath': // Add realpath handler
if (payload?.path) {
sftpService.realpath(sessionId, payload.path, requestId);
} else { throw new Error("Missing 'path' in payload for realpath"); }
case 'sftp:realpath':
if (payload?.path) sftpService.realpath(sessionId, payload.path, requestId);
else throw new Error("Missing 'path' in payload for realpath");
break;
default:
// Should not happen if already checked type, but as a safeguard
throw new Error(`Unhandled SFTP type: ${type}`);
default: throw new Error(`Unhandled SFTP type: ${type}`);
}
} catch (sftpCallError: any) {
console.error(`WebSocket: Error preparing/calling SFTP service for ${type} (Request ID: ${requestId}):`, sftpCallError);
@@ -386,7 +384,7 @@ export const initializeWebSocket = (server: http.Server, sessionParser: RequestH
}
break;
}
// --- SFTP 文件上传 (委托给 SftpService) ---
// --- SFTP 文件上传 ---
case 'sftp:upload:start': {
if (!sessionId || !state) {
console.warn(`WebSocket: 收到来自 ${ws.username} 的 SFTP 请求 (${type}),但无活动会话。`);
@@ -398,34 +396,27 @@ export const initializeWebSocket = (server: http.Server, sessionParser: RequestH
ws.send(JSON.stringify({ type: 'sftp:upload:error', payload: { uploadId: payload?.uploadId, message: '缺少 uploadId, remotePath 或 size' } }));
return;
}
// TODO: Add audit log for SFTP upload start?
sftpService.startUpload(sessionId, payload.uploadId, payload.remotePath, payload.size);
break;
}
case 'sftp:upload:chunk': {
if (!sessionId || !state) {
// Don't warn repeatedly for chunks if session is gone
return;
}
if (!sessionId || !state) return;
if (!payload?.uploadId || typeof payload?.chunkIndex !== 'number' || !payload?.data) {
console.error(`WebSocket: 收到来自 ${ws.username} (会话: ${sessionId}) 的 ${type} 请求,但缺少 uploadId, chunkIndex 或 data。`);
// Avoid flooding with errors for every chunk if something is wrong
// Consider sending a single error and potentially cancelling on the service side
return;
}
// Assuming data is base64 encoded string from frontend
sftpService.handleUploadChunk(sessionId, payload.uploadId, payload.chunkIndex, payload.data);
break;
}
case 'sftp:upload:cancel': {
if (!sessionId || !state) {
// Don't warn if session is already gone
return;
}
if (!sessionId || !state) return;
if (!payload?.uploadId) {
console.error(`WebSocket: 收到来自 ${ws.username} (会话: ${sessionId}) 的 ${type} 请求,但缺少 uploadId。`);
ws.send(JSON.stringify({ type: 'sftp:upload:error', payload: { uploadId: payload?.uploadId, message: '缺少 uploadId' } }));
return;
}
// TODO: Add audit log for SFTP upload cancel?
sftpService.cancelUpload(sessionId, payload.uploadId);
break;
}
@@ -437,20 +428,18 @@ export const initializeWebSocket = (server: http.Server, sessionParser: RequestH
} catch (error: any) {
console.error(`WebSocket: 处理来自 ${ws.username} (会话: ${sessionId}) 的消息 (${type}) 时发生顶层错误:`, error);
ws.send(JSON.stringify({ type: 'error', payload: `处理消息时发生内部错误: ${error.message}` }));
// 考虑是否需要清理连接?取决于错误的性质
// cleanupClientConnection(sessionId);
}
});
// --- 连接关闭和错误处理 ---
ws.on('close', (code, reason) => {
console.log(`WebSocket:客户端 ${ws.username} (会话: ${ws.sessionId}) 已断开连接。代码: ${code}, 原因: ${reason.toString()}`);
cleanupClientConnection(ws.sessionId); // 使用会话 ID 清理
cleanupClientConnection(ws.sessionId);
});
ws.on('error', (error) => {
console.error(`WebSocket:客户端 ${ws.username} (会话: ${ws.sessionId}) 发生错误:`, error);
cleanupClientConnection(ws.sessionId); // 使用会话 ID 清理
cleanupClientConnection(ws.sessionId);
});
});
@@ -458,7 +447,6 @@ export const initializeWebSocket = (server: http.Server, sessionParser: RequestH
wss.on('close', () => {
console.log('WebSocket 服务器正在关闭,清理心跳定时器和所有活动会话...');
clearInterval(heartbeatInterval);
// 关闭所有活动的连接
clientStates.forEach((state, sessionId) => {
cleanupClientConnection(sessionId);
});
@@ -470,11 +458,3 @@ export const initializeWebSocket = (server: http.Server, sessionParser: RequestH
};
// --- 移除旧的辅助函数 ---
// - connectSshClient
// - fetchServerStatus
// - executeSshCommand
// - startStatusPolling
// - cleanupSshConnection (旧版本)
// - activeSshConnections Map
// - activeUploads Map
// - previousNetStats Map