Skip to content

Commit ec138c8

Browse files
authored
feat(composio): provider folder modules + user profile persistence (tinyhumansai#523)
* feat(composio): implement profile persistence for user data - Introduced a new `profile` module to handle the persistence of user profile data from various providers into the local `user_profile` facet table. - Enhanced the `composio_get_user_profile` and `fetch_user_profile` functions to call `persist_provider_profile`, ensuring that profile fields like display name, email, and avatar are stored locally for quick access. - Added debug logging to track the number of facets written during the persistence process, improving observability of profile updates. - This change aims to enhance user experience by reducing the need for repeated upstream API calls for frequently accessed profile information. * style: apply cargo fmt to profile.rs and client.rs * fix: PR review — cursor whitespace, stale profile values, test precision, log level - Trim whitespace in cursor_to_gmail_after_filter before parsing so leading/trailing spaces don't silently bypass the date filter. - Change profile_upsert condition from `>` to `>=` so equal-confidence provider refreshes replace stale values instead of only bumping evidence_count. - Tighten epoch millis test to assert exact date "2026/03/31" instead of loose contains('/') check. - Lower profile persistence log from info to debug to reduce noise in normal connect/sync flows.
1 parent ae9ac30 commit ec138c8

11 files changed

Lines changed: 572 additions & 303 deletions

File tree

src/openhuman/composio/ops.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,15 @@ pub async fn composio_get_user_profile(
292292
.await
293293
.map_err(|e| format!("[composio] get_user_profile({toolkit}) failed: {e}"))?;
294294

295+
// Side-effect: persist profile fields into the local user_profile
296+
// facet table so any RPC call also refreshes the local store.
297+
let facets = super::providers::profile::persist_provider_profile(&profile);
298+
tracing::debug!(
299+
toolkit = %toolkit,
300+
facets_written = facets,
301+
"[composio] profile facets persisted from get_user_profile"
302+
);
303+
295304
Ok(RpcOutcome::new(
296305
profile,
297306
vec![format!(

src/openhuman/composio/providers/gmail.rs renamed to src/openhuman/composio/providers/gmail/mod.rs

Lines changed: 10 additions & 147 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@
1717
//! number of `execute_tool` calls per calendar day, preventing runaway
1818
//! API usage during large initial backfills.
1919
20+
mod sync;
21+
#[cfg(test)]
22+
mod tests;
23+
2024
use async_trait::async_trait;
2125
use serde_json::{json, Value};
2226

@@ -151,7 +155,7 @@ impl ComposioProvider for GmailProvider {
151155
}
152156

153157
async fn sync(&self, ctx: &ProviderContext, reason: SyncReason) -> Result<SyncOutcome, String> {
154-
let started_at_ms = now_ms();
158+
let started_at_ms = sync::now_ms();
155159
let connection_id = ctx
156160
.connection_id
157161
.clone()
@@ -181,7 +185,7 @@ impl ComposioProvider for GmailProvider {
181185
reason: reason.as_str().to_string(),
182186
items_ingested: 0,
183187
started_at_ms,
184-
finished_at_ms: now_ms(),
188+
finished_at_ms: sync::now_ms(),
185189
summary: "gmail sync skipped: daily budget exhausted".to_string(),
186190
details: json!({ "budget_exhausted": true }),
187191
});
@@ -212,7 +216,7 @@ impl ComposioProvider for GmailProvider {
212216
// returns newer mail.
213217
let mut query = "in:inbox -in:spam -in:trash".to_string();
214218
if let Some(ref cursor) = state.cursor {
215-
if let Some(date_filter) = cursor_to_gmail_after_filter(cursor) {
219+
if let Some(date_filter) = sync::cursor_to_gmail_after_filter(cursor) {
216220
query.push_str(&format!(" after:{date_filter}"));
217221
tracing::debug!(
218222
page = page_num,
@@ -252,7 +256,7 @@ impl ComposioProvider for GmailProvider {
252256
));
253257
}
254258

255-
let messages = extract_messages(&resp.data);
259+
let messages = sync::extract_messages(&resp.data);
256260
total_fetched += messages.len();
257261

258262
if messages.is_empty() {
@@ -336,7 +340,7 @@ impl ComposioProvider for GmailProvider {
336340
}
337341

338342
// Check for next page token.
339-
page_token = extract_page_token(&resp.data);
343+
page_token = sync::extract_page_token(&resp.data);
340344
if page_token.is_none() {
341345
tracing::debug!(page = page_num, "[composio:gmail] no next page token, done");
342346
break;
@@ -349,7 +353,7 @@ impl ComposioProvider for GmailProvider {
349353
}
350354
state.save(&memory).await?;
351355

352-
let finished_at_ms = now_ms();
356+
let finished_at_ms = sync::now_ms();
353357
let summary = format!(
354358
"gmail sync ({reason}): fetched {total_fetched}, persisted {total_persisted} new, \
355359
budget remaining {remaining}",
@@ -408,144 +412,3 @@ impl ComposioProvider for GmailProvider {
408412
Ok(())
409413
}
410414
}
411-
412-
// ── helpers ────────────────────────────────────────────────────────
413-
414-
/// Walk the Composio response envelope and pull out message objects.
415-
fn extract_messages(data: &Value) -> Vec<Value> {
416-
let candidates = [
417-
data.pointer("/data/messages"),
418-
data.pointer("/messages"),
419-
data.pointer("/data/data/messages"),
420-
data.pointer("/data/items"),
421-
data.pointer("/items"),
422-
];
423-
for cand in candidates.into_iter().flatten() {
424-
if let Some(arr) = cand.as_array() {
425-
return arr.clone();
426-
}
427-
}
428-
Vec::new()
429-
}
430-
431-
/// Try to extract a pagination token from the API response.
432-
fn extract_page_token(data: &Value) -> Option<String> {
433-
let candidates = [
434-
data.pointer("/data/nextPageToken"),
435-
data.pointer("/nextPageToken"),
436-
data.pointer("/data/data/nextPageToken"),
437-
];
438-
for cand in candidates.into_iter().flatten() {
439-
if let Some(s) = cand.as_str() {
440-
let trimmed = s.trim();
441-
if !trimmed.is_empty() {
442-
return Some(trimmed.to_string());
443-
}
444-
}
445-
}
446-
None
447-
}
448-
449-
/// Convert a cursor value (epoch millis or date string) into a Gmail
450-
/// `after:YYYY/MM/DD` filter component. Returns `None` if the cursor
451-
/// cannot be parsed.
452-
fn cursor_to_gmail_after_filter(cursor: &str) -> Option<String> {
453-
// Try parsing as epoch millis first (Gmail's internalDate).
454-
if let Ok(millis) = cursor.parse::<i64>() {
455-
let secs = millis / 1000;
456-
if let Some(dt) = chrono::DateTime::from_timestamp(secs, 0) {
457-
return Some(dt.format("%Y/%m/%d").to_string());
458-
}
459-
}
460-
// Try parsing as an ISO date/datetime.
461-
if let Ok(dt) = chrono::NaiveDate::parse_from_str(cursor, "%Y-%m-%d") {
462-
return Some(dt.format("%Y/%m/%d").to_string());
463-
}
464-
if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(cursor) {
465-
return Some(dt.format("%Y/%m/%d").to_string());
466-
}
467-
None
468-
}
469-
470-
fn now_ms() -> u64 {
471-
use std::time::{SystemTime, UNIX_EPOCH};
472-
SystemTime::now()
473-
.duration_since(UNIX_EPOCH)
474-
.map(|d| d.as_millis() as u64)
475-
.unwrap_or(0)
476-
}
477-
478-
#[cfg(test)]
479-
mod tests {
480-
use super::*;
481-
482-
#[test]
483-
fn extract_messages_finds_data_messages() {
484-
let v = json!({
485-
"data": { "messages": [{"id": "m1"}, {"id": "m2"}] },
486-
"successful": true,
487-
});
488-
assert_eq!(extract_messages(&v).len(), 2);
489-
}
490-
491-
#[test]
492-
fn extract_messages_finds_top_level_messages() {
493-
let v = json!({ "messages": [{"id": "m1"}] });
494-
assert_eq!(extract_messages(&v).len(), 1);
495-
}
496-
497-
#[test]
498-
fn extract_messages_returns_empty_when_missing() {
499-
let v = json!({ "data": { "other": [] } });
500-
assert_eq!(extract_messages(&v).len(), 0);
501-
}
502-
503-
#[test]
504-
fn extract_page_token_finds_nested() {
505-
let v = json!({ "data": { "nextPageToken": "tok123" } });
506-
assert_eq!(extract_page_token(&v), Some("tok123".to_string()));
507-
}
508-
509-
#[test]
510-
fn extract_page_token_none_when_missing() {
511-
let v = json!({ "data": {} });
512-
assert_eq!(extract_page_token(&v), None);
513-
}
514-
515-
#[test]
516-
fn cursor_to_filter_from_epoch_millis() {
517-
// 2026-04-01 00:00:00 UTC in millis
518-
let millis = "1774915200000";
519-
let filter = cursor_to_gmail_after_filter(millis);
520-
assert!(filter.is_some());
521-
// Should produce a YYYY/MM/DD date.
522-
let f = filter.unwrap();
523-
assert!(f.contains('/'), "Expected date with slashes, got {f}");
524-
}
525-
526-
#[test]
527-
fn cursor_to_filter_from_iso_date() {
528-
assert_eq!(
529-
cursor_to_gmail_after_filter("2026-03-15"),
530-
Some("2026/03/15".to_string())
531-
);
532-
}
533-
534-
#[test]
535-
fn cursor_to_filter_from_rfc3339() {
536-
let f = cursor_to_gmail_after_filter("2026-03-15T12:00:00Z");
537-
assert_eq!(f, Some("2026/03/15".to_string()));
538-
}
539-
540-
#[test]
541-
fn cursor_to_filter_returns_none_for_garbage() {
542-
assert_eq!(cursor_to_gmail_after_filter("not-a-date"), None);
543-
}
544-
545-
#[test]
546-
fn provider_metadata_is_stable() {
547-
let p = GmailProvider::new();
548-
assert_eq!(p.toolkit_slug(), "gmail");
549-
assert_eq!(p.sync_interval_secs(), Some(15 * 60));
550-
}
551-
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
//! Gmail sync helpers — message extraction, pagination, cursor
2+
//! conversion, and time utilities.
3+
4+
use serde_json::Value;
5+
6+
/// Walk the Composio response envelope and pull out message objects.
7+
pub(crate) fn extract_messages(data: &Value) -> Vec<Value> {
8+
let candidates = [
9+
data.pointer("/data/messages"),
10+
data.pointer("/messages"),
11+
data.pointer("/data/data/messages"),
12+
data.pointer("/data/items"),
13+
data.pointer("/items"),
14+
];
15+
for cand in candidates.into_iter().flatten() {
16+
if let Some(arr) = cand.as_array() {
17+
return arr.clone();
18+
}
19+
}
20+
Vec::new()
21+
}
22+
23+
/// Try to extract a pagination token from the API response.
24+
pub(crate) fn extract_page_token(data: &Value) -> Option<String> {
25+
let candidates = [
26+
data.pointer("/data/nextPageToken"),
27+
data.pointer("/nextPageToken"),
28+
data.pointer("/data/data/nextPageToken"),
29+
];
30+
for cand in candidates.into_iter().flatten() {
31+
if let Some(s) = cand.as_str() {
32+
let trimmed = s.trim();
33+
if !trimmed.is_empty() {
34+
return Some(trimmed.to_string());
35+
}
36+
}
37+
}
38+
None
39+
}
40+
41+
/// Convert a cursor value (epoch millis or date string) into a Gmail
42+
/// `after:YYYY/MM/DD` filter component. Returns `None` if the cursor
43+
/// cannot be parsed.
44+
pub(crate) fn cursor_to_gmail_after_filter(cursor: &str) -> Option<String> {
45+
let cursor = cursor.trim();
46+
// Try parsing as epoch millis first (Gmail's internalDate).
47+
if let Ok(millis) = cursor.parse::<i64>() {
48+
let secs = millis / 1000;
49+
if let Some(dt) = chrono::DateTime::from_timestamp(secs, 0) {
50+
return Some(dt.format("%Y/%m/%d").to_string());
51+
}
52+
}
53+
// Try parsing as an ISO date/datetime.
54+
if let Ok(dt) = chrono::NaiveDate::parse_from_str(cursor, "%Y-%m-%d") {
55+
return Some(dt.format("%Y/%m/%d").to_string());
56+
}
57+
if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(cursor) {
58+
return Some(dt.format("%Y/%m/%d").to_string());
59+
}
60+
None
61+
}
62+
63+
pub(crate) fn now_ms() -> u64 {
64+
use std::time::{SystemTime, UNIX_EPOCH};
65+
SystemTime::now()
66+
.duration_since(UNIX_EPOCH)
67+
.map(|d| d.as_millis() as u64)
68+
.unwrap_or(0)
69+
}
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
//! Unit tests for the Gmail provider.
2+
3+
use super::sync::{cursor_to_gmail_after_filter, extract_messages, extract_page_token};
4+
use super::GmailProvider;
5+
use crate::openhuman::composio::providers::ComposioProvider;
6+
use serde_json::json;
7+
8+
#[test]
9+
fn extract_messages_finds_data_messages() {
10+
let v = json!({
11+
"data": { "messages": [{"id": "m1"}, {"id": "m2"}] },
12+
"successful": true,
13+
});
14+
assert_eq!(extract_messages(&v).len(), 2);
15+
}
16+
17+
#[test]
18+
fn extract_messages_finds_top_level_messages() {
19+
let v = json!({ "messages": [{"id": "m1"}] });
20+
assert_eq!(extract_messages(&v).len(), 1);
21+
}
22+
23+
#[test]
24+
fn extract_messages_returns_empty_when_missing() {
25+
let v = json!({ "data": { "other": [] } });
26+
assert_eq!(extract_messages(&v).len(), 0);
27+
}
28+
29+
#[test]
30+
fn extract_page_token_finds_nested() {
31+
let v = json!({ "data": { "nextPageToken": "tok123" } });
32+
assert_eq!(extract_page_token(&v), Some("tok123".to_string()));
33+
}
34+
35+
#[test]
36+
fn extract_page_token_none_when_missing() {
37+
let v = json!({ "data": {} });
38+
assert_eq!(extract_page_token(&v), None);
39+
}
40+
41+
#[test]
42+
fn cursor_to_filter_from_epoch_millis() {
43+
// 1774915200000 ms = 2026-03-31 UTC
44+
let millis = "1774915200000";
45+
assert_eq!(
46+
cursor_to_gmail_after_filter(millis),
47+
Some("2026/03/31".to_string())
48+
);
49+
}
50+
51+
#[test]
52+
fn cursor_to_filter_from_iso_date() {
53+
assert_eq!(
54+
cursor_to_gmail_after_filter("2026-03-15"),
55+
Some("2026/03/15".to_string())
56+
);
57+
}
58+
59+
#[test]
60+
fn cursor_to_filter_from_rfc3339() {
61+
let f = cursor_to_gmail_after_filter("2026-03-15T12:00:00Z");
62+
assert_eq!(f, Some("2026/03/15".to_string()));
63+
}
64+
65+
#[test]
66+
fn cursor_to_filter_returns_none_for_garbage() {
67+
assert_eq!(cursor_to_gmail_after_filter("not-a-date"), None);
68+
}
69+
70+
#[test]
71+
fn provider_metadata_is_stable() {
72+
let p = GmailProvider::new();
73+
assert_eq!(p.toolkit_slug(), "gmail");
74+
assert_eq!(p.sync_interval_secs(), Some(15 * 60));
75+
}

src/openhuman/composio/providers/mod.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ use super::client::{build_composio_client, ComposioClient};
4242

4343
pub mod gmail;
4444
pub mod notion;
45+
pub mod profile;
4546
pub mod registry;
4647
pub mod sync_state;
4748

@@ -262,6 +263,17 @@ pub trait ComposioProvider: Send + Sync {
262263
email_domain = ?email_domain,
263264
"[composio:provider] user profile fetched"
264265
);
266+
267+
// Persist profile fields into the local user_profile
268+
// facet table so display_name / email / avatar are
269+
// available to the agent context and UI without a
270+
// round-trip to the upstream provider.
271+
let facets = profile::persist_provider_profile(&profile);
272+
tracing::debug!(
273+
toolkit = %toolkit,
274+
facets_written = facets,
275+
"[composio:provider] profile facets persisted"
276+
);
265277
}
266278
Err(e) => {
267279
tracing::warn!(

0 commit comments

Comments
 (0)