refactor: 重构后端

This commit is contained in:
Baobhan Sith
2025-04-15 09:56:33 +08:00
parent 1d3ad45c96
commit d1f874d38b
8 changed files with 1111 additions and 1636 deletions
+286 -101
View File
@@ -1,17 +1,44 @@
import { Client, SFTPWrapper, Stats, Dirent } from 'ssh2'; // 导入 Stats 和 Dirent 类型
import { Client, SFTPWrapper, Stats } from 'ssh2';
import { WebSocket } from 'ws';
// import { logger } from '../utils/logger'; // 不再使用自定义 logger,改用 console
import { ClientState } from '../websocket'; // 导入统一的 ClientState
// 定义客户端状态接口
interface ClientState {
ws: WebSocket;
sshClient: Client;
sftp?: SFTPWrapper;
// 如果需要,可以添加其他相关的状态属性
// 定义服务器状态的数据结构 (与前端 StatusMonitor.vue 匹配)
// Note: This interface seems out of place here, but keeping it for now as it was in the original file.
// Ideally, it should be in a shared types file.
interface ServerStatus {
cpuPercent?: number;
memPercent?: number;
memUsed?: number; // MB
memTotal?: number; // MB
swapPercent?: number;
swapUsed?: number; // MB
swapTotal?: number; // MB
diskPercent?: number;
diskUsed?: number; // KB
diskTotal?: number; // KB
cpuModel?: string;
netRxRate?: number; // Bytes per second
netTxRate?: number; // Bytes per second
netInterface?: string;
osName?: string;
loadAvg?: number[]; // 系统平均负载 [1min, 5min, 15min]
timestamp: number; // 状态获取时间戳
}
// Interface for parsed network stats - Also seems out of place here.
interface NetworkStats {
[interfaceName: string]: {
rx_bytes: number;
tx_bytes: number;
}
}
// Note: These constants seem related to StatusMonitorService, not SftpService.
const DEFAULT_POLLING_INTERVAL = 1000;
const previousNetStats = new Map<string, { rx: number, tx: number, timestamp: number }>();
export class SftpService {
private clientStates: Map<string, ClientState>; // 存储 connectionId 到 ClientState 的映射
private clientStates: Map<string, ClientState>; // 使用导入的 ClientState
constructor(clientStates: Map<string, ClientState>) {
this.clientStates = clientStates;
@@ -19,164 +46,322 @@ export class SftpService {
/**
* 初始化 SFTP 会话
* @param connectionId 连接 ID
* @param sessionId 会话 ID
*/
async initializeSftpSession(connectionId: string): Promise<void> {
const state = this.clientStates.get(connectionId);
async initializeSftpSession(sessionId: string): Promise<void> {
const state = this.clientStates.get(sessionId);
if (!state || !state.sshClient || state.sftp) {
console.warn(`[SFTP] 无法为 ${connectionId} 初始化 SFTP:状态无效或 SFTP 已初始化。`);
console.warn(`[SFTP] 无法为会话 ${sessionId} 初始化 SFTP:状态无效、SSH客户端不存在或 SFTP 已初始化。`);
return;
}
if (!state.sshClient) {
console.error(`[SFTP] 会话 ${sessionId} 的 SSH 客户端不存在,无法初始化 SFTP。`);
return;
}
return new Promise((resolve, reject) => {
state.sshClient.sftp((err, sftp) => {
state.sshClient.sftp((err, sftpInstance) => {
if (err) {
console.error(`[SFTP] 为 ${connectionId} 初始化 SFTP 会话失败:`, err);
state.ws.send(JSON.stringify({ type: 'sftp_error', payload: { connectionId, message: 'SFTP 初始化失败' } }));
console.error(`[SFTP] 为会话 ${sessionId} 初始化 SFTP 会话失败:`, err);
state.ws.send(JSON.stringify({ type: 'sftp_error', payload: { connectionId: state.dbConnectionId, message: 'SFTP 初始化失败' } }));
reject(err);
} else {
console.log(`[SFTP] 为 ${connectionId} 初始化 SFTP 会话成功。`);
state.sftp = sftp;
state.ws.send(JSON.stringify({ type: 'sftp_ready', payload: { connectionId } }));
sftp.on('end', () => {
console.log(`[SFTP] ${connectionId} 的 SFTP 会话已结束。`);
if (state) state.sftp = undefined; // 在结束时清除 SFTP 实例
console.log(`[SFTP] 为会话 ${sessionId} 初始化 SFTP 会话成功。`);
state.sftp = sftpInstance;
state.ws.send(JSON.stringify({ type: 'sftp_ready', payload: { connectionId: state.dbConnectionId } }));
sftpInstance.on('end', () => {
console.log(`[SFTP] 会话 ${sessionId} 的 SFTP 会话已结束。`);
if (state) state.sftp = undefined;
});
sftp.on('close', () => {
console.log(`[SFTP] ${connectionId} 的 SFTP 会话已关闭。`);
if (state) state.sftp = undefined; // 在关闭时清除 SFTP 实例
sftpInstance.on('close', () => {
console.log(`[SFTP] 会话 ${sessionId} 的 SFTP 会话已关闭。`);
if (state) state.sftp = undefined;
});
sftp.on('error', (sftpErr: Error) => { // 为 sftpErr 添加 Error 类型
console.error(`[SFTP] ${connectionId} 的 SFTP 会话出错:`, sftpErr);
if (state) state.sftp = undefined; // 在出错时清除 SFTP 实例
// 可选:通知客户端
state?.ws.send(JSON.stringify({ type: 'sftp_error', payload: { connectionId, message: 'SFTP 会话错误' } }));
sftpInstance.on('error', (sftpErr: Error) => {
console.error(`[SFTP] 会话 ${sessionId} 的 SFTP 会话出错:`, sftpErr);
if (state) state.sftp = undefined;
state?.ws.send(JSON.stringify({ type: 'sftp_error', payload: { connectionId: state.dbConnectionId, message: 'SFTP 会话错误' } }));
});
resolve();
}
});
});
}
cleanupSftpSession(connectionId: string): void {
const state = this.clientStates.get(connectionId);
/**
* 清理 SFTP 会话
* @param sessionId 会话 ID
*/
cleanupSftpSession(sessionId: string): void {
const state = this.clientStates.get(sessionId);
if (state?.sftp) {
logger.info(`[SFTP] Cleaning up SFTP session for ${connectionId}`);
console.log(`[SFTP] 正在清理 ${sessionId} 的 SFTP 会话...`);
state.sftp.end();
state.sftp = undefined;
}
}
// Placeholder methods for SFTP operations - to be implemented
async readdir(connectionId: string, path: string, requestId: string): Promise<void> {
const state = this.clientStates.get(connectionId);
// --- SFTP 操作方法 ---
/** 读取目录内容 */
async readdir(sessionId: string, path: string, requestId: string): Promise<void> {
const state = this.clientStates.get(sessionId);
if (!state || !state.sftp) {
logger.warn(`[SFTP] SFTP not ready for readdir on ${connectionId}`);
state?.ws.send(JSON.stringify({ type: 'sftp_error', payload: { connectionId, requestId, message: 'SFTP session not ready' } }));
console.warn(`[SFTP] SFTP 未准备好,无法在 ${sessionId} 上执行 readdir (ID: ${requestId})`);
state?.ws.send(JSON.stringify({ type: 'sftp:readdir:error', path: path, payload: 'SFTP 会话未就绪', requestId: requestId }));
return;
}
// Implementation to follow
logger.debug(`[SFTP] Received readdir request for ${connectionId}:${path}`);
// Example: state.sftp.readdir(...)
state.ws.send(JSON.stringify({ type: 'sftp_readdir_result', payload: { connectionId, requestId, files: [] /* Placeholder */ } }));
console.debug(`[SFTP ${sessionId}] Received readdir request for ${path} (ID: ${requestId})`);
try {
state.sftp.readdir(path, (err, list) => {
if (err) {
console.error(`[SFTP ${sessionId}] readdir ${path} failed (ID: ${requestId}):`, err);
state.ws.send(JSON.stringify({ type: 'sftp:readdir:error', path: path, payload: `读取目录失败: ${err.message}`, requestId: requestId }));
} else {
const files = list.map((item) => ({
filename: item.filename,
longname: item.longname,
attrs: {
size: item.attrs.size, uid: item.attrs.uid, gid: item.attrs.gid, mode: item.attrs.mode,
atime: item.attrs.atime * 1000, mtime: item.attrs.mtime * 1000,
isDirectory: item.attrs.isDirectory(), isFile: item.attrs.isFile(), isSymbolicLink: item.attrs.isSymbolicLink(),
}
}));
state.ws.send(JSON.stringify({ type: 'sftp:readdir:success', path: path, payload: files, requestId: requestId }));
}
});
} catch (error: any) {
console.error(`[SFTP ${sessionId}] readdir ${path} caught unexpected error (ID: ${requestId}):`, error);
state.ws.send(JSON.stringify({ type: 'sftp:readdir:error', path: path, payload: `读取目录时发生意外错误: ${error.message}`, requestId: requestId }));
}
}
async stat(connectionId: string, path: string, requestId: string): Promise<void> {
const state = this.clientStates.get(connectionId);
/** 获取文件/目录状态信息 */
async stat(sessionId: string, path: string, requestId: string): Promise<void> {
const state = this.clientStates.get(sessionId);
if (!state || !state.sftp) {
logger.warn(`[SFTP] SFTP not ready for stat on ${connectionId}`);
state?.ws.send(JSON.stringify({ type: 'sftp_error', payload: { connectionId, requestId, message: 'SFTP session not ready' } }));
console.warn(`[SFTP] SFTP 未准备好,无法在 ${sessionId} 上执行 stat (ID: ${requestId})`);
state?.ws.send(JSON.stringify({ type: 'sftp:stat:error', path: path, payload: 'SFTP 会话未就绪', requestId: requestId })); // Use specific error type
return;
}
logger.debug(`[SFTP] Received stat request for ${connectionId}:${path}`);
// Implementation to follow
state.ws.send(JSON.stringify({ type: 'sftp_stat_result', payload: { connectionId, requestId, stats: null /* Placeholder */ } }));
console.debug(`[SFTP ${sessionId}] Received stat request for ${path} (ID: ${requestId})`);
try {
state.sftp.lstat(path, (err, stats: Stats) => {
if (err) {
console.error(`[SFTP ${sessionId}] stat ${path} failed (ID: ${requestId}):`, err);
state.ws.send(JSON.stringify({ type: 'sftp:stat:error', path: path, payload: `获取状态失败: ${err.message}`, requestId: requestId }));
} else {
const fileStats = {
size: stats.size, uid: stats.uid, gid: stats.gid, mode: stats.mode,
atime: stats.atime * 1000, mtime: stats.mtime * 1000,
isDirectory: stats.isDirectory(), isFile: stats.isFile(), isSymbolicLink: stats.isSymbolicLink(),
};
// Send specific success type
state.ws.send(JSON.stringify({ type: 'sftp:stat:success', path: path, payload: fileStats, requestId: requestId }));
}
});
} catch (error: any) {
console.error(`[SFTP ${sessionId}] stat ${path} caught unexpected error (ID: ${requestId}):`, error);
state.ws.send(JSON.stringify({ type: 'sftp:stat:error', path: path, payload: `获取状态时发生意外错误: ${error.message}`, requestId: requestId }));
}
}
async readFile(connectionId: string, path: string, requestId: string): Promise<void> {
const state = this.clientStates.get(connectionId);
/** 读取文件内容 */
async readFile(sessionId: string, path: string, requestId: string): Promise<void> {
const state = this.clientStates.get(sessionId);
if (!state || !state.sftp) {
logger.warn(`[SFTP] SFTP not ready for readFile on ${connectionId}`);
state?.ws.send(JSON.stringify({ type: 'sftp_error', payload: { connectionId, requestId, message: 'SFTP session not ready' } }));
console.warn(`[SFTP] SFTP 未准备好,无法在 ${sessionId} 上执行 readFile (ID: ${requestId})`);
state?.ws.send(JSON.stringify({ type: 'sftp:readfile:error', path: path, payload: 'SFTP 会话未就绪', requestId: requestId }));
return;
}
logger.debug(`[SFTP] Received readFile request for ${connectionId}:${path}`);
// Implementation to follow
state.ws.send(JSON.stringify({ type: 'sftp_readfile_result', payload: { connectionId, requestId, data: '' /* Placeholder */ } }));
console.debug(`[SFTP ${sessionId}] Received readFile request for ${path} (ID: ${requestId})`);
try {
const readStream = state.sftp.createReadStream(path);
let fileData = Buffer.alloc(0);
let errorOccurred = false;
readStream.on('data', (chunk: Buffer) => { fileData = Buffer.concat([fileData, chunk]); });
readStream.on('error', (err: Error) => {
if (errorOccurred) return; errorOccurred = true;
console.error(`[SFTP ${sessionId}] readFile ${path} stream error (ID: ${requestId}):`, err);
state.ws.send(JSON.stringify({ type: 'sftp:readfile:error', path: path, payload: `读取文件流错误: ${err.message}`, requestId: requestId }));
});
readStream.on('end', () => {
if (!errorOccurred) {
console.log(`[SFTP ${sessionId}] readFile ${path} success, size: ${fileData.length} bytes (ID: ${requestId})`);
state.ws.send(JSON.stringify({ type: 'sftp:readfile:success', path: path, payload: { content: fileData.toString('base64'), encoding: 'base64' }, requestId: requestId }));
}
});
} catch (error: any) {
console.error(`[SFTP ${sessionId}] readFile ${path} caught unexpected error (ID: ${requestId}):`, error);
state.ws.send(JSON.stringify({ type: 'sftp:readfile:error', path: path, payload: `读取文件时发生意外错误: ${error.message}`, requestId: requestId }));
}
}
async writeFile(connectionId: string, path: string, data: string, requestId: string): Promise<void> {
const state = this.clientStates.get(connectionId);
/** 写入文件内容 */
async writefile(sessionId: string, path: string, data: string, requestId: string): Promise<void> {
const state = this.clientStates.get(sessionId);
if (!state || !state.sftp) {
logger.warn(`[SFTP] SFTP not ready for writeFile on ${connectionId}`);
state?.ws.send(JSON.stringify({ type: 'sftp_error', payload: { connectionId, requestId, message: 'SFTP session not ready' } }));
console.warn(`[SFTP] SFTP 未准备好,无法在 ${sessionId} 上执行 writefile (ID: ${requestId})`);
state?.ws.send(JSON.stringify({ type: 'sftp:writefile:error', path: path, payload: 'SFTP 会话未就绪', requestId: requestId }));
return;
}
logger.debug(`[SFTP] Received writeFile request for ${connectionId}:${path}`);
// Implementation to follow
state.ws.send(JSON.stringify({ type: 'sftp_writefile_result', payload: { connectionId, requestId, success: false /* Placeholder */ } }));
console.debug(`[SFTP ${sessionId}] Received writefile request for ${path} (ID: ${requestId})`);
try {
const buffer = Buffer.from(data, 'utf8');
console.debug(`[SFTP ${sessionId}] Creating write stream for ${path} (ID: ${requestId})`);
const writeStream = state.sftp.createWriteStream(path);
let errorOccurred = false;
writeStream.on('error', (err: Error) => {
if (errorOccurred) return; // Prevent sending multiple errors
errorOccurred = true;
console.error(`[SFTP ${sessionId}] writefile ${path} stream error (ID: ${requestId}):`, err);
state.ws.send(JSON.stringify({ type: 'sftp:writefile:error', path: path, payload: `写入文件流错误: ${err.message}`, requestId: requestId }));
});
// Listen for the 'close' event which indicates the stream has finished writing and the file descriptor is closed.
writeStream.on('close', () => {
if (!errorOccurred) {
console.log(`[SFTP ${sessionId}] writefile ${path} stream closed successfully (ID: ${requestId})`);
state.ws.send(JSON.stringify({ type: 'sftp:writefile:success', path: path, requestId: requestId }));
}
});
console.debug(`[SFTP ${sessionId}] Writing ${buffer.length} bytes to ${path} (ID: ${requestId})`);
writeStream.end(buffer); // Start writing and close the stream afterwards
console.debug(`[SFTP ${sessionId}] writefile ${path} end() called (ID: ${requestId})`);
// Success message is now sent in the 'close' event handler
} catch (error: any) {
console.error(`[SFTP ${sessionId}] writefile ${path} caught unexpected error (ID: ${requestId}):`, error);
state.ws.send(JSON.stringify({ type: 'sftp:writefile:error', path: path, payload: `写入文件时发生意外错误: ${error.message}`, requestId: requestId }));
}
}
async mkdir(connectionId: string, path: string, requestId: string): Promise<void> {
const state = this.clientStates.get(connectionId);
/** 创建目录 */
async mkdir(sessionId: string, path: string, requestId: string): Promise<void> {
const state = this.clientStates.get(sessionId);
if (!state || !state.sftp) {
logger.warn(`[SFTP] SFTP not ready for mkdir on ${connectionId}`);
state?.ws.send(JSON.stringify({ type: 'sftp_error', payload: { connectionId, requestId, message: 'SFTP session not ready' } }));
console.warn(`[SFTP] SFTP 未准备好,无法在 ${sessionId} 上执行 mkdir (ID: ${requestId})`);
state?.ws.send(JSON.stringify({ type: 'sftp:mkdir:error', path: path, payload: 'SFTP 会话未就绪', requestId: requestId })); // Use specific error type
return;
}
logger.debug(`[SFTP] Received mkdir request for ${connectionId}:${path}`);
// Implementation to follow
state.ws.send(JSON.stringify({ type: 'sftp_mkdir_result', payload: { connectionId, requestId, success: false /* Placeholder */ } }));
console.debug(`[SFTP ${sessionId}] Received mkdir request for ${path} (ID: ${requestId})`);
try {
state.sftp.mkdir(path, (err) => {
if (err) {
console.error(`[SFTP ${sessionId}] mkdir ${path} failed (ID: ${requestId}):`, err);
state.ws.send(JSON.stringify({ type: 'sftp:mkdir:error', path: path, payload: `创建目录失败: ${err.message}`, requestId: requestId }));
} else {
console.log(`[SFTP ${sessionId}] mkdir ${path} success (ID: ${requestId})`);
state.ws.send(JSON.stringify({ type: 'sftp:mkdir:success', path: path, requestId: requestId })); // Send specific success type
}
});
} catch (error: any) {
console.error(`[SFTP ${sessionId}] mkdir ${path} caught unexpected error (ID: ${requestId}):`, error);
state.ws.send(JSON.stringify({ type: 'sftp:mkdir:error', path: path, payload: `创建目录时发生意外错误: ${error.message}`, requestId: requestId }));
}
}
async rmdir(connectionId: string, path: string, requestId: string): Promise<void> {
const state = this.clientStates.get(connectionId);
/** 删除空目录 */
async rmdir(sessionId: string, path: string, requestId: string): Promise<void> {
const state = this.clientStates.get(sessionId);
if (!state || !state.sftp) {
logger.warn(`[SFTP] SFTP not ready for rmdir on ${connectionId}`);
state?.ws.send(JSON.stringify({ type: 'sftp_error', payload: { connectionId, requestId, message: 'SFTP session not ready' } }));
console.warn(`[SFTP] SFTP 未准备好,无法在 ${sessionId} 上执行 rmdir (ID: ${requestId})`);
state?.ws.send(JSON.stringify({ type: 'sftp:rmdir:error', path: path, payload: 'SFTP 会话未就绪', requestId: requestId })); // Use specific error type
return;
}
logger.debug(`[SFTP] Received rmdir request for ${connectionId}:${path}`);
// Implementation to follow
state.ws.send(JSON.stringify({ type: 'sftp_rmdir_result', payload: { connectionId, requestId, success: false /* Placeholder */ } }));
console.debug(`[SFTP ${sessionId}] Received rmdir request for ${path} (ID: ${requestId})`);
try {
state.sftp.rmdir(path, (err) => {
if (err) {
console.error(`[SFTP ${sessionId}] rmdir ${path} failed (ID: ${requestId}):`, err);
state.ws.send(JSON.stringify({ type: 'sftp:rmdir:error', path: path, payload: `删除目录失败: ${err.message}`, requestId: requestId }));
} else {
console.log(`[SFTP ${sessionId}] rmdir ${path} success (ID: ${requestId})`);
state.ws.send(JSON.stringify({ type: 'sftp:rmdir:success', path: path, requestId: requestId })); // Send specific success type
}
});
} catch (error: any) {
console.error(`[SFTP ${sessionId}] rmdir ${path} caught unexpected error (ID: ${requestId}):`, error);
state.ws.send(JSON.stringify({ type: 'sftp:rmdir:error', path: path, payload: `删除目录时发生意外错误: ${error.message}`, requestId: requestId }));
}
}
async unlink(connectionId: string, path: string, requestId: string): Promise<void> {
const state = this.clientStates.get(connectionId);
/** 删除文件 */
async unlink(sessionId: string, path: string, requestId: string): Promise<void> {
const state = this.clientStates.get(sessionId);
if (!state || !state.sftp) {
logger.warn(`[SFTP] SFTP not ready for unlink on ${connectionId}`);
state?.ws.send(JSON.stringify({ type: 'sftp_error', payload: { connectionId, requestId, message: 'SFTP session not ready' } }));
console.warn(`[SFTP] SFTP 未准备好,无法在 ${sessionId} 上执行 unlink (ID: ${requestId})`);
state?.ws.send(JSON.stringify({ type: 'sftp:unlink:error', path: path, payload: 'SFTP 会话未就绪', requestId: requestId })); // Use specific error type
return;
}
logger.debug(`[SFTP] Received unlink request for ${connectionId}:${path}`);
// Implementation to follow
state.ws.send(JSON.stringify({ type: 'sftp_unlink_result', payload: { connectionId, requestId, success: false /* Placeholder */ } }));
console.debug(`[SFTP ${sessionId}] Received unlink request for ${path} (ID: ${requestId})`);
try {
state.sftp.unlink(path, (err) => {
if (err) {
console.error(`[SFTP ${sessionId}] unlink ${path} failed (ID: ${requestId}):`, err);
state.ws.send(JSON.stringify({ type: 'sftp:unlink:error', path: path, payload: `删除文件失败: ${err.message}`, requestId: requestId }));
} else {
console.log(`[SFTP ${sessionId}] unlink ${path} success (ID: ${requestId})`);
state.ws.send(JSON.stringify({ type: 'sftp:unlink:success', path: path, requestId: requestId })); // Send specific success type
}
});
} catch (error: any) {
console.error(`[SFTP ${sessionId}] unlink ${path} caught unexpected error (ID: ${requestId}):`, error);
state.ws.send(JSON.stringify({ type: 'sftp:unlink:error', path: path, payload: `删除文件时发生意外错误: ${error.message}`, requestId: requestId }));
}
}
async rename(connectionId: string, oldPath: string, newPath: string, requestId: string): Promise<void> {
const state = this.clientStates.get(connectionId);
/** 重命名/移动文件或目录 */
async rename(sessionId: string, oldPath: string, newPath: string, requestId: string): Promise<void> {
const state = this.clientStates.get(sessionId);
if (!state || !state.sftp) {
logger.warn(`[SFTP] SFTP not ready for rename on ${connectionId}`);
state?.ws.send(JSON.stringify({ type: 'sftp_error', payload: { connectionId, requestId, message: 'SFTP session not ready' } }));
console.warn(`[SFTP] SFTP 未准备好,无法在 ${sessionId} 上执行 rename (ID: ${requestId})`);
state?.ws.send(JSON.stringify({ type: 'sftp:rename:error', oldPath: oldPath, newPath: newPath, payload: 'SFTP 会话未就绪', requestId: requestId })); // Use specific error type
return;
}
logger.debug(`[SFTP] Received rename request for ${connectionId}: ${oldPath} -> ${newPath}`);
// Implementation to follow
state.ws.send(JSON.stringify({ type: 'sftp_rename_result', payload: { connectionId, requestId, success: false /* Placeholder */ } }));
console.debug(`[SFTP ${sessionId}] Received rename request ${oldPath} -> ${newPath} (ID: ${requestId})`);
try {
state.sftp.rename(oldPath, newPath, (err) => {
if (err) {
console.error(`[SFTP ${sessionId}] rename ${oldPath} -> ${newPath} failed (ID: ${requestId}):`, err);
state.ws.send(JSON.stringify({ type: 'sftp:rename:error', oldPath: oldPath, newPath: newPath, payload: `重命名/移动失败: ${err.message}`, requestId: requestId }));
} else {
console.log(`[SFTP ${sessionId}] rename ${oldPath} -> ${newPath} success (ID: ${requestId})`);
state.ws.send(JSON.stringify({ type: 'sftp:rename:success', oldPath: oldPath, newPath: newPath, requestId: requestId })); // Send specific success type
}
});
} catch (error: any) {
console.error(`[SFTP ${sessionId}] rename ${oldPath} -> ${newPath} caught unexpected error (ID: ${requestId}):`, error);
state.ws.send(JSON.stringify({ type: 'sftp:rename:error', oldPath: oldPath, newPath: newPath, payload: `重命名/移动时发生意外错误: ${error.message}`, requestId: requestId }));
}
}
async chmod(connectionId: string, path: string, mode: number, requestId: string): Promise<void> {
const state = this.clientStates.get(connectionId);
/** 修改文件/目录权限 */
async chmod(sessionId: string, path: string, mode: number, requestId: string): Promise<void> {
const state = this.clientStates.get(sessionId);
if (!state || !state.sftp) {
logger.warn(`[SFTP] SFTP not ready for chmod on ${connectionId}`);
state?.ws.send(JSON.stringify({ type: 'sftp_error', payload: { connectionId, requestId, message: 'SFTP session not ready' } }));
console.warn(`[SFTP] SFTP 未准备好,无法在 ${sessionId} 上执行 chmod (ID: ${requestId})`);
state?.ws.send(JSON.stringify({ type: 'sftp:chmod:error', path: path, payload: 'SFTP 会话未就绪', requestId: requestId })); // Use specific error type
return;
}
logger.debug(`[SFTP] Received chmod request for ${connectionId}:${path} to mode ${mode.toString(8)}`);
// Implementation to follow
state.ws.send(JSON.stringify({ type: 'sftp_chmod_result', payload: { connectionId, requestId, success: false /* Placeholder */ } }));
console.debug(`[SFTP ${sessionId}] Received chmod request for ${path} to ${mode.toString(8)} (ID: ${requestId})`);
try {
state.sftp.chmod(path, mode, (err) => {
if (err) {
console.error(`[SFTP ${sessionId}] chmod ${path} to ${mode.toString(8)} failed (ID: ${requestId}):`, err);
state.ws.send(JSON.stringify({ type: 'sftp:chmod:error', path: path, payload: `修改权限失败: ${err.message}`, requestId: requestId }));
} else {
console.log(`[SFTP ${sessionId}] chmod ${path} to ${mode.toString(8)} success (ID: ${requestId})`);
state.ws.send(JSON.stringify({ type: 'sftp:chmod:success', path: path, requestId: requestId })); // Send specific success type
}
});
} catch (error: any) {
console.error(`[SFTP ${sessionId}] chmod ${path} caught unexpected error (ID: ${requestId}):`, error);
state.ws.send(JSON.stringify({ type: 'sftp:chmod:error', path: path, payload: `修改权限时发生意外错误: ${error.message}`, requestId: requestId }));
}
}
// TODO: Implement file upload/download logic with progress reporting
+129 -317
View File
@@ -1,45 +1,15 @@
import { Client, ClientChannel, ConnectConfig } from 'ssh2'; // Import ClientChannel and ConnectConfig
import { SocksClient, SocksClientOptions } from 'socks'; // Import SocksClientOptions
import { Client, ClientChannel, ConnectConfig } from 'ssh2';
import { SocksClient, SocksClientOptions } from 'socks';
import http from 'http';
import net from 'net';
import WebSocket from 'ws'; // Import WebSocket for type hint
import * as ConnectionRepository from '../repositories/connection.repository';
import { decrypt } from '../utils/crypto';
// Import SftpService if needed later for initialization
// import * as SftpService from './sftp.service';
// Import StatusMonitorService if needed later for initialization
// import * as StatusMonitorService from './status-monitor.service';
const CONNECT_TIMEOUT = 20000; // 连接超时时间 (毫秒)
const TEST_TIMEOUT = 15000; // 测试连接超时时间 (毫秒)
// Define AuthenticatedWebSocket interface (or import from websocket.ts if refactored there)
// This is needed to associate SSH clients with specific WS connections
interface AuthenticatedWebSocket extends WebSocket {
isAlive?: boolean;
userId?: number;
username?: string;
// sshClient?: Client; // Managed by the service now
// sshShellStream?: ClientChannel; // Managed by the service now
}
// Structure to hold active SSH connection details managed by this service
interface ActiveSshSession {
client: Client;
shell: ClientChannel;
// sftp?: SFTPWrapper; // SFTP will be managed by SftpService
// statusIntervalId?: NodeJS.Timeout; // Status polling managed by StatusMonitorService
connectionInfo: DecryptedConnectionDetails; // Store connection info for context (Fix typo)
}
// Map to store active sessions associated with WebSocket clients
const activeSessions = new Map<AuthenticatedWebSocket, ActiveSshSession>();
// 辅助接口:定义解密后的凭证和代理信息结构 (可以共享到 types 文件)
// Renamed to avoid conflict if imported later
interface DecryptedConnectionDetails {
// 辅助接口:定义解密后的凭证和代理信息结构 (导出以便 websocket.ts 使用)
export interface DecryptedConnectionDetails {
id: number;
name: string;
host: string;
@@ -57,30 +27,27 @@ interface DecryptedConnectionDetails {
port: number;
username?: string;
password?: string; // Decrypted
auth_method?: string; // Proxy auth method
privateKey?: string; // Decrypted proxy key
passphrase?: string; // Decrypted proxy passphrase
// auth_method?: string; // Proxy auth method (如果需要可以保留)
// privateKey?: string; // Decrypted proxy key (如果需要可以保留)
// passphrase?: string; // Decrypted proxy passphrase (如果需要可以保留)
} | null;
}
/**
* 测试给定 ID 的 SSH 连接(包括代理)
* 获取并解密指定 ID 的完整连接信息(包括代理)
* @param connectionId 连接 ID
* @returns Promise<void> - 如果连接成功则 resolve,否则 reject
* @throws Error 如果连接失败或配置错误
* @returns Promise<DecryptedConnectionDetails> 解密后的连接详情
* @throws Error 如果连接配置未找到或解密失败
*/
export const testConnection = async (connectionId: number): Promise<void> => {
console.log(`SshService: Testing connection ${connectionId}...`);
// 1. 获取完整的连接信息(包括加密凭证和代理信息)
const rawConnInfo = await ConnectionRepository.findFullConnectionById(connectionId); // Assuming this fetches proxy details too
export const getConnectionDetails = async (connectionId: number): Promise<DecryptedConnectionDetails> => {
console.log(`SshService: 获取连接 ${connectionId} 的详细信息...`);
const rawConnInfo = await ConnectionRepository.findFullConnectionById(connectionId);
if (!rawConnInfo) {
throw new Error('连接配置未找到。');
throw new Error(`连接配置 ID ${connectionId} 未找到。`);
}
// 2. 解密凭证并构建结构化的连接信息
let fullConnInfo: DecryptedConnectionDetails;
try {
fullConnInfo = {
const fullConnInfo: DecryptedConnectionDetails = {
id: rawConnInfo.id,
name: rawConnInfo.name,
host: rawConnInfo.host,
@@ -101,99 +68,91 @@ export const testConnection = async (connectionId: number): Promise<void> => {
host: rawConnInfo.proxy_host,
port: rawConnInfo.proxy_port,
username: rawConnInfo.proxy_username || undefined,
auth_method: rawConnInfo.proxy_auth_method, // Include proxy auth method
password: rawConnInfo.proxy_encrypted_password ? decrypt(rawConnInfo.proxy_encrypted_password) : undefined,
privateKey: rawConnInfo.proxy_encrypted_private_key ? decrypt(rawConnInfo.proxy_encrypted_private_key) : undefined, // Decrypt proxy key
passphrase: rawConnInfo.proxy_encrypted_passphrase ? decrypt(rawConnInfo.proxy_encrypted_passphrase) : undefined, // Decrypt proxy passphrase
// 可以根据需要解密代理的其他凭证
};
}
console.log(`SshService: 连接 ${connectionId} 的详细信息获取并解密成功。`);
return fullConnInfo;
} catch (decryptError: any) {
console.error(`Service: 处理连接 ${connectionId} 凭证或代理凭证失败:`, decryptError);
console.error(`SshService: 处理连接 ${connectionId} 凭证或代理凭证失败:`, decryptError);
throw new Error(`处理凭证失败: ${decryptError.message}`);
}
// 3. 构建 ssh2 连接配置
const connectConfig: ConnectConfig = { // Use ConnectConfig type
host: fullConnInfo.host,
port: fullConnInfo.port,
username: fullConnInfo.username,
password: fullConnInfo.password,
privateKey: fullConnInfo.privateKey,
passphrase: fullConnInfo.passphrase,
readyTimeout: TEST_TIMEOUT,
keepaliveInterval: 0, // 测试连接不需要 keepalive
};
// 4. 应用代理配置并执行连接 (Refactored into helper)
const sshClient = new Client();
try {
await establishSshConnection(sshClient, connectConfig, fullConnInfo.proxy); // Use helper
console.log(`SshService: Test connection ${connectionId} successful.`);
// Test successful, void promise resolves implicitly
} catch (error) {
console.error(`SshService: Test connection ${connectionId} failed:`, error);
throw error; // Re-throw the specific error
} finally {
// 无论成功失败,都关闭 SSH 客户端
sshClient.end();
console.log(`SshService: Test connection ${connectionId} client closed.`);
}
};
// --- NEW FUNCTIONS FOR MANAGING LIVE CONNECTIONS ---
/**
* Establishes an SSH connection, handling proxies.
* Internal helper function.
* @param sshClient - The ssh2 Client instance.
* @param connectConfig - Base SSH connection config.
* @param proxyInfo - Optional proxy details.
* @returns Promise that resolves when SSH is ready, or rejects on error.
* 根据解密后的连接详情建立 SSH 连接(处理代理)
* @param connDetails - 解密后的连接详情
* @param timeout - 连接超时时间 (毫秒),可选
* @returns Promise<Client> 连接成功的 SSH Client 实例
* @throws Error 如果连接失败
*/
const establishSshConnection = (
sshClient: Client,
connectConfig: ConnectConfig,
proxyInfo: DecryptedConnectionDetails['proxy']
): Promise<void> => {
export const establishSshConnection = (
connDetails: DecryptedConnectionDetails,
timeout: number = CONNECT_TIMEOUT
): Promise<Client> => {
return new Promise((resolve, reject) => {
const readyHandler = () => {
sshClient.removeListener('error', errorHandler); // Clean up error listener on success
resolve();
const sshClient = new Client();
const connectConfig: ConnectConfig = {
host: connDetails.host,
port: connDetails.port,
username: connDetails.username,
password: connDetails.password,
privateKey: connDetails.privateKey,
passphrase: connDetails.passphrase,
readyTimeout: timeout,
keepaliveInterval: 30000, // 保持连接
keepaliveCountMax: 3,
};
const readyHandler = () => {
console.log(`SshService: SSH 连接到 ${connDetails.host}:${connDetails.port} (ID: ${connDetails.id}) 成功。`);
sshClient.removeListener('error', errorHandler); // 成功后移除错误监听器
resolve(sshClient); // 返回 Client 实例
};
const errorHandler = (err: Error) => {
sshClient.removeListener('ready', readyHandler); // Clean up ready listener on error
reject(err); // Reject with the specific error
console.error(`SshService: SSH 连接到 ${connDetails.host}:${connDetails.port} (ID: ${connDetails.id}) 失败:`, err);
sshClient.removeListener('ready', readyHandler); // 失败后移除成功监听器
sshClient.end(); // 确保关闭客户端
reject(err);
};
sshClient.once('ready', readyHandler);
sshClient.once('error', errorHandler); // Generic error handler for direct connect issues
sshClient.once('error', errorHandler);
if (proxyInfo) {
const proxy = proxyInfo;
console.log(`SshService: Applying proxy ${proxy.name} (${proxy.type})`);
// --- 处理代理 ---
if (connDetails.proxy) {
const proxy = connDetails.proxy;
console.log(`SshService: 应用代理 ${proxy.name} (${proxy.type}) 连接到 ${connDetails.host}:${connDetails.port}`);
if (proxy.type === 'SOCKS5') {
const socksOptions: SocksClientOptions = {
proxy: { host: proxy.host, port: proxy.port, type: 5, userId: proxy.username, password: proxy.password }, // Type 5 is implicit
proxy: { host: proxy.host, port: proxy.port, type: 5, userId: proxy.username, password: proxy.password },
command: 'connect',
destination: { host: connectConfig.host!, port: connectConfig.port! }, // Use base config host/port
timeout: connectConfig.readyTimeout ?? CONNECT_TIMEOUT, // Use connection timeout
destination: { host: connectConfig.host!, port: connectConfig.port! },
timeout: connectConfig.readyTimeout,
};
SocksClient.createConnection(socksOptions)
.then(({ socket }) => {
console.log(`SshService: SOCKS5 proxy connection successful.`);
console.log(`SshService: SOCKS5 代理连接成功 (目标: ${connDetails.host}:${connDetails.port})。`);
connectConfig.sock = socket;
sshClient.connect(connectConfig); // Connect SSH via proxy socket
sshClient.connect(connectConfig);
})
.catch(socksError => {
console.error(`SshService: SOCKS5 proxy connection failed:`, socksError);
// Reject the main promise, remove listeners handled by errorHandler
errorHandler(new Error(`SOCKS5 代理连接失败: ${socksError.message}`));
errorHandler(new Error(`SOCKS5 代理 ${proxy.host}:${proxy.port} 连接失败: ${socksError.message}`));
});
} else if (proxy.type === 'HTTP') {
console.log(`SshService: Attempting HTTP proxy tunnel via ${proxy.host}:${proxy.port}...`);
const reqOptions: http.RequestOptions = { method: 'CONNECT', host: proxy.host, port: proxy.port, path: `${connectConfig.host}:${connectConfig.port}`, timeout: connectConfig.readyTimeout ?? CONNECT_TIMEOUT, agent: false };
console.log(`SshService: 尝试通过 HTTP 代理 ${proxy.host}:${proxy.port} 建立隧道到 ${connDetails.host}:${connDetails.port}...`);
const reqOptions: http.RequestOptions = {
method: 'CONNECT',
host: proxy.host,
port: proxy.port,
path: `${connectConfig.host}:${connectConfig.port}`,
timeout: connectConfig.readyTimeout,
agent: false
};
if (proxy.username) {
const auth = 'Basic ' + Buffer.from(proxy.username + ':' + (proxy.password || '')).toString('base64');
reqOptions.headers = { ...reqOptions.headers, 'Proxy-Authorization': auth, 'Proxy-Connection': 'Keep-Alive', 'Host': `${connectConfig.host}:${connectConfig.port}` };
@@ -201,233 +160,86 @@ const establishSshConnection = (
const req = http.request(reqOptions);
req.on('connect', (res, socket, head) => {
if (res.statusCode === 200) {
console.log(`SshService: HTTP proxy tunnel established.`);
console.log(`SshService: HTTP 代理隧道建立成功 (目标: ${connDetails.host}:${connDetails.port})。`);
connectConfig.sock = socket;
sshClient.connect(connectConfig); // Connect SSH via tunnel socket
sshClient.connect(connectConfig);
} else {
console.error(`SshService: HTTP proxy CONNECT request failed, status code: ${res.statusCode}`);
socket.destroy();
errorHandler(new Error(`HTTP 代理连接失败 (状态码: ${res.statusCode})`)); // Reject main promise
} // <-- Added missing closing parenthesis here
errorHandler(new Error(`HTTP 代理 ${proxy.host}:${proxy.port} 连接失败 (状态码: ${res.statusCode})`));
}
});
req.on('error', (err) => {
console.error(`SshService: HTTP proxy request error:`, err);
errorHandler(new Error(`HTTP 代理连接错误: ${err.message}`)); // Reject main promise
errorHandler(new Error(`HTTP 代理 ${proxy.host}:${proxy.port} 请求错误: ${err.message}`));
});
req.on('timeout', () => {
console.error(`SshService: HTTP proxy request timeout.`);
req.destroy();
errorHandler(new Error('HTTP 代理连接超时')); // Reject main promise
errorHandler(new Error(`HTTP 代理 ${proxy.host}:${proxy.port} 连接超时`));
});
req.end(); // Send the CONNECT request
req.end();
} else {
errorHandler(new Error(`不支持的代理类型: ${proxy.type}`)); // Reject main promise
errorHandler(new Error(`不支持的代理类型: ${proxy.type}`));
}
} else {
// No proxy, connect directly
console.log(`SshService: No proxy detected, connecting directly...`);
// 无代理,直接连接
console.log(`SshService: 无代理,直接连接到 ${connDetails.host}:${connDetails.port}`);
sshClient.connect(connectConfig);
}
});
};
/**
* 在已连接的 SSH Client 上打开 Shell 通道
* @param sshClient - 已连接的 SSH Client 实例
* @returns Promise<ClientChannel> Shell 通道实例
* @throws Error 如果打开 Shell 失败
*/
export const openShell = (sshClient: Client): Promise<ClientChannel> => {
return new Promise((resolve, reject) => {
sshClient.shell((err, stream) => {
if (err) {
console.error(`SshService: 打开 Shell 失败:`, err);
return reject(new Error(`打开 Shell 失败: ${err.message}`));
}
console.log(`SshService: Shell 通道已打开。`);
resolve(stream);
});
});
};
/**
* Connects to SSH, opens a shell, and sets up event forwarding via WebSocket.
* @param connectionId - The ID of the connection config in the database.
* @param ws - The authenticated WebSocket client instance.
* 测试给定 ID 的 SSH 连接(包括代理)
* @param connectionId 连接 ID
* @returns Promise<void> - 如果连接成功则 resolve,否则 reject
* @throws Error 如果连接失败或配置错误
*/
export const connectAndOpenShell = async (connectionId: number, ws: AuthenticatedWebSocket): Promise<void> => {
console.log(`SshService: User ${ws.username} requested connection to ID: ${connectionId}`);
if (activeSessions.has(ws)) {
console.warn(`SshService: User ${ws.username} already has an active session.`);
throw new Error('已存在活动的 SSH 连接。');
}
ws.send(JSON.stringify({ type: 'ssh:status', payload: '正在获取连接信息...' }));
// 1. Get connection info
const rawConnInfo = await ConnectionRepository.findFullConnectionById(connectionId);
if (!rawConnInfo) {
throw new Error('连接配置未找到。');
}
// 2. Decrypt and prepare connection details
let fullConnInfo: DecryptedConnectionDetails;
export const testConnection = async (connectionId: number): Promise<void> => {
console.log(`SshService: 测试连接 ${connectionId}...`);
let sshClient: Client | null = null;
try {
// (Decryption logic similar to testConnection, could be refactored)
fullConnInfo = { /* ... decryption ... */
id: rawConnInfo.id, name: rawConnInfo.name, host: rawConnInfo.host, port: rawConnInfo.port, username: rawConnInfo.username, auth_method: rawConnInfo.auth_method,
password: (rawConnInfo.auth_method === 'password' && rawConnInfo.encrypted_password) ? decrypt(rawConnInfo.encrypted_password) : undefined,
privateKey: (rawConnInfo.auth_method === 'key' && rawConnInfo.encrypted_private_key) ? decrypt(rawConnInfo.encrypted_private_key) : undefined,
passphrase: (rawConnInfo.auth_method === 'key' && rawConnInfo.encrypted_passphrase) ? decrypt(rawConnInfo.encrypted_passphrase) : undefined,
proxy: null,
};
if (rawConnInfo.proxy_db_id) {
fullConnInfo.proxy = { /* ... proxy decryption ... */
id: rawConnInfo.proxy_db_id, name: rawConnInfo.proxy_name, type: rawConnInfo.proxy_type, host: rawConnInfo.proxy_host, port: rawConnInfo.proxy_port, username: rawConnInfo.proxy_username || undefined, auth_method: rawConnInfo.proxy_auth_method,
password: rawConnInfo.proxy_encrypted_password ? decrypt(rawConnInfo.proxy_encrypted_password) : undefined,
privateKey: rawConnInfo.proxy_encrypted_private_key ? decrypt(rawConnInfo.proxy_encrypted_private_key) : undefined,
passphrase: rawConnInfo.proxy_encrypted_passphrase ? decrypt(rawConnInfo.proxy_encrypted_passphrase) : undefined,
};
// 1. 获取并解密连接信息
const connDetails = await getConnectionDetails(connectionId);
// 2. 尝试建立连接 (使用较短的测试超时时间)
sshClient = await establishSshConnection(connDetails, TEST_TIMEOUT);
console.log(`SshService: 测试连接 ${connectionId} 成功。`);
// 测试成功,Promise 自动 resolve void
} catch (error) {
console.error(`SshService: 测试连接 ${connectionId} 失败:`, error);
throw error; // 将错误向上抛出
} finally {
// 无论成功失败,都关闭 SSH 客户端
if (sshClient) {
sshClient.end();
console.log(`SshService: 测试连接 ${connectionId} 的客户端已关闭。`);
}
} catch (decryptError: any) {
console.error(`SshService: Handling credentials failed for ${connectionId}:`, decryptError);
throw new Error(`无法处理连接凭证: ${decryptError.message}`);
}
// 3. Prepare SSH config
const connectConfig: ConnectConfig = {
host: fullConnInfo.host,
port: fullConnInfo.port,
username: fullConnInfo.username,
password: fullConnInfo.password,
privateKey: fullConnInfo.privateKey,
passphrase: fullConnInfo.passphrase,
readyTimeout: CONNECT_TIMEOUT,
keepaliveInterval: 30000,
keepaliveCountMax: 3,
};
// 4. Establish connection and open shell
const sshClient = new Client();
// Generic error/close handlers for the client
const clientCloseHandler = () => {
console.log(`SshService: SSH client for ${ws.username} closed.`);
if (activeSessions.has(ws)) { // Check if cleanup wasn't already called
if (!ws.CLOSED && !ws.CLOSING) {
ws.send(JSON.stringify({ type: 'ssh:disconnected', payload: 'SSH 连接已关闭。' }));
}
cleanupConnection(ws); // Ensure cleanup
}
};
const clientErrorHandler = (err: Error) => {
console.error(`SshService: SSH client error for ${ws.username}:`, err);
if (activeSessions.has(ws)) { // Check if cleanup wasn't already called
if (!ws.CLOSED && !ws.CLOSING) {
ws.send(JSON.stringify({ type: 'ssh:error', payload: `SSH 连接错误: ${err.message}` }));
}
cleanupConnection(ws); // Ensure cleanup
}
};
sshClient.on('close', clientCloseHandler);
sshClient.on('error', clientErrorHandler);
try {
ws.send(JSON.stringify({ type: 'ssh:status', payload: `正在连接到 ${fullConnInfo.host}...` }));
await establishSshConnection(sshClient, connectConfig, fullConnInfo.proxy); // Use helper
ws.send(JSON.stringify({ type: 'ssh:status', payload: 'SSH 连接成功,正在打开 Shell...' }));
// 5. Open Shell Stream
const shellStream = await new Promise<ClientChannel>((resolve, reject) => {
sshClient.shell((err, stream) => {
if (err) {
console.error(`SshService: User ${ws.username} failed to open shell:`, err);
return reject(new Error(`打开 Shell 失败: ${err.message}`));
}
console.log(`SshService: User ${ws.username} shell channel opened.`);
resolve(stream);
});
});
// 6. Store active session
const session: ActiveSshSession = { client: sshClient, shell: shellStream, connectionInfo: fullConnInfo };
activeSessions.set(ws, session);
console.log(`SshService: Active session stored for ${ws.username}.`);
// 7. Setup event forwarding for the shell stream
shellStream.on('data', (data: Buffer) => {
if (ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify({ type: 'ssh:output', payload: data.toString('base64'), encoding: 'base64' }));
}
});
shellStream.stderr.on('data', (data: Buffer) => {
console.error(`SSH Stderr (${ws.username}): ${data.toString('utf8').substring(0,100)}...`);
if (ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify({ type: 'ssh:output', payload: data.toString('base64'), encoding: 'base64' })); // Send stderr as output
}
});
shellStream.on('close', () => {
console.log(`SshService: Shell stream for ${ws.username} closed.`);
if (activeSessions.has(ws)) { // Check if cleanup wasn't already called by client close
if (!ws.CLOSED && !ws.CLOSING) {
ws.send(JSON.stringify({ type: 'ssh:disconnected', payload: 'Shell 通道已关闭。' }));
}
cleanupConnection(ws); // Trigger cleanup if shell closes independently
}
});
// 8. Initialize SFTP (TODO: Move to SftpService) and Status Polling (TODO: Move to StatusMonitorService)
// For now, just notify connection success
ws.send(JSON.stringify({ type: 'ssh:connected' }));
// TODO: Call SftpService.initializeSftpSession(ws, sshClient);
// TODO: Call StatusMonitorService.startStatusPolling(ws, sshClient);
} catch (error: any) {
console.error(`SshService: Failed to connect or open shell for ${ws.username}:`, error);
// Ensure client listeners are removed and client is ended on failure
sshClient.removeListener('close', clientCloseHandler);
sshClient.removeListener('error', clientErrorHandler);
sshClient.end();
cleanupConnection(ws); // Clean up any partial state
throw error; // Re-throw for the controller
}
};
/**
* Sends input data to the SSH shell stream associated with a WebSocket connection.
* @param ws - The authenticated WebSocket client.
* @param data - The data string to send.
*/
export const sendInput = (ws: AuthenticatedWebSocket, data: string): void => {
const session = activeSessions.get(ws);
if (session?.shell && session.shell.writable) {
session.shell.write(data);
} else {
console.warn(`SshService: Cannot send input for ${ws.username}, no active/writable shell stream found.`);
// Optionally notify the client ws.send(...)
}
};
/**
* Resizes the pseudo-terminal associated with a WebSocket connection.
* @param ws - The authenticated WebSocket client.
* @param cols - Terminal width in columns.
* @param rows - Terminal height in rows.
*/
export const resizeTerminal = (ws: AuthenticatedWebSocket, cols: number, rows: number): void => {
const session = activeSessions.get(ws);
if (session?.shell) {
console.log(`SshService: Resizing terminal for ${ws.username} to ${cols}x${rows}`);
session.shell.setWindow(rows, cols, 0, 0); // Note: rows, cols order
} else {
console.warn(`SshService: Cannot resize terminal for ${ws.username}, no active shell stream found.`);
}
};
/**
* Cleans up SSH resources associated with a WebSocket connection.
* @param ws - The authenticated WebSocket client.
*/
export const cleanupConnection = (ws: AuthenticatedWebSocket): void => {
const session = activeSessions.get(ws);
if (session) {
console.log(`SshService: Cleaning up SSH session for ${ws.username}...`);
// TODO: Call StatusMonitorService.stopStatusPolling(ws);
// TODO: Call SftpService.cleanupSftpSession(ws);
// End streams and client
session.shell?.end(); // End the shell stream first
session.client?.end(); // End the main SSH client connection
activeSessions.delete(ws); // Remove from active sessions map
console.log(`SshService: SSH session for ${ws.username} cleaned up.`);
} else {
// console.log(`SshService: No active SSH session found for ${ws.username} during cleanup.`);
}
};
// --- 移除旧的函数 ---
// - connectAndOpenShell
// - sendInput
// - resizeTerminal
// - cleanupConnection
// - activeSessions Map
// - AuthenticatedWebSocket interface (如果仅在此文件使用)
@@ -0,0 +1,318 @@
import { Client } from 'ssh2';
import { WebSocket } from 'ws';
import { ClientState } from '../websocket'; // 导入统一的 ClientState
// 定义服务器状态的数据结构 (与前端 StatusMonitor.vue 匹配)
interface ServerStatus {
cpuPercent?: number;
memPercent?: number;
memUsed?: number; // MB
memTotal?: number; // MB
swapPercent?: number;
swapUsed?: number; // MB
swapTotal?: number; // MB
diskPercent?: number;
diskUsed?: number; // KB
diskTotal?: number; // KB
cpuModel?: string;
netRxRate?: number; // Bytes per second
netTxRate?: number; // Bytes per second
netInterface?: string;
osName?: string;
loadAvg?: number[]; // 系统平均负载 [1min, 5min, 15min]
timestamp: number; // 状态获取时间戳
}
// Interface for parsed network stats
interface NetworkStats {
[interfaceName: string]: {
rx_bytes: number;
tx_bytes: number;
}
}
const DEFAULT_POLLING_INTERVAL = 1000; // 修改为 1 秒轮询间隔 (毫秒)
// 用于存储上一次的网络统计信息以计算速率
const previousNetStats = new Map<string, { rx: number, tx: number, timestamp: number }>();
export class StatusMonitorService {
private clientStates: Map<string, ClientState>; // 使用导入的 ClientState
constructor(clientStates: Map<string, ClientState>) {
this.clientStates = clientStates;
}
/**
* 启动指定会话的状态轮询
* @param sessionId 会话 ID
* @param interval 轮询间隔 (毫秒),可选,默认为 DEFAULT_POLLING_INTERVAL
*/
startStatusPolling(sessionId: string, interval: number = DEFAULT_POLLING_INTERVAL): void {
const state = this.clientStates.get(sessionId);
if (!state || !state.sshClient) {
console.warn(`[StatusMonitor] 无法为会话 ${sessionId} 启动状态轮询:状态无效或 SSH 客户端不存在。`);
return;
}
if (state.statusIntervalId) {
console.warn(`[StatusMonitor] 会话 ${sessionId} 的状态轮询已在运行中。`);
return;
}
console.log(`[StatusMonitor] 为会话 ${sessionId} 启动状态轮询,间隔 ${interval}ms`);
this.fetchAndSendServerStatus(sessionId); // 立即执行一次
state.statusIntervalId = setInterval(() => {
this.fetchAndSendServerStatus(sessionId);
}, interval);
}
/**
* 停止指定会话的状态轮询
* @param sessionId 会话 ID
*/
stopStatusPolling(sessionId: string): void {
const state = this.clientStates.get(sessionId);
if (state?.statusIntervalId) {
console.log(`[StatusMonitor] 停止会话 ${sessionId} 的状态轮询。`);
clearInterval(state.statusIntervalId);
state.statusIntervalId = undefined;
previousNetStats.delete(sessionId); // 清理网络统计缓存
}
}
/**
* 获取并发送服务器状态给客户端
* @param sessionId 会话 ID
*/
private async fetchAndSendServerStatus(sessionId: string): Promise<void> {
const state = this.clientStates.get(sessionId);
if (!state || !state.sshClient || state.ws.readyState !== WebSocket.OPEN) {
console.warn(`[StatusMonitor] 无法获取会话 ${sessionId} 的状态,停止轮询。原因:状态无效、SSH断开或WS关闭。`);
this.stopStatusPolling(sessionId);
return;
}
try {
// 传递 sessionId 给 fetchServerStatus 以便查找 previousNetStats
const status = await this.fetchServerStatus(state.sshClient, sessionId);
state.ws.send(JSON.stringify({ type: 'status_update', payload: { connectionId: state.dbConnectionId, status } }));
} catch (error: any) {
console.error(`[StatusMonitor] 获取会话 ${sessionId} 服务器状态失败:`, error);
state.ws.send(JSON.stringify({ type: 'status_error', payload: { connectionId: state.dbConnectionId, message: `获取状态失败: ${error.message}` } }));
}
}
/**
* 通过 SSH 执行命令获取服务器状态信息
* @param sshClient SSH 客户端实例
* @param sessionId 当前会话 ID,用于网络速率计算
* @returns Promise<ServerStatus> 服务器状态信息
*/
private async fetchServerStatus(sshClient: Client, sessionId: string): Promise<ServerStatus> {
console.debug(`[StatusMonitor ${sessionId}] Fetching server status...`);
const timestamp = Date.now();
let status: Partial<ServerStatus> = { timestamp };
try {
// --- OS Name ---
try {
const osReleaseOutput = await this.executeSshCommand(sshClient, 'cat /etc/os-release');
const nameMatch = osReleaseOutput.match(/^PRETTY_NAME="?([^"]+)"?/m);
status.osName = nameMatch ? nameMatch[1] : (osReleaseOutput.match(/^NAME="?([^"]+)"?/m)?.[1] ?? 'Unknown');
} catch (err) { console.warn(`[StatusMonitor ${sessionId}] Failed to get OS name:`, err); }
// --- CPU Model ---
try {
const lscpuOutput = await this.executeSshCommand(sshClient, "lscpu | grep 'Model name:'");
status.cpuModel = lscpuOutput.match(/Model name:\s+(.*)/)?.[1].trim() ?? 'Unknown';
} catch (err) { console.warn(`[StatusMonitor ${sessionId}] Failed to get CPU model:`, err); }
// --- Memory and Swap ---
try {
const freeOutput = await this.executeSshCommand(sshClient, 'free -m');
const lines = freeOutput.split('\n');
const memLine = lines.find(line => line.startsWith('Mem:'));
const swapLine = lines.find(line => line.startsWith('Swap:'));
if (memLine) {
const parts = memLine.split(/\s+/);
if (parts.length >= 4) {
const total = parseInt(parts[1], 10);
const used = parseInt(parts[2], 10);
if (!isNaN(total) && !isNaN(used)) {
status.memTotal = total; status.memUsed = used;
status.memPercent = total > 0 ? parseFloat(((used / total) * 100).toFixed(1)) : 0;
}
}
}
if (swapLine) {
const parts = swapLine.split(/\s+/);
if (parts.length >= 4) {
const total = parseInt(parts[1], 10);
const used = parseInt(parts[2], 10);
if (!isNaN(total) && !isNaN(used)) {
status.swapTotal = total; status.swapUsed = used;
status.swapPercent = total > 0 ? parseFloat(((used / total) * 100).toFixed(1)) : 0;
}
}
} else { status.swapTotal = 0; status.swapUsed = 0; status.swapPercent = 0; }
} catch (err) { console.warn(`[StatusMonitor ${sessionId}] Failed to get memory/swap usage:`, err); }
// --- Disk Usage (Root Partition) ---
try {
const dfOutput = await this.executeSshCommand(sshClient, "df -k / | tail -n 1");
const parts = dfOutput.split(/\s+/);
if (parts.length >= 5) {
const total = parseInt(parts[1], 10); const used = parseInt(parts[2], 10);
const percentMatch = parts[4].match(/(\d+)%/);
if (!isNaN(total) && !isNaN(used) && percentMatch) {
status.diskTotal = total; status.diskUsed = used;
status.diskPercent = parseFloat(percentMatch[1]);
}
}
} catch (err) { console.warn(`[StatusMonitor ${sessionId}] Failed to get disk usage:`, err); }
// --- CPU Usage (Simplified from top) ---
try {
const topOutput = await this.executeSshCommand(sshClient, "top -bn1 | grep '%Cpu(s)' | head -n 1");
const idleMatch = topOutput.match(/(\d+\.?\d*)\s+id/); // Adjusted regex for float
if (idleMatch) {
const idlePercent = parseFloat(idleMatch[1]);
status.cpuPercent = parseFloat((100 - idlePercent).toFixed(1));
}
} catch (err) { console.warn(`[StatusMonitor ${sessionId}] Failed to get CPU usage from top:`, err); }
// --- Load Average ---
try {
const uptimeOutput = await this.executeSshCommand(sshClient, 'uptime');
const match = uptimeOutput.match(/load average(?:s)?:\s*([\d.]+)[, ]?\s*([\d.]+)[, ]?\s*([\d.]+)/);
if (match) status.loadAvg = [parseFloat(match[1]), parseFloat(match[2]), parseFloat(match[3])];
} catch (err) { console.warn(`[StatusMonitor ${sessionId}] Failed to get uptime/load average:`, err); }
// --- Network Rates ---
try {
const currentStats = await this.parseProcNetDev(sshClient);
if (currentStats) {
const defaultInterface = await this.getDefaultInterface(sshClient) || Object.keys(currentStats).find(iface => iface !== 'lo'); // Detect or fallback excluding loopback
if (defaultInterface && currentStats[defaultInterface]) {
status.netInterface = defaultInterface;
const currentRx = currentStats[defaultInterface].rx_bytes;
const currentTx = currentStats[defaultInterface].tx_bytes;
const prevStats = previousNetStats.get(sessionId);
if (prevStats && prevStats.timestamp < timestamp) { // Ensure time has passed
const timeDiffSeconds = (timestamp - prevStats.timestamp) / 1000;
if (timeDiffSeconds > 0.1) { // Avoid division by zero or tiny intervals
status.netRxRate = Math.max(0, Math.round((currentRx - prevStats.rx) / timeDiffSeconds));
status.netTxRate = Math.max(0, Math.round((currentTx - prevStats.tx) / timeDiffSeconds));
} else { status.netRxRate = 0; status.netTxRate = 0; } // Rate is 0 if interval too small
} else { status.netRxRate = 0; status.netTxRate = 0; } // First run or no time diff
previousNetStats.set(sessionId, { rx: currentRx, tx: currentTx, timestamp });
} else { console.warn(`[StatusMonitor ${sessionId}] Could not find stats for default interface ${defaultInterface}`); }
}
} catch (err) { console.warn(`[StatusMonitor ${sessionId}] Failed to get network stats:`, err); }
} catch (error) {
console.error(`[StatusMonitor ${sessionId}] General error fetching server status:`, error);
}
return status as ServerStatus;
}
/**
* 解析 /proc/net/dev 的输出
* @param sshClient SSH 客户端实例
* @returns Promise<NetworkStats | null> 解析后的网络统计信息或 null
*/
private async parseProcNetDev(sshClient: Client): Promise<NetworkStats | null> {
try {
const output = await this.executeSshCommand(sshClient, 'cat /proc/net/dev');
const lines = output.split('\n').slice(2); // Skip header lines
const stats: NetworkStats = {};
for (const line of lines) {
const parts = line.trim().split(/:\s+|\s+/);
if (parts.length < 17) continue; // Need at least interface name + 16 stats
const interfaceName = parts[0];
const rx_bytes = parseInt(parts[1], 10);
const tx_bytes = parseInt(parts[9], 10); // TX bytes is the 10th field (index 9)
if (!isNaN(rx_bytes) && !isNaN(tx_bytes)) {
stats[interfaceName] = { rx_bytes, tx_bytes };
}
}
return Object.keys(stats).length > 0 ? stats : null;
} catch (error) {
console.error("[StatusMonitor] Error parsing /proc/net/dev:", error);
return null;
}
}
/**
* 获取默认网络接口名称 (Linux specific)
* @param sshClient SSH 客户端实例
* @returns Promise<string | null> 默认接口名称或 null
*/
private async getDefaultInterface(sshClient: Client): Promise<string | null> {
try {
// 使用 ip route 命令查找默认路由对应的接口
const output = await this.executeSshCommand(sshClient, "ip route get 1.1.1.1 | grep -oP 'dev\\s+\\K\\S+'");
return output.trim() || null;
} catch (error) {
console.warn("[StatusMonitor] Failed to get default interface using 'ip route':", error);
// Fallback: 尝试查找第一个非 lo 接口
try {
const netDevOutput = await this.executeSshCommand(sshClient, 'cat /proc/net/dev');
const lines = netDevOutput.split('\n').slice(2);
for (const line of lines) {
const iface = line.trim().split(':')[0];
if (iface && iface !== 'lo') {
return iface;
}
}
} catch (fallbackError) {
console.error("[StatusMonitor] Failed to fallback to /proc/net/dev for interface:", fallbackError);
}
return null;
}
}
/**
* 在 SSH 连接上执行单个命令
* @param sshClient SSH 客户端实例
* @param command 要执行的命令
* @returns Promise<string> 命令的标准输出
* @throws Error 如果命令执行失败
*/
private executeSshCommand(sshClient: Client, command: string): Promise<string> {
return new Promise((resolve, reject) => {
let output = '';
sshClient.exec(command, (err, stream) => {
if (err) {
return reject(new Error(`执行命令 '${command}' 失败: ${err.message}`));
}
stream.on('close', (code: number, signal?: string) => {
// Don't reject on non-zero exit code, as some commands might return non-zero normally
// if (code !== 0) {
// console.warn(`[StatusMonitor] Command '${command}' exited with code ${code}`);
// }
resolve(output.trim());
}).on('data', (data: Buffer) => {
output += data.toString('utf8');
}).stderr.on('data', (data: Buffer) => {
console.warn(`[StatusMonitor] Command '${command}' stderr: ${data.toString('utf8').trim()}`);
});
});
});
}
/**
* 查找与给定 SSH 客户端关联的会话 ID (辅助函数)
* @param sshClientToFind 要查找的 SSH 客户端实例
* @returns string | undefined 找到的会话 ID 或 undefined
*/
private findSessionIdForClient(sshClientToFind: Client): string | undefined {
for (const [sessionId, state] of this.clientStates.entries()) {
if (state.sshClient === sshClientToFind) {
return sessionId;
}
}
return undefined;
}
}
File diff suppressed because it is too large Load Diff