Skip to content

Commit dd30a4b

Browse files
sanil-23 and claude authored
fix(memory_tree, e2e tests): deterministic query_topic ordering + robust CEF cleanup (tinyhumansai#1751)
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent a7a262b commit dd30a4b

3 files changed

Lines changed: 79 additions & 8 deletions

File tree

app/scripts/e2e-run-session.sh

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,11 +63,40 @@ cleanup() {
6363
fi
6464
if [ -n "$APP_PID" ]; then
6565
echo "[runner] Stopping CEF app (pid $APP_PID)..."
66+
# CEF spawns helper child processes (zygote, GPU, renderers) that
67+
# the parent does not reap on SIGTERM. If we only `kill $APP_PID`
68+
# the parent exits but children keep writing into the temp
69+
# workspace, and the `rm -rf` below races them and fails with
70+
# "Directory not empty" on Linux runners — even though the WDIO
71+
# spec itself passed. Signal the parent and its direct children before cleanup.
72+
#
73+
# CRITICAL: capture child PIDs **before** killing the parent.
74+
# The instant the parent exits, the kernel reparents its children
75+
# to init (PID 1). After that, `pkill -P "$APP_PID"` matches
76+
# nothing because no process has the dying parent as its PPID
77+
# anymore. Snapshot the PIDs while the relationship still exists,
78+
# then signal them directly by PID.
79+
CHILD_PIDS="$(pgrep -P "$APP_PID" 2>/dev/null || true)"
80+
pkill -TERM -P "$APP_PID" 2>/dev/null || true
6681
kill "$APP_PID" 2>/dev/null || true
6782
wait "$APP_PID" 2>/dev/null || true
83+
# Brief grace period so CEF helpers can flush their CEF/Default
84+
# files and exit on the SIGTERM we already sent. Anything that
85+
# ignored it gets SIGKILLed by the captured-PID sweep below.
86+
sleep 1
87+
if [ -n "$CHILD_PIDS" ]; then
88+
for pid in $CHILD_PIDS; do
89+
kill -KILL "$pid" 2>/dev/null || true
90+
done
91+
fi
6892
fi
6993
if [ -n "$CREATED_TEMP_WORKSPACE" ]; then
70-
rm -rf "$CREATED_TEMP_WORKSPACE"
94+
# Tolerate transient races: even after the kill above, a CEF helper
95+
# may still be flushing CEF/Default/* on a slow Linux runner. The
96+
# workspace is a per-run mktemp under /tmp; anything left behind is
97+
# collected by the next CI tmp-cleanup pass. We must not fail the
98+
# whole job on cleanup leftovers when the test itself passed.
99+
rm -rf "$CREATED_TEMP_WORKSPACE" 2>/dev/null || true
71100
fi
72101
if [ -n "$E2E_CONFIG_BACKUP" ] && [ -f "$E2E_CONFIG_BACKUP" ]; then
73102
mv "$E2E_CONFIG_BACKUP" "$E2E_CONFIG_FILE"

src/openhuman/memory/tree/retrieval/topic.rs

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -88,10 +88,20 @@ pub async fn query_topic(
8888
// `total` and waste result slots. For duplicates, keep the higher
8989
// score; if scores tie, prefer the newer `time_range_end`.
9090
// Flagged on PR #831 CodeRabbit review.
91-
use std::collections::HashMap;
92-
let mut by_node: HashMap<String, RetrievalHit> = HashMap::new();
93-
94-
let merge = |map: &mut HashMap<String, RetrievalHit>, hit: RetrievalHit| {
91+
//
92+
// `BTreeMap` (not `HashMap`) so the post-dedup iteration order is a
93+
// deterministic function of `node_id`. The downstream sort is
94+
// stable, so when many hits tie on `(score, time_range_end)` —
95+
// which is common with the inert embedder used in tests and with
96+
// freshly-ingested workspaces where score normalisation hasn't run
97+
// — the surviving order falls back to lexicographic `node_id` order
98+
// instead of `HashMap`'s randomised SipHash iteration. Without
99+
// this, `tests/agent_retrieval_e2e.rs::orchestrator_query_topic…`
100+
// picked a different "first leaf hit" on each run.
101+
use std::collections::BTreeMap;
102+
let mut by_node: BTreeMap<String, RetrievalHit> = BTreeMap::new();
103+
104+
let merge = |map: &mut BTreeMap<String, RetrievalHit>, hit: RetrievalHit| {
95105
map.entry(hit.node_id.clone())
96106
.and_modify(|existing| {
97107
let better = match hit
@@ -130,12 +140,17 @@ pub async fn query_topic(
130140
rerank_by_semantic_similarity(config, q, hits).await?
131141
} else {
132142
let mut by_score = hits;
133-
// Sort: score DESC, then newest first on ties.
143+
// Sort: score DESC, then newest first on ties, then `node_id`
144+
// ASC as a final tie-break so two hits that match on every
145+
// ranked dimension still produce a deterministic order across
146+
// runs (matters with the inert embedder used in tests, where
147+
// every score lands at 0.0 and only the `node_id` differs).
134148
by_score.sort_by(|a, b| {
135149
b.score
136150
.partial_cmp(&a.score)
137151
.unwrap_or(std::cmp::Ordering::Equal)
138152
.then_with(|| b.time_range_end.cmp(&a.time_range_end))
153+
.then_with(|| a.node_id.cmp(&b.node_id))
139154
});
140155
by_score
141156
};

tests/agent_retrieval_e2e.rs

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,16 +54,37 @@ fn test_config() -> (TempDir, Config) {
5454

5555
// ── RAII env guard shared by all tests in this file ──────────────────────────
5656

57+
/// Process-wide mutex that serialises every test in this binary that
58+
/// mutates `OPENHUMAN_WORKSPACE`. Cargo runs integration-test binaries
59+
/// multi-threaded by default (`test-threads = num_cpus`), so without
60+
/// this serialisation two tests would race on the env var: test A sets
61+
/// it to `/tmp/aaa`, test B overwrites it with `/tmp/bbb`, then when
62+
/// B's `TempDir` drops it unlinks `/tmp/bbb` while A is still reading
63+
/// from it. That race surfaced in CI as `SQLITE_IOERR_FSTAT` (error
64+
/// code 1802) during a later `with_connection` call on the now-deleted
65+
/// path, and earlier as `fetch_leaves` returning 0 hits when the
66+
/// resolved workspace temporarily pointed at the wrong sibling test's
67+
/// (otherwise empty) tempdir.
68+
///
69+
/// `unwrap_or_else(|p| p.into_inner())` keeps the lock usable after a
70+
/// poisoning panic so one failing test never cascades.
71+
static ENV_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());
72+
5773
struct EnvGuard {
5874
key: &'static str,
5975
prev: Option<std::ffi::OsString>,
76+
/// Last field — dropped after `Drop::drop` has already restored
77+
/// the env var, so the next test acquires the lock against a
78+
/// clean `OPENHUMAN_WORKSPACE` value.
79+
_lock: std::sync::MutexGuard<'static, ()>,
6080
}
6181

6282
impl Drop for EnvGuard {
6383
fn drop(&mut self) {
6484
// SAFETY: cargo test runs each integration test binary in its own
65-
// process; nothing else in this bin mutates these env vars, and the
66-
// guard restores the previous value on drop.
85+
// process; the `ENV_LOCK` mutex held in `_lock` serialises all
86+
// mutations within this binary, and the guard restores the
87+
// previous value before the lock is released.
6788
unsafe {
6889
match self.prev.take() {
6990
Some(v) => std::env::set_var(self.key, v),
@@ -77,13 +98,19 @@ impl Drop for EnvGuard {
7798
/// restores the previous value on drop. This makes the tool wrappers (which
7899
/// call `load_config_with_timeout` internally) resolve to the same workspace
79100
/// that was used for ingest.
101+
///
102+
/// The returned guard also holds [`ENV_LOCK`] for its lifetime, so concurrent
103+
/// tests in the same binary cannot stomp on each other's
104+
/// `OPENHUMAN_WORKSPACE` setting.
80105
fn set_workspace_env(tmp: &TempDir) -> EnvGuard {
106+
let lock = ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
81107
let prev = std::env::var_os("OPENHUMAN_WORKSPACE");
82108
// SAFETY: see EnvGuard::Drop above.
83109
unsafe { std::env::set_var("OPENHUMAN_WORKSPACE", tmp.path()) };
84110
EnvGuard {
85111
key: "OPENHUMAN_WORKSPACE",
86112
prev,
113+
_lock: lock,
87114
}
88115
}
89116

0 commit comments

Comments
 (0)