Skip to content

Commit f435656

Browse files
committed
fix: refactor FTPClient to avoid future footguns. Run some operations in batches to increase reliability
1 parent 9468b7f commit f435656

1 file changed

Lines changed: 134 additions & 84 deletions

File tree

  • shared/packages/worker/src/worker/accessorHandlers/lib/FTPClient

shared/packages/worker/src/worker/accessorHandlers/lib/FTPClient/FTPClient.ts

Lines changed: 134 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import { FileDownloadReturnType, FileExistsReturnType, FileInfoReturnType, FTPCl
88

99
/** A FTP Client that supports FTP & FTPS (FTP over TLS) connections. */
1010
export class FTPClient extends FTPClientBase {
11-
private client: FTP.Client
11+
private client: SafeFTPClient
1212

1313
private initializing: Promise<void> | null = null
1414
public destroyed = false
@@ -23,49 +23,8 @@ export class FTPClient extends FTPClientBase {
2323

2424
this.client = this.createFTPClient()
2525
}
26-
private createFTPClient(): FTP.Client {
27-
const ftpClient = new FTP.Client()
28-
29-
// Because the FTP.Client methods needs to be called ONLY one at a time,
30-
// we do a hack to override its methods with a promise queue:
31-
const methodsToOverride: (keyof FTP.Client)[] = [
32-
'access',
33-
// 'close',
34-
// 'closed',
35-
'downloadTo',
36-
'ensureDir',
37-
'lastMod',
38-
'list',
39-
'remove',
40-
'removeDir',
41-
'rename',
42-
'size',
43-
// 'trackProgress',
44-
'uploadFrom',
45-
]
46-
47-
const queue = new PQueue({ concurrency: 1 })
48-
49-
for (const method of methodsToOverride) {
50-
const orgMethod = (ftpClient as any)[method] as (...args: any[]) => Promise<any>
51-
const newMethod = async (...args: any[]) => {
52-
return queue.add(async () => {
53-
const orgError = new Error(`Error executing ${method}: ${JSON.stringify(args)}`) // Used later
54-
try {
55-
const result = await orgMethod.apply(ftpClient, args)
56-
return result
57-
} catch (e) {
58-
if (typeof e === 'object' && e !== null && 'stack' in e) {
59-
e.stack += `\nOriginal stack: ${orgError.stack}`
60-
}
61-
throw e
62-
}
63-
})
64-
}
65-
;(ftpClient as any)[method] = newMethod
66-
}
67-
68-
return ftpClient
26+
private createFTPClient(): SafeFTPClient {
27+
return new SafeFTPClient()
6928
}
7029

7130
async init(): Promise<void> {
@@ -92,8 +51,10 @@ export class FTPClient extends FTPClientBase {
9251
rejectUnauthorized: !this.options.allowAnyCertificate, // Allow self-signed certificates
9352
},
9453
}
95-
await this.client.access(options)
96-
await this.client.cd('/')
54+
await this.client.batch(async (ftpClient) => {
55+
await ftpClient.access(options)
56+
await ftpClient.cd('/')
57+
})
9758

9859
this.initializing = null // Reset initializing promise after successful initialization
9960
})
@@ -173,61 +134,68 @@ export class FTPClient extends FTPClientBase {
173134
packageExists: false,
174135
}
175136

176-
const size = await this.client.size(fullPath)
137+
return this.client.batch(async (ftpClient) => {
138+
const size = await ftpClient.size(fullPath)
177139

178-
let modDate: Date | undefined = undefined
179-
try {
180-
modDate = await this.client.lastMod(fullPath)
181-
} catch {
182-
// This is not supported by every FTP server, ignore any error
183-
}
184-
return {
185-
success: true,
186-
fileInfo: {
187-
size,
188-
modified: modDate ? modDate.getTime() : 0,
189-
},
190-
}
140+
let modDate: Date | undefined = undefined
141+
try {
142+
modDate = await ftpClient.lastMod(fullPath)
143+
} catch {
144+
// This is not supported by every FTP server, ignore any error
145+
}
146+
return {
147+
success: true,
148+
fileInfo: {
149+
size,
150+
modified: modDate ? modDate.getTime() : 0,
151+
},
152+
}
153+
})
191154
}
192155
async upload(sourceStream: NodeJS.ReadableStream, fullPath: string): Promise<void> {
193156
await this.init() // Ensure the client is connected
194157

195158
this.logger.silly(`Uploading file to: ${fullPath}`)
196159

197-
// Ensure the directory exists:
198-
await this.client.ensureDir(path.dirname(fullPath))
199-
await this.client.cd('/') // Revert to root after ensureDir
160+
await this.client.batch(async (ftpClient) => {
161+
// Ensure the directory exists:
162+
await ftpClient.ensureDir(path.dirname(fullPath))
163+
await ftpClient.cd('/') // Revert to root after ensureDir
200164

201-
// Remove the file if it already exists:
202-
await this.client.remove(fullPath, true)
165+
// Remove the file if it already exists:
166+
await ftpClient.remove(fullPath, true)
203167

204-
const response = await this.client.uploadFrom(Readable.from(sourceStream), fullPath)
168+
// Upload the file:
169+
const response = await ftpClient.uploadFrom(Readable.from(sourceStream), fullPath)
205170

206-
if (response.code !== 226) {
207-
// 226 means "Transfer complete"
208-
throw new Error(`Upload failed: [${response.code}]: ${response.message}`)
209-
}
171+
if (response.code !== 226) {
172+
// 226 means "Transfer complete"
173+
throw new Error(`Upload failed: [${response.code}]: ${response.message}`)
174+
}
175+
})
210176
}
211177
async uploadContent(fullPath: string, content: Buffer | string): Promise<void> {
212178
await this.init() // Ensure the client is connected
213179

214180
this.logger.silly(`Uploading content to: ${fullPath}`)
215181

216-
// Ensure the directory exists:
217-
await this.client.ensureDir(path.dirname(fullPath))
218-
await this.client.cd('/') // Revert to root after ensureDir
182+
await this.client.batch(async (ftpClient) => {
183+
// Ensure the directory exists:
184+
await ftpClient.ensureDir(path.dirname(fullPath))
185+
await ftpClient.cd('/') // Revert to root after ensureDir
219186

220-
const buffer = Buffer.isBuffer(content) ? content : Buffer.from(content, 'utf-8')
187+
const buffer = Buffer.isBuffer(content) ? content : Buffer.from(content, 'utf-8')
221188

222-
// Feed the buffer into a readable stream:
223-
const readableStream = Readable.from(buffer)
224-
// Upload the readable stream:
225-
const response = await this.client.uploadFrom(readableStream, fullPath)
189+
// Feed the buffer into a readable stream:
190+
const readableStream = Readable.from(buffer)
191+
// Upload the readable stream:
192+
const response = await ftpClient.uploadFrom(readableStream, fullPath)
226193

227-
if (response.code !== 226) {
228-
// 226 means "Transfer complete"
229-
throw new Error(`Upload failed [${response.code}]: ${response.message}`)
230-
}
194+
if (response.code !== 226) {
195+
// 226 means "Transfer complete"
196+
throw new Error(`Upload failed [${response.code}]: ${response.message}`)
197+
}
198+
})
231199
}
232200

233201
async download(fullPath: string): Promise<FileDownloadReturnType> {
@@ -318,12 +286,94 @@ export class FTPClient extends FTPClientBase {
318286

319287
const response = await this._fileExists(fullPath)
320288
if (response.exists) {
321-
await this.client.removeDir(fullPath)
322-
await this.client.cd('/') // Revert to root after removeDir
289+
await this.client.batch(async (ftpClient) => {
290+
await ftpClient.removeDir(fullPath)
291+
await ftpClient.cd('/') // Revert to root after removeDir
292+
})
323293

324294
return true
325295
} else {
326296
return false
327297
}
328298
}
329299
}
300+
301+
/**
302+
* Wraps FTP.Client. Ensures that only one method of FTP.Client is called at a time, by putting all calls in a queue.
303+
* This is necessary because FTP.Client does not support concurrent calls, and will throw an error if multiple methods are called at the same time.
304+
*/
305+
class SafeFTPClient {
306+
private ftpClient: FTP.Client
307+
private queue: PQueue
308+
309+
constructor() {
310+
this.ftpClient = new FTP.Client()
311+
this.queue = new PQueue({ concurrency: 1 })
312+
}
313+
314+
/** execute multiple ftp-commands in batch */
315+
async batch<T>(cb: (ftp: FTP.Client) => Promise<T>): Promise<T> {
316+
return this.putInQueue('batch', [], async () => {
317+
const result = await cb(this.ftpClient)
318+
return result
319+
})
320+
}
321+
322+
get closed(): boolean {
323+
return this.ftpClient.closed
324+
}
325+
close(): void {
326+
return this.ftpClient.close()
327+
}
328+
329+
async access(...args: Parameters<FTPInstance['access']>): ReturnType<FTPInstance['access']> {
330+
return this.putInQueue('access', args, async () => this.ftpClient.access(...args))
331+
}
332+
async downloadTo(...args: Parameters<FTPInstance['downloadTo']>): ReturnType<FTPInstance['downloadTo']> {
333+
return this.putInQueue('downloadTo', args, async () => this.ftpClient.downloadTo(...args))
334+
}
335+
async ensureDir(...args: Parameters<FTPInstance['ensureDir']>): ReturnType<FTPInstance['ensureDir']> {
336+
return this.putInQueue('ensureDir', args, async () => this.ftpClient.ensureDir(...args))
337+
}
338+
async lastMod(...args: Parameters<FTPInstance['lastMod']>): ReturnType<FTPInstance['lastMod']> {
339+
return this.putInQueue('lastMod', args, async () => this.ftpClient.lastMod(...args))
340+
}
341+
async list(...args: Parameters<FTPInstance['list']>): ReturnType<FTPInstance['list']> {
342+
return this.putInQueue('list', args, async () => this.ftpClient.list(...args))
343+
}
344+
async remove(...args: Parameters<FTPInstance['remove']>): ReturnType<FTPInstance['remove']> {
345+
return this.putInQueue('remove', args, async () => this.ftpClient.remove(...args))
346+
}
347+
async removeDir(...args: Parameters<FTPInstance['removeDir']>): ReturnType<FTPInstance['removeDir']> {
348+
return this.putInQueue('removeDir', args, async () => this.ftpClient.removeDir(...args))
349+
}
350+
async rename(...args: Parameters<FTPInstance['rename']>): ReturnType<FTPInstance['rename']> {
351+
return this.putInQueue('rename', args, async () => this.ftpClient.rename(...args))
352+
}
353+
async size(...args: Parameters<FTPInstance['size']>): ReturnType<FTPInstance['size']> {
354+
return this.putInQueue('size', args, async () => this.ftpClient.size(...args))
355+
}
356+
async uploadFrom(...args: Parameters<FTPInstance['uploadFrom']>): ReturnType<FTPInstance['uploadFrom']> {
357+
return this.putInQueue('uploadFrom', args, async () => this.ftpClient.uploadFrom(...args))
358+
}
359+
360+
private async putInQueue<T, Args extends unknown[]>(
361+
methodName: string,
362+
args: Args,
363+
cb: () => Promise<T>
364+
): Promise<T> {
365+
return this.queue.add(async () => {
366+
const orgError = new Error(`Error executing ${methodName}: ${JSON.stringify(args)}`) // Used later
367+
try {
368+
const result = await cb()
369+
return result
370+
} catch (e) {
371+
if (typeof e === 'object' && e !== null && 'stack' in e) {
372+
e.stack += `\nOriginal stack: ${orgError.stack}`
373+
}
374+
throw e
375+
}
376+
})
377+
}
378+
}
379+
type FTPInstance = InstanceType<typeof FTP.Client> // Short type for FTP.Client instance

0 commit comments

Comments
 (0)