This commit is contained in:
Baobhan Sith
2025-05-15 19:49:12 +08:00
parent b9d1a70b9a
commit d59794928a
4 changed files with 79 additions and 9 deletions
+34 -8
View File
@@ -64,6 +64,7 @@ interface ActiveUpload {
stream: WriteStream;
sessionId: string; // Link back to the session for cleanup
relativePath?: string;
drainPromise?: Promise<void> | null; // +++ For managing drain event listeners +++
}
export class SftpService {
@@ -1489,6 +1490,7 @@ export class SftpService {
stream,
sessionId,
relativePath, // +++ 存储 relativePath +++
drainPromise: null // +++ Initialize drainPromise +++
};
this.activeUploads.set(uploadId, uploadState);
@@ -1578,21 +1580,45 @@ export class SftpService {
if (!writeSuccess) {
// console.warn(`[SFTP Upload ${uploadId}] Write stream buffer full after chunk ${chunkIndex}. Pausing chunk processing until 'drain'.`);
if (!uploadState.drainPromise) {
// console.log(`[SFTP Upload ${uploadId}] Attaching new drain listener for chunk ${chunkIndex}`);
uploadState.drainPromise = new Promise<void>(resolve => {
uploadState.stream.once('drain', () => {
// console.log(`[SFTP Upload ${uploadId}] Drain event fired, resolving promise for chunk ${chunkIndex}`);
uploadState.drainPromise = null; // Reset for next time
resolve();
});
});
} else {
// console.log(`[SFTP Upload ${uploadId}] Waiting for existing drain promise for chunk ${chunkIndex}`);
}
try {
await new Promise<void>(resolve => uploadState.stream.once('drain', resolve));
// console.log(`[SFTP Upload ${uploadId}] Write stream drained after chunk ${chunkIndex}. Resuming chunk processing.`);
await uploadState.drainPromise;
// console.log(`[SFTP Upload ${uploadId}] Resumed after drain for chunk ${chunkIndex}`);
} catch (drainError) {
// Should not happen with .once, but handle defensively
console.error(`[SFTP Upload ${uploadId}] Error waiting for drain event:`, drainError);
// Consider cancelling upload if waiting for drain fails critically
this.cancelUploadInternal(uploadId, 'Error waiting for drain');
throw drainError; // Re-throw to stop further processing in this chunk handler
console.error(`[SFTP Upload ${uploadId}] Error awaiting drain promise for chunk ${chunkIndex}:`, drainError);
this.cancelUploadInternal(uploadId, 'Error waiting for drain promise');
throw drainError;
}
}
uploadState.bytesWritten += chunkBuffer.length;
// +++ 发送进度更新 +++
if (state.ws && state.ws.readyState === WebSocket.OPEN) {
const progressPercent = Math.round((uploadState.bytesWritten / uploadState.totalSize) * 100);
// 将 uploadId 放在顶层,payload 中包含具体进度数据
state.ws.send(JSON.stringify({
type: 'sftp:upload:progress',
uploadId: uploadId,
payload: {
bytesWritten: uploadState.bytesWritten,
totalSize: uploadState.totalSize,
progress: Math.min(100, progressPercent) // 确保不超过100
}
}));
}
// --- 结束发送进度更新 ---
// Check if upload is complete
if (uploadState.bytesWritten > uploadState.totalSize) {