update
This commit is contained in:
@@ -1,6 +1,14 @@
|
||||
import { Client, SFTPWrapper, Stats, WriteStream } from 'ssh2'; // Import WriteStream
|
||||
import { Client, SFTPWrapper, Stats, WriteStream } from 'ssh2'; // Import WriteStream (Removed Dirent)
|
||||
import { WebSocket } from 'ws';
|
||||
import { ClientState } from '../websocket'; // 导入统一的 ClientState
|
||||
import * as pathModule from 'path'; // +++ Import path module +++
|
||||
|
||||
// +++ Define local interface for readdir results +++
|
||||
interface SftpDirEntry {
|
||||
filename: string;
|
||||
longname: string;
|
||||
attrs: Stats;
|
||||
}
|
||||
|
||||
// 定义服务器状态的数据结构 (与前端 StatusMonitor.vue 匹配)
|
||||
// Note: This interface seems out of place here, but keeping it for now as it was in the original file.
|
||||
@@ -513,6 +521,286 @@ export class SftpService {
|
||||
}
|
||||
}
|
||||
|
||||
// +++ 新增:复制文件或目录 +++
|
||||
async copy(sessionId: string, sources: string[], destinationDir: string, requestId: string): Promise<void> {
|
||||
const state = this.clientStates.get(sessionId);
|
||||
if (!state || !state.sftp) {
|
||||
console.warn(`[SFTP Copy] SFTP 未准备好,无法在 ${sessionId} 上执行 copy (ID: ${requestId})`);
|
||||
state?.ws.send(JSON.stringify({ type: 'sftp:copy:error', payload: 'SFTP 会话未就绪', requestId: requestId }));
|
||||
return;
|
||||
}
|
||||
const sftp = state.sftp;
|
||||
console.debug(`[SFTP ${sessionId}] Received copy request (ID: ${requestId}) Sources: ${sources.join(', ')}, Dest: ${destinationDir}`);
|
||||
|
||||
const copiedItemsDetails: any[] = []; // Store details of successfully copied items
|
||||
let firstError: Error | null = null;
|
||||
|
||||
try {
|
||||
// Ensure destination directory exists
|
||||
try {
|
||||
await this.ensureDirectoryExists(sftp, destinationDir);
|
||||
} catch (ensureErr: any) {
|
||||
console.error(`[SFTP ${sessionId}] Failed to ensure destination directory ${destinationDir} exists (ID: ${requestId}):`, ensureErr);
|
||||
throw new Error(`无法创建或访问目标目录: ${ensureErr.message}`);
|
||||
}
|
||||
|
||||
for (const sourcePath of sources) {
|
||||
const sourceName = pathModule.basename(sourcePath);
|
||||
const destPath = pathModule.join(destinationDir, sourceName).replace(/\\/g, '/'); // Ensure forward slashes
|
||||
|
||||
if (sourcePath === destPath) {
|
||||
console.warn(`[SFTP ${sessionId}] Skipping copy: source and destination are the same (${sourcePath}) (ID: ${requestId})`);
|
||||
continue; // Skip if source and destination are identical
|
||||
}
|
||||
|
||||
try {
|
||||
const stats = await this.getStats(sftp, sourcePath);
|
||||
if (stats.isDirectory()) {
|
||||
console.log(`[SFTP ${sessionId}] Copying directory ${sourcePath} to ${destPath} (ID: ${requestId})`);
|
||||
await this.copyDirectoryRecursive(sftp, sourcePath, destPath);
|
||||
} else if (stats.isFile()) {
|
||||
console.log(`[SFTP ${sessionId}] Copying file ${sourcePath} to ${destPath} (ID: ${requestId})`);
|
||||
await this.copyFile(sftp, sourcePath, destPath);
|
||||
} else {
|
||||
// Handle symlinks or other types if necessary, for now just skip/warn
|
||||
console.warn(`[SFTP ${sessionId}] Skipping copy of unsupported file type: ${sourcePath} (ID: ${requestId})`);
|
||||
continue;
|
||||
}
|
||||
// Get stats of the *newly copied* item
|
||||
const copiedStats = await this.getStats(sftp, destPath);
|
||||
copiedItemsDetails.push(this.formatStatsToFileListItem(destPath, copiedStats));
|
||||
|
||||
} catch (copyErr: any) {
|
||||
console.error(`[SFTP ${sessionId}] Error copying ${sourcePath} to ${destPath} (ID: ${requestId}):`, copyErr);
|
||||
firstError = copyErr; // Store the first error encountered
|
||||
break; // Stop processing further sources on error
|
||||
}
|
||||
}
|
||||
|
||||
if (firstError) {
|
||||
throw firstError; // Throw the first error to be caught below
|
||||
}
|
||||
|
||||
// Send success message with details of copied items
|
||||
console.log(`[SFTP ${sessionId}] Copy operation completed successfully (ID: ${requestId}). Copied items: ${copiedItemsDetails.length}`);
|
||||
state.ws.send(JSON.stringify({
|
||||
type: 'sftp:copy:success',
|
||||
payload: { destination: destinationDir, items: copiedItemsDetails },
|
||||
requestId: requestId
|
||||
}));
|
||||
|
||||
} catch (error: any) {
|
||||
console.error(`[SFTP ${sessionId}] Copy operation failed (ID: ${requestId}):`, error);
|
||||
state.ws.send(JSON.stringify({ type: 'sftp:copy:error', payload: `复制操作失败: ${error.message}`, requestId: requestId }));
|
||||
}
|
||||
}
|
||||
|
||||
// +++ 新增:移动文件或目录 +++
|
||||
async move(sessionId: string, sources: string[], destinationDir: string, requestId: string): Promise<void> {
|
||||
const state = this.clientStates.get(sessionId);
|
||||
if (!state || !state.sftp) {
|
||||
console.warn(`[SFTP Move] SFTP 未准备好,无法在 ${sessionId} 上执行 move (ID: ${requestId})`);
|
||||
state?.ws.send(JSON.stringify({ type: 'sftp:move:error', payload: 'SFTP 会话未就绪', requestId: requestId }));
|
||||
return;
|
||||
}
|
||||
const sftp = state.sftp;
|
||||
console.debug(`[SFTP ${sessionId}] Received move request (ID: ${requestId}) Sources: ${sources.join(', ')}, Dest: ${destinationDir}`);
|
||||
|
||||
const movedItemsDetails: any[] = [];
|
||||
let firstError: Error | null = null;
|
||||
|
||||
try {
|
||||
// Ensure destination directory exists (important for move)
|
||||
try {
|
||||
await this.ensureDirectoryExists(sftp, destinationDir);
|
||||
} catch (ensureErr: any) {
|
||||
console.error(`[SFTP ${sessionId}] Failed to ensure destination directory ${destinationDir} exists for move (ID: ${requestId}):`, ensureErr);
|
||||
throw new Error(`无法创建或访问目标目录: ${ensureErr.message}`);
|
||||
}
|
||||
|
||||
for (const oldPath of sources) {
|
||||
const sourceName = pathModule.basename(oldPath);
|
||||
const newPath = pathModule.join(destinationDir, sourceName).replace(/\\/g, '/'); // Ensure forward slashes
|
||||
|
||||
if (oldPath === newPath) {
|
||||
console.warn(`[SFTP ${sessionId}] Skipping move: source and destination are the same (${oldPath}) (ID: ${requestId})`);
|
||||
continue; // Skip if source and destination are identical
|
||||
}
|
||||
|
||||
try {
|
||||
console.log(`[SFTP ${sessionId}] Moving ${oldPath} to ${newPath} (ID: ${requestId})`);
|
||||
await this.performRename(sftp, oldPath, newPath); // Use helper for rename logic
|
||||
|
||||
// Get stats of the *moved* item at the new location
|
||||
const movedStats = await this.getStats(sftp, newPath);
|
||||
movedItemsDetails.push(this.formatStatsToFileListItem(newPath, movedStats));
|
||||
|
||||
} catch (moveErr: any) {
|
||||
console.error(`[SFTP ${sessionId}] Error moving ${oldPath} to ${newPath} (ID: ${requestId}):`, moveErr);
|
||||
firstError = moveErr;
|
||||
break; // Stop on first error for move
|
||||
}
|
||||
}
|
||||
|
||||
if (firstError) {
|
||||
throw firstError;
|
||||
}
|
||||
|
||||
console.log(`[SFTP ${sessionId}] Move operation completed successfully (ID: ${requestId}). Moved items: ${movedItemsDetails.length}`);
|
||||
state.ws.send(JSON.stringify({
|
||||
type: 'sftp:move:success',
|
||||
payload: { sources: sources, destination: destinationDir, items: movedItemsDetails },
|
||||
requestId: requestId
|
||||
}));
|
||||
|
||||
} catch (error: any) {
|
||||
console.error(`[SFTP ${sessionId}] Move operation failed (ID: ${requestId}):`, error);
|
||||
state.ws.send(JSON.stringify({ type: 'sftp:move:error', payload: `移动操作失败: ${error.message}`, requestId: requestId }));
|
||||
}
|
||||
}
|
||||
|
||||
// +++ 新增:辅助方法 - 复制文件 +++
|
||||
private copyFile(sftp: SFTPWrapper, sourcePath: string, destPath: string): Promise<void> {
|
||||
return new Promise((resolve, reject) => {
|
||||
const readStream = sftp.createReadStream(sourcePath);
|
||||
const writeStream = sftp.createWriteStream(destPath);
|
||||
let errorOccurred = false;
|
||||
|
||||
const onError = (err: Error) => {
|
||||
if (errorOccurred) return;
|
||||
errorOccurred = true;
|
||||
// Ensure streams are destroyed on error
|
||||
readStream.destroy();
|
||||
writeStream.destroy();
|
||||
console.error(`Error copying file ${sourcePath} to ${destPath}:`, err);
|
||||
reject(new Error(`复制文件失败: ${err.message}`));
|
||||
};
|
||||
|
||||
readStream.on('error', onError);
|
||||
writeStream.on('error', onError);
|
||||
|
||||
writeStream.on('close', () => { // Use 'close' for write stream completion
|
||||
if (!errorOccurred) {
|
||||
resolve();
|
||||
}
|
||||
});
|
||||
|
||||
readStream.pipe(writeStream);
|
||||
});
|
||||
}
|
||||
|
||||
// +++ 新增:辅助方法 - 递归复制目录 +++
|
||||
private async copyDirectoryRecursive(sftp: SFTPWrapper, sourcePath: string, destPath: string): Promise<void> {
|
||||
try {
|
||||
// Create destination directory
|
||||
await this.ensureDirectoryExists(sftp, destPath);
|
||||
|
||||
// Read source directory contents
|
||||
const items = await this.listDirectory(sftp, sourcePath);
|
||||
|
||||
for (const item of items) {
|
||||
const currentSourcePath = pathModule.join(sourcePath, item.filename).replace(/\\/g, '/');
|
||||
const currentDestPath = pathModule.join(destPath, item.filename).replace(/\\/g, '/');
|
||||
const itemStats = item.attrs; // Assuming readdir provides stats
|
||||
|
||||
if (itemStats.isDirectory()) {
|
||||
await this.copyDirectoryRecursive(sftp, currentSourcePath, currentDestPath);
|
||||
} else if (itemStats.isFile()) {
|
||||
await this.copyFile(sftp, currentSourcePath, currentDestPath);
|
||||
} else {
|
||||
console.warn(`[SFTP Copy Recurse] Skipping unsupported type: ${currentSourcePath}`);
|
||||
}
|
||||
}
|
||||
} catch (error: any) {
|
||||
console.error(`Error recursively copying directory ${sourcePath} to ${destPath}:`, error);
|
||||
throw new Error(`递归复制目录失败: ${error.message}`);
|
||||
}
|
||||
}
|
||||
|
||||
// +++ 新增:辅助方法 - 获取 Stats (Promise wrapper) +++
|
||||
private getStats(sftp: SFTPWrapper, path: string): Promise<Stats> {
|
||||
return new Promise((resolve, reject) => {
|
||||
sftp.lstat(path, (err, stats) => {
|
||||
if (err) {
|
||||
reject(err);
|
||||
} else {
|
||||
resolve(stats);
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
// +++ 新增:辅助方法 - 确保目录存在 (Promise wrapper) +++
|
||||
private ensureDirectoryExists(sftp: SFTPWrapper, dirPath: string): Promise<void> {
|
||||
return new Promise((resolve, reject) => {
|
||||
sftp.stat(dirPath, (err: any, stats) => { // Cast err to any
|
||||
if (err) {
|
||||
// If error is 'No such file', create the directory
|
||||
// Access err.code after casting to any
|
||||
if (err.code === 'ENOENT' || (err.message && err.message.includes('No such file'))) {
|
||||
sftp.mkdir(dirPath, (mkdirErr) => {
|
||||
if (mkdirErr) {
|
||||
reject(new Error(`创建目录失败 ${dirPath}: ${mkdirErr.message}`));
|
||||
} else {
|
||||
console.log(`[SFTP Util] Created directory: ${dirPath}`);
|
||||
resolve();
|
||||
}
|
||||
});
|
||||
} else {
|
||||
// Other stat error
|
||||
reject(new Error(`检查目录失败 ${dirPath}: ${err.message}`));
|
||||
}
|
||||
} else if (!stats.isDirectory()) {
|
||||
// Path exists but is not a directory
|
||||
reject(new Error(`路径 ${dirPath} 已存在但不是目录`));
|
||||
} else {
|
||||
// Directory already exists
|
||||
resolve();
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
// +++ 新增:辅助方法 - 列出目录内容 (Promise wrapper) +++
|
||||
private listDirectory(sftp: SFTPWrapper, path: string): Promise<SftpDirEntry[]> { // 使用本地接口 SftpDirEntry
|
||||
return new Promise((resolve, reject) => {
|
||||
sftp.readdir(path, (err, list) => { // list 的类型现在是 SftpDirEntry[]
|
||||
if (err) {
|
||||
reject(err);
|
||||
} else {
|
||||
resolve(list);
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
// +++ 新增:辅助方法 - 执行重命名 (Promise wrapper) +++
|
||||
private performRename(sftp: SFTPWrapper, oldPath: string, newPath: string): Promise<void> {
|
||||
return new Promise((resolve, reject) => {
|
||||
sftp.rename(oldPath, newPath, (err) => {
|
||||
if (err) {
|
||||
reject(err);
|
||||
} else {
|
||||
resolve();
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
// +++ 新增:辅助方法 - 格式化 Stats 为 FileListItem +++
|
||||
private formatStatsToFileListItem(itemPath: string, stats: Stats): any {
|
||||
return {
|
||||
filename: pathModule.basename(itemPath),
|
||||
longname: '', // stat doesn't provide longname, maybe generate a basic one?
|
||||
attrs: {
|
||||
size: stats.size, uid: stats.uid, gid: stats.gid, mode: stats.mode,
|
||||
atime: stats.atime * 1000, mtime: stats.mtime * 1000,
|
||||
isDirectory: stats.isDirectory(), isFile: stats.isFile(), isSymbolicLink: stats.isSymbolicLink(),
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
// --- File Upload Methods ---
|
||||
|
||||
/** Start a new file upload */
|
||||
|
||||
@@ -988,7 +988,10 @@ export const initializeWebSocket = async (server: http.Server, sessionParser: Re
|
||||
case 'sftp:unlink':
|
||||
case 'sftp:rename':
|
||||
case 'sftp:chmod':
|
||||
case 'sftp:realpath': {
|
||||
case 'sftp:realpath':
|
||||
case 'sftp:copy':
|
||||
case 'sftp:move':
|
||||
{ // Keep the outer grouping for common checks
|
||||
if (!sessionId || !state) {
|
||||
console.warn(`WebSocket: 收到来自 ${ws.username} 的 SFTP 请求 (${type}),但无活动会话。`);
|
||||
const errPayload: { message: string; requestId?: string } = { message: '无效的会话' };
|
||||
@@ -1046,7 +1049,23 @@ export const initializeWebSocket = async (server: http.Server, sessionParser: Re
|
||||
if (payload?.path) sftpService.realpath(sessionId, payload.path, requestId);
|
||||
else throw new Error("Missing 'path' in payload for realpath");
|
||||
break;
|
||||
default: throw new Error(`Unhandled SFTP type: ${type}`);
|
||||
// Cases for copy and move are now handled within this inner switch
|
||||
case 'sftp:copy':
|
||||
if (Array.isArray(payload?.sources) && payload?.destination) {
|
||||
sftpService.copy(sessionId, payload.sources, payload.destination, requestId);
|
||||
} else throw new Error("Missing 'sources' (array) or 'destination' in payload for copy");
|
||||
break;
|
||||
case 'sftp:move':
|
||||
if (Array.isArray(payload?.sources) && payload?.destination) {
|
||||
sftpService.move(sessionId, payload.sources, payload.destination, requestId);
|
||||
} else throw new Error("Missing 'sources' (array) or 'destination' in payload for move");
|
||||
break;
|
||||
default:
|
||||
// Only throw error if the type wasn't handled by any SFTP case
|
||||
console.warn(`WebSocket: Received unhandled SFTP message type inside SFTP block: ${type}`);
|
||||
// Optionally send a specific error back, or rely on the outer catch
|
||||
// ws.send(JSON.stringify({ type: 'sftp_error', payload: { message: `内部未处理的 SFTP 类型: ${type}`, requestId } }));
|
||||
throw new Error(`Unhandled SFTP type: ${type}`); // Keep throwing for the outer catch
|
||||
}
|
||||
} catch (sftpCallError: any) {
|
||||
console.error(`WebSocket: Error preparing/calling SFTP service for ${type} (Request ID: ${requestId}):`, sftpCallError);
|
||||
@@ -1197,3 +1216,4 @@ export const initializeWebSocket = async (server: http.Server, sessionParser: Re
|
||||
};
|
||||
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user