This commit is contained in:
Baobhan Sith
2025-05-15 20:17:31 +08:00
parent d59794928a
commit 0ce9f27fc6
3 changed files with 90 additions and 126 deletions
+47 -55
View File
@@ -1568,33 +1568,66 @@ export class SftpService {
const chunkBuffer = Buffer.from(dataBase64, 'base64');
const writeSuccess = uploadState.stream.write(chunkBuffer, (err) => {
if (err) {
// This callback handles errors specifically related to *this* write operation.
console.error(`[SFTP Upload ${uploadId}] Error writing chunk ${chunkIndex} to ${uploadState.remotePath}:`, err);
state.ws.send(JSON.stringify({ type: 'sftp:upload:error', payload: { uploadId, message: `写入块 ${chunkIndex} 失败: ${err.message}` } }));
// Consider cancelling the upload on write error
this.cancelUploadInternal(uploadId, `Write error on chunk ${chunkIndex}`);
} else {
uploadState.bytesWritten += chunkBuffer.length;
if (state.ws && state.ws.readyState === WebSocket.OPEN) {
const progressPercent = Math.round((uploadState.bytesWritten / uploadState.totalSize) * 100);
state.ws.send(JSON.stringify({
type: 'sftp:upload:progress',
uploadId: uploadId,
payload: {
bytesWritten: uploadState.bytesWritten,
totalSize: uploadState.totalSize,
progress: Math.min(100, progressPercent)
}
}));
}
if (uploadState.bytesWritten >= uploadState.totalSize) {
if (!uploadState.stream.writableEnded) {
uploadState.stream.end((endErr: Error & { code?: string } | undefined) => {
const streamStateInEndCallback = uploadState?.stream;
if (endErr) {
if (endErr.code === 'ERR_STREAM_DESTROYED' && uploadState && uploadState.bytesWritten >= uploadState.totalSize) {
console.warn(`[SFTP Upload ${uploadId}] stream.end() CALLBACK reported ERR_STREAM_DESTROYED, but all bytes written. UploadId: ${uploadId}. Error:`, endErr);
console.log(`[SFTP Upload ${uploadId}] Treating ERR_STREAM_DESTROYED as non-fatal for this upload. Expecting 'close' event to finalize success for ${uploadState.remotePath}.`);
} else {
console.error(`[SFTP Upload ${uploadId}] Error from stream.end() CALLBACK for ${uploadState?.remotePath || 'unknown path'}:`, endErr);
if (state && state.ws) {
state.ws.send(JSON.stringify({ type: 'sftp:upload:error', payload: { uploadId, message: `结束写入流时出错: ${endErr.message}` } }));
}
this.cancelUploadInternal(uploadId, `Stream end error: ${endErr.message}`, endErr);
}
}
});
}
}
}
});
if (!writeSuccess) {
// console.warn(`[SFTP Upload ${uploadId}] Write stream buffer full after chunk ${chunkIndex}. Pausing chunk processing until 'drain'.`);
if (!uploadState.drainPromise) {
// console.log(`[SFTP Upload ${uploadId}] Attaching new drain listener for chunk ${chunkIndex}`);
uploadState.drainPromise = new Promise<void>(resolve => {
uploadState.stream.once('drain', () => {
// console.log(`[SFTP Upload ${uploadId}] Drain event fired, resolving promise for chunk ${chunkIndex}`);
uploadState.drainPromise = null; // Reset for next time
uploadState.drainPromise = null;
resolve();
});
});
} else {
// console.log(`[SFTP Upload ${uploadId}] Waiting for existing drain promise for chunk ${chunkIndex}`);
}
try {
await uploadState.drainPromise;
// console.log(`[SFTP Upload ${uploadId}] Resumed after drain for chunk ${chunkIndex}`);
} catch (drainError) {
console.error(`[SFTP Upload ${uploadId}] Error awaiting drain promise for chunk ${chunkIndex}:`, drainError);
this.cancelUploadInternal(uploadId, 'Error waiting for drain promise');
@@ -1602,51 +1635,10 @@ export class SftpService {
}
}
uploadState.bytesWritten += chunkBuffer.length;
// +++ 发送进度更新 +++
if (state.ws && state.ws.readyState === WebSocket.OPEN) {
const progressPercent = Math.round((uploadState.bytesWritten / uploadState.totalSize) * 100);
// 将 uploadId 放在顶层,payload 中包含具体进度数据
state.ws.send(JSON.stringify({
type: 'sftp:upload:progress',
uploadId: uploadId,
payload: {
bytesWritten: uploadState.bytesWritten,
totalSize: uploadState.totalSize,
progress: Math.min(100, progressPercent) // 确保不超过100
}
}));
}
// --- 结束发送进度更新 ---
// Check if upload is complete
if (uploadState.bytesWritten > uploadState.totalSize) {
console.error(`[SFTP Upload ${uploadId}] Bytes written (${uploadState.bytesWritten}) exceeded total size (${uploadState.totalSize}) for ${uploadState.remotePath}.`);
state.ws.send(JSON.stringify({ type: 'sftp:upload:error', payload: { uploadId, message: '写入字节数超过文件总大小' } }));
this.cancelUploadInternal(uploadId, 'Bytes written exceeded total size');
} else if (uploadState.bytesWritten >= uploadState.totalSize) {
if (!uploadState.stream.writableEnded) {
uploadState.stream.end((endErr: Error & { code?: string } | undefined) => { // Added code to Error type hint
const streamStateInEndCallback = uploadState?.stream; // Re-fetch in case it changed
if (endErr) {
// Check if it's the specific ERR_STREAM_DESTROYED error and all bytes were written
if (endErr.code === 'ERR_STREAM_DESTROYED' && uploadState && uploadState.bytesWritten >= uploadState.totalSize) {
console.warn(`[SFTP Upload ${uploadId}] stream.end() CALLBACK reported ERR_STREAM_DESTROYED, but all bytes written. UploadId: ${uploadId}. Error:`, endErr);
console.log(`[SFTP Upload ${uploadId}] Treating ERR_STREAM_DESTROYED as non-fatal for this upload. Expecting 'close' event to finalize success for ${uploadState.remotePath}.`);
} else {
console.error(`[SFTP Upload ${uploadId}] Error from stream.end() CALLBACK for ${uploadState?.remotePath || 'unknown path'}:`, endErr);
if (state && state.ws) {
state.ws.send(JSON.stringify({ type: 'sftp:upload:error', payload: { uploadId, message: `结束写入流时出错: ${endErr.message}` } }));
}
this.cancelUploadInternal(uploadId, `Stream end error: ${endErr.message}`, endErr);
}
}
});
}
}
} catch (error: any) {
console.error(`[SFTP Upload ${uploadId}] Error handling chunk ${chunkIndex} for ${uploadState?.remotePath}:`, error);
state.ws.send(JSON.stringify({ type: 'sftp:upload:error', payload: { uploadId, message: `处理块 ${chunkIndex} 时出错: ${error.message}` } }));