diff --git a/packages/backend/src/transfers/transfers.service.ts b/packages/backend/src/transfers/transfers.service.ts index 26a5525..5731e54 100644 --- a/packages/backend/src/transfers/transfers.service.ts +++ b/packages/backend/src/transfers/transfers.service.ts @@ -12,6 +12,7 @@ import type { ConnectionWithTags, DecryptedConnectionCredentials } from '../type export class TransfersService { private transferTasks: Map = new Map(); private readonly TEMP_KEY_PREFIX = 'nexus_target_key_'; + private readonly MAX_CONCURRENT_SUB_TASKS = 5; // Maximum concurrent sub-tasks constructor() { console.info('[TransfersService] Initialized.'); @@ -116,42 +117,123 @@ export class TransfersService { .connect(sourceConnectConfig); }); - for (const subTask of task.subTasks) { + // New concurrent processing logic for sub-tasks + const subTaskExecutionPromises: Promise[] = []; // Stores promises for all initiated sub-tasks + let currentlyActiveSubTasks = 0; + const maxConcurrentSubTasks = this.MAX_CONCURRENT_SUB_TASKS; + let currentSubTaskIndex = 0; // Points to the next sub-task in task.subTasks to be processed + const totalSubTasks = task.subTasks.length; + + console.info(`[TransfersService] Task ${taskId}: Starting to process ${totalSubTasks} sub-tasks with max concurrency of ${maxConcurrentSubTasks}.`); + + // Wrapper function to process a single sub-task and manage active counts + const processSingleSubTaskWrapper = async (subTask: TransferSubTask, subTaskIndexForLog: number): Promise => { + console.info(`[TransfersService] Task ${taskId}: Sub-task ${subTask.subTaskId} (index ${subTaskIndexForLog}) started. Active: ${currentlyActiveSubTasks}/${maxConcurrentSubTasks}`); + const currentSourceItem = task.payload.sourceItems.find(it => it.name === subTask.sourceItemName); if (!currentSourceItem) { this.updateSubTaskStatus(taskId, subTask.subTaskId, 'failed', undefined, `Source item '${subTask.sourceItemName}' not found in payload.`); - continue; + console.warn(`[TransfersService] Task ${taskId}: Sub-task ${subTask.subTaskId} (item: ${subTask.sourceItemName}) - Source item not found.`); + return; // This sub-task cannot proceed } try { this.updateSubTaskStatus(taskId, subTask.subTaskId, 'connecting', undefined, `Preparing transfer for ${currentSourceItem.name} to target ID ${subTask.connectionId}`); + console.info(`[TransfersService] Task ${taskId}: Sub-task ${subTask.subTaskId} (item: ${currentSourceItem.name}) - Connecting to target ID ${subTask.connectionId}.`); + const targetConnectionResult = await getConnectionWithDecryptedCredentials(subTask.connectionId); if (!targetConnectionResult || !targetConnectionResult.connection) { this.updateSubTaskStatus(taskId, subTask.subTaskId, 'failed', undefined, `Target connection with ID ${subTask.connectionId} not found.`); - continue; + console.warn(`[TransfersService] Task ${taskId}: Sub-task ${subTask.subTaskId} (item: ${currentSourceItem.name}) - Target connection ID ${subTask.connectionId} not found.`); + return; // This sub-task cannot proceed } const { connection: targetConnection, ...targetCredentials } = targetConnectionResult; + // sourceSshClient is established before this loop and should be valid. + // Pass it with non-null assertion if TypeScript complains, or ensure it's correctly scoped. await this.executeRemoteTransferOnSource( taskId, subTask.subTaskId, - sourceSshClient, - sourceConnection, // For logging/info if needed + sourceSshClient!, + sourceConnection, currentSourceItem, targetConnection, targetCredentials, task.payload.remoteTargetPath, task.payload.transferMethod ); + console.info(`[TransfersService] Task ${taskId}: Sub-task ${subTask.subTaskId} (item: ${currentSourceItem.name}) - Successfully processed by executeRemoteTransferOnSource.`); } catch (subTaskError: any) { - console.error(`[TransfersService] Error in sub-task ${subTask.subTaskId} (item: ${currentSourceItem.name}):`, subTaskError); + console.error(`[TransfersService] Task ${taskId}: Error in sub-task ${subTask.subTaskId} (item: ${currentSourceItem.name}) wrapper:`, subTaskError.message, subTaskError.stack); const subTaskInstance = task.subTasks.find(st => st.subTaskId === subTask.subTaskId); if (subTaskInstance && subTaskInstance.status !== 'failed' && subTaskInstance.status !== 'completed') { - this.updateSubTaskStatus(taskId, subTask.subTaskId, 'failed', undefined, subTaskError.message || 'Unknown sub-task error.'); + this.updateSubTaskStatus(taskId, subTask.subTaskId, 'failed', undefined, subTaskError.message || `Unknown error in sub-task ${subTask.subTaskId} wrapper.`); } } - } + // The finally block for decrementing currentlyActiveSubTasks and launching next is outside this wrapper, managed by the calling promise. + }; + + // This promise resolves when all sub-tasks have been initiated and completed. + await new Promise(resolveAllTasksCompleted => { + const launchNextSubTaskIfPossible = () => { + // This function is called to attempt to launch new sub-tasks + // whenever a slot might be free (initially, or when a task completes). + + while (currentlyActiveSubTasks < maxConcurrentSubTasks && currentSubTaskIndex < totalSubTasks) { + // There's a free slot and more tasks are pending. + const subTaskToProcess = task.subTasks[currentSubTaskIndex]; + const capturedIndexForLog = currentSubTaskIndex; // Capture index for logging inside promise callbacks + + console.info(`[TransfersService] Task ${taskId}: Queuing sub-task ${subTaskToProcess.subTaskId} (index ${capturedIndexForLog}). Active before launch: ${currentlyActiveSubTasks}, Max: ${maxConcurrentSubTasks}`); + + currentlyActiveSubTasks++; + currentSubTaskIndex++; // Increment index for the next task to be picked + + const taskPromise = processSingleSubTaskWrapper(subTaskToProcess, capturedIndexForLog) + .finally(() => { + currentlyActiveSubTasks--; + console.info(`[TransfersService] Task ${taskId}: Sub-task ${subTaskToProcess.subTaskId} (index ${capturedIndexForLog}) finished. Active after completion: ${currentlyActiveSubTasks}.`); + + // Check if more tasks can be launched or if all are done. + if (currentSubTaskIndex < totalSubTasks) { + // Still more tasks in the main list, try to launch another. + launchNextSubTaskIfPossible(); + } else if (currentlyActiveSubTasks === 0) { + // All tasks from the main list have been initiated (currentSubTaskIndex === totalSubTasks) + // AND this was the last active task to complete. + console.info(`[TransfersService] Task ${taskId}: All ${totalSubTasks} sub-tasks have completed processing.`); + resolveAllTasksCompleted(); + } + // If currentSubTaskIndex === totalSubTasks but currentlyActiveSubTasks > 0, + // other tasks are still running, so we wait for them to call this .finally block. + }); + subTaskExecutionPromises.push(taskPromise); // Store for potential later inspection if needed + } + + // This condition handles the case where all tasks have been queued and all have finished. + // It's primarily triggered by the .finally() block of the last completing task. + if (currentSubTaskIndex === totalSubTasks && currentlyActiveSubTasks === 0) { + if (totalSubTasks > 0) { // Only log and resolve if there were tasks to begin with + console.info(`[TransfersService] Task ${taskId}: All ${totalSubTasks} sub-tasks have been launched and have completed.`); + } else { + console.info(`[TransfersService] Task ${taskId}: No sub-tasks were present to process.`); + } + resolveAllTasksCompleted(); + } + }; + + if (totalSubTasks === 0) { + console.info(`[TransfersService] Task ${taskId}: No sub-tasks to process.`); + resolveAllTasksCompleted(); // No tasks, so resolve immediately. + return; + } + + console.info(`[TransfersService] Task ${taskId}: Initiating processing. Will launch up to ${maxConcurrentSubTasks} sub-tasks concurrently.`); + launchNextSubTaskIfPossible(); // Start the process by launching the initial set of concurrent tasks. + }); + + console.info(`[TransfersService] Task ${taskId}: Concurrent sub-task processing phase finished for ${totalSubTasks} sub-tasks.`); } catch (error: any) { console.error(`[TransfersService] Major error processing task ${taskId}:`, error); this.updateOverallTaskStatus(taskId, 'failed', error.message || 'Failed to process task due to a major error.');