
Commit f37338b

lilyplasticlabslowyellingclaude authored
fix(dreamer): threshold and time-guard semantics (plastic-labs#573)
* fix(dreamer): threshold and time-guard semantics

  Finding 2: filter count_stmt on documents.level == 'explicit' in
  check_and_schedule_dream. Dreamer-created levels (deductive, inductive,
  contradiction) are consolidation output, not input, and would otherwise
  inflate the threshold count and create a feedback loop.

  Finding 3 (code-level): relocate the last_dream_at write from enqueue_dream
  (enqueue.py) to process_dream (orchestrator.py), inside the
  'if result is not None' block. Duplicate enqueues can no longer reset the
  8-hour time-guard clock, and failed/never-run dreams don't advance it.

  Success criteria: lenient (any non-null DreamResult counts). Pending Vineeth
  confirmation — will adjust to strict/middle if requested. Tests pending in
  follow-up commits.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* test(dreamer): threshold filter + last_dream_at relocation regression tests

  Tests for Finding 2 and Finding 3 (code-level):

  - TestThresholdFilter (tests/dreamer/test_dream_scheduler.py):
    * Mixed levels below explicit threshold: 30 explicit + 40 deductive +
      10 inductive → no trigger (core regression; the buggy count would trigger)
    * Explicit-only at threshold: 60 explicit → triggers
    * Contradiction excluded: 100 contradiction + 10 explicit → no trigger
      (confirms the positive == "explicit" filter excludes all dreamer output)
  - TestEnqueueDreamMetadataShape (tests/deriver/test_enqueue_dream.py):
    * AsyncMock-patched update_collection_internal_metadata verifies enqueue
      writes last_dream_document_count but NOT last_dream_at
  - TestLastDreamAtCompletionWrite (tests/dreamer/test_dreamer_integration.py):
    * Happy path: run_dream returns DreamResult → last_dream_at written
    * Failure path: run_dream returns None → last_dream_at absent
    * Exception path: run_dream raises → last_dream_at absent; process_dream
      swallows the exception (queue-processed semantics preserved)

  Docstring on check_and_schedule_dream tightened: "document threshold" ->
  "explicit-observation threshold" to reflect filter semantics.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(dreamer): preserve last_dream_document_count in completion write

  CodeRabbit caught this: update_collection_internal_metadata uses a top-level
  JSONB `||` merge, so passing {"dream": {"last_dream_at": ...}} replaces the
  entire "dream" subkey and drops the last_dream_document_count that was
  written by enqueue_dream.

  Symptom: after every completed dream, the baseline drops to 0. The next
  check_and_schedule_dream reads documents_since_last_dream as
  current_count - 0 = current_count, so any collection with >= 50 explicit
  observations can re-trigger immediately once the 8h guard expires, even
  with no new raw material.

  Fix: read-modify-write. Fetch the current collection, merge last_dream_at
  into the existing "dream" dict, and write the merged dict back. Preserves
  sibling keys (currently last_dream_document_count; future-proof for
  telemetry fields that might land in PR 4).

  Regression test added to tests/dreamer/test_dreamer_integration.py:
  pre-seeds {"dream": {"last_dream_document_count": 42}}, runs process_dream,
  and asserts both that last_dream_at is written AND that
  last_dream_document_count == 42 is preserved.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(dreamer): address CodeRabbit feedback on b89997c

  - enqueue.py: read-modify-write preserves last_dream_at when writing the baseline
  - dream_scheduler.py: explicit-level filter on the execute_dream count query
  - test fixture: pin DOCUMENT_THRESHOLD and ENABLED_TYPES for stability
  - integration test: timezone-aware assertion on last_dream_at

  Regression test added for the enqueue sibling-drop (symmetric to c8fe40a).

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(dreamer): session lookup symmetry + row lock on dream metadata RMW

  - dream_scheduler.py: explicit-level filter on the execute_dream session
    lookup (baseline and session pick must agree on the same document set)
  - crud.collection.get_collection: optional with_for_update flag for callers
    that need a serialized read-modify-write on internal_metadata
  - enqueue.py + orchestrator.py: pass with_for_update=True on the RMW reads
    to close the TOCTOU between concurrent enqueue and completion writes

  Follow-up filed for jsonb_set-based nested updates (docs/factory/backlog/).

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(dreamer): explicit-only count on manual schedule_dream route

  The third caller of enqueue_dream — POST /workspaces/{id}/schedule_dream —
  was passing an all-levels document count as the baseline, breaking symmetry
  with check_and_schedule_dream and execute_dream after Loop 2's filter fixes.
  Filter the manual route's count to match.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* docs(dreamer): document explicit-only invariant on enqueue_dream.document_count

  Loop 3 follow-up on d76627a. The parameter's semantics tightened across
  Loop 2 (check_and_schedule_dream, execute_dream) and Loop 3 (schedule_dream
  route) to "explicit-level count, used as the baseline," but the signature
  still read "Current document count for metadata update." The next caller
  would have no way to know this from the function contract.

  The docstring now spells out: (1) the value is explicit-only; (2) it's
  written as last_dream_document_count; (3) it's the baseline that
  check_and_schedule_dream subtracts from to compute
  documents_since_last_dream; (4) passing a count that includes non-explicit
  levels (deductive, inductive, contradiction) inflates the baseline and
  suppresses the next scheduled dream.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* refactor(dreamer): rename current_document_count → current_explicit_count

  Loop 3 follow-up on d4e10e3. After Loop 2's filter landed, the local in
  check_and_schedule_dream held an explicit-only count but was still named
  current_document_count — asymmetric with execute_dream's
  current_explicit_count (line 201) and contradicting the filter on line 269
  that produces the value.

  Pure rename: three occurrences (definition at 271, subtraction at 274, log
  extra key at 282). No test references. Naming-as-invariant alignment with
  d76627a (query filters), d4e10e3 (parameter docstring), and Loop 1's local
  rename in execute_dream. The persisted JSONB key last_dream_document_count
  is the one remaining drift layer; filed as a plastic-claudebook backlog item
  for a separate PR with an intentional migration path.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(dreamer): atomic guard-pair write + in-flight stampede defense

  Loop 4 response to Vineeth's CHANGES_REQUESTED on PR plastic-labs#573.

  The pre-Loop-4 enqueue-time write of last_dream_document_count was serving
  double duty: rate limiter AND stampede latch. By arming the 8h guard the
  moment a dream entered the pipeline, it implicitly blocked a second dream
  from being scheduled during the in-flight window. Loop 3 relocated the
  last_dream_at write to completion without moving its sibling baseline,
  splitting the semantic pair and exposing the latch role that had lived only
  in Vineeth's head.

  Invariant (now pinned to check_and_schedule_dream's docstring): from the
  moment a dream is scheduled until it completes or fails, no second dream
  may be enqueued for the same (workspace, observer, observed) — and the
  baseline count advances only when consolidation actually happened.

  Changes:
  - enqueue_dream: remove the last_dream_document_count write entirely and
    drop the document_count parameter. enqueue no longer touches dream
    metadata; the implicit stampede latch is replaced by an explicit
    queue-backed defense.
  - process_dream: extend the existing row-locked RMW to write both guard
    fields atomically. The current explicit-doc count is recomputed inside
    the locked block (not carried on DreamPayload) so the pair reflects the
    actual consolidation moment.
  - check_and_schedule_dream: query QueueItem for pending dreams on this
    collection's work_unit_keys (mirrors uq_queue_dream_pending_work_unit_key)
    before arming a timer. Uses queue state as the source of truth rather
    than reflecting it into metadata.
  - Tests: two new coherence tests under TestGuardPairCoherence —
    test_pending_queue_item_blocks_second_schedule walks the stampede
    timeline; test_silent_failure_allows_retry_on_same_corpus verifies that
    failed dreams don't consume the baseline. Existing tests updated to the
    new contract.

* chore(dreamer): trim comment slop from loop-4 atomic pair work

  Compress three verbose comments added in d24958d — the invariant itself is
  captured in check_and_schedule_dream's docstring, so the inline narrative
  restates what the code already says.

  - dream_scheduler.py defense C block: 5 lines → 2
  - orchestrator.py atomic pair write: 4 lines → 1
  - enqueue.py docstring paragraph: 5 lines → 2

  Net: +5/-14. Follows Eri's eef27be precedent on sillytavern-honcho PR #7.

---------

Co-authored-by: lilyplasticlabs <lily@plasticlabs.ai>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent a05c2f8 commit f37338b

9 files changed

Lines changed: 956 additions & 189 deletions


src/crud/collection.py

Lines changed: 22 additions & 0 deletions
@@ -81,6 +81,7 @@ async def get_collection(
     *,
     observer: str,
     observed: str,
+    with_for_update: bool = False,
 ) -> models.Collection:
     """
     Get a collection by observer/observed for a workspace.
@@ -90,13 +91,34 @@ async def get_collection(
         workspace_name: Name of the workspace
         observer: Name of the observing peer (owns the collection)
         observed: Name of the observed peer
+        with_for_update: If True, acquire a row-level lock (SELECT ... FOR UPDATE)
+            on the collection. Bypasses the cache so the lock is actually held
+            by the current transaction. Callers using this flag must wrap the
+            read and subsequent write in the same transaction (the lock is
+            released on commit/rollback).
 
     Returns:
         The collection if found
 
     Raises:
         ResourceNotFoundException: If the collection does not exist
     """
+    if with_for_update:
+        # Row-lock path: go direct to DB (skip cache) so the FOR UPDATE lock
+        # is actually acquired on the row in the current transaction. The
+        # cached dict path would return without issuing SELECT ... FOR UPDATE.
+        stmt = (
+            select(models.Collection)
+            .where(models.Collection.workspace_name == workspace_name)
+            .where(models.Collection.observer == observer)
+            .where(models.Collection.observed == observed)
+            .with_for_update()
+        )
+        collection = await db.scalar(stmt)
+        if collection is None:
+            raise ResourceNotFoundException("Collection not found")
+        return collection
+
     data = await _fetch_collection(db, workspace_name, observer, observed)
     if data is None:
         raise ResourceNotFoundException("Collection not found")
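The row-locked read-modify-write this flag enables can be shown with a runnable analogy. Sketch only: SQLite has no row-level `SELECT ... FOR UPDATE`, so `BEGIN IMMEDIATE` (a database-wide write lock) stands in for the Postgres row lock; the point is that the lock is taken before the read and held until commit.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:", isolation_level=None)  # manual transactions
conn.execute("CREATE TABLE collections (name TEXT PRIMARY KEY, meta TEXT)")
conn.execute(
    "INSERT INTO collections VALUES (?, ?)",
    ("ws", json.dumps({"dream": {"last_dream_document_count": 42}})),
)

# Lock BEFORE the read: a concurrent writer blocks here instead of
# interleaving between our read and our write (the TOCTOU the flag closes).
conn.execute("BEGIN IMMEDIATE")
(raw,) = conn.execute(
    "SELECT meta FROM collections WHERE name = ?", ("ws",)
).fetchone()
meta = json.loads(raw)
meta.setdefault("dream", {})["last_dream_at"] = "2025-01-01T00:00:00+00:00"
conn.execute(
    "UPDATE collections SET meta = ? WHERE name = ?", (json.dumps(meta), "ws")
)
conn.execute("COMMIT")  # lock released on commit, as with FOR UPDATE

(raw,) = conn.execute(
    "SELECT meta FROM collections WHERE name = ?", ("ws",)
).fetchone()
final = json.loads(raw)
assert final["dream"] == {
    "last_dream_document_count": 42,
    "last_dream_at": "2025-01-01T00:00:00+00:00",
}
```

Note why the diff bypasses the cache: a cached dict never issues the SELECT, so no lock would be acquired and the RMW would not be serialized.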

src/deriver/enqueue.py

Lines changed: 5 additions & 29 deletions
@@ -1,5 +1,4 @@
 import logging
-from datetime import datetime, timezone
 from typing import Any, Literal
 
 from sqlalchemy import exists, insert, select
@@ -436,27 +435,26 @@ async def enqueue_dream(
     observer: str,
     observed: str,
     dream_type: schemas.DreamType,
-    document_count: int,
     session_name: str | None = None,
 ) -> None:
     """
     Enqueue a dream task for immediate processing by the deriver.
 
+    Does not touch collection.internal_metadata["dream"] — both guard fields
+    are written atomically in process_dream on successful completion.
+
     Deduplication: If a dream with the same work_unit_key is already in-progress
-    (has an ActiveQueueSession), the enqueue is skipped to prevent running
-    multiple dreams concurrently for the same collection.
+    (has an ActiveQueueSession) or pending in the queue, the enqueue is skipped.
 
     Args:
         workspace_name: Name of the workspace
         observer: Name of the observer peer
         observed: Name of the observed peer
         dream_type: Type of dream to execute
-        document_count: Current document count for metadata update
         session_name: Name of the session to scope the dream to if specified
     """
     async with tracked_db("dream_enqueue") as db_session:
         try:
-            # Create the dream queue record
             dream_record = create_dream_record(
                 workspace_name,
                 observer=observer,
@@ -467,11 +465,6 @@ async def enqueue_dream(
 
             work_unit_key = dream_record["work_unit_key"]
 
-            # Check if a dream with this work_unit_key is currently in progress
-            # (has an ActiveQueueSession, meaning a worker is processing it)
-            # We only block on in-progress dreams, not pending ones - if there's
-            # a pending dream, we don't need to add another one anyway since
-            # the queue processor will pick it up.
             in_progress_check = select(
                 exists(
                     select(models.ActiveQueueSession.id).where(
@@ -491,7 +484,6 @@ async def enqueue_dream(
                 )
                 return
 
-            # Check if there's already a pending dream with the same work_unit_key
             pending_check = select(
                 exists(
                     select(QueueItem.id).where(
@@ -512,25 +504,9 @@ async def enqueue_dream(
                 )
                 return
 
-            # Insert into queue
             stmt = insert(QueueItem).returning(QueueItem)
             await db_session.execute(stmt, [dream_record])
-
-            # Update collection metadata (CRUD handles cache invalidation)
-            now_iso = datetime.now(timezone.utc).isoformat()
-            await crud.update_collection_internal_metadata(
-                db_session,
-                workspace_name,
-                observer,
-                observed,
-                update_data={
-                    "dream": {
-                        "last_dream_document_count": document_count,
-                        "last_dream_at": now_iso,
-                    }
-                },
-            )
-            # update_collection_internal_metadata commits already
+            await db_session.commit()
 
             logger.info(
                 "Enqueued dream task for %s/%s/%s (type: %s)",

src/dreamer/dream_scheduler.py

Lines changed: 51 additions & 26 deletions
@@ -4,7 +4,7 @@
 from logging import getLogger
 
 import sentry_sdk
-from sqlalchemy import func, select
+from sqlalchemy import exists, func, select
 from sqlalchemy.ext.asyncio import AsyncSession
 
 from src import models
@@ -160,20 +160,19 @@ async def execute_dream(
     observer: str,
     observed: str,
 ) -> None:
-    """Execute the dream by enqueueing it and updating collection metadata."""
-    # Import here to avoid circular dependency
+    """Execute the dream by enqueueing it."""
     from src import crud
     from src.deriver.enqueue import enqueue_dream
     from src.utils.config_helpers import get_configuration
 
-    # Find the most recent session and get current document count
     async with tracked_db("dream_session_lookup") as db:
         stmt = (
             select(models.Document.session_name)
             .where(
                 models.Document.workspace_name == workspace_name,
                 models.Document.observer == observer,
                 models.Document.observed == observed,
+                models.Document.level == "explicit",
             )
             .order_by(models.Document.created_at.desc())
             .limit(1)
@@ -186,14 +185,6 @@ async def execute_dream(
             )
             return
 
-        # Get current document count at execution time (not stale from scheduling)
-        count_stmt = select(func.count(models.Document.id)).where(
-            models.Document.workspace_name == workspace_name,
-            models.Document.observer == observer,
-            models.Document.observed == observed,
-        )
-        current_document_count = int(await db.scalar(count_stmt) or 0)
-
         session = await crud.get_session(
             db, workspace_name=workspace_name, session_name=session_name
         )
@@ -212,7 +203,6 @@ async def execute_dream(
         observer=observer,
         observed=observed,
         dream_type=dream_type,
-        document_count=current_document_count,
         session_name=session_name,
     )
@@ -231,13 +221,18 @@ async def check_and_schedule_dream(
     collection: models.Collection,
 ) -> bool:
     """
-    Check if a collection has reached the document threshold and schedule a timer-based dream.
+    From the moment a dream is scheduled until it completes or fails, no second
+    dream may be enqueued for the same (workspace, observer, observed) — and the
+    baseline count advances only when consolidation actually happened.
+
+    Check if a collection has reached the explicit-observation threshold and schedule a timer-based dream.
 
     This function only schedules a timer-based dream if:
     1. Dreams are enabled
-    2. Document threshold is reached
+    2. Explicit-observation threshold is reached (dreamer output does not count)
     3. Minimum hours between dreams have passed
-    4. No dream is already scheduled for this collection
+    4. No dream is already pending in the queue for this collection (in-flight check)
+    5. No dream is already scheduled for this collection
 
     Args:
         db: Database session
@@ -249,38 +244,36 @@ async def check_and_schedule_dream(
     if not settings.DREAM.ENABLED:
         return False
 
-    # Get dream metadata from internal_metadata
     dream_metadata = collection.internal_metadata.get("dream", {})
     last_dream_document_count = dream_metadata.get("last_dream_document_count", 0)
     last_dream_at = dream_metadata.get("last_dream_at")
 
-    # Count current documents in the collection
+    # Count explicit-level docs only: dreamer output (deductive/inductive/
+    # contradiction) would inflate the threshold and create a feedback loop.
     count_stmt = select(func.count(models.Document.id)).where(
         models.Document.workspace_name == collection.workspace_name,
         models.Document.observer == collection.observer,
         models.Document.observed == collection.observed,
+        models.Document.level == "explicit",
     )
-    current_document_count = int(await db.scalar(count_stmt) or 0)
+    current_explicit_count = int(await db.scalar(count_stmt) or 0)
 
-    # Calculate documents added since last dream
-    documents_since_last_dream = current_document_count - last_dream_document_count
+    documents_since_last_dream = current_explicit_count - last_dream_document_count
 
     logger.debug(
         "Dream check",
         extra={
             "workspace_name": collection.workspace_name,
             "observer": collection.observer,
            "observed": collection.observed,
-            "current_document_count": current_document_count,
+            "current_explicit_count": current_explicit_count,
             "last_dream_document_count": last_dream_document_count,
            "documents_since_last_dream": documents_since_last_dream,
             "document_threshold": settings.DREAM.DOCUMENT_THRESHOLD,
         },
     )
 
-    # Only schedule timer if document threshold is reached
     if documents_since_last_dream >= settings.DREAM.DOCUMENT_THRESHOLD:
-        # Check if we're within minimum hours between dreams
        if last_dream_at:
             try:
                 last_dream_time = datetime.fromisoformat(last_dream_at)
@@ -299,11 +292,43 @@ async def check_and_schedule_dream(
                     f"Invalid last_dream_at timestamp: {last_dream_at}, error: {e}"
                 )
 
+        # Queue is source of truth for in-flight dreams; mirrors
+        # uq_queue_dream_pending_work_unit_key.
+        enabled_dream_types = settings.DREAM.ENABLED_TYPES
+        pending_keys = [
+            construct_work_unit_key(
+                collection.workspace_name,
+                {
+                    "task_type": "dream",
+                    "observer": collection.observer,
+                    "observed": collection.observed,
+                    "dream_type": dream_type,
+                },
+            )
+            for dream_type in enabled_dream_types
+        ]
+        pending_exists = await db.scalar(
+            select(
+                exists(
+                    select(models.QueueItem.id).where(
+                        models.QueueItem.task_type == "dream",
+                        models.QueueItem.processed == False,  # noqa: E712
+                        models.QueueItem.work_unit_key.in_(pending_keys),
+                    )
+                )
+            )
+        )
+        if pending_exists:
+            logger.info(
+                "Skipping dream schedule for %s/%s: pending dream already in queue",
                collection.observer,
+                collection.observed,
+            )
+            return False
+
         dream_scheduler = get_dream_scheduler()
         if dream_scheduler:
-            enabled_dream_types = settings.DREAM.ENABLED_TYPES
             for dream_type in enabled_dream_types:
-                # Include dream_type in key so each dream type can be tracked independently
                 dream_work_unit_key = construct_work_unit_key(
                     collection.workspace_name,
                     {
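The pending check builds one work-unit key per enabled dream type, then covers them all with a single `work_unit_key IN (...)` query. A plain-Python sketch of that fan-out (the key format here is hypothetical; the real construct_work_unit_key lives in the repo and may serialize differently):

```python
def construct_work_unit_key(workspace: str, payload: dict) -> str:
    # Hypothetical serialization: one key per
    # (workspace, task_type, observer, observed, dream_type).
    return ":".join(
        [workspace, payload["task_type"], payload["observer"],
         payload["observed"], payload["dream_type"]]
    )

enabled_dream_types = ["consolidation", "contradiction"]  # illustrative names
pending_keys = [
    construct_work_unit_key(
        "ws1",
        {"task_type": "dream", "observer": "alice",
         "observed": "bob", "dream_type": dream_type},
    )
    for dream_type in enabled_dream_types
]

# One membership test covers every enabled dream type, mirroring the single
# work_unit_key.in_(pending_keys) queue query in check_and_schedule_dream.
queue = {"ws1:dream:alice:bob:consolidation"}  # one dream still pending
pending_exists = any(key in queue for key in pending_keys)
assert pending_exists  # scheduling is skipped while any dream is in flight
```

Using queue state directly, rather than mirroring it into collection metadata, means there is no second copy to drift out of sync.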

src/dreamer/orchestrator.py

Lines changed: 31 additions & 1 deletion
@@ -17,11 +17,13 @@
 import time
 import uuid
 from dataclasses import dataclass
+from datetime import datetime, timezone
 from typing import Any
 
 import sentry_sdk
+from sqlalchemy import func, select
 
-from src import crud
+from src import crud, models
 from src.config import settings
 from src.dependencies import tracked_db
 from src.dreamer.specialists import SPECIALISTS, SpecialistResult
@@ -323,6 +325,34 @@ async def process_dream(
                 + f"duration={result.total_duration_ms:.0f}ms"
             )
 
+            # Both guard fields advance together only on successful consolidation.
+            now_iso = datetime.now(timezone.utc).isoformat()
+            async with tracked_db("dream.guard_pair_write") as db:
+                collection = await crud.get_collection(
+                    db,
+                    workspace_name,
+                    observer=payload.observer,
+                    observed=payload.observed,
+                    with_for_update=True,
+                )
+                count_stmt = select(func.count(models.Document.id)).where(
+                    models.Document.workspace_name == workspace_name,
+                    models.Document.observer == payload.observer,
+                    models.Document.observed == payload.observed,
+                    models.Document.level == "explicit",
+                )
+                current_explicit_count = int(await db.scalar(count_stmt) or 0)
+                dream_meta = dict(collection.internal_metadata.get("dream", {}))
+                dream_meta["last_dream_at"] = now_iso
+                dream_meta["last_dream_document_count"] = current_explicit_count
+                await crud.update_collection_internal_metadata(
+                    db,
+                    workspace_name,
+                    payload.observer,
+                    payload.observed,
+                    update_data={"dream": dream_meta},
+                )
+
     except Exception as e:
         logger.error(
             f"Error processing dream task {payload.dream_type} for {payload.observer}/{payload.observed}: {str(e)}",

src/routers/workspaces.py

Lines changed: 1 addition & 13 deletions
@@ -3,10 +3,9 @@
 from fastapi import APIRouter, Body, Depends, HTTPException, Path, Query, Response
 from fastapi_pagination import Page
 from fastapi_pagination.ext.sqlalchemy import apaginate
-from sqlalchemy import func, select
 from sqlalchemy.ext.asyncio import AsyncSession
 
-from src import crud, models, schemas
+from src import crud, schemas
 from src.config import settings
 from src.dependencies import db
 from src.deriver.enqueue import enqueue_deletion, enqueue_dream
@@ -201,7 +200,6 @@ async def schedule_dream(
     request: schemas.ScheduleDreamRequest = Body(
         ..., description="Dream scheduling parameters"
     ),
-    db: AsyncSession = db,
 ):
     """
     Manually schedule a dream task for a specific collection.
@@ -224,21 +222,11 @@ async def schedule_dream(
     observed = request.observed if request.observed is not None else request.observer
     dream_type = request.dream_type
 
-    # Count documents in the collection
-    count_stmt = select(func.count(models.Document.id)).where(
-        models.Document.workspace_name == workspace_id,
-        models.Document.observer == observer,
-        models.Document.observed == observed,
-    )
-    document_count = int(await db.scalar(count_stmt) or 0)
-
-    # Enqueue the dream task for immediate processing
     await enqueue_dream(
         workspace_id,
         observer=observer,
         observed=observed,
         dream_type=dream_type,
-        document_count=document_count,
session_name=request.session_id,
)
