Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion modules/module-postgres/src/replication/WalStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ export class WalStream {
const fixGuidance =
slot.invalidation_reason === 'idle_timeout'
? `Increase idle_replication_slot_timeout on the source database.`
: `Increase max_slot_wal_keep_size on the source database.`;
: `Increase max_slot_wal_keep_size on the source database and delete the existing slot to recover.`;
throw new MissingReplicationSlotError(
`[PSYNC_S1146] Replication slot ${slotName} was invalidated ` +
`(reason: ${slot.invalidation_reason ?? 'unknown'}). ` +
Expand Down
7 changes: 3 additions & 4 deletions modules/module-postgres/test/src/wal_stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -573,8 +573,8 @@ bucket_definitions:
// When a snapshot is interrupted and the slot is subsequently invalidated,
// a retry calls initSlot() which finds the lost slot. Since snapshot_done
// is still false, the error should carry phase: 'snapshot' so that
// shouldRetryReplication() can block futile retries. Currently initSlot()
// always reports phase: 'streaming' — this test should FAIL until fixed.
// shouldRetryReplication() can block retries for snapshot failures
// requiring operator intervention to recover.
await using baseContext = await openContext({ doNotClear: true });

const serverVersion = await baseContext.connectionManager.getServerVersion();
Expand Down Expand Up @@ -639,8 +639,7 @@ bucket_definitions:

expect(caughtError).toBeInstanceOf(MissingReplicationSlotError);
expect(caughtError.walStatus).toBe('lost');
// This assertion should FAIL: initSlot() currently reports 'streaming'
// but the correct phase is 'snapshot' because snapshot_done is false.
// initSlot() derives phase from snapshotDone: snapshot not done → phase is 'snapshot'
expect(caughtError.phase).toBe('snapshot');
}
}
Expand Down
38 changes: 16 additions & 22 deletions packages/service-core/src/api/diagnostics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -145,32 +145,26 @@ export async function getSyncRulesStatus(
}
errors.push(...syncRuleErrors.map((error) => syncConfigYamlErrorToReplicationError(error, now)));

if (slot_wal_budget) {
if (slot_wal_budget.wal_status === 'lost') {
if (
slot_wal_budget &&
slot_wal_budget.wal_status !== 'lost' &&
slot_wal_budget.safe_wal_size != null &&
slot_wal_budget.max_slot_wal_keep_size != null &&
slot_wal_budget.max_slot_wal_keep_size > 0
) {
const budgetPct = Math.max(
0,
Math.round((slot_wal_budget.safe_wal_size / slot_wal_budget.max_slot_wal_keep_size) * 100)
);
if (budgetPct <= 50) {
errors.push({
level: 'fatal',
level: 'warning',
message:
`[PSYNC_S1146] Replication slot WAL status is 'lost'. ` +
`The slot has been invalidated. Increase max_slot_wal_keep_size ` +
`on the source database and delete the existing slot to recover.`,
`WAL budget is low: ${budgetPct}% remaining. ` +
`The replication slot may be invalidated if WAL consumption ` +
`continues at this rate. Consider increasing max_slot_wal_keep_size.`,
ts: now
});
} else if (
slot_wal_budget.safe_wal_size != null &&
slot_wal_budget.max_slot_wal_keep_size != null &&
slot_wal_budget.max_slot_wal_keep_size > 0
) {
const budgetPct = Math.round((slot_wal_budget.safe_wal_size / slot_wal_budget.max_slot_wal_keep_size) * 100);
if (budgetPct <= 50) {
errors.push({
level: 'warning',
message:
`WAL budget is low: ${budgetPct}% remaining. ` +
`The replication slot may be invalidated if WAL consumption ` +
`continues at this rate. Consider increasing max_slot_wal_keep_size.`,
ts: now
});
}
}
}

Expand Down
22 changes: 18 additions & 4 deletions packages/service-core/test/src/diagnostics.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,14 +116,28 @@ describe('getSyncRulesStatus WAL budget warnings', () => {
expect(walWarnings).toHaveLength(0);
});

test('fatal error when slot status is lost', async () => {
test('clamps negative safe_wal_size to 0%', async () => {
const api = makeRouteAPI({
wal_status: 'unreserved',
safe_wal_size: -2.4 * GB,
max_slot_wal_keep_size: 1 * 1024 * 1024 // 1MB
});
const result = await getSyncRulesStatus(makeBucketStorage(), api, makeSyncRulesContent(), OPTIONS);
const walWarnings = result!.errors.filter((e) => e.message.includes('WAL budget'));
expect(walWarnings).toHaveLength(1);
expect(walWarnings[0].message).toContain('0%');
expect(walWarnings[0].message).not.toMatch(/-\d+%/);
});

test('no WAL budget error when slot status is lost', async () => {
const api = makeRouteAPI({
wal_status: 'lost'
});
const result = await getSyncRulesStatus(makeBucketStorage(), api, makeSyncRulesContent(), OPTIONS);
const slotErrors = result!.errors.filter((e) => e.message.includes('PSYNC_S1146'));
expect(slotErrors).toHaveLength(1);
expect(slotErrors[0].level).toBe('fatal');
const walErrors = result!.errors.filter(
(e) => e.message.includes('WAL budget') || e.message.includes('PSYNC_S1146')
);
expect(walErrors).toHaveLength(0);
});

test('no WAL error when getSlotWalBudget is not defined', async () => {
Expand Down
Loading