fix: 修复文件上传因偏移量计算错误导致文件损坏
This commit is contained in:
@@ -1084,28 +1084,54 @@ export class SftpService {
|
|||||||
});
|
});
|
||||||
|
|
||||||
stream.on('close', () => {
|
stream.on('close', () => {
|
||||||
// This 'close' event now primarily handles cleanup after the stream is fully closed.
|
console.log(`[SFTP Upload ${uploadId}] Write stream 'close' event triggered for ${remotePath}.`);
|
||||||
// The success message is sent earlier in handleUploadChunk.
|
|
||||||
const finalState = this.activeUploads.get(uploadId);
|
const finalState = this.activeUploads.get(uploadId);
|
||||||
if (finalState) {
|
if (finalState) {
|
||||||
// Check if bytes written match total size upon close, log warning if not (could indicate cancellation after success msg sent)
|
// Check if the upload *should* have completed successfully based on bytes written
|
||||||
if (finalState.bytesWritten !== finalState.totalSize) {
|
if (finalState.bytesWritten >= finalState.totalSize) {
|
||||||
console.warn(`[SFTP Upload ${uploadId}] Write stream closed for ${remotePath}, but written bytes (${finalState.bytesWritten}) != total size (${finalState.totalSize}). This might happen if cancelled after success message was sent.`);
|
console.log(`[SFTP Upload ${uploadId}] Stream closed and bytesWritten >= totalSize. Fetching final stats...`);
|
||||||
// Optionally send an error if this state is unexpected, but success might have already been sent.
|
// Get final stats *after* the stream is closed
|
||||||
// state.ws.send(JSON.stringify({ type: 'sftp:upload:error', payload: { uploadId, message: '文件大小不匹配或上传未完成' } }));
|
state.sftp!.lstat(finalState.remotePath, (statErr, stats) => {
|
||||||
} else {
|
if (statErr) {
|
||||||
console.log(`[SFTP Upload ${uploadId}] Write stream closed successfully for ${remotePath}. State cleaned up.`);
|
console.error(`[SFTP Upload ${uploadId}] lstat after stream close ${finalState.remotePath} failed:`, statErr);
|
||||||
}
|
// Send error if lstat fails after close
|
||||||
this.activeUploads.delete(uploadId); // Clean up state when stream is closed
|
state.ws.send(JSON.stringify({ type: 'sftp:upload:error', payload: { uploadId, message: `获取最终文件状态失败: ${statErr.message}` } }));
|
||||||
|
} else {
|
||||||
|
// Ensure the size matches what we expect (or is at least the total size)
|
||||||
|
if (stats.size < finalState.totalSize) {
|
||||||
|
console.error(`[SFTP Upload ${uploadId}] Final file size (${stats.size}) is less than expected total size (${finalState.totalSize}) after stream close.`);
|
||||||
|
state.ws.send(JSON.stringify({ type: 'sftp:upload:error', payload: { uploadId, message: `最终文件大小 (${stats.size}) 小于预期 (${finalState.totalSize})` } }));
|
||||||
|
} else {
|
||||||
|
const finalStatsPayload = {
|
||||||
|
filename: finalState.remotePath.substring(finalState.remotePath.lastIndexOf('/') + 1),
|
||||||
|
longname: '',
|
||||||
|
attrs: {
|
||||||
|
size: stats.size, uid: stats.uid, gid: stats.gid, mode: stats.mode,
|
||||||
|
atime: stats.atime * 1000, mtime: stats.mtime * 1000,
|
||||||
|
isDirectory: stats.isDirectory(), isFile: stats.isFile(), isSymbolicLink: stats.isSymbolicLink(),
|
||||||
|
}
|
||||||
|
};
|
||||||
|
console.log(`[SFTP Upload ${uploadId}] Upload appears complete. Final size: ${stats.size}. Sending success message NOW.`);
|
||||||
|
// *** SEND SUCCESS MESSAGE HERE ***
|
||||||
|
state.ws.send(JSON.stringify({ type: 'sftp:upload:success', payload: finalStatsPayload, uploadId: uploadId, path: finalState.remotePath }));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Clean up state regardless of lstat success/failure after close
|
||||||
|
this.activeUploads.delete(uploadId);
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
// Stream closed, but not all bytes were written (likely due to cancellation or error)
|
||||||
|
console.warn(`[SFTP Upload ${uploadId}] Write stream closed, but written bytes (${finalState.bytesWritten}) < total size (${finalState.totalSize}). Upload likely failed or was cancelled before completion.`);
|
||||||
|
this.activeUploads.delete(uploadId); // Clean up state
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
console.log(`[SFTP Upload ${uploadId}] Write stream closed for ${remotePath}, but upload state was already removed.`);
|
console.log(`[SFTP Upload ${uploadId}] Write stream closed, but upload state was already removed (likely cancelled earlier).`);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
stream.on('finish', () => {
|
stream.on('finish', () => {
|
||||||
// The 'finish' event fires when stream.end() is called and all data has been flushed to the underlying system.
|
// The 'finish' event fires when stream.end() is called and all data has been flushed.
|
||||||
// This might be a slightly earlier point than 'close'. Let's log it.
|
console.log(`[SFTP Upload ${uploadId}] Write stream 'finish' event triggered for ${remotePath}. Waiting for 'close'.`);
|
||||||
console.log(`[SFTP Upload ${uploadId}] Write stream finished for ${remotePath}. Waiting for close.`);
|
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
||||||
@@ -1120,7 +1146,8 @@ export class SftpService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/** Handle an incoming file chunk */
|
/** Handle an incoming file chunk */
|
||||||
handleUploadChunk(sessionId: string, uploadId: string, chunkIndex: number, dataBase64: string): void {
|
// --- FIX: Make async to handle await for drain ---
|
||||||
|
async handleUploadChunk(sessionId: string, uploadId: string, chunkIndex: number, dataBase64: string): Promise<void> {
|
||||||
const state = this.clientStates.get(sessionId);
|
const state = this.clientStates.get(sessionId);
|
||||||
const uploadState = this.activeUploads.get(uploadId);
|
const uploadState = this.activeUploads.get(uploadId);
|
||||||
|
|
||||||
@@ -1163,6 +1190,40 @@ export class SftpService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// --- FIX: Handle backpressure ---
|
||||||
|
if (!writeSuccess) {
|
||||||
|
console.warn(`[SFTP Upload ${uploadId}] Write stream buffer full after chunk ${chunkIndex}. Pausing chunk processing until 'drain'.`);
|
||||||
|
try {
|
||||||
|
await new Promise<void>(resolve => uploadState.stream.once('drain', resolve));
|
||||||
|
console.log(`[SFTP Upload ${uploadId}] Write stream drained after chunk ${chunkIndex}. Resuming chunk processing.`);
|
||||||
|
} catch (drainError) {
|
||||||
|
// Should not happen with .once, but handle defensively
|
||||||
|
console.error(`[SFTP Upload ${uploadId}] Error waiting for drain event:`, drainError);
|
||||||
|
// Consider cancelling upload if waiting for drain fails critically
|
||||||
|
this.cancelUploadInternal(uploadId, 'Error waiting for drain');
|
||||||
|
throw drainError; // Re-throw to stop further processing in this chunk handler
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// --- End Backpressure Fix ---
|
||||||
|
|
||||||
|
|
||||||
|
// --- FIX: Handle backpressure ---
|
||||||
|
if (!writeSuccess) {
|
||||||
|
console.warn(`[SFTP Upload ${uploadId}] Write stream buffer full after chunk ${chunkIndex}. Pausing chunk processing until 'drain'.`);
|
||||||
|
try {
|
||||||
|
await new Promise<void>(resolve => uploadState.stream.once('drain', resolve));
|
||||||
|
console.log(`[SFTP Upload ${uploadId}] Write stream drained after chunk ${chunkIndex}. Resuming chunk processing.`);
|
||||||
|
} catch (drainError) {
|
||||||
|
// Should not happen with .once, but handle defensively
|
||||||
|
console.error(`[SFTP Upload ${uploadId}] Error waiting for drain event:`, drainError);
|
||||||
|
// Consider cancelling upload if waiting for drain fails critically
|
||||||
|
this.cancelUploadInternal(uploadId, 'Error waiting for drain');
|
||||||
|
throw drainError; // Re-throw to stop further processing in this chunk handler
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// --- End Backpressure Fix ---
|
||||||
|
|
||||||
|
|
||||||
uploadState.bytesWritten += chunkBuffer.length;
|
uploadState.bytesWritten += chunkBuffer.length;
|
||||||
|
|
||||||
// Send progress (optional, consider throttling)
|
// Send progress (optional, consider throttling)
|
||||||
@@ -1175,39 +1236,26 @@ export class SftpService {
|
|||||||
state.ws.send(JSON.stringify({ type: 'sftp:upload:error', payload: { uploadId, message: '写入字节数超过文件总大小' } }));
|
state.ws.send(JSON.stringify({ type: 'sftp:upload:error', payload: { uploadId, message: '写入字节数超过文件总大小' } }));
|
||||||
this.cancelUploadInternal(uploadId, 'Bytes written exceeded total size');
|
this.cancelUploadInternal(uploadId, 'Bytes written exceeded total size');
|
||||||
|
|
||||||
} else if (uploadState.bytesWritten === uploadState.totalSize) {
|
// --- FIX: Check if bytesWritten is >= totalSize ---
|
||||||
console.log(`[SFTP Upload ${uploadId}] All bytes (${uploadState.bytesWritten}) received for ${uploadState.remotePath}. Fetching stats before sending success...`);
|
} else if (uploadState.bytesWritten >= uploadState.totalSize) {
|
||||||
|
// Prevent this block from running multiple times if more data arrives somehow
|
||||||
// Get stats for the newly uploaded file before sending success
|
if (!uploadState.stream.writableEnded) {
|
||||||
state.sftp!.lstat(uploadState.remotePath, (statErr, stats) => {
|
console.log(`[SFTP Upload ${uploadId}] All expected bytes received (or exceeded). Written: ${uploadState.bytesWritten}, Total: ${uploadState.totalSize}. Ending stream.`);
|
||||||
let newItemPayload: any = null; // Default to null payload
|
// End the stream now that all data is received
|
||||||
if (statErr) {
|
uploadState.stream.end((endErr: Error | undefined) => {
|
||||||
console.error(`[SFTP Upload ${uploadId}] lstat after upload ${uploadState.remotePath} failed:`, statErr);
|
if (endErr) {
|
||||||
// Still send success, but with null payload as item details are unavailable
|
console.error(`[SFTP Upload ${uploadId}] Error ending write stream for ${uploadState.remotePath}:`, endErr);
|
||||||
} else {
|
// If ending fails, send an error (the 'close' event might not fire reliably after error)
|
||||||
newItemPayload = {
|
if (state && state.ws) { // Check if state and ws still exist
|
||||||
filename: uploadState.remotePath.substring(uploadState.remotePath.lastIndexOf('/') + 1),
|
state.ws.send(JSON.stringify({ type: 'sftp:upload:error', payload: { uploadId, message: `结束写入流时出错: ${endErr.message}` } }));
|
||||||
longname: '', // lstat doesn't provide longname
|
|
||||||
attrs: {
|
|
||||||
size: stats.size, uid: stats.uid, gid: stats.gid, mode: stats.mode,
|
|
||||||
atime: stats.atime * 1000, mtime: stats.mtime * 1000,
|
|
||||||
isDirectory: stats.isDirectory(), isFile: stats.isFile(), isSymbolicLink: stats.isSymbolicLink(),
|
|
||||||
}
|
}
|
||||||
};
|
this.cancelUploadInternal(uploadId, `Stream end error`);
|
||||||
console.log(`[SFTP Upload ${uploadId}] Sending upload success with new item details for ${uploadState.remotePath}`);
|
} else {
|
||||||
}
|
console.log(`[SFTP Upload ${uploadId}] stream.end() called successfully. Waiting for 'close' event.`);
|
||||||
// Send success message with the newItem payload (or null if lstat failed)
|
// Success message will be sent in the 'close' handler
|
||||||
state.ws.send(JSON.stringify({ type: 'sftp:upload:success', payload: newItemPayload, uploadId: uploadId, path: uploadState.remotePath })); // Include uploadId and path for frontend context
|
}
|
||||||
|
|
||||||
// End the stream *after* lstat completes and success message is sent
|
|
||||||
uploadState.stream.end((endErr: Error | undefined) => { // Add type annotation
|
|
||||||
if (endErr) {
|
|
||||||
console.error(`[SFTP Upload ${uploadId}] Error ending write stream after success for ${uploadState.remotePath}:`, endErr);
|
|
||||||
} else {
|
|
||||||
console.log(`[SFTP Upload ${uploadId}] Write stream ended successfully after success for ${uploadState.remotePath}.`);
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
});
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
} catch (error: any) {
|
} catch (error: any) {
|
||||||
|
|||||||
@@ -1173,7 +1173,8 @@ connectionName: connInfo?.name || 'Unknown', // 添加连接名称 (使用可选
|
|||||||
console.error(`WebSocket: 收到来自 ${ws.username} (会话: ${sessionId}) 的 ${type} 请求,但缺少 uploadId, chunkIndex 或 data。`);
|
console.error(`WebSocket: 收到来自 ${ws.username} (会话: ${sessionId}) 的 ${type} 请求,但缺少 uploadId, chunkIndex 或 data。`);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
sftpService.handleUploadChunk(sessionId, payload.uploadId, payload.chunkIndex, payload.data);
|
// --- FIX: Add await for async backpressure handling ---
|
||||||
|
await sftpService.handleUploadChunk(sessionId, payload.uploadId, payload.chunkIndex, payload.data);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case 'sftp:upload:cancel': {
|
case 'sftp:upload:cancel': {
|
||||||
|
|||||||
@@ -52,6 +52,7 @@ export function useFileUploader(
|
|||||||
const reader = new FileReader();
|
const reader = new FileReader();
|
||||||
let offset = startByte;
|
let offset = startByte;
|
||||||
let chunkIndex = 0; // Initialize chunk index counter
|
let chunkIndex = 0; // Initialize chunk index counter
|
||||||
|
let currentChunkSize = 0; // Store the size of the chunk being processed
|
||||||
|
|
||||||
reader.onload = (e) => {
|
reader.onload = (e) => {
|
||||||
const currentUpload = uploads[uploadId];
|
const currentUpload = uploads[uploadId];
|
||||||
@@ -72,8 +73,8 @@ export function useFileUploader(
|
|||||||
payload: { uploadId, chunkIndex: chunkIndex++, data: chunkBase64, isLast } // Add and increment chunkIndex
|
payload: { uploadId, chunkIndex: chunkIndex++, data: chunkBase64, isLast } // Add and increment chunkIndex
|
||||||
});
|
});
|
||||||
|
|
||||||
// 注意:直接使用 base64 长度估算字节大小并不完全准确,但对于进度条来说足够了
|
// --- FIX: Update offset based on the actual chunk size that was read ---
|
||||||
offset += chunkBase64.length * 3 / 4;
|
offset += currentChunkSize; // Use the stored size of the slice
|
||||||
currentUpload.progress = Math.min(100, Math.round((offset / file.size) * 100));
|
currentUpload.progress = Math.min(100, Math.round((offset / file.size) * 100));
|
||||||
|
|
||||||
if (!isLast) {
|
if (!isLast) {
|
||||||
@@ -105,6 +106,7 @@ export function useFileUploader(
|
|||||||
// 读取下一个块之前再次检查状态
|
// 读取下一个块之前再次检查状态
|
||||||
if (offset < file.size && uploads[uploadId]?.status === 'uploading') {
|
if (offset < file.size && uploads[uploadId]?.status === 'uploading') {
|
||||||
const slice = file.slice(offset, offset + chunkSize);
|
const slice = file.slice(offset, offset + chunkSize);
|
||||||
|
currentChunkSize = slice.size; // Store the actual size of the slice being read
|
||||||
reader.readAsDataURL(slice);
|
reader.readAsDataURL(slice);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|||||||
Reference in New Issue
Block a user