@@ -42,13 +42,11 @@ class AdminAPI {
4242 this . publishBatchSize = publishBatchSize ;
4343 this . authToken = authToken ;
4444 this . context = context ;
45- this . isProcessing = false ;
46- this . processingPromise = null ;
47- this . shouldStop = false ;
45+ this . onQueuesProcessed = null ;
46+ this . stopProcessing$ = null ;
4847 this . lastStatusLog = 0 ;
4948 this . previewDurations = [ ] ;
5049 this . queue = [ ] ;
51- this . processingChain = null ;
5250 }
5351
5452 previewAndPublish ( records , locale , batchNumber ) {
@@ -64,109 +62,38 @@ class AdminAPI {
6462 }
6563
6664 async startProcessing ( ) {
67- if ( this . isProcessing ) {
68- return this . processingPromise ;
65+ if ( this . stopProcessing$ ) {
66+ // only restart processing after awaiting stopProcessing
67+ await this . stopProcessing$ ;
68+ }
69+ if ( ! this . interval ) {
70+ this . interval = setInterval ( ( ) => this . processQueues ( ) , 1000 ) ;
6971 }
70-
71- this . isProcessing = true ;
72- this . shouldStop = false ;
73-
74- this . processingPromise = this . processQueuesWithPromiseChain ( ) ;
75- return this . processingPromise ;
7672 }
7773
7874 async stopProcessing ( ) {
79- this . shouldStop = true ;
80-
81- if ( this . processingPromise ) {
82- await this . processingPromise ;
83- }
84-
85- this . isProcessing = false ;
86- this . processingPromise = null ;
87- }
88-
89- async processQueuesWithPromiseChain ( ) {
90- const { logger } = this . context ;
91-
92- while ( ! this . shouldStop ) {
93- if ( ! this . hasWorkToDo ( ) ) {
94- // No work to do, exit the loop
95- break ;
96- }
97-
98- try {
99- await this . processNextBatch ( ) ;
100-
101- // Small delay to prevent overwhelming the system
102- await this . delay ( 100 ) ;
103-
104- // Log status periodically
105- if ( this . lastStatusLog < new Date ( ) - 1000 ) {
106- this . logQueueStatus ( ) ;
107- this . lastStatusLog = new Date ( ) ;
108- }
109- } catch ( error ) {
110- logger . error ( 'Error in processing chain:' , error ) ;
111- // Continue processing even if one batch fails
112- await this . delay ( 1000 ) ; // Longer delay on error
113- }
114- }
115-
116- logger . info ( 'Processing chain completed' ) ;
117-
118- // Reset processing state when chain completes
119- this . isProcessing = false ;
120- this . processingPromise = null ;
121- }
122-
123- hasWorkToDo ( ) {
124- return this . previewQueue . length > 0 ||
125- this . publishQueue . length > 0 ||
126- this . unpublishQueue . length > 0 ||
127- this . unpublishPreviewQueue . length > 0 ||
128- this . inflight . length > 0 ;
129- }
130-
131- async processNextBatch ( ) {
132- // Process queues in priority order
133- if ( this . publishQueue . length > 0 ) {
134- const batch = this . publishQueue . shift ( ) ;
135- await this . doBatchPublishAsync ( batch ) ;
136- return ;
137- }
138-
139- if ( this . previewQueue . length > 0 ) {
140- const batch = this . previewQueue . shift ( ) ;
141- await this . doBatchPreviewAsync ( batch ) ;
142- return ;
143- }
144-
145- if ( this . unpublishQueue . length > 0 ) {
146- const batch = this . unpublishQueue . shift ( ) ;
147- await this . doBatchUnpublishAsync ( batch , 'live' ) ;
75+ if ( ! this . interval ) {
14876 return ;
14977 }
78+ // stopProcessing only once by keeping a single promise resolving after all queues are processed
79+ if ( ! this . stopProcessing$ ) {
80+ this . stopProcessing$ = new Promise ( ( resolve ) => {
81+ this . onQueuesProcessed = ( ) => {
82+ if ( this . previewQueue . length + this . publishQueue . length + this . unpublishQueue . length + this . unpublishPreviewQueue . length + this . inflight . length > 0 ) {
83+ // still running
84+ return ;
85+ }
15086
151- if ( this . unpublishPreviewQueue . length > 0 ) {
152- const batch = this . unpublishPreviewQueue . shift ( ) ;
153- await this . doBatchUnpublishAsync ( batch , 'preview' ) ;
154- return ;
87+ // reset callback
88+ clearInterval ( this . interval ) ;
89+ this . onQueuesProcessed = null ;
90+ this . stopProcessing$ = null ;
91+ this . interval = null ;
92+ resolve ( ) ;
93+ } ;
94+ } ) ;
15595 }
156- }
157-
158- logQueueStatus ( ) {
159- const { logger } = this . context ;
160- logger . info ( `Queues: preview=${ this . previewQueue . length } ,`
161- + ` publish=${ this . publishQueue . length } ,`
162- + ` unpublish live=${ this . unpublishQueue . length } ,`
163- + ` unpublish preview=${ this . unpublishPreviewQueue . length } ,`
164- + ` inflight=${ this . inflight . length } ,`
165- + ` in queue=${ this . queue . length } ` ) ;
166- }
167-
168- delay ( ms ) {
169- return new Promise ( resolve => setTimeout ( resolve , ms ) ) ;
96+ return this . stopProcessing$ ;
17097 }
17198
17299 trackInFlight ( name , callback ) {
@@ -392,56 +319,6 @@ class AdminAPI {
392319 } ) ;
393320 }
394321
395- async doBatchPreviewAsync ( batch ) {
396- const { logger } = this . context ;
397- const { records, locale, batchNumber } = batch ;
398- const paths = records . map ( record => record . path ) ;
399-
400- if ( paths . length === 0 ) {
401- logger . info ( `Skipping preview for batch id=${ batchNumber } for locale=${ locale } : no paths to process.` ) ;
402- batch . resolve ( { records, locale, batchNumber } ) ;
403- return ;
404- }
405-
406- const body = {
407- forceUpdate : true ,
408- paths,
409- delete : false
410- } ;
411- const start = new Date ( ) ;
412-
413- try {
414- // Try to preview the batch using bulk preview API
415- const response = await this . runWithRetry (
416- async ( ) => {
417- return await this . execAdminRequest ( 'POST' , 'preview' , '/*' , body ) ;
418- } ,
419- { name : `preview batch number ${ batchNumber } for locale ${ locale } ` , isBatch : true }
420- ) ;
421-
422- if ( response ?. job ) {
423- logger . info ( `Previewed batch number ${ batchNumber } for locale ${ locale } ` ) ;
424- const successPaths = await this . checkJobStatus ( response . job ) ;
425- batch . records . forEach ( record => {
426- if ( successPaths . includes ( record . path ) ) {
427- record . previewedAt = new Date ( ) ;
428- }
429- } ) ;
430-
431- this . publishQueue . push ( batch ) ;
432- } else {
433- logger . error ( `Error previewing batch number ${ batchNumber } for locale ${ locale } ` ) ;
434- batch . resolve ( { records, locale, batchNumber} ) ;
435- }
436- } catch ( error ) {
437- logger . error ( `Error in preview batch ${ batchNumber } :` , error ) ;
438- batch . resolve ( { records, locale, batchNumber} ) ;
439- }
440-
441- // Complete the batch preview
442- this . previewDurations . push ( new Date ( ) - start ) ;
443- }
444-
445322 doBatchPublish ( batch ) {
446323 this . trackInFlight ( 'publish' , async ( complete ) => {
447324 const { logger } = this . context ;
@@ -487,51 +364,6 @@ class AdminAPI {
487364 } ) ;
488365 }
489366
490- async doBatchPublishAsync ( batch ) {
491- const { logger } = this . context ;
492- const { records, locale, batchNumber } = batch ;
493- const paths = records . filter ( record => record . previewedAt ) . map ( record => record . path ) ;
494-
495- if ( paths . length === 0 ) {
496- logger . info ( `Skipping publish in batch id=${ batchNumber } for locale=${ locale } : no paths to process.` ) ;
497- batch . resolve ( { records, locale, batchNumber } ) ;
498- return ;
499- }
500-
501- const body = {
502- forceUpdate : true ,
503- paths,
504- delete : false
505- } ;
506-
507- try {
508- // Try to publish the batch using bulk publish API
509- const response = await this . runWithRetry (
510- async ( ) => {
511- return await this . execAdminRequest ( 'POST' , 'live' , '/*' , body ) ;
512- } ,
513- { name : `publish batch number ${ batchNumber } for locale ${ locale } ` , isBatch : true }
514- ) ;
515-
516- if ( response ?. job ) {
517- logger . info ( `Published batch number ${ batchNumber } for locale ${ locale } ` ) ;
518- const successPaths = await this . checkJobStatus ( response . job ) ;
519- batch . records . forEach ( record => {
520- if ( successPaths . includes ( record . path ) ) {
521- record . publishedAt = new Date ( ) ;
522- }
523- } ) ;
524- } else {
525- logger . error ( `Error publishing batch number ${ batchNumber } for locale ${ locale } ` ) ;
526- }
527- } catch ( error ) {
528- logger . error ( `Error in publish batch ${ batchNumber } :` , error ) ;
529- }
530-
531- // Resolve the original promises
532- batch . resolve ( { records, locale, batchNumber} ) ;
533- }
534-
535367 doBatchUnpublish ( batch , route ) {
536368 this . trackInFlight ( 'unpublish' , async ( complete ) => {
537369 const { logger } = this . context ;
@@ -559,7 +391,7 @@ class AdminAPI {
559391 async ( ) => {
560392 return await this . execAdminRequest ( 'POST' , route , '/*' , body ) ;
561393 } ,
562- { name : `unpublish ${ route } batch number ${ batchNumber } for locale ${ locale } ` , isBatch : true }
394+ `unpublish ${ route } batch number ${ batchNumber } for locale ${ locale } `
563395 ) ;
564396
565397 if ( response ?. job ) {
@@ -595,70 +427,6 @@ class AdminAPI {
595427 } ) ;
596428 }
597429
598- async doBatchUnpublishAsync ( batch , route ) {
599- const { logger } = this . context ;
600- const { records, locale, batchNumber } = batch ;
601-
602- const paths = route === 'live'
603- ? records . map ( record => record . path )
604- : records . filter ( record => record . liveUnpublishedAt ) . map ( record => record . path ) ;
605-
606- if ( paths . length === 0 ) {
607- logger . info ( `Skipping unpublish for route=${ route } in batch id=${ batchNumber } for locale=${ locale } : no paths to process.` ) ;
608- batch . resolve ( { records, locale, batchNumber } ) ;
609- return ;
610- }
611-
612- const body = {
613- forceUpdate : true ,
614- paths,
615- delete : true ,
616- } ;
617-
618- try {
619- // Try to unpublish live the batch using bulk publish API
620- const response = await this . runWithRetry (
621- async ( ) => {
622- return await this . execAdminRequest ( 'POST' , route , '/*' , body ) ;
623- } ,
624- { name : `unpublish ${ route } batch number ${ batchNumber } for locale ${ locale } ` , isBatch : true }
625- ) ;
626-
627- if ( response ?. job ) {
628- logger . info ( `Unpublished ${ route } batch number ${ batchNumber } for locale ${ locale } ` ) ;
629- const successPaths = await this . checkJobStatus ( response . job ) ;
630- batch . records . forEach ( record => {
631- if ( successPaths . includes ( record . path ) ) {
632- if ( route === 'live' ) {
633- record . liveUnpublishedAt = new Date ( ) ;
634- } else {
635- record . previewUnpublishedAt = new Date ( ) ;
636- }
637- }
638- } ) ;
639-
640- if ( route === 'live' ) {
641- this . unpublishPreviewQueue . push ( batch ) ;
642- }
643- } else {
644- logger . error ( `Error unpublishing ${ route } batch number ${ batchNumber } for locale ${ locale } ` ) ;
645- if ( route === 'live' ) {
646- batch . resolve ( { records, locale, batchNumber} ) ;
647- }
648- }
649- } catch ( error ) {
650- logger . error ( `Error in unpublish batch ${ batchNumber } for route ${ route } :` , error ) ;
651- if ( route === 'live' ) {
652- batch . resolve ( { records, locale, batchNumber} ) ;
653- }
654- }
655-
656- // Resolve the original promises
657- if ( route === 'preview' ) {
658- batch . resolve ( { records, locale, batchNumber} ) ;
659- }
660- }
661-
662430 processQueues ( ) {
663431 if ( this . lastStatusLog < new Date ( ) - 1000 ) {
664432 const { logger } = this . context ;
0 commit comments