Update transfers.service.ts
This commit is contained in:
@@ -12,6 +12,7 @@ import type { ConnectionWithTags, DecryptedConnectionCredentials } from '../type
|
||||
export class TransfersService {
|
||||
private transferTasks: Map<string, TransferTask> = new Map();
|
||||
private readonly TEMP_KEY_PREFIX = 'nexus_target_key_';
|
||||
private readonly MAX_CONCURRENT_SUB_TASKS = 5; // Maximum concurrent sub-tasks
|
||||
|
||||
constructor() {
|
||||
console.info('[TransfersService] Initialized.');
|
||||
@@ -116,42 +117,123 @@ export class TransfersService {
|
||||
.connect(sourceConnectConfig);
|
||||
});
|
||||
|
||||
for (const subTask of task.subTasks) {
|
||||
// New concurrent processing logic for sub-tasks
|
||||
const subTaskExecutionPromises: Promise<void>[] = []; // Stores promises for all initiated sub-tasks
|
||||
let currentlyActiveSubTasks = 0;
|
||||
const maxConcurrentSubTasks = this.MAX_CONCURRENT_SUB_TASKS;
|
||||
let currentSubTaskIndex = 0; // Points to the next sub-task in task.subTasks to be processed
|
||||
const totalSubTasks = task.subTasks.length;
|
||||
|
||||
console.info(`[TransfersService] Task ${taskId}: Starting to process ${totalSubTasks} sub-tasks with max concurrency of ${maxConcurrentSubTasks}.`);
|
||||
|
||||
// Wrapper function to process a single sub-task and manage active counts
|
||||
const processSingleSubTaskWrapper = async (subTask: TransferSubTask, subTaskIndexForLog: number): Promise<void> => {
|
||||
console.info(`[TransfersService] Task ${taskId}: Sub-task ${subTask.subTaskId} (index ${subTaskIndexForLog}) started. Active: ${currentlyActiveSubTasks}/${maxConcurrentSubTasks}`);
|
||||
|
||||
const currentSourceItem = task.payload.sourceItems.find(it => it.name === subTask.sourceItemName);
|
||||
if (!currentSourceItem) {
|
||||
this.updateSubTaskStatus(taskId, subTask.subTaskId, 'failed', undefined, `Source item '${subTask.sourceItemName}' not found in payload.`);
|
||||
continue;
|
||||
console.warn(`[TransfersService] Task ${taskId}: Sub-task ${subTask.subTaskId} (item: ${subTask.sourceItemName}) - Source item not found.`);
|
||||
return; // This sub-task cannot proceed
|
||||
}
|
||||
|
||||
try {
|
||||
this.updateSubTaskStatus(taskId, subTask.subTaskId, 'connecting', undefined, `Preparing transfer for ${currentSourceItem.name} to target ID ${subTask.connectionId}`);
|
||||
console.info(`[TransfersService] Task ${taskId}: Sub-task ${subTask.subTaskId} (item: ${currentSourceItem.name}) - Connecting to target ID ${subTask.connectionId}.`);
|
||||
|
||||
const targetConnectionResult = await getConnectionWithDecryptedCredentials(subTask.connectionId);
|
||||
|
||||
if (!targetConnectionResult || !targetConnectionResult.connection) {
|
||||
this.updateSubTaskStatus(taskId, subTask.subTaskId, 'failed', undefined, `Target connection with ID ${subTask.connectionId} not found.`);
|
||||
continue;
|
||||
console.warn(`[TransfersService] Task ${taskId}: Sub-task ${subTask.subTaskId} (item: ${currentSourceItem.name}) - Target connection ID ${subTask.connectionId} not found.`);
|
||||
return; // This sub-task cannot proceed
|
||||
}
|
||||
const { connection: targetConnection, ...targetCredentials } = targetConnectionResult;
|
||||
|
||||
// sourceSshClient is established before this loop and should be valid.
|
||||
// Pass it with non-null assertion if TypeScript complains, or ensure it's correctly scoped.
|
||||
await this.executeRemoteTransferOnSource(
|
||||
taskId,
|
||||
subTask.subTaskId,
|
||||
sourceSshClient,
|
||||
sourceConnection, // For logging/info if needed
|
||||
sourceSshClient!,
|
||||
sourceConnection,
|
||||
currentSourceItem,
|
||||
targetConnection,
|
||||
targetCredentials,
|
||||
task.payload.remoteTargetPath,
|
||||
task.payload.transferMethod
|
||||
);
|
||||
console.info(`[TransfersService] Task ${taskId}: Sub-task ${subTask.subTaskId} (item: ${currentSourceItem.name}) - Successfully processed by executeRemoteTransferOnSource.`);
|
||||
} catch (subTaskError: any) {
|
||||
console.error(`[TransfersService] Error in sub-task ${subTask.subTaskId} (item: ${currentSourceItem.name}):`, subTaskError);
|
||||
console.error(`[TransfersService] Task ${taskId}: Error in sub-task ${subTask.subTaskId} (item: ${currentSourceItem.name}) wrapper:`, subTaskError.message, subTaskError.stack);
|
||||
const subTaskInstance = task.subTasks.find(st => st.subTaskId === subTask.subTaskId);
|
||||
if (subTaskInstance && subTaskInstance.status !== 'failed' && subTaskInstance.status !== 'completed') {
|
||||
this.updateSubTaskStatus(taskId, subTask.subTaskId, 'failed', undefined, subTaskError.message || 'Unknown sub-task error.');
|
||||
this.updateSubTaskStatus(taskId, subTask.subTaskId, 'failed', undefined, subTaskError.message || `Unknown error in sub-task ${subTask.subTaskId} wrapper.`);
|
||||
}
|
||||
}
|
||||
// The finally block for decrementing currentlyActiveSubTasks and launching next is outside this wrapper, managed by the calling promise.
|
||||
};
|
||||
|
||||
// This promise resolves when all sub-tasks have been initiated and completed.
|
||||
await new Promise<void>(resolveAllTasksCompleted => {
|
||||
const launchNextSubTaskIfPossible = () => {
|
||||
// This function is called to attempt to launch new sub-tasks
|
||||
// whenever a slot might be free (initially, or when a task completes).
|
||||
|
||||
while (currentlyActiveSubTasks < maxConcurrentSubTasks && currentSubTaskIndex < totalSubTasks) {
|
||||
// There's a free slot and more tasks are pending.
|
||||
const subTaskToProcess = task.subTasks[currentSubTaskIndex];
|
||||
const capturedIndexForLog = currentSubTaskIndex; // Capture index for logging inside promise callbacks
|
||||
|
||||
console.info(`[TransfersService] Task ${taskId}: Queuing sub-task ${subTaskToProcess.subTaskId} (index ${capturedIndexForLog}). Active before launch: ${currentlyActiveSubTasks}, Max: ${maxConcurrentSubTasks}`);
|
||||
|
||||
currentlyActiveSubTasks++;
|
||||
currentSubTaskIndex++; // Increment index for the next task to be picked
|
||||
|
||||
const taskPromise = processSingleSubTaskWrapper(subTaskToProcess, capturedIndexForLog)
|
||||
.finally(() => {
|
||||
currentlyActiveSubTasks--;
|
||||
console.info(`[TransfersService] Task ${taskId}: Sub-task ${subTaskToProcess.subTaskId} (index ${capturedIndexForLog}) finished. Active after completion: ${currentlyActiveSubTasks}.`);
|
||||
|
||||
// Check if more tasks can be launched or if all are done.
|
||||
if (currentSubTaskIndex < totalSubTasks) {
|
||||
// Still more tasks in the main list, try to launch another.
|
||||
launchNextSubTaskIfPossible();
|
||||
} else if (currentlyActiveSubTasks === 0) {
|
||||
// All tasks from the main list have been initiated (currentSubTaskIndex === totalSubTasks)
|
||||
// AND this was the last active task to complete.
|
||||
console.info(`[TransfersService] Task ${taskId}: All ${totalSubTasks} sub-tasks have completed processing.`);
|
||||
resolveAllTasksCompleted();
|
||||
}
|
||||
// If currentSubTaskIndex === totalSubTasks but currentlyActiveSubTasks > 0,
|
||||
// other tasks are still running, so we wait for them to call this .finally block.
|
||||
});
|
||||
subTaskExecutionPromises.push(taskPromise); // Store for potential later inspection if needed
|
||||
}
|
||||
|
||||
// This condition handles the case where all tasks have been queued and all have finished.
|
||||
// It's primarily triggered by the .finally() block of the last completing task.
|
||||
if (currentSubTaskIndex === totalSubTasks && currentlyActiveSubTasks === 0) {
|
||||
if (totalSubTasks > 0) { // Only log and resolve if there were tasks to begin with
|
||||
console.info(`[TransfersService] Task ${taskId}: All ${totalSubTasks} sub-tasks have been launched and have completed.`);
|
||||
} else {
|
||||
console.info(`[TransfersService] Task ${taskId}: No sub-tasks were present to process.`);
|
||||
}
|
||||
resolveAllTasksCompleted();
|
||||
}
|
||||
};
|
||||
|
||||
if (totalSubTasks === 0) {
|
||||
console.info(`[TransfersService] Task ${taskId}: No sub-tasks to process.`);
|
||||
resolveAllTasksCompleted(); // No tasks, so resolve immediately.
|
||||
return;
|
||||
}
|
||||
|
||||
console.info(`[TransfersService] Task ${taskId}: Initiating processing. Will launch up to ${maxConcurrentSubTasks} sub-tasks concurrently.`);
|
||||
launchNextSubTaskIfPossible(); // Start the process by launching the initial set of concurrent tasks.
|
||||
});
|
||||
|
||||
console.info(`[TransfersService] Task ${taskId}: Concurrent sub-task processing phase finished for ${totalSubTasks} sub-tasks.`);
|
||||
} catch (error: any) {
|
||||
console.error(`[TransfersService] Major error processing task ${taskId}:`, error);
|
||||
this.updateOverallTaskStatus(taskId, 'failed', error.message || 'Failed to process task due to a major error.');
|
||||
|
||||
Reference in New Issue
Block a user