This commit is contained in:
Baobhan Sith
2025-04-28 15:06:53 +08:00
parent cfbc124295
commit cc41359093
7 changed files with 221 additions and 59 deletions
+168 -35
View File
@@ -1,5 +1,9 @@
import WebSocket, { WebSocketServer } from 'ws';
import WebSocket, { WebSocketServer, RawData } from 'ws';
import http from 'http';
import url from 'url';
// path and dotenv are no longer needed here as env vars are loaded in index.ts
// import path from 'path';
// import dotenv from 'dotenv';
import { Request, RequestHandler } from 'express';
import { Client, ClientChannel } from 'ssh2';
import { v4 as uuidv4 } from 'uuid';
@@ -372,10 +376,13 @@ const fetchRemoteDockerStatus = async (state: ClientState): Promise<{ available:
export const initializeWebSocket = async (server: http.Server, sessionParser: RequestHandler): Promise<WebSocketServer> => {
export const initializeWebSocket = async (server: http.Server, sessionParser: RequestHandler): Promise<WebSocketServer> => {
// Environment variables (including DEPLOYMENT_MODE and RDP URLs)
// are now expected to be loaded by index.ts before this function is called.
const wss = new WebSocketServer({ noServer: true });
const db = await getDbInstance();
const DOCKER_STATUS_INTERVAL = 2000;
const db = await getDbInstance();
const DOCKER_STATUS_INTERVAL = 2000;
// --- 心跳检测 ---
const heartbeatInterval = setInterval(() => {
@@ -393,58 +400,183 @@ export const initializeWebSocket = async (server: http.Server, sessionParser: Re
// --- WebSocket 升级处理 (认证) ---
server.on('upgrade', (request: Request, socket, head) => {
const parsedUrl = url.parse(request.url || '', true); // Parse URL and query string
const pathname = parsedUrl.pathname;
const ipAddress = request.ip; // Get IP address early
console.log(`WebSocket: 升级请求来自 IP: ${ipAddress}, Path: ${pathname}`);
// @ts-ignore Express-session 类型问题
sessionParser(request, {} as any, () => {
// --- 认证检查 ---
if (!request.session || !request.session.userId) {
console.log('WebSocket 认证失败:未找到会话或用户未登录。');
console.log(`WebSocket 认证失败 (Path: ${pathname}):未找到会话或用户未登录。`);
socket.write('HTTP/1.1 401 Unauthorized\r\n\r\n');
socket.destroy();
return;
}
console.log(`WebSocket 认证成功:用户 ${request.session.username} (ID: ${request.session.userId})`);
// 获取客户端 IP 地址
const ipAddress = request.ip;
console.log(`WebSocket: 升级请求来自 IP: ${ipAddress}`);
console.log(`WebSocket 认证成功 (Path: ${pathname}):用户 ${request.session.username} (ID: ${request.session.userId})`);
wss.handleUpgrade(request, socket, head, (ws) => {
const extWs = ws as AuthenticatedWebSocket;
extWs.userId = request.session.userId;
extWs.username = request.session.username;
// 将 IP 地址附加到 request 对象上传递给 connection 事件处理器,以便后续使用
(request as any).clientIpAddress = ipAddress;
wss.emit('connection', extWs, request);
});
// --- 根据路径处理升级 ---
if (pathname === '/rdp-proxy') {
// RDP 代理路径 - 直接处理升级,连接逻辑在 'connection' 事件中处理
console.log(`WebSocket: Handling RDP proxy upgrade for user ${request.session.username}`);
wss.handleUpgrade(request, socket, head, (ws) => {
const extWs = ws as AuthenticatedWebSocket;
extWs.userId = request.session.userId;
extWs.username = request.session.username;
// 传递必要信息给 connection 事件
(request as any).clientIpAddress = ipAddress;
(request as any).isRdpProxy = true; // 标记为 RDP 代理连接
(request as any).rdpToken = parsedUrl.query.token; // 传递 RDP token
wss.emit('connection', extWs, request);
});
} else {
// 默认路径 (SSH, SFTP, Docker etc.) - 按原逻辑处理
console.log(`WebSocket: Handling standard upgrade for user ${request.session.username}`);
wss.handleUpgrade(request, socket, head, (ws) => {
const extWs = ws as AuthenticatedWebSocket;
extWs.userId = request.session.userId;
extWs.username = request.session.username;
(request as any).clientIpAddress = ipAddress;
(request as any).isRdpProxy = false; // 标记为非 RDP 代理连接
wss.emit('connection', extWs, request);
});
}
});
});
// --- WebSocket 连接处理 ---
wss.on('connection', (ws: AuthenticatedWebSocket, request: Request) => {
ws.isAlive = true;
console.log(`WebSocket:客户端 ${ws.username} (ID: ${ws.userId}) 已连接。`);
const isRdpProxy = (request as any).isRdpProxy;
const clientIp = (request as any).clientIpAddress || 'unknown';
console.log(`WebSocket:客户端 ${ws.username} (ID: ${ws.userId}, IP: ${clientIp}, RDP Proxy: ${isRdpProxy}) 已连接。`);
ws.on('pong', () => { ws.isAlive = true; });
// --- 消息处理 ---
ws.on('message', async (message) => {
// console.log(`WebSocket:收到来自 ${ws.username} (会话: ${ws.sessionId}) 的消息: ${message.toString().substring(0, 100)}...`);
let parsedMessage: any;
try {
parsedMessage = JSON.parse(message.toString());
} catch (e) {
console.error(`WebSocket:来自 ${ws.username} 的无效 JSON 消息:`, message.toString());
ws.send(JSON.stringify({ type: 'error', payload: '无效的消息格式 (非 JSON)' }));
// --- RDP 代理连接处理 ---
if (isRdpProxy) {
const rdpToken = (request as any).rdpToken;
if (!rdpToken) {
console.error(`WebSocket: RDP Proxy connection for ${ws.username} missing token.`);
ws.send(JSON.stringify({ type: 'rdp:error', payload: 'Missing RDP connection token.' }));
ws.close(1008, 'Missing RDP token');
return;
}
const { type, payload, requestId } = parsedMessage; // requestId 用于 SFTP 操作
const sessionId = ws.sessionId; // 获取当前 WebSocket 的会话 ID
const state = sessionId ? clientStates.get(sessionId) : undefined; // 获取当前会话状态
// Determine RDP target URL based on deployment mode
const deploymentMode = process.env.DEPLOYMENT_MODE || 'docker'; // Default to docker mode
let rdpBaseUrl: string;
if (deploymentMode === 'local') {
rdpBaseUrl = process.env.RDP_SERVICE_URL_LOCAL || 'ws://localhost:18114'; // Default for local
console.log(`[WebSocket RDP Proxy] Using LOCAL deployment mode. RDP Target Base: ${rdpBaseUrl}`);
} else {
rdpBaseUrl = process.env.RDP_SERVICE_URL_DOCKER || 'ws://rdp:8081'; // Default for docker
console.log(`[WebSocket RDP Proxy] Using DOCKER deployment mode. RDP Target Base: ${rdpBaseUrl}`);
}
try {
switch (type) {
// --- SSH 连接请求 ---
case 'ssh:connect': {
if (sessionId && state) {
// Ensure base URL doesn't end with a slash before appending query params
const cleanRdpBaseUrl = rdpBaseUrl.endsWith('/') ? rdpBaseUrl.slice(0, -1) : rdpBaseUrl;
const rdpTargetUrl = `${cleanRdpBaseUrl}/?token=${rdpToken}`; // Append token query param
console.log(`WebSocket: RDP Proxy for ${ws.username} attempting to connect to ${rdpTargetUrl}`);
const rdpWs = new WebSocket(rdpTargetUrl);
let clientWsClosed = false;
let rdpWsClosed = false;
// --- 消息转发: Client -> RDP ---
ws.on('message', (message: RawData) => {
if (rdpWs.readyState === WebSocket.OPEN) {
// console.log(`RDP Proxy (C->S): Forwarding message from ${ws.username}`);
rdpWs.send(message);
} else {
console.warn(`RDP Proxy (C->S): RDP WS not open, dropping message from ${ws.username}`);
}
});
// --- 消息转发: RDP -> Client ---
rdpWs.on('message', (message: RawData) => {
if (ws.readyState === WebSocket.OPEN) {
// 将 RawData (可能是 Buffer) 转换为 UTF-8 字符串再发送
const messageString = message.toString('utf-8');
// console.log(`RDP Proxy (S->C): Forwarding message to ${ws.username}: ${messageString.substring(0, 50)}...`);
ws.send(messageString);
} else {
console.warn(`RDP Proxy (S->C): Client WS not open, dropping message for ${ws.username}`);
}
});
// --- 错误处理 ---
ws.on('error', (error) => {
console.error(`WebSocket: RDP Proxy Client WS Error for ${ws.username}:`, error);
if (!rdpWsClosed && rdpWs.readyState !== WebSocket.CLOSED && rdpWs.readyState !== WebSocket.CLOSING) {
console.log(`WebSocket: RDP Proxy closing RDP WS due to client WS error.`);
rdpWs.close(1011, 'Client WS Error');
rdpWsClosed = true;
}
clientWsClosed = true;
});
rdpWs.on('error', (error) => {
console.error(`WebSocket: RDP Proxy RDP WS Error for ${ws.username}:`, error);
if (!clientWsClosed && ws.readyState !== WebSocket.CLOSED && ws.readyState !== WebSocket.CLOSING) {
console.log(`WebSocket: RDP Proxy closing Client WS due to RDP WS error.`);
ws.close(1011, `RDP WS Error: ${error.message}`);
clientWsClosed = true;
}
rdpWsClosed = true;
});
// --- 关闭处理 ---
ws.on('close', (code, reason) => {
clientWsClosed = true;
console.log(`WebSocket: RDP Proxy Client WS Closed for ${ws.username}. Code: ${code}, Reason: ${reason.toString()}`);
if (!rdpWsClosed && rdpWs.readyState !== WebSocket.CLOSED && rdpWs.readyState !== WebSocket.CLOSING) {
console.log(`WebSocket: RDP Proxy closing RDP WS due to client WS close.`);
rdpWs.close(1000, 'Client WS Closed');
rdpWsClosed = true;
}
});
rdpWs.on('close', (code, reason) => {
rdpWsClosed = true;
console.log(`WebSocket: RDP Proxy RDP WS Closed for ${ws.username}. Code: ${code}, Reason: ${reason.toString()}`);
if (!clientWsClosed && ws.readyState !== WebSocket.CLOSED && ws.readyState !== WebSocket.CLOSING) {
console.log(`WebSocket: RDP Proxy closing Client WS due to RDP WS close.`);
ws.close(1000, 'RDP WS Closed');
clientWsClosed = true;
}
});
rdpWs.on('open', () => {
console.log(`WebSocket: RDP Proxy connection to ${rdpTargetUrl} established for ${ws.username}. Forwarding messages.`);
// Do not send custom message, let Guacamole protocol flow directly
});
// --- 标准 (SSH/SFTP/Docker) 连接处理 ---
} else {
// --- 消息处理 (原有逻辑) ---
ws.on('message', async (message) => {
// console.log(`WebSocket:收到来自 ${ws.username} (会话: ${ws.sessionId}) 的消息: ${message.toString().substring(0, 100)}...`);
let parsedMessage: any;
try {
parsedMessage = JSON.parse(message.toString());
} catch (e) {
console.error(`WebSocket:来自 ${ws.username} 的无效 JSON 消息:`, message.toString());
ws.send(JSON.stringify({ type: 'error', payload: '无效的消息格式 (非 JSON)' }));
return;
}
const { type, payload, requestId } = parsedMessage; // requestId 用于 SFTP 操作
const sessionId = ws.sessionId; // 获取当前 WebSocket 的会话 ID
const state = sessionId ? clientStates.get(sessionId) : undefined; // 获取当前会话状态
try {
switch (type) {
// --- SSH 连接请求 ---
case 'ssh:connect': {
if (sessionId && state) {
console.warn(`WebSocket: 用户 ${ws.username} (会话: ${sessionId}) 已有活动连接,忽略新的连接请求。`);
ws.send(JSON.stringify({ type: 'ssh:error', payload: '已存在活动的 SSH 连接。' }));
return;
@@ -1016,6 +1148,7 @@ export const initializeWebSocket = async (server: http.Server, sessionParser: Re
console.error(`WebSocket:客户端 ${ws.username} (会话: ${ws.sessionId}) 发生错误:`, error);
cleanupClientConnection(ws.sessionId);
});
} // End of else block for non-RDP connections
});
// --- WebSocket 服务器关闭处理 ---