@@ -30,6 +30,14 @@ import { match } from 'ts-pattern'
3030import { getDatabase } from './db'
3131import { env } from './env'
3232import { generateNumberId } from './helpers'
33+ import { logger } from './logger'
34+
35+ export class ObjectNotFoundError extends Error {
36+ constructor ( objectName : string ) {
37+ super ( `Object not found in storage: ${ objectName } ` )
38+ this . name = 'ObjectNotFoundError'
39+ }
40+ }
3341
3442export class Storage {
3543 adapter
@@ -208,22 +216,24 @@ export class Storage {
208216 . where ( 'id' , '=' , storageLocation . id )
209217 . execute ( )
210218
211- if ( storageLocation . mergedAt || storageLocation . mergeStartedAt )
212- return this . downloadFromCacheEntryLocation ( storageLocation )
219+ try {
220+ if ( storageLocation . mergedAt || storageLocation . mergeStartedAt )
221+ return await this . downloadFromCacheEntryLocation ( storageLocation )
213222
214- await this . db
215- . updateTable ( 'storage_locations' )
216- . set ( {
217- mergeStartedAt : Date . now ( ) ,
218- } )
219- . where ( 'id' , '=' , storageLocation . id )
220- . execute ( )
223+ await this . ensurePartsExist ( storageLocation )
221224
222- const responseStream = new PassThrough ( )
223- const mergerStream = new PassThrough ( )
225+ await this . db
226+ . updateTable ( 'storage_locations' )
227+ . set ( {
228+ mergeStartedAt : Date . now ( ) ,
229+ } )
230+ . where ( 'id' , '=' , storageLocation . id )
231+ . execute ( )
224232
225- try {
226- const promise = this . adapter
233+ const responseStream = new PassThrough ( )
234+ const mergerStream = new PassThrough ( )
235+
236+ const mergePromise = this . adapter
227237 . uploadStream ( `${ storageLocation . folderName } /merged` , mergerStream )
228238 . then ( async ( ) => {
229239 await this . db
@@ -255,31 +265,36 @@ export class Storage {
255265 . execute ( )
256266 mergerStream . destroy ( )
257267 } )
258- this . mergeStreamPromises . add ( promise )
259- promise . finally ( ( ) => this . mergeStreamPromises . delete ( promise ) )
268+ this . mergeStreamPromises . add ( mergePromise )
269+ mergePromise . finally ( ( ) => this . mergeStreamPromises . delete ( mergePromise ) )
270+
271+ this . pumpPartsToStreams ( storageLocation , responseStream , mergerStream ) . catch ( ( err ) => {
272+ responseStream . destroy ( err )
273+ mergerStream . destroy ( err )
274+ if ( err instanceof ObjectNotFoundError )
275+ logger . warn ( `Stale cache entry ${ cacheEntryId } : ${ err . message } ` )
276+ } )
277+
278+ return responseStream
260279 } catch ( err ) {
261- await this . db
262- . updateTable ( 'storage_locations' )
263- . set ( {
264- mergedAt : null ,
265- mergeStartedAt : null ,
266- } )
267- . where ( 'id' , '=' , storageLocation . id )
268- . execute ( )
280+ if ( err instanceof ObjectNotFoundError ) {
281+ logger . warn ( `Stale cache entry ${ cacheEntryId } : ${ err . message } ` )
282+ return
283+ }
269284 throw err
270285 }
286+ }
271287
272- this . pumpPartsToStreams ( storageLocation , responseStream , mergerStream ) . catch ( ( err ) => {
273- responseStream . destroy ( err )
274- mergerStream . destroy ( err )
275- } )
276-
277- return responseStream
288+ private async ensurePartsExist ( location : StorageLocation ) {
289+ const partsFolder = `${ location . folderName } /parts`
290+ const actualPartCount = await this . adapter . countFilesInFolder ( partsFolder )
291+ if ( actualPartCount < location . partCount ) throw new ObjectNotFoundError ( partsFolder )
278292 }
279293
280294 private async downloadFromCacheEntryLocation ( location : StorageLocation ) {
281295 if ( location . mergedAt ) return this . adapter . createDownloadStream ( `${ location . folderName } /merged` )
282296
297+ await this . ensurePartsExist ( location )
283298 return Readable . from ( this . streamParts ( location ) )
284299 }
285300
@@ -522,15 +537,20 @@ class S3Adapter implements StorageAdapter {
522537 }
523538
524539 async createDownloadStream ( objectName : string ) {
525- const response = await this . s3 . send (
526- new GetObjectCommand ( {
527- Bucket : this . bucket ,
528- Key : `${ this . keyPrefix } /${ objectName } ` ,
529- } ) ,
530- )
531- if ( ! response . Body ) throw new Error ( 'No body in S3 get object response' )
540+ try {
541+ const response = await this . s3 . send (
542+ new GetObjectCommand ( {
543+ Bucket : this . bucket ,
544+ Key : `${ this . keyPrefix } /${ objectName } ` ,
545+ } ) ,
546+ )
547+ if ( ! response . Body ) throw new Error ( 'No body in S3 get object response' )
532548
533- return response . Body as Readable
549+ return response . Body as Readable
550+ } catch ( err : any ) {
551+ if ( err . name === 'NoSuchKey' ) throw new ObjectNotFoundError ( objectName )
552+ throw err
553+ }
534554 }
535555
536556 async deleteFolder ( folderName : string ) {
@@ -629,7 +649,13 @@ class FileSystemAdapter implements StorageAdapter {
629649 }
630650
631651 async createDownloadStream ( objectName : string ) {
632- return createReadStream ( path . join ( this . rootFolder , objectName ) )
652+ const filePath = path . join ( this . rootFolder , objectName )
653+ try {
654+ await fs . access ( filePath )
655+ } catch {
656+ throw new ObjectNotFoundError ( objectName )
657+ }
658+ return createReadStream ( filePath )
633659 }
634660
635661 async deleteFolder ( folderName : string ) {
@@ -656,11 +682,15 @@ class FileSystemAdapter implements StorageAdapter {
656682 }
657683
658684 async countFilesInFolder ( folderName : string ) {
659- const dir = await fs . readdir ( path . join ( this . rootFolder , folderName ) , {
660- withFileTypes : true ,
661- } )
662-
663- return dir . filter ( ( item ) => item . isFile ( ) ) . length
685+ try {
686+ const dir = await fs . readdir ( path . join ( this . rootFolder , folderName ) , {
687+ withFileTypes : true ,
688+ } )
689+ return dir . filter ( ( item ) => item . isFile ( ) ) . length
690+ } catch ( err : any ) {
691+ if ( err . code === 'ENOENT' ) return 0
692+ throw err
693+ }
664694 }
665695}
666696
@@ -690,7 +720,10 @@ class GcsAdapter implements StorageAdapter {
690720 }
691721
692722 async createDownloadStream ( objectName : string ) {
693- return this . bucket . file ( `${ this . keyPrefix } /${ objectName } ` ) . createReadStream ( )
723+ const file = this . bucket . file ( `${ this . keyPrefix } /${ objectName } ` )
724+ const [ exists ] = await file . exists ( )
725+ if ( ! exists ) throw new ObjectNotFoundError ( objectName )
726+ return file . createReadStream ( )
694727 }
695728
696729 async deleteFolder ( folderName : string ) {
0 commit comments