-
Notifications
You must be signed in to change notification settings - Fork 8
fix: improve FTPClient #304
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -198,13 +198,15 @@ export class FTPAccessorHandle<Metadata> extends GenericAccessorHandle<Metadata> | |
| } else throw new Error(`getPackageActualVersion: ${response.reason.user}: ${response.reason.tech}`) | ||
| } | ||
| async ensurePackageFulfilled(): Promise<void> { | ||
| await this.fileHandler.clearPackageRemoval(this.filePath) | ||
| if ((this.workOptions.removeDelay ?? -1) >= 0) { | ||
| // Only handle this if there is a removeDelay set, otherwise we can skip it to save time: | ||
| await this.fileHandler.clearPackageRemoval(this.filePath) | ||
| } | ||
| } | ||
| async removePackage(reason: string): Promise<void> { | ||
| await this.fileHandler.handleRemovePackage(this.filePath, this.packageName, reason) | ||
| } | ||
| async getPackageReadStream(): Promise<PackageReadStream> { | ||
| // important that this is a 'read', so that it doesn't go into a deadlock with putPackageStream() in case of an upload/download to the same accessorPackageContainer | ||
| const ftp = await this.prepareFTPClient() | ||
|
|
||
| const response = await ftp.download(this.fullPath) | ||
|
|
@@ -267,7 +269,10 @@ export class FTPAccessorHandle<Metadata> extends GenericAccessorHandle<Metadata> | |
| operationName: string, | ||
| source: string | GenericAccessorHandle<any> | ||
| ): Promise<PackageOperation> { | ||
| await this.fileHandler.clearPackageRemoval(this.filePath) | ||
| if ((this.workOptions.removeDelay ?? -1) >= 0) { | ||
| // Only handle this if there is a removeDelay set, otherwise we can skip it to save time: | ||
| await this.fileHandler.clearPackageRemoval(this.filePath) | ||
| } | ||
| return this.logWorkOperation(operationName, source, this.packageName) | ||
| } | ||
| async finalizePackage(operation: PackageOperation): Promise<void> { | ||
|
|
@@ -482,18 +487,26 @@ export class FTPAccessorHandle<Metadata> extends GenericAccessorHandle<Metadata> | |
| [accessorType: string]: CachedClients | undefined | ||
| } | ||
|
|
||
| const cacheKey = JSON.stringify([ | ||
| this.accessorId, | ||
| ftpOptions.serverType, | ||
| ftpOptions.host, | ||
| ftpOptions.port, | ||
| this.accessor.basePath ?? '/', | ||
| ]) | ||
| const cacheKey = JSON.stringify([this.accessorId, options, this.accessor.basePath ?? '/']) | ||
|
|
||
| let cachedClients = accessorCache[cacheKey] | ||
| if (cachedClients) { | ||
| // Check that options matches: | ||
| if (!isEqual(cachedClients.options, options)) { | ||
| // It the options don't match, something is wrong with the cacheKey | ||
|
|
||
| this.worker.logger.error( | ||
| `Something is wrong with the FTP client cacheKey. The options do not match the cacheKey. Deleting cached clients for this key. cacheKey: ${cacheKey}, options: ${JSON.stringify( | ||
| { | ||
| options, | ||
| password: '***', | ||
| } | ||
| )}, cachedOptions: ${JSON.stringify({ | ||
| ...cachedClients.options, | ||
| password: '***', | ||
| })}` | ||
| ) | ||
|
Comment on lines
+496
to
+508
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Avoid logging FTP credentials in cache-mismatch error.
🔒 Proposed fix — redact password before logging (apply to both call sites)+ const safeOptions = { ...options, password: options.password ? '***' : options.password }
+ const safeCacheKey = JSON.stringify([this.accessorId, safeOptions, this.accessor.basePath ?? '/'])
+
let cachedClients = accessorCache[cacheKey]
if (cachedClients) {
// Check that options matches:
if (!isEqual(cachedClients.options, options)) {
- // It the options don't match, something is wrong with the cacheKey
-
- this.worker.logger.error(
- `Something is wrong with the FTP client cacheKey. The options do not match the cacheKey. Deleting cached clients for this key. cacheKey: ${cacheKey}, options: ${JSON.stringify(
- options
- )}, cachedOptions: ${JSON.stringify(cachedClients.options)}`
- )
+ // If the options don't match, something is wrong with the cacheKey
+ const safeCachedOptions = {
+ ...cachedClients.options,
+ password: cachedClients.options.password ? '***' : cachedClients.options.password,
+ }
+ this.worker.logger.error(
+ `Something is wrong with the FTP client cacheKey. The options do not match the cacheKey. Deleting cached clients for this key. cacheKey: ${safeCacheKey}, options: ${JSON.stringify(
+ safeOptions
+ )}, cachedOptions: ${JSON.stringify(safeCachedOptions)}`
+ )And at line 542: - this.worker.logger.info(`Creating new FTP client for purpose="${purpose}", ${JSON.stringify(options)}`)
+ this.worker.logger.info(`Creating new FTP client for purpose="${purpose}", ${JSON.stringify(safeOptions)}`)Also consider fixing the typo "It the options" → "If the options" on line 496. 🤖 Prompt for AI Agents |
||
|
|
||
| for (const c of cachedClients.clients) { | ||
| await c.client.destroy() | ||
| } | ||
|
|
@@ -532,9 +545,10 @@ export class FTPAccessorHandle<Metadata> extends GenericAccessorHandle<Metadata> | |
| } | ||
|
|
||
| if (!cachedClient) { | ||
| this.worker.logger.info(`Creating new FTP client for purpose="${purpose}", ${JSON.stringify(options)}`) | ||
| // Set up a new FTP client: | ||
| cachedClient = { | ||
| client: createFTPClient(ftpOptions.serverType, this.worker.logger, options), | ||
| client: createFTPClient(options.serverType, this.worker.logger, options), | ||
| purpose: purpose, | ||
| } | ||
| cachedClients.clients.push(cachedClient) | ||
|
|
@@ -580,6 +594,11 @@ export class FTPAccessorHandle<Metadata> extends GenericAccessorHandle<Metadata> | |
| return ftp.downloadContent(fullPath) | ||
| } | ||
| async readFileIfExists(fullPath: string): Promise<Buffer | undefined> { | ||
| // On FTP, it is a much lighter operation to check if the file exists that reading it, | ||
| // so we'll do that first: | ||
| const exists = await this.fileExists(fullPath) | ||
| if (!exists) return undefined | ||
|
|
||
| try { | ||
| return await this.readFile(fullPath) | ||
| } catch (e) { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -8,7 +8,7 @@ | |
|
|
||
| /** A FTP Client that supports FTP & FTPS (FTP over TLS) connections. */ | ||
| export class FTPClient extends FTPClientBase { | ||
| private client: FTP.Client | ||
| private client: SafeFTPClient | ||
|
|
||
| private initializing: Promise<void> | null = null | ||
| public destroyed = false | ||
|
|
@@ -23,49 +23,8 @@ | |
|
|
||
| this.client = this.createFTPClient() | ||
| } | ||
| private createFTPClient(): FTP.Client { | ||
| const ftpClient = new FTP.Client() | ||
|
|
||
| // Because the FTP.Client methods needs to be called ONLY one at a time, | ||
| // we do a hack to override its methods with a promise queue: | ||
| const methodsToOverride: (keyof FTP.Client)[] = [ | ||
| 'access', | ||
| // 'close', | ||
| // 'closed', | ||
| 'downloadTo', | ||
| 'ensureDir', | ||
| 'lastMod', | ||
| 'list', | ||
| 'remove', | ||
| 'removeDir', | ||
| 'rename', | ||
| 'size', | ||
| // 'trackProgress', | ||
| 'uploadFrom', | ||
| ] | ||
|
|
||
| const queue = new PQueue({ concurrency: 1 }) | ||
|
|
||
| for (const method of methodsToOverride) { | ||
| const orgMethod = (ftpClient as any)[method] as (...args: any[]) => Promise<any> | ||
| const newMethod = async (...args: any[]) => { | ||
| return queue.add(async () => { | ||
| const orgError = new Error(`Error executing ${method}: ${JSON.stringify(args)}`) // Used later | ||
| try { | ||
| const result = await orgMethod.apply(ftpClient, args) | ||
| return result | ||
| } catch (e) { | ||
| if (typeof e === 'object' && e !== null && 'stack' in e) { | ||
| e.stack += `\nOriginal stack: ${orgError.stack}` | ||
| } | ||
| throw e | ||
| } | ||
| }) | ||
| } | ||
| ;(ftpClient as any)[method] = newMethod | ||
| } | ||
|
|
||
| return ftpClient | ||
| private createFTPClient(): SafeFTPClient { | ||
| return new SafeFTPClient() | ||
| } | ||
|
|
||
| async init(): Promise<void> { | ||
|
|
@@ -92,8 +51,10 @@ | |
| rejectUnauthorized: !this.options.allowAnyCertificate, // Allow self-signed certificates | ||
| }, | ||
| } | ||
| await this.client.access(options) | ||
| await this.client.cd('/') | ||
| await this.client.batch(async (ftpClient) => { | ||
| await ftpClient.access(options) | ||
| await ftpClient.cd('/') | ||
| }) | ||
|
|
||
| this.initializing = null // Reset initializing promise after successful initialization | ||
| }) | ||
|
|
@@ -173,61 +134,68 @@ | |
| packageExists: false, | ||
| } | ||
|
|
||
| const size = await this.client.size(fullPath) | ||
| return this.client.batch(async (ftpClient) => { | ||
| const size = await ftpClient.size(fullPath) | ||
|
|
||
| let modDate: Date | undefined = undefined | ||
| try { | ||
| modDate = await this.client.lastMod(fullPath) | ||
| } catch { | ||
| // This is not supported by every FTP server, ignore any error | ||
| } | ||
| return { | ||
| success: true, | ||
| fileInfo: { | ||
| size, | ||
| modified: modDate ? modDate.getTime() : 0, | ||
| }, | ||
| } | ||
| let modDate: Date | undefined = undefined | ||
| try { | ||
| modDate = await ftpClient.lastMod(fullPath) | ||
| } catch { | ||
| // This is not supported by every FTP server, ignore any error | ||
| } | ||
| return { | ||
| success: true, | ||
| fileInfo: { | ||
| size, | ||
| modified: modDate ? modDate.getTime() : 0, | ||
| }, | ||
| } | ||
| }) | ||
| } | ||
| async upload(sourceStream: NodeJS.ReadableStream, fullPath: string): Promise<void> { | ||
| await this.init() // Ensure the client is connected | ||
|
|
||
| this.logger.silly(`Uploading file to: ${fullPath}`) | ||
|
|
||
| // Ensure the directory exists: | ||
| await this.client.ensureDir(path.dirname(fullPath)) | ||
| await this.client.cd('/') // Revert to root after ensureDir | ||
| await this.client.batch(async (ftpClient) => { | ||
| // Ensure the directory exists: | ||
| await ftpClient.ensureDir(path.dirname(fullPath)) | ||
| await ftpClient.cd('/') // Revert to root after ensureDir | ||
|
|
||
| // Remove the file if it already exists: | ||
| await this.client.remove(fullPath, true) | ||
| // Remove the file if it already exists: | ||
| await ftpClient.remove(fullPath, true) | ||
|
Comment on lines
+165
to
+166
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain🌐 Web query:
💡 Result: The basic-ftp Client's uploadFrom method overwrites existing files on the remote FTP server by default. If a file with the specified remotePath already exists, it will be overwritten with the data from the provided readable stream or local file.[1][2][3] Citations:
🏁 Script executed: cat -n shared/packages/worker/src/worker/accessorHandlers/lib/FTPClient/FTPClient.ts | sed -n '155,200p'Repository: Sofie-Automation/sofie-package-manager Length of output: 1998 Inconsistency: The 🤖 Prompt for AI Agents |
||
|
|
||
| const response = await this.client.uploadFrom(Readable.from(sourceStream), fullPath) | ||
| // Upload the file: | ||
| const response = await ftpClient.uploadFrom(Readable.from(sourceStream), fullPath) | ||
|
|
||
| if (response.code !== 226) { | ||
| // 226 means "Transfer complete" | ||
| throw new Error(`Upload failed: [${response.code}]: ${response.message}`) | ||
| } | ||
| if (response.code !== 226) { | ||
| // 226 means "Transfer complete" | ||
| throw new Error(`Upload failed: [${response.code}]: ${response.message}`) | ||
| } | ||
| }) | ||
| } | ||
| async uploadContent(fullPath: string, content: Buffer | string): Promise<void> { | ||
| await this.init() // Ensure the client is connected | ||
|
|
||
| this.logger.silly(`Uploading content to: ${fullPath}`) | ||
|
|
||
| // Ensure the directory exists: | ||
| await this.client.ensureDir(path.dirname(fullPath)) | ||
| await this.client.cd('/') // Revert to root after ensureDir | ||
| await this.client.batch(async (ftpClient) => { | ||
| // Ensure the directory exists: | ||
| await ftpClient.ensureDir(path.dirname(fullPath)) | ||
| await ftpClient.cd('/') // Revert to root after ensureDir | ||
|
|
||
| const buffer = Buffer.isBuffer(content) ? content : Buffer.from(content, 'utf-8') | ||
| const buffer = Buffer.isBuffer(content) ? content : Buffer.from(content, 'utf-8') | ||
|
|
||
| // Feed the buffer into a readable stream: | ||
| const readableStream = Readable.from(buffer) | ||
| // Upload the readable stream: | ||
| const response = await this.client.uploadFrom(readableStream, fullPath) | ||
| // Feed the buffer into a readable stream: | ||
| const readableStream = Readable.from(buffer) | ||
| // Upload the readable stream: | ||
| const response = await ftpClient.uploadFrom(readableStream, fullPath) | ||
|
|
||
| if (response.code !== 226) { | ||
| // 226 means "Transfer complete" | ||
| throw new Error(`Upload failed [${response.code}]: ${response.message}`) | ||
| } | ||
| if (response.code !== 226) { | ||
| // 226 means "Transfer complete" | ||
| throw new Error(`Upload failed [${response.code}]: ${response.message}`) | ||
| } | ||
| }) | ||
| } | ||
|
|
||
| async download(fullPath: string): Promise<FileDownloadReturnType> { | ||
|
|
@@ -318,12 +286,94 @@ | |
|
|
||
| const response = await this._fileExists(fullPath) | ||
| if (response.exists) { | ||
| await this.client.removeDir(fullPath) | ||
| await this.client.cd('/') // Revert to root after removeDir | ||
| await this.client.batch(async (ftpClient) => { | ||
| await ftpClient.removeDir(fullPath) | ||
| await ftpClient.cd('/') // Revert to root after removeDir | ||
| }) | ||
|
|
||
| return true | ||
| } else { | ||
| return false | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Wraps FTP.Client. Ensures that only one method of FTP.Client is called at a time, by putting all calls in a queue. | ||
| * This is necessary because FTP.Client does not support concurrent calls, and will throw an error if multiple methods are called at the same time. | ||
| */ | ||
| class SafeFTPClient { | ||
| private ftpClient: FTP.Client | ||
|
Check warning on line 306 in shared/packages/worker/src/worker/accessorHandlers/lib/FTPClient/FTPClient.ts
|
||
| private queue: PQueue | ||
|
Check warning on line 307 in shared/packages/worker/src/worker/accessorHandlers/lib/FTPClient/FTPClient.ts
|
||
|
|
||
| constructor() { | ||
| this.ftpClient = new FTP.Client() | ||
| this.queue = new PQueue({ concurrency: 1 }) | ||
| } | ||
|
|
||
| /** execute multiple ftp-commands in batch */ | ||
| async batch<T>(cb: (ftp: FTP.Client) => Promise<T>): Promise<T> { | ||
| return this.putInQueue('batch', [], async () => { | ||
| const result = await cb(this.ftpClient) | ||
| return result | ||
| }) | ||
| } | ||
|
|
||
| get closed(): boolean { | ||
| return this.ftpClient.closed | ||
| } | ||
| close(): void { | ||
| return this.ftpClient.close() | ||
| } | ||
|
|
||
| async access(...args: Parameters<FTPInstance['access']>): ReturnType<FTPInstance['access']> { | ||
| return this.putInQueue('access', args, async () => this.ftpClient.access(...args)) | ||
| } | ||
|
Comment on lines
+329
to
+331
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Avoid serializing arbitrary args into error messages — risk of leaking credentials.
Consider either dropping args from the error string, redacting them, or only including the method name: 🛡️ Proposed hardening- const orgError = new Error(`Error executing ${methodName}: ${JSON.stringify(args)}`) // Used later
+ const orgError = new Error(`Error executing ${methodName}`) // Used later (args omitted to avoid leaking credentials/secrets)Also applies to: 360-377 🤖 Prompt for AI Agents |
||
| async downloadTo(...args: Parameters<FTPInstance['downloadTo']>): ReturnType<FTPInstance['downloadTo']> { | ||
| return this.putInQueue('downloadTo', args, async () => this.ftpClient.downloadTo(...args)) | ||
| } | ||
| async ensureDir(...args: Parameters<FTPInstance['ensureDir']>): ReturnType<FTPInstance['ensureDir']> { | ||
| return this.putInQueue('ensureDir', args, async () => this.ftpClient.ensureDir(...args)) | ||
| } | ||
| async lastMod(...args: Parameters<FTPInstance['lastMod']>): ReturnType<FTPInstance['lastMod']> { | ||
| return this.putInQueue('lastMod', args, async () => this.ftpClient.lastMod(...args)) | ||
| } | ||
| async list(...args: Parameters<FTPInstance['list']>): ReturnType<FTPInstance['list']> { | ||
| return this.putInQueue('list', args, async () => this.ftpClient.list(...args)) | ||
| } | ||
| async remove(...args: Parameters<FTPInstance['remove']>): ReturnType<FTPInstance['remove']> { | ||
| return this.putInQueue('remove', args, async () => this.ftpClient.remove(...args)) | ||
| } | ||
| async removeDir(...args: Parameters<FTPInstance['removeDir']>): ReturnType<FTPInstance['removeDir']> { | ||
| return this.putInQueue('removeDir', args, async () => this.ftpClient.removeDir(...args)) | ||
| } | ||
| async rename(...args: Parameters<FTPInstance['rename']>): ReturnType<FTPInstance['rename']> { | ||
| return this.putInQueue('rename', args, async () => this.ftpClient.rename(...args)) | ||
| } | ||
| async size(...args: Parameters<FTPInstance['size']>): ReturnType<FTPInstance['size']> { | ||
| return this.putInQueue('size', args, async () => this.ftpClient.size(...args)) | ||
| } | ||
| async uploadFrom(...args: Parameters<FTPInstance['uploadFrom']>): ReturnType<FTPInstance['uploadFrom']> { | ||
| return this.putInQueue('uploadFrom', args, async () => this.ftpClient.uploadFrom(...args)) | ||
| } | ||
|
|
||
| private async putInQueue<T, Args extends unknown[]>( | ||
| methodName: string, | ||
| args: Args, | ||
| cb: () => Promise<T> | ||
| ): Promise<T> { | ||
| return this.queue.add(async () => { | ||
| const orgError = new Error(`Error executing ${methodName}: ${JSON.stringify(args)}`) // Used later | ||
| try { | ||
| const result = await cb() | ||
| return result | ||
| } catch (e) { | ||
| if (typeof e === 'object' && e !== null && 'stack' in e) { | ||
| e.stack += `\nOriginal stack: ${orgError.stack}` | ||
| } | ||
| throw e | ||
| } | ||
| }) | ||
| } | ||
| } | ||
| type FTPInstance = InstanceType<typeof FTP.Client> // Short type for FTP.Client instance | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
Repository: Sofie-Automation/sofie-package-manager
Length of output: 16656
Remove the guard or apply consistently across all file handlers.
The
clearPackageRemoval()guard at lines 201-204 and 272-275 creates a behavioral inconsistency. All other handlers (s3, localFolder, httpProxy, fileShare) callclearPackageRemoval()unconditionally in equivalent methods. When FTP skips this due toremoveDelay < 0, any stale delayed-removal entry persists in thepackagesToRemovelist (see GenericFileOperations.ts:41-45) and can still execute later viaremoveDuePackages(). If a package is scheduled for removal, then re-written withremoveDelay = undefined, the stale removal entry will not be canceled. Either remove the guard to match other handlers or apply it to all handlers consistently.🤖 Prompt for AI Agents