Skip to content

Commit a67bfdc

Browse files
committed
Merge branch 'fix/ftp-client' into test/nrkno-develop
2 parents cc17156 + c7f32d3 commit a67bfdc

3 files changed

Lines changed: 159 additions & 96 deletions

File tree

shared/packages/worker/src/worker/accessorHandlers/ftp.ts

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -198,13 +198,15 @@ export class FTPAccessorHandle<Metadata> extends GenericAccessorHandle<Metadata>
198198
} else throw new Error(`getPackageActualVersion: ${response.reason.user}: ${response.reason.tech}`)
199199
}
200200
async ensurePackageFulfilled(): Promise<void> {
201-
await this.fileHandler.clearPackageRemoval(this.filePath)
201+
if ((this.workOptions.removeDelay ?? -1) >= 0) {
202+
// Only handle this if there is a removeDelay set, otherwise we can skip it to save time:
203+
await this.fileHandler.clearPackageRemoval(this.filePath)
204+
}
202205
}
203206
async removePackage(reason: string): Promise<void> {
204207
await this.fileHandler.handleRemovePackage(this.filePath, this.packageName, reason)
205208
}
206209
async getPackageReadStream(): Promise<PackageReadStream> {
207-
// important that this is a 'read', so that it doesn't go into a deadlock with putPackageStream() in case of an upload/download to the same accessorPackageContainer
208210
const ftp = await this.prepareFTPClient()
209211

210212
const response = await ftp.download(this.fullPath)
@@ -267,7 +269,10 @@ export class FTPAccessorHandle<Metadata> extends GenericAccessorHandle<Metadata>
267269
operationName: string,
268270
source: string | GenericAccessorHandle<any>
269271
): Promise<PackageOperation> {
270-
await this.fileHandler.clearPackageRemoval(this.filePath)
272+
if ((this.workOptions.removeDelay ?? -1) >= 0) {
273+
// Only handle this if there is a removeDelay set, otherwise we can skip it to save time:
274+
await this.fileHandler.clearPackageRemoval(this.filePath)
275+
}
271276
return this.logWorkOperation(operationName, source, this.packageName)
272277
}
273278
async finalizePackage(operation: PackageOperation): Promise<void> {
@@ -482,18 +487,20 @@ export class FTPAccessorHandle<Metadata> extends GenericAccessorHandle<Metadata>
482487
[accessorType: string]: CachedClients | undefined
483488
}
484489

485-
const cacheKey = JSON.stringify([
486-
this.accessorId,
487-
ftpOptions.serverType,
488-
ftpOptions.host,
489-
ftpOptions.port,
490-
this.accessor.basePath ?? '/',
491-
])
490+
const cacheKey = JSON.stringify([this.accessorId, options, this.accessor.basePath ?? '/'])
492491

493492
let cachedClients = accessorCache[cacheKey]
494493
if (cachedClients) {
495494
// Check that options matches:
496495
if (!isEqual(cachedClients.options, options)) {
496+
// It the options don't match, something is wrong with the cacheKey
497+
498+
this.worker.logger.error(
499+
`Something is wrong with the FTP client cacheKey. The options do not match the cacheKey. Deleting cached clients for this key. cacheKey: ${cacheKey}, options: ${JSON.stringify(
500+
options
501+
)}, cachedOptions: ${JSON.stringify(cachedClients.options)}`
502+
)
503+
497504
for (const c of cachedClients.clients) {
498505
await c.client.destroy()
499506
}
@@ -532,9 +539,10 @@ export class FTPAccessorHandle<Metadata> extends GenericAccessorHandle<Metadata>
532539
}
533540

534541
if (!cachedClient) {
542+
this.worker.logger.info(`Creating new FTP client for purpose="${purpose}", ${JSON.stringify(options)}`)
535543
// Set up a new FTP client:
536544
cachedClient = {
537-
client: createFTPClient(ftpOptions.serverType, this.worker.logger, options),
545+
client: createFTPClient(options.serverType, this.worker.logger, options),
538546
purpose: purpose,
539547
}
540548
cachedClients.clients.push(cachedClient)
@@ -580,6 +588,11 @@ export class FTPAccessorHandle<Metadata> extends GenericAccessorHandle<Metadata>
580588
return ftp.downloadContent(fullPath)
581589
}
582590
async readFileIfExists(fullPath: string): Promise<Buffer | undefined> {
591+
// On FTP, it is a much lighter operation to check if the file exists that reading it,
592+
// so we'll do that first:
593+
const exists = await this.fileExists(fullPath)
594+
if (!exists) return undefined
595+
583596
try {
584597
return await this.readFile(fullPath)
585598
} catch (e) {

shared/packages/worker/src/worker/accessorHandlers/lib/FTPClient/FTPClient.ts

Lines changed: 134 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import { FileDownloadReturnType, FileExistsReturnType, FileInfoReturnType, FTPCl
88

99
/** A FTP Client that supports FTP & FTPS (FTP over TLS) connections. */
1010
export class FTPClient extends FTPClientBase {
11-
private client: FTP.Client
11+
private client: SafeFTPClient
1212

1313
private initializing: Promise<void> | null = null
1414
public destroyed = false
@@ -23,49 +23,8 @@ export class FTPClient extends FTPClientBase {
2323

2424
this.client = this.createFTPClient()
2525
}
26-
private createFTPClient(): FTP.Client {
27-
const ftpClient = new FTP.Client()
28-
29-
// Because the FTP.Client methods needs to be called ONLY one at a time,
30-
// we do a hack to override its methods with a promise queue:
31-
const methodsToOverride: (keyof FTP.Client)[] = [
32-
'access',
33-
// 'close',
34-
// 'closed',
35-
'downloadTo',
36-
'ensureDir',
37-
'lastMod',
38-
'list',
39-
'remove',
40-
'removeDir',
41-
'rename',
42-
'size',
43-
// 'trackProgress',
44-
'uploadFrom',
45-
]
46-
47-
const queue = new PQueue({ concurrency: 1 })
48-
49-
for (const method of methodsToOverride) {
50-
const orgMethod = (ftpClient as any)[method] as (...args: any[]) => Promise<any>
51-
const newMethod = async (...args: any[]) => {
52-
return queue.add(async () => {
53-
const orgError = new Error(`Error executing ${method}: ${JSON.stringify(args)}`) // Used later
54-
try {
55-
const result = await orgMethod.apply(ftpClient, args)
56-
return result
57-
} catch (e) {
58-
if (typeof e === 'object' && e !== null && 'stack' in e) {
59-
e.stack += `\nOriginal stack: ${orgError.stack}`
60-
}
61-
throw e
62-
}
63-
})
64-
}
65-
;(ftpClient as any)[method] = newMethod
66-
}
67-
68-
return ftpClient
26+
private createFTPClient(): SafeFTPClient {
27+
return new SafeFTPClient()
6928
}
7029

7130
async init(): Promise<void> {
@@ -92,8 +51,10 @@ export class FTPClient extends FTPClientBase {
9251
rejectUnauthorized: !this.options.allowAnyCertificate, // Allow self-signed certificates
9352
},
9453
}
95-
await this.client.access(options)
96-
await this.client.cd('/')
54+
await this.client.batch(async (ftpClient) => {
55+
await ftpClient.access(options)
56+
await ftpClient.cd('/')
57+
})
9758

9859
this.initializing = null // Reset initializing promise after successful initialization
9960
})
@@ -173,61 +134,68 @@ export class FTPClient extends FTPClientBase {
173134
packageExists: false,
174135
}
175136

176-
const size = await this.client.size(fullPath)
137+
return this.client.batch(async (ftpClient) => {
138+
const size = await ftpClient.size(fullPath)
177139

178-
let modDate: Date | undefined = undefined
179-
try {
180-
modDate = await this.client.lastMod(fullPath)
181-
} catch {
182-
// This is not supported by every FTP server, ignore any error
183-
}
184-
return {
185-
success: true,
186-
fileInfo: {
187-
size,
188-
modified: modDate ? modDate.getTime() : 0,
189-
},
190-
}
140+
let modDate: Date | undefined = undefined
141+
try {
142+
modDate = await ftpClient.lastMod(fullPath)
143+
} catch {
144+
// This is not supported by every FTP server, ignore any error
145+
}
146+
return {
147+
success: true,
148+
fileInfo: {
149+
size,
150+
modified: modDate ? modDate.getTime() : 0,
151+
},
152+
}
153+
})
191154
}
192155
async upload(sourceStream: NodeJS.ReadableStream, fullPath: string): Promise<void> {
193156
await this.init() // Ensure the client is connected
194157

195158
this.logger.silly(`Uploading file to: ${fullPath}`)
196159

197-
// Ensure the directory exists:
198-
await this.client.ensureDir(path.dirname(fullPath))
199-
await this.client.cd('/') // Revert to root after ensureDir
160+
await this.client.batch(async (ftpClient) => {
161+
// Ensure the directory exists:
162+
await ftpClient.ensureDir(path.dirname(fullPath))
163+
await ftpClient.cd('/') // Revert to root after ensureDir
200164

201-
// Remove the file if it already exists:
202-
await this.client.remove(fullPath, true)
165+
// Remove the file if it already exists:
166+
await ftpClient.remove(fullPath, true)
203167

204-
const response = await this.client.uploadFrom(Readable.from(sourceStream), fullPath)
168+
// Upload the file:
169+
const response = await ftpClient.uploadFrom(Readable.from(sourceStream), fullPath)
205170

206-
if (response.code !== 226) {
207-
// 226 means "Transfer complete"
208-
throw new Error(`Upload failed: [${response.code}]: ${response.message}`)
209-
}
171+
if (response.code !== 226) {
172+
// 226 means "Transfer complete"
173+
throw new Error(`Upload failed: [${response.code}]: ${response.message}`)
174+
}
175+
})
210176
}
211177
async uploadContent(fullPath: string, content: Buffer | string): Promise<void> {
212178
await this.init() // Ensure the client is connected
213179

214180
this.logger.silly(`Uploading content to: ${fullPath}`)
215181

216-
// Ensure the directory exists:
217-
await this.client.ensureDir(path.dirname(fullPath))
218-
await this.client.cd('/') // Revert to root after ensureDir
182+
await this.client.batch(async (ftpClient) => {
183+
// Ensure the directory exists:
184+
await ftpClient.ensureDir(path.dirname(fullPath))
185+
await ftpClient.cd('/') // Revert to root after ensureDir
219186

220-
const buffer = Buffer.isBuffer(content) ? content : Buffer.from(content, 'utf-8')
187+
const buffer = Buffer.isBuffer(content) ? content : Buffer.from(content, 'utf-8')
221188

222-
// Feed the buffer into a readable stream:
223-
const readableStream = Readable.from(buffer)
224-
// Upload the readable stream:
225-
const response = await this.client.uploadFrom(readableStream, fullPath)
189+
// Feed the buffer into a readable stream:
190+
const readableStream = Readable.from(buffer)
191+
// Upload the readable stream:
192+
const response = await ftpClient.uploadFrom(readableStream, fullPath)
226193

227-
if (response.code !== 226) {
228-
// 226 means "Transfer complete"
229-
throw new Error(`Upload failed [${response.code}]: ${response.message}`)
230-
}
194+
if (response.code !== 226) {
195+
// 226 means "Transfer complete"
196+
throw new Error(`Upload failed [${response.code}]: ${response.message}`)
197+
}
198+
})
231199
}
232200

233201
async download(fullPath: string): Promise<FileDownloadReturnType> {
@@ -318,12 +286,94 @@ export class FTPClient extends FTPClientBase {
318286

319287
const response = await this._fileExists(fullPath)
320288
if (response.exists) {
321-
await this.client.removeDir(fullPath)
322-
await this.client.cd('/') // Revert to root after removeDir
289+
await this.client.batch(async (ftpClient) => {
290+
await ftpClient.removeDir(fullPath)
291+
await ftpClient.cd('/') // Revert to root after removeDir
292+
})
323293

324294
return true
325295
} else {
326296
return false
327297
}
328298
}
329299
}
300+
301+
/**
302+
* Wraps FTP.Client. Ensures that only one method of FTP.Client is called at a time, by putting all calls in a queue.
303+
* This is necessary because FTP.Client does not support concurrent calls, and will throw an error if multiple methods are called at the same time.
304+
*/
305+
class SafeFTPClient {
306+
private ftpClient: FTP.Client
307+
private queue: PQueue
308+
309+
constructor() {
310+
this.ftpClient = new FTP.Client()
311+
this.queue = new PQueue({ concurrency: 1 })
312+
}
313+
314+
/** execute multiple ftp-commands in batch */
315+
async batch<T>(cb: (ftp: FTP.Client) => Promise<T>): Promise<T> {
316+
return this.putInQueue('batch', [], async () => {
317+
const result = await cb(this.ftpClient)
318+
return result
319+
})
320+
}
321+
322+
get closed(): boolean {
323+
return this.ftpClient.closed
324+
}
325+
close(): void {
326+
return this.ftpClient.close()
327+
}
328+
329+
async access(...args: Parameters<FTPInstance['access']>): ReturnType<FTPInstance['access']> {
330+
return this.putInQueue('access', args, async () => this.ftpClient.access(...args))
331+
}
332+
async downloadTo(...args: Parameters<FTPInstance['downloadTo']>): ReturnType<FTPInstance['downloadTo']> {
333+
return this.putInQueue('downloadTo', args, async () => this.ftpClient.downloadTo(...args))
334+
}
335+
async ensureDir(...args: Parameters<FTPInstance['ensureDir']>): ReturnType<FTPInstance['ensureDir']> {
336+
return this.putInQueue('ensureDir', args, async () => this.ftpClient.ensureDir(...args))
337+
}
338+
async lastMod(...args: Parameters<FTPInstance['lastMod']>): ReturnType<FTPInstance['lastMod']> {
339+
return this.putInQueue('lastMod', args, async () => this.ftpClient.lastMod(...args))
340+
}
341+
async list(...args: Parameters<FTPInstance['list']>): ReturnType<FTPInstance['list']> {
342+
return this.putInQueue('list', args, async () => this.ftpClient.list(...args))
343+
}
344+
async remove(...args: Parameters<FTPInstance['remove']>): ReturnType<FTPInstance['remove']> {
345+
return this.putInQueue('remove', args, async () => this.ftpClient.remove(...args))
346+
}
347+
async removeDir(...args: Parameters<FTPInstance['removeDir']>): ReturnType<FTPInstance['removeDir']> {
348+
return this.putInQueue('removeDir', args, async () => this.ftpClient.removeDir(...args))
349+
}
350+
async rename(...args: Parameters<FTPInstance['rename']>): ReturnType<FTPInstance['rename']> {
351+
return this.putInQueue('rename', args, async () => this.ftpClient.rename(...args))
352+
}
353+
async size(...args: Parameters<FTPInstance['size']>): ReturnType<FTPInstance['size']> {
354+
return this.putInQueue('size', args, async () => this.ftpClient.size(...args))
355+
}
356+
async uploadFrom(...args: Parameters<FTPInstance['uploadFrom']>): ReturnType<FTPInstance['uploadFrom']> {
357+
return this.putInQueue('uploadFrom', args, async () => this.ftpClient.uploadFrom(...args))
358+
}
359+
360+
private async putInQueue<T, Args extends unknown[]>(
361+
methodName: string,
362+
args: Args,
363+
cb: () => Promise<T>
364+
): Promise<T> {
365+
return this.queue.add(async () => {
366+
const orgError = new Error(`Error executing ${methodName}: ${JSON.stringify(args)}`) // Used later
367+
try {
368+
const result = await cb()
369+
return result
370+
} catch (e) {
371+
if (typeof e === 'object' && e !== null && 'stack' in e) {
372+
e.stack += `\nOriginal stack: ${orgError.stack}`
373+
}
374+
throw e
375+
}
376+
})
377+
}
378+
}
379+
type FTPInstance = InstanceType<typeof FTP.Client> // Short type for FTP.Client instance

shared/packages/worker/src/worker/accessorHandlers/lib/json-write-file.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,7 @@ export class JSONWriteFilesBestEffortHandler extends JSONWriteHandler {
302302
const newValue = cbManipulate(oldValue0?.value)
303303
const newValueStr = newValue !== undefined ? JSON.stringify(newValue) : ''
304304

305-
if (oldValue0?.str === newValueStr) {
305+
if (oldValue0?.str === newValueStr || (oldValue0 === undefined && newValue === undefined)) {
306306
// do nothing
307307
return
308308
}

0 commit comments

Comments
 (0)