Skip to content

Commit 1c43a3f

Browse files
authored
Merge pull request #2052 from Open-Source-Legal/claude/serene-pascal-4103pn
Harden bounded authority crawl: atomic dequeue + per-corpus Analysis reuse (#2027)
2 parents c7a4040 + 15ec5a9 commit 1c43a3f

6 files changed

Lines changed: 290 additions & 21 deletions

File tree

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
- Harden the bounded authority crawl against a concurrency race and provenance
2+
bloat (issue #2027, a code review of the Phase-5 BFS engine):
3+
- **Atomic dequeue claim.**
4+
`AuthorityFrontierService.dequeue_queued()`
5+
(`opencontractserver/enrichment/services/authority_frontier_service.py`)
6+
was a plain `filter(discovery_state="queued")` read — the `in_progress`
7+
transition only happened later inside `discover_and_bootstrap`, leaving a
8+
TOCTOU window where two concurrent `crawl_authorities` tasks (e.g. two
9+
manual triggers on the same corpus) could dequeue the SAME frontier row and
10+
`discover_and_bootstrap` it twice (wasted provider calls, distorted summary
11+
counters). It now claims the rows it returns inside a single
12+
`SELECT … FOR UPDATE SKIP LOCKED` transaction, flipping them to
13+
`in_progress`; a second worker skips locked rows and grabs the next ones.
14+
Rows excluded by `max_depth` / `min_demand` are never claimed, so the
15+
`frontier_drained` residual census still counts them as `queued`.
16+
- **One provenance `Analysis` per authority corpus.** Every section of an
17+
authority bootstraps into ONE corpus (the provider `title` is a constant —
18+
all `usc-*` sections land in the single "United States Code" corpus), so the
19+
BFS calls `EnrichmentService.apply()` on that corpus once per ingested
20+
section. Each call previously minted a fresh `Analysis`
21+
(`_get_analysis``Analysis.objects.create`), so a deep crawl left dozens
22+
of provenance rows on one corpus. `CrawlAuthoritiesService.crawl()`
23+
(`opencontractserver/enrichment/services/crawl_authorities_service.py`) now
24+
caches the `Analysis` the first apply creates per corpus and threads it back
25+
into the rest via `apply(analysis=…)`, capping it at one per corpus. A
26+
misleading "this apply scan is bounded (one small document per section)"
27+
comment was corrected.
28+
- **Honest `blocked_by_bound` accounting.** Clarified (in comments) that
29+
`blocked_by_bound["min_demand_or_depth"]` is populated only on the
30+
`frontier_drained` stop — where every residual `queued` row is provably
31+
bound-excluded — and intentionally NOT on the `max_authorities` /
32+
`token_budget` early stops, whose unreached rows may be perfectly eligible
33+
and are accounted for by the `frontier_residual` census instead.
34+
- **Uniform tool signature.** `crawl_authorities` / `acrawl_authorities`
35+
(`opencontractserver/llms/tools/core_tools/corpus_references.py`) now apply
36+
`C.CRAWL_DEFAULT_*` constants to all five bound parameters instead of using
37+
`None` sentinels for two of them and constants for the other three.
38+
- Regression tests: `test_authority_frontier.py` (dequeue atomically claims
39+
returned rows / leaves filtered-out rows `queued`) and
40+
`test_crawl_authorities.py::ApplyAnalysisReuseTests` (a crawl that ingests
41+
multiple sections of one authority reuses a single provenance `Analysis`).

opencontractserver/enrichment/services/authority_frontier_service.py

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -161,19 +161,47 @@ def dequeue_queued(
161161
max_depth: int | None = None,
162162
min_demand: int = 0,
163163
) -> list[AuthorityFrontier]:
164-
"""Highest-demand queued rows regardless of assigned provider.
164+
"""Atomically CLAIM the highest-demand queued rows for the crawl driver.
165165
166166
Unlike ``dequeue_for_provider`` (which requires a stamped provider),
167167
this serves the crawl driver: it picks ``discovery_state="queued"`` rows
168168
ranked by ``-mention_count``, optionally bounded by depth and a minimum
169169
demand floor. Provider selection happens later in the discovery service.
170+
171+
The returned rows are transitioned to ``in_progress`` inside a single
172+
``SELECT ... FOR UPDATE SKIP LOCKED`` transaction, so the dequeue is an
173+
atomic *claim* rather than a plain read: two ``crawl_authorities`` tasks
174+
running concurrently (e.g. two manual triggers on the same corpus) can
175+
never return — and therefore never ``discover_and_bootstrap`` — the same
176+
frontier row twice (issue #2027). ``skip_locked`` lets a second worker
177+
pick the next available rows instead of blocking on the first worker's
178+
lock. Rows excluded by ``max_depth`` / ``min_demand`` are never claimed,
179+
so the crawl's ``frontier_drained`` residual census still sees them as
180+
``queued``.
170181
"""
171182
qs = AuthorityFrontier.objects.filter(discovery_state=C.DISCOVERY_STATE_QUEUED)
172183
if max_depth is not None:
173184
qs = qs.filter(depth__lte=max_depth)
174185
if min_demand:
175186
qs = qs.filter(mention_count__gte=min_demand)
176-
return list(qs.order_by("-mention_count")[:limit])
187+
with transaction.atomic():
188+
rows = list(
189+
qs.select_for_update(skip_locked=True).order_by("-mention_count")[
190+
:limit
191+
]
192+
)
193+
if rows:
194+
now = timezone.now()
195+
for row in rows:
196+
row.discovery_state = "in_progress"
197+
row.last_attempt = now
198+
# bulk_update bypasses auto_now — stamp ``modified`` so the
199+
# claim matches the single-row ``mark()`` writer.
200+
row.modified = now
201+
AuthorityFrontier.objects.bulk_update(
202+
rows, ["discovery_state", "last_attempt", "modified"]
203+
)
204+
return rows
177205

178206
@classmethod
179207
def seed_child_keys(

opencontractserver/enrichment/services/crawl_authorities_service.py

Lines changed: 41 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -136,8 +136,27 @@ def crawl(
136136
# iterations rather than constructing a fresh object on every BFS hop.
137137
enrichment = EnrichmentService()
138138

139+
# One provenance Analysis per authority corpus, reused across the run.
140+
# Every section of a given authority bootstraps into ONE corpus — the
141+
# provider's ``title`` is a constant, so every ``usc-*`` section lands in
142+
# the single "United States Code" corpus — so a crawl that ingests N
143+
# sections of an authority calls apply() on the SAME corpus N times.
144+
# Letting each call mint its own Analysis (apply()'s default when
145+
# ``analysis=None``) would leave N provenance rows on that one corpus;
146+
# instead we capture the Analysis the first apply creates and feed it back
147+
# into the rest, capping it at one per corpus (issue #2027).
148+
from opencontractserver.analyzer.models import Analysis
149+
150+
apply_analyses: dict[int, Analysis] = {}
151+
139152
while True:
140-
# Hard cap checks before dequeue so the summary is honest.
153+
# Hard cap checks before dequeue so the summary is honest. On these
154+
# early stops we intentionally do NOT populate
155+
# blocked_by_bound["min_demand_or_depth"]: rows still queued when a cap
156+
# fires were simply not reached, and may be perfectly eligible (above
157+
# min_demand, within max_depth) — attributing them to a bound would be
158+
# a lie. The frontier_residual census (computed below for EVERY stop
159+
# reason) accounts for them, so the summary is still non-silent.
141160
if ingested >= max_authorities:
142161
stop_reason = "max_authorities"
143162
break
@@ -150,10 +169,13 @@ def crawl(
150169
limit=1, max_depth=max_depth, min_demand=min_demand
151170
)
152171
if not rows:
153-
# Count how many queued rows remain so the summary is non-silent
154-
# about what was left. This is the UNION of rows excluded by the
155-
# min_demand floor and/or the max_depth bound — the single key
156-
# does not attribute each row to one cause or the other.
172+
# frontier_drained: dequeue returned nothing, so EVERY remaining
173+
# queued row failed the (min_demand AND max_depth) filters. Here —
174+
# and only here — is attributing the residual queued count to those
175+
# bounds correct (the early max_authorities / token_budget breaks
176+
# above leave their unreached-but-eligible rows to
177+
# frontier_residual instead). The single key is the UNION of the
178+
# two exclusions; it does not split each row by cause.
157179
blocked_by_bound["min_demand_or_depth"] = (
158180
AuthorityFrontier.objects.filter(
159181
discovery_state=C.DISCOVERY_STATE_QUEUED
@@ -208,14 +230,26 @@ def crawl(
208230
# Re-extract the authority's OWN outbound citations and seed the
209231
# frontier at depth+1 — only when we haven't reached max_depth.
210232
if row.depth < max_depth:
211-
# Authority corpora hold one small document per statute section,
212-
# so this apply scan is bounded (not a large-corpus scan).
233+
# Reuse this corpus's provenance Analysis across sections (see the
234+
# apply_analyses note above) so the BFS doesn't accumulate one
235+
# Analysis row per section on a shared authority corpus.
236+
apply_analysis = apply_analyses.get(authority_corpus_id)
213237
apply_res = enrichment.apply(
214238
corpus_id=authority_corpus_id,
215239
creator_id=creator_id,
216240
types=[C.REF_LAW],
217241
extra_tiers=[C.DETECTION_TIER_GRAMMAR],
242+
analysis=apply_analysis,
218243
)
244+
if apply_analysis is None:
245+
# First apply on this corpus created the provenance Analysis;
246+
# cache it so the corpus's remaining sections reattach to it
247+
# instead of each minting a fresh one.
248+
new_analysis_id = apply_res.get("analysis_id")
249+
if new_analysis_id is not None:
250+
apply_analyses[authority_corpus_id] = Analysis.objects.get(
251+
pk=new_analysis_id
252+
)
219253

220254
outbound = list(
221255
CorpusReferenceService.for_corpus(user, authority_corpus_id)

opencontractserver/llms/tools/core_tools/corpus_references.py

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -247,14 +247,18 @@ def crawl_authorities(
247247
max_depth: int = C.CRAWL_DEFAULT_MAX_DEPTH,
248248
min_demand: int = C.CRAWL_DEFAULT_MIN_DEMAND,
249249
max_authorities: int = C.CRAWL_DEFAULT_MAX_AUTHORITIES,
250-
per_jurisdiction_cap: int | None = None,
251-
token_budget: int | None = None,
250+
per_jurisdiction_cap: int = C.CRAWL_DEFAULT_PER_JURISDICTION_CAP,
251+
token_budget: int = C.CRAWL_DEFAULT_TOKEN_BUDGET,
252252
) -> dict:
253253
"""Bounded recursive crawl: discover & ingest the authorities a corpus
254254
cites, then the authorities THOSE cite, up to ``max_depth`` hops. Returns a
255255
summary with per-state counts, per-jurisdiction tallies, the stop reason,
256256
and the full frontier residual census. Idempotent: already-ingested
257257
authorities are skipped, re-crawling creates zero duplicate documents.
258+
259+
All five bound parameters default to the ``C.CRAWL_DEFAULT_*`` constants —
260+
a uniform signature so a caller reading it sees the same default style for
261+
every bound (no None-sentinel for two of them and constants for the rest).
258262
"""
259263
from opencontractserver.enrichment.services.crawl_authorities_service import (
260264
CrawlAuthoritiesService,
@@ -266,14 +270,8 @@ def crawl_authorities(
266270
max_depth=max_depth,
267271
min_demand=min_demand,
268272
max_authorities=max_authorities,
269-
per_jurisdiction_cap=(
270-
per_jurisdiction_cap
271-
if per_jurisdiction_cap is not None
272-
else C.CRAWL_DEFAULT_PER_JURISDICTION_CAP
273-
),
274-
token_budget=(
275-
token_budget if token_budget is not None else C.CRAWL_DEFAULT_TOKEN_BUDGET
276-
),
273+
per_jurisdiction_cap=per_jurisdiction_cap,
274+
token_budget=token_budget,
277275
)
278276

279277

@@ -284,8 +282,8 @@ async def acrawl_authorities(
284282
max_depth: int = C.CRAWL_DEFAULT_MAX_DEPTH,
285283
min_demand: int = C.CRAWL_DEFAULT_MIN_DEMAND,
286284
max_authorities: int = C.CRAWL_DEFAULT_MAX_AUTHORITIES,
287-
per_jurisdiction_cap: int | None = None,
288-
token_budget: int | None = None,
285+
per_jurisdiction_cap: int = C.CRAWL_DEFAULT_PER_JURISDICTION_CAP,
286+
token_budget: int = C.CRAWL_DEFAULT_TOKEN_BUDGET,
289287
) -> dict:
290288
return await _db_sync_to_async(crawl_authorities)(
291289
creator_id=creator_id,

opencontractserver/tests/test_authority_frontier.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -603,6 +603,59 @@ def test_combined_max_depth_and_min_demand(self):
603603
self.assertNotIn("usc-15:7b", keys)
604604
self.assertNotIn("usc-15:7c", keys)
605605

606+
def test_dequeue_atomically_claims_rows_in_progress(self):
607+
"""dequeue_queued is an atomic CLAIM, not a plain read (issue #2027).
608+
609+
Each returned row must be flipped to ``in_progress`` — both in the
610+
returned object and in the DB — so a second concurrent dequeue cannot
611+
re-return it and re-run ``discover_and_bootstrap`` on the same key.
612+
"""
613+
self._make_row("usc-15:claim-a", mention_count=10)
614+
self._make_row("usc-15:claim-b", mention_count=5)
615+
616+
first = AuthorityFrontierService.dequeue_queued(limit=1)
617+
self.assertEqual(len(first), 1)
618+
self.assertEqual(first[0].canonical_key, "usc-15:claim-a")
619+
# Claimed in the returned object AND persisted to the DB.
620+
self.assertEqual(first[0].discovery_state, "in_progress")
621+
self.assertEqual(
622+
AuthorityFrontier.objects.get(
623+
canonical_key="usc-15:claim-a"
624+
).discovery_state,
625+
"in_progress",
626+
)
627+
self.assertIsNotNone(first[0].last_attempt)
628+
629+
# A second dequeue must skip the already-claimed row and pick the next.
630+
second = AuthorityFrontierService.dequeue_queued(limit=10)
631+
keys = {r.canonical_key for r in second}
632+
self.assertNotIn("usc-15:claim-a", keys)
633+
self.assertIn("usc-15:claim-b", keys)
634+
635+
def test_filtered_out_rows_are_not_claimed(self):
636+
"""Rows excluded by min_demand/max_depth must stay ``queued`` (unclaimed).
637+
638+
The crawl's frontier_drained residual census counts ``queued`` rows, so
639+
the claim must touch only rows it actually returns.
640+
"""
641+
self._make_row("usc-15:keep", mention_count=5, depth=0)
642+
self._make_row("usc-15:low", mention_count=1, depth=0) # below min_demand
643+
self._make_row("usc-15:deep", mention_count=5, depth=9) # beyond max_depth
644+
645+
claimed = AuthorityFrontierService.dequeue_queued(
646+
limit=10, max_depth=2, min_demand=2
647+
)
648+
self.assertEqual({r.canonical_key for r in claimed}, {"usc-15:keep"})
649+
# The excluded rows are untouched — still queued for a later, looser pass.
650+
self.assertEqual(
651+
AuthorityFrontier.objects.get(canonical_key="usc-15:low").discovery_state,
652+
"queued",
653+
)
654+
self.assertEqual(
655+
AuthorityFrontier.objects.get(canonical_key="usc-15:deep").discovery_state,
656+
"queued",
657+
)
658+
606659

607660
class SeedChildKeysTests(TestCase):
608661
"""Tests for AuthorityFrontierService.seed_child_keys (Phase-5 idempotent seeding)."""

0 commit comments

Comments
 (0)