|
| 1 | +/** |
| 2 | + * Member Aggregates Cleanup Script |
| 3 | + * |
| 4 | + * PROBLEM: |
| 5 | + * Some records in the memberSegmentsAgg table reference members that no longer exist, |
| 6 | + * creating orphaned aggregation data that needs to be cleaned up. |
| 7 | + * |
| 8 | + * SOLUTION: |
| 9 | + * This script deletes orphaned member aggregates from the memberSegmentsAgg table |
| 10 | + * by finding records that reference non-existent members and removing them in batches. |
| 11 | + * |
| 12 | + * Usage: |
| 13 | + * # Via package.json script (recommended): |
| 14 | + * pnpm run cleanup-member-aggregates -- [--batch-size <size>] [--dry-run] |
| 15 | + * |
| 16 | + * # Or directly with tsx: |
| 17 | + * npx tsx src/bin/cleanup-member-aggregates.ts [--batch-size <size>] [--dry-run] |
| 18 | + * |
| 19 | + * Options: |
| 20 | + * --batch-size Number of records to process in each batch (default: 10000) |
| 21 | + * --dry-run Display what would be deleted without actually deleting anything |
| 22 | + * |
| 23 | + * Environment Variables Required: |
| 24 | + * CROWD_DB_WRITE_HOST - Postgres write host |
| 25 | + * CROWD_DB_PORT - Postgres port |
| 26 | + * CROWD_DB_USERNAME - Postgres username |
| 27 | + * CROWD_DB_PASSWORD - Postgres password |
| 28 | + * CROWD_DB_DATABASE - Postgres database name |
| 29 | + */ |
| 30 | +import { WRITE_DB_CONFIG, getDbConnection } from '@crowd/data-access-layer/src/database' |
| 31 | +import { QueryExecutor, pgpQx } from '@crowd/data-access-layer/src/queryExecutor' |
| 32 | +import { getServiceChildLogger } from '@crowd/logging' |
| 33 | + |
// Child logger scoped to this script; all output below goes through it.
const log = getServiceChildLogger('cleanup-member-aggregates-script')

// Type definitions

// Outcome of a single delete (or dry-run count) batch.
interface BatchResult {
  // Rows deleted in this batch (or rows that WOULD be deleted in dry-run mode).
  deletedCount: number
  // False when the batch query threw; the loop continues to the next batch.
  success: boolean
  // Error message, present only when success is false.
  error?: string
}

// Final report returned by processCleanup and printed by main().
interface CleanupSummary {
  // Number of batches attempted (estimated batches in dry-run mode).
  totalBatches: number
  // Total rows deleted (or total orphans found, in dry-run mode).
  totalDeleted: number
  // Batches that threw; a non-zero value makes the script exit with code 1.
  failedBatches: number
  // ISO-8601 timestamps bracketing the run.
  startTime: string
  endTime: string
  // Effective --batch-size used for the run.
  batchSize: number
  // Whether the run was a --dry-run (no deletes issued).
  dryRun: boolean
}
| 52 | + |
| 53 | +/** |
| 54 | + * Initialize Postgres connection using QueryExecutor |
| 55 | + */ |
| 56 | +async function initPostgresClient(): Promise<QueryExecutor> { |
| 57 | + log.info('Initializing Postgres connection...') |
| 58 | + |
| 59 | + const dbConnection = await getDbConnection(WRITE_DB_CONFIG()) |
| 60 | + const queryExecutor = pgpQx(dbConnection) |
| 61 | + |
| 62 | + log.info('Postgres connection established') |
| 63 | + return queryExecutor |
| 64 | +} |
| 65 | + |
| 66 | +/** |
| 67 | + * Count total orphaned member aggregates |
| 68 | + */ |
| 69 | +async function countOrphanedAggregates(postgres: QueryExecutor): Promise<number> { |
| 70 | + log.info('Counting orphaned member aggregates...') |
| 71 | + |
| 72 | + const query = ` |
| 73 | + SELECT COUNT(*)::int as count |
| 74 | + FROM public."memberSegmentsAgg" msa |
| 75 | + LEFT JOIN public."members" m |
| 76 | + ON m.id = msa."memberId" |
| 77 | + WHERE m.id IS NULL |
| 78 | + ` |
| 79 | + |
| 80 | + const result = await postgres.selectOne(query) |
| 81 | + const count = (result as { count: number }).count |
| 82 | + |
| 83 | + log.info(`Found ${count} orphaned member aggregate(s)`) |
| 84 | + return count |
| 85 | +} |
| 86 | + |
| 87 | +/** |
| 88 | + * Delete one batch of orphaned member aggregates |
| 89 | + */ |
| 90 | +async function deleteOrphanedAggregatesBatch( |
| 91 | + postgres: QueryExecutor, |
| 92 | + batchSize: number, |
| 93 | + dryRun: boolean, |
| 94 | +): Promise<BatchResult> { |
| 95 | + try { |
| 96 | + if (dryRun) { |
| 97 | + log.info(`[DRY RUN] Would delete up to ${batchSize} orphaned member aggregates`) |
| 98 | + |
| 99 | + const query = ` |
| 100 | + SELECT COUNT(*)::int as count |
| 101 | + FROM ( |
| 102 | + SELECT msa."memberId", msa."segmentId" |
| 103 | + FROM public."memberSegmentsAgg" msa |
| 104 | + LEFT JOIN public."members" m |
| 105 | + ON m.id = msa."memberId" |
| 106 | + WHERE m.id IS NULL |
| 107 | + LIMIT $(batchSize) |
| 108 | + ) subquery |
| 109 | + ` |
| 110 | + |
| 111 | + const result = await postgres.selectOne(query, { batchSize }) |
| 112 | + const count = (result as { count: number }).count |
| 113 | + |
| 114 | + log.info(`[DRY RUN] Would delete ${count} record(s) in this batch`) |
| 115 | + return { |
| 116 | + deletedCount: count, |
| 117 | + success: true, |
| 118 | + } |
| 119 | + } |
| 120 | + |
| 121 | + const query = ` |
| 122 | + WITH to_delete AS ( |
| 123 | + SELECT msa."memberId", msa."segmentId" |
| 124 | + FROM public."memberSegmentsAgg" msa |
| 125 | + LEFT JOIN public."members" m |
| 126 | + ON m.id = msa."memberId" |
| 127 | + WHERE m.id IS NULL |
| 128 | + LIMIT $(batchSize) |
| 129 | + ) |
| 130 | + DELETE FROM public."memberSegmentsAgg" msa |
| 131 | + USING to_delete d |
| 132 | + WHERE msa."memberId" = d."memberId" |
| 133 | + AND msa."segmentId" = d."segmentId" |
| 134 | + ` |
| 135 | + |
| 136 | + const deletedCount = await postgres.result(query, { batchSize }) |
| 137 | + |
| 138 | + if (deletedCount > 0) { |
| 139 | + log.info(`✓ Deleted ${deletedCount} orphaned member aggregate(s) in this batch`) |
| 140 | + } |
| 141 | + |
| 142 | + return { |
| 143 | + deletedCount, |
| 144 | + success: true, |
| 145 | + } |
| 146 | + } catch (error) { |
| 147 | + log.error(`Failed to delete batch: ${error.message}`) |
| 148 | + return { |
| 149 | + deletedCount: 0, |
| 150 | + success: false, |
| 151 | + error: error.message, |
| 152 | + } |
| 153 | + } |
| 154 | +} |
| 155 | + |
| 156 | +/** |
| 157 | + * Process cleanup in batches |
| 158 | + */ |
| 159 | +async function processCleanup( |
| 160 | + postgres: QueryExecutor, |
| 161 | + batchSize: number, |
| 162 | + dryRun: boolean, |
| 163 | +): Promise<CleanupSummary> { |
| 164 | + const startTime = new Date().toISOString() |
| 165 | + let totalBatches = 0 |
| 166 | + let totalDeleted = 0 |
| 167 | + let failedBatches = 0 |
| 168 | + |
| 169 | + log.info(`Starting cleanup process with batch size: ${batchSize}`) |
| 170 | + |
| 171 | + if (dryRun) { |
| 172 | + log.info('🧪 DRY RUN MODE - No data will be deleted') |
| 173 | + } |
| 174 | + |
| 175 | + // Count total orphaned aggregates first |
| 176 | + const totalOrphaned = await countOrphanedAggregates(postgres) |
| 177 | + |
| 178 | + if (totalOrphaned === 0) { |
| 179 | + log.info('No orphaned member aggregates found. Cleanup complete.') |
| 180 | + |
| 181 | + return { |
| 182 | + totalBatches: 0, |
| 183 | + totalDeleted: 0, |
| 184 | + failedBatches: 0, |
| 185 | + startTime, |
| 186 | + endTime: new Date().toISOString(), |
| 187 | + batchSize, |
| 188 | + dryRun, |
| 189 | + } |
| 190 | + } |
| 191 | + |
| 192 | + log.info(`Processing ${totalOrphaned} total orphaned record(s) in batches of ${batchSize}`) |
| 193 | + |
| 194 | + // In dry-run mode, just simulate the batches without actually processing |
| 195 | + if (dryRun) { |
| 196 | + const estimatedBatches = Math.ceil(totalOrphaned / batchSize) |
| 197 | + log.info( |
| 198 | + `[DRY RUN] Would process ${estimatedBatches} batch(es) to delete ${totalOrphaned} record(s)`, |
| 199 | + ) |
| 200 | + |
| 201 | + return { |
| 202 | + totalBatches: estimatedBatches, |
| 203 | + totalDeleted: totalOrphaned, |
| 204 | + failedBatches: 0, |
| 205 | + startTime, |
| 206 | + endTime: new Date().toISOString(), |
| 207 | + batchSize, |
| 208 | + dryRun, |
| 209 | + } |
| 210 | + } |
| 211 | + |
| 212 | + let hasMore = true |
| 213 | + |
| 214 | + while (hasMore) { |
| 215 | + totalBatches++ |
| 216 | + |
| 217 | + log.info(`Processing batch ${totalBatches}...`) |
| 218 | + |
| 219 | + const batchResult = await deleteOrphanedAggregatesBatch(postgres, batchSize, dryRun) |
| 220 | + |
| 221 | + if (batchResult.success) { |
| 222 | + totalDeleted += batchResult.deletedCount |
| 223 | + |
| 224 | + // If we deleted fewer records than the batch size, we're done |
| 225 | + if (batchResult.deletedCount < batchSize) { |
| 226 | + hasMore = false |
| 227 | + log.info( |
| 228 | + `Batch ${totalBatches} processed ${batchResult.deletedCount} record(s). No more records to process.`, |
| 229 | + ) |
| 230 | + } else { |
| 231 | + log.info( |
| 232 | + `Batch ${totalBatches} completed. ${batchResult.deletedCount} record(s) processed.`, |
| 233 | + ) |
| 234 | + } |
| 235 | + } else { |
| 236 | + failedBatches++ |
| 237 | + log.error(`Batch ${totalBatches} failed: ${batchResult.error}`) |
| 238 | + |
| 239 | + // Continue with next batch instead of stopping |
| 240 | + } |
| 241 | + |
| 242 | + // Safety check to prevent infinite loops |
| 243 | + if (totalBatches >= 1000) { |
| 244 | + log.warn('Reached maximum batch limit (1000). Stopping to prevent infinite loops.') |
| 245 | + hasMore = false |
| 246 | + } |
| 247 | + } |
| 248 | + |
| 249 | + const endTime = new Date().toISOString() |
| 250 | + |
| 251 | + return { |
| 252 | + totalBatches, |
| 253 | + totalDeleted, |
| 254 | + failedBatches, |
| 255 | + startTime, |
| 256 | + endTime, |
| 257 | + batchSize, |
| 258 | + dryRun, |
| 259 | + } |
| 260 | +} |
| 261 | + |
| 262 | +/** |
| 263 | + * Main entry point |
| 264 | + */ |
| 265 | +async function main() { |
| 266 | + const args = process.argv.slice(2) |
| 267 | + |
| 268 | + // Parse command line arguments |
| 269 | + const batchSizeIndex = args.indexOf('--batch-size') |
| 270 | + const dryRunIndex = args.indexOf('--dry-run') |
| 271 | + |
| 272 | + const dryRun = dryRunIndex !== -1 |
| 273 | + |
| 274 | + let batchSize = 10000 // Default batch size |
| 275 | + if (batchSizeIndex !== -1) { |
| 276 | + if (batchSizeIndex + 1 >= args.length) { |
| 277 | + log.error('Error: --batch-size requires a value') |
| 278 | + process.exit(1) |
| 279 | + } |
| 280 | + const batchSizeValue = parseInt(args[batchSizeIndex + 1], 10) |
| 281 | + if (isNaN(batchSizeValue) || batchSizeValue <= 0) { |
| 282 | + log.error('Error: --batch-size must be a positive integer') |
| 283 | + process.exit(1) |
| 284 | + } |
| 285 | + batchSize = batchSizeValue |
| 286 | + } |
| 287 | + |
| 288 | + // Check for help flag or invalid arguments |
| 289 | + if (args.includes('--help') || args.includes('-h')) { |
| 290 | + log.info(` |
| 291 | + Usage: |
| 292 | + # Via package.json script (recommended): |
| 293 | + pnpm run cleanup-member-aggregates -- [--batch-size <size>] [--dry-run] |
| 294 | + |
| 295 | + # Or directly with tsx: |
| 296 | + npx tsx src/bin/cleanup-member-aggregates.ts [--batch-size <size>] [--dry-run] |
| 297 | + |
| 298 | + Options: |
| 299 | + --batch-size <size>: Number of records to process in each batch (default: 10000) |
| 300 | + --dry-run: Display what would be deleted without actually deleting anything |
| 301 | + |
| 302 | + Examples: |
| 303 | + # Run cleanup with default batch size (10000) |
| 304 | + pnpm run cleanup-member-aggregates |
| 305 | + |
| 306 | + # Run with custom batch size |
| 307 | + pnpm run cleanup-member-aggregates -- --batch-size 5000 |
| 308 | + |
| 309 | + # Dry run to preview what would be deleted |
| 310 | + pnpm run cleanup-member-aggregates -- --dry-run |
| 311 | + |
| 312 | + # Dry run with custom batch size |
| 313 | + pnpm run cleanup-member-aggregates -- --batch-size 1000 --dry-run |
| 314 | + `) |
| 315 | + process.exit(0) |
| 316 | + } |
| 317 | + |
| 318 | + try { |
| 319 | + log.info(`\n${'='.repeat(80)}`) |
| 320 | + log.info('Member Aggregates Cleanup Script') |
| 321 | + log.info(`${'='.repeat(80)}`) |
| 322 | + log.info(`Batch size: ${batchSize}`) |
| 323 | + if (dryRun) { |
| 324 | + log.info('Mode: DRY RUN (no data will be deleted)') |
| 325 | + } else { |
| 326 | + log.info('Mode: LIVE (data will be deleted)') |
| 327 | + } |
| 328 | + log.info(`${'='.repeat(80)}\n`) |
| 329 | + |
| 330 | + // Initialize database connection |
| 331 | + const postgres = await initPostgresClient() |
| 332 | + |
| 333 | + // Process cleanup |
| 334 | + const summary = await processCleanup(postgres, batchSize, dryRun) |
| 335 | + |
| 336 | + // Print final summary |
| 337 | + log.info(`\n${'='.repeat(80)}`) |
| 338 | + log.info('Cleanup Summary') |
| 339 | + log.info(`${'='.repeat(80)}`) |
| 340 | + log.info(`✓ Total batches processed: ${summary.totalBatches}`) |
| 341 | + log.info(`✓ Total records ${dryRun ? 'found' : 'deleted'}: ${summary.totalDeleted}`) |
| 342 | + if (summary.failedBatches > 0) { |
| 343 | + log.warn(`✗ Failed batches: ${summary.failedBatches}`) |
| 344 | + } |
| 345 | + log.info(`Duration: ${summary.startTime} → ${summary.endTime}`) |
| 346 | + |
| 347 | + const exitCode = summary.failedBatches > 0 ? 1 : 0 |
| 348 | + process.exit(exitCode) |
| 349 | + } catch (error) { |
| 350 | + log.error(error, 'Failed to run cleanup script') |
| 351 | + log.error(`\n❌ Error: ${error.message}`) |
| 352 | + process.exit(1) |
| 353 | + } |
| 354 | +} |
| 355 | + |
| 356 | +main().catch((error) => { |
| 357 | + log.error('Unexpected error:', error) |
| 358 | + process.exit(1) |
| 359 | +}) |