@@ -90,6 +90,7 @@ interface CleanupResult {
9090 startTime : string
9191 endTime : string
9292 postgresDeleted : number
93+ postgresFailedBatches : number
9394 tinybirdJobIds : string [ ]
9495 deletions : {
9596 postgres : DeletionStatus
@@ -141,15 +142,6 @@ function buildPostgresFilter(filters: Filters): { where: string; values: Record<
141142// Count helpers
142143// ---------------------------------------------------------------------------
143144
144- async function countPostgresRows ( postgres : QueryExecutor , filters : Filters ) : Promise < number > {
145- const { where, values } = buildPostgresFilter ( filters )
146- const result = ( await postgres . selectOne (
147- `SELECT COUNT(*) AS count FROM "activityRelations" WHERE ${ where } ` ,
148- values ,
149- ) ) as { count : string }
150- return parseInt ( result . count , 10 )
151- }
152-
153145async function countTinybirdRows (
154146 tinybird : TinybirdClient ,
155147 datasource : string ,
@@ -184,40 +176,48 @@ async function confirmOrAbort(message: string): Promise<void> {
184176// ---------------------------------------------------------------------------
185177
186178/**
187- * Delete matching rows from activityRelations in 10k batches.
188- * Each batch is its own transaction so lock duration and WAL stays bounded.
189- * Returns total rows deleted.
179+ * Delete matching rows from activityRelations in batches by fetching IDs first.
180+ * Each iteration: fetch up to batchSize IDs matching the filter, then delete by PK.
181+ * PK deletes are cheap index lookups; the filter scan happens once per batch (not twice).
182+ * Returns { deleted, failedBatches }.
190183 */
191184async function deletePostgresInChunks (
192185 postgres : QueryExecutor ,
193186 filters : Filters ,
194187 batchSize = 10000 ,
195- ) : Promise < number > {
188+ ) : Promise < { deleted : number ; failedBatches : number } > {
196189 const { where, values } = buildPostgresFilter ( filters )
197- const query = `
198- DELETE FROM "activityRelations"
199- WHERE "activityId" IN (
200- SELECT "activityId" FROM "activityRelations"
201- WHERE ${ where }
202- LIMIT ${ batchSize }
203- )
204- `
190+ const fetchQuery = `SELECT "activityId" FROM "activityRelations" WHERE ${ where } LIMIT ${ batchSize } `
205191
206192 let total = 0
207193 let batch = 0
208- let deleted = 0
194+ let failedBatches = 0
195+
196+ while ( true ) {
197+ const rows = ( await postgres . select ( fetchQuery , values ) ) as Array < { activityId : string } >
198+ if ( rows . length === 0 ) break
209199
210- do {
211- deleted = await postgres . result ( query , values )
212- if ( deleted === 0 ) break
213- total += deleted
200+ const ids = rows . map ( ( r ) => r . activityId )
214201 batch ++
202+
203+ try {
204+ await postgres . result ( `DELETE FROM "activityRelations" WHERE "activityId" IN ($(ids:csv))` , {
205+ ids,
206+ } )
207+ total += ids . length
208+ } catch ( error ) {
209+ log . error (
210+ ` Batch ${ batch } delete failed (sample IDs: ${ ids . slice ( 0 , 3 ) . join ( ', ' ) } ): ${ error . message } ` ,
211+ )
212+ failedBatches ++
213+ }
214+
215215 if ( batch % 10 === 0 ) {
216216 log . info ( ` … deleted ${ total . toLocaleString ( ) } rows so far (batch ${ batch } )` )
217217 }
218- } while ( deleted > 0 )
218+ }
219219
220- return total
220+ return { deleted : total , failedBatches }
221221}
222222
223223// ---------------------------------------------------------------------------
@@ -284,45 +284,57 @@ async function runCleanup(
284284 const postgres = await initPostgresClient ( )
285285 const tinybird = new TinybirdClient ( tbToken )
286286
287- // Pre-flight counts (all three stores in parallel)
288- log . info ( 'Counting affected rows across all stores...' )
289- const [ pgCount , tbActivitiesCount , tbRelationsCount ] = await Promise . all ( [
290- countPostgresRows ( postgres , filters ) ,
287+ // Pre-flight counts — Tinybird only (ClickHouse COUNT is cheap; PG COUNT over 1M+ rows is not)
288+ log . info ( 'Counting affected rows in Tinybird...' )
289+ const [ tbActivitiesCount , tbRelationsCount ] = await Promise . all ( [
291290 countTinybirdRows ( tinybird , 'activities' , filters ) ,
292291 countTinybirdRows ( tinybird , 'activityRelations' , filters ) ,
293292 ] )
294293
295- log . info ( ` PostgreSQL activityRelations : ${ pgCount . toLocaleString ( ) } rows` )
296294 log . info ( ` Tinybird activities : ${ tbActivitiesCount . toLocaleString ( ) } rows` )
297295 log . info ( ` Tinybird activityRelations : ${ tbRelationsCount . toLocaleString ( ) } rows` )
296+ log . info ( ` PostgreSQL activityRelations : will be deleted by streaming batches (no pre-count)` )
298297
299298 if ( dryRun ) {
300299 log . info ( `\n[DRY RUN] Would delete:` )
301- log . info ( ` ${ pgCount . toLocaleString ( ) } rows from PostgreSQL activityRelations` )
300+ log . info (
301+ ` PostgreSQL activityRelations matching filter: ${ buildTinybirdFilterClause ( filters ) . replace ( / ' / g, '"' ) } — actual count reported during real execution` ,
302+ )
302303 log . info ( ` ${ tbActivitiesCount . toLocaleString ( ) } rows from Tinybird activities` )
303304 log . info ( ` ${ tbRelationsCount . toLocaleString ( ) } rows from Tinybird activityRelations` )
304305 log . info ( '[DRY RUN] No data was deleted.' )
305306 return
306307 }
307308
308- if ( pgCount === 0 && tbActivitiesCount === 0 && tbRelationsCount === 0 ) {
309- log . info ( 'No matching rows found. Nothing to delete.' )
309+ if ( tbActivitiesCount === 0 && tbRelationsCount === 0 ) {
310+ log . info ( 'No matching rows found in Tinybird . Nothing to delete.' )
310311 return
311312 }
312313
313314 if ( ! skipConfirm ) {
314315 await confirmOrAbort (
315- `\nAbout to permanently delete ${ pgCount . toLocaleString ( ) } PG rows, ${ tbActivitiesCount . toLocaleString ( ) } TB activities, ${ tbRelationsCount . toLocaleString ( ) } TB activityRelations.` ,
316+ `\nAbout to permanently delete PG rows matching filter , ${ tbActivitiesCount . toLocaleString ( ) } TB activities, ${ tbRelationsCount . toLocaleString ( ) } TB activityRelations.` ,
316317 )
317318 }
318319
319- // Step 1: Delete from Postgres in chunks
320- log . info ( `\nStep 1: Deleting ${ pgCount . toLocaleString ( ) } rows from PostgreSQL in 10k batches...` )
320+ // Step 1: Delete from Postgres in chunks (fetch IDs then delete by PK, 10k at a time)
321+ log . info ( `\nStep 1: Deleting matching rows from PostgreSQL in 10k batches...` )
321322 const postgresStatus : DeletionStatus = { success : true }
322323 let postgresDeleted = 0
324+ let postgresFailedBatches = 0
323325 try {
324- postgresDeleted = await deletePostgresInChunks ( postgres , filters )
325- log . info ( `✓ Deleted ${ postgresDeleted . toLocaleString ( ) } row(s) from PostgreSQL` )
326+ const pgResult = await deletePostgresInChunks ( postgres , filters )
327+ postgresDeleted = pgResult . deleted
328+ postgresFailedBatches = pgResult . failedBatches
329+ if ( postgresFailedBatches > 0 ) {
330+ log . warn (
331+ ` ${ postgresFailedBatches } batch(es) failed — ${ postgresDeleted . toLocaleString ( ) } rows deleted successfully` ,
332+ )
333+ postgresStatus . success = false
334+ postgresStatus . error = `${ postgresFailedBatches } batch(es) failed`
335+ } else {
336+ log . info ( `✓ Deleted ${ postgresDeleted . toLocaleString ( ) } row(s) from PostgreSQL` )
337+ }
326338 } catch ( error ) {
327339 log . error ( `Failed to delete from PostgreSQL: ${ error . message } ` )
328340 postgresStatus . success = false
@@ -345,6 +357,7 @@ async function runCleanup(
345357 startTime,
346358 endTime,
347359 postgresDeleted,
360+ postgresFailedBatches,
348361 tinybirdJobIds : tinybirdStatuses . jobIds ,
349362 deletions : {
350363 postgres : postgresStatus ,
@@ -388,7 +401,13 @@ async function runCleanup(
388401 log . info ( `\n${ '=' . repeat ( 80 ) } ` )
389402 log . info ( 'Cleanup Summary' )
390403 log . info ( `${ '=' . repeat ( 80 ) } ` )
391- log . info ( `✓ PostgreSQL rows deleted : ${ postgresDeleted . toLocaleString ( ) } ` )
404+ if ( postgresStatus . success ) {
405+ log . info ( `✓ PostgreSQL rows deleted : ${ postgresDeleted . toLocaleString ( ) } ` )
406+ } else {
407+ log . warn (
408+ `⚠ PostgreSQL rows deleted : ${ postgresDeleted . toLocaleString ( ) } (${ postgresFailedBatches } batch(es) failed)` ,
409+ )
410+ }
392411 if ( tinybirdStatuses . activities . success ) {
393412 log . info ( `✓ Tinybird activities job : ${ tinybirdStatuses . activities . jobId } ` )
394413 } else {
0 commit comments