From 2fb930e8e939024da5c74ba2cc66b124a588fad8 Mon Sep 17 00:00:00 2001 From: Baobhan Sith <80159437+Heavrnl@users.noreply.github.com> Date: Wed, 14 May 2025 19:09:27 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E4=BF=AE=E5=A4=8DSFTP=E4=B8=8A=E4=BC=A0?= =?UTF-8?q?=E5=9C=A8=E7=89=B9=E5=AE=9A=E6=9D=A1=E4=BB=B6=E4=B8=8B=E5=9B=A0?= =?UTF-8?q?ERR=5FSTREAM=5FDESTROYED=E9=94=99=E8=AF=AF=E5=AF=BC=E8=87=B4?= =?UTF-8?q?=E5=A4=B1=E8=B4=A5=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 调整了 `sftp.service.ts` 中 `WriteStream` 的 `end` 事件回调处理逻辑。 当收到 `ERR_STREAM_DESTROYED` 错误但所有字节已确认写入时,不再立即中止上传流程, 而是依赖后续的 `close` 事件来最终确认上传状态并通知前端。 --- packages/backend/src/services/sftp.service.ts | 132 +++++++----------- 1 file changed, 52 insertions(+), 80 deletions(-) diff --git a/packages/backend/src/services/sftp.service.ts b/packages/backend/src/services/sftp.service.ts index 3320d12..308ab8c 100644 --- a/packages/backend/src/services/sftp.service.ts +++ b/packages/backend/src/services/sftp.service.ts @@ -1419,7 +1419,6 @@ export class SftpService { // --- File Upload Methods --- /** Start a new file upload */ - // --- 修改:添加 relativePath 参数 --- async startUpload(sessionId: string, uploadId: string, remotePath: string, totalSize: number, relativePath?: string): Promise { const state = this.clientStates.get(sessionId); if (!state || !state.sftp) { @@ -1433,28 +1432,26 @@ export class SftpService { return; } - console.log(`[SFTP Upload ${uploadId}] Starting upload for ${remotePath} (${totalSize} bytes) in session ${sessionId}`); try { // --- 在创建流之前确保目录存在 --- if (relativePath) { const targetDirectory = pathModule.dirname(remotePath).replace(/\\/g, '/'); - console.log(`[SFTP Upload ${uploadId}] Ensuring directory exists: ${targetDirectory}`); + // console.log(`[SFTP Upload ${uploadId}] Ensuring directory exists: ${targetDirectory}`); try { // 确保 state.sftp 存在 if (!state.sftp) throw new Error('SFTP session is not available.'); await this.ensureDirectoryExists(state.sftp, targetDirectory); - console.log(`[SFTP Upload ${uploadId}] Directory ensured: ${targetDirectory}`); // +++ 增加成功日志 +++ + // console.log(`[SFTP Upload ${uploadId}] Directory ensured: ${targetDirectory}`); } catch (dirError: any) { console.error(`[SFTP Upload ${uploadId}] Failed to create/ensure directory ${targetDirectory}:`, dirError); state.ws.send(JSON.stringify({ type: 'sftp:upload:error', payload: { uploadId, message: `创建目录失败: ${dirError.message}` } })); // 不再删除 activeUploads,因为可能还没有创建 - return; // Stop the upload process + return; } } // --- 预检查文件是否可写 --- - console.log(`[SFTP Upload ${uploadId}] Pre-checking writability for: ${remotePath}`); try { // 确保 state.sftp 存在 if (!state.sftp) throw new Error('SFTP session is not available.'); @@ -1462,16 +1459,15 @@ export class SftpService { // 'w' flag: Open file for writing. The file is created (if it does not exist) or truncated (if it exists). state.sftp!.open(remotePath, 'w', (openErr, handle) => { if (openErr) { - console.error(`[SFTP Upload ${uploadId}] Pre-check failed (sftp.open 'w') for ${remotePath}:`, openErr); + // console.error(`[SFTP Upload ${uploadId}] Pre-check failed (sftp.open 'w') for ${remotePath}:`, openErr); return reject(openErr); // Reject if cannot open for writing } // Immediately close the handle, we just wanted to check writability state.sftp!.close(handle, (closeErr) => { if (closeErr) { // Log warning but don't fail the pre-check if closing fails - console.warn(`[SFTP Upload ${uploadId}] Error closing handle during pre-check for ${remotePath}:`, closeErr); + // console.warn(`[SFTP Upload ${uploadId}] Error closing handle during pre-check for ${remotePath}:`, closeErr); } - console.log(`[SFTP Upload ${uploadId}] Pre-check successful for: ${remotePath}`); resolve(); }); }); @@ -1483,7 +1479,6 @@ export class SftpService { } - console.log(`[SFTP Upload ${uploadId}] Creating write stream for: ${remotePath}`); // 确保 state.sftp 存在 if (!state.sftp) throw new Error('SFTP session is not available after pre-check.'); const stream = state.sftp.createWriteStream(remotePath); @@ -1498,26 +1493,22 @@ export class SftpService { this.activeUploads.set(uploadId, uploadState); stream.on('error', (err: Error) => { - console.error(`[SFTP Upload ${uploadId}] Write stream error for ${remotePath}:`, err); + console.error(`[SFTP Upload ${uploadId}] WriteStream 'error' event for ${remotePath}:`, err); state.ws.send(JSON.stringify({ type: 'sftp:upload:error', payload: { uploadId, message: `写入流错误: ${err.message}` } })); - this.activeUploads.delete(uploadId); // Clean up state on error + this.activeUploads.delete(uploadId); + // console.log(`[SFTP Upload ${uploadId}] Upload state removed due to stream 'error' event.`); }); stream.on('close', () => { - console.log(`[SFTP Upload ${uploadId}] Write stream 'close' event triggered for ${remotePath}.`); const finalState = this.activeUploads.get(uploadId); + if (finalState) { - // Check if the upload *should* have completed successfully based on bytes written if (finalState.bytesWritten >= finalState.totalSize) { - console.log(`[SFTP Upload ${uploadId}] Stream closed and bytesWritten >= totalSize. Fetching final stats...`); - // Get final stats *after* the stream is closed state.sftp!.lstat(finalState.remotePath, (statErr, stats) => { if (statErr) { console.error(`[SFTP Upload ${uploadId}] lstat after stream close ${finalState.remotePath} failed:`, statErr); - // Send error if lstat fails after close state.ws.send(JSON.stringify({ type: 'sftp:upload:error', payload: { uploadId, message: `获取最终文件状态失败: ${statErr.message}` } })); } else { - // Ensure the size matches what we expect (or is at least the total size) if (stats.size < finalState.totalSize) { console.error(`[SFTP Upload ${uploadId}] Final file size (${stats.size}) is less than expected total size (${finalState.totalSize}) after stream close.`); state.ws.send(JSON.stringify({ type: 'sftp:upload:error', payload: { uploadId, message: `最终文件大小 (${stats.size}) 小于预期 (${finalState.totalSize})` } })); @@ -1531,28 +1522,17 @@ export class SftpService { isDirectory: stats.isDirectory(), isFile: stats.isFile(), isSymbolicLink: stats.isSymbolicLink(), } }; - console.log(`[SFTP Upload ${uploadId}] Upload appears complete. Final size: ${stats.size}. Sending success message NOW.`); - // *** SEND SUCCESS MESSAGE HERE *** state.ws.send(JSON.stringify({ type: 'sftp:upload:success', payload: finalStatsPayload, uploadId: uploadId, path: finalState.remotePath })); } } - // Clean up state regardless of lstat success/failure after close this.activeUploads.delete(uploadId); }); } else { - // Stream closed, but not all bytes were written (likely due to cancellation or error) - console.warn(`[SFTP Upload ${uploadId}] Write stream closed, but written bytes (${finalState.bytesWritten}) < total size (${finalState.totalSize}). Upload likely failed or was cancelled before completion.`); - this.activeUploads.delete(uploadId); // Clean up state + this.activeUploads.delete(uploadId); } - } else { - console.log(`[SFTP Upload ${uploadId}] Write stream closed, but upload state was already removed (likely cancelled earlier).`); } }); - stream.on('finish', () => { - // The 'finish' event fires when stream.end() is called and all data has been flushed. - console.log(`[SFTP Upload ${uploadId}] Write stream 'finish' event triggered for ${remotePath}. Waiting for 'close'.`); - }); // Notify client that we are ready for chunks @@ -1579,17 +1559,11 @@ export class SftpService { } if (!uploadState) { console.warn(`[SFTP Upload ${uploadId}] Received chunk ${chunkIndex}, but no active upload found.`); - // Send error back to client? Might flood if many chunks arrive after cancellation. - // state.ws.send(JSON.stringify({ type: 'sftp:upload:error', payload: { uploadId, message: '无效的上传 ID 或上传已取消/完成' } })); return; } try { const chunkBuffer = Buffer.from(dataBase64, 'base64'); - // console.debug(`[SFTP Upload ${uploadId}] Writing chunk ${chunkIndex} (${chunkBuffer.length} bytes) to ${uploadState.remotePath}`); - - // Write the chunk. The 'drain' event is handled automatically by Node.js streams - // if the write buffer is full. We just write. const writeSuccess = uploadState.stream.write(chunkBuffer, (err) => { if (err) { // This callback handles errors specifically related to *this* write operation. @@ -1598,24 +1572,18 @@ export class SftpService { // Consider cancelling the upload on write error this.cancelUploadInternal(uploadId, `Write error on chunk ${chunkIndex}`); } - // else { console.debug(`[SFTP Upload ${uploadId}] Chunk ${chunkIndex} write callback success.`); } }); if (!writeSuccess) { - // This indicates the buffer is full and we should wait for 'drain'. - // However, for simplicity in this WebSocket context, we might rely on TCP backpressure - // or simply continue writing, letting the stream buffer handle it. - // Adding explicit 'drain' handling can add complexity. console.warn(`[SFTP Upload ${uploadId}] Write stream buffer full after chunk ${chunkIndex}. Waiting for drain is recommended for large files/slow connections.`); } - // --- FIX: Handle backpressure --- if (!writeSuccess) { - console.warn(`[SFTP Upload ${uploadId}] Write stream buffer full after chunk ${chunkIndex}. Pausing chunk processing until 'drain'.`); + // console.warn(`[SFTP Upload ${uploadId}] Write stream buffer full after chunk ${chunkIndex}. Pausing chunk processing until 'drain'.`); try { await new Promise(resolve => uploadState.stream.once('drain', resolve)); - console.log(`[SFTP Upload ${uploadId}] Write stream drained after chunk ${chunkIndex}. Resuming chunk processing.`); + // console.log(`[SFTP Upload ${uploadId}] Write stream drained after chunk ${chunkIndex}. Resuming chunk processing.`); } catch (drainError) { // Should not happen with .once, but handle defensively console.error(`[SFTP Upload ${uploadId}] Error waiting for drain event:`, drainError); @@ -1624,15 +1592,13 @@ export class SftpService { throw drainError; // Re-throw to stop further processing in this chunk handler } } - // --- End Backpressure Fix --- - // --- FIX: Handle backpressure --- if (!writeSuccess) { - console.warn(`[SFTP Upload ${uploadId}] Write stream buffer full after chunk ${chunkIndex}. Pausing chunk processing until 'drain'.`); + // console.warn(`[SFTP Upload ${uploadId}] Write stream buffer full after chunk ${chunkIndex}. Pausing chunk processing until 'drain'.`); try { await new Promise(resolve => uploadState.stream.once('drain', resolve)); - console.log(`[SFTP Upload ${uploadId}] Write stream drained after chunk ${chunkIndex}. Resuming chunk processing.`); + // console.log(`[SFTP Upload ${uploadId}] Write stream drained after chunk ${chunkIndex}. Resuming chunk processing.`); } catch (drainError) { // Should not happen with .once, but handle defensively console.error(`[SFTP Upload ${uploadId}] Error waiting for drain event:`, drainError); @@ -1641,14 +1607,10 @@ export class SftpService { throw drainError; // Re-throw to stop further processing in this chunk handler } } - // --- End Backpressure Fix --- uploadState.bytesWritten += chunkBuffer.length; - // Send progress (optional, consider throttling) - // const progress = Math.round((uploadState.bytesWritten / uploadState.totalSize) * 100); - // state.ws.send(JSON.stringify({ type: 'sftp:upload:progress', payload: { uploadId, progress } })); // Check if upload is complete if (uploadState.bytesWritten > uploadState.totalSize) { @@ -1656,29 +1618,28 @@ export class SftpService { state.ws.send(JSON.stringify({ type: 'sftp:upload:error', payload: { uploadId, message: '写入字节数超过文件总大小' } })); this.cancelUploadInternal(uploadId, 'Bytes written exceeded total size'); - // --- FIX: Check if bytesWritten is >= totalSize --- } else if (uploadState.bytesWritten >= uploadState.totalSize) { - // Prevent this block from running multiple times if more data arrives somehow if (!uploadState.stream.writableEnded) { - console.log(`[SFTP Upload ${uploadId}] All expected bytes received (or exceeded). Written: ${uploadState.bytesWritten}, Total: ${uploadState.totalSize}. Ending stream.`); - // End the stream now that all data is received - uploadState.stream.end((endErr: Error | undefined) => { - if (endErr) { - console.error(`[SFTP Upload ${uploadId}] Error ending write stream for ${uploadState.remotePath}:`, endErr); - // If ending fails, send an error (the 'close' event might not fire reliably after error) - if (state && state.ws) { // Check if state and ws still exist - state.ws.send(JSON.stringify({ type: 'sftp:upload:error', payload: { uploadId, message: `结束写入流时出错: ${endErr.message}` } })); - } - this.cancelUploadInternal(uploadId, `Stream end error`); - } else { - console.log(`[SFTP Upload ${uploadId}] stream.end() called successfully. Waiting for 'close' event.`); - // Success message will be sent in the 'close' handler - } - }); - } - } + uploadState.stream.end((endErr: Error & { code?: string } | undefined) => { // Added code to Error type hint + const streamStateInEndCallback = uploadState?.stream; // Re-fetch in case it changed + if (endErr) { + // Check if it's the specific ERR_STREAM_DESTROYED error and all bytes were written + if (endErr.code === 'ERR_STREAM_DESTROYED' && uploadState && uploadState.bytesWritten >= uploadState.totalSize) { + console.warn(`[SFTP Upload ${uploadId}] stream.end() CALLBACK reported ERR_STREAM_DESTROYED, but all bytes written. UploadId: ${uploadId}. Error:`, endErr); + console.log(`[SFTP Upload ${uploadId}] Treating ERR_STREAM_DESTROYED as non-fatal for this upload. Expecting 'close' event to finalize success for ${uploadState.remotePath}.`); + } else { + console.error(`[SFTP Upload ${uploadId}] Error from stream.end() CALLBACK for ${uploadState?.remotePath || 'unknown path'}:`, endErr); + if (state && state.ws) { + state.ws.send(JSON.stringify({ type: 'sftp:upload:error', payload: { uploadId, message: `结束写入流时出错: ${endErr.message}` } })); + } + this.cancelUploadInternal(uploadId, `Stream end error: ${endErr.message}`, endErr); + } + } + }); + } + } - } catch (error: any) { + } catch (error: any) { console.error(`[SFTP Upload ${uploadId}] Error handling chunk ${chunkIndex} for ${uploadState?.remotePath}:`, error); state.ws.send(JSON.stringify({ type: 'sftp:upload:error', payload: { uploadId, message: `处理块 ${chunkIndex} 时出错: ${error.message}` } })); this.cancelUploadInternal(uploadId, `Error handling chunk ${chunkIndex}`); @@ -1708,17 +1669,28 @@ export class SftpService { } /** Internal helper to clean up an upload */ - private cancelUploadInternal(uploadId: string, reason: string): void { + private cancelUploadInternal(uploadId: string, reason: string, triggeringError?: any): void { const uploadState = this.activeUploads.get(uploadId); + const callTimestamp = Date.now(); // Keep timestamp for potential future use if needed + if (uploadState) { - console.log(`[SFTP Upload ${uploadId}] Internal cancel (${reason}): Closing stream for ${uploadState.remotePath}`); - // End the stream. The 'close' handler should ideally detect the size mismatch or see the state is gone. - // Using destroy might be more immediate but could lead to unclosed file descriptors on the server in some cases. - uploadState.stream.end(); // Gracefully try to end - // uploadState.stream.destroy(); // More forceful, might be needed + const currentStream = uploadState.stream; + + if (currentStream && !currentStream.destroyed) { + if (!currentStream.writableEnded) { + currentStream.end((endErr: Error | undefined) => { + if (endErr) { + console.error(`[SFTP Upload ${uploadId}] cancelUploadInternal: Error from stream.end() in cancel:`, endErr, `Original reason for cancel: ${reason}`); + if (!currentStream.destroyed) { + currentStream.destroy(); // Removed error argument + } + } + }); + } else { + currentStream.destroy(); // Removed error argument + } + } this.activeUploads.delete(uploadId); - } else { - // console.log(`[SFTP Upload ${uploadId}] Internal cancel called, but upload state already removed.`); } } }