This commit is contained in:
Baobhan Sith
2025-04-14 22:51:05 +08:00
parent 286492fc63
commit a974b8b1d9
49 changed files with 13954 additions and 0 deletions
+771
View File
@@ -0,0 +1,771 @@
import WebSocket, { WebSocketServer } from 'ws';
import http from 'http';
import { Request, RequestHandler } from 'express';
import { Client, ClientChannel, SFTPWrapper, Stats } from 'ssh2'; // 引入 SFTPWrapper 和 Stats
import { WriteStream } from 'fs'; // 需要 WriteStream 类型 (虽然 ssh2 的流类型不同,但可以借用)
import { getDb } from './database'; // 引入数据库实例
import { decrypt } from './utils/crypto'; // 引入解密函数
import path from 'path'; // 需要 path
// 扩展 WebSocket 类型以包含会话和 SSH/SFTP 连接信息
interface AuthenticatedWebSocket extends WebSocket {
isAlive?: boolean;
userId?: number;
username?: string;
sshClient?: Client; // 关联的 SSH Client 实例
sshShellStream?: ClientChannel; // 关联的 SSH Shell Stream
sftpStream?: SFTPWrapper; // 关联的 SFTP Stream
}
// 存储活跃的 SSH/SFTP 连接 (导出以便其他模块访问)
export const activeSshConnections = new Map<AuthenticatedWebSocket, { client: Client, shell: ClientChannel, sftp?: SFTPWrapper }>();
// 存储正在进行的 SFTP 上传操作 (key: uploadId, value: WriteStream)
// 注意:WriteStream 类型来自 'fs',但 ssh2 的流行为类似
const activeUploads = new Map<string, WriteStream>();
// 数据库连接信息接口 (包含加密密码)
interface DbConnectionInfo {
id: number;
name: string;
host: string;
port: number;
username: string;
auth_method: 'password';
encrypted_password?: string; // 注意是可选的,因为可能没有密码 (虽然 MVP 要求有)
// 其他字段...
}
/**
* 清理指定 WebSocket 连接关联的 SSH 资源
* @param ws - WebSocket 连接实例
*/
const cleanupSshConnection = (ws: AuthenticatedWebSocket) => {
const connection = activeSshConnections.get(ws);
if (connection) {
console.log(`WebSocket: 清理用户 ${ws.username} 的 SSH/SFTP 连接...`);
// 注意:SFTP 流通常不需要显式关闭,它依赖于 SSH Client 的关闭
// connection.sftp?.end(); // SFTPWrapper 没有 end 方法
connection.shell?.end(); // 尝试结束 shell 流
connection.client?.end(); // 结束 SSH 客户端连接会隐式关闭 SFTP
activeSshConnections.delete(ws); // 从 Map 中移除
}
};
export const initializeWebSocket = (server: http.Server, sessionParser: RequestHandler): WebSocketServer => {
const wss = new WebSocketServer({ noServer: true });
const db = getDb(); // 获取数据库实例
const interval = setInterval(() => {
wss.clients.forEach((ws: WebSocket) => {
const extWs = ws as AuthenticatedWebSocket;
if (extWs.isAlive === false) {
console.log(`WebSocket 心跳检测:用户 ${extWs.username} 连接无响应,正在终止...`);
cleanupSshConnection(extWs); // 清理 SSH 资源
return extWs.terminate();
}
extWs.isAlive = false;
extWs.ping(() => {});
});
}, 60000); // Increased interval to 60 seconds
server.on('upgrade', (request: Request, socket, head) => {
// @ts-ignore
sessionParser(request, {} as any, () => {
if (!request.session || !request.session.userId) {
console.log('WebSocket 认证失败:未找到会话或用户未登录。');
socket.write('HTTP/1.1 401 Unauthorized\r\n\r\n');
socket.destroy();
return;
}
console.log(`WebSocket 认证成功:用户 ${request.session.username} (ID: ${request.session.userId})`);
wss.handleUpgrade(request, socket, head, (ws) => {
const extWs = ws as AuthenticatedWebSocket;
extWs.userId = request.session.userId;
extWs.username = request.session.username;
wss.emit('connection', extWs, request);
});
});
});
wss.on('connection', (ws: AuthenticatedWebSocket, request: Request) => {
ws.isAlive = true;
console.log(`WebSocket:客户端 ${ws.username} (ID: ${ws.userId}) 已连接。`);
ws.on('pong', () => { ws.isAlive = true; });
ws.on('message', async (message) => {
console.log(`WebSocket:收到来自 ${ws.username} 的消息: ${message.toString().substring(0, 100)}...`); // 截断长消息日志
try {
const parsedMessage = JSON.parse(message.toString());
const connection = activeSshConnections.get(ws); // 获取当前连接信息
const sftp = connection?.sftp; // 获取 SFTP 实例
// 辅助函数发送错误消息
const sendSftpError = (action: string, path: string | undefined, error: any, customMsg?: string) => {
const errorMessage = customMsg || (error instanceof Error ? error.message : String(error));
console.error(`SFTP: 用户 ${ws.username} 执行 ${action} 操作 ${path ? `${path}` : ''} 失败:`, error);
ws.send(JSON.stringify({ type: `sftp:${action}:error`, path, payload: `${action} 失败: ${errorMessage}` }));
};
// 辅助函数发送成功消息
const sendSftpSuccess = (action: string, path: string | undefined, payload?: any) => {
console.log(`SFTP: 用户 ${ws.username} 执行 ${action} 操作 ${path ? `${path}` : ''} 成功。`);
ws.send(JSON.stringify({ type: `sftp:${action}:success`, path, payload }));
};
// 检查 SFTP 会话是否存在
const ensureSftp = (action: string, path?: string): SFTPWrapper | null => {
if (!sftp) {
console.warn(`WebSocket: 收到来自 ${ws.username} 的 SFTP ${action} 请求,但无活动 SFTP 会话。`);
ws.send(JSON.stringify({ type: `sftp:${action}:error`, path, payload: 'SFTP 会话未初始化或已断开。' }));
return null;
}
return sftp;
};
switch (parsedMessage.type) {
// --- 处理 SSH 连接请求 ---
case 'ssh:connect': {
// 注意:ssh:connect 内部逻辑需要自行处理 sftp 实例的获取,不能依赖顶层的 sftp 变量
if (activeSshConnections.has(ws)) {
console.warn(`WebSocket: 用户 ${ws.username} 已有活动的 SSH 连接,忽略新的连接请求。`);
ws.send(JSON.stringify({ type: 'ssh:error', payload: '已存在活动的 SSH 连接。' }));
return;
}
const connectionId = parsedMessage.payload?.connectionId;
if (!connectionId) {
ws.send(JSON.stringify({ type: 'ssh:error', payload: '缺少 connectionId。' }));
return;
}
console.log(`WebSocket: 用户 ${ws.username} 请求连接到 ID: ${connectionId}`);
ws.send(JSON.stringify({ type: 'ssh:status', payload: '正在获取连接信息...' }));
// 1. 从数据库获取连接信息
const connInfo = await new Promise<DbConnectionInfo | null>((resolve, reject) => {
// 注意:如果多用户,需要验证 connectionId 是否属于当前 userId
db.get('SELECT * FROM connections WHERE id = ?', [connectionId], (err, row: DbConnectionInfo) => {
if (err) return reject(new Error('查询连接信息失败'));
resolve(row ?? null);
});
});
if (!connInfo) {
ws.send(JSON.stringify({ type: 'ssh:error', payload: `未找到 ID 为 ${connectionId} 的连接配置。` }));
return;
}
if (!connInfo.encrypted_password) {
ws.send(JSON.stringify({ type: 'ssh:error', payload: '连接配置缺少密码信息。' }));
return;
}
ws.send(JSON.stringify({ type: 'ssh:status', payload: `正在连接到 ${connInfo.host}...` }));
// 2. 解密密码
let password = '';
try {
password = decrypt(connInfo.encrypted_password);
} catch (decryptError: any) {
console.error(`解密连接 ${connectionId} 密码失败:`, decryptError);
ws.send(JSON.stringify({ type: 'ssh:error', payload: '无法解密连接凭证。' }));
return;
}
// 3. 建立 SSH 连接
const sshClient = new Client();
ws.sshClient = sshClient; // 关联 client
sshClient.on('ready', () => {
console.log(`SSH: 用户 ${ws.username}${connInfo.host} 连接成功!`);
ws.send(JSON.stringify({ type: 'ssh:status', payload: 'SSH 连接成功,正在打开 Shell...' }));
// 4. 请求 Shell 通道
sshClient.shell((err, stream) => {
if (err) {
console.error(`SSH: 用户 ${ws.username} 打开 Shell 失败:`, err);
ws.send(JSON.stringify({ type: 'ssh:error', payload: `打开 Shell 失败: ${err.message}` }));
cleanupSshConnection(ws);
return;
}
ws.sshShellStream = stream; // 关联 stream
// 存储活动连接 (此时 sftp 可能还未就绪)
activeSshConnections.set(ws, { client: sshClient, shell: stream });
console.log(`SSH: 用户 ${ws.username} Shell 通道已打开。`);
// 尝试初始化 SFTP 会话
sshClient.sftp((sftpErr, sftp) => {
if (sftpErr) {
console.error(`SFTP: 用户 ${ws.username} 初始化失败:`, sftpErr);
// 即使 SFTP 失败,也保持 Shell 连接,但发送错误通知
ws.send(JSON.stringify({ type: 'sftp:error', payload: `SFTP 初始化失败: ${sftpErr.message}` }));
// 不再发送 ssh:connected,因为 SFTP 也是核心功能的一部分
// ws.send(JSON.stringify({ type: 'ssh:connected' }));
// 可以在这里发送一个包含错误的状态
ws.send(JSON.stringify({ type: 'ssh:status', payload: 'Shell 已连接,但 SFTP 初始化失败。' }));
return;
}
console.log(`SFTP: 用户 ${ws.username} 会话已初始化。`);
// 将 SFTP 实例存入 Map
const existingConn = activeSshConnections.get(ws);
if (existingConn) {
existingConn.sftp = sftp;
}
// SFTP 就绪后,才真正通知前端连接完成
ws.send(JSON.stringify({ type: 'ssh:connected' }));
});
// 5. 数据转发:Shell -> WebSocket (发送 Base64 编码的数据)
stream.on('data', (data: Buffer) => {
// console.log('SSH Output Buffer Length:', data.length); // Debug log
ws.send(JSON.stringify({
type: 'ssh:output',
payload: data.toString('base64'), // 将 Buffer 转为 Base64 字符串
encoding: 'base64' // 明确告知前端编码方式
}));
});
// 6. 处理 Shell 关闭
stream.on('close', () => {
console.log(`SSH: 用户 ${ws.username} Shell 通道已关闭。`);
ws.send(JSON.stringify({ type: 'ssh:disconnected', payload: 'Shell 通道已关闭。' }));
cleanupSshConnection(ws); // 清理资源
});
// Stderr 也使用 Base64 发送
stream.stderr.on('data', (data: Buffer) => {
console.error(`SSH Stderr (${ws.username}): ${data.toString('utf8').substring(0,100)}...`); // 日志中尝试 utf8 解码预览
ws.send(JSON.stringify({
type: 'ssh:output', // 同样使用 ssh:output 类型
payload: data.toString('base64'),
encoding: 'base64'
}));
});
});
}).on('error', (err) => {
console.error(`SSH: 用户 ${ws.username} 连接错误:`, err);
ws.send(JSON.stringify({ type: 'ssh:error', payload: `SSH 连接错误: ${err.message}` }));
cleanupSshConnection(ws);
}).on('close', () => {
console.log(`SSH: 用户 ${ws.username} 连接已关闭。`);
// 确保即使 shell 没关闭,也要通知前端并清理
if (activeSshConnections.has(ws)) {
ws.send(JSON.stringify({ type: 'ssh:disconnected', payload: 'SSH 连接已关闭。' }));
cleanupSshConnection(ws);
}
}).connect({
host: connInfo.host,
port: connInfo.port,
username: connInfo.username,
password: password, // 使用解密后的密码
// TODO: 添加对密钥认证的支持
// privateKey: require('fs').readFileSync('/path/to/key'),
// passphrase: 'key passphrase'
keepaliveInterval: 30000, // Send keep-alive every 30 seconds (milliseconds)
keepaliveCountMax: 3, // Disconnect after 3 missed keep-alives
readyTimeout: 20000 // 连接超时时间 (毫秒)
});
break;
} // end case 'ssh:connect'
// --- 处理 SSH 输入 ---
case 'ssh:input': {
const connection = activeSshConnections.get(ws);
if (connection?.shell && parsedMessage.payload?.data) {
connection.shell.write(parsedMessage.payload.data);
} else {
console.warn(`WebSocket: 收到来自 ${ws.username} 的 SSH 输入,但无活动 Shell 或数据为空。`);
}
break;
}
// --- 处理终端大小调整 ---
case 'ssh:resize': {
const connection = activeSshConnections.get(ws);
const { cols, rows } = parsedMessage.payload || {};
if (connection?.shell && cols && rows) {
console.log(`SSH: 用户 ${ws.username} 调整终端大小: ${cols}x${rows}`);
connection.shell.setWindow(rows, cols, 0, 0); // 注意参数顺序 rows, cols
} else {
console.warn(`WebSocket: 收到来自 ${ws.username} 的调整大小请求,但无活动 Shell 或尺寸数据无效。`);
}
break;
}
// --- 处理 SFTP 目录列表请求 ---
case 'sftp:readdir': {
const targetPath = parsedMessage.payload?.path;
const currentSftp = ensureSftp('readdir', targetPath);
if (!currentSftp) break;
if (typeof targetPath !== 'string') {
sendSftpError('readdir', targetPath, '请求路径无效。');
break;
}
console.log(`SFTP: 用户 ${ws.username} 请求读取目录: ${targetPath}`);
currentSftp.readdir(targetPath, (err, list) => {
if (err) {
sendSftpError('readdir', targetPath, err);
return;
}
// 格式化文件列表以便前端使用
const formattedList = list.map(item => ({
filename: item.filename,
longname: item.longname,
attrs: {
size: item.attrs.size,
uid: item.attrs.uid,
gid: item.attrs.gid,
mode: item.attrs.mode,
atime: item.attrs.atime * 1000,
mtime: item.attrs.mtime * 1000,
isDirectory: item.attrs.isDirectory(),
isFile: item.attrs.isFile(),
isSymbolicLink: item.attrs.isSymbolicLink(),
}
}));
sendSftpSuccess('readdir', targetPath, formattedList);
});
break;
}
// --- 处理 SFTP 文件/目录状态获取请求 ---
case 'sftp:stat': {
const targetPath = parsedMessage.payload?.path;
const currentSftp = ensureSftp('stat', targetPath);
if (!currentSftp) break;
if (typeof targetPath !== 'string') {
sendSftpError('stat', targetPath, '请求路径无效。');
break;
}
console.log(`SFTP: 用户 ${ws.username} 请求获取状态: ${targetPath}`);
currentSftp.lstat(targetPath, (err, stats) => { // 使用 lstat 获取链接本身信息
if (err) {
sendSftpError('stat', targetPath, err);
return;
}
const formattedStats = {
mode: stats.mode,
uid: stats.uid,
gid: stats.gid,
size: stats.size,
atime: stats.atime * 1000,
mtime: stats.mtime * 1000,
isDirectory: stats.isDirectory(),
isFile: stats.isFile(),
isBlockDevice: stats.isBlockDevice(),
isCharacterDevice: stats.isCharacterDevice(),
isSymbolicLink: stats.isSymbolicLink(),
isFIFO: stats.isFIFO(),
isSocket: stats.isSocket(),
};
sendSftpSuccess('stat', targetPath, formattedStats);
});
break;
}
// --- 处理 SFTP 文件上传 ---
case 'sftp:upload:start': {
const { remotePath, uploadId, size } = parsedMessage.payload || {};
const currentSftp = ensureSftp('upload:start', remotePath);
if (!currentSftp) break;
if (typeof remotePath !== 'string' || !uploadId) {
sendSftpError('upload:start', remotePath, '无效的上传请求参数 (remotePath, uploadId)。', undefined);
break;
}
if (activeUploads.has(uploadId)) {
sendSftpError('upload:start', remotePath, '具有相同 ID 的上传已在进行中。', undefined);
break;
}
console.log(`SFTP: 用户 ${ws.username} 开始上传到 ${remotePath} (ID: ${uploadId}, 大小: ${size ?? '未知'})`);
try {
const writeStream = currentSftp.createWriteStream(remotePath);
writeStream.on('error', (err: Error) => {
sendSftpError('upload', remotePath, err, `写入远程文件失败: ${err.message}`);
activeUploads.delete(uploadId);
});
let uploadFinished = false;
const onStreamEnd = (eventName: string) => {
if (uploadFinished) return;
uploadFinished = true;
sendSftpSuccess('upload', remotePath, { uploadId }); // 成功时也带上 uploadId
activeUploads.delete(uploadId);
};
writeStream.on('finish', () => onStreamEnd('finish'));
writeStream.on('close', () => onStreamEnd('close'));
activeUploads.set(uploadId, writeStream as any);
ws.send(JSON.stringify({ type: 'sftp:upload:ready', uploadId }));
} catch (err: any) {
sendSftpError('upload:start', remotePath, err, `无法创建远程文件: ${err.message}`);
}
break;
}
case 'sftp:upload:chunk': {
const { uploadId, data, isLast } = parsedMessage.payload || {};
const writeStream = activeUploads.get(uploadId);
if (!writeStream) {
// console.warn(`WebSocket: 收到上传数据块 (ID: ${uploadId}),但未找到对应的上传任务。`);
// 不必每次都报错,前端可能已经取消或完成
break;
}
if (typeof data !== 'string') {
sendSftpError('upload:chunk', undefined, '无效的数据块格式。', undefined);
break;
}
try {
const buffer = Buffer.from(data, 'base64');
const canWriteMore = writeStream.write(buffer);
if (!canWriteMore) {
writeStream.once('drain', () => {
ws.send(JSON.stringify({ type: 'sftp:upload:resume', uploadId }));
});
ws.send(JSON.stringify({ type: 'sftp:upload:pause', uploadId }));
}
if (isLast) {
writeStream.end();
}
} catch (err: any) {
sendSftpError('upload:chunk', undefined, err, `处理数据块失败: ${err.message}`);
writeStream.end();
activeUploads.delete(uploadId);
}
break;
}
case 'sftp:upload:cancel': {
const { uploadId } = parsedMessage.payload || {};
const writeStream = activeUploads.get(uploadId);
if (writeStream) {
console.log(`SFTP: 用户 ${ws.username} 取消上传 (ID: ${uploadId})`);
writeStream.end(); // 触发清理
// TODO: 删除部分文件? sftp.unlink?
ws.send(JSON.stringify({ type: 'sftp:upload:cancelled', uploadId }));
} else {
// console.warn(`WebSocket: 收到取消上传请求 (ID: ${uploadId}),但未找到对应的上传任务。`);
}
break;
}
// --- 处理 SFTP 文件读取请求 ---
case 'sftp:readfile': {
const targetPath = parsedMessage.payload?.path;
const currentSftp = ensureSftp('readfile', targetPath);
if (!currentSftp) break;
if (typeof targetPath !== 'string') {
sendSftpError('readfile', targetPath, '请求路径无效。');
break;
}
console.log(`SFTP: 用户 ${ws.username} 请求读取文件: ${targetPath}`);
const readStream = currentSftp.createReadStream(targetPath);
let fileContent = '';
let hasError = false;
readStream.on('data', (chunk: Buffer) => {
// 尝试多种编码解码,优先 UTF-8
try {
fileContent += chunk.toString('utf8');
} catch (e) {
// 如果 UTF-8 失败,尝试其他常见编码,例如 GBK (适用于中文 Windows)
// 注意:这只是一个尝试,可能不准确。更可靠的方法是让用户指定编码。
try {
// 需要安装 iconv-lite: npm install iconv-lite @types/iconv-lite -w packages/backend
// import * as iconv from 'iconv-lite';
// fileContent += iconv.decode(chunk, 'gbk');
// 暂时回退到 base64 发送原始数据,让前端处理解码
console.warn(`SFTP: 文件 ${targetPath} 无法以 UTF-8 解码,将发送 Base64 编码内容。`);
fileContent = Buffer.concat([Buffer.from(fileContent), chunk]).toString('base64');
} catch (decodeError) {
console.error(`SFTP: 文件 ${targetPath} 解码失败:`, decodeError);
sendSftpError('readfile', targetPath, '文件解码失败。');
readStream.destroy(); // 停止读取
hasError = true;
}
}
});
readStream.on('error', (err: Error) => {
if (hasError) return; // 避免重复发送错误
sendSftpError('readfile', targetPath, err);
hasError = true;
});
readStream.on('end', () => {
if (hasError) return; // 如果之前已出错,则不发送成功消息
// 判断是发送文本内容还是 Base64
let payload: { content: string; encoding: 'utf8' | 'base64' };
try {
// 尝试再次解码整个内容为 UTF-8,如果成功则发送 UTF-8
Buffer.from(fileContent, 'base64').toString('utf8');
// 如果上一步是 base64 编码,这里会是原始 base64 字符串
if (fileContent === Buffer.from(fileContent, 'base64').toString('base64')) {
payload = { content: fileContent, encoding: 'base64' };
} else {
payload = { content: fileContent, encoding: 'utf8' };
}
} catch (e) {
// 如果整体解码失败,则发送 Base64
payload = { content: Buffer.from(fileContent).toString('base64'), encoding: 'base64' };
}
// 限制发送内容的大小,避免 WebSocket 拥塞 (例如 1MB)
const MAX_CONTENT_SIZE = 1 * 1024 * 1024;
if (Buffer.byteLength(payload.content, payload.encoding === 'base64' ? 'base64' : 'utf8') > MAX_CONTENT_SIZE) {
sendSftpError('readfile', targetPath, `文件过大 (超过 ${MAX_CONTENT_SIZE / 1024 / 1024}MB),无法在编辑器中打开。`);
} else {
sendSftpSuccess('readfile', targetPath, payload);
}
});
break;
}
// --- 处理 SFTP 文件写入请求 ---
case 'sftp:writefile': {
const { path: targetPath, content, encoding } = parsedMessage.payload || {};
const currentSftp = ensureSftp('writefile', targetPath);
if (!currentSftp) break;
if (typeof targetPath !== 'string' || typeof content !== 'string' || (encoding !== 'utf8' && encoding !== 'base64')) {
sendSftpError('writefile', targetPath, '请求参数无效 (需要 path, content, encoding[\'utf8\'|\'base64\'])。');
break;
}
console.log(`SFTP: 用户 ${ws.username} 请求写入文件: ${targetPath}, Encoding: ${encoding}, Content length: ${content.length}`); // 增加日志细节
try {
console.log(`[writefile] Attempting to create buffer for ${targetPath}`);
const buffer = Buffer.from(content, encoding); // 根据 encoding 解码/转换内容为 Buffer
console.log(`[writefile] Buffer created successfully for ${targetPath}. Attempting to create write stream.`);
const writeStream = currentSftp.createWriteStream(targetPath);
console.log(`[writefile] Write stream created for ${targetPath}. Attaching listeners.`);
let hasError = false;
let operationCompleted = false; // Flag to track if finish/error occurred
let backendTimeoutId: NodeJS.Timeout | null = null;
const streamId = Math.random().toString(36).substring(2, 9); // Unique ID for logging this stream instance
const BACKEND_WRITE_TIMEOUT = 15000; // 15 seconds backend timeout
console.log(`[${streamId}] SFTP: Attaching listeners for writeStream to ${targetPath}`);
const cleanupTimeout = () => {
if (backendTimeoutId) {
clearTimeout(backendTimeoutId);
backendTimeoutId = null;
}
};
writeStream.on('error', (err: Error) => {
console.error(`[${streamId}] SFTP: writeStream 'error' event for ${targetPath}:`, err);
if (operationCompleted) return; // Already completed
operationCompleted = true;
cleanupTimeout();
sendSftpError('writefile', targetPath, err, `写入远程文件失败: ${err.message}`);
hasError = true; // Keep track for close handler if needed
});
writeStream.on('finish', () => { // 'finish' 表示所有数据已刷入底层系统
console.log(`[${streamId}] SFTP: writeStream 'finish' event for ${targetPath}. HasError: ${hasError}`);
if (operationCompleted) return; // Already completed (e.g., error occurred first)
operationCompleted = true;
cleanupTimeout();
if (hasError) return; // Error occurred before finish
sendSftpSuccess('writefile', targetPath);
});
writeStream.on('close', () => { // 'close' 表示流已关闭
console.log(`[${streamId}] SFTP: writeStream 'close' event for ${targetPath}. writableFinished: ${writeStream.writableFinished}, HasError: ${hasError}, OperationCompleted: ${operationCompleted}`);
cleanupTimeout(); // Clear timeout if close happens before it fires
// If the stream closed and no error/finish/timeout event handled it yet,
// consider it a success. This handles cases where 'finish' might not fire reliably,
// even if writableFinished is false when close is emitted prematurely.
if (!operationCompleted) {
console.warn(`[${streamId}] SFTP: writeStream 'close' event occurred before 'finish' or 'error'. Assuming success for ${targetPath}.`);
sendSftpSuccess('writefile', targetPath);
operationCompleted = true; // Mark as completed via close
}
// If an error or finish occurred, the respective handlers already sent the message.
// If finish occurred, the 'finish' handler sent success.
// If closed without finishing and without error, the backend timeout might handle it,
// or it might be a legitimate early close after an error on the server side not reported via 'error' event.
});
// 写入数据并结束流
console.log(`[${streamId}] SFTP: Calling writeStream.end() for ${targetPath}`);
writeStream.end(buffer, () => {
console.log(`[${streamId}] SFTP: writeStream.end() callback fired for ${targetPath}. Starting backend timeout.`);
// Start backend timeout *after* end() callback fires (or immediately if no callback needed)
backendTimeoutId = setTimeout(() => {
if (!operationCompleted) {
console.error(`[${streamId}] SFTP: Backend write timeout (${BACKEND_WRITE_TIMEOUT}ms) reached for ${targetPath}.`);
operationCompleted = true; // Mark as completed due to timeout
sendSftpError('writefile', targetPath, `后端写入超时 (${BACKEND_WRITE_TIMEOUT / 1000}秒)`);
}
}, BACKEND_WRITE_TIMEOUT);
});
} catch (err: any) {
console.error(`[writefile] Error during write stream creation or buffer processing for ${targetPath}:`, err); // 增加 catch 日志
// Buffer.from 可能因无效编码或内容抛出错误
sendSftpError('writefile', targetPath, err, `处理文件内容或创建写入流失败: ${err.message}`);
}
break;
}
// --- 新增 SFTP 操作 ---
case 'sftp:mkdir': {
const targetPath = parsedMessage.payload?.path;
const currentSftp = ensureSftp('mkdir', targetPath);
if (!currentSftp) break;
if (typeof targetPath !== 'string') {
sendSftpError('mkdir', targetPath, '请求路径无效。');
break;
}
console.log(`SFTP: 用户 ${ws.username} 请求创建目录: ${targetPath}`);
// TODO: 考虑添加 mode 参数支持
currentSftp.mkdir(targetPath, (err) => {
if (err) {
sendSftpError('mkdir', targetPath, err);
} else {
sendSftpSuccess('mkdir', targetPath);
}
});
break;
}
case 'sftp:rmdir': {
const targetPath = parsedMessage.payload?.path;
const currentSftp = ensureSftp('rmdir', targetPath);
if (!currentSftp) break;
if (typeof targetPath !== 'string') {
sendSftpError('rmdir', targetPath, '请求路径无效。');
break;
}
console.log(`SFTP: 用户 ${ws.username} 请求删除目录: ${targetPath}`);
currentSftp.rmdir(targetPath, (err) => {
if (err) {
sendSftpError('rmdir', targetPath, err);
} else {
sendSftpSuccess('rmdir', targetPath);
}
});
break;
}
case 'sftp:unlink': { // 删除文件
const targetPath = parsedMessage.payload?.path;
const currentSftp = ensureSftp('unlink', targetPath);
if (!currentSftp) break;
if (typeof targetPath !== 'string') {
sendSftpError('unlink', targetPath, '请求路径无效。');
break;
}
console.log(`SFTP: 用户 ${ws.username} 请求删除文件: ${targetPath}`);
currentSftp.unlink(targetPath, (err) => {
if (err) {
sendSftpError('unlink', targetPath, err);
} else {
sendSftpSuccess('unlink', targetPath);
}
});
break;
}
case 'sftp:rename': {
const { oldPath, newPath } = parsedMessage.payload || {};
const currentSftp = ensureSftp('rename', oldPath);
if (!currentSftp) break;
if (typeof oldPath !== 'string' || typeof newPath !== 'string') {
sendSftpError('rename', oldPath, '无效的旧路径或新路径。');
break;
}
console.log(`SFTP: 用户 ${ws.username} 请求重命名: ${oldPath} -> ${newPath}`);
currentSftp.rename(oldPath, newPath, (err) => {
if (err) {
sendSftpError('rename', oldPath, err);
} else {
sendSftpSuccess('rename', oldPath, { oldPath, newPath }); // 返回新旧路径
}
});
break;
}
case 'sftp:chmod': {
const { targetPath, mode } = parsedMessage.payload || {};
const currentSftp = ensureSftp('chmod', targetPath);
if (!currentSftp) break;
if (typeof targetPath !== 'string' || typeof mode !== 'number') {
sendSftpError('chmod', targetPath, '无效的路径或权限模式。');
break;
}
console.log(`SFTP: 用户 ${ws.username} 请求修改权限: ${targetPath} -> ${mode.toString(8)}`); // 以八进制显示 mode
currentSftp.chmod(targetPath, mode, (err) => {
if (err) {
sendSftpError('chmod', targetPath, err);
} else {
sendSftpSuccess('chmod', targetPath, { mode }); // 返回设置的 mode
}
});
break;
}
default:
console.warn(`WebSocket:收到未知类型的消息: ${parsedMessage.type}`);
ws.send(JSON.stringify({ type: 'error', payload: `不支持的消息类型: ${parsedMessage.type}` }));
}
} catch (e) {
console.error('WebSocket:解析消息时出错:', e);
ws.send(JSON.stringify({ type: 'error', payload: '无效的消息格式' }));
}
});
ws.on('close', (code, reason) => {
console.log(`WebSocket:客户端 ${ws.username} (ID: ${ws.userId}) 已断开连接。代码: ${code}, 原因: ${reason.toString()}`);
cleanupSshConnection(ws); // 清理关联的 SSH 资源
});
ws.on('error', (error) => {
console.error(`WebSocket:客户端 ${ws.username} (ID: ${ws.userId}) 发生错误:`, error);
cleanupSshConnection(ws); // 清理关联的 SSH 资源
});
// 不再发送通用欢迎消息,等待前端发起 ssh:connect
// ws.send(JSON.stringify({ type: 'info', payload: `欢迎, ${ws.username}! WebSocket 连接已建立。` }));
});
wss.on('close', () => {
console.log('WebSocket 服务器正在关闭,清理心跳定时器...');
clearInterval(interval);
// 关闭所有活动的 SSH 连接
console.log('关闭所有活动的 SSH 连接...');
activeSshConnections.forEach((conn, ws) => {
cleanupSshConnection(ws);
});
});
console.log('WebSocket 服务器初始化完成。');
return wss;
};