Skip to content

Commit 12ef69b

Browse files
authored
feat(learning): ambient personalization cache for tinyhumansai#566 (tinyhumansai#1460)
1 parent 065e198 commit 12ef69b

31 files changed

Lines changed: 7656 additions & 146 deletions

docs/AGENT_SELF_LEARNING.md

Lines changed: 368 additions & 0 deletions
Large diffs are not rendered by default.

src/core/event_bus/events.rs

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -353,6 +353,55 @@ pub enum DomainEvent {
353353
routed: bool,
354354
},
355355

356+
// ── Memory tree ─────────────────────────────────────────────────────
357+
/// A document (chat batch, email thread, or standalone document) was
358+
/// fully canonicalised and its chunks written to the memory tree.
359+
///
360+
/// Emitted by `memory::tree::ingest::persist()` after the chunk upsert
361+
/// and extract-job enqueue complete. Subscribers (Phase 2 producers such
362+
/// as the email-signature parser) react to this to inspect the
363+
/// canonicalised content.
364+
DocumentCanonicalized {
365+
/// The source identifier passed to the ingest call (e.g. `"gmail:abc"`,
366+
/// `"conversations:agent"`).
367+
source_id: String,
368+
/// Kind of content — `"chat"`, `"email"`, `"document"`.
369+
source_kind: String,
370+
/// Number of chunks written to `vector_chunks` in this ingest.
371+
chunks_written: usize,
372+
/// IDs of the chunks that were written.
373+
chunk_ids: Vec<String>,
374+
/// Wall-clock seconds since epoch when canonicalisation completed.
375+
canonicalized_at: f64,
376+
/// Last ≤ 2 048 characters of the canonicalised markdown body.
377+
///
378+
/// Populated for `email` and `document` sources so that lightweight
379+
/// subscribers (e.g. the email-signature parser) can inspect trailing
380+
/// content without hitting disk. `None` for `chat` sources where the
381+
/// content is conversational and doesn't contain signature-style structure.
382+
body_preview: Option<String>,
383+
},
384+
385+
// ── Learning ─────────────────────────────────────────────────────────
386+
/// The stability detector finished a full cache rebuild cycle.
387+
///
388+
/// Emitted by `learning::stability_detector` (Phase 3) after writing
389+
/// the new snapshot to `user_profile_facets`. Subscribers (Phase 4
390+
/// `profile_md_renderer`) react to re-render the `PROFILE.md` managed
391+
/// blocks.
392+
CacheRebuilt {
393+
/// Number of facets added in this cycle.
394+
added: usize,
395+
/// Number of facets evicted (below τ_evict threshold) in this cycle.
396+
evicted: usize,
397+
/// Number of facets unchanged / carried over.
398+
kept: usize,
399+
/// Total facets in the cache after the rebuild.
400+
total_size: usize,
401+
/// Wall-clock seconds since epoch when the rebuild completed.
402+
rebuilt_at: f64,
403+
},
404+
356405
// ── System lifecycle ────────────────────────────────────────────────
357406
/// A system component started up.
358407
SystemStartup { component: String },
@@ -389,7 +438,10 @@ impl DomainEvent {
389438
| Self::MemoryRecalled { .. }
390439
| Self::MemorySyncRequested { .. }
391440
| Self::MemoryIngestionStarted { .. }
392-
| Self::MemoryIngestionCompleted { .. } => "memory",
441+
| Self::MemoryIngestionCompleted { .. }
442+
| Self::DocumentCanonicalized { .. } => "memory",
443+
444+
Self::CacheRebuilt { .. } => "learning",
393445

394446
Self::ChannelInboundMessage { .. }
395447
| Self::ChannelMessageReceived { .. }

src/core/event_bus/events_tests.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -415,6 +415,29 @@ fn all_variants_have_correct_domain() {
415415
},
416416
"system",
417417
),
418+
// Memory tree
419+
(
420+
DomainEvent::DocumentCanonicalized {
421+
source_id: "gmail:abc".into(),
422+
source_kind: "email".into(),
423+
chunks_written: 3,
424+
chunk_ids: vec!["c1".into(), "c2".into(), "c3".into()],
425+
canonicalized_at: 1_700_000_000.0,
426+
body_preview: Some("Thanks,\nAlice".into()),
427+
},
428+
"memory",
429+
),
430+
// Learning
431+
(
432+
DomainEvent::CacheRebuilt {
433+
added: 2,
434+
evicted: 1,
435+
kept: 5,
436+
total_size: 7,
437+
rebuilt_at: 1_700_000_000.0,
438+
},
439+
"learning",
440+
),
418441
];
419442

420443
for (event, expected_domain) in cases {

src/openhuman/agent/harness/archivist.rs

Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,19 @@
66
//! 2. Manages conversation segments (boundary detection + lifecycle).
77
//! 3. On segment close: extracts events (heuristic) and updates user profile.
88
//! 4. Extracts simple lessons from tool failures.
9+
//! 5. (Phase 1 / #566) Pipes the turn into the memory tree as `conversations:agent`
10+
//! when `config.learning.chat_to_tree_enabled` is true.
911
1012
use crate::openhuman::agent::hooks::{PostTurnHook, TurnContext};
13+
use crate::openhuman::config::Config;
1114
use crate::openhuman::memory::store::events::{self, EventRecord, EventType};
1215
use crate::openhuman::memory::store::fts5::{self, EpisodicEntry};
1316
use crate::openhuman::memory::store::profile::{self, FacetType};
1417
use crate::openhuman::memory::store::segments::{
1518
self, BoundaryConfig, BoundaryDecision, ConversationSegment,
1619
};
20+
use crate::openhuman::memory::tree::canonicalize::chat::{ChatBatch, ChatMessage};
21+
use crate::openhuman::memory::tree::ingest;
1722
use async_trait::async_trait;
1823
use parking_lot::Mutex;
1924
use rusqlite::Connection;
@@ -31,24 +36,43 @@ pub struct ArchivistHook {
3136
enabled: bool,
3237
/// Boundary detection configuration.
3338
boundary_config: BoundaryConfig,
39+
/// Optional runtime config — used to gate the tree-ingest path.
40+
///
41+
/// When `None`, the tree-ingest path is skipped. Set via
42+
/// [`ArchivistHook::with_config`] on the production path.
43+
config: Option<Config>,
3444
}
3545

3646
impl ArchivistHook {
3747
/// Create an Archivist hook with a shared SQLite connection.
48+
///
49+
/// Tree-ingest is disabled by default; call [`Self::with_config`] to
50+
/// enable it on the production path.
3851
pub fn new(conn: Arc<Mutex<Connection>>, enabled: bool) -> Self {
3952
Self {
4053
conn: Some(conn),
4154
enabled,
4255
boundary_config: BoundaryConfig::default(),
56+
config: None,
4357
}
4458
}
4559

60+
/// Attach runtime config so the archivist can gate the tree-ingest path.
61+
///
62+
/// When `config.learning.chat_to_tree_enabled` is `true`, each completed
63+
/// turn is also piped into the memory tree as `source="conversations:agent"`.
64+
pub fn with_config(mut self, config: Config) -> Self {
65+
self.config = Some(config);
66+
self
67+
}
68+
4669
/// Create a disabled/no-op Archivist (when FTS5 is not enabled).
4770
pub fn disabled() -> Self {
4871
Self {
4972
conn: None,
5073
enabled: false,
5174
boundary_config: BoundaryConfig::default(),
75+
config: None,
5276
}
5377
}
5478

@@ -365,11 +389,173 @@ impl PostTurnHook for ArchivistHook {
365389
current_episodic_id,
366390
);
367391

392+
// ── Phase 1 / #566: pipe turn into the memory tree ───────────────────
393+
// Gate: only when config is attached and chat_to_tree_enabled is true.
394+
// Non-fatal: if tree-ingest fails, the episodic write already succeeded
395+
// and the turn result is not affected.
396+
if let Some(ref cfg) = self.config {
397+
if cfg.learning.chat_to_tree_enabled {
398+
tracing::debug!(
399+
"[archivist] piping turn into tree as conversations:agent session={}",
400+
session_id
401+
);
402+
self.pipe_turn_to_tree(cfg, ctx, session_id, timestamp)
403+
.await;
404+
}
405+
}
406+
368407
tracing::debug!("[archivist] turn indexed successfully");
369408
Ok(())
370409
}
371410
}
372411

412+
impl ArchivistHook {
413+
/// Pipe the completed turn into the memory tree as `source="conversations:agent"`.
414+
///
415+
/// Tool-call JSON is stripped from the assistant text before ingest — only
416+
/// the assistant's prose response flows into the tree (memory ingestion
417+
/// policy: tool outputs must not reach memory).
418+
///
419+
/// Failures are logged and swallowed; the episodic write is the source of
420+
/// truth.
421+
async fn pipe_turn_to_tree(
422+
&self,
423+
config: &Config,
424+
ctx: &TurnContext,
425+
session_id: &str,
426+
timestamp: f64,
427+
) {
428+
use chrono::{TimeZone, Utc};
429+
430+
// Build turn timestamps. The assistant message is offset by 1ms as in
431+
// the episodic write so ordering is stable.
432+
let user_ts = Utc
433+
.timestamp_opt(
434+
timestamp as i64,
435+
((timestamp.fract() * 1e9) as u32).min(999_999_999),
436+
)
437+
.single()
438+
.unwrap_or_else(Utc::now);
439+
let asst_ts = Utc
440+
.timestamp_opt(
441+
(timestamp + 0.001) as i64,
442+
(((timestamp + 0.001).fract() * 1e9) as u32).min(999_999_999),
443+
)
444+
.single()
445+
.unwrap_or(user_ts);
446+
447+
// Strip tool-call JSON from the assistant response.
448+
// Per memory ingestion policy, structured tool-call payloads must not
449+
// flow into the tree — only the prose response is ingested.
450+
let assistant_prose = strip_tool_calls_from_response(&ctx.assistant_response);
451+
452+
let batch = ChatBatch {
453+
platform: "agent".into(),
454+
channel_label: session_id.to_string(),
455+
messages: vec![
456+
ChatMessage {
457+
author: "user".into(),
458+
timestamp: user_ts,
459+
text: ctx.user_message.clone(),
460+
source_ref: Some(format!("agent://session/{session_id}")),
461+
},
462+
ChatMessage {
463+
author: "assistant".into(),
464+
timestamp: asst_ts,
465+
text: assistant_prose,
466+
source_ref: Some(format!("agent://session/{session_id}")),
467+
},
468+
],
469+
};
470+
471+
// Use the session_id as the owner / identity tag.
472+
let source_id = "conversations:agent";
473+
let owner = session_id;
474+
let tags = vec!["agent_chat".to_string()];
475+
476+
match ingest::ingest_chat(config, source_id, owner, tags, batch).await {
477+
Ok(result) => {
478+
tracing::debug!(
479+
"[archivist] tree ingest ok: source_id={} chunks_written={} session={}",
480+
source_id,
481+
result.chunks_written,
482+
session_id
483+
);
484+
}
485+
Err(e) => {
486+
tracing::warn!(
487+
"[archivist] tree ingest failed (non-fatal): source_id={} session={} error={e}",
488+
source_id,
489+
session_id
490+
);
491+
}
492+
}
493+
}
494+
}
495+
496+
/// Strip tool-call payloads from an assistant response, leaving only the
/// prose text.
///
/// Per the memory ingestion policy, structured tool-call payloads must not
/// reach the memory tree — only the assistant's natural-language prose is
/// ingested.
///
/// This is a lightweight heuristic, not a parser. It removes:
///
/// * every `<tool_call>…</tool_call>` span (an unclosed opening tag removes
///   everything from the tag to the end of the text), and
/// * every raw JSON object that begins with `{"tool_calls"`, matched up to
///   its balancing `}` (an unbalanced object removes everything from its
///   opening brace to the end of the text).
///
/// After removal, trailing whitespace is trimmed from each line, runs of
/// blank lines are collapsed to a single blank line (at most two consecutive
/// newlines), and the whole result is trimmed.
///
/// A response containing none of the markers is returned unchanged, without
/// any whitespace normalisation.
///
/// The output may be empty if the entire response was tool-call markup —
/// callers should handle that case (empty text → no-op ingest).
fn strip_tool_calls_from_response(response: &str) -> String {
    const OPEN_TAG: &str = "<tool_call>";
    const CLOSE_TAG: &str = "</tool_call>";
    const JSON_MARKER: &str = "{\"tool_calls\"";
    // NOTE(review): `"tool_use"` only routes the response through whitespace
    // normalisation, exactly as before; such blocks are not removed. Confirm
    // whether they should be stripped as well.
    const TOOL_USE_MARKER: &str = "\"tool_use\"";

    // Fast path: no obvious tool-call markers, so hand the text back as-is
    // and skip the scanning and normalisation passes.
    if !response.contains(OPEN_TAG)
        && !response.contains(JSON_MARKER)
        && !response.contains(TOOL_USE_MARKER)
    {
        return response.to_string();
    }

    let mut cleaned = response.to_string();

    // Strip <tool_call>…</tool_call> spans (may span multiple lines).
    while let Some(start) = cleaned.find(OPEN_TAG) {
        match cleaned[start..].find(CLOSE_TAG) {
            Some(rel_end) => {
                cleaned.drain(start..start + rel_end + CLOSE_TAG.len());
            }
            None => {
                // Unclosed tag — remove from the tag to end of string.
                cleaned.truncate(start);
                break;
            }
        }
    }

    // Strip raw `{"tool_calls": …}` JSON objects.
    while let Some(start) = cleaned.find(JSON_MARKER) {
        match tool_call_json_end(&cleaned, start) {
            Some(end) => {
                cleaned.drain(start..end);
            }
            None => {
                // Unbalanced object — same policy as an unclosed tag.
                cleaned.truncate(start);
                break;
            }
        }
    }

    // Trim trailing whitespace per line and collapse blank-line runs so that
    // no more than two consecutive newlines remain.
    let mut result = String::with_capacity(cleaned.len());
    let mut blank_run = 0usize;
    for line in cleaned.lines().map(str::trim_end) {
        if line.is_empty() {
            blank_run += 1;
            if blank_run <= 1 {
                result.push('\n');
            }
        } else {
            blank_run = 0;
            result.push_str(line);
            result.push('\n');
        }
    }

    result.trim().to_string()
}

/// Find the end (exclusive byte index) of the JSON object opening at `start`.
///
/// `text[start]` is expected to be `{`. Braces inside JSON string literals
/// are ignored and backslash escapes inside strings are honoured. Returns
/// `None` when the object never closes or `start` is out of range.
///
/// Only ASCII bytes are inspected, so the returned index always lies on a
/// UTF-8 character boundary.
fn tool_call_json_end(text: &str, start: usize) -> Option<usize> {
    let mut depth = 0usize;
    let mut in_string = false;
    let mut escaped = false;

    for (offset, &byte) in text.as_bytes().get(start..)?.iter().enumerate() {
        if in_string {
            match byte {
                _ if escaped => escaped = false,
                b'\\' => escaped = true,
                b'"' => in_string = false,
                _ => {}
            }
            continue;
        }
        match byte {
            b'"' => in_string = true,
            b'{' => depth += 1,
            b'}' => {
                depth = depth.checked_sub(1)?;
                if depth == 0 {
                    return Some(start + offset + 1);
                }
            }
            _ => {}
        }
    }
    None
}
558+
373559
/// Extract simple lessons from tool call outcomes (no LLM needed).
374560
fn extract_lesson_from_tools(
375561
tool_calls: &[crate::openhuman::agent::hooks::ToolCallRecord],

src/openhuman/agent/harness/session/builder.rs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -762,6 +762,8 @@ impl Agent {
762762
.add_section(Box::new(
763763
crate::openhuman::learning::UserProfileSection::new(memory.clone()),
764764
));
765+
// NOTE: MemoryAccessSection is added after tool-filtering so we can
766+
// gate it on retrieval-tool visibility — see below.
765767
log::info!(
766768
"[learning] prompt sections registered (user_reflections, learned_context, user_profile)"
767769
);
@@ -928,6 +930,35 @@ impl Agent {
928930
.map(|t| t.name().to_string())
929931
.collect(),
930932
};
933+
934+
// Phase 4 (#566): add the MemoryAccessSection bias instruction only
935+
// when at least one retrieval tool is actually loaded AND survives
936+
// filtering. We require both because:
937+
// - the tool may be filtered out by the agent's scope config
938+
// - the tool may not be registered at all on this agent (tool
939+
// listing is build-time configurable)
940+
// An empty `visible` set means "no filter" (wildcard / orchestrator
941+
// path); in that case any registered retrieval tool is reachable.
942+
if config.learning.enabled {
943+
let recall_tools = ["memory_recall", "memory_search"];
944+
let has_retrieval = recall_tools.iter().any(|name| {
945+
let registered = tools.iter().any(|t| t.name() == *name)
946+
|| delegation_tools.iter().any(|t| t.name() == *name);
947+
let allowed_by_filter = visible.is_empty() || visible.contains(*name);
948+
registered && allowed_by_filter
949+
});
950+
if has_retrieval {
951+
prompt_builder = prompt_builder
952+
.add_section(Box::new(crate::openhuman::learning::MemoryAccessSection));
953+
log::debug!("[learning] memory_access prompt section registered");
954+
} else {
955+
log::debug!(
956+
"[learning] skipping MemoryAccessSection — neither memory_recall nor \
957+
memory_search is registered+visible for agent={agent_id}"
958+
);
959+
}
960+
}
961+
931962
// De-duplicate: some synthesised tool names may collide with
932963
// already-registered tools (unlikely for `delegate_*` names but
933964
// cheap to guard against).

0 commit comments

Comments
 (0)