Skip to content

Commit b9ade3f

Browse files
jensensclaude
andauthored
fix: CacheWarmer hardening (#65) (#68)
* docs(plan): warmer hardening plan (#65) * refactor(warmer): consolidate MAX(tid) query + log/skip on failure (#65) Add module-level _read_max_tid(conn) helper and a public PGJsonbStorage.current_max_tid() method that logs on failure and returns None. Route _restore_state, poll_invalidations, and the warmer's load_current_tid_fn through them. Warmer skips warmup when current_max_tid returns None instead of installing consensus=0. Closes I1 and I2 of #65. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * refactor(instance): hoist _read_max_tid import to module top (#65) Matches the existing import-style block (6 other names from .storage already imported at module top). Avoids a per-call name lookup in the poll_invalidations hot path. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * feat(shared-cache): consensus_tid property + set() returns bool (#65) Both are wanted by the cache warmer's race-detection fix (next commit). The existing instance.load / load_multiple call sites ignore the new return value; no behaviour change there. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * fix(warmer): recover from consensus race; warn on zero accepted writes (#65) Closes I3 of #65. After poll_advance, re-read consensus_tid and use that as the polled_tid for subsequent set() calls — if another instance advanced consensus ahead of our sampled TID, this ensures our sets still pass the gate. Track accepted vs. attempted writes via the new set() return value; log a WARNING when the entire warmup was rejected despite non-empty results. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * test(shared-cache): pin main-storage _finish → shared.poll_advance (#65) Coverage gap flagged in #63 final review. Pins the invariant that the direct-use write path advances consensus and invalidates changed zoids in the shared cache, independent of any instance's tpc_finish. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * docs: CHANGES entry for 1.12.1 warmer hardening (#65) Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 65fd357 commit b9ade3f

9 files changed

Lines changed: 1197 additions & 38 deletions

File tree

CHANGES.md

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,31 @@
11
# Changelog
22

3+
## 1.12.1
4+
5+
- Harden the `CacheWarmer` lifecycle (#65).
6+
- The `MAX(tid)` lookup was duplicated in three places with three
7+
different error-handling policies. Consolidated behind a single
8+
`_read_max_tid(conn)` helper and a public
9+
`PGJsonbStorage.current_max_tid()` method that logs a warning and
10+
returns `None` on failure. The warmer skips warmup when the
11+
method returns `None` instead of installing a fabricated
12+
consensus of 0.
13+
- `SharedLoadCache.set()` now returns `True` on accept and `False`
14+
on rejection. `SharedLoadCache.consensus_tid` is exposed as a
15+
read-only property. Both are used by the warmer's race-recovery
16+
path; existing `instance.load` / `load_multiple` callers ignore
17+
the new return value and are unaffected.
18+
- The warmer re-reads `shared_cache.consensus_tid` after its own
19+
`poll_advance` and uses that value as `polled_tid` for the
20+
subsequent `set()` loop — this fixes a startup race where a
21+
concurrent instance poll could advance consensus past the
22+
warmer's sampled TID and cause every warmup write to be silently
23+
rejected. A WARNING is now logged when the entire warmup was
24+
rejected despite a non-empty result set.
25+
- Added a regression test that pins `PGJsonbStorage._finish`
26+
(direct-use write path) advancing shared consensus and
27+
invalidating changed zoids.
28+
329
## 1.12.0
430

531
- Introduce a process-wide `SharedLoadCache` that replaces per-

docs/superpowers/plans/2026-04-23-warmer-hardening.md

Lines changed: 819 additions & 0 deletions
Large diffs are not rendered by default.

src/zodb_pgjsonb/cache_warmer.py

Lines changed: 51 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,16 @@ def warm(self, load_multiple_fn):
127127
128128
Runs in a background daemon thread. Primes the consensus TID
129129
on the shared cache to the current PG max_tid so that
130-
subsequent ``shared.set`` calls are accepted.
130+
subsequent ``shared.set`` calls are accepted. Re-reads
131+
consensus after ``poll_advance`` and uses that as the
132+
``polled_tid`` for set calls — this is the mitigation for the
133+
startup race where an instance's poll advances consensus past
134+
the warmer's sampled TID before the set loop begins.
135+
136+
Skips warmup when the TID is unavailable. Logs a WARNING when
137+
every set() was rejected despite a non-empty result set (a
138+
likely sign of the race being wider than this mitigation
139+
covers).
131140
"""
132141
from ZODB.utils import p64
133142
from ZODB.utils import u64
@@ -137,23 +146,58 @@ def warm(self, load_multiple_fn):
137146
log.info("Cache warmer: no stats yet, skipping warmup")
138147
return
139148

149+
current_tid = self._load_current_tid_fn()
150+
if current_tid is None:
151+
log.warning("Cache warmer: could not read current TID, skipping warmup")
152+
return
153+
140154
oids = [p64(z) for z in top_zoids]
141155
try:
142156
results = load_multiple_fn(oids)
143157
except Exception:
144158
log.warning("Cache warmer: load_multiple failed", exc_info=True)
145159
return
146160

147-
# Prime consensus so set() accepts our writes, then populate.
148-
current_tid = self._load_current_tid_fn()
161+
if not results:
162+
log.info("Cache warmer: load_multiple returned no objects")
163+
return
164+
165+
# Prime consensus so set() accepts our writes. Another instance
166+
# may have advanced consensus beyond our sampled current_tid
167+
# already — in that case poll_advance is a no-op, and the actual
168+
# consensus is higher than current_tid. Re-read it so our
169+
# subsequent set() calls use the effective consensus as their
170+
# polled_tid and pass the gate.
149171
self._shared_cache.poll_advance(new_tid=current_tid, changed_zoids=[])
172+
effective_tid = self._shared_cache.consensus_tid
173+
if effective_tid is None: # guarded for paranoia; should not happen
174+
log.warning("Cache warmer: consensus still None after poll_advance")
175+
return
176+
150177
written = 0
178+
attempted = 0
151179
for oid, (data, tid_bytes) in results.items():
152-
self._shared_cache.set(
180+
attempted += 1
181+
if self._shared_cache.set(
153182
zoid=u64(oid),
154183
data=data,
155184
tid_bytes=tid_bytes,
156-
polled_tid=current_tid,
185+
polled_tid=effective_tid,
186+
):
187+
written += 1
188+
189+
if written == 0:
190+
log.warning(
191+
"Cache warmer: all %d set() calls rejected by shared cache "
192+
"(consensus=%d, sampled_tid=%d) — likely raced with a "
193+
"concurrent instance poll",
194+
attempted,
195+
effective_tid,
196+
current_tid,
197+
)
198+
else:
199+
log.info(
200+
"Cache warmer: loaded %d of %d objects into shared cache",
201+
written,
202+
attempted,
157203
)
158-
written += 1
159-
log.info("Cache warmer: loaded %d objects into shared cache", written)

src/zodb_pgjsonb/instance.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
from .storage import _loadBefore_hf
1818
from .storage import _loadBefore_hp
1919
from .storage import _NoopSerialCache
20+
from .storage import _read_max_tid
2021
from .storage import LoadCache
2122
from .undo import _compute_undo
2223
from ZODB.ConflictResolution import ConflictResolvingStorage
@@ -180,10 +181,7 @@ def poll_invalidations(self):
180181
# (invalidation lookups AND load() calls) see this same state.
181182
self._begin_read_txn()
182183

183-
with self._conn.cursor() as cur:
184-
cur.execute("SELECT COALESCE(MAX(tid), 0) AS max_tid FROM transaction_log")
185-
row = cur.fetchone()
186-
new_tid = row["max_tid"]
184+
new_tid = _read_max_tid(self._conn)
187185

188186
result = []
189187
changed_zoids = []

src/zodb_pgjsonb/storage.py

Lines changed: 52 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,18 @@ def __post_init__(self):
128128
DEFAULT_CACHE_LOCAL_MB = 16
129129

130130

131+
def _read_max_tid(conn):
132+
"""Return the current MAX(tid) from transaction_log, or 0 when empty.
133+
134+
Raises whatever psycopg raises on a broken connection or closed
135+
database. Callers decide whether to catch.
136+
"""
137+
with conn.cursor() as cur:
138+
cur.execute("SELECT COALESCE(MAX(tid), 0) AS max_tid FROM transaction_log")
139+
row = cur.fetchone()
140+
return row["max_tid"] if row else 0
141+
142+
131143
class _NoopSerialCache:
132144
"""Drop-in dict substitute that never stores anything (#62).
133145
@@ -249,6 +261,15 @@ def __init__(self, max_mb):
249261
self.hits = 0
250262
self.misses = 0
251263

264+
@property
265+
def consensus_tid(self):
266+
"""The highest TID any instance has polled to (read-only).
267+
268+
Returns ``None`` before the first ``poll_advance`` call.
269+
"""
270+
with self._lock:
271+
return self._consensus_tid
272+
252273
def get(self, zoid, polled_tid):
253274
"""Return (data, tid) for zoid or None.
254275
@@ -274,20 +295,19 @@ def get(self, zoid, polled_tid):
274295
def set(self, zoid, data, tid_bytes, polled_tid):
275296
"""Store (data, tid_bytes) for zoid if the caller is up to date.
276297
277-
Rejects writes from callers whose snapshot is older than the
278-
current consensus (which could be carrying stale pre-
279-
invalidation bytes), and never replaces a newer entry with an
280-
older one.
298+
Returns True when the entry was accepted (possibly replacing an
299+
older entry), False when rejected by the consensus gate, the
300+
monotonicity gate, or the stale-polled_tid gate.
281301
"""
282302
with self._lock:
283303
if self._consensus_tid is None or polled_tid is None:
284-
return
304+
return False
285305
if polled_tid < self._consensus_tid:
286-
return
306+
return False
287307
tid_int = u64(tid_bytes)
288308
existing = self._cache.get(zoid)
289309
if existing is not None and u64(existing[1]) >= tid_int:
290-
return
310+
return False
291311
if existing is not None:
292312
self._current_bytes -= len(existing[0])
293313
self._cache[zoid] = (data, tid_bytes)
@@ -296,6 +316,7 @@ def set(self, zoid, data, tid_bytes, polled_tid):
296316
while self._current_bytes > self._max_bytes and self._cache:
297317
_, (evicted_data, _) = self._cache.popitem(last=False)
298318
self._current_bytes -= len(evicted_data)
319+
return True
299320

300321
def poll_advance(self, new_tid, changed_zoids):
301322
"""Advance consensus_tid and invalidate changed zoids atomically."""
@@ -503,22 +524,11 @@ def __init__(
503524
estimated_objects = int(cache_per_connection_mb * 1_000_000 / avg_size)
504525
target = max(1, int(estimated_objects * cache_warm_pct / 100))
505526

506-
def _current_max_tid():
507-
try:
508-
with self._conn.cursor() as cur:
509-
cur.execute(
510-
"SELECT COALESCE(MAX(tid), 0) AS t FROM transaction_log"
511-
)
512-
row = cur.fetchone()
513-
return row["t"] if row else 0
514-
except Exception:
515-
return 0
516-
517527
self._warmer = CacheWarmer(
518528
self._conn,
519529
target_count=target,
520530
shared_cache=self._shared_cache,
521-
load_current_tid_fn=_current_max_tid,
531+
load_current_tid_fn=self.current_max_tid,
522532
decay=cache_warm_decay,
523533
)
524534
import threading
@@ -579,14 +589,12 @@ def _restore_state(self):
579589
if max_oid > 0:
580590
self._oid = p64(max_oid)
581591

582-
cur.execute("SELECT COALESCE(MAX(tid), 0) AS max_tid FROM transaction_log")
583-
row = cur.fetchone()
584-
max_tid = row["max_tid"]
585-
if max_tid > 0:
586-
self._ltid = p64(max_tid)
587-
# Ensure _ts is at least as recent as the last committed TID
588-
# so _new_tid() generates monotonically increasing TIDs.
589-
self._ts = TimeStamp(self._ltid)
592+
max_tid = _read_max_tid(self._conn)
593+
if max_tid > 0:
594+
self._ltid = p64(max_tid)
595+
# Ensure _ts is at least as recent as the last committed TID
596+
# so _new_tid() generates monotonically increasing TIDs.
597+
self._ts = TimeStamp(self._ltid)
590598

591599
def new_oid(self):
592600
"""Allocate a new OID via PostgreSQL sequence (cross-process safe).
@@ -1055,6 +1063,23 @@ def lastTransaction(self):
10551063
"""Return TID of the last committed transaction."""
10561064
return self._ltid
10571065

1066+
def current_max_tid(self):
1067+
"""Return the current MAX(tid) in transaction_log, or ``None``.
1068+
1069+
On any psycopg / DB error the failure is logged at WARNING and
1070+
``None`` is returned. ``None`` signals callers to skip any
1071+
work that depends on a fresh TID (e.g. the cache warmer
1072+
gracefully skips warmup rather than installing a fabricated
1073+
consensus of 0).
1074+
"""
1075+
try:
1076+
return _read_max_tid(self._conn)
1077+
except Exception:
1078+
logger.warning(
1079+
"PGJsonbStorage.current_max_tid: query failed", exc_info=True
1080+
)
1081+
return None
1082+
10581083
def __len__(self):
10591084
"""Return approximate number of objects."""
10601085
with self._conn.cursor() as cur:

tests/test_cache_warmer.py

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
"""Tests for CacheWarmer — unit + integration tests."""
22

33
from unittest import mock
4+
from zodb_pgjsonb.cache_warmer import CacheWarmer
45

56
import pytest
67

@@ -500,3 +501,107 @@ def load_multiple_fn(oids):
500501
shared = storage._shared_cache
501502
assert shared.get(zoid=1, polled_tid=100) == (b"data-" + p64(1), p64(50))
502503
assert shared.get(zoid=2, polled_tid=100) == (b"data-" + p64(2), p64(50))
504+
505+
506+
class TestWarmerRaceRecovery:
507+
"""#65 I3: warmer tolerates a consensus race and logs silent failures."""
508+
509+
def test_warm_uses_effective_consensus_when_race_advances_it(self):
510+
"""If consensus is already ahead, warmer's writes still land."""
511+
from ZODB.utils import p64
512+
from zodb_pgjsonb.storage import SharedLoadCache
513+
514+
shared = SharedLoadCache(max_mb=4)
515+
# Simulate another instance's poll_advance racing ahead
516+
shared.poll_advance(new_tid=1000, changed_zoids=[])
517+
518+
w = CacheWarmer(
519+
conn=_FakeConn(top_oids=[1, 2]),
520+
target_count=10,
521+
shared_cache=shared,
522+
load_current_tid_fn=lambda: 500, # stale — before the race
523+
)
524+
525+
def loader(oids):
526+
return {oid: (b"data-" + oid, p64(500)) for oid in oids}
527+
528+
w.warm(loader)
529+
530+
# Entries must be present: the warmer re-read consensus and
531+
# used 1000 (the actual value) as polled_tid instead of 500.
532+
assert shared.get(zoid=1, polled_tid=1000) == (b"data-" + p64(1), p64(500))
533+
assert shared.get(zoid=2, polled_tid=1000) == (b"data-" + p64(2), p64(500))
534+
535+
def test_warm_logs_warning_when_all_writes_rejected(self, caplog):
536+
"""If consensus advances past the warmer's effective polled_tid
537+
mid-loop, every set() is rejected and the warmer flags it."""
538+
from ZODB.utils import p64
539+
from zodb_pgjsonb.storage import SharedLoadCache
540+
541+
import logging
542+
543+
# A cache subclass whose set() always returns False (simulating
544+
# a mid-loop consensus advance that we can't actually time).
545+
class _RejectingCache(SharedLoadCache):
546+
def set(self, *a, **kw):
547+
super().set(*a, **kw)
548+
return False
549+
550+
shared = _RejectingCache(max_mb=4)
551+
w = CacheWarmer(
552+
conn=_FakeConn(top_oids=[1, 2, 3]),
553+
target_count=10,
554+
shared_cache=shared,
555+
load_current_tid_fn=lambda: 100,
556+
)
557+
558+
def loader(oids):
559+
return {oid: (b"x", p64(50)) for oid in oids}
560+
561+
with caplog.at_level(logging.WARNING, logger="zodb_pgjsonb.cache_warmer"):
562+
w.warm(loader)
563+
564+
warnings = [
565+
r
566+
for r in caplog.records
567+
if r.levelno == logging.WARNING and "rejected" in r.getMessage().lower()
568+
]
569+
assert warnings, (
570+
f"expected a WARNING about rejected writes, got: "
571+
f"{[r.getMessage() for r in caplog.records]}"
572+
)
573+
574+
def test_warm_info_log_on_normal_writes(self, caplog):
575+
"""Happy path logs at INFO, not WARNING."""
576+
from ZODB.utils import p64
577+
from zodb_pgjsonb.storage import SharedLoadCache
578+
579+
import logging
580+
581+
shared = SharedLoadCache(max_mb=4)
582+
w = CacheWarmer(
583+
conn=_FakeConn(top_oids=[1, 2]),
584+
target_count=10,
585+
shared_cache=shared,
586+
load_current_tid_fn=lambda: 100,
587+
)
588+
589+
def loader(oids):
590+
return {oid: (b"x", p64(50)) for oid in oids}
591+
592+
with caplog.at_level(logging.INFO, logger="zodb_pgjsonb.cache_warmer"):
593+
w.warm(loader)
594+
595+
warnings = [r for r in caplog.records if r.levelno == logging.WARNING]
596+
assert not warnings, (
597+
f"expected no warnings, got: {[r.getMessage() for r in warnings]}"
598+
)
599+
infos = [
600+
r
601+
for r in caplog.records
602+
if r.levelno == logging.INFO and "loaded 2 of 2 objects" in r.getMessage()
603+
]
604+
assert infos, (
605+
f"expected INFO with loaded count, got: "
606+
f"{[r.getMessage() for r in caplog.records]}"
607+
)

0 commit comments

Comments
 (0)