update
This commit is contained in:
@@ -15,12 +15,16 @@ import {
|
||||
SshSuspendTerminatedResponse,
|
||||
SshSuspendEntryRemovedResponse,
|
||||
SshSuspendNameEditedResponse,
|
||||
SshSuspendAutoTerminatedNotification, // 尽管此消息由服务发起,但类型定义在此处有用
|
||||
ClientState // 导入 ClientState 以便访问 sshClient 等信息
|
||||
SshSuspendAutoTerminatedNotification,
|
||||
SshMarkForSuspendRequest, // +++ 新增导入
|
||||
SshMarkedForSuspendAck, // +++ 新增导入
|
||||
ClientState
|
||||
} from './types';
|
||||
import { SshSuspendService } from '../services/ssh-suspend.service';
|
||||
import { SftpService } from '../services/sftp.service';
|
||||
import { cleanupClientConnection } from './utils';
|
||||
import { clientStates } from './state'; // Import clientStates for session management
|
||||
import { clientStates } from './state';
|
||||
import { temporaryLogStorageService } from '../services/temporary-log-storage.service'; // +++ 新增导入
|
||||
|
||||
// Handlers
|
||||
import { handleRdpProxyConnection } from './handlers/rdp.handler';
|
||||
@@ -41,7 +45,7 @@ import {
|
||||
handleSftpUploadCancel
|
||||
} from './handlers/sftp.handler';
|
||||
|
||||
export function initializeConnectionHandler(wss: WebSocketServer, sshSuspendService: SshSuspendService): void {
|
||||
export function initializeConnectionHandler(wss: WebSocketServer, sshSuspendService: SshSuspendService, sftpService: SftpService): void { // +++ Add sftpService parameter +++
|
||||
wss.on('connection', (ws: AuthenticatedWebSocket, request: Request) => {
|
||||
ws.isAlive = true;
|
||||
const isRdpProxy = (request as any).isRdpProxy;
|
||||
@@ -126,48 +130,9 @@ export function initializeConnectionHandler(wss: WebSocketServer, sshSuspendServ
|
||||
break;
|
||||
|
||||
// --- SSH Suspend Cases ---
|
||||
case 'SSH_SUSPEND_START': {
|
||||
const { sessionId: originalFrontendSessionId } = payload as SshSuspendStartRequest['payload'];
|
||||
console.log(`[WebSocket Handler] Received SSH_SUSPEND_START. UserID: ${ws.userId}, WsSessionID: ${ws.sessionId}, TargetOriginalFrontendSessionID: ${originalFrontendSessionId}`);
|
||||
console.log(`[SSH_SUSPEND_START] (Debug) 当前 clientStates 中的 keys: ${JSON.stringify(Array.from(clientStates.keys()))}`);
|
||||
// console.log(`[SSH_SUSPEND_START] 当前 WebSocket (ws.sessionId): ${ws.sessionId}`); // 重复,已包含在上一条日志
|
||||
// 旧的 SSH_SUSPEND_START 逻辑已被新的 SSH_MARK_FOR_SUSPEND 和 SshSuspendService.takeOverMarkedSession 取代
|
||||
// case 'SSH_SUSPEND_START': { ... } // Removed
|
||||
|
||||
if (!ws.userId) {
|
||||
console.error(`[SSH_SUSPEND_START] 用户 ID 未定义。`);
|
||||
if (ws.readyState === WebSocket.OPEN) ws.send(JSON.stringify({ type: 'SSH_SUSPEND_STARTED_RESP', payload: { frontendSessionId: originalFrontendSessionId, suspendSessionId: '', success: false, error: '用户认证失败' } }));
|
||||
break;
|
||||
}
|
||||
const activeSessionState = clientStates.get(originalFrontendSessionId);
|
||||
if (!activeSessionState || !activeSessionState.sshClient || !activeSessionState.sshShellStream) {
|
||||
console.error(`[SSH_SUSPEND_START] 找不到活动的SSH会话或其组件: ${originalFrontendSessionId}`);
|
||||
if (ws.readyState === WebSocket.OPEN) ws.send(JSON.stringify({ type: 'SSH_SUSPEND_STARTED_RESP', payload: { frontendSessionId: originalFrontendSessionId, suspendSessionId: '', success: false, error: '未找到活动的SSH会话' } }));
|
||||
break;
|
||||
}
|
||||
try {
|
||||
const suspendSessionId = await sshSuspendService.startSuspend(
|
||||
ws.userId,
|
||||
originalFrontendSessionId,
|
||||
activeSessionState.sshClient,
|
||||
activeSessionState.sshShellStream,
|
||||
activeSessionState.connectionName || '未知连接',
|
||||
String(activeSessionState.dbConnectionId), // 确保是 string
|
||||
// customSuspendName 初始时可以为空或基于 connectionName
|
||||
);
|
||||
const response: SshSuspendStartedResponse = {
|
||||
type: 'SSH_SUSPEND_STARTED',
|
||||
payload: { frontendSessionId: originalFrontendSessionId, suspendSessionId, success: true }
|
||||
};
|
||||
if (ws.readyState === WebSocket.OPEN) ws.send(JSON.stringify(response));
|
||||
// 设计文档提到:“原有的直接将 SSH 输出发送到前端 WebSocket 的逻辑需要暂停或修改”
|
||||
// 这部分可能需要修改 ssh.handler.ts,或者 SshSuspendService 内部通过移除监听器等方式实现。
|
||||
// SshSuspendService.startSuspend 内部应该已经处理了数据流重定向到日志。
|
||||
// clientStates.delete(originalFrontendSessionId); // 原会话不再由 websocket 直接管理,转由 SshSuspendService 管理
|
||||
} catch (error: any) {
|
||||
console.error(`[SSH_SUSPEND_START] 启动挂起失败:`, error);
|
||||
if (ws.readyState === WebSocket.OPEN) ws.send(JSON.stringify({ type: 'SSH_SUSPEND_STARTED_RESP', payload: { frontendSessionId: originalFrontendSessionId, suspendSessionId: '', success: false, error: error.message || '启动挂起失败' } }));
|
||||
}
|
||||
break;
|
||||
}
|
||||
case 'SSH_SUSPEND_LIST_REQUEST': {
|
||||
if (!ws.userId) {
|
||||
console.error(`[SSH_SUSPEND_LIST_REQUEST] 用户 ID 未定义。`);
|
||||
@@ -188,18 +153,22 @@ export function initializeConnectionHandler(wss: WebSocketServer, sshSuspendServ
|
||||
break;
|
||||
}
|
||||
case 'SSH_SUSPEND_RESUME_REQUEST': {
|
||||
const { suspendSessionId, newFrontendSessionId } = payload as SshSuspendResumeRequest['payload'];
|
||||
console.log(`[WebSocket Handler] Received SSH_SUSPEND_RESUME_REQUEST. UserID: ${ws.userId}, WsSessionID: ${ws.sessionId}, SuspendSessionID: ${suspendSessionId}, NewFrontendSessionID: ${newFrontendSessionId}`);
|
||||
const resumePayload = payload as SshSuspendResumeRequest['payload'];
|
||||
const { suspendSessionId, newFrontendSessionId } = resumePayload;
|
||||
// console.log(`[WebSocket Handler][${type}] 接到请求。UserID: ${ws.userId}, WsSessionID: ${ws.sessionId}, Payload: ${JSON.stringify(resumePayload)}`);
|
||||
|
||||
if (!ws.userId) {
|
||||
console.error(`[SSH_SUSPEND_RESUME_REQUEST] 用户 ID 未定义。Payload: ${JSON.stringify(payload)}`);
|
||||
console.error(`[WebSocket Handler][${type}] 用户 ID 未定义。`);
|
||||
if (ws.readyState === WebSocket.OPEN) ws.send(JSON.stringify({ type: 'SSH_SUSPEND_RESUMED_NOTIF', payload: { suspendSessionId, newFrontendSessionId, success: false, error: '用户认证失败' } }));
|
||||
break;
|
||||
}
|
||||
try {
|
||||
// console.log(`[WebSocket Handler][${type}] 调用 sshSuspendService.resumeSession (userId: ${ws.userId}, suspendSessionId: ${suspendSessionId})`);
|
||||
const result = await sshSuspendService.resumeSession(ws.userId, suspendSessionId);
|
||||
// console.log(`[WebSocket Handler][${type}] sshSuspendService.resumeSession 返回: ${result ? `包含 sshClient: ${!!result.sshClient}, channel: ${!!result.channel}, logData长度: ${result.logData?.length}` : 'null'}`);
|
||||
|
||||
if (result) {
|
||||
// 将恢复的 sshClient 和 channel 重新关联到新的前端会话 ID
|
||||
// 这部分逻辑需要与 handleSshConnect 类似,创建一个新的 ClientState
|
||||
// console.log(`[WebSocket Handler][${type}] 成功恢复会话。准备设置新的 ClientState (ID: ${newFrontendSessionId})。`);
|
||||
const newSessionState: ClientState = {
|
||||
ws, // 当前的 WebSocket 连接
|
||||
sshClient: result.sshClient,
|
||||
@@ -211,48 +180,74 @@ export function initializeConnectionHandler(wss: WebSocketServer, sshSuspendServ
|
||||
};
|
||||
clientStates.set(newFrontendSessionId, newSessionState);
|
||||
ws.sessionId = newFrontendSessionId; // 将当前 ws 与新会话关联
|
||||
// console.log(`[WebSocket Handler][${type}] 新 ClientState (ID: ${newFrontendSessionId}) 已设置并关联到当前 WebSocket。`);
|
||||
|
||||
// +++ 为恢复的会话初始化 SFTP +++
|
||||
// console.log(`[WebSocket Handler][${type}] 尝试为恢复的会话 ${newFrontendSessionId} 初始化 SFTP。`);
|
||||
sftpService.initializeSftpSession(newFrontendSessionId)
|
||||
.then(() => {
|
||||
// console.log(`[WebSocket Handler][${type}] SFTP 初始化调用完成 (可能异步) for ${newFrontendSessionId}。`);
|
||||
// sftp_ready 消息会由 sftpService 内部发送
|
||||
})
|
||||
.catch(sftpInitErr => {
|
||||
console.error(`[WebSocket Handler][${type}] 为恢复的会话 ${newFrontendSessionId} 初始化 SFTP 失败:`, sftpInitErr);
|
||||
// 即使 SFTP 初始化失败,SSH 会话仍然恢复
|
||||
});
|
||||
// +++ 结束 SFTP 初始化 +++
|
||||
|
||||
// 重新设置事件监听器,将数据流导向新的前端会话
|
||||
result.channel.removeAllListeners('data'); // 清除 SshSuspendService 可能设置的监听器
|
||||
result.channel.on('data', (data: Buffer) => {
|
||||
if (ws.readyState === WebSocket.OPEN) {
|
||||
ws.send(JSON.stringify({ type: 'ssh:output', payload: { sessionId: newFrontendSessionId, data: data.toString('utf-8') } }));
|
||||
// console.debug(`[WebSocket Handler][${type}] 发送 ssh:output for ${newFrontendSessionId}`);
|
||||
// 保持与 ssh.handler.ts 中 ssh:output 格式一致
|
||||
ws.send(JSON.stringify({ type: 'ssh:output', payload: data.toString('base64'), encoding: 'base64' }));
|
||||
}
|
||||
});
|
||||
result.channel.on('close', () => {
|
||||
console.log(`[WebSocket Handler][${type}] 恢复的会话 ${newFrontendSessionId} 的 channel 已关闭。`);
|
||||
if (ws.readyState === WebSocket.OPEN) {
|
||||
ws.send(JSON.stringify({ type: 'ssh:disconnected', payload: { sessionId: newFrontendSessionId } }));
|
||||
}
|
||||
cleanupClientConnection(newFrontendSessionId);
|
||||
});
|
||||
result.sshClient.on('error', (err: Error) => {
|
||||
console.error(`恢复后的 SSH 客户端错误 (会话: ${newFrontendSessionId}):`, err);
|
||||
console.error(`[WebSocket Handler][${type}] 恢复后的 SSH 客户端错误 (会话: ${newFrontendSessionId}):`, err);
|
||||
if (ws.readyState === WebSocket.OPEN) ws.send(JSON.stringify({ type: 'ssh:error', payload: { sessionId: newFrontendSessionId, error: err.message } }));
|
||||
cleanupClientConnection(newFrontendSessionId);
|
||||
});
|
||||
// console.log(`[WebSocket Handler][${type}] 已为恢复的会话 ${newFrontendSessionId} 设置事件监听器。`);
|
||||
|
||||
// 发送缓存日志块
|
||||
// 设计文档建议 SSH_OUTPUT_CACHED_CHUNK
|
||||
// 这个服务返回的是一个完整的 logData 字符串,我们需要分块吗?
|
||||
// 假设暂时不分块,或者由前端处理。如果需要分块,逻辑会更复杂。
|
||||
// 这里简单处理,一次性发送。如果日志过大,这可能不是最佳实践。
|
||||
console.log('[SSH Suspend Backend] Log data to send to frontend:', result.logData);
|
||||
const logChunkResponse: SshOutputCachedChunk = {
|
||||
type: 'SSH_OUTPUT_CACHED_CHUNK',
|
||||
payload: { frontendSessionId: newFrontendSessionId, data: result.logData, isLastChunk: true }
|
||||
};
|
||||
if (ws.readyState === WebSocket.OPEN) ws.send(JSON.stringify(logChunkResponse));
|
||||
if (ws.readyState === WebSocket.OPEN) {
|
||||
ws.send(JSON.stringify(logChunkResponse));
|
||||
// console.log(`[WebSocket Handler][${type}] 已发送 SSH_OUTPUT_CACHED_CHUNK 给 ${newFrontendSessionId} (数据长度: ${result.logData.length})。`);
|
||||
} else {
|
||||
// console.warn(`[WebSocket Handler][${type}] WebSocket 在发送 SSH_OUTPUT_CACHED_CHUNK 前已关闭 (会话 ${newFrontendSessionId})。`);
|
||||
}
|
||||
|
||||
const response: SshSuspendResumedNotification = {
|
||||
type: 'SSH_SUSPEND_RESUMED',
|
||||
payload: { suspendSessionId, newFrontendSessionId, success: true }
|
||||
};
|
||||
if (ws.readyState === WebSocket.OPEN) ws.send(JSON.stringify(response));
|
||||
if (ws.readyState === WebSocket.OPEN) {
|
||||
ws.send(JSON.stringify(response));
|
||||
// console.log(`[WebSocket Handler][${type}] 已发送 SSH_SUSPEND_RESUMED_NOTIF 给 ${newFrontendSessionId}。`);
|
||||
} else {
|
||||
// console.warn(`[WebSocket Handler][${type}] WebSocket 在发送 SSH_SUSPEND_RESUMED_NOTIF 前已关闭 (会话 ${newFrontendSessionId})。`);
|
||||
}
|
||||
|
||||
} else {
|
||||
throw new Error('无法恢复会话,或会话不存在/状态不正确。');
|
||||
// console.warn(`[WebSocket Handler][${type}] sshSuspendService.resumeSession 返回 null,无法恢复会话 ${suspendSessionId}。`);
|
||||
throw new Error('服务未能恢复会话,或会话不存在/状态不正确。');
|
||||
}
|
||||
} catch (error: any) {
|
||||
console.error(`[SSH_SUSPEND_RESUME_REQUEST] 恢复会话 ${suspendSessionId} 失败:`, error);
|
||||
// console.error(`[WebSocket Handler][${type}] 处理恢复会话 ${suspendSessionId} 时发生错误:`, error);
|
||||
if (ws.readyState === WebSocket.OPEN) ws.send(JSON.stringify({ type: 'SSH_SUSPEND_RESUMED_NOTIF', payload: { suspendSessionId, newFrontendSessionId, success: false, error: error.message || '恢复会话失败' } }));
|
||||
}
|
||||
break;
|
||||
@@ -319,6 +314,57 @@ export function initializeConnectionHandler(wss: WebSocketServer, sshSuspendServ
|
||||
}
|
||||
break;
|
||||
}
|
||||
case 'SSH_MARK_FOR_SUSPEND': {
|
||||
const markPayload = payload as SshMarkForSuspendRequest['payload'];
|
||||
const sessionToMarkId = markPayload.sessionId;
|
||||
console.log(`[WebSocket Handler] Received SSH_MARK_FOR_SUSPEND. UserID: ${ws.userId}, TargetSessionID: ${sessionToMarkId}`);
|
||||
|
||||
if (!ws.userId) {
|
||||
console.error(`[SSH_MARK_FOR_SUSPEND] 用户 ID 未定义。`);
|
||||
if (ws.readyState === WebSocket.OPEN) ws.send(JSON.stringify({ type: 'SSH_MARKED_FOR_SUSPEND_ACK', payload: { sessionId: sessionToMarkId, success: false, error: '用户认证失败' } as SshMarkedForSuspendAck['payload'] }));
|
||||
break;
|
||||
}
|
||||
|
||||
const activeSessionState = clientStates.get(sessionToMarkId);
|
||||
if (!activeSessionState || !activeSessionState.sshClient || !activeSessionState.sshShellStream) {
|
||||
console.error(`[SSH_MARK_FOR_SUSPEND] 找不到活动的SSH会话或其组件: ${sessionToMarkId}`);
|
||||
if (ws.readyState === WebSocket.OPEN) ws.send(JSON.stringify({ type: 'SSH_MARKED_FOR_SUSPEND_ACK', payload: { sessionId: sessionToMarkId, success: false, error: '未找到要标记的活动SSH会话' } as SshMarkedForSuspendAck['payload'] }));
|
||||
break;
|
||||
}
|
||||
|
||||
if (activeSessionState.isMarkedForSuspend) {
|
||||
console.warn(`[SSH_MARK_FOR_SUSPEND] 会话 ${sessionToMarkId} 已被标记。`);
|
||||
if (ws.readyState === WebSocket.OPEN) ws.send(JSON.stringify({ type: 'SSH_MARKED_FOR_SUSPEND_ACK', payload: { sessionId: sessionToMarkId, success: true, error: '会话已被标记' } as SshMarkedForSuspendAck['payload'] }));
|
||||
break;
|
||||
}
|
||||
|
||||
try {
|
||||
// 使用活动会话ID作为日志文件名的一部分
|
||||
const logPathSuffix = sessionToMarkId; // 使用原始 sessionId 作为日志文件名
|
||||
activeSessionState.isMarkedForSuspend = true;
|
||||
activeSessionState.suspendLogPath = logPathSuffix; // 存储日志标识符 (服务内部会拼接完整路径)
|
||||
|
||||
// 确保日志目录存在 (服务内部通常会做,但这里也可以调用一次)
|
||||
await temporaryLogStorageService.ensureLogDirectoryExists();
|
||||
// 可以在这里预先写入一个标记,表明日志开始记录
|
||||
await temporaryLogStorageService.writeToLog(logPathSuffix, `--- Log recording started for session ${sessionToMarkId} at ${new Date().toISOString()} ---\n`);
|
||||
|
||||
console.log(`[SSH_MARK_FOR_SUSPEND] 会话 ${sessionToMarkId} 已成功标记待挂起。日志将记录到与 ${logPathSuffix} 关联的文件。`);
|
||||
const response: SshMarkedForSuspendAck = {
|
||||
type: 'SSH_MARKED_FOR_SUSPEND_ACK',
|
||||
payload: { sessionId: sessionToMarkId, success: true }
|
||||
};
|
||||
if (ws.readyState === WebSocket.OPEN) ws.send(JSON.stringify(response));
|
||||
} catch (error: any) {
|
||||
console.error(`[SSH_MARK_FOR_SUSPEND] 标记会话 ${sessionToMarkId} 失败:`, error);
|
||||
if (activeSessionState) { // 如果状态存在,尝试回滚标记
|
||||
activeSessionState.isMarkedForSuspend = false;
|
||||
activeSessionState.suspendLogPath = undefined;
|
||||
}
|
||||
if (ws.readyState === WebSocket.OPEN) ws.send(JSON.stringify({ type: 'SSH_MARKED_FOR_SUSPEND_ACK', payload: { sessionId: sessionToMarkId, success: false, error: error.message || '标记会话失败' } as SshMarkedForSuspendAck['payload'] }));
|
||||
}
|
||||
break;
|
||||
}
|
||||
default:
|
||||
console.warn(`WebSocket:收到来自 ${ws.username} (会话: ${sessionId}) 的未知消息类型: ${type}`);
|
||||
if (ws.readyState === WebSocket.OPEN) ws.send(JSON.stringify({ type: 'error', payload: `不支持的消息类型: ${type}` }));
|
||||
|
||||
@@ -4,6 +4,7 @@ import { AuthenticatedWebSocket, ClientState } from '../types';
|
||||
import { clientStates, sftpService, statusMonitorService, auditLogService, notificationService } from '../state';
|
||||
import * as SshService from '../../services/ssh.service';
|
||||
import { cleanupClientConnection } from '../utils';
|
||||
import { temporaryLogStorageService } from '../../services/temporary-log-storage.service'; // +++ 新增导入
|
||||
import { startDockerStatusPolling } from './docker.handler';
|
||||
import WebSocket from 'ws';
|
||||
|
||||
@@ -102,12 +103,26 @@ export async function handleSshConnect(
|
||||
if (ws.readyState === WebSocket.OPEN) {
|
||||
ws.send(JSON.stringify({ type: 'ssh:output', payload: data.toString('base64'), encoding: 'base64' }));
|
||||
}
|
||||
// 如果会话被标记为待挂起,则将输出写入日志
|
||||
const currentState = clientStates.get(newSessionId); // 获取最新的状态
|
||||
if (currentState?.isMarkedForSuspend && currentState.suspendLogPath) {
|
||||
temporaryLogStorageService.writeToLog(currentState.suspendLogPath, data.toString('utf-8')).catch(err => {
|
||||
console.error(`[SSH Handler] 写入标记会话 ${newSessionId} 的日志失败 (路径: ${currentState.suspendLogPath}):`, err);
|
||||
});
|
||||
}
|
||||
});
|
||||
stream.stderr.on('data', (data: Buffer) => {
|
||||
console.error(`SSH Stderr (会话: ${newSessionId}): ${data.toString('utf8').substring(0, 100)}...`);
|
||||
if (ws.readyState === WebSocket.OPEN) {
|
||||
ws.send(JSON.stringify({ type: 'ssh:output', payload: data.toString('base64'), encoding: 'base64' }));
|
||||
}
|
||||
// 同样,如果会话被标记为待挂起,则将 stderr 输出写入日志
|
||||
const currentState = clientStates.get(newSessionId);
|
||||
if (currentState?.isMarkedForSuspend && currentState.suspendLogPath) {
|
||||
temporaryLogStorageService.writeToLog(currentState.suspendLogPath, `[STDERR] ${data.toString('utf-8')}`).catch(err => {
|
||||
console.error(`[SSH Handler] 写入标记会话 ${newSessionId} 的 STDERR 日志失败 (路径: ${currentState.suspendLogPath}):`, err);
|
||||
});
|
||||
}
|
||||
});
|
||||
stream.on('close', () => {
|
||||
console.log(`SSH: 会话 ${newSessionId} 的 Shell 通道已关闭。`);
|
||||
|
||||
@@ -22,6 +22,9 @@ export interface ClientState { // 导出以便 Service 可以导入
|
||||
ipAddress?: string; // 添加 IP 地址字段
|
||||
isShellReady?: boolean; // 新增:标记 Shell 是否已准备好处理输入和调整大小
|
||||
isSuspendedByService?: boolean; // 新增:标记此会话是否已被 SshSuspendService 接管
|
||||
isMarkedForSuspend?: boolean; // 新增:标记此会话是否已被用户请求挂起(等待断开连接)
|
||||
suspendLogPath?: string; // 新增:如果标记挂起,则存储日志路径
|
||||
suspendLogWritableStream?: NodeJS.WritableStream; // 新增:用于写入挂起日志的流
|
||||
}
|
||||
|
||||
export interface PortInfo {
|
||||
@@ -65,6 +68,7 @@ export interface SshSuspendStartRequest {
|
||||
type: "SSH_SUSPEND_START";
|
||||
payload: {
|
||||
sessionId: string; // The ID of the active SSH session to be suspended
|
||||
initialBuffer?: string; // Optional: content of the terminal buffer at the time of suspend
|
||||
};
|
||||
}
|
||||
|
||||
@@ -102,6 +106,13 @@ export interface SshSuspendEditNameRequest {
|
||||
};
|
||||
}
|
||||
|
||||
export interface SshMarkForSuspendRequest {
|
||||
type: "SSH_MARK_FOR_SUSPEND";
|
||||
payload: {
|
||||
sessionId: string; // The ID of the active SSH session to be marked
|
||||
};
|
||||
}
|
||||
|
||||
// Server -> Client
|
||||
export interface SshSuspendStartedResponse {
|
||||
type: "SSH_SUSPEND_STARTED";
|
||||
@@ -177,6 +188,15 @@ export interface SshSuspendNameEditedResponse {
|
||||
};
|
||||
}
|
||||
|
||||
export interface SshMarkedForSuspendAck {
|
||||
type: "SSH_MARKED_FOR_SUSPEND_ACK";
|
||||
payload: {
|
||||
sessionId: string; // The ID of the session that was marked
|
||||
success: boolean;
|
||||
error?: string;
|
||||
};
|
||||
}
|
||||
|
||||
export interface SshSuspendAutoTerminatedNotification {
|
||||
type: "SSH_SUSPEND_AUTO_TERMINATED";
|
||||
payload: {
|
||||
@@ -192,7 +212,8 @@ export type SshSuspendClientToServerMessages =
|
||||
| SshSuspendResumeRequest
|
||||
| SshSuspendTerminateRequest
|
||||
| SshSuspendRemoveEntryRequest
|
||||
| SshSuspendEditNameRequest;
|
||||
| SshSuspendEditNameRequest
|
||||
| SshMarkForSuspendRequest; // Added new request type
|
||||
|
||||
// Union type for all server-to-client messages for SSH Suspend
|
||||
export type SshSuspendServerToClientMessages =
|
||||
@@ -203,7 +224,8 @@ export type SshSuspendServerToClientMessages =
|
||||
| SshSuspendTerminatedResponse
|
||||
| SshSuspendEntryRemovedResponse
|
||||
| SshSuspendNameEditedResponse
|
||||
| SshSuspendAutoTerminatedNotification;
|
||||
| SshSuspendAutoTerminatedNotification
|
||||
| SshMarkedForSuspendAck; // Added new response type
|
||||
|
||||
// It might be useful to have a general type for incoming messages if not already present
|
||||
// For example, if you have a main message handler:
|
||||
|
||||
@@ -1,7 +1,8 @@
|
||||
import { PortInfo, ClientState } from './types';
|
||||
import { SftpService } from '../services/sftp.service'; // 将被 state.ts 中的实例替换,但类型导入保留
|
||||
import { StatusMonitorService } from '../services/status-monitor.service'; // 将被 state.ts 中的实例替换,但类型导入保留
|
||||
import { SftpService } from '../services/sftp.service';
|
||||
import { StatusMonitorService } from '../services/status-monitor.service';
|
||||
import { clientStates, sftpService, statusMonitorService } from './state';
|
||||
import { sshSuspendService } from '../services/ssh-suspend.service'; // +++ 新增导入 +++
|
||||
|
||||
// --- 新增:解析 Ports 字符串的辅助函数 ---
|
||||
export function parsePortsString(portsString: string | undefined | null): PortInfo[] {
|
||||
@@ -66,29 +67,74 @@ export function parsePortsString(portsString: string | undefined | null): PortIn
|
||||
* 清理指定会话 ID 关联的所有资源
|
||||
* @param sessionId - 会话 ID
|
||||
*/
|
||||
export const cleanupClientConnection = (sessionId: string | undefined) => {
|
||||
export const cleanupClientConnection = async (sessionId: string | undefined) => { // Made async
|
||||
if (!sessionId) return;
|
||||
|
||||
const state = clientStates.get(sessionId);
|
||||
if (state) {
|
||||
console.log(`WebSocket: 清理会话 ${sessionId} (用户: ${state.ws.username}, DB 连接 ID: ${state.dbConnectionId})...`);
|
||||
|
||||
// 1. 停止状态轮询
|
||||
statusMonitorService.stopStatusPolling(sessionId);
|
||||
// 1. 停止状态轮询 (如果存在)
|
||||
if (statusMonitorService) statusMonitorService.stopStatusPolling(sessionId);
|
||||
|
||||
// 2. 清理 SFTP 会话
|
||||
sftpService.cleanupSftpSession(sessionId);
|
||||
// 2. 清理 SFTP 会话 (如果存在)
|
||||
if (sftpService) sftpService.cleanupSftpSession(sessionId);
|
||||
|
||||
// 3. 清理 SSH 连接
|
||||
// +++ 仅当会话未被 SshSuspendService 接管时才关闭 SSH 连接 +++
|
||||
if (!state.isSuspendedByService) {
|
||||
state.sshShellStream?.end(); // 结束 shell 流
|
||||
state.sshClient?.end(); // 结束 SSH 客户端
|
||||
console.log(`WebSocket: 会话 ${sessionId} 的 SSH 连接已关闭 (未被挂起服务接管)。`);
|
||||
} else {
|
||||
console.log(`WebSocket: 会话 ${sessionId} 的 SSH 连接由挂起服务管理,跳过关闭。`);
|
||||
// 3. 处理 SSH 连接 (核心修改点)
|
||||
if (state.isMarkedForSuspend && state.sshClient && state.sshShellStream && state.suspendLogPath && state.ws.userId !== undefined) {
|
||||
console.log(`WebSocket: 会话 ${sessionId} 已被标记为待挂起,尝试移交给 SshSuspendService...`);
|
||||
try {
|
||||
const takeoverDetails = {
|
||||
userId: state.ws.userId,
|
||||
originalSessionId: sessionId, // sessionId 是原始活动会话的ID
|
||||
sshClient: state.sshClient,
|
||||
channel: state.sshShellStream,
|
||||
connectionName: state.connectionName || '未知连接',
|
||||
connectionId: String(state.dbConnectionId),
|
||||
logIdentifier: state.suspendLogPath, // 这是基于 originalSessionId 的日志标识
|
||||
customSuspendName: undefined, // 如果需要,可以从 state 或其他地方获取
|
||||
};
|
||||
|
||||
// 从 state 中“分离”SSH资源,防止后续意外关闭
|
||||
const sshClientToPass = state.sshClient;
|
||||
const channelToPass = state.sshShellStream;
|
||||
state.sshClient = undefined as any; // 清除引用
|
||||
state.sshShellStream = undefined; // 清除引用
|
||||
state.isSuspendedByService = true; // 标记为已被服务接管(即使是尝试接管)
|
||||
|
||||
const newSuspendId = await sshSuspendService.takeOverMarkedSession({
|
||||
...takeoverDetails,
|
||||
sshClient: sshClientToPass, // 传递分离出来的实例
|
||||
channel: channelToPass, // 传递分离出来的实例
|
||||
});
|
||||
|
||||
if (newSuspendId) {
|
||||
console.log(`WebSocket: 会话 ${sessionId} 已成功移交给 SshSuspendService,新的挂起ID: ${newSuspendId}。SSH 连接将由服务管理。`);
|
||||
// SSH 资源已移交,不需要在这里关闭它们
|
||||
} else {
|
||||
console.warn(`WebSocket: 会话 ${sessionId} 移交给 SshSuspendService 失败 (takeOverMarkedSession 返回 null)。可能 SSH 连接在标记后已断开。将执行常规清理。`);
|
||||
// 移交失败,执行常规关闭
|
||||
channelToPass?.end();
|
||||
sshClientToPass?.end();
|
||||
state.isSuspendedByService = false; // 重置标记,因为接管失败
|
||||
}
|
||||
} catch (error) {
|
||||
console.error(`WebSocket: 会话 ${sessionId} 移交给 SshSuspendService 时发生错误:`, error);
|
||||
// 发生错误,也执行常规关闭以防资源泄露
|
||||
if (state.sshClient) state.sshClient.end(); // 如果引用还在,尝试关闭
|
||||
if (state.sshShellStream) state.sshShellStream.end(); // 如果引用还在,尝试关闭
|
||||
state.isSuspendedByService = false; // 重置标记
|
||||
}
|
||||
} else if (!state.isSuspendedByService && state.sshClient) {
|
||||
// 未标记挂起,也未被服务接管,执行常规关闭
|
||||
state.sshShellStream?.end();
|
||||
state.sshClient?.end();
|
||||
console.log(`WebSocket: 会话 ${sessionId} 的 SSH 连接已关闭 (未标记挂起,未被服务接管)。`);
|
||||
} else if (state.isSuspendedByService) {
|
||||
// 已被服务接管(例如通过旧的 startSuspend 流程,或成功移交后),不在此处关闭
|
||||
console.log(`WebSocket: 会话 ${sessionId} 的 SSH 连接已由挂起服务管理,跳过关闭。`);
|
||||
}
|
||||
// +++ 结束条件关闭 +++
|
||||
|
||||
|
||||
// 4. 清理 Docker 状态轮询定时器
|
||||
if (state.dockerStatusIntervalId) {
|
||||
|
||||
Reference in New Issue
Block a user