Skip to content

Commit 8fb07b8

Browse files
authored
Merge pull request #573 from spacedriveapp/codex/cortex-synthesis-r6-mutex
[codex] Guard dirty knowledge synthesis
2 parents 94f4191 + a49e994 commit 8fb07b8

2 files changed

Lines changed: 83 additions & 17 deletions

File tree

docs/design-docs/working-memory-triage.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@ Findings from CodeRabbit review + bug reports. Tracking resolution before merge.
2323
- [ ] **R5 — Dirty flag only bumps on merges** (`src/agent/cortex.rs:1958`)
2424
Prunes and decays also change the memory set but don't trigger knowledge synthesis re-gen. Add `report.pruned > 0 || report.decayed > 0`. **Partial in PR #570:** prunes and merges now dirty synthesis; decay remains intentionally importance-only and needs a follow-up decision.
2525

26-
- [ ] **R6 — Dirty-flag synthesis not mutex-guarded** (`src/agent/cortex.rs:2106`)
27-
Can race with warmup synthesis path. Should acquire the same synthesis mutex. **Still open:** PR #570 single-flights background refresh tasks, but lock parity with warmup still needs a focused verify/fix pass.
26+
- [x] **R6 — Dirty-flag synthesis not mutex-guarded** (`src/agent/cortex.rs:2106`)
27+
Can race with warmup synthesis path. **Fixed in this slice:** dirty-triggered synthesis now acquires the warmup/synthesis mutex and re-checks the dirty version after the lock is held.
2828

2929
- [x] **R7 — Intraday/daily synthesis blocks main cortex loop** (`src/agent/cortex.rs:2166`)
3030
LLM calls awaited inline inside `tokio::select!`; events stop draining during synthesis. **Fixed in PR #570:** intraday and daily synthesis now run as background tasks with single-flight scheduling and failure backoff.

src/agent/cortex.rs

Lines changed: 81 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -317,6 +317,42 @@ where
317317
}
318318
}
319319

320+
async fn generate_if_dirty_under_lock<ShouldGenerate, Generate, Fut>(
321+
warmup_lock: &tokio::sync::Mutex<()>,
322+
should_generate: ShouldGenerate,
323+
generate: Generate,
324+
) -> BulletinRefreshOutcome
325+
where
326+
ShouldGenerate: FnOnce() -> bool,
327+
Generate: FnOnce() -> Fut,
328+
Fut: std::future::Future<Output = bool>,
329+
{
330+
let _warmup_guard = warmup_lock.lock().await;
331+
332+
if !should_generate() {
333+
tracing::debug!("skipping knowledge synthesis because dirty version was already handled");
334+
return BulletinRefreshOutcome::SkippedFresh;
335+
}
336+
337+
if generate().await {
338+
BulletinRefreshOutcome::Generated
339+
} else {
340+
BulletinRefreshOutcome::Failed
341+
}
342+
}
343+
344+
async fn generate_knowledge_synthesis_if_dirty_under_lock(
345+
deps: &AgentDeps,
346+
logger: &CortexLogger,
347+
) -> BulletinRefreshOutcome {
348+
generate_if_dirty_under_lock(
349+
deps.runtime_config.warmup_lock.as_ref(),
350+
|| should_regenerate_knowledge_synthesis(deps),
351+
|| generate_knowledge_synthesis(deps, logger),
352+
)
353+
.await
354+
}
355+
320356
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
321357
enum BulletinRefreshOutcome {
322358
Generated,
@@ -2399,12 +2435,8 @@ async fn run_cortex_loop(
23992435
let deps = cortex.deps.clone();
24002436
let synthesis_logger = logger.clone();
24012437
refresh_task = Some(tokio::spawn(async move {
2402-
let success = generate_knowledge_synthesis(&deps, &synthesis_logger).await;
2403-
if success {
2404-
BulletinRefreshOutcome::Generated
2405-
} else {
2406-
BulletinRefreshOutcome::Failed
2407-
}
2438+
generate_knowledge_synthesis_if_dirty_under_lock(&deps, &synthesis_logger)
2439+
.await
24082440
}));
24092441
}
24102442

@@ -4597,14 +4629,14 @@ mod tests {
45974629
MAINTENANCE_TASK_CANCEL_GRACE_SECS, MaintenanceTimeoutAction, ReceiverClosedBehavior,
45984630
Signal, SynthesisTaskBackoff, WorkerTracker, apply_cancelled_warmup_status,
45994631
build_kill_targets, claim_detached_completion, collect_synthesis_task,
4600-
detached_timeout_transition, handle_cortex_receiver_result, has_completed_initial_warmup,
4601-
is_cancelled_control_result, is_terminal_control_result, maintenance_task_timeout,
4602-
maintenance_timeout_action, mark_knowledge_synthesis_version_complete,
4603-
maybe_close_bulletin_refresh_circuit, maybe_generate_bulletin_under_lock,
4604-
maybe_spawn_synthesis_task, parse_structured_success_flag, push_signal_into_buffer,
4605-
record_bulletin_refresh_failure, should_execute_warmup,
4606-
should_generate_bulletin_from_bulletin_loop, signal_from_event, summarize_signal_text,
4607-
take_lagged_control_flag,
4632+
detached_timeout_transition, generate_if_dirty_under_lock, handle_cortex_receiver_result,
4633+
has_completed_initial_warmup, is_cancelled_control_result, is_terminal_control_result,
4634+
maintenance_task_timeout, maintenance_timeout_action,
4635+
mark_knowledge_synthesis_version_complete, maybe_close_bulletin_refresh_circuit,
4636+
maybe_generate_bulletin_under_lock, maybe_spawn_synthesis_task,
4637+
parse_structured_success_flag, push_signal_into_buffer, record_bulletin_refresh_failure,
4638+
should_execute_warmup, should_generate_bulletin_from_bulletin_loop, signal_from_event,
4639+
summarize_signal_text, take_lagged_control_flag,
46084640
};
46094641
use crate::ProcessEvent;
46104642
use crate::agent::process_control::ControlActionResult;
@@ -4616,7 +4648,7 @@ mod tests {
46164648
use sqlx::sqlite::SqlitePoolOptions;
46174649
use std::collections::VecDeque;
46184650
use std::sync::Arc;
4619-
use std::sync::atomic::{AtomicU8, AtomicUsize, Ordering};
4651+
use std::sync::atomic::{AtomicBool, AtomicU8, AtomicUsize, Ordering};
46204652
use std::time::{Duration, Instant};
46214653

46224654
#[test]
@@ -4812,6 +4844,40 @@ mod tests {
48124844
assert_eq!(calls.load(Ordering::SeqCst), 0);
48134845
}
48144846

4847+
#[tokio::test]
4848+
async fn dirty_knowledge_synthesis_rechecks_after_waiting_for_warmup_lock() {
4849+
let warmup_lock = Arc::new(tokio::sync::Mutex::new(()));
4850+
let dirty = Arc::new(AtomicBool::new(true));
4851+
let calls = Arc::new(AtomicUsize::new(0));
4852+
4853+
let guard = warmup_lock.as_ref().lock().await;
4854+
4855+
let warmup_lock_for_task = Arc::clone(&warmup_lock);
4856+
let dirty_for_task = Arc::clone(&dirty);
4857+
let calls_for_task = Arc::clone(&calls);
4858+
let task = tokio::spawn(async move {
4859+
generate_if_dirty_under_lock(
4860+
warmup_lock_for_task.as_ref(),
4861+
|| dirty_for_task.load(Ordering::SeqCst),
4862+
|| async move {
4863+
calls_for_task.fetch_add(1, Ordering::SeqCst);
4864+
true
4865+
},
4866+
)
4867+
.await
4868+
});
4869+
4870+
// A warmup pass completes while the dirty task is waiting for the same
4871+
// lock. Once unblocked, the dirty task must observe the clean version
4872+
// and skip instead of running a duplicate synthesis.
4873+
dirty.store(false, Ordering::SeqCst);
4874+
drop(guard);
4875+
4876+
let result = task.await.expect("task should join");
4877+
assert_eq!(result, BulletinRefreshOutcome::SkippedFresh);
4878+
assert_eq!(calls.load(Ordering::SeqCst), 0);
4879+
}
4880+
48154881
#[tokio::test]
48164882
async fn working_memory_synthesis_task_is_single_flight() {
48174883
let calls = Arc::new(AtomicUsize::new(0));

0 commit comments

Comments
 (0)