@@ -25,7 +25,8 @@ const ts = new Date()
2525 . toISOString ( )
2626 . replace ( / [ - : T . Z ] / g, '' )
2727 . slice ( 0 , 15 )
28- const STREAM = 'customers'
28+ const CUSTOMERS_STREAM = 'customers'
29+ const PRODUCTS_STREAM = 'products'
2930const BACKFILL_LIMIT = 10
3031
3132function memoryPipelineStore ( ) {
@@ -84,7 +85,7 @@ describeWithEnv(
8485 return {
8586 source : { type : 'stripe' , stripe : sourceConfig } ,
8687 destination : { type : 'postgres' , postgres : destConfig } ,
87- streams : [ { name : STREAM } ] ,
88+ streams : [ { name : CUSTOMERS_STREAM } , { name : PRODUCTS_STREAM } ] ,
8889 }
8990 }
9091
@@ -107,7 +108,7 @@ describeWithEnv(
107108 await pool . end ( )
108109 } )
109110
110- it ( 'tombstones customers deleted in stripe without a delete event ' , async ( ) => {
111+ it ( 'tombstones deleted customers and products ' , async ( ) => {
111112 const engine = await createEngine ( resolver )
112113 const pipeline = makePipeline ( )
113114 const pipelineStore = memoryPipelineStore ( )
@@ -121,23 +122,39 @@ describeWithEnv(
121122 const doomed = await stripe . customers . create ( {
122123 name : `e2e-recon-doomed-${ Date . now ( ) } ` ,
123124 } )
124- const cleanupIds = new Set < string > ( [ survivor . id , doomed . id ] )
125+ const productSurvivor = await stripe . products . create ( {
126+ name : `e2e-recon-product-survivor-${ Date . now ( ) } ` ,
127+ } )
128+ const productDoomed = await stripe . products . create ( {
129+ name : `e2e-recon-product-doomed-${ Date . now ( ) } ` ,
130+ } )
131+ const cleanupCustomerIds = new Set < string > ( [ survivor . id , doomed . id ] )
132+ const cleanupProductIds = new Set < string > ( [ productSurvivor . id , productDoomed . id ] )
125133
126134 try {
127- // Backfill-only sync (no websocket, no event polling) — both rows
135+ // Backfill-only sync (no websocket, no event polling) — all rows
128136 // land in postgres with `_last_synced_at ≈ T0`.
129137 await drain ( engine . pipeline_sync ( pipeline ) )
130138
131139 const seeded = await pool . query < { id : string } > (
132- `SELECT id FROM "${ SCHEMA } "."${ STREAM } " WHERE id = ANY($1)` ,
140+ `SELECT id FROM "${ SCHEMA } "."${ CUSTOMERS_STREAM } " WHERE id = ANY($1)` ,
133141 [ [ survivor . id , doomed . id ] ]
134142 )
135143 expect ( new Set ( seeded . rows . map ( ( r ) => r . id ) ) ) . toEqual ( new Set ( [ survivor . id , doomed . id ] ) )
144+ const seededProducts = await pool . query < { id : string } > (
145+ `SELECT id FROM "${ SCHEMA } "."${ PRODUCTS_STREAM } " WHERE id = ANY($1)` ,
146+ [ [ productSurvivor . id , productDoomed . id ] ]
147+ )
148+ expect ( new Set ( seededProducts . rows . map ( ( r ) => r . id ) ) ) . toEqual (
149+ new Set ( [ productSurvivor . id , productDoomed . id ] )
150+ )
136151
137- // Hard-delete one customer WITHOUT replaying the customer .deleted
152+ // Hard-delete one object per stream WITHOUT replaying the * .deleted
138153 // event — this is the "missed delete" reconcile-cleanup catches.
139154 await stripe . customers . del ( doomed . id )
140- cleanupIds . delete ( doomed . id )
155+ cleanupCustomerIds . delete ( doomed . id )
156+ await stripe . products . del ( productDoomed . id )
157+ cleanupProductIds . delete ( productDoomed . id )
141158
142159 // `_last_synced_at` is set with millisecond precision by the destination,
143160 // so a small forward skew guarantees `syncRunStartedAt > _last_synced_at`.
@@ -152,21 +169,43 @@ describeWithEnv(
152169 await env . run ( activities . reconcileCleanup , PIPELINE_ID , syncRunStartedAt )
153170
154171 const after = await pool . query < { id : string } > (
155- `SELECT id FROM "${ SCHEMA } "."${ STREAM } " WHERE id = ANY($1)` ,
172+ `SELECT id FROM "${ SCHEMA } "."${ CUSTOMERS_STREAM } " WHERE id = ANY($1)` ,
156173 [ [ survivor . id , doomed . id ] ]
157174 )
158175 const remaining = new Set ( after . rows . map ( ( r ) => r . id ) )
159176 expect ( remaining . has ( survivor . id ) , `survivor ${ survivor . id } was tombstoned` ) . toBe ( true )
160177 expect ( remaining . has ( doomed . id ) , `doomed ${ doomed . id } was not tombstoned` ) . toBe ( false )
161- console . log ( ` Survived: ${ survivor . id } ` )
162- console . log ( ` Tombstoned: ${ doomed . id } ` )
178+
179+ const afterProducts = await pool . query < { id : string } > (
180+ `SELECT id FROM "${ SCHEMA } "."${ PRODUCTS_STREAM } " WHERE id = ANY($1)` ,
181+ [ [ productSurvivor . id , productDoomed . id ] ]
182+ )
183+ const remainingProducts = new Set ( afterProducts . rows . map ( ( r ) => r . id ) )
184+ expect (
185+ remainingProducts . has ( productSurvivor . id ) ,
186+ `product survivor ${ productSurvivor . id } was tombstoned`
187+ ) . toBe ( true )
188+ expect (
189+ remainingProducts . has ( productDoomed . id ) ,
190+ `product doomed ${ productDoomed . id } was not tombstoned`
191+ ) . toBe ( false )
192+
193+ console . log ( ` Customer survived: ${ survivor . id } ` )
194+ console . log ( ` Customer tombstoned: ${ doomed . id } ` )
195+ console . log ( ` Product survived: ${ productSurvivor . id } ` )
196+ console . log ( ` Product tombstoned: ${ productDoomed . id } ` )
163197 } finally {
164198 if ( ! process . env . KEEP_TEST_DATA ) {
165- for ( const id of cleanupIds ) {
199+ for ( const id of cleanupCustomerIds ) {
166200 try {
167201 await stripe . customers . del ( id )
168202 } catch { }
169203 }
204+ for ( const id of cleanupProductIds ) {
205+ try {
206+ await stripe . products . del ( id )
207+ } catch { }
208+ }
170209 }
171210 }
172211 } , 180_000 )
@@ -215,7 +254,7 @@ describeWithEnv(
215254 batch_size : 50 ,
216255 } ,
217256 } ,
218- streams : [ { name : STREAM } ] ,
257+ streams : [ { name : CUSTOMERS_STREAM } , { name : PRODUCTS_STREAM } ] ,
219258 }
220259 }
221260
@@ -235,7 +274,7 @@ describeWithEnv(
235274 }
236275 } )
237276
238- it ( 'tombstones customers deleted in stripe without a delete event ' , async ( ) => {
277+ it ( 'tombstones deleted customers and products ' , async ( ) => {
239278 const engine = await createEngine ( resolver )
240279
241280 // pipeline_setup creates the spreadsheet if needed and emits the new
@@ -265,22 +304,41 @@ describeWithEnv(
265304 const doomed = await stripe . customers . create ( {
266305 name : `e2e-recon-sheets-doomed-${ Date . now ( ) } ` ,
267306 } )
268- const cleanupIds = new Set < string > ( [ survivor . id , doomed . id ] )
307+ const productSurvivor = await stripe . products . create ( {
308+ name : `e2e-recon-sheets-product-survivor-${ Date . now ( ) } ` ,
309+ } )
310+ const productDoomed = await stripe . products . create ( {
311+ name : `e2e-recon-sheets-product-doomed-${ Date . now ( ) } ` ,
312+ } )
313+ const cleanupCustomerIds = new Set < string > ( [ survivor . id , doomed . id ] )
314+ const cleanupProductIds = new Set < string > ( [ productSurvivor . id , productDoomed . id ] )
269315
270316 try {
271- // Backfill seeds both customers with `_last_synced_at ≈ T0`.
317+ // Backfill seeds both streams with `_last_synced_at ≈ T0`.
272318 await drain ( engine . pipeline_sync ( pipeline ) )
273319
274- const seededRows = await readSheet ( sheetsClient , spreadsheetId , STREAM )
320+ const seededRows = await readSheet ( sheetsClient , spreadsheetId , CUSTOMERS_STREAM )
275321 const seededHeader = ( seededRows [ 0 ] ?? [ ] ) as string [ ]
276322 const idIdx = seededHeader . indexOf ( 'id' )
277323 expect ( idIdx , 'id column missing in sheet header' ) . toBeGreaterThanOrEqual ( 0 )
278324 const seededIds = new Set ( seededRows . slice ( 1 ) . map ( ( row ) => String ( row [ idIdx ] ?? '' ) ) )
279325 expect ( seededIds . has ( survivor . id ) ) . toBe ( true )
280326 expect ( seededIds . has ( doomed . id ) ) . toBe ( true )
281327
328+ const seededProducts = await readSheet ( sheetsClient , spreadsheetId , PRODUCTS_STREAM )
329+ const seededProductHeader = ( seededProducts [ 0 ] ?? [ ] ) as string [ ]
330+ const productIdIdx = seededProductHeader . indexOf ( 'id' )
331+ expect ( productIdIdx , 'id column missing in products header' ) . toBeGreaterThanOrEqual ( 0 )
332+ const seededProductIds = new Set (
333+ seededProducts . slice ( 1 ) . map ( ( row ) => String ( row [ productIdIdx ] ?? '' ) )
334+ )
335+ expect ( seededProductIds . has ( productSurvivor . id ) ) . toBe ( true )
336+ expect ( seededProductIds . has ( productDoomed . id ) ) . toBe ( true )
337+
282338 await stripe . customers . del ( doomed . id )
283- cleanupIds . delete ( doomed . id )
339+ cleanupCustomerIds . delete ( doomed . id )
340+ await stripe . products . del ( productDoomed . id )
341+ cleanupProductIds . delete ( productDoomed . id )
284342
285343 await new Promise ( ( r ) => setTimeout ( r , 50 ) )
286344 const syncRunStartedAt = new Date ( ) . toISOString ( )
@@ -289,19 +347,40 @@ describeWithEnv(
289347 const env = new MockActivityEnvironment ( )
290348 await env . run ( activities . reconcileCleanup , PIPELINE_ID , syncRunStartedAt )
291349
292- const afterRows = await readSheet ( sheetsClient , spreadsheetId , STREAM )
350+ const afterRows = await readSheet ( sheetsClient , spreadsheetId , CUSTOMERS_STREAM )
293351 const afterIds = new Set ( afterRows . slice ( 1 ) . map ( ( row ) => String ( row [ idIdx ] ?? '' ) ) )
294352 expect ( afterIds . has ( survivor . id ) , `survivor ${ survivor . id } was tombstoned` ) . toBe ( true )
295353 expect ( afterIds . has ( doomed . id ) , `doomed ${ doomed . id } was not tombstoned` ) . toBe ( false )
296- console . log ( ` Survived: ${ survivor . id } ` )
297- console . log ( ` Tombstoned: ${ doomed . id } ` )
354+
355+ const afterProducts = await readSheet ( sheetsClient , spreadsheetId , PRODUCTS_STREAM )
356+ const afterProductIds = new Set (
357+ afterProducts . slice ( 1 ) . map ( ( row ) => String ( row [ productIdIdx ] ?? '' ) )
358+ )
359+ expect (
360+ afterProductIds . has ( productSurvivor . id ) ,
361+ `product survivor ${ productSurvivor . id } was tombstoned`
362+ ) . toBe ( true )
363+ expect (
364+ afterProductIds . has ( productDoomed . id ) ,
365+ `product doomed ${ productDoomed . id } was not tombstoned`
366+ ) . toBe ( false )
367+
368+ console . log ( ` Customer survived: ${ survivor . id } ` )
369+ console . log ( ` Customer tombstoned: ${ doomed . id } ` )
370+ console . log ( ` Product survived: ${ productSurvivor . id } ` )
371+ console . log ( ` Product tombstoned: ${ productDoomed . id } ` )
298372 } finally {
299373 if ( ! process . env . KEEP_TEST_DATA ) {
300- for ( const id of cleanupIds ) {
374+ for ( const id of cleanupCustomerIds ) {
301375 try {
302376 await stripe . customers . del ( id )
303377 } catch { }
304378 }
379+ for ( const id of cleanupProductIds ) {
380+ try {
381+ await stripe . products . del ( id )
382+ } catch { }
383+ }
305384 }
306385 }
307386 } , 240_000 )
0 commit comments