fix: 修复SFTP上传在特定条件下因ERR_STREAM_DESTROYED错误导致失败的问题

- 调整了 `sftp.service.ts` 中 `WriteStream` 的 `end` 事件回调处理逻辑。
  当收到 `ERR_STREAM_DESTROYED` 错误但所有字节已确认写入时,不再立即中止上传流程,
  而是依赖后续的 `close` 事件来最终确认上传状态并通知前端。
This commit is contained in:
Baobhan Sith
2025-05-14 19:09:27 +08:00
parent 9384a38556
commit 2fb930e8e9
+52 -80
View File
@@ -1419,7 +1419,6 @@ export class SftpService {
// --- File Upload Methods ---
/** Start a new file upload */
// --- 修改:添加 relativePath 参数 ---
async startUpload(sessionId: string, uploadId: string, remotePath: string, totalSize: number, relativePath?: string): Promise<void> {
const state = this.clientStates.get(sessionId);
if (!state || !state.sftp) {
@@ -1433,28 +1432,26 @@ export class SftpService {
return;
}
console.log(`[SFTP Upload ${uploadId}] Starting upload for ${remotePath} (${totalSize} bytes) in session ${sessionId}`);
try {
// --- 在创建流之前确保目录存在 ---
if (relativePath) {
const targetDirectory = pathModule.dirname(remotePath).replace(/\\/g, '/');
console.log(`[SFTP Upload ${uploadId}] Ensuring directory exists: ${targetDirectory}`);
// console.log(`[SFTP Upload ${uploadId}] Ensuring directory exists: ${targetDirectory}`);
try {
// 确保 state.sftp 存在
if (!state.sftp) throw new Error('SFTP session is not available.');
await this.ensureDirectoryExists(state.sftp, targetDirectory);
console.log(`[SFTP Upload ${uploadId}] Directory ensured: ${targetDirectory}`); // +++ 增加成功日志 +++
// console.log(`[SFTP Upload ${uploadId}] Directory ensured: ${targetDirectory}`);
} catch (dirError: any) {
console.error(`[SFTP Upload ${uploadId}] Failed to create/ensure directory ${targetDirectory}:`, dirError);
state.ws.send(JSON.stringify({ type: 'sftp:upload:error', payload: { uploadId, message: `创建目录失败: ${dirError.message}` } }));
// 不再删除 activeUploads,因为可能还没有创建
return; // Stop the upload process
return;
}
}
// --- 预检查文件是否可写 ---
console.log(`[SFTP Upload ${uploadId}] Pre-checking writability for: ${remotePath}`);
try {
// 确保 state.sftp 存在
if (!state.sftp) throw new Error('SFTP session is not available.');
@@ -1462,16 +1459,15 @@ export class SftpService {
// 'w' flag: Open file for writing. The file is created (if it does not exist) or truncated (if it exists).
state.sftp!.open(remotePath, 'w', (openErr, handle) => {
if (openErr) {
console.error(`[SFTP Upload ${uploadId}] Pre-check failed (sftp.open 'w') for ${remotePath}:`, openErr);
// console.error(`[SFTP Upload ${uploadId}] Pre-check failed (sftp.open 'w') for ${remotePath}:`, openErr);
return reject(openErr); // Reject if cannot open for writing
}
// Immediately close the handle, we just wanted to check writability
state.sftp!.close(handle, (closeErr) => {
if (closeErr) {
// Log warning but don't fail the pre-check if closing fails
console.warn(`[SFTP Upload ${uploadId}] Error closing handle during pre-check for ${remotePath}:`, closeErr);
// console.warn(`[SFTP Upload ${uploadId}] Error closing handle during pre-check for ${remotePath}:`, closeErr);
}
console.log(`[SFTP Upload ${uploadId}] Pre-check successful for: ${remotePath}`);
resolve();
});
});
@@ -1483,7 +1479,6 @@ export class SftpService {
}
console.log(`[SFTP Upload ${uploadId}] Creating write stream for: ${remotePath}`);
// 确保 state.sftp 存在
if (!state.sftp) throw new Error('SFTP session is not available after pre-check.');
const stream = state.sftp.createWriteStream(remotePath);
@@ -1498,26 +1493,22 @@ export class SftpService {
this.activeUploads.set(uploadId, uploadState);
stream.on('error', (err: Error) => {
console.error(`[SFTP Upload ${uploadId}] Write stream error for ${remotePath}:`, err);
console.error(`[SFTP Upload ${uploadId}] WriteStream 'error' event for ${remotePath}:`, err);
state.ws.send(JSON.stringify({ type: 'sftp:upload:error', payload: { uploadId, message: `写入流错误: ${err.message}` } }));
this.activeUploads.delete(uploadId); // Clean up state on error
this.activeUploads.delete(uploadId);
// console.log(`[SFTP Upload ${uploadId}] Upload state removed due to stream 'error' event.`);
});
stream.on('close', () => {
console.log(`[SFTP Upload ${uploadId}] Write stream 'close' event triggered for ${remotePath}.`);
const finalState = this.activeUploads.get(uploadId);
if (finalState) {
// Check if the upload *should* have completed successfully based on bytes written
if (finalState.bytesWritten >= finalState.totalSize) {
console.log(`[SFTP Upload ${uploadId}] Stream closed and bytesWritten >= totalSize. Fetching final stats...`);
// Get final stats *after* the stream is closed
state.sftp!.lstat(finalState.remotePath, (statErr, stats) => {
if (statErr) {
console.error(`[SFTP Upload ${uploadId}] lstat after stream close ${finalState.remotePath} failed:`, statErr);
// Send error if lstat fails after close
state.ws.send(JSON.stringify({ type: 'sftp:upload:error', payload: { uploadId, message: `获取最终文件状态失败: ${statErr.message}` } }));
} else {
// Ensure the size matches what we expect (or is at least the total size)
if (stats.size < finalState.totalSize) {
console.error(`[SFTP Upload ${uploadId}] Final file size (${stats.size}) is less than expected total size (${finalState.totalSize}) after stream close.`);
state.ws.send(JSON.stringify({ type: 'sftp:upload:error', payload: { uploadId, message: `最终文件大小 (${stats.size}) 小于预期 (${finalState.totalSize})` } }));
@@ -1531,28 +1522,17 @@ export class SftpService {
isDirectory: stats.isDirectory(), isFile: stats.isFile(), isSymbolicLink: stats.isSymbolicLink(),
}
};
console.log(`[SFTP Upload ${uploadId}] Upload appears complete. Final size: ${stats.size}. Sending success message NOW.`);
// *** SEND SUCCESS MESSAGE HERE ***
state.ws.send(JSON.stringify({ type: 'sftp:upload:success', payload: finalStatsPayload, uploadId: uploadId, path: finalState.remotePath }));
}
}
// Clean up state regardless of lstat success/failure after close
this.activeUploads.delete(uploadId);
});
} else {
// Stream closed, but not all bytes were written (likely due to cancellation or error)
console.warn(`[SFTP Upload ${uploadId}] Write stream closed, but written bytes (${finalState.bytesWritten}) < total size (${finalState.totalSize}). Upload likely failed or was cancelled before completion.`);
this.activeUploads.delete(uploadId); // Clean up state
this.activeUploads.delete(uploadId);
}
} else {
console.log(`[SFTP Upload ${uploadId}] Write stream closed, but upload state was already removed (likely cancelled earlier).`);
}
});
stream.on('finish', () => {
// The 'finish' event fires when stream.end() is called and all data has been flushed.
console.log(`[SFTP Upload ${uploadId}] Write stream 'finish' event triggered for ${remotePath}. Waiting for 'close'.`);
});
// Notify client that we are ready for chunks
@@ -1579,17 +1559,11 @@ export class SftpService {
}
if (!uploadState) {
console.warn(`[SFTP Upload ${uploadId}] Received chunk ${chunkIndex}, but no active upload found.`);
// Send error back to client? Might flood if many chunks arrive after cancellation.
// state.ws.send(JSON.stringify({ type: 'sftp:upload:error', payload: { uploadId, message: '无效的上传 ID 或上传已取消/完成' } }));
return;
}
try {
const chunkBuffer = Buffer.from(dataBase64, 'base64');
// console.debug(`[SFTP Upload ${uploadId}] Writing chunk ${chunkIndex} (${chunkBuffer.length} bytes) to ${uploadState.remotePath}`);
// Write the chunk. The 'drain' event is handled automatically by Node.js streams
// if the write buffer is full. We just write.
const writeSuccess = uploadState.stream.write(chunkBuffer, (err) => {
if (err) {
// This callback handles errors specifically related to *this* write operation.
@@ -1598,24 +1572,18 @@ export class SftpService {
// Consider cancelling the upload on write error
this.cancelUploadInternal(uploadId, `Write error on chunk ${chunkIndex}`);
}
// else { console.debug(`[SFTP Upload ${uploadId}] Chunk ${chunkIndex} write callback success.`); }
});
if (!writeSuccess) {
// This indicates the buffer is full and we should wait for 'drain'.
// However, for simplicity in this WebSocket context, we might rely on TCP backpressure
// or simply continue writing, letting the stream buffer handle it.
// Adding explicit 'drain' handling can add complexity.
console.warn(`[SFTP Upload ${uploadId}] Write stream buffer full after chunk ${chunkIndex}. Waiting for drain is recommended for large files/slow connections.`);
}
// --- FIX: Handle backpressure ---
if (!writeSuccess) {
console.warn(`[SFTP Upload ${uploadId}] Write stream buffer full after chunk ${chunkIndex}. Pausing chunk processing until 'drain'.`);
// console.warn(`[SFTP Upload ${uploadId}] Write stream buffer full after chunk ${chunkIndex}. Pausing chunk processing until 'drain'.`);
try {
await new Promise<void>(resolve => uploadState.stream.once('drain', resolve));
console.log(`[SFTP Upload ${uploadId}] Write stream drained after chunk ${chunkIndex}. Resuming chunk processing.`);
// console.log(`[SFTP Upload ${uploadId}] Write stream drained after chunk ${chunkIndex}. Resuming chunk processing.`);
} catch (drainError) {
// Should not happen with .once, but handle defensively
console.error(`[SFTP Upload ${uploadId}] Error waiting for drain event:`, drainError);
@@ -1624,15 +1592,13 @@ export class SftpService {
throw drainError; // Re-throw to stop further processing in this chunk handler
}
}
// --- End Backpressure Fix ---
// --- FIX: Handle backpressure ---
if (!writeSuccess) {
console.warn(`[SFTP Upload ${uploadId}] Write stream buffer full after chunk ${chunkIndex}. Pausing chunk processing until 'drain'.`);
// console.warn(`[SFTP Upload ${uploadId}] Write stream buffer full after chunk ${chunkIndex}. Pausing chunk processing until 'drain'.`);
try {
await new Promise<void>(resolve => uploadState.stream.once('drain', resolve));
console.log(`[SFTP Upload ${uploadId}] Write stream drained after chunk ${chunkIndex}. Resuming chunk processing.`);
// console.log(`[SFTP Upload ${uploadId}] Write stream drained after chunk ${chunkIndex}. Resuming chunk processing.`);
} catch (drainError) {
// Should not happen with .once, but handle defensively
console.error(`[SFTP Upload ${uploadId}] Error waiting for drain event:`, drainError);
@@ -1641,14 +1607,10 @@ export class SftpService {
throw drainError; // Re-throw to stop further processing in this chunk handler
}
}
// --- End Backpressure Fix ---
uploadState.bytesWritten += chunkBuffer.length;
// Send progress (optional, consider throttling)
// const progress = Math.round((uploadState.bytesWritten / uploadState.totalSize) * 100);
// state.ws.send(JSON.stringify({ type: 'sftp:upload:progress', payload: { uploadId, progress } }));
// Check if upload is complete
if (uploadState.bytesWritten > uploadState.totalSize) {
@@ -1656,29 +1618,28 @@ export class SftpService {
state.ws.send(JSON.stringify({ type: 'sftp:upload:error', payload: { uploadId, message: '写入字节数超过文件总大小' } }));
this.cancelUploadInternal(uploadId, 'Bytes written exceeded total size');
// --- FIX: Check if bytesWritten is >= totalSize ---
} else if (uploadState.bytesWritten >= uploadState.totalSize) {
// Prevent this block from running multiple times if more data arrives somehow
if (!uploadState.stream.writableEnded) {
console.log(`[SFTP Upload ${uploadId}] All expected bytes received (or exceeded). Written: ${uploadState.bytesWritten}, Total: ${uploadState.totalSize}. Ending stream.`);
// End the stream now that all data is received
uploadState.stream.end((endErr: Error | undefined) => {
if (endErr) {
console.error(`[SFTP Upload ${uploadId}] Error ending write stream for ${uploadState.remotePath}:`, endErr);
// If ending fails, send an error (the 'close' event might not fire reliably after error)
if (state && state.ws) { // Check if state and ws still exist
state.ws.send(JSON.stringify({ type: 'sftp:upload:error', payload: { uploadId, message: `结束写入流时出错: ${endErr.message}` } }));
}
this.cancelUploadInternal(uploadId, `Stream end error`);
} else {
console.log(`[SFTP Upload ${uploadId}] stream.end() called successfully. Waiting for 'close' event.`);
// Success message will be sent in the 'close' handler
}
});
}
}
uploadState.stream.end((endErr: Error & { code?: string } | undefined) => { // Added code to Error type hint
const streamStateInEndCallback = uploadState?.stream; // Re-fetch in case it changed
if (endErr) {
// Check if it's the specific ERR_STREAM_DESTROYED error and all bytes were written
if (endErr.code === 'ERR_STREAM_DESTROYED' && uploadState && uploadState.bytesWritten >= uploadState.totalSize) {
console.warn(`[SFTP Upload ${uploadId}] stream.end() CALLBACK reported ERR_STREAM_DESTROYED, but all bytes written. UploadId: ${uploadId}. Error:`, endErr);
console.log(`[SFTP Upload ${uploadId}] Treating ERR_STREAM_DESTROYED as non-fatal for this upload. Expecting 'close' event to finalize success for ${uploadState.remotePath}.`);
} else {
console.error(`[SFTP Upload ${uploadId}] Error from stream.end() CALLBACK for ${uploadState?.remotePath || 'unknown path'}:`, endErr);
if (state && state.ws) {
state.ws.send(JSON.stringify({ type: 'sftp:upload:error', payload: { uploadId, message: `结束写入流时出错: ${endErr.message}` } }));
}
this.cancelUploadInternal(uploadId, `Stream end error: ${endErr.message}`, endErr);
}
}
});
}
}
} catch (error: any) {
} catch (error: any) {
console.error(`[SFTP Upload ${uploadId}] Error handling chunk ${chunkIndex} for ${uploadState?.remotePath}:`, error);
state.ws.send(JSON.stringify({ type: 'sftp:upload:error', payload: { uploadId, message: `处理块 ${chunkIndex} 时出错: ${error.message}` } }));
this.cancelUploadInternal(uploadId, `Error handling chunk ${chunkIndex}`);
@@ -1708,17 +1669,28 @@ export class SftpService {
}
/** Internal helper to clean up an upload */
private cancelUploadInternal(uploadId: string, reason: string): void {
private cancelUploadInternal(uploadId: string, reason: string, triggeringError?: any): void {
const uploadState = this.activeUploads.get(uploadId);
const callTimestamp = Date.now(); // Keep timestamp for potential future use if needed
if (uploadState) {
console.log(`[SFTP Upload ${uploadId}] Internal cancel (${reason}): Closing stream for ${uploadState.remotePath}`);
// End the stream. The 'close' handler should ideally detect the size mismatch or see the state is gone.
// Using destroy might be more immediate but could lead to unclosed file descriptors on the server in some cases.
uploadState.stream.end(); // Gracefully try to end
// uploadState.stream.destroy(); // More forceful, might be needed
const currentStream = uploadState.stream;
if (currentStream && !currentStream.destroyed) {
if (!currentStream.writableEnded) {
currentStream.end((endErr: Error | undefined) => {
if (endErr) {
console.error(`[SFTP Upload ${uploadId}] cancelUploadInternal: Error from stream.end() in cancel:`, endErr, `Original reason for cancel: ${reason}`);
if (!currentStream.destroyed) {
currentStream.destroy(); // Removed error argument
}
}
});
} else {
currentStream.destroy(); // Removed error argument
}
}
this.activeUploads.delete(uploadId);
} else {
// console.log(`[SFTP Upload ${uploadId}] Internal cancel called, but upload state already removed.`);
}
}
}