diff --git a/modules/module-postgres/src/replication/WalStream.ts b/modules/module-postgres/src/replication/WalStream.ts index 6d2a54b54..ba1aeead6 100644 --- a/modules/module-postgres/src/replication/WalStream.ts +++ b/modules/module-postgres/src/replication/WalStream.ts @@ -353,7 +353,7 @@ export class WalStream { const fixGuidance = slot.invalidation_reason === 'idle_timeout' ? `Increase idle_replication_slot_timeout on the source database.` - : `Increase max_slot_wal_keep_size on the source database.`; + : `Increase max_slot_wal_keep_size on the source database and delete the existing slot to recover.`; throw new MissingReplicationSlotError( `[PSYNC_S1146] Replication slot ${slotName} was invalidated ` + `(reason: ${slot.invalidation_reason ?? 'unknown'}). ` + diff --git a/modules/module-postgres/test/src/wal_stream.test.ts b/modules/module-postgres/test/src/wal_stream.test.ts index 7679b2330..7373ad45a 100644 --- a/modules/module-postgres/test/src/wal_stream.test.ts +++ b/modules/module-postgres/test/src/wal_stream.test.ts @@ -573,8 +573,8 @@ bucket_definitions: // When a snapshot is interrupted and the slot is subsequently invalidated, // a retry calls initSlot() which finds the lost slot. Since snapshot_done // is still false, the error should carry phase: 'snapshot' so that - // shouldRetryReplication() can block futile retries. Currently initSlot() - // always reports phase: 'streaming' — this test should FAIL until fixed. + // shouldRetryReplication() can block retries for snapshot failures + // requiring operator intervention to recover. await using baseContext = await openContext({ doNotClear: true }); const serverVersion = await baseContext.connectionManager.getServerVersion(); @@ -639,8 +639,7 @@ bucket_definitions: expect(caughtError).toBeInstanceOf(MissingReplicationSlotError); expect(caughtError.walStatus).toBe('lost'); - // This assertion should FAIL: initSlot() currently reports 'streaming' - // but the correct phase is 'snapshot' because snapshot_done is false. + // initSlot() derives phase from snapshotDone: snapshot not done → phase is 'snapshot' expect(caughtError.phase).toBe('snapshot'); } } diff --git a/packages/service-core/src/api/diagnostics.ts b/packages/service-core/src/api/diagnostics.ts index 10c515cb0..03bb93738 100644 --- a/packages/service-core/src/api/diagnostics.ts +++ b/packages/service-core/src/api/diagnostics.ts @@ -145,32 +145,26 @@ export async function getSyncRulesStatus( } errors.push(...syncRuleErrors.map((error) => syncConfigYamlErrorToReplicationError(error, now))); - if (slot_wal_budget) { - if (slot_wal_budget.wal_status === 'lost') { + if ( + slot_wal_budget && + slot_wal_budget.wal_status !== 'lost' && + slot_wal_budget.safe_wal_size != null && + slot_wal_budget.max_slot_wal_keep_size != null && + slot_wal_budget.max_slot_wal_keep_size > 0 + ) { + const budgetPct = Math.max( + 0, + Math.round((slot_wal_budget.safe_wal_size / slot_wal_budget.max_slot_wal_keep_size) * 100) + ); + if (budgetPct <= 50) { errors.push({ - level: 'fatal', + level: 'warning', message: - `[PSYNC_S1146] Replication slot WAL status is 'lost'. ` + - `The slot has been invalidated. Increase max_slot_wal_keep_size ` + - `on the source database and delete the existing slot to recover.`, + `WAL budget is low: ${budgetPct}% remaining. ` + + `The replication slot may be invalidated if WAL consumption ` + + `continues at this rate. Consider increasing max_slot_wal_keep_size.`, ts: now }); - } else if ( - slot_wal_budget.safe_wal_size != null && - slot_wal_budget.max_slot_wal_keep_size != null && - slot_wal_budget.max_slot_wal_keep_size > 0 - ) { - const budgetPct = Math.round((slot_wal_budget.safe_wal_size / slot_wal_budget.max_slot_wal_keep_size) * 100); - if (budgetPct <= 50) { - errors.push({ - level: 'warning', - message: - `WAL budget is low: ${budgetPct}% remaining. ` + - `The replication slot may be invalidated if WAL consumption ` + - `continues at this rate. Consider increasing max_slot_wal_keep_size.`, - ts: now - }); - } } } diff --git a/packages/service-core/test/src/diagnostics.test.ts b/packages/service-core/test/src/diagnostics.test.ts index 79ef5369b..cd4882a32 100644 --- a/packages/service-core/test/src/diagnostics.test.ts +++ b/packages/service-core/test/src/diagnostics.test.ts @@ -116,14 +116,28 @@ describe('getSyncRulesStatus WAL budget warnings', () => { expect(walWarnings).toHaveLength(0); }); - test('fatal error when slot status is lost', async () => { + test('clamps negative safe_wal_size to 0%', async () => { + const api = makeRouteAPI({ + wal_status: 'unreserved', + safe_wal_size: -2.4 * GB, + max_slot_wal_keep_size: 1 * 1024 * 1024 // 1MB + }); + const result = await getSyncRulesStatus(makeBucketStorage(), api, makeSyncRulesContent(), OPTIONS); + const walWarnings = result!.errors.filter((e) => e.message.includes('WAL budget')); + expect(walWarnings).toHaveLength(1); + expect(walWarnings[0].message).toContain('0%'); + expect(walWarnings[0].message).not.toMatch(/-\d+%/); + }); + + test('no WAL budget error when slot status is lost', async () => { const api = makeRouteAPI({ wal_status: 'lost' }); const result = await getSyncRulesStatus(makeBucketStorage(), api, makeSyncRulesContent(), OPTIONS); - const slotErrors = result!.errors.filter((e) => e.message.includes('PSYNC_S1146')); - expect(slotErrors).toHaveLength(1); - expect(slotErrors[0].level).toBe('fatal'); + const walErrors = result!.errors.filter( + (e) => e.message.includes('WAL budget') || e.message.includes('PSYNC_S1146') + ); + expect(walErrors).toHaveLength(0); }); test('no WAL error when getSlotWalBudget is not defined', async () => {