diff --git a/packages/backend/src/services/sftp.service.ts b/packages/backend/src/services/sftp.service.ts index f35758d..209514c 100644 --- a/packages/backend/src/services/sftp.service.ts +++ b/packages/backend/src/services/sftp.service.ts @@ -1084,28 +1084,54 @@ export class SftpService { }); stream.on('close', () => { - // This 'close' event now primarily handles cleanup after the stream is fully closed. - // The success message is sent earlier in handleUploadChunk. + console.log(`[SFTP Upload ${uploadId}] Write stream 'close' event triggered for ${remotePath}.`); const finalState = this.activeUploads.get(uploadId); if (finalState) { - // Check if bytes written match total size upon close, log warning if not (could indicate cancellation after success msg sent) - if (finalState.bytesWritten !== finalState.totalSize) { - console.warn(`[SFTP Upload ${uploadId}] Write stream closed for ${remotePath}, but written bytes (${finalState.bytesWritten}) != total size (${finalState.totalSize}). This might happen if cancelled after success message was sent.`); - // Optionally send an error if this state is unexpected, but success might have already been sent. - // state.ws.send(JSON.stringify({ type: 'sftp:upload:error', payload: { uploadId, message: '文件大小不匹配或上传未完成' } })); - } else { - console.log(`[SFTP Upload ${uploadId}] Write stream closed successfully for ${remotePath}. State cleaned up.`); - } - this.activeUploads.delete(uploadId); // Clean up state when stream is closed + // Check if the upload *should* have completed successfully based on bytes written + if (finalState.bytesWritten >= finalState.totalSize) { + console.log(`[SFTP Upload ${uploadId}] Stream closed and bytesWritten >= totalSize. Fetching final stats...`); + // Get final stats *after* the stream is closed + state.sftp!.lstat(finalState.remotePath, (statErr, stats) => { + if (statErr) { + console.error(`[SFTP Upload ${uploadId}] lstat after stream close ${finalState.remotePath} failed:`, statErr); + // Send error if lstat fails after close + state.ws.send(JSON.stringify({ type: 'sftp:upload:error', payload: { uploadId, message: `获取最终文件状态失败: ${statErr.message}` } })); + } else { + // Ensure the size matches what we expect (or is at least the total size) + if (stats.size < finalState.totalSize) { + console.error(`[SFTP Upload ${uploadId}] Final file size (${stats.size}) is less than expected total size (${finalState.totalSize}) after stream close.`); + state.ws.send(JSON.stringify({ type: 'sftp:upload:error', payload: { uploadId, message: `最终文件大小 (${stats.size}) 小于预期 (${finalState.totalSize})` } })); + } else { + const finalStatsPayload = { + filename: finalState.remotePath.substring(finalState.remotePath.lastIndexOf('/') + 1), + longname: '', + attrs: { + size: stats.size, uid: stats.uid, gid: stats.gid, mode: stats.mode, + atime: stats.atime * 1000, mtime: stats.mtime * 1000, + isDirectory: stats.isDirectory(), isFile: stats.isFile(), isSymbolicLink: stats.isSymbolicLink(), + } + }; + console.log(`[SFTP Upload ${uploadId}] Upload appears complete. Final size: ${stats.size}. Sending success message NOW.`); + // *** SEND SUCCESS MESSAGE HERE *** + state.ws.send(JSON.stringify({ type: 'sftp:upload:success', payload: finalStatsPayload, uploadId: uploadId, path: finalState.remotePath })); + } + } + // Clean up state regardless of lstat success/failure after close + this.activeUploads.delete(uploadId); + }); + } else { + // Stream closed, but not all bytes were written (likely due to cancellation or error) + console.warn(`[SFTP Upload ${uploadId}] Write stream closed, but written bytes (${finalState.bytesWritten}) < total size (${finalState.totalSize}). Upload likely failed or was cancelled before completion.`); + this.activeUploads.delete(uploadId); // Clean up state + } } else { - console.log(`[SFTP Upload ${uploadId}] Write stream closed for ${remotePath}, but upload state was already removed.`); + console.log(`[SFTP Upload ${uploadId}] Write stream closed, but upload state was already removed (likely cancelled earlier).`); } }); stream.on('finish', () => { - // The 'finish' event fires when stream.end() is called and all data has been flushed to the underlying system. - // This might be a slightly earlier point than 'close'. Let's log it. - console.log(`[SFTP Upload ${uploadId}] Write stream finished for ${remotePath}. Waiting for close.`); + // The 'finish' event fires when stream.end() is called and all data has been flushed. + console.log(`[SFTP Upload ${uploadId}] Write stream 'finish' event triggered for ${remotePath}. Waiting for 'close'.`); }); @@ -1120,7 +1146,8 @@ export class SftpService { } /** Handle an incoming file chunk */ - handleUploadChunk(sessionId: string, uploadId: string, chunkIndex: number, dataBase64: string): void { + // --- FIX: Make async to handle await for drain --- + async handleUploadChunk(sessionId: string, uploadId: string, chunkIndex: number, dataBase64: string): Promise { const state = this.clientStates.get(sessionId); const uploadState = this.activeUploads.get(uploadId); @@ -1163,6 +1190,40 @@ export class SftpService { } + // --- FIX: Handle backpressure --- + if (!writeSuccess) { + console.warn(`[SFTP Upload ${uploadId}] Write stream buffer full after chunk ${chunkIndex}. Pausing chunk processing until 'drain'.`); + try { + await new Promise(resolve => uploadState.stream.once('drain', resolve)); + console.log(`[SFTP Upload ${uploadId}] Write stream drained after chunk ${chunkIndex}. Resuming chunk processing.`); + } catch (drainError) { + // Should not happen with .once, but handle defensively + console.error(`[SFTP Upload ${uploadId}] Error waiting for drain event:`, drainError); + // Consider cancelling upload if waiting for drain fails critically + this.cancelUploadInternal(uploadId, 'Error waiting for drain'); + throw drainError; // Re-throw to stop further processing in this chunk handler + } + } + // --- End Backpressure Fix --- + + + // --- FIX: Handle backpressure --- + if (!writeSuccess) { + console.warn(`[SFTP Upload ${uploadId}] Write stream buffer full after chunk ${chunkIndex}. Pausing chunk processing until 'drain'.`); + try { + await new Promise(resolve => uploadState.stream.once('drain', resolve)); + console.log(`[SFTP Upload ${uploadId}] Write stream drained after chunk ${chunkIndex}. Resuming chunk processing.`); + } catch (drainError) { + // Should not happen with .once, but handle defensively + console.error(`[SFTP Upload ${uploadId}] Error waiting for drain event:`, drainError); + // Consider cancelling upload if waiting for drain fails critically + this.cancelUploadInternal(uploadId, 'Error waiting for drain'); + throw drainError; // Re-throw to stop further processing in this chunk handler + } + } + // --- End Backpressure Fix --- + + uploadState.bytesWritten += chunkBuffer.length; // Send progress (optional, consider throttling) @@ -1175,39 +1236,26 @@ export class SftpService { state.ws.send(JSON.stringify({ type: 'sftp:upload:error', payload: { uploadId, message: '写入字节数超过文件总大小' } })); this.cancelUploadInternal(uploadId, 'Bytes written exceeded total size'); - } else if (uploadState.bytesWritten === uploadState.totalSize) { - console.log(`[SFTP Upload ${uploadId}] All bytes (${uploadState.bytesWritten}) received for ${uploadState.remotePath}. Fetching stats before sending success...`); - - // Get stats for the newly uploaded file before sending success - state.sftp!.lstat(uploadState.remotePath, (statErr, stats) => { - let newItemPayload: any = null; // Default to null payload - if (statErr) { - console.error(`[SFTP Upload ${uploadId}] lstat after upload ${uploadState.remotePath} failed:`, statErr); - // Still send success, but with null payload as item details are unavailable - } else { - newItemPayload = { - filename: uploadState.remotePath.substring(uploadState.remotePath.lastIndexOf('/') + 1), - longname: '', // lstat doesn't provide longname - attrs: { - size: stats.size, uid: stats.uid, gid: stats.gid, mode: stats.mode, - atime: stats.atime * 1000, mtime: stats.mtime * 1000, - isDirectory: stats.isDirectory(), isFile: stats.isFile(), isSymbolicLink: stats.isSymbolicLink(), + // --- FIX: Check if bytesWritten is >= totalSize --- + } else if (uploadState.bytesWritten >= uploadState.totalSize) { + // Prevent this block from running multiple times if more data arrives somehow + if (!uploadState.stream.writableEnded) { + console.log(`[SFTP Upload ${uploadId}] All expected bytes received (or exceeded). Written: ${uploadState.bytesWritten}, Total: ${uploadState.totalSize}. Ending stream.`); + // End the stream now that all data is received + uploadState.stream.end((endErr: Error | undefined) => { + if (endErr) { + console.error(`[SFTP Upload ${uploadId}] Error ending write stream for ${uploadState.remotePath}:`, endErr); + // If ending fails, send an error (the 'close' event might not fire reliably after error) + if (state && state.ws) { // Check if state and ws still exist + state.ws.send(JSON.stringify({ type: 'sftp:upload:error', payload: { uploadId, message: `结束写入流时出错: ${endErr.message}` } })); } - }; - console.log(`[SFTP Upload ${uploadId}] Sending upload success with new item details for ${uploadState.remotePath}`); - } - // Send success message with the newItem payload (or null if lstat failed) - state.ws.send(JSON.stringify({ type: 'sftp:upload:success', payload: newItemPayload, uploadId: uploadId, path: uploadState.remotePath })); // Include uploadId and path for frontend context - - // End the stream *after* lstat completes and success message is sent - uploadState.stream.end((endErr: Error | undefined) => { // Add type annotation - if (endErr) { - console.error(`[SFTP Upload ${uploadId}] Error ending write stream after success for ${uploadState.remotePath}:`, endErr); - } else { - console.log(`[SFTP Upload ${uploadId}] Write stream ended successfully after success for ${uploadState.remotePath}.`); - } + this.cancelUploadInternal(uploadId, `Stream end error`); + } else { + console.log(`[SFTP Upload ${uploadId}] stream.end() called successfully. Waiting for 'close' event.`); + // Success message will be sent in the 'close' handler + } }); - }); + } } } catch (error: any) { diff --git a/packages/backend/src/websocket.ts b/packages/backend/src/websocket.ts index 948197a..9879081 100644 --- a/packages/backend/src/websocket.ts +++ b/packages/backend/src/websocket.ts @@ -1173,7 +1173,8 @@ connectionName: connInfo?.name || 'Unknown', // 添加连接名称 (使用可选 console.error(`WebSocket: 收到来自 ${ws.username} (会话: ${sessionId}) 的 ${type} 请求,但缺少 uploadId, chunkIndex 或 data。`); return; } - sftpService.handleUploadChunk(sessionId, payload.uploadId, payload.chunkIndex, payload.data); + // --- FIX: Add await for async backpressure handling --- + await sftpService.handleUploadChunk(sessionId, payload.uploadId, payload.chunkIndex, payload.data); break; } case 'sftp:upload:cancel': { diff --git a/packages/frontend/src/composables/useFileUploader.ts b/packages/frontend/src/composables/useFileUploader.ts index d9090b9..d70d0b1 100644 --- a/packages/frontend/src/composables/useFileUploader.ts +++ b/packages/frontend/src/composables/useFileUploader.ts @@ -52,6 +52,7 @@ export function useFileUploader( const reader = new FileReader(); let offset = startByte; let chunkIndex = 0; // Initialize chunk index counter + let currentChunkSize = 0; // Store the size of the chunk being processed reader.onload = (e) => { const currentUpload = uploads[uploadId]; @@ -72,8 +73,8 @@ export function useFileUploader( payload: { uploadId, chunkIndex: chunkIndex++, data: chunkBase64, isLast } // Add and increment chunkIndex }); - // 注意:直接使用 base64 长度估算字节大小并不完全准确,但对于进度条来说足够了 - offset += chunkBase64.length * 3 / 4; + // --- FIX: Update offset based on the actual chunk size that was read --- + offset += currentChunkSize; // Use the stored size of the slice currentUpload.progress = Math.min(100, Math.round((offset / file.size) * 100)); if (!isLast) { @@ -105,6 +106,7 @@ export function useFileUploader( // 读取下一个块之前再次检查状态 if (offset < file.size && uploads[uploadId]?.status === 'uploading') { const slice = file.slice(offset, offset + chunkSize); + currentChunkSize = slice.size; // Store the actual size of the slice being read reader.readAsDataURL(slice); } };