diff --git a/shared/packages/worker/src/worker/accessorHandlers/ftp.ts b/shared/packages/worker/src/worker/accessorHandlers/ftp.ts index f2daa6d4..12d449e6 100644 --- a/shared/packages/worker/src/worker/accessorHandlers/ftp.ts +++ b/shared/packages/worker/src/worker/accessorHandlers/ftp.ts @@ -198,13 +198,15 @@ export class FTPAccessorHandle extends GenericAccessorHandle } else throw new Error(`getPackageActualVersion: ${response.reason.user}: ${response.reason.tech}`) } async ensurePackageFulfilled(): Promise { - await this.fileHandler.clearPackageRemoval(this.filePath) + if ((this.workOptions.removeDelay ?? -1) >= 0) { + // Only handle this if there is a removeDelay set, otherwise we can skip it to save time: + await this.fileHandler.clearPackageRemoval(this.filePath) + } } async removePackage(reason: string): Promise { await this.fileHandler.handleRemovePackage(this.filePath, this.packageName, reason) } async getPackageReadStream(): Promise { - // important that this is a 'read', so that it doesn't go into a deadlock with putPackageStream() in case of an upload/download to the same accessorPackageContainer const ftp = await this.prepareFTPClient() const response = await ftp.download(this.fullPath) @@ -267,7 +269,10 @@ export class FTPAccessorHandle extends GenericAccessorHandle operationName: string, source: string | GenericAccessorHandle ): Promise { - await this.fileHandler.clearPackageRemoval(this.filePath) + if ((this.workOptions.removeDelay ?? -1) >= 0) { + // Only handle this if there is a removeDelay set, otherwise we can skip it to save time: + await this.fileHandler.clearPackageRemoval(this.filePath) + } return this.logWorkOperation(operationName, source, this.packageName) } async finalizePackage(operation: PackageOperation): Promise { @@ -482,18 +487,26 @@ export class FTPAccessorHandle extends GenericAccessorHandle [accessorType: string]: CachedClients | undefined } - const cacheKey = JSON.stringify([ - this.accessorId, - ftpOptions.serverType, - ftpOptions.host, - ftpOptions.port, - this.accessor.basePath ?? '/', - ]) + const cacheKey = JSON.stringify([this.accessorId, options, this.accessor.basePath ?? '/']) let cachedClients = accessorCache[cacheKey] if (cachedClients) { // Check that options matches: if (!isEqual(cachedClients.options, options)) { + // It the options don't match, something is wrong with the cacheKey + + this.worker.logger.error( + `Something is wrong with the FTP client cacheKey. The options do not match the cacheKey. Deleting cached clients for this key. cacheKey: ${cacheKey}, options: ${JSON.stringify( + { + options, + password: '***', + } + )}, cachedOptions: ${JSON.stringify({ + ...cachedClients.options, + password: '***', + })}` + ) + for (const c of cachedClients.clients) { await c.client.destroy() } @@ -532,9 +545,10 @@ export class FTPAccessorHandle extends GenericAccessorHandle } if (!cachedClient) { + this.worker.logger.info(`Creating new FTP client for purpose="${purpose}", ${JSON.stringify(options)}`) // Set up a new FTP client: cachedClient = { - client: createFTPClient(ftpOptions.serverType, this.worker.logger, options), + client: createFTPClient(options.serverType, this.worker.logger, options), purpose: purpose, } cachedClients.clients.push(cachedClient) @@ -580,6 +594,11 @@ export class FTPAccessorHandle extends GenericAccessorHandle return ftp.downloadContent(fullPath) } async readFileIfExists(fullPath: string): Promise { + // On FTP, it is a much lighter operation to check if the file exists that reading it, + // so we'll do that first: + const exists = await this.fileExists(fullPath) + if (!exists) return undefined + try { return await this.readFile(fullPath) } catch (e) { diff --git a/shared/packages/worker/src/worker/accessorHandlers/lib/FTPClient/FTPClient.ts b/shared/packages/worker/src/worker/accessorHandlers/lib/FTPClient/FTPClient.ts index a78a1318..d7acc1a4 100644 --- a/shared/packages/worker/src/worker/accessorHandlers/lib/FTPClient/FTPClient.ts +++ b/shared/packages/worker/src/worker/accessorHandlers/lib/FTPClient/FTPClient.ts @@ -8,7 +8,7 @@ import { FileDownloadReturnType, FileExistsReturnType, FileInfoReturnType, FTPCl /** A FTP Client that supports FTP & FTPS (FTP over TLS) connections. */ export class FTPClient extends FTPClientBase { - private client: FTP.Client + private client: SafeFTPClient private initializing: Promise | null = null public destroyed = false @@ -23,49 +23,8 @@ export class FTPClient extends FTPClientBase { this.client = this.createFTPClient() } - private createFTPClient(): FTP.Client { - const ftpClient = new FTP.Client() - - // Because the FTP.Client methods needs to be called ONLY one at a time, - // we do a hack to override its methods with a promise queue: - const methodsToOverride: (keyof FTP.Client)[] = [ - 'access', - // 'close', - // 'closed', - 'downloadTo', - 'ensureDir', - 'lastMod', - 'list', - 'remove', - 'removeDir', - 'rename', - 'size', - // 'trackProgress', - 'uploadFrom', - ] - - const queue = new PQueue({ concurrency: 1 }) - - for (const method of methodsToOverride) { - const orgMethod = (ftpClient as any)[method] as (...args: any[]) => Promise - const newMethod = async (...args: any[]) => { - return queue.add(async () => { - const orgError = new Error(`Error executing ${method}: ${JSON.stringify(args)}`) // Used later - try { - const result = await orgMethod.apply(ftpClient, args) - return result - } catch (e) { - if (typeof e === 'object' && e !== null && 'stack' in e) { - e.stack += `\nOriginal stack: ${orgError.stack}` - } - throw e - } - }) - } - ;(ftpClient as any)[method] = newMethod - } - - return ftpClient + private createFTPClient(): SafeFTPClient { + return new SafeFTPClient() } async init(): Promise { @@ -92,8 +51,10 @@ export class FTPClient extends FTPClientBase { rejectUnauthorized: !this.options.allowAnyCertificate, // Allow self-signed certificates }, } - await this.client.access(options) - await this.client.cd('/') + await this.client.batch(async (ftpClient) => { + await ftpClient.access(options) + await ftpClient.cd('/') + }) this.initializing = null // Reset initializing promise after successful initialization }) @@ -173,61 +134,68 @@ export class FTPClient extends FTPClientBase { packageExists: false, } - const size = await this.client.size(fullPath) + return this.client.batch(async (ftpClient) => { + const size = await ftpClient.size(fullPath) - let modDate: Date | undefined = undefined - try { - modDate = await this.client.lastMod(fullPath) - } catch { - // This is not supported by every FTP server, ignore any error - } - return { - success: true, - fileInfo: { - size, - modified: modDate ? modDate.getTime() : 0, - }, - } + let modDate: Date | undefined = undefined + try { + modDate = await ftpClient.lastMod(fullPath) + } catch { + // This is not supported by every FTP server, ignore any error + } + return { + success: true, + fileInfo: { + size, + modified: modDate ? modDate.getTime() : 0, + }, + } + }) } async upload(sourceStream: NodeJS.ReadableStream, fullPath: string): Promise { await this.init() // Ensure the client is connected this.logger.silly(`Uploading file to: ${fullPath}`) - // Ensure the directory exists: - await this.client.ensureDir(path.dirname(fullPath)) - await this.client.cd('/') // Revert to root after ensureDir + await this.client.batch(async (ftpClient) => { + // Ensure the directory exists: + await ftpClient.ensureDir(path.dirname(fullPath)) + await ftpClient.cd('/') // Revert to root after ensureDir - // Remove the file if it already exists: - await this.client.remove(fullPath, true) + // Remove the file if it already exists: + await ftpClient.remove(fullPath, true) - const response = await this.client.uploadFrom(Readable.from(sourceStream), fullPath) + // Upload the file: + const response = await ftpClient.uploadFrom(Readable.from(sourceStream), fullPath) - if (response.code !== 226) { - // 226 means "Transfer complete" - throw new Error(`Upload failed: [${response.code}]: ${response.message}`) - } + if (response.code !== 226) { + // 226 means "Transfer complete" + throw new Error(`Upload failed: [${response.code}]: ${response.message}`) + } + }) } async uploadContent(fullPath: string, content: Buffer | string): Promise { await this.init() // Ensure the client is connected this.logger.silly(`Uploading content to: ${fullPath}`) - // Ensure the directory exists: - await this.client.ensureDir(path.dirname(fullPath)) - await this.client.cd('/') // Revert to root after ensureDir + await this.client.batch(async (ftpClient) => { + // Ensure the directory exists: + await ftpClient.ensureDir(path.dirname(fullPath)) + await ftpClient.cd('/') // Revert to root after ensureDir - const buffer = Buffer.isBuffer(content) ? content : Buffer.from(content, 'utf-8') + const buffer = Buffer.isBuffer(content) ? content : Buffer.from(content, 'utf-8') - // Feed the buffer into a readable stream: - const readableStream = Readable.from(buffer) - // Upload the readable stream: - const response = await this.client.uploadFrom(readableStream, fullPath) + // Feed the buffer into a readable stream: + const readableStream = Readable.from(buffer) + // Upload the readable stream: + const response = await ftpClient.uploadFrom(readableStream, fullPath) - if (response.code !== 226) { - // 226 means "Transfer complete" - throw new Error(`Upload failed [${response.code}]: ${response.message}`) - } + if (response.code !== 226) { + // 226 means "Transfer complete" + throw new Error(`Upload failed [${response.code}]: ${response.message}`) + } + }) } async download(fullPath: string): Promise { @@ -318,8 +286,10 @@ export class FTPClient extends FTPClientBase { const response = await this._fileExists(fullPath) if (response.exists) { - await this.client.removeDir(fullPath) - await this.client.cd('/') // Revert to root after removeDir + await this.client.batch(async (ftpClient) => { + await ftpClient.removeDir(fullPath) + await ftpClient.cd('/') // Revert to root after removeDir + }) return true } else { @@ -327,3 +297,83 @@ export class FTPClient extends FTPClientBase { } } } + +/** + * Wraps FTP.Client. Ensures that only one method of FTP.Client is called at a time, by putting all calls in a queue. + * This is necessary because FTP.Client does not support concurrent calls, and will throw an error if multiple methods are called at the same time. + */ +class SafeFTPClient { + private ftpClient: FTP.Client + private queue: PQueue + + constructor() { + this.ftpClient = new FTP.Client() + this.queue = new PQueue({ concurrency: 1 }) + } + + /** execute multiple ftp-commands in batch */ + async batch(cb: (ftp: FTP.Client) => Promise): Promise { + return this.putInQueue('batch', [], async () => { + const result = await cb(this.ftpClient) + return result + }) + } + + get closed(): boolean { + return this.ftpClient.closed + } + close(): void { + return this.ftpClient.close() + } + + async access(...args: Parameters): ReturnType { + return this.putInQueue('access', args, async () => this.ftpClient.access(...args)) + } + async downloadTo(...args: Parameters): ReturnType { + return this.putInQueue('downloadTo', args, async () => this.ftpClient.downloadTo(...args)) + } + async ensureDir(...args: Parameters): ReturnType { + return this.putInQueue('ensureDir', args, async () => this.ftpClient.ensureDir(...args)) + } + async lastMod(...args: Parameters): ReturnType { + return this.putInQueue('lastMod', args, async () => this.ftpClient.lastMod(...args)) + } + async list(...args: Parameters): ReturnType { + return this.putInQueue('list', args, async () => this.ftpClient.list(...args)) + } + async remove(...args: Parameters): ReturnType { + return this.putInQueue('remove', args, async () => this.ftpClient.remove(...args)) + } + async removeDir(...args: Parameters): ReturnType { + return this.putInQueue('removeDir', args, async () => this.ftpClient.removeDir(...args)) + } + async rename(...args: Parameters): ReturnType { + return this.putInQueue('rename', args, async () => this.ftpClient.rename(...args)) + } + async size(...args: Parameters): ReturnType { + return this.putInQueue('size', args, async () => this.ftpClient.size(...args)) + } + async uploadFrom(...args: Parameters): ReturnType { + return this.putInQueue('uploadFrom', args, async () => this.ftpClient.uploadFrom(...args)) + } + + private async putInQueue( + methodName: string, + args: Args, + cb: () => Promise + ): Promise { + return this.queue.add(async () => { + const orgError = new Error(`Error executing ${methodName}: ${JSON.stringify(args)}`) // Used later + try { + const result = await cb() + return result + } catch (e) { + if (typeof e === 'object' && e !== null && 'stack' in e) { + e.stack += `\nOriginal stack: ${orgError.stack}` + } + throw e + } + }) + } +} +type FTPInstance = InstanceType // Short type for FTP.Client instance diff --git a/shared/packages/worker/src/worker/accessorHandlers/lib/json-write-file.ts b/shared/packages/worker/src/worker/accessorHandlers/lib/json-write-file.ts index efbf5e93..1bd4d52e 100644 --- a/shared/packages/worker/src/worker/accessorHandlers/lib/json-write-file.ts +++ b/shared/packages/worker/src/worker/accessorHandlers/lib/json-write-file.ts @@ -302,7 +302,7 @@ export class JSONWriteFilesBestEffortHandler extends JSONWriteHandler { const newValue = cbManipulate(oldValue0?.value) const newValueStr = newValue !== undefined ? JSON.stringify(newValue) : '' - if (oldValue0?.str === newValueStr) { + if (oldValue0?.str === newValueStr || (oldValue0 === undefined && newValue === undefined)) { // do nothing return }