From 9be252bf2d654d7f0a8d12b3c66900402a292577 Mon Sep 17 00:00:00 2001 From: Baobhan Sith <80159437+Heavrnl@users.noreply.github.com> Date: Fri, 16 May 2025 17:42:52 +0800 Subject: [PATCH] update --- .../src/transfers/transfers.service.ts | 861 ++++++++++-------- .../backend/src/transfers/transfers.types.ts | 5 +- .../backend/src/types/connection.types.ts | 8 +- .../src/components/FileManagerContextMenu.vue | 13 +- .../src/components/SendFilesModal.vue | 11 +- 5 files changed, 504 insertions(+), 394 deletions(-) diff --git a/packages/backend/src/transfers/transfers.service.ts b/packages/backend/src/transfers/transfers.service.ts index 793a26d..c46b805 100644 --- a/packages/backend/src/transfers/transfers.service.ts +++ b/packages/backend/src/transfers/transfers.service.ts @@ -1,16 +1,17 @@ -import { spawn } from 'child_process'; -import * as fs from 'fs'; +import * as fs from 'fs/promises'; import * as os from 'os'; import * as path from 'path'; import * as crypto from 'crypto'; import { v4 as uuidv4 } from 'uuid'; // 用于生成唯一ID +import { Client, ConnectConfig, SFTPWrapper } from 'ssh2'; import { InitiateTransferPayload, TransferTask, TransferSubTask } from './transfers.types'; import { getConnectionWithDecryptedCredentials } from '../services/connection.service'; -import type { ConnectionWithTags } from '../types/connection.types'; +import type { ConnectionWithTags, DecryptedConnectionCredentials } from '../types/connection.types'; // import { logger } from '../utils/logger'; // 假设的日志工具路径 export class TransfersService { private transferTasks: Map = new Map(); + private readonly TEMP_KEY_PREFIX = 'nexus_target_key_'; constructor() { console.info('[TransfersService] Initialized.'); @@ -21,13 +22,14 @@ export class TransfersService { const now = new Date(); const subTasks: TransferSubTask[] = []; - for (const connectionId of payload.connectionIds) { - for (const item of payload.sourceItems) { + // 每个 (目标服务器, 源文件) 组合都是一个子任务 + for (const connectionId of payload.connectionIds) { // 目标服务器ID列表 + for (const item of payload.sourceItems) { // 源服务器上的文件/目录列表 const subTaskId = uuidv4(); subTasks.push({ subTaskId, - connectionId, - sourceItemName: item.name, + connectionId, // 这是目标服务器的ID + sourceItemName: item.name, // 源文件的名称,用于标识 status: 'queued', startTime: now, }); @@ -37,26 +39,47 @@ export class TransfersService { const newTask: TransferTask = { taskId, status: 'queued', - userId, // 添加 userId + userId, createdAt: now, updatedAt: now, subTasks, - payload, + payload, // payload 包含 sourceConnectionId }; this.transferTasks.set(taskId, newTask); - console.info(`[TransfersService] New transfer task created: ${taskId} with ${subTasks.length} sub-tasks.`); + console.info(`[TransfersService] New transfer task created: ${taskId} for source ${payload.sourceConnectionId} with ${subTasks.length} sub-tasks.`); // 异步启动传输,不阻塞当前请求 this.processTransferTask(taskId).catch(error => { console.error(`[TransfersService] Error processing task ${taskId} in background:`, error); - // 可能需要更新父任务状态为 failed this.updateOverallTaskStatus(taskId, 'failed', `Background processing error: ${error.message}`); }); return { ...newTask }; // 返回任务的副本 } + private buildSshConnectConfig( + connectionInfo: ConnectionWithTags, + credentials: DecryptedConnectionCredentials + ): ConnectConfig { + const config: ConnectConfig = { + host: connectionInfo.host, + port: connectionInfo.port || 22, + username: connectionInfo.username, + readyTimeout: 20000, // 20 seconds + keepaliveInterval: 10000, // 10 seconds + }; + if (connectionInfo.auth_method === 'password' && credentials.decryptedPassword) { + config.password = credentials.decryptedPassword; + } else if (connectionInfo.auth_method === 'key' && credentials.decryptedPrivateKey) { + config.privateKey = credentials.decryptedPrivateKey; + if (credentials.decryptedPassphrase) { + config.passphrase = credentials.decryptedPassphrase; + } + } + return config; + } + private async processTransferTask(taskId: string): Promise { const task = this.transferTasks.get(taskId); if (!task) { @@ -65,98 +88,388 @@ export class TransfersService { } this.updateOverallTaskStatus(taskId, 'in-progress'); + let sourceSshClient: Client | undefined; - for (const subTask of task.subTasks) { - let tempKeyPath: string | undefined; - try { - this.updateSubTaskStatus(taskId, subTask.subTaskId, 'connecting'); - const connectionResult = await getConnectionWithDecryptedCredentials(subTask.connectionId); + try { + const sourceConnectionResult = await getConnectionWithDecryptedCredentials(task.payload.sourceConnectionId); + if (!sourceConnectionResult || !sourceConnectionResult.connection) { + throw new Error(`Source connection with ID ${task.payload.sourceConnectionId} not found or inaccessible.`); + } + const { connection: sourceConnection, ...sourceCredentials } = sourceConnectionResult; - if (!connectionResult || !connectionResult.connection) { - this.updateSubTaskStatus(taskId, subTask.subTaskId, 'failed', undefined, `Connection with ID ${subTask.connectionId} not found or inaccessible.`); + sourceSshClient = new Client(); + const sourceConnectConfig = this.buildSshConnectConfig(sourceConnection, sourceCredentials); + + await new Promise((resolve, reject) => { + sourceSshClient! + .on('ready', () => { + console.info(`[TransfersService] SSH connection established to source server ${sourceConnection.host} for task ${taskId}.`); + resolve(); + }) + .on('error', (err) => { + console.error(`[TransfersService] SSH connection error to source server ${sourceConnection.host} for task ${taskId}:`, err); + reject(err); + }) + .on('close', () => { + console.info(`[TransfersService] SSH connection closed to source server ${sourceConnection.host} for task ${taskId}.`); + }) + .connect(sourceConnectConfig); + }); + + for (const subTask of task.subTasks) { + const currentSourceItem = task.payload.sourceItems.find(it => it.name === subTask.sourceItemName); + if (!currentSourceItem) { + this.updateSubTaskStatus(taskId, subTask.subTaskId, 'failed', undefined, `Source item '${subTask.sourceItemName}' not found in payload.`); continue; } - const { connection, decryptedPassword, decryptedPrivateKey, decryptedPassphrase } = connectionResult; - if (connection.auth_method === 'key' && decryptedPrivateKey) { - try { - const tempDir = os.tmpdir(); - const randomFileName = `nexus_tmp_key_${crypto.randomBytes(6).toString('hex')}`; - tempKeyPath = path.join(tempDir, randomFileName); - await fs.promises.writeFile(tempKeyPath, decryptedPrivateKey, { mode: 0o600 }); - console.info(`[TransfersService] Temporary private key created at ${tempKeyPath} for sub-task ${subTask.subTaskId}`); - } catch (keyError: any) { - console.error(`[TransfersService] Failed to prepare private key for sub-task ${subTask.subTaskId}:`, keyError); - this.updateSubTaskStatus(taskId, subTask.subTaskId, 'failed', undefined, `Failed to prepare private key: ${keyError.message}`); - if (tempKeyPath) { - try { await fs.promises.unlink(tempKeyPath); } catch (e) { console.error(`[TransfersService] Error cleaning up partially created temp key ${tempKeyPath}:`, e); } - } - tempKeyPath = undefined; + try { + this.updateSubTaskStatus(taskId, subTask.subTaskId, 'connecting', undefined, `Preparing transfer for ${currentSourceItem.name} to target ID ${subTask.connectionId}`); + const targetConnectionResult = await getConnectionWithDecryptedCredentials(subTask.connectionId); + + if (!targetConnectionResult || !targetConnectionResult.connection) { + this.updateSubTaskStatus(taskId, subTask.subTaskId, 'failed', undefined, `Target connection with ID ${subTask.connectionId} not found.`); continue; } - } + const { connection: targetConnection, ...targetCredentials } = targetConnectionResult; - const sourceItem = task.payload.sourceItems.find(s => s.name === subTask.sourceItemName); - if (!sourceItem) { - this.updateSubTaskStatus(taskId, subTask.subTaskId, 'failed', undefined, `Source item ${subTask.sourceItemName} not found in payload.`); - // No 'continue' here, let finally block handle tempKeyPath cleanup if it was created - // However, if sourceItem is not found, we should not proceed with transfer commands. - // The 'continue' implies we might have created a temp key that needs cleanup *before* continuing. - // So, cleanup and continue pattern is better. - if (tempKeyPath) { - try { await fs.promises.unlink(tempKeyPath); console.info(`[TransfersService] Temporary private key ${tempKeyPath} deleted after source item not found.`);} - catch (e) { console.error(`[TransfersService] Error cleaning temp key ${tempKeyPath} after source item not found:`, e); } - tempKeyPath = undefined; // Ensure it is not used/cleaned again in finally - } - continue; - } - - const determinedMethod = await this.determineTransferCommand( - connection, - task.payload.transferMethod, - connection.host, - tempKeyPath, // Pass tempKeyPath (which is undefined if not key auth or error) - decryptedPassphrase - ); - this.updateSubTaskStatus(taskId, subTask.subTaskId, 'transferring', 0, `Using ${determinedMethod}.`); - subTask.transferMethodUsed = determinedMethod; - - if (determinedMethod === 'rsync') { - await this.executeRsync(taskId, subTask.subTaskId, connection, sourceItem.path, task.payload.remoteTargetPath, sourceItem.type === 'directory', decryptedPassword, tempKeyPath, decryptedPassphrase); - } else { // scp - await this.executeScp(taskId, subTask.subTaskId, connection, sourceItem.path, task.payload.remoteTargetPath, sourceItem.type === 'directory', decryptedPassword, tempKeyPath, decryptedPassphrase); - } - } catch (error: any) { - console.error(`[TransfersService] Error processing sub-task ${subTask.subTaskId} for task ${taskId}:`, error); - // Avoid double-updating status if it was already set to failed due to key prep error - const currentSubTask = task.subTasks.find(st => st.subTaskId === subTask.subTaskId); - if (currentSubTask && currentSubTask.status !== 'failed') { - this.updateSubTaskStatus(taskId, subTask.subTaskId, 'failed', undefined, error.message || 'Unknown error during sub-task processing.'); - } - } finally { - if (tempKeyPath) { - try { - await fs.promises.unlink(tempKeyPath); - console.info(`[TransfersService] Temporary private key ${tempKeyPath} deleted for sub-task ${subTask.subTaskId}`); - } catch (cleanupError) { - console.error(`[TransfersService] Error cleaning up temporary private key ${tempKeyPath} for sub-task ${subTask.subTaskId}:`, cleanupError); + await this.executeRemoteTransferOnSource( + taskId, + subTask.subTaskId, + sourceSshClient, + sourceConnection, // For logging/info if needed + currentSourceItem, + targetConnection, + targetCredentials, + task.payload.remoteTargetPath, + task.payload.transferMethod + ); + } catch (subTaskError: any) { + console.error(`[TransfersService] Error in sub-task ${subTask.subTaskId} (item: ${currentSourceItem.name}):`, subTaskError); + const subTaskInstance = task.subTasks.find(st => st.subTaskId === subTask.subTaskId); + if (subTaskInstance && subTaskInstance.status !== 'failed' && subTaskInstance.status !== 'completed') { + this.updateSubTaskStatus(taskId, subTask.subTaskId, 'failed', undefined, subTaskError.message || 'Unknown sub-task error.'); } } } + } catch (error: any) { + console.error(`[TransfersService] Major error processing task ${taskId}:`, error); + this.updateOverallTaskStatus(taskId, 'failed', error.message || 'Failed to process task due to a major error.'); + } finally { + if (sourceSshClient) { // No .readable property, just call end() + sourceSshClient.end(); + console.info(`[TransfersService] SSH connection to source server explicitly closed for task ${taskId}.`); + } + this.finalizeOverallTaskStatus(taskId); } - this.finalizeOverallTaskStatus(taskId); } + private async checkCommandOnSource(client: Client, command: string): Promise { + return new Promise((resolve) => { + client.exec(`command -v ${command}`, (err, stream) => { + if (err) { + console.warn(`[TransfersService] Error checking for command '${command}' on source:`, err); + return resolve(false); + } + let stdout = ''; + stream + .on('data', (data: Buffer) => stdout += data.toString()) + .on('close', (code: number) => { + resolve(code === 0 && stdout.trim() !== ''); + }) + .stderr.on('data', (data: Buffer) => { + console.warn(`[TransfersService] STDERR checking for command '${command}' on source: ${data.toString()}`); + }); + }); + }); + } + private async uploadKeyToSourceViaSftp(client: Client, privateKeyContent: string, remotePath: string): Promise { + return new Promise((resolve, reject) => { + client.sftp((err, sftp) => { + if (err) return reject(new Error(`SFTP session error for key upload: ${err.message}`)); + const stream = sftp.createWriteStream(remotePath, { mode: 0o600 }); + stream.on('error', (writeErr: Error) => { // Added Error type for writeErr + sftp.end(); + reject(new Error(`Failed to write key to ${remotePath} on source: ${writeErr.message}`)) + }); + stream.on('finish', () => { // 'close' might be more reliable with sftp streams + console.info(`[TransfersService] Private key for target successfully uploaded to source at ${remotePath}`); + sftp.end(); // Ensure sftp session is closed after operation + resolve(); + }); + stream.end(privateKeyContent); + }); + }); + } + + private async deleteFileOnSourceViaSftp(client: Client, remotePath: string): Promise { + return new Promise((resolve, reject) => { + client.sftp((err, sftp) => { + if (err) return reject(new Error(`SFTP session error for key deletion: ${err.message}`)); + sftp.unlink(remotePath, (unlinkErr) => { + sftp.end(); // Ensure sftp session is closed + if (unlinkErr) { + // Log but don't necessarily reject if file just wasn't there (though it should be) + console.warn(`[TransfersService] Failed to delete temporary key ${remotePath} from source:`, unlinkErr); + return reject(new Error(`Failed to delete ${remotePath} from source: ${unlinkErr.message}`)); + } + console.info(`[TransfersService] Temporary key ${remotePath} deleted from source.`); + resolve(); + }); + }); + }); + } + + private escapeShellArg(arg: string): string { + // Basic escaping for paths and arguments. More robust escaping might be needed. + return `'${arg.replace(/'/g, "'\\''")}'`; + } + + private buildTransferCommandString( + sourceItemPathOnA: string, // Absolute path on source A + isDir: boolean, + targetConnection: ConnectionWithTags, // Target B connection details + targetPathOnB: string, // Base remote target path on B + transferCmd: 'scp' | 'rsync', + options: { // Options derived from checking source A and target B auth + sshPassCommand?: string; // e.g., "sshpass -p 'password'" + sshIdentityFileOption?: string; // e.g., "-i /tmp/key_B_XYZ" + targetUserAndHost: string; // e.g., "userB@hostB.com" + sshPortOption?: string; // e.g., "-P 2222" for scp, or part of rsync's -e 'ssh -p 2222' + } + ): string { + const remoteBase = targetPathOnB.endsWith('/') ? targetPathOnB : `${targetPathOnB}/`; + const remoteFullDest = `${options.targetUserAndHost}:${this.escapeShellArg(remoteBase)}`; // SCP/Rsync will append filename if source is file + + let commandParts: string[] = []; + if (options.sshPassCommand) { + commandParts.push(options.sshPassCommand); + } + + if (transferCmd === 'rsync') { + commandParts.push('rsync -avz --progress'); + let sshArgsForRsync = `ssh`; + if (options.sshPortOption && options.sshPortOption.startsWith('-p')) { // for rsync -e "ssh -p XXX" + sshArgsForRsync += ` ${options.sshPortOption}`; + } + if (options.sshIdentityFileOption) { + sshArgsForRsync += ` ${options.sshIdentityFileOption}`; + } + commandParts.push(`-e "${sshArgsForRsync.trim()}"`); + + let rsyncSourcePath = this.escapeShellArg(sourceItemPathOnA); + if (isDir && !rsyncSourcePath.endsWith('/\'')) { // if escaped and ends with /' + rsyncSourcePath = rsyncSourcePath.slice(0, -1) + '/\''; // Add trailing slash for rsync dir content copy + } + commandParts.push(rsyncSourcePath); + commandParts.push(remoteFullDest); + + } else { // scp + commandParts.push('scp'); + if (isDir) commandParts.push('-r'); + if (options.sshPortOption && options.sshPortOption.startsWith('-P')) { // for scp -P XXX + commandParts.push(options.sshPortOption); + } + if (options.sshIdentityFileOption) { + commandParts.push(options.sshIdentityFileOption); + } + commandParts.push(this.escapeShellArg(sourceItemPathOnA)); + commandParts.push(remoteFullDest); + } + return commandParts.join(' '); + } + + private async executeRemoteTransferOnSource( + taskId: string, + subTaskId: string, + sourceSshClient: Client, + sourceConnectionForInfo: ConnectionWithTags, // unused, but good for context if needed + sourceItem: { name: string; path: string; type: 'file' | 'directory' }, + targetConnection: ConnectionWithTags, + targetCredentials: DecryptedConnectionCredentials, + remoteTargetPathOnTarget: string, // This is the base directory on target + transferMethodPreference: 'auto' | 'rsync' | 'scp' + ): Promise { + this.updateSubTaskStatus(taskId, subTaskId, 'transferring', 0, `Initializing remote transfer for ${sourceItem.name}`); + let tempTargetKeyPathOnSource: string | undefined; // Path of target's private key if temporarily on source A + + try { + const sshpassAvailableOnSource = await this.checkCommandOnSource(sourceSshClient, 'sshpass'); + const rsyncAvailableOnSource = await this.checkCommandOnSource(sourceSshClient, 'rsync'); + + let determinedTransferCmd: 'scp' | 'rsync' = 'scp'; // Default to scp + if (transferMethodPreference === 'rsync' && rsyncAvailableOnSource) { + determinedTransferCmd = 'rsync'; + } else if (transferMethodPreference === 'rsync' && !rsyncAvailableOnSource) { + this.updateSubTaskStatus(taskId, subTaskId, 'failed', undefined, `Rsync preferred but not available on source server. Sub-task for ${sourceItem.name} failed.`); + throw new Error('Rsync preferred but not available on source server.'); + } else if (transferMethodPreference === 'auto') { + determinedTransferCmd = rsyncAvailableOnSource ? 'rsync' : 'scp'; + } + this.updateSubTaskStatus(taskId, subTaskId, 'transferring', 5, `Using ${determinedTransferCmd}. Source SSHPass: ${sshpassAvailableOnSource}, Rsync: ${rsyncAvailableOnSource}`); + const subTaskToUpdate = this.transferTasks.get(taskId)?.subTasks.find(st => st.subTaskId === subTaskId); + if (subTaskToUpdate) subTaskToUpdate.transferMethodUsed = determinedTransferCmd; + + + const cmdOptions: any = { + targetUserAndHost: `${targetConnection.username}@${targetConnection.host}`, + sshPortOption: targetConnection.port ? (determinedTransferCmd === 'scp' ? `-P ${targetConnection.port}`: `-p ${targetConnection.port}`) : undefined, + }; + + if (targetConnection.auth_method === 'key' && targetCredentials.decryptedPrivateKey) { + const randomSuffix = crypto.randomBytes(6).toString('hex'); + tempTargetKeyPathOnSource = path.posix.join('/tmp', `${this.TEMP_KEY_PREFIX}${randomSuffix}`); // Use posix path for remote systems + + await this.uploadKeyToSourceViaSftp(sourceSshClient, targetCredentials.decryptedPrivateKey, tempTargetKeyPathOnSource); + cmdOptions.sshIdentityFileOption = `-i ${this.escapeShellArg(tempTargetKeyPathOnSource)}`; + + if (targetCredentials.decryptedPassphrase) { + if (sshpassAvailableOnSource) { + cmdOptions.sshPassCommand = `sshpass -p ${this.escapeShellArg(targetCredentials.decryptedPassphrase)}`; + } else { + const msg = `Target key has passphrase, but sshpass is not available on source server for ${sourceItem.name}.`; + this.updateSubTaskStatus(taskId, subTaskId, 'failed', undefined, msg); + throw new Error(msg); + } + } + } else if (targetConnection.auth_method === 'password' && targetCredentials.decryptedPassword) { + if (sshpassAvailableOnSource) { + cmdOptions.sshPassCommand = `sshpass -p ${this.escapeShellArg(targetCredentials.decryptedPassword)}`; + } else { + const msg = `Target uses password auth, but sshpass is not available on source server for ${sourceItem.name}.`; + this.updateSubTaskStatus(taskId, subTaskId, 'failed', undefined, msg); + throw new Error(msg); + } + } else if (targetConnection.auth_method === 'key' && !targetCredentials.decryptedPrivateKey) { + const msg = `Target connection ${targetConnection.name} is key-based but no private key found. Sub-task for ${sourceItem.name} failed.`; + this.updateSubTaskStatus(taskId, subTaskId, 'failed', undefined, msg); + throw new Error(msg); + } + + + const commandToExecute = this.buildTransferCommandString( + sourceItem.path, + sourceItem.type === 'directory', + targetConnection, + remoteTargetPathOnTarget, + determinedTransferCmd, + cmdOptions + ); + + console.info(`[TransfersService] Executing on source for sub-task ${subTaskId} (item: ${sourceItem.name}): ${commandToExecute}`); + this.updateSubTaskStatus(taskId, subTaskId, 'transferring', 10, `Executing: ${determinedTransferCmd} from source to ${targetConnection.name}`); + + const COMMAND_TIMEOUT_MS = 5 * 60 * 1000; // 5 minutes timeout for command execution + + await new Promise((resolveCmd, rejectCmd) => { + let commandTimedOut = false; + const timeoutHandle = setTimeout(() => { + commandTimedOut = true; + const timeoutMsg = `${determinedTransferCmd} command for ${sourceItem.name} timed out after ${COMMAND_TIMEOUT_MS / 1000}s.`; + console.warn(`[TransfersService] ${timeoutMsg} (Sub-task: ${subTaskId})`); + // Attempt to close the stream, though it might not always work if process is stuck hard + // stream?.close(); // stream is not in this scope yet. + rejectCmd(new Error(timeoutMsg)); + }, COMMAND_TIMEOUT_MS); + + const execOptions: { pty?: boolean } = {}; + if (cmdOptions.sshPassCommand) { // Only use PTY if sshpass is involved + execOptions.pty = true; + } + + sourceSshClient.exec(commandToExecute, execOptions, (err, stream) => { + if (commandTimedOut) { // If timeout already fired, don't process stream events + stream?.close(); // Try to close the stream if exec cb somehow still runs + return; + } + if (err) { + clearTimeout(timeoutHandle); + return rejectCmd(new Error(`Failed to execute command on source: ${err.message}`)); + } + + let stdoutCombined = ''; + let stderrCombined = ''; + + stream.on('data', (data: Buffer) => { + if (commandTimedOut) return; + const output = data.toString(); + stdoutCombined += output; + console.debug(`[TransfersService] CMD STDOUT (sub-task ${subTaskId}, item ${sourceItem.name}): ${output.trim()}`); + if (determinedTransferCmd === 'rsync') { + const progressMatch = output.match(/(\d+)%/); + if (progressMatch && progressMatch[1]) { + this.updateSubTaskStatus(taskId, subTaskId, 'transferring', parseInt(progressMatch[1], 10)); + } + } else { + this.updateSubTaskStatus(taskId, subTaskId, 'transferring', 50, 'SCP in progress...'); + } + }); + + stream.stderr.on('data', (data: Buffer) => { + if (commandTimedOut) return; + const errorOutput = data.toString(); + stderrCombined += errorOutput; + console.warn(`[TransfersService] CMD STDERR (sub-task ${subTaskId}, item ${sourceItem.name}): ${errorOutput.trim()}`); + }); + + stream.on('close', (code: number | null, signal?: string) => { + clearTimeout(timeoutHandle); + if (commandTimedOut) return; // Already handled by timeout + + if (code === 0) { + this.updateSubTaskStatus(taskId, subTaskId, 'completed', 100, `${determinedTransferCmd} successful for ${sourceItem.name} to ${targetConnection.name}.`); + resolveCmd(); + } else { + const errorMsg = `${determinedTransferCmd} failed for ${sourceItem.name} to ${targetConnection.name}. Exit code: ${code}, signal: ${signal}. Stderr: ${stderrCombined.trim()}`; + this.updateSubTaskStatus(taskId, subTaskId, 'failed', undefined, errorMsg); + rejectCmd(new Error(errorMsg)); + } + }); + + stream.on('error', (streamErr: Error) => { // Should not happen if exec cb err is null + clearTimeout(timeoutHandle); + if (commandTimedOut) return; + + const errorMsg = `Stream error during ${determinedTransferCmd} for ${sourceItem.name}: ${streamErr.message}`; + this.updateSubTaskStatus(taskId, subTaskId, 'failed', undefined, errorMsg); + rejectCmd(streamErr); + }); + }); + }); + + } catch (error: any) { + // This will catch errors from checks, key upload, or the command execution promise + console.error(`[TransfersService] executeRemoteTransferOnSource error for sub-task ${subTaskId} (item: ${sourceItem.name}):`, error); + // Status should have been updated by the specific failure point, or update here if not already failed + const taskFromMap = this.transferTasks.get(taskId); + const currentSubTask = taskFromMap?.subTasks.find((st: TransferSubTask) => st.subTaskId === subTaskId); + if (currentSubTask && currentSubTask.status !== 'failed' && currentSubTask.status !== 'completed') { + this.updateSubTaskStatus(taskId, subTaskId, 'failed', undefined, error.message || `Remote transfer execution failed for ${sourceItem.name}.`); + } + throw error; // Re-throw to be caught by processTransferTask's loop for this sub-task + } finally { + if (tempTargetKeyPathOnSource) { + try { + await this.deleteFileOnSourceViaSftp(sourceSshClient, tempTargetKeyPathOnSource); + } catch (cleanupError) { + console.warn(`[TransfersService] Failed to cleanup temp key ${tempTargetKeyPathOnSource} on source for sub-task ${subTaskId}:`, cleanupError); + // Log but don't fail the entire sub-task if it otherwise succeeded/failed clearly + } + } + } + } + + // --- Status Update and Retrieval Methods (largely unchanged) --- public async getTransferTaskDetails(taskId: string, userId: string | number): Promise { const task = this.transferTasks.get(taskId); console.debug(`[TransfersService] Retrieving details for task: ${taskId} for user: ${userId}`); if (task && task.userId === userId) { - return { ...task }; + return { ...task, subTasks: task.subTasks.map(st => ({...st})) }; // Return copies } if (task && task.userId !== userId) { console.warn(`[TransfersService] User ${userId} attempted to access task ${taskId} owned by ${task.userId}.`); - return null; // Or throw ForbiddenException + return null; } return null; } @@ -165,7 +478,7 @@ export class TransfersService { console.debug(`[TransfersService] Retrieving all transfer tasks for user: ${userId}.`); return Array.from(this.transferTasks.values()) .filter(task => task.userId === userId) - .map(task => ({ ...task })); + .map(task => ({ ...task, subTasks: task.subTasks.map(st => ({...st})) })); // Return copies } public updateSubTaskStatus( @@ -179,16 +492,21 @@ export class TransfersService { if (task) { const subTask = task.subTasks.find(st => st.subTaskId === subTaskId); if (subTask) { + // Prevent overwriting a final state with a transient one unless it's a retry mechanism (not implemented here) + if ((subTask.status === 'completed' || subTask.status === 'failed') && (newStatus !== 'completed' && newStatus !== 'failed')) { + console.warn(`[TransfersService] Attempted to update final sub-task ${subTaskId} status '${subTask.status}' to '${newStatus}'. Ignoring.`); + return; + } + subTask.status = newStatus; - if (progress !== undefined) subTask.progress = progress; + if (progress !== undefined) subTask.progress = Math.min(100, Math.max(0, progress)); // Clamp progress if (message !== undefined) subTask.message = message; - if (newStatus === 'completed' || newStatus === 'failed') { + if ((newStatus === 'completed' || newStatus === 'failed') && !subTask.endTime) { subTask.endTime = new Date(); } task.updatedAt = new Date(); - // 可能需要根据子任务状态更新父任务状态和进度 - this.updateOverallTaskStatusBasedOnSubTasks(taskId); - console.info(`[TransfersService] Sub-task ${subTaskId} for task ${taskId} updated: ${newStatus}, progress: ${progress}%, message: ${message}`); + this.updateOverallTaskStatusBasedOnSubTasks(taskId); // Important: update overall task + console.info(`[TransfersService] Sub-task ${subTaskId} (task ${taskId}) updated: ${newStatus}, progress: ${subTask.progress}%, msg: "${subTask.message}"`); } else { console.warn(`[TransfersService] Sub-task ${subTaskId} not found for task ${taskId} during status update.`); } @@ -200,15 +518,24 @@ export class TransfersService { private updateOverallTaskStatus(taskId: string, newStatus: TransferTask['status'], message?: string): void { const task = this.transferTasks.get(taskId); if (task) { + const isCurrentStatusFinal = task.status === 'completed' || task.status === 'failed' || task.status === 'partially-completed'; + // Check if newStatus is one of the transient states + const isNewStatusTransient = newStatus === 'queued' || newStatus === 'in-progress'; + + if (isCurrentStatusFinal && isNewStatusTransient) { + // If current status is final and new status is transient, ignore the update. + console.warn(`[TransfersService] Attempted to update final task ${taskId} status '${task.status}' to transient '${newStatus}'. Ignoring.`); + return; + } + + // Proceed with the update if: + // 1. Current status is not final. + // 2. Current status is final, and newStatus is also a final state (e.g., 'partially-completed' to 'failed'). task.status = newStatus; task.updatedAt = new Date(); - if (message && (newStatus === 'failed' || newStatus === 'partially-completed')) { - // Append to existing messages or set if none - task.payload.sourceItems.forEach(item => { // Simplified: maybe a task-level message array - // task.message = (task.message ? task.message + "; " : "") + message; - }); - } - console.info(`[TransfersService] Overall status for task ${taskId} updated to: ${newStatus}`); + // Overall task message could be an aggregation or just the first major error. + // For simplicity, not adding detailed message aggregation here. + console.info(`[TransfersService] Overall status for task ${taskId} directly updated to: ${newStatus}` + (message ? ` (Msg: ${message})` : '')); } } @@ -218,317 +545,75 @@ export class TransfersService { let completedCount = 0; let failedCount = 0; + let inProgressCount = 0; + let queuedCount = 0; let totalProgress = 0; - const activeSubTasks = task.subTasks.filter(st => st.status !== 'queued'); + const numSubTasks = task.subTasks.length; - - if (activeSubTasks.length === 0 && task.subTasks.length > 0) { - // If no subtasks have started processing, keep task as queued or in-progress if already set - if (task.status === 'queued') return; + if (numSubTasks === 0) { + task.overallProgress = 0; + // task.status remains as set by initiate or direct updateOverallTaskStatus if no subtasks. + return; } - task.subTasks.forEach(st => { - if (st.status === 'completed') { - completedCount++; - totalProgress += 100; - } else if (st.status === 'failed') { - failedCount++; - // Failed tasks contribute 0 to progress for simplicity, or 100 if considering them "done" - } else if (st.status === 'transferring' && st.progress !== undefined) { - totalProgress += st.progress; + switch (st.status) { + case 'completed': + completedCount++; + totalProgress += 100; + break; + case 'failed': + failedCount++; + // Failed tasks are "done" but contribute 0 to success progress. + // Depending on definition, they could count as 100 for task "completion" progress. + // Here, only successful completion adds to progress towards 100%. + break; + case 'transferring': + case 'connecting': // consider connecting as in-progress for overall status + inProgressCount++; + totalProgress += (st.progress !== undefined ? st.progress : (st.status === 'connecting' ? 5 : 0)); // Small progress for connecting + break; + case 'queued': + queuedCount++; + break; } - // 'queued' and 'connecting' don't add to progress here }); - if (task.subTasks.length > 0) { - task.overallProgress = Math.round(totalProgress / task.subTasks.length); + task.overallProgress = numSubTasks > 0 ? Math.round(totalProgress / numSubTasks) : 0; + + let newOverallStatus: TransferTask['status']; + if (failedCount === numSubTasks) { + newOverallStatus = 'failed'; + } else if (completedCount === numSubTasks) { + newOverallStatus = 'completed'; + } else if (failedCount > 0 && (completedCount + failedCount === numSubTasks)) { + newOverallStatus = 'partially-completed'; + } else if (inProgressCount > 0 || (queuedCount > 0 && (failedCount > 0 || completedCount > 0))) { + // If anything is in progress, or if some are queued while others are done/failed, it's in-progress + newOverallStatus = 'in-progress'; + } else if (queuedCount === numSubTasks) { + newOverallStatus = 'queued'; // All subtasks are still queued } else { - task.overallProgress = 0; + // Fallback or unexpected mixed state, treat as in-progress generally + // This case implies some completed, some queued, no failed, no in-progress items. + newOverallStatus = 'in-progress'; // Or 'partially-completed' if completedCount > 0 + if (completedCount > 0 && queuedCount > 0 && failedCount === 0 && inProgressCount === 0) { + newOverallStatus = 'partially-completed'; // More accurate for this specific mix + } } - - if (failedCount === task.subTasks.length && task.subTasks.length > 0) { - task.status = 'failed'; - } else if (completedCount === task.subTasks.length && task.subTasks.length > 0) { - task.status = 'completed'; - } else if (failedCount > 0 && (failedCount + completedCount) === task.subTasks.length) { - task.status = 'partially-completed'; - } else if (activeSubTasks.some(st => st.status === 'transferring' || st.status === 'connecting')) { - task.status = 'in-progress'; - } else if (task.subTasks.every(st => st.status === 'queued')) { - task.status = 'queued'; + + if (task.status !== newOverallStatus) { + console.info(`[TransfersService] Task ${taskId} overall status changing from ${task.status} to ${newOverallStatus} (P: ${task.overallProgress}%)`); + task.status = newOverallStatus; } - // else, if some are queued and others completed/failed, it might remain 'in-progress' or 'partially-completed' - // This logic might need refinement based on exact desired behavior for mixed states. - task.updatedAt = new Date(); - console.debug(`[TransfersService] Task ${taskId} overall progress: ${task.overallProgress}%, status: ${task.status}`); + // console.debug(`[TransfersService] Task ${taskId} overall progress: ${task.overallProgress}%, status: ${task.status}`); } private finalizeOverallTaskStatus(taskId: string): void { const task = this.transferTasks.get(taskId); if (!task) return; this.updateOverallTaskStatusBasedOnSubTasks(taskId); // Recalculate based on final sub-task states - console.info(`[TransfersService] Finalized overall status for task ${taskId}: ${task.status}`); - } - - - private async executeRsync( - taskId: string, - subTaskId: string, - connection: ConnectionWithTags, - sourcePath: string, - remoteBaseDestPath: string, - isDir: boolean, - decryptedPassword?: string, - privateKeyPath?: string, // Changed from decryptedPrivateKey - decryptedPassphrase?: string - ): Promise { - return new Promise((resolve, reject) => { - const { host, username, port, auth_method } = connection; - const remoteDest = `${username}@${host}:${remoteBaseDestPath.endsWith('/') ? remoteBaseDestPath : remoteBaseDestPath + '/'}`; - - let sshCommand = `ssh -p ${port || 22}`; - if (auth_method === 'key' && privateKeyPath) { - sshCommand += ` -i "${privateKeyPath}"`; // Use the provided temporary key path - } - - const rsyncArgs = [ - '-avz', - '--progress', - '-e', - sshCommand, - ]; - - if (isDir && !sourcePath.endsWith('/')) { - sourcePath += '/'; - } - rsyncArgs.push(sourcePath); - rsyncArgs.push(remoteDest); - - console.info(`[TransfersService] Executing rsync for sub-task ${subTaskId}: rsync ${rsyncArgs.join(' ')}`); - - let command = 'rsync'; - let finalArgs = rsyncArgs.filter(arg => arg); // Ensure no empty strings if sshCommand parts were conditional - - // Logic for sshpass with password auth remains as a comment/TODO, as per original - if (auth_method === 'password' && decryptedPassword) { - console.warn(`[TransfersService] Rsync with password authentication. Consider using sshpass if direct password input is needed and rsync/ssh doesn't prompt. Sub-task ${subTaskId} might fail if not configured for passwordless sudo or if sshpass is not used correctly.`); - // Example for sshpass (requires sshpass to be installed): - // command = 'sshpass'; - // finalArgs = ['-p', decryptedPassword, 'rsync', ...rsyncArgs.filter(arg => arg)]; - } else if (auth_method === 'key' && privateKeyPath && decryptedPassphrase) { - // If key (now a file path) has a passphrase, ssh itself will prompt or use ssh-agent. - // sshpass could be used here if ssh-agent is not an option and no TTY for prompt. - // console.warn(`[TransfersService] Rsync with passphrase-protected key. Ensure ssh-agent is configured or use sshpass if direct passphrase input is needed.`); - } - - const process = spawn(command, finalArgs); - - let stdoutData = ''; - let stderrData = ''; - - process.stdout.on('data', (data) => { - const output = data.toString(); - stdoutData += output; - const progressMatch = output.match(/(\d+)%/); - if (progressMatch && progressMatch[1]) { - this.updateSubTaskStatus(taskId, subTaskId, 'transferring', parseInt(progressMatch[1], 10)); - } - console.debug(`[TransfersService] Rsync STDOUT (sub-task ${subTaskId}): ${output.trim()}`); - }); - - process.stderr.on('data', (data) => { - stderrData += data.toString(); - console.warn(`[TransfersService] Rsync STDERR (sub-task ${subTaskId}): ${data.toString().trim()}`); - }); - - process.on('close', (code) => { - if (code === 0) { - this.updateSubTaskStatus(taskId, subTaskId, 'completed', 100, 'Rsync transfer successful.'); - console.info(`[TransfersService] Rsync completed successfully for sub-task ${subTaskId}.`); - resolve(); - } else { - const errorMessage = `Rsync failed with code ${code}. STDERR: ${stderrData.trim()} STDOUT: ${stdoutData.trim()}`; - this.updateSubTaskStatus(taskId, subTaskId, 'failed', undefined, errorMessage); - console.error(`[TransfersService] Rsync failed for sub-task ${subTaskId}. Code: ${code}. Error: ${errorMessage}`); - reject(new Error(errorMessage)); - } - }); - - process.on('error', (err) => { - const errorMessage = `Rsync process error: ${err.message}`; - this.updateSubTaskStatus(taskId, subTaskId, 'failed', undefined, errorMessage); - console.error(`[TransfersService] Rsync process error for sub-task ${subTaskId}:`, err); - reject(err); - }); - }); - } - - private async executeScp( - taskId: string, - subTaskId: string, - connection: ConnectionWithTags, - sourcePath: string, - remoteBaseDestPath: string, - isDir: boolean, - decryptedPassword?: string, - privateKeyPath?: string, // Changed from decryptedPrivateKey - decryptedPassphrase?: string - ): Promise { - const { host, username, port, auth_method } = connection; - // Source is on the remote server identified by 'connection' - const remoteSourceIdentifier = `${username}@${host}:${sourcePath}`; - - // Destination is local to the backend server. - // remoteBaseDestPath from payload is the local directory to save to. - const sourceFileName = path.basename(sourcePath); - // Ensure remoteBaseDestPath is treated as a directory for path.join - const localTargetDirectory = remoteBaseDestPath.endsWith(path.sep) ? remoteBaseDestPath : path.join(remoteBaseDestPath, path.sep); - const localTargetFullPath = path.join(localTargetDirectory, sourceFileName); - - try { - await fs.promises.mkdir(localTargetDirectory, { recursive: true }); - console.info(`[TransfersService] Ensured local destination directory exists: ${localTargetDirectory}`); - } catch (mkdirError: any) { - const errorMessage = `Failed to create local destination directory ${localTargetDirectory}: ${mkdirError.message}`; - console.error(`[TransfersService] ${errorMessage}`); - this.updateSubTaskStatus(taskId, subTaskId, 'failed', undefined, errorMessage); - // Return a rejected promise directly, as the function is async - return Promise.reject(new Error(errorMessage)); - } - - return new Promise((resolve, reject) => { - const scpArgs = []; - if (port) scpArgs.push('-P', port.toString()); - if (auth_method === 'key' && privateKeyPath) { - scpArgs.push('-i', privateKeyPath); // Use the provided temporary key path - } - if (isDir) { // If the remote source is a directory, use -r - scpArgs.push('-r'); - } - - scpArgs.push(remoteSourceIdentifier); // Remote source - scpArgs.push(localTargetFullPath); // Local destination - - console.info(`[TransfersService] Executing SCP for sub-task ${subTaskId}: scp ${scpArgs.join(' ')}`); - - let command = 'scp'; - let finalArgs = [...scpArgs]; - - // Logic for sshpass with password auth remains as a comment/TODO, as per original - if (auth_method === 'password' && decryptedPassword) { - console.warn(`[TransfersService] SCP with password authentication. Consider using sshpass. Sub-task ${subTaskId} might fail if not configured for passwordless sudo or if sshpass is not used correctly.`); - // Example with sshpass (requires sshpass to be installed): - // command = 'sshpass'; - // finalArgs = ['-p', decryptedPassword, 'scp', ...scpArgs]; - } else if (auth_method === 'key' && privateKeyPath && decryptedPassphrase) { - // If key (now a file path) has a passphrase, scp/ssh itself will prompt or use ssh-agent. - // console.warn(`[TransfersService] SCP with passphrase-protected key. Ensure ssh-agent is configured or use sshpass if direct passphrase input is needed.`); - } - - const process = spawn(command, finalArgs); - let stderrData = ''; - let stdoutData = ''; - - process.stdout.on('data', (data) => { - stdoutData += data.toString(); - console.debug(`[TransfersService] SCP STDOUT (sub-task ${subTaskId}): ${data.toString().trim()}`); - this.updateSubTaskStatus(taskId, subTaskId, 'transferring', 50, 'SCP transfer in progress.'); - }); - - process.stderr.on('data', (data) => { - stderrData += data.toString(); - console.warn(`[TransfersService] SCP STDERR (sub-task ${subTaskId}): ${data.toString().trim()}`); - }); - - process.on('close', (code) => { - if (code === 0) { - this.updateSubTaskStatus(taskId, subTaskId, 'completed', 100, 'SCP transfer successful.'); - console.info(`[TransfersService] SCP completed successfully for sub-task ${subTaskId}.`); - resolve(); - } else { - const errorMessage = `SCP failed with code ${code}. STDERR: ${stderrData.trim()} STDOUT: ${stdoutData.trim()}`; - this.updateSubTaskStatus(taskId, subTaskId, 'failed', undefined, errorMessage); - console.error(`[TransfersService] SCP failed for sub-task ${subTaskId}. Code: ${code}. Error: ${errorMessage}`); - reject(new Error(errorMessage)); - } - }); - - process.on('error', (err) => { - const errorMessage = `SCP process error: ${err.message}`; - this.updateSubTaskStatus(taskId, subTaskId, 'failed', undefined, errorMessage); - console.error(`[TransfersService] SCP process error for sub-task ${subTaskId}:`, err); - reject(err); - }); - }); - } - - private async determineTransferCommand( - connection: ConnectionWithTags, - method: 'auto' | 'rsync' | 'scp', - remoteHost: string, - privateKeyPath?: string, // Changed from decryptedPrivateKey - decryptedPassphrase?: string - ): Promise<'rsync' | 'scp'> { - if (method === 'rsync') return 'rsync'; - if (method === 'scp') return 'scp'; - - if (method === 'auto') { - console.info(`[TransfersService] Auto-detecting rsync capability on ${remoteHost}`); - return new Promise((resolve) => { - const { username, port, auth_method } = connection; - const sshArgs = []; - if (port) sshArgs.push('-p', port.toString()); - - if (auth_method === 'key' && privateKeyPath) { - sshArgs.push('-i', privateKeyPath); // Use the provided temporary key path - // If privateKeyPath (a file) is passphrase protected, ssh will handle it (prompt or agent) - // For detection, we hope it works without interactive passphrase entry if agent is not set up. - } - // Password auth detection remains best-effort as ssh won't take password directly for a command. - - const filteredSshArgs = sshArgs.filter(arg => !['-o', 'StrictHostKeyChecking=no', '-o', 'UserKnownHostsFile=/dev/null'].includes(arg)); - - const commandToRun = 'command -v rsync'; - const fullSshCommand = [...filteredSshArgs, `${username}@${remoteHost}`, commandToRun]; - - console.debug(`[TransfersService] Executing SSH for rsync check: ssh ${fullSshCommand.join(' ')}`); - - const process = spawn('ssh', fullSshCommand); - let stdout = ''; - let stderr = ''; - - process.stdout.on('data', (data) => stdout += data.toString()); - process.stderr.on('data', (data) => stderr += data.toString()); - - const timeoutDuration = 5000; // 5 seconds - const timeoutId = setTimeout(() => { - if (!process.killed) { - process.kill(); - console.warn(`[TransfersService] Rsync detection on ${remoteHost} timed out after ${timeoutDuration}ms. Falling back to SCP.`); - resolve('scp'); - } - }, timeoutDuration); - - process.on('close', (code) => { - clearTimeout(timeoutId); - if (code === 0 && stdout.trim() !== '') { - console.info(`[TransfersService] Rsync detected on ${remoteHost}. Path: ${stdout.trim()}`); - resolve('rsync'); - } else { - console.warn(`[TransfersService] Rsync not detected on ${remoteHost} (exit code ${code}, stderr: ${stderr.trim()}). Falling back to SCP.`); - resolve('scp'); - } - }); - - process.on('error', (err) => { - clearTimeout(timeoutId); - console.error(`[TransfersService] Error trying to detect rsync on ${remoteHost}: ${err.message}. Falling back to SCP.`); - resolve('scp'); - }); - }); - } - return 'scp'; // Default fallback + console.info(`[TransfersService] Finalized overall status for task ${taskId}: ${task.status}, progress: ${task.overallProgress}%`); } } \ No newline at end of file diff --git a/packages/backend/src/transfers/transfers.types.ts b/packages/backend/src/transfers/transfers.types.ts index 2b4b68a..9c43ff1 100644 --- a/packages/backend/src/transfers/transfers.types.ts +++ b/packages/backend/src/transfers/transfers.types.ts @@ -1,6 +1,7 @@ export interface InitiateTransferPayload { - connectionIds: number[]; - sourceItems: Array<{ name: string; path: string; type: 'file' | 'directory' }>; + sourceConnectionId: number; // ID of the source server (Server A) + connectionIds: number[]; // IDs of the target servers (Server B, C, etc.) + sourceItems: { name: string; path: string; type: 'file' | 'directory' }[]; remoteTargetPath: string; transferMethod: 'auto' | 'rsync' | 'scp'; } diff --git a/packages/backend/src/types/connection.types.ts b/packages/backend/src/types/connection.types.ts index 9f0e0de..8737e5c 100644 --- a/packages/backend/src/types/connection.types.ts +++ b/packages/backend/src/types/connection.types.ts @@ -1,5 +1,3 @@ - - export interface ConnectionBase { id: number; name: string | null; @@ -71,4 +69,10 @@ export interface FullConnectionData { notes: string | null; // 新增备注字段 (数据库原始字段) updated_at: number; last_connected_at: number | null; +} + +export interface DecryptedConnectionCredentials { + decryptedPassword?: string; + decryptedPrivateKey?: string; + decryptedPassphrase?: string; } \ No newline at end of file diff --git a/packages/frontend/src/components/FileManagerContextMenu.vue b/packages/frontend/src/components/FileManagerContextMenu.vue index ff37f9b..f5cd97b 100644 --- a/packages/frontend/src/components/FileManagerContextMenu.vue +++ b/packages/frontend/src/components/FileManagerContextMenu.vue @@ -1,10 +1,11 @@