Commit e232766

Harden bounded authority crawl: atomic dequeue + per-corpus Analysis reuse
Addresses the code review in issue #2027 of the Phase-5 BFS crawl engine.

Issue #1 (atomic dequeue): AuthorityFrontierService.dequeue_queued() was a plain filter(discovery_state="queued") read; the in_progress transition only happened later in discover_and_bootstrap, leaving a window where two concurrent crawl_authorities tasks could dequeue and bootstrap the same frontier row (wasted provider calls, distorted counters). It now claims the rows it returns inside a single SELECT ... FOR UPDATE SKIP LOCKED transaction, flipping them to in_progress; a second worker skips locked rows and grabs the next ones.

Issue #2 (Analysis-per-section bloat): every section of an authority bootstraps into ONE corpus (the provider title is a constant, so all usc-* sections land in the single "United States Code" corpus), so the BFS calls apply() on that corpus once per ingested section. Each call previously minted a fresh Analysis via _get_analysis. crawl() now caches the Analysis the first apply creates per corpus and threads it back through apply(analysis=...), capping it at one provenance row per corpus.

Issue #3 (blocked_by_bound): clarified in comments that min_demand_or_depth is populated only on the frontier_drained stop (where every residual queued row is provably bound-excluded) and intentionally not on the max_authorities / token_budget early stops, whose unreached-but-eligible rows are accounted for by the frontier_residual census instead.

Minor: crawl_authorities / acrawl_authorities tool params now apply the C.CRAWL_DEFAULT_* constants uniformly instead of None sentinels for two of five.

Regression tests: dequeue atomically claims returned rows / leaves filtered-out rows queued (test_authority_frontier.py); a crawl ingesting multiple sections of one authority reuses a single provenance Analysis (test_crawl_authorities.py ApplyAnalysisReuseTests).

Closes #2027
1 parent 6df5dc0 commit e232766

6 files changed

Lines changed: 290 additions & 21 deletions

Lines changed: 41 additions & 0 deletions
@@ -0,0 +1,41 @@
+- Harden the bounded authority crawl against a concurrency race and provenance
+  bloat (issue #2027, a code review of the Phase-5 BFS engine):
+  - **Atomic dequeue claim.**
+    `AuthorityFrontierService.dequeue_queued()`
+    (`opencontractserver/enrichment/services/authority_frontier_service.py`)
+    was a plain `filter(discovery_state="queued")` read — the `in_progress`
+    transition only happened later inside `discover_and_bootstrap`, leaving a
+    TOCTOU window where two concurrent `crawl_authorities` tasks (e.g. two
+    manual triggers on the same corpus) could dequeue the SAME frontier row and
+    `discover_and_bootstrap` it twice (wasted provider calls, distorted summary
+    counters). It now claims the rows it returns inside a single
+    `SELECT … FOR UPDATE SKIP LOCKED` transaction, flipping them to
+    `in_progress`; a second worker skips locked rows and grabs the next ones.
+    Rows excluded by `max_depth` / `min_demand` are never claimed, so the
+    `frontier_drained` residual census still counts them as `queued`.
+  - **One provenance `Analysis` per authority corpus.** Every section of an
+    authority bootstraps into ONE corpus (the provider `title` is a constant —
+    all `usc-*` sections land in the single "United States Code" corpus), so the
+    BFS calls `EnrichmentService.apply()` on that corpus once per ingested
+    section. Each call previously minted a fresh `Analysis`
+    (`_get_analysis` → `Analysis.objects.create`), so a deep crawl left dozens
+    of provenance rows on one corpus. `CrawlAuthoritiesService.crawl()`
+    (`opencontractserver/enrichment/services/crawl_authorities_service.py`) now
+    caches the `Analysis` the first apply creates per corpus and threads it back
+    into the rest via `apply(analysis=…)`, capping it at one per corpus. A
+    misleading "this apply scan is bounded (one small document per section)"
+    comment was corrected.
+  - **Honest `blocked_by_bound` accounting.** Clarified (in comments) that
+    `blocked_by_bound["min_demand_or_depth"]` is populated only on the
+    `frontier_drained` stop — where every residual `queued` row is provably
+    bound-excluded — and intentionally NOT on the `max_authorities` /
+    `token_budget` early stops, whose unreached rows may be perfectly eligible
+    and are accounted for by the `frontier_residual` census instead.
+  - **Uniform tool signature.** `crawl_authorities` / `acrawl_authorities`
+    (`opencontractserver/llms/tools/core_tools/corpus_references.py`) now apply
+    `C.CRAWL_DEFAULT_*` constants to all five bound parameters instead of using
+    `None` sentinels for two of them and constants for the other three.
+  - Regression tests: `test_authority_frontier.py` (dequeue atomically claims
+    returned rows / leaves filtered-out rows `queued`) and
+    `test_crawl_authorities.py::ApplyAnalysisReuseTests` (a crawl that ingests
+    multiple sections of one authority reuses a single provenance `Analysis`).
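
Reviewer note: the `blocked_by_bound` rule described in the changelog can be illustrated with a toy loop. This is only a sketch under stated assumptions: the frontier is a plain list of dicts, `crawl_summary` and `make_frontier` are hypothetical names, the `"ingested"` state label is invented for the illustration, and the `token_budget` cap is left out. It is not the project's `CrawlAuthoritiesService.crawl()`.

```python
from collections import Counter


def make_frontier():
    # Hypothetical rows: two eligible, one below the min_demand floor.
    return [
        {"key": "usc-15:a", "mention_count": 10, "depth": 0, "state": "queued"},
        {"key": "usc-15:b", "mention_count": 5, "depth": 0, "state": "queued"},
        {"key": "usc-15:c", "mention_count": 1, "depth": 0, "state": "queued"},
    ]


def crawl_summary(frontier, max_authorities, max_depth=2, min_demand=2):
    ingested = 0
    blocked_by_bound = {}
    while True:
        # Early stop: rows still queued were simply not reached, so nothing
        # is attributed to the depth/demand bounds here.
        if ingested >= max_authorities:
            stop_reason = "max_authorities"
            break
        eligible = [
            row
            for row in frontier
            if row["state"] == "queued"
            and row["depth"] <= max_depth
            and row["mention_count"] >= min_demand
        ]
        if not eligible:
            # Drained: every remaining queued row failed the filters, so the
            # residual queued count can be attributed to the bounds.
            stop_reason = "frontier_drained"
            blocked_by_bound["min_demand_or_depth"] = sum(
                1 for row in frontier if row["state"] == "queued"
            )
            break
        best = max(eligible, key=lambda row: row["mention_count"])
        best["state"] = "ingested"
        ingested += 1
    return {
        "stop_reason": stop_reason,
        "blocked_by_bound": blocked_by_bound,
        # The residual census is computed for every stop reason.
        "frontier_residual": dict(Counter(row["state"] for row in frontier)),
    }


capped = crawl_summary(make_frontier(), max_authorities=1)
drained = crawl_summary(make_frontier(), max_authorities=10)
```

In the capped run the two unreached rows appear only in `frontier_residual`; in the drained run the single below-floor row is the only one counted under `min_demand_or_depth`.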

opencontractserver/enrichment/services/authority_frontier_service.py

Lines changed: 31 additions & 2 deletions
@@ -10,6 +10,7 @@
 from collections.abc import Mapping
 from dataclasses import dataclass
 
+from django.db import transaction
 from django.db.models import Count, Q
 from django.utils import timezone
 
@@ -143,19 +144,47 @@ def dequeue_queued(
         max_depth: int | None = None,
         min_demand: int = 0,
     ) -> list[AuthorityFrontier]:
-        """Highest-demand queued rows regardless of assigned provider.
+        """Atomically CLAIM the highest-demand queued rows for the crawl driver.
 
         Unlike ``dequeue_for_provider`` (which requires a stamped provider),
         this serves the crawl driver: it picks ``discovery_state="queued"`` rows
         ranked by ``-mention_count``, optionally bounded by depth and a minimum
         demand floor. Provider selection happens later in the discovery service.
+
+        The returned rows are transitioned to ``in_progress`` inside a single
+        ``SELECT ... FOR UPDATE SKIP LOCKED`` transaction, so the dequeue is an
+        atomic *claim* rather than a plain read: two ``crawl_authorities`` tasks
+        running concurrently (e.g. two manual triggers on the same corpus) can
+        never return — and therefore never ``discover_and_bootstrap`` — the same
+        frontier row twice (issue #2027). ``skip_locked`` lets a second worker
+        pick the next available rows instead of blocking on the first worker's
+        lock. Rows excluded by ``max_depth`` / ``min_demand`` are never claimed,
+        so the crawl's ``frontier_drained`` residual census still sees them as
+        ``queued``.
         """
         qs = AuthorityFrontier.objects.filter(discovery_state="queued")
         if max_depth is not None:
             qs = qs.filter(depth__lte=max_depth)
         if min_demand:
             qs = qs.filter(mention_count__gte=min_demand)
-        return list(qs.order_by("-mention_count")[:limit])
+        with transaction.atomic():
+            rows = list(
+                qs.select_for_update(skip_locked=True).order_by("-mention_count")[
+                    :limit
+                ]
+            )
+            if rows:
+                now = timezone.now()
+                for row in rows:
+                    row.discovery_state = "in_progress"
+                    row.last_attempt = now
+                    # bulk_update bypasses auto_now — stamp ``modified`` so the
+                    # claim matches the single-row ``mark()`` writer.
+                    row.modified = now
+                AuthorityFrontier.objects.bulk_update(
+                    rows, ["discovery_state", "last_attempt", "modified"]
+                )
+        return rows
 
     @classmethod
     def seed_child_keys(
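
Reviewer note: the property this hunk is after (select and flip to `in_progress` in one indivisible step, so no two workers ever receive the same row) can be checked in isolation with a small in-memory simulation. This is a sketch under stated assumptions, not the Django code: `FrontierRow`, `InMemoryFrontier`, and `run_workers` are hypothetical names, and a single `threading.Lock` stands in for the database transaction. A mutex is stricter than `SKIP LOCKED` (workers wait instead of skipping ahead), but the claim guarantee it demonstrates is the same.

```python
import threading
from dataclasses import dataclass


@dataclass
class FrontierRow:
    key: str
    mention_count: int
    state: str = "queued"


class InMemoryFrontier:
    """Toy stand-in for the frontier table (hypothetical, for illustration)."""

    def __init__(self, rows):
        self._rows = list(rows)
        self._lock = threading.Lock()

    def dequeue_claim(self, limit):
        # Selecting and flipping the state happen inside one critical section,
        # so a row can never be observed as "queued" after it was returned.
        with self._lock:
            queued = [r for r in self._rows if r.state == "queued"]
            queued.sort(key=lambda r: r.mention_count, reverse=True)
            claimed = queued[:limit]
            for row in claimed:
                row.state = "in_progress"
            return claimed


def run_workers(frontier, workers=4, attempts_per_worker=5):
    claimed_by_worker = [[] for _ in range(workers)]

    def work(index):
        for _ in range(attempts_per_worker):
            rows = frontier.dequeue_claim(limit=1)
            claimed_by_worker[index].extend(row.key for row in rows)

    threads = [threading.Thread(target=work, args=(i,)) for i in range(workers)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return claimed_by_worker


frontier = InMemoryFrontier(
    FrontierRow(key=f"usc-15:{n}", mention_count=n) for n in range(10)
)
claimed_keys = [key for keys in run_workers(frontier) for key in keys]
```

With four workers making twenty attempts against ten rows, every row ends up claimed exactly once. If the state flip were moved outside the critical section (the pre-fix shape, where the transition happened later in `discover_and_bootstrap`), duplicates would become possible.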

opencontractserver/enrichment/services/crawl_authorities_service.py

Lines changed: 41 additions & 7 deletions
@@ -136,8 +136,27 @@ def crawl(
         # iterations rather than constructing a fresh object on every BFS hop.
         enrichment = EnrichmentService()
 
+        # One provenance Analysis per authority corpus, reused across the run.
+        # Every section of a given authority bootstraps into ONE corpus — the
+        # provider's ``title`` is a constant, so every ``usc-*`` section lands in
+        # the single "United States Code" corpus — so a crawl that ingests N
+        # sections of an authority calls apply() on the SAME corpus N times.
+        # Letting each call mint its own Analysis (apply()'s default when
+        # ``analysis=None``) would leave N provenance rows on that one corpus;
+        # instead we capture the Analysis the first apply creates and feed it back
+        # into the rest, capping it at one per corpus (issue #2027).
+        from opencontractserver.analyzer.models import Analysis
+
+        apply_analyses: dict[int, Analysis] = {}
+
         while True:
-            # Hard cap checks before dequeue so the summary is honest.
+            # Hard cap checks before dequeue so the summary is honest. On these
+            # early stops we intentionally do NOT populate
+            # blocked_by_bound["min_demand_or_depth"]: rows still queued when a cap
+            # fires were simply not reached, and may be perfectly eligible (above
+            # min_demand, within max_depth) — attributing them to a bound would be
+            # a lie. The frontier_residual census (computed below for EVERY stop
+            # reason) accounts for them, so the summary is still non-silent.
             if ingested >= max_authorities:
                 stop_reason = "max_authorities"
                 break
@@ -150,10 +169,13 @@ def crawl(
                 limit=1, max_depth=max_depth, min_demand=min_demand
             )
             if not rows:
-                # Count how many queued rows remain so the summary is non-silent
-                # about what was left. This is the UNION of rows excluded by the
-                # min_demand floor and/or the max_depth bound — the single key
-                # does not attribute each row to one cause or the other.
+                # frontier_drained: dequeue returned nothing, so EVERY remaining
+                # queued row failed the (min_demand AND max_depth) filters. Here —
+                # and only here — is attributing the residual queued count to those
+                # bounds correct (the early max_authorities / token_budget breaks
+                # above leave their unreached-but-eligible rows to
+                # frontier_residual instead). The single key is the UNION of the
+                # two exclusions; it does not split each row by cause.
                 blocked_by_bound["min_demand_or_depth"] = (
                     AuthorityFrontier.objects.filter(discovery_state="queued").count()
                 )
@@ -206,14 +228,26 @@ def crawl(
             # Re-extract the authority's OWN outbound citations and seed the
             # frontier at depth+1 — only when we haven't reached max_depth.
             if row.depth < max_depth:
-                # Authority corpora hold one small document per statute section,
-                # so this apply scan is bounded (not a large-corpus scan).
+                # Reuse this corpus's provenance Analysis across sections (see the
+                # apply_analyses note above) so the BFS doesn't accumulate one
+                # Analysis row per section on a shared authority corpus.
+                apply_analysis = apply_analyses.get(authority_corpus_id)
                 apply_res = enrichment.apply(
                     corpus_id=authority_corpus_id,
                     creator_id=creator_id,
                     types=[C.REF_LAW],
                     extra_tiers=[C.DETECTION_TIER_GRAMMAR],
+                    analysis=apply_analysis,
                 )
+                if apply_analysis is None:
+                    # First apply on this corpus created the provenance Analysis;
+                    # cache it so the corpus's remaining sections reattach to it
+                    # instead of each minting a fresh one.
+                    new_analysis_id = apply_res.get("analysis_id")
+                    if new_analysis_id is not None:
+                        apply_analyses[authority_corpus_id] = Analysis.objects.get(
+                            pk=new_analysis_id
+                        )
 
                 outbound = list(
                     CorpusReferenceService.for_corpus(user, authority_corpus_id)
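
Reviewer note: the cache-and-thread-back pattern in this file's diff can be exercised without a database. The sketch below assumes a hypothetical `fake_apply` that, like the `apply()` behavior the comments describe, mints a provenance record only when none is supplied; `crawl_sections` and the dict-shaped records are likewise illustrative. Unlike the real change, which re-fetches the `Analysis` by primary key from `analysis_id`, the sketch hands the record back directly.

```python
from itertools import count

_next_id = count(1)
created_analyses = []  # every provenance record minted during the run


def fake_apply(corpus_id, analysis=None):
    """Hypothetical stand-in for an apply() call on one authority corpus."""
    if analysis is None:
        # No provenance record supplied: mint a fresh one.
        analysis = {"id": next(_next_id), "corpus_id": corpus_id}
        created_analyses.append(analysis)
    return {"analysis_id": analysis["id"], "analysis": analysis}


def crawl_sections(section_corpus_ids):
    cache = {}  # corpus_id -> provenance record created by the first apply
    for corpus_id in section_corpus_ids:
        cached = cache.get(corpus_id)
        result = fake_apply(corpus_id, analysis=cached)
        # Only the first apply on a corpus populates the cache; later
        # sections of the same corpus reattach to the cached record.
        if cached is None and result.get("analysis_id") is not None:
            cache[corpus_id] = result["analysis"]
    return cache


# Six ingested sections landing in two authority corpora (ids are made up).
cache = crawl_sections([7, 7, 7, 9, 7, 9])
```

Six applies across two corpora leave two provenance records rather than six, which is the cap the commit message describes.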

opencontractserver/llms/tools/core_tools/corpus_references.py

Lines changed: 10 additions & 12 deletions
@@ -247,14 +247,18 @@ def crawl_authorities(
     max_depth: int = C.CRAWL_DEFAULT_MAX_DEPTH,
     min_demand: int = C.CRAWL_DEFAULT_MIN_DEMAND,
     max_authorities: int = C.CRAWL_DEFAULT_MAX_AUTHORITIES,
-    per_jurisdiction_cap: int | None = None,
-    token_budget: int | None = None,
+    per_jurisdiction_cap: int = C.CRAWL_DEFAULT_PER_JURISDICTION_CAP,
+    token_budget: int = C.CRAWL_DEFAULT_TOKEN_BUDGET,
 ) -> dict:
     """Bounded recursive crawl: discover & ingest the authorities a corpus
     cites, then the authorities THOSE cite, up to ``max_depth`` hops. Returns a
     summary with per-state counts, per-jurisdiction tallies, the stop reason,
     and the full frontier residual census. Idempotent: already-ingested
     authorities are skipped, re-crawling creates zero duplicate documents.
+
+    All five bound parameters default to the ``C.CRAWL_DEFAULT_*`` constants —
+    a uniform signature so a caller reading it sees the same default style for
+    every bound (no None-sentinel for two of them and constants for the rest).
     """
     from opencontractserver.enrichment.services.crawl_authorities_service import (
         CrawlAuthoritiesService,
@@ -266,14 +270,8 @@ def crawl_authorities(
         max_depth=max_depth,
         min_demand=min_demand,
         max_authorities=max_authorities,
-        per_jurisdiction_cap=(
-            per_jurisdiction_cap
-            if per_jurisdiction_cap is not None
-            else C.CRAWL_DEFAULT_PER_JURISDICTION_CAP
-        ),
-        token_budget=(
-            token_budget if token_budget is not None else C.CRAWL_DEFAULT_TOKEN_BUDGET
-        ),
+        per_jurisdiction_cap=per_jurisdiction_cap,
+        token_budget=token_budget,
     )
 
 
@@ -284,8 +282,8 @@ async def acrawl_authorities(
     max_depth: int = C.CRAWL_DEFAULT_MAX_DEPTH,
     min_demand: int = C.CRAWL_DEFAULT_MIN_DEMAND,
     max_authorities: int = C.CRAWL_DEFAULT_MAX_AUTHORITIES,
-    per_jurisdiction_cap: int | None = None,
-    token_budget: int | None = None,
+    per_jurisdiction_cap: int = C.CRAWL_DEFAULT_PER_JURISDICTION_CAP,
+    token_budget: int = C.CRAWL_DEFAULT_TOKEN_BUDGET,
 ) -> dict:
     return await _db_sync_to_async(crawl_authorities)(
         creator_id=creator_id,
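
Reviewer note: the difference between the two default styles is visible through introspection. The following is a sketch with made-up constant values and hypothetical function names (`crawl_with_sentinels`, `crawl_with_constants`, `visible_defaults`); the real `C.CRAWL_DEFAULT_*` values are not shown in this diff.

```python
import inspect

# Hypothetical values, chosen only for the illustration.
DEFAULT_PER_JURISDICTION_CAP = 25
DEFAULT_TOKEN_BUDGET = 100_000


def crawl_with_sentinels(per_jurisdiction_cap=None, token_budget=None):
    # Old style: the effective default is resolved inside the body.
    cap = (
        per_jurisdiction_cap
        if per_jurisdiction_cap is not None
        else DEFAULT_PER_JURISDICTION_CAP
    )
    budget = token_budget if token_budget is not None else DEFAULT_TOKEN_BUDGET
    return {"per_jurisdiction_cap": cap, "token_budget": budget}


def crawl_with_constants(
    per_jurisdiction_cap: int = DEFAULT_PER_JURISDICTION_CAP,
    token_budget: int = DEFAULT_TOKEN_BUDGET,
):
    # New style: the signature itself carries the effective default.
    return {"per_jurisdiction_cap": per_jurisdiction_cap, "token_budget": token_budget}


def visible_defaults(func):
    """Defaults as a caller (or a tool-schema generator) would read them."""
    params = inspect.signature(func).parameters
    return {name: param.default for name, param in params.items()}
```

Both versions behave the same when the arguments are omitted, but only the constants version exposes the real defaults in its signature. One consequence that may be worth checking against existing callers: an explicit `None` was previously replaced by the default, whereas with constant defaults it is forwarded unchanged.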

opencontractserver/tests/test_authority_frontier.py

Lines changed: 53 additions & 0 deletions
@@ -509,6 +509,59 @@ def test_combined_max_depth_and_min_demand(self):
         self.assertNotIn("usc-15:7b", keys)
         self.assertNotIn("usc-15:7c", keys)
 
+    def test_dequeue_atomically_claims_rows_in_progress(self):
+        """dequeue_queued is an atomic CLAIM, not a plain read (issue #2027).
+
+        Each returned row must be flipped to ``in_progress`` — both in the
+        returned object and in the DB — so a second concurrent dequeue cannot
+        re-return it and re-run ``discover_and_bootstrap`` on the same key.
+        """
+        self._make_row("usc-15:claim-a", mention_count=10)
+        self._make_row("usc-15:claim-b", mention_count=5)
+
+        first = AuthorityFrontierService.dequeue_queued(limit=1)
+        self.assertEqual(len(first), 1)
+        self.assertEqual(first[0].canonical_key, "usc-15:claim-a")
+        # Claimed in the returned object AND persisted to the DB.
+        self.assertEqual(first[0].discovery_state, "in_progress")
+        self.assertEqual(
+            AuthorityFrontier.objects.get(
+                canonical_key="usc-15:claim-a"
+            ).discovery_state,
+            "in_progress",
+        )
+        self.assertIsNotNone(first[0].last_attempt)
+
+        # A second dequeue must skip the already-claimed row and pick the next.
+        second = AuthorityFrontierService.dequeue_queued(limit=10)
+        keys = {r.canonical_key for r in second}
+        self.assertNotIn("usc-15:claim-a", keys)
+        self.assertIn("usc-15:claim-b", keys)
+
+    def test_filtered_out_rows_are_not_claimed(self):
+        """Rows excluded by min_demand/max_depth must stay ``queued`` (unclaimed).
+
+        The crawl's frontier_drained residual census counts ``queued`` rows, so
+        the claim must touch only rows it actually returns.
+        """
+        self._make_row("usc-15:keep", mention_count=5, depth=0)
+        self._make_row("usc-15:low", mention_count=1, depth=0)  # below min_demand
+        self._make_row("usc-15:deep", mention_count=5, depth=9)  # beyond max_depth
+
+        claimed = AuthorityFrontierService.dequeue_queued(
+            limit=10, max_depth=2, min_demand=2
+        )
+        self.assertEqual({r.canonical_key for r in claimed}, {"usc-15:keep"})
+        # The excluded rows are untouched — still queued for a later, looser pass.
+        self.assertEqual(
+            AuthorityFrontier.objects.get(canonical_key="usc-15:low").discovery_state,
+            "queued",
+        )
+        self.assertEqual(
+            AuthorityFrontier.objects.get(canonical_key="usc-15:deep").discovery_state,
+            "queued",
+        )
+
 
 class SeedChildKeysTests(TestCase):
     """Tests for AuthorityFrontierService.seed_child_keys (Phase-5 idempotent seeding)."""
