This commit is contained in:
Baobhan Sith
2025-05-16 20:15:12 +08:00
parent b91f8dfdb0
commit fcaf1be506
5 changed files with 394 additions and 257 deletions
@@ -11,6 +11,7 @@ export class TransfersController {
this.initiateTransfer = this.initiateTransfer.bind(this); this.initiateTransfer = this.initiateTransfer.bind(this);
this.getAllStatuses = this.getAllStatuses.bind(this); this.getAllStatuses = this.getAllStatuses.bind(this);
this.getTaskStatus = this.getTaskStatus.bind(this); this.getTaskStatus = this.getTaskStatus.bind(this);
this.cancelTransfer = this.cancelTransfer.bind(this); // +++ 绑定新方法 +++
} }
public async initiateTransfer(req: Request, res: Response, next: NextFunction): Promise<void> { public async initiateTransfer(req: Request, res: Response, next: NextFunction): Promise<void> {
@@ -90,4 +91,32 @@ export class TransfersController {
res.status(500).json({ message: 'Failed to retrieve task status.', error: error instanceof Error ? error.message : String(error) }); res.status(500).json({ message: 'Failed to retrieve task status.', error: error instanceof Error ? error.message : String(error) });
} }
} }
public async cancelTransfer(req: Request, res: Response, next: NextFunction): Promise<void> {
try {
// @ts-ignore
const userId = req.session?.userId;
if (!userId) {
res.status(401).json({ message: '用户未认证或会话无效。' });
return;
}
const { taskId } = req.params;
if (!taskId) {
res.status(400).json({ message: 'Task ID is required for cancellation.' });
return;
}
const success = await this.transfersService.cancelTransferTask(taskId, userId);
if (success) {
res.status(200).json({ message: `Transfer task ${taskId} cancellation initiated.` });
} else {
// 可能任务不存在,或不属于该用户,或无法取消
res.status(404).json({ message: `Failed to initiate cancellation for task ${taskId}. It may not exist, not be accessible, or already be in a final state.` });
}
} catch (error) {
console.error(`[TransfersController] Error cancelling task ${req.params.taskId}:`, error);
res.status(500).json({ message: 'Failed to cancel task.', error: error instanceof Error ? error.message : String(error) });
}
}
} }
@@ -18,5 +18,8 @@ export const transfersRoutes = (): Router => {
// GET /api/transfers/status/:taskId - 获取特定传输任务的详细状态 // GET /api/transfers/status/:taskId - 获取特定传输任务的详细状态
router.get('/status/:taskId', controller.getTaskStatus); router.get('/status/:taskId', controller.getTaskStatus);
// POST /api/transfers/cancel/:taskId - 请求取消一个传输任务
router.post('/cancel/:taskId', controller.cancelTransfer);
return router; return router;
}; };
@@ -11,6 +11,7 @@ import type { ConnectionWithTags, DecryptedConnectionCredentials } from '../type
export class TransfersService { export class TransfersService {
private transferTasks: Map<string, TransferTask> = new Map(); private transferTasks: Map<string, TransferTask> = new Map();
private taskAbortControllers: Map<string, AbortController> = new Map(); // +++ 用于存储任务的 AbortController +++
private readonly TEMP_KEY_PREFIX = 'nexus_target_key_'; private readonly TEMP_KEY_PREFIX = 'nexus_target_key_';
private readonly MAX_CONCURRENT_SUB_TASKS = 5; // Maximum concurrent sub-tasks private readonly MAX_CONCURRENT_SUB_TASKS = 5; // Maximum concurrent sub-tasks
@@ -22,6 +23,8 @@ export class TransfersService {
const taskId = uuidv4(); const taskId = uuidv4();
const now = new Date(); const now = new Date();
const subTasks: TransferSubTask[] = []; const subTasks: TransferSubTask[] = [];
const abortController = new AbortController(); // +++ 创建 AbortController +++
this.taskAbortControllers.set(taskId, abortController); // +++ 存储 AbortController +++
// 每个 (目标服务器, 源文件) 组合都是一个子任务 // 每个 (目标服务器, 源文件) 组合都是一个子任务
for (const connectionId of payload.connectionIds) { // 目标服务器ID列表 for (const connectionId of payload.connectionIds) { // 目标服务器ID列表
@@ -51,14 +54,53 @@ export class TransfersService {
console.info(`[TransfersService] New transfer task created: ${taskId} for source ${payload.sourceConnectionId} with ${subTasks.length} sub-tasks.`); console.info(`[TransfersService] New transfer task created: ${taskId} for source ${payload.sourceConnectionId} with ${subTasks.length} sub-tasks.`);
// 异步启动传输,不阻塞当前请求 // 异步启动传输,不阻塞当前请求
this.processTransferTask(taskId).catch(error => { this.processTransferTask(taskId, abortController.signal).catch(error => { // +++ 传递 signal +++
console.error(`[TransfersService] Error processing task ${taskId} in background:`, error); console.error(`[TransfersService] Error processing task ${taskId} in background:`, error);
this.updateOverallTaskStatus(taskId, 'failed', `Background processing error: ${error.message}`); // 如果不是因为中止操作导致的错误,则更新状态
if (error.name !== 'AbortError') {
this.updateOverallTaskStatus(taskId, 'failed', `Background processing error: ${error.message}`);
}
}); });
return { ...newTask }; // 返回任务的副本 return { ...newTask }; // 返回任务的副本
} }
public async cancelTransferTask(taskId: string, userId: string | number): Promise<boolean> {
const task = this.transferTasks.get(taskId);
if (!task || task.userId !== userId) {
console.warn(`[TransfersService] Attempt to cancel non-existent task ${taskId} or task not owned by user ${userId}.`);
return false;
}
const abortController = this.taskAbortControllers.get(taskId);
if (abortController) {
console.info(`[TransfersService] Cancelling task ${taskId}.`);
abortController.abort(); // 触发中止信号
// 更新主任务状态
// 假设 'cancelling' 和 'cancelled' 是有效的状态
if (task.status !== 'completed' && task.status !== 'failed' && task.status !== 'cancelled') {
this.updateOverallTaskStatus(taskId, 'cancelling', 'Task cancellation initiated by user.');
// 可以在 processTransferTask 的 finally 中将状态设置为 'cancelled'
}
// 更新所有未完成的子任务状态
task.subTasks.forEach(subTask => {
if (subTask.status !== 'completed' && subTask.status !== 'failed' && subTask.status !== 'cancelled') {
this.updateSubTaskStatus(taskId, subTask.subTaskId, 'cancelled', subTask.progress, 'Cancelled due to parent task cancellation.');
}
});
// 确保在 AbortController Map 中移除,以防内存泄漏(如果任务不再处理)
// 也可以在任务彻底结束后移除
// this.taskAbortControllers.delete(taskId); // 暂时不在这里删除,可能在 processTransferTask 的 finally 中
return true;
}
console.warn(`[TransfersService] No AbortController found for task ${taskId} to cancel.`);
return false;
}
private buildSshConnectConfig( private buildSshConnectConfig(
connectionInfo: ConnectionWithTags, connectionInfo: ConnectionWithTags,
credentials: DecryptedConnectionCredentials credentials: DecryptedConnectionCredentials
@@ -81,18 +123,28 @@ export class TransfersService {
return config; return config;
} }
private async processTransferTask(taskId: string): Promise<void> { private async processTransferTask(taskId: string, signal: AbortSignal): Promise<void> { // +++ 接收 AbortSignal +++
const task = this.transferTasks.get(taskId); const task = this.transferTasks.get(taskId);
if (!task) { if (!task) {
console.error(`[TransfersService] Task ${taskId} not found for processing.`); console.error(`[TransfersService] Task ${taskId} not found for processing.`);
return; return;
} }
if (signal.aborted) {
console.info(`[TransfersService] Task ${taskId} was cancelled before processing started.`);
this.updateOverallTaskStatus(taskId, 'cancelled', 'Cancelled before start.');
this.taskAbortControllers.delete(taskId); // 清理
return;
}
this.updateOverallTaskStatus(taskId, 'in-progress'); this.updateOverallTaskStatus(taskId, 'in-progress');
let sourceSshClient: Client | undefined; let sourceSshClient: Client | undefined;
try { try {
if (signal.aborted) throw new DOMException('Transfer cancelled by user.', 'AbortError');
const sourceConnectionResult = await getConnectionWithDecryptedCredentials(task.payload.sourceConnectionId); const sourceConnectionResult = await getConnectionWithDecryptedCredentials(task.payload.sourceConnectionId);
if (signal.aborted) throw new DOMException('Transfer cancelled by user.', 'AbortError');
if (!sourceConnectionResult || !sourceConnectionResult.connection) { if (!sourceConnectionResult || !sourceConnectionResult.connection) {
throw new Error(`Source connection with ID ${task.payload.sourceConnectionId} not found or inaccessible.`); throw new Error(`Source connection with ID ${task.payload.sourceConnectionId} not found or inaccessible.`);
} }
@@ -102,21 +154,34 @@ export class TransfersService {
const sourceConnectConfig = this.buildSshConnectConfig(sourceConnection, sourceCredentials); const sourceConnectConfig = this.buildSshConnectConfig(sourceConnection, sourceCredentials);
await new Promise<void>((resolve, reject) => { await new Promise<void>((resolve, reject) => {
if (signal.aborted) return reject(new DOMException('Transfer cancelled by user.', 'AbortError'));
const onAbort = () => {
sourceSshClient?.end(); // 尝试关闭连接
reject(new DOMException('Transfer cancelled by user.', 'AbortError'));
};
signal.addEventListener('abort', onAbort, { once: true });
sourceSshClient! sourceSshClient!
.on('ready', () => { .on('ready', () => {
signal.removeEventListener('abort', onAbort);
console.info(`[TransfersService] SSH connection established to source server ${sourceConnection.host} for task ${taskId}.`); console.info(`[TransfersService] SSH connection established to source server ${sourceConnection.host} for task ${taskId}.`);
resolve(); resolve();
}) })
.on('error', (err) => { .on('error', (err) => {
signal.removeEventListener('abort', onAbort);
console.error(`[TransfersService] SSH connection error to source server ${sourceConnection.host} for task ${taskId}:`, err); console.error(`[TransfersService] SSH connection error to source server ${sourceConnection.host} for task ${taskId}:`, err);
reject(err); reject(err);
}) })
.on('close', () => { .on('close', () => {
signal.removeEventListener('abort', onAbort);
console.info(`[TransfersService] SSH connection closed to source server ${sourceConnection.host} for task ${taskId}.`); console.info(`[TransfersService] SSH connection closed to source server ${sourceConnection.host} for task ${taskId}.`);
}) })
.connect(sourceConnectConfig); .connect(sourceConnectConfig);
}); });
if (signal.aborted) throw new DOMException('Transfer cancelled by user.', 'AbortError');
// New concurrent processing logic for sub-tasks // New concurrent processing logic for sub-tasks
const subTaskExecutionPromises: Promise<void>[] = []; // Stores promises for all initiated sub-tasks const subTaskExecutionPromises: Promise<void>[] = []; // Stores promises for all initiated sub-tasks
let currentlyActiveSubTasks = 0; let currentlyActiveSubTasks = 0;
@@ -130,28 +195,33 @@ export class TransfersService {
const processSingleSubTaskWrapper = async (subTask: TransferSubTask, subTaskIndexForLog: number): Promise<void> => { const processSingleSubTaskWrapper = async (subTask: TransferSubTask, subTaskIndexForLog: number): Promise<void> => {
console.info(`[TransfersService] Task ${taskId}: Sub-task ${subTask.subTaskId} (index ${subTaskIndexForLog}) started. Active: ${currentlyActiveSubTasks}/${maxConcurrentSubTasks}`); console.info(`[TransfersService] Task ${taskId}: Sub-task ${subTask.subTaskId} (index ${subTaskIndexForLog}) started. Active: ${currentlyActiveSubTasks}/${maxConcurrentSubTasks}`);
if (signal.aborted) {
this.updateSubTaskStatus(taskId, subTask.subTaskId, 'cancelled', undefined, 'Cancelled before start.');
console.info(`[TransfersService] Task ${taskId}: Sub-task ${subTask.subTaskId} cancelled before processing.`);
return; // Do not process this sub-task
}
const currentSourceItem = task.payload.sourceItems.find(it => it.name === subTask.sourceItemName); const currentSourceItem = task.payload.sourceItems.find(it => it.name === subTask.sourceItemName);
if (!currentSourceItem) { if (!currentSourceItem) {
this.updateSubTaskStatus(taskId, subTask.subTaskId, 'failed', undefined, `Source item '${subTask.sourceItemName}' not found in payload.`); this.updateSubTaskStatus(taskId, subTask.subTaskId, 'failed', undefined, `Source item '${subTask.sourceItemName}' not found in payload.`);
console.warn(`[TransfersService] Task ${taskId}: Sub-task ${subTask.subTaskId} (item: ${subTask.sourceItemName}) - Source item not found.`); console.warn(`[TransfersService] Task ${taskId}: Sub-task ${subTask.subTaskId} (item: ${subTask.sourceItemName}) - Source item not found.`);
return; // This sub-task cannot proceed return;
} }
try { try {
if (signal.aborted) throw new DOMException('Transfer cancelled by user.', 'AbortError');
this.updateSubTaskStatus(taskId, subTask.subTaskId, 'connecting', undefined, `Preparing transfer for ${currentSourceItem.name} to target ID ${subTask.connectionId}`); this.updateSubTaskStatus(taskId, subTask.subTaskId, 'connecting', undefined, `Preparing transfer for ${currentSourceItem.name} to target ID ${subTask.connectionId}`);
console.info(`[TransfersService] Task ${taskId}: Sub-task ${subTask.subTaskId} (item: ${currentSourceItem.name}) - Connecting to target ID ${subTask.connectionId}.`); console.info(`[TransfersService] Task ${taskId}: Sub-task ${subTask.subTaskId} (item: ${currentSourceItem.name}) - Connecting to target ID ${subTask.connectionId}.`);
const targetConnectionResult = await getConnectionWithDecryptedCredentials(subTask.connectionId); const targetConnectionResult = await getConnectionWithDecryptedCredentials(subTask.connectionId);
if (signal.aborted) throw new DOMException('Transfer cancelled by user.', 'AbortError');
if (!targetConnectionResult || !targetConnectionResult.connection) { if (!targetConnectionResult || !targetConnectionResult.connection) {
this.updateSubTaskStatus(taskId, subTask.subTaskId, 'failed', undefined, `Target connection with ID ${subTask.connectionId} not found.`); this.updateSubTaskStatus(taskId, subTask.subTaskId, 'failed', undefined, `Target connection with ID ${subTask.connectionId} not found.`);
console.warn(`[TransfersService] Task ${taskId}: Sub-task ${subTask.subTaskId} (item: ${currentSourceItem.name}) - Target connection ID ${subTask.connectionId} not found.`); return;
return; // This sub-task cannot proceed
} }
const { connection: targetConnection, ...targetCredentials } = targetConnectionResult; const { connection: targetConnection, ...targetCredentials } = targetConnectionResult;
// sourceSshClient is established before this loop and should be valid.
// Pass it with non-null assertion if TypeScript complains, or ensure it's correctly scoped.
await this.executeRemoteTransferOnSource( await this.executeRemoteTransferOnSource(
taskId, taskId,
subTask.subTaskId, subTask.subTaskId,
@@ -161,88 +231,124 @@ export class TransfersService {
targetConnection, targetConnection,
targetCredentials, targetCredentials,
task.payload.remoteTargetPath, task.payload.remoteTargetPath,
task.payload.transferMethod task.payload.transferMethod,
signal // +++ Pass signal +++
); );
console.info(`[TransfersService] Task ${taskId}: Sub-task ${subTask.subTaskId} (item: ${currentSourceItem.name}) - Successfully processed by executeRemoteTransferOnSource.`);
} catch (subTaskError: any) { } catch (subTaskError: any) {
console.error(`[TransfersService] Task ${taskId}: Error in sub-task ${subTask.subTaskId} (item: ${currentSourceItem.name}) wrapper:`, subTaskError.message, subTaskError.stack); if (subTaskError.name === 'AbortError') {
const subTaskInstance = task.subTasks.find(st => st.subTaskId === subTask.subTaskId); this.updateSubTaskStatus(taskId, subTask.subTaskId, 'cancelled', undefined, 'Sub-task cancelled by user.');
if (subTaskInstance && subTaskInstance.status !== 'failed' && subTaskInstance.status !== 'completed') { console.info(`[TransfersService] Task ${taskId}: Sub-task ${subTask.subTaskId} (item: ${currentSourceItem.name}) was cancelled.`);
this.updateSubTaskStatus(taskId, subTask.subTaskId, 'failed', undefined, subTaskError.message || `Unknown error in sub-task ${subTask.subTaskId} wrapper.`); } else {
console.error(`[TransfersService] Task ${taskId}: Error in sub-task ${subTask.subTaskId} (item: ${currentSourceItem.name}) wrapper:`, subTaskError.message, subTaskError.stack);
const subTaskInstance = task.subTasks.find(st => st.subTaskId === subTask.subTaskId);
if (subTaskInstance && subTaskInstance.status !== 'failed' && subTaskInstance.status !== 'completed' && subTaskInstance.status !== 'cancelled') {
this.updateSubTaskStatus(taskId, subTask.subTaskId, 'failed', undefined, subTaskError.message || `Unknown error in sub-task ${subTask.subTaskId} wrapper.`);
}
} }
} }
// The finally block for decrementing currentlyActiveSubTasks and launching next is outside this wrapper, managed by the calling promise.
}; };
// This promise resolves when all sub-tasks have been initiated and completed. await new Promise<void>((resolveAllTasksCompleted, rejectAllTasksCompleted) => {
await new Promise<void>(resolveAllTasksCompleted => { const onAbortOverall = () => {
console.info(`[TransfersService] Task ${taskId}: Overall cancellation signal received during sub-task processing phase.`);
// Attempt to clean up / signal running sub-tasks is handled by individual sub-task signal checks
rejectAllTasksCompleted(new DOMException('Transfer cancelled by user.', 'AbortError'));
};
signal.addEventListener('abort', onAbortOverall, { once: true });
const launchNextSubTaskIfPossible = () => { const launchNextSubTaskIfPossible = () => {
// This function is called to attempt to launch new sub-tasks if (signal.aborted) { // Check before launching new sub-tasks
// whenever a slot might be free (initially, or when a task completes). console.info(`[TransfersService] Task ${taskId}: Abort signal detected, not launching more sub-tasks.`);
if (currentlyActiveSubTasks === 0) resolveAllTasksCompleted(); // If no tasks are active, resolve.
return;
}
while (currentlyActiveSubTasks < maxConcurrentSubTasks && currentSubTaskIndex < totalSubTasks) { while (currentlyActiveSubTasks < maxConcurrentSubTasks && currentSubTaskIndex < totalSubTasks) {
// There's a free slot and more tasks are pending.
const subTaskToProcess = task.subTasks[currentSubTaskIndex]; const subTaskToProcess = task.subTasks[currentSubTaskIndex];
const capturedIndexForLog = currentSubTaskIndex; // Capture index for logging inside promise callbacks // If sub-task is already marked (e.g. cancelled by overall cancel before it started), skip.
if (subTaskToProcess.status === 'cancelled') {
console.info(`[TransfersService] Task ${taskId}: Queuing sub-task ${subTaskToProcess.subTaskId} (index ${capturedIndexForLog}). Active before launch: ${currentlyActiveSubTasks}, Max: ${maxConcurrentSubTasks}`); console.info(`[TransfersService] Task ${taskId}: Skipping already cancelled sub-task ${subTaskToProcess.subTaskId}`);
currentSubTaskIndex++;
if (currentSubTaskIndex === totalSubTasks && currentlyActiveSubTasks === 0) {
signal.removeEventListener('abort', onAbortOverall);
resolveAllTasksCompleted();
}
continue; // check next sub-task
}
const capturedIndexForLog = currentSubTaskIndex;
currentlyActiveSubTasks++; currentlyActiveSubTasks++;
currentSubTaskIndex++; // Increment index for the next task to be picked currentSubTaskIndex++;
const taskPromise = processSingleSubTaskWrapper(subTaskToProcess, capturedIndexForLog) const taskPromise = processSingleSubTaskWrapper(subTaskToProcess, capturedIndexForLog)
.finally(() => { .finally(() => {
currentlyActiveSubTasks--; currentlyActiveSubTasks--;
console.info(`[TransfersService] Task ${taskId}: Sub-task ${subTaskToProcess.subTaskId} (index ${capturedIndexForLog}) finished. Active after completion: ${currentlyActiveSubTasks}.`); if (signal.aborted && currentlyActiveSubTasks === 0) {
console.info(`[TransfersService] Task ${taskId}: All active sub-tasks finished after main abort signal.`);
// Check if more tasks can be launched or if all are done. signal.removeEventListener('abort', onAbortOverall);
if (currentSubTaskIndex < totalSubTasks) { resolveAllTasksCompleted(); // All active tasks completed/aborted after main signal
// Still more tasks in the main list, try to launch another. return;
}
if (currentSubTaskIndex < totalSubTasks && !signal.aborted) {
launchNextSubTaskIfPossible(); launchNextSubTaskIfPossible();
} else if (currentlyActiveSubTasks === 0) { } else if (currentlyActiveSubTasks === 0) {
// All tasks from the main list have been initiated (currentSubTaskIndex === totalSubTasks) console.info(`[TransfersService] Task ${taskId}: All ${totalSubTasks} sub-tasks have completed or been skipped after processing.`);
// AND this was the last active task to complete. signal.removeEventListener('abort', onAbortOverall);
console.info(`[TransfersService] Task ${taskId}: All ${totalSubTasks} sub-tasks have completed processing.`);
resolveAllTasksCompleted(); resolveAllTasksCompleted();
} }
// If currentSubTaskIndex === totalSubTasks but currentlyActiveSubTasks > 0,
// other tasks are still running, so we wait for them to call this .finally block.
}); });
subTaskExecutionPromises.push(taskPromise); // Store for potential later inspection if needed subTaskExecutionPromises.push(taskPromise);
} }
// If all tasks were launched and some are still active, or if all tasks were skipped due to early cancellation
// This condition handles the case where all tasks have been queued and all have finished. if (currentSubTaskIndex === totalSubTasks && currentlyActiveSubTasks === 0 && !signal.aborted) {
// It's primarily triggered by the .finally() block of the last completing task. console.info(`[TransfersService] Task ${taskId}: All sub-tasks processed (no active, no more to launch).`);
if (currentSubTaskIndex === totalSubTasks && currentlyActiveSubTasks === 0) { signal.removeEventListener('abort', onAbortOverall);
if (totalSubTasks > 0) { // Only log and resolve if there were tasks to begin with resolveAllTasksCompleted();
console.info(`[TransfersService] Task ${taskId}: All ${totalSubTasks} sub-tasks have been launched and have completed.`);
} else {
console.info(`[TransfersService] Task ${taskId}: No sub-tasks were present to process.`);
}
resolveAllTasksCompleted();
} }
}; };
if (totalSubTasks === 0) { if (totalSubTasks === 0) {
console.info(`[TransfersService] Task ${taskId}: No sub-tasks to process.`); console.info(`[TransfersService] Task ${taskId}: No sub-tasks to process.`);
resolveAllTasksCompleted(); // No tasks, so resolve immediately. signal.removeEventListener('abort', onAbortOverall);
resolveAllTasksCompleted();
return; return;
} }
if (signal.aborted) { // Check if cancelled even before starting the loop
console.info(`[TransfersService] Task ${taskId}: Initiating processing. Will launch up to ${maxConcurrentSubTasks} sub-tasks concurrently.`); console.info(`[TransfersService] Task ${taskId}: Cancelled before sub-task loop initiation.`);
launchNextSubTaskIfPossible(); // Start the process by launching the initial set of concurrent tasks. task.subTasks.forEach(st => { // Mark all sub-tasks as cancelled
if(st.status !== 'completed' && st.status !== 'failed') this.updateSubTaskStatus(taskId, st.subTaskId, 'cancelled', undefined, 'Task cancelled before sub-task execution.');
});
signal.removeEventListener('abort', onAbortOverall);
rejectAllTasksCompleted(new DOMException('Transfer cancelled by user.', 'AbortError'));
return;
}
launchNextSubTaskIfPossible();
}); });
console.info(`[TransfersService] Task ${taskId}: Concurrent sub-task processing phase finished for ${totalSubTasks} sub-tasks.`);
} catch (error: any) { } catch (error: any) {
console.error(`[TransfersService] Major error processing task ${taskId}:`, error); if (error.name === 'AbortError') {
this.updateOverallTaskStatus(taskId, 'failed', error.message || 'Failed to process task due to a major error.'); console.info(`[TransfersService] Task ${taskId} processing was aborted.`);
} finally { this.updateOverallTaskStatus(taskId, 'cancelled', 'Transfer cancelled by user.');
if (sourceSshClient) { // No .readable property, just call end() } else {
sourceSshClient.end(); console.error(`[TransfersService] Major error processing task ${taskId}:`, error);
console.info(`[TransfersService] SSH connection to source server explicitly closed for task ${taskId}.`); this.updateOverallTaskStatus(taskId, 'failed', error.message || 'Failed to process task due to a major error.');
}
} finally {
if (sourceSshClient) { // 直接检查 sourceSshClient 是否存在
try {
sourceSshClient.end();
console.info(`[TransfersService] SSH connection to source server explicitly closed for task ${taskId}.`);
} catch (e) {
console.warn(`[TransfersService] Error ending sourceSshClient for task ${taskId}:`, e)
}
}
this.finalizeOverallTaskStatus(taskId); // Ensure final status is set
this.taskAbortControllers.delete(taskId); // +++ Clean up AbortController +++
if (task) { // task 可能未定义如果 taskId 错误
console.info(`[TransfersService] Task ${taskId} processing finished. Final status: ${task.status}.`);
} else {
console.info(`[TransfersService] Task ${taskId} processing finished (task object was not found at the end).`);
} }
this.finalizeOverallTaskStatus(taskId);
} }
} }
@@ -483,26 +589,33 @@ export class TransfersService {
return commandParts.join(' '); return commandParts.join(' ');
} }
private async executeRemoteTransferOnSource( private async executeRemoteTransferOnSource(
taskId: string, taskId: string,
subTaskId: string, subTaskId: string,
sourceSshClient: Client, sourceSshClient: Client,
sourceConnectionForInfo: ConnectionWithTags, // unused, but good for context if needed sourceConnectionForInfo: ConnectionWithTags,
sourceItem: { name: string; path: string; type: 'file' | 'directory' }, sourceItem: { name: string; path: string; type: 'file' | 'directory' },
targetConnection: ConnectionWithTags, targetConnection: ConnectionWithTags,
targetCredentials: DecryptedConnectionCredentials, targetCredentials: DecryptedConnectionCredentials,
remoteTargetPathOnTarget: string, // This is the base directory on target remoteTargetPathOnTarget: string,
transferMethodPreference: 'auto' | 'rsync' | 'scp' transferMethodPreference: 'auto' | 'rsync' | 'scp',
): Promise<void> { signal: AbortSignal // +++ Add AbortSignal parameter +++
console.error(`[Roo Debug][transfers.service.ts] ENTERING executeRemoteTransferOnSource for sub-task ${subTaskId}, item: ${sourceItem.name}`); ): Promise<void> {
this.updateSubTaskStatus(taskId, subTaskId, 'transferring', 0, `Initializing remote transfer for ${sourceItem.name}`); console.error(`[Roo Debug][transfers.service.ts] ENTERING executeRemoteTransferOnSource for sub-task ${subTaskId}, item: ${sourceItem.name}`);
let tempTargetKeyPathOnSource: string | undefined; if (signal.aborted) throw new DOMException('Transfer cancelled by user.', 'AbortError');
this.updateSubTaskStatus(taskId, subTaskId, 'transferring', 0, `Initializing remote transfer for ${sourceItem.name}`);
let tempTargetKeyPathOnSource: string | undefined;
try { try {
if (signal.aborted) throw new DOMException('Transfer cancelled by user.', 'AbortError');
console.error(`[Roo Debug][transfers.service.ts] Sub-task ${subTaskId}: Starting try block in executeRemoteTransferOnSource.`); console.error(`[Roo Debug][transfers.service.ts] Sub-task ${subTaskId}: Starting try block in executeRemoteTransferOnSource.`);
const sshpassPath = await this.checkCommandOnSource(sourceSshClient, 'sshpass'); // Pass signal to these check commands if they are made to support it. For now, they are quick.
const rsyncPathOnSource = await this.checkCommandOnSource(sourceSshClient, 'rsync'); // Renamed for clarity const sshpassPath = await this.checkCommandOnSource(sourceSshClient, 'sshpass' /*, signal */);
const scpPathOnSource = await this.checkCommandOnSource(sourceSshClient, 'scp'); // Renamed for clarity if (signal.aborted) throw new DOMException('Transfer cancelled by user.', 'AbortError');
const rsyncPathOnSource = await this.checkCommandOnSource(sourceSshClient, 'rsync' /*, signal */);
if (signal.aborted) throw new DOMException('Transfer cancelled by user.', 'AbortError');
const scpPathOnSource = await this.checkCommandOnSource(sourceSshClient, 'scp' /*, signal */);
if (signal.aborted) throw new DOMException('Transfer cancelled by user.', 'AbortError');
console.error(`[Roo Debug][transfers.service.ts] Sub-task ${subTaskId}: Source checks -> sshpass: ${sshpassPath}, rsync: ${rsyncPathOnSource}, scp: ${scpPathOnSource}`); console.error(`[Roo Debug][transfers.service.ts] Sub-task ${subTaskId}: Source checks -> sshpass: ${sshpassPath}, rsync: ${rsyncPathOnSource}, scp: ${scpPathOnSource}`);
@@ -514,243 +627,185 @@ export class TransfersService {
if (rsyncPathOnSource) { if (rsyncPathOnSource) {
// Source has rsync, check target // Source has rsync, check target
console.error(`[Roo Debug][transfers.service.ts] Sub-task ${subTaskId}: 'auto' mode, rsync found on source. Checking target...`); console.error(`[Roo Debug][transfers.service.ts] Sub-task ${subTaskId}: 'auto' mode, rsync found on source. Checking target...`);
rsyncPathOnTarget = await this.checkCommandOnTargetServer(targetConnection, targetCredentials, 'rsync'); rsyncPathOnTarget = await this.checkCommandOnTargetServer(targetConnection, targetCredentials, 'rsync' /*, signal */);
if (signal.aborted) throw new DOMException('Transfer cancelled by user.', 'AbortError');
if (rsyncPathOnTarget) { if (rsyncPathOnTarget) {
console.error(`[Roo Debug][transfers.service.ts] Sub-task ${subTaskId}: Target check (for auto rsync) -> rsync found at '${rsyncPathOnTarget}'. Selecting rsync.`); executableCommandPath = rsyncPathOnSource;
executableCommandPath = rsyncPathOnSource; // Use source path for exec
commandTypeForLogic = 'rsync'; commandTypeForLogic = 'rsync';
} else {
console.warn(`[Roo Debug][transfers.service.ts] Sub-task ${subTaskId}: Rsync found on source, but NOT on target. Rsync cannot be used for 'auto' mode.`);
} }
} }
// If rsync not chosen (either source or target missing), try SCP for 'auto' if (!commandTypeForLogic) { // If rsync not chosen, try SCP
if (!commandTypeForLogic) {
if (scpPathOnSource) { if (scpPathOnSource) {
executableCommandPath = scpPathOnSource; executableCommandPath = scpPathOnSource;
commandTypeForLogic = 'scp'; commandTypeForLogic = 'scp';
console.error(`[Roo Debug][transfers.service.ts] Sub-task ${subTaskId}: 'auto' mode falling back to SCP (Source SCP path: ${scpPathOnSource}).`);
} else { } else {
const msg = `Neither Rsync (source/target) nor SCP (source) are available for ${sourceItem.name} (auto mode).`; throw new Error(`Neither Rsync nor SCP are available on source for ${sourceItem.name} (auto mode).`);
this.updateSubTaskStatus(taskId, subTaskId, 'failed', undefined, msg);
throw new Error(msg);
} }
} }
} else if (transferMethodPreference === 'rsync') { } else if (transferMethodPreference === 'rsync') {
if (rsyncPathOnSource) { if (!rsyncPathOnSource) throw new Error(`Rsync preferred but not available on source for ${sourceItem.name}.`);
console.error(`[Roo Debug][transfers.service.ts] Sub-task ${subTaskId}: 'rsync' preference, rsync found on source. Checking target...`); rsyncPathOnTarget = await this.checkCommandOnTargetServer(targetConnection, targetCredentials, 'rsync' /*, signal */);
rsyncPathOnTarget = await this.checkCommandOnTargetServer(targetConnection, targetCredentials, 'rsync'); if (signal.aborted) throw new DOMException('Transfer cancelled by user.', 'AbortError');
if (rsyncPathOnTarget) { if (!rsyncPathOnTarget) throw new Error(`Rsync preferred, but not available on target for ${sourceItem.name}.`);
console.error(`[Roo Debug][transfers.service.ts] Sub-task ${subTaskId}: Target check (for preferred rsync) -> rsync found at '${rsyncPathOnTarget}'. Selecting rsync.`); executableCommandPath = rsyncPathOnSource;
executableCommandPath = rsyncPathOnSource; commandTypeForLogic = 'rsync';
commandTypeForLogic = 'rsync';
} else {
const msg = `Rsync preferred, found on source, but rsync is NOT available on target server for ${sourceItem.name}.`;
this.updateSubTaskStatus(taskId, subTaskId, 'failed', undefined, msg);
throw new Error(msg);
}
} else {
const msg = `Rsync preferred but not available on source server for ${sourceItem.name}.`;
this.updateSubTaskStatus(taskId, subTaskId, 'failed', undefined, msg);
throw new Error(msg);
}
} else if (transferMethodPreference === 'scp') { } else if (transferMethodPreference === 'scp') {
if (scpPathOnSource) { if (!scpPathOnSource) throw new Error(`SCP preferred but not available on source for ${sourceItem.name}.`);
executableCommandPath = scpPathOnSource; executableCommandPath = scpPathOnSource;
commandTypeForLogic = 'scp'; commandTypeForLogic = 'scp';
console.error(`[Roo Debug][transfers.service.ts] Sub-task ${subTaskId}: 'scp' preference. Selecting SCP (Source SCP path: ${scpPathOnSource}).`);
} else {
const msg = `SCP preferred but not available on source server for ${sourceItem.name}.`;
this.updateSubTaskStatus(taskId, subTaskId, 'failed', undefined, msg);
throw new Error(msg);
}
} else {
// This case should ideally not be reached if transferMethodPreference is correctly typed
const msg = `Invalid transfer method preference: ${transferMethodPreference}`;
this.updateSubTaskStatus(taskId, subTaskId, 'failed', undefined, msg);
throw new Error(msg);
}
// Safeguard: ensure a command was actually selected
if (!executableCommandPath || !commandTypeForLogic) {
const msg = `Internal error: Could not determine a valid transfer command for ${sourceItem.name}. This should not happen.`;
console.error(`[Roo Debug][transfers.service.ts] Sub-task ${subTaskId}: ${msg} rsyncSrc=${rsyncPathOnSource}, scpSrc=${scpPathOnSource}, rsyncTgt=${rsyncPathOnTarget}, pref=${transferMethodPreference}`);
this.updateSubTaskStatus(taskId, subTaskId, 'failed', undefined, msg);
throw new Error(msg);
} }
this.updateSubTaskStatus(taskId, subTaskId, 'transferring', 5, `Using ${commandTypeForLogic} (Path: ${executableCommandPath}). Source SSHPass: ${!!sshpassPath}, Rsync Src: ${!!rsyncPathOnSource}, Rsync Tgt: ${!!rsyncPathOnTarget}, SCP Src: ${!!scpPathOnSource}`); if (!executableCommandPath || !commandTypeForLogic) {
throw new Error(`Could not determine a valid transfer command for ${sourceItem.name}.`);
}
if (signal.aborted) throw new DOMException('Transfer cancelled by user.', 'AbortError');
this.updateSubTaskStatus(taskId, subTaskId, 'transferring', 5, `Using ${commandTypeForLogic}.`);
// +++ Declare and initialize cmdOptions here +++
const cmdOptions: {
targetUserAndHost: string;
sshPortOption?: string;
sshIdentityFileOption?: string;
sshPassCommand?: string;
} = {
targetUserAndHost: `${targetConnection.username}@${targetConnection.host}`,
sshPortOption: targetConnection.port ? (commandTypeForLogic === 'scp' ? `-P ${targetConnection.port}` : (commandTypeForLogic === 'rsync' ? `-p ${targetConnection.port}` : undefined)) : undefined,
};
const subTaskToUpdate = this.transferTasks.get(taskId)?.subTasks.find(st => st.subTaskId === subTaskId); const subTaskToUpdate = this.transferTasks.get(taskId)?.subTasks.find(st => st.subTaskId === subTaskId);
if (subTaskToUpdate) subTaskToUpdate.transferMethodUsed = commandTypeForLogic; if (subTaskToUpdate) subTaskToUpdate.transferMethodUsed = commandTypeForLogic;
const cmdOptions: any = {
targetUserAndHost: `${targetConnection.username}@${targetConnection.host}`,
// Port for rsync is handled in buildTransferCommandString via -e "ssh -p <port>"
// Port for scp is -P <port>
sshPortOption: targetConnection.port ? (commandTypeForLogic === 'scp' ? `-P ${targetConnection.port}` : (commandTypeForLogic === 'rsync' ? `-p ${targetConnection.port}` : undefined)) : undefined,
};
if (targetConnection.auth_method === 'key' && targetCredentials.decryptedPrivateKey) { if (targetConnection.auth_method === 'key' && targetCredentials.decryptedPrivateKey) {
const randomSuffix = crypto.randomBytes(6).toString('hex'); const randomSuffix = crypto.randomBytes(6).toString('hex');
tempTargetKeyPathOnSource = path.posix.join('/tmp', `${this.TEMP_KEY_PREFIX}${randomSuffix}`); // Use posix path for remote systems tempTargetKeyPathOnSource = path.posix.join('/tmp', `${this.TEMP_KEY_PREFIX}${randomSuffix}`);
console.error(`[Roo Debug][transfers.service.ts] Sub-task ${subTaskId}: BEFORE calling uploadKeyToSourceViaSftp for target key path: ${tempTargetKeyPathOnSource}`);
await this.uploadKeyToSourceViaSftp(sourceSshClient, targetCredentials.decryptedPrivateKey, tempTargetKeyPathOnSource); await this.uploadKeyToSourceViaSftp(sourceSshClient, targetCredentials.decryptedPrivateKey, tempTargetKeyPathOnSource);
console.error(`[Roo Debug][transfers.service.ts] Sub-task ${subTaskId}: AFTER calling uploadKeyToSourceViaSftp.`); if (signal.aborted) throw new DOMException('Transfer cancelled by user.', 'AbortError');
cmdOptions.sshIdentityFileOption = `-i ${this.escapeShellArg(tempTargetKeyPathOnSource)}`; cmdOptions.sshIdentityFileOption = `-i ${this.escapeShellArg(tempTargetKeyPathOnSource)}`;
if (targetCredentials.decryptedPassphrase && !sshpassPath) {
if (targetCredentials.decryptedPassphrase) { throw new Error(`Target key has passphrase, but sshpass is not available on source for ${sourceItem.name}.`);
if (sshpassPath) { // Check if sshpassPath is not null }
cmdOptions.sshPassCommand = `${this.escapeShellArg(sshpassPath)} -p ${this.escapeShellArg(targetCredentials.decryptedPassphrase)}`; if (targetCredentials.decryptedPassphrase && sshpassPath) {
} else { cmdOptions.sshPassCommand = `${this.escapeShellArg(sshpassPath)} -p ${this.escapeShellArg(targetCredentials.decryptedPassphrase)}`;
const msg = `Target key has passphrase, but sshpass is not available on source server for ${sourceItem.name}.`;
this.updateSubTaskStatus(taskId, subTaskId, 'failed', undefined, msg);
throw new Error(msg);
}
} }
} else if (targetConnection.auth_method === 'password' && targetCredentials.decryptedPassword) { } else if (targetConnection.auth_method === 'password' && targetCredentials.decryptedPassword) {
if (sshpassPath) { // Check if sshpassPath is not null if (!sshpassPath) {
cmdOptions.sshPassCommand = `${this.escapeShellArg(sshpassPath)} -p ${this.escapeShellArg(targetCredentials.decryptedPassword)}`; throw new Error(`Target uses password auth, but sshpass is not available on source for ${sourceItem.name}.`);
} else {
const msg = `Target uses password auth, but sshpass is not available on source server for ${sourceItem.name}.`;
this.updateSubTaskStatus(taskId, subTaskId, 'failed', undefined, msg);
throw new Error(msg);
} }
cmdOptions.sshPassCommand = `${this.escapeShellArg(sshpassPath)} -p ${this.escapeShellArg(targetCredentials.decryptedPassword)}`;
} else if (targetConnection.auth_method === 'key' && !targetCredentials.decryptedPrivateKey) { } else if (targetConnection.auth_method === 'key' && !targetCredentials.decryptedPrivateKey) {
const msg = `Target connection ${targetConnection.name} is key-based but no private key found. Sub-task for ${sourceItem.name} failed.`; throw new Error(`Target connection ${targetConnection.name} is key-based but no private key found.`);
this.updateSubTaskStatus(taskId, subTaskId, 'failed', undefined, msg);
throw new Error(msg);
} }
if (signal.aborted) throw new DOMException('Transfer cancelled by user.', 'AbortError');
const commandToExecute = this.buildTransferCommandString( const commandToExecute = this.buildTransferCommandString(
sourceItem.path, sourceItem.path, sourceItem.type === 'directory', targetConnection, remoteTargetPathOnTarget,
sourceItem.type === 'directory', executableCommandPath, commandTypeForLogic, cmdOptions
targetConnection, );
remoteTargetPathOnTarget, this.updateSubTaskStatus(taskId, subTaskId, 'transferring', 10, `Executing: ${commandTypeForLogic}`);
executableCommandPath!, // Assert not null as we'd have thrown earlier
commandTypeForLogic,
cmdOptions
);
console.info(`[TransfersService] Executing on source for sub-task ${subTaskId} (item: ${sourceItem.name}): ${commandToExecute}`);
console.info(`[Roo Debug][transfers.service.ts] Sub-task ${subTaskId}: Prepared command: ${commandToExecute}`);
console.info(`[Roo Debug][transfers.service.ts] Sub-task ${subTaskId}: Command options: ${JSON.stringify(cmdOptions)}`);
this.updateSubTaskStatus(taskId, subTaskId, 'transferring', 10, `Executing: ${commandTypeForLogic} from source to ${targetConnection.name}`);
const COMMAND_TIMEOUT_MS = 5 * 60 * 1000; // 5 minutes timeout for command execution
await new Promise<void>((resolveCmd, rejectCmd) => { await new Promise<void>((resolveCmd, rejectCmd) => {
let commandTimedOut = false; let streamClosed = false;
let stdoutCombined = ''; const onAbortCmd = () => {
let stderrCombined = ''; if (!streamClosed) {
const timeoutHandle = setTimeout(() => { console.warn(`[TransfersService] Abort signal received for command stream of sub-task ${subTaskId}. Attempting to close stream.`);
commandTimedOut = true; // execStream?.close(); // 'execStream' is not defined here, should be 'stream' from exec callback
const timeoutMsg = `${commandTypeForLogic} command for ${sourceItem.name} timed out after ${COMMAND_TIMEOUT_MS / 1000}s.`; // It's tricky to access the stream here to close it directly.
console.warn(`[Roo Debug][transfers.service.ts] TIMEOUT ${timeoutMsg} (Sub-task: ${subTaskId})`); // The main mechanism will be the timeout and the client connection eventually closing if task is aborted.
console.warn(`[Roo Debug][transfers.service.ts] TIMEOUT Sub-task ${subTaskId}: STDOUT at timeout: ${stdoutCombined}`); // Or, if ssh2's stream object can be made available to this scope, call .close() or .destroy().
console.warn(`[Roo Debug][transfers.service.ts] TIMEOUT Sub-task ${subTaskId}: STDERR at timeout: ${stderrCombined}`); }
// Attempt to close the stream, though it might not always work if process is stuck hard rejectCmd(new DOMException('Command cancelled by user.', 'AbortError'));
// stream?.close(); // stream is not in this scope yet, and might not exist };
rejectCmd(new Error(timeoutMsg)); signal.addEventListener('abort', onAbortCmd, { once: true });
}, COMMAND_TIMEOUT_MS);
const COMMAND_TIMEOUT_MS = 5 * 60 * 1000;
const execOptions: { pty?: boolean } = {}; const timeoutHandle = setTimeout(() => {
if (cmdOptions.sshPassCommand) { // Only use PTY if sshpass is involved signal.removeEventListener('abort', onAbortCmd);
execOptions.pty = true; if (!streamClosed) rejectCmd(new Error(`${commandTypeForLogic} command timed out for ${sourceItem.name}.`));
} }, COMMAND_TIMEOUT_MS);
const execOptions: { pty?: boolean } = {};
if (cmdOptions.sshPassCommand) execOptions.pty = true;
console.info(`[Roo Debug][transfers.service.ts] Sub-task ${subTaskId}: Exec options for ssh2: ${JSON.stringify(execOptions)}`);
sourceSshClient.exec(commandToExecute, execOptions, (err, stream) => { sourceSshClient.exec(commandToExecute, execOptions, (err, stream) => {
if (commandTimedOut) { // If timeout already fired, don't process stream events if (signal.aborted && !streamClosed) { // Check signal immediately after exec callback
console.info(`[Roo Debug][transfers.service.ts] Sub-task ${subTaskId}: exec callback fired AFTER timeout. Closing stream.`); clearTimeout(timeoutHandle);
stream?.close(); // Try to close the stream if exec cb somehow still runs signal.removeEventListener('abort', onAbortCmd);
return; stream?.close(); // Attempt to close if stream exists
return rejectCmd(new DOMException('Command cancelled by user (at exec).', 'AbortError'));
} }
if (err) { if (err) {
clearTimeout(timeoutHandle); clearTimeout(timeoutHandle);
console.error(`[Roo Debug][transfers.service.ts] Sub-task ${subTaskId}: Failed to initiate command execution:`, err); signal.removeEventListener('abort', onAbortCmd);
return rejectCmd(new Error(`Failed to execute command on source: ${err.message}`)); return rejectCmd(new Error(`Failed to execute command: ${err.message}`));
} }
console.info(`[Roo Debug][transfers.service.ts] Sub-task ${subTaskId}: Stream obtained for command execution.`);
stream.on('data', (data: Buffer) => { stream.on('data', (data: Buffer) => {
if (commandTimedOut) return; if (signal.aborted) return; // Stop processing data if aborted
const output = data.toString(); // ... (progress update logic)
stdoutCombined += output;
// More verbose logging for stdout
console.debug(`[Roo Debug][transfers.service.ts] RAW STDOUT Sub-task ${subTaskId} (item ${sourceItem.name}): <<<${output}>>>`);
if (commandTypeForLogic === 'rsync') { if (commandTypeForLogic === 'rsync') {
const output = data.toString();
const progressMatch = output.match(/(\d+)%/); const progressMatch = output.match(/(\d+)%/);
if (progressMatch && progressMatch[1]) { if (progressMatch && progressMatch[1]) {
this.updateSubTaskStatus(taskId, subTaskId, 'transferring', parseInt(progressMatch[1], 10)); this.updateSubTaskStatus(taskId, subTaskId, 'transferring', parseInt(progressMatch[1], 10));
} }
} else { // scp } else {
this.updateSubTaskStatus(taskId, subTaskId, 'transferring', 50, 'SCP in progress...'); this.updateSubTaskStatus(taskId, subTaskId, 'transferring', 50, 'SCP in progress...');
} }
}); });
let stderrCombined = '';
stream.stderr.on('data', (data: Buffer) => { stream.stderr.on('data', (data: Buffer) => {
if (commandTimedOut) return; if (signal.aborted) return;
const errorOutput = data.toString(); stderrCombined += data.toString();
stderrCombined += errorOutput; console.warn(`[Roo Debug][transfers.service.ts] STDERR Sub-task ${subTaskId}: ${data.toString()}`);
// More verbose logging for stderr
console.warn(`[Roo Debug][transfers.service.ts] RAW STDERR Sub-task ${subTaskId} (item ${sourceItem.name}): <<<${errorOutput}>>>`);
}); });
stream.on('close', (code: number | null) => {
stream.on('close', (code: number | null, signal?: string) => { streamClosed = true;
clearTimeout(timeoutHandle); clearTimeout(timeoutHandle);
if (commandTimedOut) { signal.removeEventListener('abort', onAbortCmd);
console.info(`[Roo Debug][transfers.service.ts] Sub-task ${subTaskId}: Stream closed AFTER timeout.`); if (signal.aborted) { // Check if aborted during the command run
return; // Already handled by timeout return rejectCmd(new DOMException('Command cancelled by user (on close).', 'AbortError'));
} }
console.info(`[Roo Debug][transfers.service.ts] Sub-task ${subTaskId}: Stream closed. Code: ${code}, Signal: ${signal}.`);
console.info(`[Roo Debug][transfers.service.ts] Sub-task ${subTaskId}: Final STDOUT: ${stdoutCombined}`);
console.info(`[Roo Debug][transfers.service.ts] Sub-task ${subTaskId}: Final STDERR: ${stderrCombined}`);
if (code === 0) { if (code === 0) {
this.updateSubTaskStatus(taskId, subTaskId, 'completed', 100, `${commandTypeForLogic} successful for ${sourceItem.name} to ${targetConnection.name}.`); this.updateSubTaskStatus(taskId, subTaskId, 'completed', 100, `${commandTypeForLogic} successful.`);
resolveCmd(); resolveCmd();
} else { } else {
const errorMsg = `${commandTypeForLogic} failed for ${sourceItem.name} to ${targetConnection.name}. Exit code: ${code}, signal: ${signal}. Stderr: ${stderrCombined.trim()}`; rejectCmd(new Error(`${commandTypeForLogic} failed. Code: ${code}. Stderr: ${stderrCombined.trim()}`));
this.updateSubTaskStatus(taskId, subTaskId, 'failed', undefined, errorMsg);
rejectCmd(new Error(errorMsg));
} }
}); });
stream.on('error', (streamErr: Error) => { stream.on('error', (streamErr: Error) => {
streamClosed = true;
clearTimeout(timeoutHandle); clearTimeout(timeoutHandle);
if (commandTimedOut) { signal.removeEventListener('abort', onAbortCmd);
console.info(`[Roo Debug][transfers.service.ts] Sub-task ${subTaskId}: Stream error AFTER timeout.`); if (signal.aborted && streamErr.message.includes('closed')) { // If aborted and stream closed, treat as AbortError
return; return rejectCmd(new DOMException('Command stream error due to cancellation.', 'AbortError'));
} }
console.error(`[Roo Debug][transfers.service.ts] Sub-task ${subTaskId}: Stream error event:`, streamErr);
const errorMsg = `Stream error during ${commandTypeForLogic} for ${sourceItem.name}: ${streamErr.message}`;
this.updateSubTaskStatus(taskId, subTaskId, 'failed', undefined, errorMsg);
rejectCmd(streamErr); rejectCmd(streamErr);
}); });
}); });
}); });
} catch (error: any) { } catch (error: any) {
// This will catch errors from checks, key upload, or the command execution promise if (error.name === 'AbortError') {
console.error(`[Roo Debug][transfers.service.ts] executeRemoteTransferOnSource CATCH block for sub-task ${subTaskId} (item: ${sourceItem.name}). Error type: ${error?.constructor?.name}`, error); console.info(`[TransfersService] executeRemoteTransferOnSource for sub-task ${subTaskId} (item: ${sourceItem.name}) was aborted.`);
console.error(`[TransfersService] executeRemoteTransferOnSource error for sub-task ${subTaskId} (item: ${sourceItem.name}):`, error); // Keep original error log // Status will be updated to 'cancelled' by the caller or here if not already
// Status should have been updated by the specific failure point, or update here if not already failed const subTaskInstance = this.transferTasks.get(taskId)?.subTasks.find(st => st.subTaskId === subTaskId);
const taskFromMap = this.transferTasks.get(taskId); if (subTaskInstance && subTaskInstance.status !== 'cancelled') {
const currentSubTask = taskFromMap?.subTasks.find((st: TransferSubTask) => st.subTaskId === subTaskId); this.updateSubTaskStatus(taskId, subTaskId, 'cancelled', undefined, error.message);
if (currentSubTask && currentSubTask.status !== 'failed' && currentSubTask.status !== 'completed') { }
this.updateSubTaskStatus(taskId, subTaskId, 'failed', undefined, error.message || `Remote transfer execution failed for ${sourceItem.name}.`); } else {
console.error(`[TransfersService] executeRemoteTransferOnSource error for sub-task ${subTaskId} (item: ${sourceItem.name}):`, error);
this.updateSubTaskStatus(taskId, subTaskId, 'failed', undefined, error.message || `Remote transfer execution failed for ${sourceItem.name}.`);
} }
throw error; // Re-throw to be caught by processTransferTask's loop for this sub-task throw error; // Re-throw to be caught by processSingleSubTaskWrapper
} finally { } finally {
console.info(`[Roo Debug][transfers.service.ts] executeRemoteTransferOnSource FINALLY block for sub-task ${subTaskId} (item: ${sourceItem.name}). Temp key path: ${tempTargetKeyPathOnSource}`); console.info(`[Roo Debug][transfers.service.ts] executeRemoteTransferOnSource FINALLY for sub-task ${subTaskId}`);
if (tempTargetKeyPathOnSource) { if (tempTargetKeyPathOnSource) {
try { try {
// TODO: Make deleteFileOnSourceViaSftp accept signal
await this.deleteFileOnSourceViaSftp(sourceSshClient, tempTargetKeyPathOnSource); await this.deleteFileOnSourceViaSftp(sourceSshClient, tempTargetKeyPathOnSource);
} catch (cleanupError) { } catch (cleanupError) {
console.warn(`[TransfersService] Failed to cleanup temp key ${tempTargetKeyPathOnSource} on source for sub-task ${subTaskId}:`, cleanupError); console.warn(`[TransfersService] Failed to cleanup temp key ${tempTargetKeyPathOnSource} on source for sub-task ${subTaskId}:`, cleanupError);
// Log but don't fail the entire sub-task if it otherwise succeeded/failed clearly
} }
} }
} }
@@ -10,7 +10,7 @@ export interface TransferSubTask {
subTaskId: string; subTaskId: string;
connectionId: number; connectionId: number;
sourceItemName: string; sourceItemName: string;
status: 'queued' | 'connecting' | 'transferring' | 'completed' | 'failed'; status: 'queued' | 'connecting' | 'transferring' | 'completed' | 'failed' | 'cancelling' | 'cancelled'; // +++ 新增状态 +++
progress?: number; // 0-100 progress?: number; // 0-100
message?: string; // 例如错误信息 message?: string; // 例如错误信息
transferMethodUsed?: 'rsync' | 'scp'; transferMethodUsed?: 'rsync' | 'scp';
@@ -20,7 +20,7 @@ export interface TransferSubTask {
export interface TransferTask { export interface TransferTask {
taskId: string; taskId: string;
status: 'queued' | 'in-progress' | 'completed' | 'failed' | 'partially-completed'; status: 'queued' | 'in-progress' | 'completed' | 'failed' | 'partially-completed' | 'cancelling' | 'cancelled'; // +++ 新增状态 +++
userId: string | number; // 添加用户ID字段 userId: string | number; // 添加用户ID字段
createdAt: Date; createdAt: Date;
updatedAt: Date; updatedAt: Date;
@@ -52,7 +52,7 @@ interface TransferSubTask {
subTaskId: string; subTaskId: string;
connectionId: number; connectionId: number;
sourceItemName: string; sourceItemName: string;
status: 'queued' | 'connecting' | 'transferring' | 'completed' | 'failed'; status: 'queued' | 'connecting' | 'transferring' | 'completed' | 'failed' | 'cancelling' | 'cancelled'; // +++ 新增状态 +++
progress?: number; // 0-100 progress?: number; // 0-100
message?: string; message?: string;
transferMethodUsed?: 'rsync' | 'scp'; transferMethodUsed?: 'rsync' | 'scp';
@@ -60,7 +60,7 @@ interface TransferSubTask {
interface TransferTask { interface TransferTask {
taskId: string; taskId: string;
status: 'queued' | 'in-progress' | 'completed' | 'failed' | 'partially-completed'; status: 'queued' | 'in-progress' | 'completed' | 'failed' | 'partially-completed' | 'cancelling' | 'cancelled'; // +++ 新增状态 +++
createdAt: string | Date; createdAt: string | Date;
updatedAt: string | Date; updatedAt: string | Date;
subTasks: TransferSubTask[]; subTasks: TransferSubTask[];
@@ -112,6 +112,8 @@ const getDisplayStatus = (status: string): string => {
'partially-completed': 'transferProgressModal.status.partiallyCompleted', 'partially-completed': 'transferProgressModal.status.partiallyCompleted',
'connecting': 'transferProgressModal.status.connecting', 'connecting': 'transferProgressModal.status.connecting',
'transferring': 'transferProgressModal.status.transferring', 'transferring': 'transferProgressModal.status.transferring',
'cancelling': 'transferProgressModal.status.cancelling', // +++ 新增状态翻译键 +++
'cancelled': 'transferProgressModal.status.cancelled', // +++ 新增状态翻译键 +++
}; };
// 提供一个默认的回退文本,以防i18n key缺失 // 提供一个默认的回退文本,以防i18n key缺失
const defaultText = status.charAt(0).toUpperCase() + status.slice(1).replace('-', ' '); const defaultText = status.charAt(0).toUpperCase() + status.slice(1).replace('-', ' ');
@@ -180,6 +182,41 @@ const handleClose = () => {
internalVisible.value = false; internalVisible.value = false;
}; };
const isTaskCancellable = (taskStatus: TransferTask['status']): boolean => {
return ['queued', 'in-progress', 'connecting', 'transferring', 'cancelling'].includes(taskStatus);
};
const isTaskCancelling = (taskStatus: TransferTask['status']): boolean => {
return taskStatus === 'cancelling';
};
const handleCancelTask = async (taskId: string) => {
// 可以在这里添加一个确认对话框
// const confirmed = window.confirm(t('transferProgressModal.confirmCancel', '您确定要中止此传输任务吗?'));
// if (!confirmed) return;
try {
// 更新UI,将任务状态临时设置为 'cancelling' 或禁用按钮
const task = transferTasks.value.find(t => t.taskId === taskId);
if (task) {
// 优选: 如果后端会快速更新状态并通过轮询反映, 前端可能不需要立即改变状态。
// 否则, 可以临时改变: task.status = 'cancelling';
// 另一种方法是添加一个 loading 状态到按钮上
}
await apiClient.post(`/transfers/cancel/${taskId}`);
// 可以添加成功提示
// uiNotificationsStore.showSuccess(t('transferProgressModal.cancelRequested', '已发送中止请求。'));
// 立即刷新一次列表,或者等待下一次轮询
fetchTransferTasks();
} catch (error: any) {
console.error(`Failed to cancel task ${taskId}:`, error);
// uiNotificationsStore.showError(error.response?.data?.message || error.message || t('transferProgressModal.error.cancelFailed', '中止任务失败。'));
// 如果任务状态之前被临时修改,可能需要回滚
}
};
</script> </script>
<template> <template>
@@ -220,14 +257,27 @@ const handleClose = () => {
<span class="font-semibold text-md block">{{ t('transferProgressModal.task.idLabel', '任务') }}: {{ formatTaskTitle(task) }}</span> <span class="font-semibold text-md block">{{ t('transferProgressModal.task.idLabel', '任务') }}: {{ formatTaskTitle(task) }}</span>
<span class="text-xs text-text-muted">{{ t('transferProgressModal.task.createdAt', '创建于') }}: {{ formatDate(task.createdAt) }}</span> <span class="text-xs text-text-muted">{{ t('transferProgressModal.task.createdAt', '创建于') }}: {{ formatDate(task.createdAt) }}</span>
</div> </div>
<span :class="['px-2.5 py-1 text-xs font-semibold rounded-full', <div class="flex items-center space-x-2">
{ 'bg-green-100 text-green-700': task.status === 'completed' }, <span :class="['px-2.5 py-1 text-xs font-semibold rounded-full',
{ 'bg-red-100 text-red-700': task.status === 'failed' }, { 'bg-green-100 text-green-700': task.status === 'completed' },
{ 'bg-yellow-100 text-yellow-700': task.status === 'partially-completed' || task.status === 'queued' }, { 'bg-red-100 text-red-700': task.status === 'failed' },
{ 'bg-blue-100 text-blue-700': task.status === 'in-progress' } { 'bg-yellow-100 text-yellow-700': task.status === 'partially-completed' || task.status === 'queued' || task.status === 'cancelling' }, // cancelling 也用黄色
]"> { 'bg-blue-100 text-blue-700': task.status === 'in-progress' },
{{ getDisplayStatus(task.status) }} { 'bg-gray-100 text-gray-700': task.status === 'cancelled' } // cancelled 用灰色
</span> ]">
{{ getDisplayStatus(task.status) }}
</span>
<button
v-if="isTaskCancellable(task.status)"
@click="handleCancelTask(task.taskId)"
:disabled="isTaskCancelling(task.status)"
class="px-2 py-0.5 text-xs bg-red-500 hover:bg-red-600 text-white rounded-md focus:outline-none focus:ring-2 focus:ring-red-400 disabled:opacity-50 disabled:cursor-not-allowed transition-colors"
:title="isTaskCancelling(task.status) ? t('transferProgressModal.cancellingTooltip', '中止中...') : t('transferProgressModal.cancelTaskTooltip', '中止任务')"
>
<i v-if="isTaskCancelling(task.status)" class="fas fa-spinner fa-spin mr-1"></i>
{{ isTaskCancelling(task.status) ? t('transferProgressModal.cancellingButton', '中止中') : t('transferProgressModal.cancelButton', '中止') }}
</button>
</div>
</div> </div>
<div v-if="task.overallProgress !== undefined" class="mb-2"> <div v-if="task.overallProgress !== undefined" class="mb-2">