diff --git a/packages/backend/src/services/sftp.service.ts b/packages/backend/src/services/sftp.service.ts index 372771e..0bbc927 100644 --- a/packages/backend/src/services/sftp.service.ts +++ b/packages/backend/src/services/sftp.service.ts @@ -64,6 +64,7 @@ interface ActiveUpload { stream: WriteStream; sessionId: string; // Link back to the session for cleanup relativePath?: string; + drainPromise?: Promise | null; // +++ For managing drain event listeners +++ } export class SftpService { @@ -1489,6 +1490,7 @@ export class SftpService { stream, sessionId, relativePath, // +++ 存储 relativePath +++ + drainPromise: null // +++ Initialize drainPromise +++ }; this.activeUploads.set(uploadId, uploadState); @@ -1578,21 +1580,45 @@ export class SftpService { if (!writeSuccess) { // console.warn(`[SFTP Upload ${uploadId}] Write stream buffer full after chunk ${chunkIndex}. Pausing chunk processing until 'drain'.`); + if (!uploadState.drainPromise) { + // console.log(`[SFTP Upload ${uploadId}] Attaching new drain listener for chunk ${chunkIndex}`); + uploadState.drainPromise = new Promise(resolve => { + uploadState.stream.once('drain', () => { + // console.log(`[SFTP Upload ${uploadId}] Drain event fired, resolving promise for chunk ${chunkIndex}`); + uploadState.drainPromise = null; // Reset for next time + resolve(); + }); + }); + } else { + // console.log(`[SFTP Upload ${uploadId}] Waiting for existing drain promise for chunk ${chunkIndex}`); + } try { - await new Promise(resolve => uploadState.stream.once('drain', resolve)); - // console.log(`[SFTP Upload ${uploadId}] Write stream drained after chunk ${chunkIndex}. Resuming chunk processing.`); + await uploadState.drainPromise; + // console.log(`[SFTP Upload ${uploadId}] Resumed after drain for chunk ${chunkIndex}`); } catch (drainError) { - // Should not happen with .once, but handle defensively - console.error(`[SFTP Upload ${uploadId}] Error waiting for drain event:`, drainError); - // Consider cancelling upload if waiting for drain fails critically - this.cancelUploadInternal(uploadId, 'Error waiting for drain'); - throw drainError; // Re-throw to stop further processing in this chunk handler + console.error(`[SFTP Upload ${uploadId}] Error awaiting drain promise for chunk ${chunkIndex}:`, drainError); + this.cancelUploadInternal(uploadId, 'Error waiting for drain promise'); + throw drainError; } } - uploadState.bytesWritten += chunkBuffer.length; + // +++ 发送进度更新 +++ + if (state.ws && state.ws.readyState === WebSocket.OPEN) { + const progressPercent = Math.round((uploadState.bytesWritten / uploadState.totalSize) * 100); + // 将 uploadId 放在顶层,payload 中包含具体进度数据 + state.ws.send(JSON.stringify({ + type: 'sftp:upload:progress', + uploadId: uploadId, + payload: { + bytesWritten: uploadState.bytesWritten, + totalSize: uploadState.totalSize, + progress: Math.min(100, progressPercent) // 确保不超过100 + } + })); + } + // --- 结束发送进度更新 --- // Check if upload is complete if (uploadState.bytesWritten > uploadState.totalSize) { diff --git a/packages/backend/src/websocket/types.ts b/packages/backend/src/websocket/types.ts index 2e2801f..6e515f8 100644 --- a/packages/backend/src/websocket/types.ts +++ b/packages/backend/src/websocket/types.ts @@ -293,4 +293,11 @@ export interface SftpDecompressErrorPayload { error: string; details?: string; // Stderr output or specific error details requestId: string; +} +// S -> C: SFTP Upload Progress (New) +export interface SftpUploadProgressPayload { + uploadId: string; // To correlate with the specific upload + bytesWritten: number; + totalSize: number; + progress: number; // Calculated percentage (0-100) } \ No newline at end of file diff --git a/packages/frontend/src/composables/useFileUploader.ts b/packages/frontend/src/composables/useFileUploader.ts index d70d0b1..74c7bc4 100644 --- a/packages/frontend/src/composables/useFileUploader.ts +++ b/packages/frontend/src/composables/useFileUploader.ts @@ -75,7 +75,7 @@ export function useFileUploader( // --- FIX: Update offset based on the actual chunk size that was read --- offset += currentChunkSize; // Use the stored size of the slice - currentUpload.progress = Math.min(100, Math.round((offset / file.size) * 100)); + // currentUpload.progress = Math.min(100, Math.round((offset / file.size) * 100)); // --- REMOVED: Optimistic progress update if (!isLast) { // 使用 requestAnimationFrame 或 nextTick 在块之间添加轻微延迟 @@ -303,6 +303,30 @@ export function useFileUploader( } }; + // +++ 新增:处理上传进度更新 +++ + const onUploadProgress = (payload: MessagePayload, message: WebSocketMessage) => { + const uploadId = message.uploadId || payload?.uploadId; // 从顶层获取 uploadId + if (!uploadId) { + console.warn(`[文件上传模块] 收到缺少 uploadId 的 upload:progress 消息:`, message); + return; + } + + const upload = uploads[uploadId]; + if (upload && upload.status === 'uploading') { + // payload 现在应该包含 bytesWritten 和 totalSize + if (typeof payload?.bytesWritten === 'number' && typeof payload?.totalSize === 'number') { + upload.progress = Math.min(100, Math.round((payload.bytesWritten / payload.totalSize) * 100)); + // console.debug(`[文件上传模块] 更新上传 ${uploadId} 进度: ${upload.progress}% (${payload.bytesWritten}/${payload.totalSize})`); + } else { + console.warn(`[文件上传模块] 收到 upload:progress 消息,但 payload 格式不正确:`, payload); + } + } else if (upload) { + // console.warn(`[文件上传模块] 收到 upload:progress 消息,但上传 ${uploadId} 状态为 ${upload.status},不予更新。`); + } else { + console.warn(`[文件上传模块] 收到未知上传 ID 的 upload:progress 消息: ${uploadId}`); + } + }; + // --- 结束新增 --- // --- 注册处理器 --- const unregisterUploadReady = onMessage('sftp:upload:ready', onUploadReady); @@ -311,6 +335,7 @@ export function useFileUploader( const unregisterUploadPause = onMessage('sftp:upload:pause', onUploadPause); const unregisterUploadResume = onMessage('sftp:upload:resume', onUploadResume); const unregisterUploadCancelled = onMessage('sftp:upload:cancelled', onUploadCancelled); + const unregisterUploadProgress = onMessage('sftp:upload:progress', onUploadProgress); // +++ 注册新处理器 +++ // --- 清理 --- onUnmounted(() => { @@ -321,6 +346,7 @@ export function useFileUploader( unregisterUploadPause?.(); unregisterUploadResume?.(); unregisterUploadCancelled?.(); + unregisterUploadProgress?.(); // +++ 注销新处理器 +++ // 当使用此 composable 的组件卸载时,取消任何正在进行的上传 Object.keys(uploads).forEach(uploadId => { diff --git a/packages/frontend/src/types/websocket.types.ts b/packages/frontend/src/types/websocket.types.ts index 6d953fd..bf0f7de 100644 --- a/packages/frontend/src/types/websocket.types.ts +++ b/packages/frontend/src/types/websocket.types.ts @@ -15,6 +15,17 @@ export interface WebSocketMessage { // 消息处理器函数类型 export type MessageHandler = (payload: MessagePayload, message: WebSocketMessage) => void; // 恢复 message 参数为必需 +export interface SftpUploadProgressPayload { + uploadId: string; // 虽然 uploadId 在 WebSocketMessage 顶层,payload 里也包含以明确关联 + bytesWritten: number; + totalSize: number; + progress: number; // 0-100 +} +export interface SftpUploadProgressMessage extends WebSocketMessage { + type: 'sftp:upload:progress'; + uploadId: string; // uploadId 也在顶层消息中,这里为了明确 payload 关联 + payload: SftpUploadProgressPayload; +} // --- SSH Suspend Mode WebSocket Message Types ---