This commit is contained in:
Baobhan Sith
2025-04-15 01:39:40 +08:00
parent a974b8b1d9
commit 0e863456a2
22 changed files with 2522 additions and 1722 deletions
@@ -1,18 +1,19 @@
import { Request, Response } from 'express';
import { Statement } from 'sqlite3'; // 引入 Statement 类型
import { getDb } from '../database';
import { encrypt } from '../utils/crypto'; // 引入加密函数
import { encrypt, decrypt } from '../utils/crypto'; // 引入加密函数
const db = getDb();
// 连接数据结构 (用于类型提示,不包含敏感信息)
interface ConnectionInfo {
// 连接数据结构 (用于类型提示,不包含敏感信息)
interface ConnectionInfoBase {
id: number;
name: string;
host: string;
port: number;
username: string;
auth_method: 'password'; // MVP 仅支持密码
auth_method: 'password' | 'key'; // 支持密码或密钥
// proxy_id: number | null; // 待添加代理支持
created_at: number;
updated_at: number;
last_connected_at: number | null;
@@ -22,13 +23,24 @@ interface ConnectionInfo {
* 创建新连接 (POST /api/v1/connections)
*/
export const createConnection = async (req: Request, res: Response): Promise<void> => {
const { name, host, port = 22, username, password } = req.body;
const auth_method = 'password'; // MVP 强制为 password
const { name, host, port = 22, username, auth_method, password, private_key, passphrase } = req.body;
const userId = req.session.userId; // 从会话获取用户 ID
// 输入验证 (基础)
if (!name || !host || !username || !password) {
res.status(400).json({ message: '缺少必要的连接信息 (name, host, username, password)。' });
if (!name || !host || !username || !auth_method) {
res.status(400).json({ message: '缺少必要的连接信息 (name, host, username, auth_method)。' });
return;
}
if (auth_method === 'password' && !password) {
res.status(400).json({ message: '密码认证方式需要提供 password。' });
return;
}
if (auth_method === 'key' && !private_key) {
res.status(400).json({ message: '密钥认证方式需要提供 private_key。' });
return;
}
if (auth_method !== 'password' && auth_method !== 'key') {
res.status(400).json({ message: '无效的认证方式 (auth_method),必须是 "password" 或 "key"。' });
return;
}
if (typeof port !== 'number' || port <= 0 || port > 65535) {
@@ -37,31 +49,44 @@ export const createConnection = async (req: Request, res: Response): Promise<voi
}
try {
// 加密密码
const encryptedPassword = encrypt(password);
let encryptedPassword = null;
let encryptedPrivateKey = null;
let encryptedPassphrase = null;
if (auth_method === 'password') {
encryptedPassword = encrypt(password);
} else if (auth_method === 'key') {
encryptedPrivateKey = encrypt(private_key);
if (passphrase) {
encryptedPassphrase = encrypt(passphrase);
}
}
const now = Math.floor(Date.now() / 1000); // 当前 Unix 时间戳 (秒)
// 插入数据库
const result = await new Promise<{ lastID: number }>((resolve, reject) => {
const stmt = db.prepare(
`INSERT INTO connections (name, host, port, username, auth_method, encrypted_password, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)`
`INSERT INTO connections (name, host, port, username, auth_method, encrypted_password, encrypted_private_key, encrypted_passphrase, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`
);
// 注意:这里没有存储 userId,因为 MVP 只有一个用户。如果未来支持多用户,需要添加 user_id 字段。
// 使用 function 关键字以保留正确的 this 上下文,并为 err 和 this 添加类型注解
stmt.run(name, host, port, username, auth_method, encryptedPassword, now, now, function (this: Statement, err: Error | null) {
if (err) {
console.error('插入连接时出错:', err.message);
return reject(new Error('创建连接失败'));
stmt.run(
name, host, port, username, auth_method,
encryptedPassword, encryptedPrivateKey, encryptedPassphrase,
now, now,
function (this: Statement, err: Error | null) {
if (err) {
console.error('插入连接时出错:', err.message);
return reject(new Error('创建连接失败'));
}
resolve({ lastID: (this as any).lastID });
}
// this.lastID 包含新插入行的 ID
// 使用类型断言 (as any) 来解决 TS 类型检查问题
resolve({ lastID: (this as any).lastID });
});
);
stmt.finalize(); // 完成语句执行
});
// 返回成功响应
// 返回成功响应 (不包含敏感信息)
res.status(201).json({
message: '连接创建成功。',
connection: {
@@ -71,9 +96,9 @@ export const createConnection = async (req: Request, res: Response): Promise<voi
}
});
} catch (error) {
} catch (error: any) {
console.error('创建连接时发生错误:', error);
res.status(500).json({ message: '创建连接时发生内部服务器错误。' });
res.status(500).json({ message: error.message || '创建连接时发生内部服务器错误。' });
}
};
@@ -84,14 +109,14 @@ export const getConnections = async (req: Request, res: Response): Promise<void>
const userId = req.session.userId; // 虽然 MVP 只有一个用户,但保留以备将来使用
try {
// 查询数据库,排除敏感字段 encrypted_password
// 查询数据库,排除敏感字段 encrypted_password, encrypted_private_key, encrypted_passphrase
// 注意:如果未来支持多用户,需要添加 WHERE user_id = ? 条件
const connections = await new Promise<ConnectionInfo[]>((resolve, reject) => {
const connections = await new Promise<ConnectionInfoBase[]>((resolve, reject) => {
db.all(
`SELECT id, name, host, port, username, auth_method, created_at, updated_at, last_connected_at
FROM connections
ORDER BY name ASC`, // 按名称排序
(err, rows: ConnectionInfo[]) => {
(err, rows: ConnectionInfoBase[]) => { // 使用更新后的接口
if (err) {
console.error('查询连接列表时出错:', err.message);
return reject(new Error('获取连接列表失败'));
@@ -103,14 +128,238 @@ export const getConnections = async (req: Request, res: Response): Promise<void>
res.status(200).json(connections);
} catch (error) {
} catch (error: any) {
console.error('获取连接列表时发生错误:', error);
res.status(500).json({ message: '获取连接列表时发生内部服务器错误。' });
res.status(500).json({ message: error.message || '获取连接列表时发生内部服务器错误。' });
}
};
// 其他控制器函数的占位符
// export const getConnectionById = ...
// export const updateConnection = ...
// export const deleteConnection = ...
/**
* 获取单个连接信息 (GET /api/v1/connections/:id)
*/
export const getConnectionById = async (req: Request, res: Response): Promise<void> => {
const connectionId = parseInt(req.params.id, 10);
const userId = req.session.userId;
if (isNaN(connectionId)) {
res.status(400).json({ message: '无效的连接 ID。' });
return;
}
try {
// 查询数据库,排除敏感字段
// 注意:如果未来支持多用户,需要添加 AND user_id = ? 条件
const connection = await new Promise<ConnectionInfoBase | null>((resolve, reject) => {
db.get(
`SELECT id, name, host, port, username, auth_method, created_at, updated_at, last_connected_at
FROM connections
WHERE id = ?`,
[connectionId],
(err, row: ConnectionInfoBase) => { // 使用更新后的接口
if (err) {
console.error(`查询连接 ${connectionId} 时出错:`, err.message);
return reject(new Error('获取连接信息失败'));
}
resolve(row || null); // 如果找不到则返回 null
}
);
});
if (!connection) {
res.status(404).json({ message: '连接未找到。' });
} else {
res.status(200).json(connection);
}
} catch (error: any) {
console.error(`获取连接 ${connectionId} 时发生错误:`, error);
res.status(500).json({ message: error.message || '获取连接信息时发生内部服务器错误。' });
}
};
/**
* 更新连接信息 (PUT /api/v1/connections/:id)
*/
export const updateConnection = async (req: Request, res: Response): Promise<void> => {
const connectionId = parseInt(req.params.id, 10);
const { name, host, port, username, auth_method, password, private_key, passphrase } = req.body;
const userId = req.session.userId;
if (isNaN(connectionId)) {
res.status(400).json({ message: '无效的连接 ID。' });
return;
}
// 输入验证 (与创建类似,但允许部分更新)
if (!name && !host && port === undefined && !username && !auth_method && !password && !private_key && passphrase === undefined) {
res.status(400).json({ message: '没有提供要更新的字段。' });
return;
}
if (auth_method && auth_method !== 'password' && auth_method !== 'key') {
res.status(400).json({ message: '无效的认证方式 (auth_method),必须是 "password" 或 "key"。' });
return;
}
// 如果提供了 auth_method,需要确保对应的凭证也提供了或已存在
// (更复杂的验证逻辑可能需要先查询现有记录)
try {
const fieldsToUpdate: { [key: string]: any } = {};
const params: any[] = [];
// 构建要更新的字段和参数
if (name !== undefined) { fieldsToUpdate.name = name; params.push(name); }
if (host !== undefined) { fieldsToUpdate.host = host; params.push(host); }
if (port !== undefined) {
if (typeof port !== 'number' || port <= 0 || port > 65535) {
res.status(400).json({ message: '端口号无效。' });
return;
}
fieldsToUpdate.port = port; params.push(port);
}
if (username !== undefined) { fieldsToUpdate.username = username; params.push(username); }
// 处理认证方式和凭证更新
if (auth_method) {
fieldsToUpdate.auth_method = auth_method;
params.push(auth_method);
if (auth_method === 'password') {
if (!password) {
res.status(400).json({ message: '更新为密码认证时需要提供 password。' });
return;
}
fieldsToUpdate.encrypted_password = encrypt(password);
params.push(fieldsToUpdate.encrypted_password);
fieldsToUpdate.encrypted_private_key = null; // 清除旧密钥
params.push(null);
fieldsToUpdate.encrypted_passphrase = null; // 清除旧密码
params.push(null);
} else if (auth_method === 'key') {
if (!private_key) {
res.status(400).json({ message: '更新为密钥认证时需要提供 private_key。' });
return;
}
fieldsToUpdate.encrypted_private_key = encrypt(private_key);
params.push(fieldsToUpdate.encrypted_private_key);
fieldsToUpdate.encrypted_passphrase = passphrase ? encrypt(passphrase) : null;
params.push(fieldsToUpdate.encrypted_passphrase);
fieldsToUpdate.encrypted_password = null; // 清除旧密码
params.push(null);
}
} else {
// 如果只更新凭证而不改变 auth_method (需要先查询当前 auth_method)
// 为了简化,这里假设如果提供了 password/private_key,则 auth_method 也被提供了
// 或者,可以先查询记录再决定如何更新
if (password) {
// 假设当前是 password 方式或要切换到 password
fieldsToUpdate.encrypted_password = encrypt(password);
params.push(fieldsToUpdate.encrypted_password);
}
if (private_key) {
// 假设当前是 key 方式或要切换到 key
fieldsToUpdate.encrypted_private_key = encrypt(private_key);
params.push(fieldsToUpdate.encrypted_private_key);
fieldsToUpdate.encrypted_passphrase = passphrase ? encrypt(passphrase) : null;
params.push(fieldsToUpdate.encrypted_passphrase);
} else if (passphrase !== undefined && auth_method === 'key') { // 仅更新 passphrase
fieldsToUpdate.encrypted_passphrase = passphrase ? encrypt(passphrase) : null;
params.push(fieldsToUpdate.encrypted_passphrase);
}
}
const now = Math.floor(Date.now() / 1000);
fieldsToUpdate.updated_at = now;
params.push(now);
const setClauses = Object.keys(fieldsToUpdate).map(key => `${key} = ?`).join(', ');
if (!setClauses) {
res.status(400).json({ message: '没有有效的字段进行更新。' });
return;
}
params.push(connectionId); // 添加 WHERE id = ? 的参数
// 更新数据库
// 注意:如果未来支持多用户,需要添加 AND user_id = ? 条件
const result = await new Promise<{ changes: number }>((resolve, reject) => {
const stmt = db.prepare(
`UPDATE connections SET ${setClauses} WHERE id = ?`
);
stmt.run(...params, function (this: Statement, err: Error | null) {
if (err) {
console.error(`更新连接 ${connectionId} 时出错:`, err.message);
return reject(new Error('更新连接失败'));
}
// this.changes 包含受影响的行数
// 使用类型断言 (as any) 来解决 TS 类型检查问题
resolve({ changes: (this as any).changes });
});
stmt.finalize();
});
if (result.changes === 0) {
res.status(404).json({ message: '连接未找到或未作更改。' });
} else {
// 获取更新后的信息(不含敏感数据)并返回
const updatedConnection = await new Promise<ConnectionInfoBase | null>((resolve, reject) => {
db.get(
`SELECT id, name, host, port, username, auth_method, created_at, updated_at, last_connected_at
FROM connections WHERE id = ?`,
[connectionId],
(err, row: ConnectionInfoBase) => err ? reject(err) : resolve(row || null)
);
});
res.status(200).json({ message: '连接更新成功。', connection: updatedConnection });
}
} catch (error: any) {
console.error(`更新连接 ${connectionId} 时发生错误:`, error);
res.status(500).json({ message: error.message || '更新连接时发生内部服务器错误。' });
}
};
/**
* 删除连接 (DELETE /api/v1/connections/:id)
*/
export const deleteConnection = async (req: Request, res: Response): Promise<void> => {
const connectionId = parseInt(req.params.id, 10);
const userId = req.session.userId;
if (isNaN(connectionId)) {
res.status(400).json({ message: '无效的连接 ID。' });
return;
}
try {
// 删除数据库中的记录
// 注意:如果未来支持多用户,需要添加 AND user_id = ? 条件
const result = await new Promise<{ changes: number }>((resolve, reject) => {
const stmt = db.prepare(
`DELETE FROM connections WHERE id = ?`
);
stmt.run(connectionId, function (this: Statement, err: Error | null) {
if (err) {
console.error(`删除连接 ${connectionId} 时出错:`, err.message);
return reject(new Error('删除连接失败'));
}
// this.changes 包含受影响的行数
resolve({ changes: (this as any).changes });
});
stmt.finalize();
});
if (result.changes === 0) {
res.status(404).json({ message: '连接未找到。' });
} else {
res.status(200).json({ message: '连接删除成功。' }); // 也可以使用 204 No Content
}
} catch (error: any) {
console.error(`删除连接 ${connectionId} 时发生错误:`, error);
res.status(500).json({ message: error.message || '删除连接时发生内部服务器错误。' });
}
};
// TODO: 实现 testConnection
// export const testConnection = ...
@@ -1,6 +1,12 @@
import { Router } from 'express';
import { isAuthenticated } from '../auth/auth.middleware'; // 引入认证中间件
import { createConnection, getConnections } from './connections.controller';
import {
createConnection,
getConnections,
getConnectionById, // 引入获取单个连接的控制器
updateConnection, // 引入更新连接的控制器
deleteConnection // 引入删除连接的控制器
} from './connections.controller';
const router = Router();
@@ -13,10 +19,16 @@ router.get('/', getConnections);
// POST /api/v1/connections - 创建新连接
router.post('/', createConnection);
// 未来可以添加其他路由,如获取单个连接、更新、删除、测试连接等
// router.get('/:id', getConnectionById);
// router.put('/:id', updateConnection);
// router.delete('/:id', deleteConnection);
// GET /api/v1/connections/:id - 获取单个连接信息
router.get('/:id', getConnectionById);
// PUT /api/v1/connections/:id - 更新连接信息
router.put('/:id', updateConnection);
// DELETE /api/v1/connections/:id - 删除连接
router.delete('/:id', deleteConnection); // 使用占位符
// TODO: 添加测试连接路由
// router.post('/:id/test', testConnection);
export default router;
+91 -27
View File
@@ -12,7 +12,7 @@ CREATE TABLE IF NOT EXISTS users (
);
`;
// MVP (最小可行产品) 阶段: 只包含基础字段,支持密码认证,暂不考虑代理和标签
// 更新后的 Schema,支持密码和密钥认证
const createConnectionsTableSQL = `
CREATE TABLE IF NOT EXISTS connections (
id INTEGER PRIMARY KEY AUTOINCREMENT,
@@ -20,11 +20,11 @@ CREATE TABLE IF NOT EXISTS connections (
host TEXT NOT NULL,
port INTEGER NOT NULL DEFAULT 22,
username TEXT NOT NULL,
auth_method TEXT NOT NULL CHECK(auth_method IN ('password')), -- MVP 阶段仅支持密码认证
encrypted_password TEXT NULL, -- 加密存储的密码占位符 (加密逻辑在应用层实现)
-- encrypted_private_key TEXT NULL, -- MVP 阶段跳过密钥认证相关字段
-- encrypted_passphrase TEXT NULL, -- MVP 阶段跳过密钥认证相关字段
-- proxy_id INTEGER NULL, -- MVP 阶段跳过代理相关字段
auth_method TEXT NOT NULL CHECK(auth_method IN ('password', 'key')), -- 更新 CHECK 约束
encrypted_password TEXT NULL,
encrypted_private_key TEXT NULL, -- 取消注释
encrypted_passphrase TEXT NULL, -- 取消注释
-- proxy_id INTEGER NULL, -- 代理相关字段 (暂未实现)
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL,
last_connected_at INTEGER NULL
@@ -39,35 +39,99 @@ CREATE TABLE IF NOT EXISTS connections (
// const createAuditLogsTableSQL = \`...\`; // 审计日志表
// const createApiKeysTableSQL = \`...\`; // API 密钥表
// Interface for PRAGMA table_info result rows
interface TableInfoColumn {
cid: number;
name: string;
type: string;
notnull: number;
dflt_value: any;
pk: number;
}
// Helper function to add a column if it doesn't exist
const addColumnIfNotExists = (db: Database, tableName: string, columnName: string, columnDefinition: string): Promise<void> => {
return new Promise((resolve, reject) => {
// Check if the column exists using PRAGMA table_info
// Explicitly type the 'columns' parameter
db.all(`PRAGMA table_info(${tableName})`, (err, columns: TableInfoColumn[]) => {
if (err) {
console.error(`Error checking table info for ${tableName}:`, err.message);
return reject(err);
}
// Now 'col' inside .some() will have the correct type
const columnExists = columns.some(col => col.name === columnName);
if (!columnExists) {
// Column doesn't exist, add it
const sql = `ALTER TABLE ${tableName} ADD COLUMN ${columnName} ${columnDefinition}`;
db.run(sql, (alterErr) => {
if (alterErr) {
console.error(`Error adding column ${columnName} to ${tableName}:`, alterErr.message);
// Don't reject immediately, maybe it's a harmless error (like constraint issue)
// Let subsequent migrations try. If it's critical, the app might fail later.
console.warn(`Potential harmless error adding column ${columnName}. Continuing migration.`);
resolve();
// return reject(alterErr);
} else {
console.log(`Column ${columnName} added to table ${tableName}.`);
resolve();
}
});
} else {
// Column already exists
// console.log(`Column ${columnName} already exists in table ${tableName}.`);
resolve();
}
});
});
};
/**
* 执行数据库迁移 (创建表)
* 执行数据库迁移 (创建表和添加列)
* @param db - 数据库实例
* @returns Promise,在所有迁移完成后 resolve
*/
export const runMigrations = (db: Database): Promise<void> => {
return new Promise((resolve, reject) => {
db.serialize(() => {
export const runMigrations = async (db: Database): Promise<void> => {
// Use async/await for better readability with sequential operations
try {
await new Promise<void>((resolve, reject) => {
db.run(createUsersTableSQL, (err) => {
if (err) {
console.error('创建 users 表时出错:', err.message);
return reject(err);
}
if (err) return reject(new Error(`创建 users 表时出错: ${err.message}`));
console.log('Users 表已检查/创建。');
resolve();
});
db.run(createConnectionsTableSQL, (err) => {
if (err) {
console.error('创建 connections 表时出错:', err.message);
return reject(err);
}
console.log('Connections 表已检查/创建。');
resolve(); // 所有表创建完成后 resolve Promise
});
// 如果未来添加了更多表,在此处继续链式调用 db.run(...)
// db.run(createProxiesTableSQL, callback);
});
});
await new Promise<void>((resolve, reject) => {
db.run(createConnectionsTableSQL, (err) => {
// Ignore "duplicate column name" error if table already exists partially
if (err && !err.message.includes('duplicate column name')) {
return reject(new Error(`创建 connections 表时出错: ${err.message}`));
}
if (err && err.message.includes('duplicate column name')) {
console.warn('创建 connections 表时遇到 "duplicate column name" 错误,可能表已部分存在,将尝试 ALTER TABLE。');
}
console.log('Connections 表已检查/尝试创建。');
resolve();
});
});
// Add columns to connections table if they don't exist
// Add auth_method first in case it's missing from very old schema
await addColumnIfNotExists(db, 'connections', 'auth_method', "TEXT NOT NULL DEFAULT 'password'"); // Add default for existing rows
await addColumnIfNotExists(db, 'connections', 'encrypted_private_key', 'TEXT NULL');
await addColumnIfNotExists(db, 'connections', 'encrypted_passphrase', 'TEXT NULL');
// Add other tables or columns here in the future
// await addColumnIfNotExists(db, 'connections', 'proxy_id', 'INTEGER NULL');
console.log('数据库迁移检查完成。');
} catch (error) {
console.error('数据库迁移过程中发生错误:', error);
throw error; // Re-throw the error to be caught by the caller
}
};
// 允许通过命令行直接运行此文件来执行迁移 (例如: node dist/migrations.js)
+407 -30
View File
@@ -15,24 +15,28 @@ interface AuthenticatedWebSocket extends WebSocket {
sshClient?: Client; // 关联的 SSH Client 实例
sshShellStream?: ClientChannel; // 关联的 SSH Shell Stream
sftpStream?: SFTPWrapper; // 关联的 SFTP Stream
statusIntervalId?: NodeJS.Timeout; // 用于存储状态轮询的 Interval ID
}
// 存储活跃的 SSH/SFTP 连接 (导出以便其他模块访问)
export const activeSshConnections = new Map<AuthenticatedWebSocket, { client: Client, shell: ClientChannel, sftp?: SFTPWrapper }>();
export const activeSshConnections = new Map<AuthenticatedWebSocket, { client: Client, shell: ClientChannel, sftp?: SFTPWrapper, statusIntervalId?: NodeJS.Timeout }>();
// 存储正在进行的 SFTP 上传操作 (key: uploadId, value: WriteStream)
// 注意:WriteStream 类型来自 'fs',但 ssh2 的流行为类似
const activeUploads = new Map<string, WriteStream>();
// 数据库连接信息接口 (包含加密密码)
// 数据库连接信息接口 (包含所有可能的凭证字段)
interface DbConnectionInfo {
id: number;
name: string;
host: string;
port: number;
username: string;
auth_method: 'password';
encrypted_password?: string; // 注意是可选的,因为可能没有密码 (虽然 MVP 要求有)
auth_method: 'password' | 'key'; // 支持密码或密钥
encrypted_password?: string | null;
encrypted_private_key?: string | null;
encrypted_passphrase?: string | null;
// proxy_id: number | null; // 待添加代理支持
// 其他字段...
}
@@ -48,11 +52,355 @@ const cleanupSshConnection = (ws: AuthenticatedWebSocket) => {
// 注意:SFTP 流通常不需要显式关闭,它依赖于 SSH Client 的关闭
// connection.sftp?.end(); // SFTPWrapper 没有 end 方法
connection.shell?.end(); // 尝试结束 shell 流
// 清除状态轮询定时器
if (connection.statusIntervalId) {
clearInterval(connection.statusIntervalId);
console.log(`WebSocket: 清理用户 ${ws.username} 的状态轮询定时器。`);
}
connection.client?.end(); // 结束 SSH 客户端连接会隐式关闭 SFTP
activeSshConnections.delete(ws); // 从 Map 中移除
}
};
// --- 状态获取相关 ---
const STATUS_POLL_INTERVAL = 5000; // 每 5 秒获取一次状态
// Helper function to execute a command and return its stdout
const executeSshCommand = (client: Client, command: string): Promise<string> => {
return new Promise((resolve, reject) => {
let output = '';
let stderrOutput = ''; // Capture stderr too
client.exec(command, (err, stream) => {
if (err) {
console.error(`SSH Command (${command}) exec error:`, err);
return reject(err); // Reject on initial exec error
}
stream.on('data', (data: Buffer) => {
output += data.toString();
}).stderr.on('data', (data: Buffer) => {
stderrOutput += data.toString(); // Capture stderr
// Log stderr as warning, but don't reject based on it unless needed
// console.warn(`SSH Command (${command}) stderr: ${data.toString().trim()}`);
}).on('close', (code: number | null | undefined, signal: string | null) => {
const trimmedOutput = output.trim();
const trimmedStderr = stderrOutput.trim();
if (signal) {
console.error(`Command "${command}" terminated by signal: ${signal}. Stderr: ${trimmedStderr}`);
return reject(new Error(`Command "${command}" terminated by signal: ${signal}`));
}
// **Crucial Change:** Prioritize resolving if we have ANY stdout, regardless of exit code.
if (trimmedOutput) {
if (code !== 0 && code != null) {
console.warn(`Command "${command}" exited with code ${code} but produced output. Resolving with output. Stderr: ${trimmedStderr}`);
} else if (code == null) {
console.warn(`Command "${command}" exited with code undefined but produced output. Resolving with output. Stderr: ${trimmedStderr}`);
}
return resolve(trimmedOutput);
}
// If NO stdout, then reject based on error code or lack thereof.
if (code !== 0 && code != null) {
console.error(`Command "${command}" failed with code ${code} and no output. Stderr: ${trimmedStderr}`);
return reject(new Error(`Command "${command}" failed with code ${code} and no output. Stderr: ${trimmedStderr}`));
}
if (code == null) {
// This case now specifically means no output AND undefined code - likely a genuine failure
console.error(`Command "${command}" failed with code undefined and no output. Stderr: ${trimmedStderr}`);
return reject(new Error(`Command "${command}" failed with code undefined and no output. Stderr: ${trimmedStderr}`));
}
// If code is 0 and no output, resolve with empty string (command succeeded but printed nothing)
resolve('');
}).on('error', (streamErr: Error) => { // Handle stream-specific errors
reject(streamErr);
});
});
});
};
// Interface for the detailed status object
interface ServerStatusDetails {
cpuPercent?: number; // Percentage
memPercent?: number; // Percentage
memUsed?: number; // MB
memTotal?: number; // MB
swapPercent?: number; // Percentage
swapUsed?: number; // MB
swapTotal?: number; // MB
diskPercent?: number; // Percentage for /
diskUsed?: number; // KB
diskTotal?: number; // KB
cpuModel?: string;
netRxRate?: number; // Bytes per second
netTxRate?: number; // Bytes per second
netInterface?: string; // Detected network interface
osName?: string; // Added OS Name
}
// Store previous network stats for rate calculation
interface NetStats {
rx: number;
tx: number;
timestamp: number;
}
const previousNetStats = new Map<AuthenticatedWebSocket, NetStats>();
// Function to fetch server status metrics
const fetchServerStatus = async (ws: AuthenticatedWebSocket, client: Client): Promise<ServerStatusDetails> => {
const status: ServerStatusDetails = {};
const connection = activeSshConnections.get(ws); // Needed for network stats
try {
// CPU Usage (%) using vmstat (100 - idle)
// Try vmstat first
try {
const cpuCmd = `vmstat 1 2 | tail -1 | awk '{print 100-$15}'`;
const cpuOutput = await executeSshCommand(client, cpuCmd);
const cpuUsage = parseFloat(cpuOutput);
if (!isNaN(cpuUsage)) status.cpuPercent = parseFloat(cpuUsage.toFixed(1));
} catch (vmstatError) {
console.warn(`获取 CPU 使用率失败 (vmstat):`, vmstatError, `尝试 top...`);
// Fallback attempt using top if vmstat failed
try {
const cpuCmdFallback = `top -bn1 | grep '%Cpu(s)' | head -1 | awk '{print $2+$4}'`; // Sum User + System CPU %
const cpuOutputFallback = await executeSshCommand(client, cpuCmdFallback);
const cpuUsageFallback = parseFloat(cpuOutputFallback);
if (!isNaN(cpuUsageFallback)) status.cpuPercent = parseFloat(cpuUsageFallback.toFixed(1));
} catch (topError) {
console.warn(`获取 CPU 使用率失败 (top fallback):`, topError);
}
}
} catch (error) { // Catch potential outer errors, though unlikely now
console.error(`获取 CPU 使用率时发生意外错误:`, error);
}
// --- Corrected CPU Model Fetch ---
try {
// CPU Model Name from /proc/cpuinfo
const cpuModelCmd = `cat /proc/cpuinfo | grep 'model name' | head -1 | cut -d ':' -f 2 | sed 's/^[ \t]*//'`;
const cpuModelOutput = await executeSshCommand(client, cpuModelCmd); // Use correct command and variable
if (cpuModelOutput) status.cpuModel = cpuModelOutput;
} catch (error) { // Use standard 'error' variable name and remove the incorrect logic/extra brace
console.warn(`获取 CPU 型号失败:`, error);
}
// Removed duplicated CPU Model fetch block here (Comment remains from previous step, actual change is above)
// --- Fetch OS Name ---
try {
const osCmd = `cat /etc/os-release`;
const osOutput = await executeSshCommand(client, osCmd);
const lines = osOutput.split('\n');
const prettyNameLine = lines.find(line => line.startsWith('PRETTY_NAME='));
if (prettyNameLine) {
// Extract value, remove potential quotes
status.osName = prettyNameLine.split('=')[1]?.trim().replace(/^"(.*)"$/, '$1');
} else {
// Fallback or alternative methods if needed (e.g., uname -a)
const unameCmd = `uname -a`; // Less pretty, but usually available
const unameOutput = await executeSshCommand(client, unameCmd);
if (unameOutput) status.osName = unameOutput.trim(); // Trim uname output
}
} catch (error) {
console.warn(`获取操作系统名称失败:`, error);
// Attempt uname as a last resort even if os-release failed
try {
const unameCmd = `uname -a`;
const unameOutput = await executeSshCommand(client, unameCmd);
if (unameOutput) status.osName = unameOutput.trim(); // Trim uname output
} catch (unameError) {
console.warn(`获取操作系统名称失败 (uname fallback):`, unameError);
}
}
try {
// Memory Usage (Total and Used in MB, and Percentage)
const memCmd = `free -m | awk 'NR==2{print $2 " " $3}'`; // Output: "total used"
const memOutput = await executeSshCommand(client, memCmd);
const memValues = memOutput.split(' ');
if (memValues.length === 2) {
const total = parseInt(memValues[0], 10);
const used = parseInt(memValues[1], 10);
if (!isNaN(total) && !isNaN(used) && total > 0) {
status.memTotal = total;
status.memUsed = used;
status.memPercent = parseFloat(((used / total) * 100).toFixed(1));
}
}
} catch (error) {
console.warn(`获取内存状态失败:`, error);
}
// Removed duplicated Memory fetch block here
try {
// Swap Usage (Total and Used in MB, and Percentage)
const swapCmd = `free -m | awk 'NR==3{print $2 " " $3}'`; // Output: "total used" for swap
const swapOutput = await executeSshCommand(client, swapCmd);
const swapValues = swapOutput.split(' ');
if (swapValues.length === 2) {
const total = parseInt(swapValues[0], 10);
const used = parseInt(swapValues[1], 10);
// Only report swap if total > 0
if (!isNaN(total) && !isNaN(used) && total > 0) {
status.swapTotal = total;
status.swapUsed = used;
status.swapPercent = parseFloat(((used / total) * 100).toFixed(1));
} else if (!isNaN(total) && total === 0) {
status.swapTotal = 0;
status.swapUsed = 0;
status.swapPercent = 0;
}
}
} catch (error) {
console.warn(`获取 Swap 状态失败:`, error);
}
try {
// Disk Usage - Using POSIX standard output 'df -Pk /' for reliable parsing
const diskCmd = `df -Pk /`; // Use -P flag for POSIX standard output
const diskOutput = await executeSshCommand(client, diskCmd);
const lines = diskOutput.trim().split('\n'); // Trim output and split into lines
if (lines.length >= 2) {
// Skip header line (usually the first line)
let dataLine = '';
// Find the line ending with ' /' (mount point)
for (let i = 1; i < lines.length; i++) {
// Trim the line before checking the ending
if (lines[i].trim().endsWith(' /')) {
dataLine = lines[i].trim();
break;
}
}
// The second line (index 1) should contain the data in POSIX format
if (lines.length >= 2) {
const dataLine = lines[1].trim();
console.log(`[Disk P Debug] dataLine: "${dataLine}"`); // Log the line
const parts = dataLine.split(/\s+/);
console.log(`[Disk P Debug] parts:`, parts); // Log the split parts
// POSIX format: Filesystem, 1024-blocks (Total), Used, Available, Capacity, Mounted on
if (parts.length >= 4) { // Need at least up to 'Available' column
const totalKb = parseInt(parts[1], 10);
const usedKb = parseInt(parts[2], 10);
// const availableKb = parseInt(parts[3], 10); // Available if needed
// const capacityPercent = parts[4]; // Percentage string like "20%"
if (!isNaN(totalKb) && !isNaN(usedKb) && totalKb >= 0) {
status.diskTotal = totalKb;
status.diskUsed = usedKb;
// Calculate percent only if total > 0 to avoid division by zero
status.diskPercent = totalKb > 0 ? parseFloat(((usedKb / totalKb) * 100).toFixed(1)) : 0;
// Optional: Could also try parsing parts[4] if calculation seems off
} else {
console.warn(`无法从 'df -Pk /' 行解析有效的磁盘大小 (Total=${parts[1]}, Used=${parts[2]}):`, dataLine);
}
} else {
console.warn(`'df -Pk /' 数据行格式不符合预期 (列数不足):`, dataLine);
}
} else {
console.warn(`无法从 'df -k /' 输出中找到根目录 ('/') 的数据行:`, diskOutput);
}
} else {
console.warn(`'df -k /' 命令输出格式不符合预期 (行数不足):`, diskOutput);
}
} catch (error) {
console.warn(`获取磁盘状态失败 (df -k):`, error);
}
// Network Rate Calculation
let defaultInterface = '';
try {
const routeCmd = `ip route | grep default | awk '{print $5}' | head -1`;
defaultInterface = await executeSshCommand(client, routeCmd);
status.netInterface = defaultInterface; // Store detected interface
} catch (error) {
console.warn(`获取默认网络接口失败:`, error);
}
if (defaultInterface && connection) {
try {
const netCmd = `cat /proc/net/dev | grep '${defaultInterface}:' | awk '{print $2 " " $10}'`; // RX bytes (col 2), TX bytes (col 10)
const netOutput = await executeSshCommand(client, netCmd);
const netValues = netOutput.split(' ');
if (netValues.length === 2) {
const currentRx = parseInt(netValues[0], 10);
const currentTx = parseInt(netValues[1], 10);
const currentTime = Date.now();
const prevStats = previousNetStats.get(ws);
if (prevStats && !isNaN(currentRx) && !isNaN(currentTx)) {
const timeDiffSeconds = (currentTime - prevStats.timestamp) / 1000;
if (timeDiffSeconds > 0) {
status.netRxRate = Math.max(0, Math.round((currentRx - prevStats.rx) / timeDiffSeconds)); // Corrected property name
status.netTxRate = Math.max(0, Math.round((currentTx - prevStats.tx) / timeDiffSeconds)); // Corrected property name
}
}
// Store current stats for next calculation
if (!isNaN(currentRx) && !isNaN(currentTx)) {
previousNetStats.set(ws, { rx: currentRx, tx: currentTx, timestamp: currentTime });
}
}
} catch (error) {
console.warn(`获取网络速率失败 (${defaultInterface}):`, error);
}
} else if (!defaultInterface) {
console.warn(`无法计算网络速率,因为未找到默认接口。`);
}
return status;
};
// Function to start status polling for a connection
const startStatusPolling = (ws: AuthenticatedWebSocket, client: Client) => {
const connection = activeSshConnections.get(ws);
if (!connection || connection.statusIntervalId) {
console.warn(`用户 ${ws.username} 的状态轮询已启动或连接不存在。`);
return; // Already polling or connection gone
}
console.log(`WebSocket: 为用户 ${ws.username} 启动状态轮询 (间隔: ${STATUS_POLL_INTERVAL}ms)...`);
const intervalId = setInterval(async () => {
// Double check connection still exists before fetching
const currentConnection = activeSshConnections.get(ws);
if (!currentConnection || !currentConnection.client || !ws || ws.readyState !== WebSocket.OPEN) {
console.log(`WebSocket: 用户 ${ws.username} 连接已关闭或无效,停止状态轮询。`);
if (intervalId) clearInterval(intervalId); // Clear interval if connection is gone
// Also ensure it's cleared from the map if cleanup didn't catch it
if (currentConnection?.statusIntervalId === intervalId) {
delete currentConnection.statusIntervalId;
}
previousNetStats.delete(ws); // Clear previous stats on disconnect/error
return;
}
try {
const status = await fetchServerStatus(ws, currentConnection.client); // Pass ws for net stats map
// Send status only if we got at least one metric
if (Object.keys(status).length > 0) {
// console.log(`[Status Poll] Sending status for ${ws.username}:`, status); // Debug log
ws.send(JSON.stringify({ type: 'ssh:status:update', payload: status }));
}
} catch (error) {
console.error(`用户 ${ws.username} 状态轮询时出错:`, error);
// Optionally send an error message to the client
// ws.send(JSON.stringify({ type: 'ssh:status:error', payload: '无法获取服务器状态' }));
// Consider stopping polling if errors persist? For now, continue polling.
}
}, STATUS_POLL_INTERVAL);
connection.statusIntervalId = intervalId; // Store the interval ID
// Initialize previous network stats
previousNetStats.set(ws, { rx: 0, tx: 0, timestamp: Date.now() - STATUS_POLL_INTERVAL }); // Initialize with dummy past data
};
export const initializeWebSocket = (server: http.Server, sessionParser: RequestHandler): WebSocketServer => {
const wss = new WebSocketServer({ noServer: true });
@@ -146,13 +494,22 @@ export const initializeWebSocket = (server: http.Server, sessionParser: RequestH
console.log(`WebSocket: 用户 ${ws.username} 请求连接到 ID: ${connectionId}`);
ws.send(JSON.stringify({ type: 'ssh:status', payload: '正在获取连接信息...' }));
// 1. 从数据库获取连接信息
// 1. 从数据库获取连接信息 (包括所有凭证字段)
const connInfo = await new Promise<DbConnectionInfo | null>((resolve, reject) => {
// 注意:如果多用户,需要验证 connectionId 是否属于当前 userId
db.get('SELECT * FROM connections WHERE id = ?', [connectionId], (err, row: DbConnectionInfo) => {
if (err) return reject(new Error('查询连接信息失败'));
resolve(row ?? null);
});
db.get(
`SELECT id, name, host, port, username, auth_method,
encrypted_password, encrypted_private_key, encrypted_passphrase
FROM connections WHERE id = ?`,
[connectionId],
(err, row: DbConnectionInfo) => {
if (err) {
console.error(`查询连接 ${connectionId} 详细信息时出错:`, err);
return reject(new Error('查询连接信息失败'));
}
resolve(row ?? null);
}
);
});
if (!connInfo) {
@@ -161,18 +518,43 @@ export const initializeWebSocket = (server: http.Server, sessionParser: RequestH
}
if (!connInfo.encrypted_password) {
ws.send(JSON.stringify({ type: 'ssh:error', payload: '连接配置缺少密码信息。' }));
return;
// This check might be too early if key auth is used
// ws.send(JSON.stringify({ type: 'ssh:error', payload: '连接配置缺少密码信息。' }));
// return;
}
ws.send(JSON.stringify({ type: 'ssh:status', payload: `正在连接到 ${connInfo.host}...` }));
// 2. 解密密码
let password = '';
// 2. 解密凭证并构建连接配置
let connectConfig: any = {
host: connInfo.host,
port: connInfo.port,
username: connInfo.username,
keepaliveInterval: 30000, // Send keep-alive every 30 seconds (milliseconds)
keepaliveCountMax: 3, // Disconnect after 3 missed keep-alives
readyTimeout: 20000 // 连接超时时间 (毫秒)
};
try {
password = decrypt(connInfo.encrypted_password);
if (connInfo.auth_method === 'password') {
if (!connInfo.encrypted_password) {
throw new Error('连接配置缺少密码信息。');
}
connectConfig.password = decrypt(connInfo.encrypted_password);
} else if (connInfo.auth_method === 'key') {
if (!connInfo.encrypted_private_key) {
throw new Error('连接配置缺少私钥信息。');
}
connectConfig.privateKey = decrypt(connInfo.encrypted_private_key);
if (connInfo.encrypted_passphrase) {
connectConfig.passphrase = decrypt(connInfo.encrypted_passphrase);
}
} else {
throw new Error(`不支持的认证方式: ${connInfo.auth_method}`);
}
} catch (decryptError: any) {
console.error(`解密连接 ${connectionId} 密码失败:`, decryptError);
ws.send(JSON.stringify({ type: 'ssh:error', payload: '无法解密连接凭证。' }));
console.error(`处理连接 ${connectionId} 凭证失败:`, decryptError);
ws.send(JSON.stringify({ type: 'ssh:error', payload: `无法处理连接凭证: ${decryptError.message}` }));
return;
}
@@ -214,12 +596,18 @@ export const initializeWebSocket = (server: http.Server, sessionParser: RequestH
const existingConn = activeSshConnections.get(ws);
if (existingConn) {
existingConn.sftp = sftp;
// SFTP 就绪后,才真正通知前端连接完成
ws.send(JSON.stringify({ type: 'ssh:connected' }));
// 启动状态轮询
startStatusPolling(ws, sshClient);
} else {
// This case should ideally not happen if the connection was set earlier
console.error(`SFTP: 无法找到用户 ${ws.username} 的活动连接记录以存储 SFTP 或启动轮询。`);
ws.send(JSON.stringify({ type: 'ssh:error', payload: '内部服务器错误:无法关联 SFTP 会话。' }));
cleanupSshConnection(ws);
}
// SFTP 就绪后,才真正通知前端连接完成
ws.send(JSON.stringify({ type: 'ssh:connected' }));
});
// 5. 数据转发:Shell -> WebSocket (发送 Base64 编码的数据)
stream.on('data', (data: Buffer) => {
// console.log('SSH Output Buffer Length:', data.length); // Debug log
@@ -257,18 +645,7 @@ export const initializeWebSocket = (server: http.Server, sessionParser: RequestH
ws.send(JSON.stringify({ type: 'ssh:disconnected', payload: 'SSH 连接已关闭。' }));
cleanupSshConnection(ws);
}
}).connect({
host: connInfo.host,
port: connInfo.port,
username: connInfo.username,
password: password, // 使用解密后的密码
// TODO: 添加对密钥认证的支持
// privateKey: require('fs').readFileSync('/path/to/key'),
// passphrase: 'key passphrase'
keepaliveInterval: 30000, // Send keep-alive every 30 seconds (milliseconds)
keepaliveCountMax: 3, // Disconnect after 3 missed keep-alives
readyTimeout: 20000 // 连接超时时间 (毫秒)
});
}).connect(connectConfig); // 使用前面构建的 connectConfig 对象
break;
} // end case 'ssh:connect'