Skip to content

Commit caa3d79

Browse files
fix: trivial fix for pro function (#201)
## What type of PR is this? - [ ] feat (new feature) - [ ] fix (bug fix) - [ ] docs (documentation) - [ ] style (formatting, no code change) - [x] refactor (code change that neither fixes a bug nor adds a feature) - [ ] perf (performance improvement) - [ ] test (adding or updating tests) - [ ] chore (maintenance, tooling) - [ ] build / ci (build or CI changes) ## Which issue(s) this PR fixes Fixes # matrixorigin/memoria-website#98 ## What this PR does / why we need it ### Scope These edits extend Memoria’s storage/API/MCP layers for **group collaboration**, **branch metadata**, **list filtering**, **MatrixOne compatibility**, and **operational observability**. They also align tests with updated function signatures. --- ### 1. `memoria-storage` (`store.rs` + tests) **Schema / migrations** - **`CURRENT_USER_SCHEMA_VERSION`**: bumped from `1` → **`2`** so existing user databases run compat migrations on next startup. - **`author_id` on `mem_memories`**: migration is **idempotent** — adds column only if missing (information_schema check); **`AFTER user_id` removed** for broader MatrixOne compatibility. - **`idx_author`**: added only if missing (handles “column exists but index missing”). - **Branch tables** (`mem_branches.table_name` → physical branch tables): **same `author_id` + `idx_author` migration** applied per active branch table. Rationale: upstream group mode added `author_id` to inserts; **legacy branch tables** created before that column may not have it; zero-copy branches don’t automatically pick up a later `ALTER` on `mem_memories`. **Behavior** - **`active_table`**: when `mem_branches` has metadata for the active branch, **trust it** and **skip** `information_schema.tables` existence checks — MatrixOne zero-copy branch tables may not appear reliably there; failures surface on actual DML instead. - **`list_branches`**: return type extended to **`(name, table_name, created_at)`**; query adds **`ORDER BY created_at ASC`** for stable ordering. - **`list_active_lite`**: new parameter **`trust_tier: Option<&str>`** with SQL filtering when set. **Tests** - `branch_ops.rs`: destructuring updated for 3-tuple `list_branches`. - `store_crud.rs`: `list_active_lite(..., None)` extended with extra **`None`** for `trust_tier`. --- ### 2. `memoria-service` (`service.rs`) - **`list_active_filtered`**: accepts **`trust_tier: Option<&str>`** and filters in-memory when provided (consistent with REST/MCP). --- ### 3. `memoria-api` **`routes/memory.rs`** - **`ListQuery`**: optional **`trust_tier`** query param wired through to **`list_active_filtered`**. **`routes/snapshots.rs`** - **`list_branches` JSON**: includes **`created_at`** where available (tuple destructuring updated for new storage return type). **`routes/admin.rs`** - **`user_stats`**: counting memories for **group scopes** (`grp_*`) adjusted so counts are not incorrectly filtered by personal `user_id` only. - **New handler `user_branch_stats`**: **`GET /admin/users/:user_id/branch-stats`** — returns per-branch memory counts (uses main count + iterates branches via `list_branches`), with defensive error handling per branch. **`lib.rs`** - Registers the new admin route above. --- ### 4. `memoria-mcp` **`server.rs`** - **`ACTOR_USER_ID`** (task-local): captured before **`spawn_blocking`** and re-entered inside the blocking closure so **active branch context matches** between async REST/MCP paths and blocking Git/storage work (fixes Dashboard vs MCP branch mismatch when using the same API key). **`tools.rs`** - **`memory_list` / list path**: passes **`trust_tier`** from tool args into **`list_active_filtered`**. **`git_tools.rs`** - All **`list_branches`** call sites updated for **3-tuple** return (third element ignored where unused). --- ### 5. `memoria-git` (`service.rs` + `Cargo.toml`) **MatrixOne DDL reliability** - **`exec_ddl`**: retries up to **3 attempts** with backoff when the DB returns **20631** / **`txn need retry`** (“def changed” under RC mode). Typical trigger: concurrent **`data branch merge`** vs **`ALTER TABLE`** on branch tables during migration (transient; retry is the intended client-side mitigation). **Diff / compatibility** - **`data branch diff`**: **`columns (...)` clause removed** from SQL — filtered in Rust instead — for **MatrixOne version differences** (local vs cloud). - **`FIXME(memoria-team)`** comment documenting **duplicate `memory_id` rows** in diff results / UI linkage — left for upstream; not “fixed” here. **Dependencies** - **`tokio`** moved from **dev-dependencies only** to **normal dependency** (needed for **`tokio::time::sleep`** in `exec_ddl` retry loop). --- ### 6. Integration tests (`memoria-mcp`) - **`branch_e2e.rs`**, **`integration_full.rs`**: tuple destructuring updated for **`list_branches`** 3-tuple. --- ### Risks / review focus 1. **Migration + DDL concurrency**: `author_id` ALTERs on branch tables can overlap with **`data branch merge`** — mitigated by **`exec_ddl` retry**; reviewers may want to confirm whether long-term **serialization** or **migration ordering** is preferable. 2. **`CURRENT_USER_SCHEMA_VERSION = 2`**: ensures one migration pass; coordinate with upstream versioning if they already use `2` differently. 3. **Admin `user_branch_stats`**: new surface area — auth / master-key expectations should match existing admin routes. 4. **Known open issue**: **`FIXME`** in `memoria-git` for duplicate diff rows — document for Memoria team. --- ### How to verify locally - **`cargo check`** / **`cargo test`** with a running MatrixOne and **`DATABASE_URL`** pointing at the correct port (e.g. **`6001`** if that’s where MO listens). Note: **`group_collab_api`** defaults to port **`6666`** if **`DATABASE_URL`** is unset — set env explicitly if tests fail with DB timeouts. --- ### Files changed (from your status) | Area | Files | |------|--------| | API | `memoria-api/src/lib.rs`, `routes/admin.rs`, `routes/memory.rs`, `routes/snapshots.rs` | | Git / DDL | `memoria-git/Cargo.toml`, `memoria-git/src/service.rs` | | MCP | `memoria-mcp/src/git_tools.rs`, `server.rs`, `tools.rs`, tests `branch_e2e.rs`, `integration_full.rs` | | Service | `memoria-service/src/service.rs` | | Storage | `memoria-storage/src/store.rs`, tests `branch_ops.rs`, `store_crud.rs` | --- You can paste this block into a PR description or share it as an internal review brief. If you want a shorter “changelog” paragraph only, say so and I’ll trim it.
1 parent 357d44c commit caa3d79

17 files changed

Lines changed: 905 additions & 104 deletions

File tree

memoria/crates/memoria-api/src/auth.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ async fn cached_or_db_principal(token: &str, state: &AppState) -> Option<CachedA
8383
Some(principal)
8484
}
8585

86-
async fn group_main_write_allowed_for_solo_owner(
86+
pub(crate) async fn group_main_write_allowed_for_solo_owner(
8787
state: &AppState,
8888
group_id: &str,
8989
user_id: &str,

memoria/crates/memoria-api/src/lib.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,10 @@ pub fn build_router(state: AppState) -> Router {
358358
"/admin/users/:user_id/stats",
359359
get(routes::admin::user_stats),
360360
)
361+
.route(
362+
"/admin/users/:user_id/branch-stats",
363+
get(routes::admin::user_branch_stats),
364+
)
361365
.route(
362366
"/admin/users/:user_id/call-stats",
363367
get(routes::admin::user_call_stats),

memoria/crates/memoria-api/src/routes/admin.rs

Lines changed: 121 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -165,13 +165,25 @@ pub async fn user_stats(
165165
auth.require_master()?;
166166
let user_store = get_user_store(&state, &user_id).await?;
167167
let memories_table = user_store.t("mem_memories");
168-
let memory_count = sqlx::query_scalar::<_, i64>(&format!(
169-
"SELECT COUNT(*) FROM {memories_table} WHERE user_id = ? AND is_active > 0"
170-
))
171-
.bind(&user_id)
172-
.fetch_one(user_store.pool())
173-
.await
174-
.map_err(db_err)?;
168+
// For group scopes (user_id starts with "grp_") count ALL active memories in the
169+
// group database — the individual user_id filter would always return 0 since
170+
// memories are attributed to each member's uid, not the group id itself.
171+
let memory_count = if user_id.starts_with("grp_") {
172+
sqlx::query_scalar::<_, i64>(&format!(
173+
"SELECT COUNT(*) FROM {memories_table} WHERE is_active > 0"
174+
))
175+
.fetch_one(user_store.pool())
176+
.await
177+
.map_err(db_err)?
178+
} else {
179+
sqlx::query_scalar::<_, i64>(&format!(
180+
"SELECT COUNT(*) FROM {memories_table} WHERE user_id = ? AND is_active > 0"
181+
))
182+
.bind(&user_id)
183+
.fetch_one(user_store.pool())
184+
.await
185+
.map_err(db_err)?
186+
};
175187
let snapshot_count = user_store
176188
.list_snapshot_registrations(&user_id)
177189
.await
@@ -788,6 +800,108 @@ pub async fn user_call_stats(
788800
})))
789801
}
790802

803+
/// GET /admin/users/:user_id/branch-stats
804+
///
805+
/// Returns memory counts per branch for a given user (or group).
806+
/// Response:
807+
/// ```json
808+
/// {
809+
/// "branches": [
810+
/// {"name": "main", "count": 170},
811+
/// {"name": "another", "count": 320}
812+
/// ],
813+
/// "total": 490
814+
/// }
815+
/// ```
816+
pub async fn user_branch_stats(
817+
auth: AuthUser,
818+
State(state): State<AppState>,
819+
Path(user_id): Path<String>,
820+
) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
821+
auth.require_master()?;
822+
let user_store = get_user_store(&state, &user_id).await?;
823+
824+
// --- main branch ---
825+
// Personal users have rows from multiple users in mem_memories (shared table), so
826+
// we filter by user_id. Group users own an isolated per-group database where
827+
// mem_memories contains only group data, so no user_id filter is needed there.
828+
let memories_table = user_store.t("mem_memories");
829+
let main_count: i64 = if user_id.starts_with("grp_") {
830+
sqlx::query_scalar::<_, i64>(&format!(
831+
"SELECT COUNT(*) FROM {memories_table} WHERE is_active > 0"
832+
))
833+
.fetch_one(user_store.pool())
834+
.await
835+
.map_err(db_err)?
836+
} else {
837+
sqlx::query_scalar::<_, i64>(&format!(
838+
"SELECT COUNT(*) FROM {memories_table} WHERE user_id = ? AND is_active > 0"
839+
))
840+
.bind(&user_id)
841+
.fetch_one(user_store.pool())
842+
.await
843+
.map_err(db_err)?
844+
};
845+
846+
let mut branches_stats = vec![serde_json::json!({"name": "main", "count": main_count})];
847+
let mut total = main_count;
848+
let mut degraded_branches: Vec<String> = Vec::new();
849+
850+
// --- non-main branches from mem_branches ---
851+
// Branch tables are per-user physical tables (created via `data branch create table`),
852+
// so they only contain data belonging to the owning user/group. No user_id filter is
853+
// required — this is consistent with the group-scoped main count above.
854+
let branch_rows = user_store.list_branches(&user_id).await.map_err(api_err)?;
855+
for (name, raw_table_name, _created_at) in &branch_rows {
856+
if name == "main" || raw_table_name.is_empty() {
857+
continue;
858+
}
859+
if !raw_table_name.chars().all(|c| c.is_ascii_alphanumeric() || c == '_') {
860+
tracing::warn!(
861+
user_id = %user_id,
862+
branch = %name,
863+
table = %raw_table_name,
864+
"user_branch_stats: skipping branch with invalid table identifier"
865+
);
866+
degraded_branches.push(name.clone());
867+
continue;
868+
}
869+
let bt = user_store.t(raw_table_name);
870+
let count_result = sqlx::query_scalar::<_, i64>(&format!(
871+
"SELECT COUNT(*) FROM {bt} WHERE is_active > 0"
872+
))
873+
.fetch_one(user_store.pool())
874+
.await;
875+
let count = match count_result {
876+
Ok(n) => n,
877+
Err(e) => {
878+
tracing::warn!(
879+
user_id = %user_id,
880+
branch = %name,
881+
table = %raw_table_name,
882+
error = %e,
883+
"user_branch_stats: failed to count memories for branch"
884+
);
885+
// Record degraded branch so the caller knows the total is unreliable.
886+
degraded_branches.push(name.clone());
887+
0
888+
}
889+
};
890+
branches_stats.push(serde_json::json!({"name": name, "count": count}));
891+
total += count;
892+
}
893+
894+
Ok(Json(serde_json::json!({
895+
"branches": branches_stats,
896+
"total": total,
897+
// `degraded` is true when at least one branch table could not be counted.
898+
// In that case `total` is a lower bound and `degraded_branches` lists the
899+
// affected branches — callers should treat the numbers as approximate.
900+
"degraded": !degraded_branches.is_empty(),
901+
"degraded_branches": degraded_branches,
902+
})))
903+
}
904+
791905
/// Redact password from database URL for safe display.
792906
fn redact_url(url: &str) -> String {
793907
// mysql://user:pass@host:port/db → mysql://user:***@host:port/db

memoria/crates/memoria-api/src/routes/mcp.rs

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,25 @@ fn tracking_path(method: &str, params: Option<&serde_json::Value>) -> String {
7676
format!("/mcp/{sanitized}")
7777
}
7878

79+
/// MCP write tools that mutate the active memory table.
80+
/// These are blocked when the caller is group-scoped and checked out on `main`,
81+
/// mirroring the REST `group_main_write_guard` middleware.
82+
///
83+
/// Read-only tools, branch-management tools (`memory_checkout`, `memory_branch`,
84+
/// `memory_branch_delete`), and snapshot ops are intentionally excluded.
85+
const GROUP_MAIN_WRITE_BLOCKED: &[&str] = &[
86+
"memory_store",
87+
"memory_correct",
88+
"memory_purge",
89+
"memory_observe",
90+
"memory_governance",
91+
"memory_consolidate",
92+
"memory_reflect",
93+
"memory_extract_entities",
94+
"memory_link_entities",
95+
"memory_rollback",
96+
];
97+
7998
fn mcp_tool_dirty_mask(tool: &str) -> Option<crate::metrics_summary::DirtyMask> {
8099
use crate::metrics_summary::DirtyMask;
81100
match tool {
@@ -226,9 +245,75 @@ pub async fn mcp_handler(
226245
}
227246
};
228247

248+
// ── Group main-write guard (computed once, shared by both code paths) ─────
249+
// Resolved here — before the Notification early-return — so that
250+
// Notification-form write calls to `main` are also blocked instead of
251+
// silently bypassing the restriction.
252+
//
253+
// Returns Some(tool_name) when the call must be blocked, None otherwise.
254+
let blocked_tool: Option<String> = if let Some(gid) = &auth.group_id {
255+
if let Some(tool) = tracked_tool.as_deref() {
256+
if GROUP_MAIN_WRITE_BLOCKED.contains(&tool) {
257+
// ACTOR_USER_ID is already set by the global actor_scope_layer.
258+
let maybe_blocked = state
259+
.service
260+
.user_sql_store(gid)
261+
.await
262+
.ok()
263+
.map(|sql| {
264+
let gid_owned = gid.clone();
265+
memoria_storage::ACTOR_USER_ID
266+
.scope(auth.user_id.clone(), async move {
267+
sql.active_branch_name(&gid_owned).await
268+
})
269+
});
270+
if let Some(fut) = maybe_blocked {
271+
if let Ok(branch) = fut.await {
272+
let is_solo_owner =
273+
crate::auth::group_main_write_allowed_for_solo_owner(
274+
&state,
275+
gid,
276+
&auth.user_id,
277+
)
278+
.await;
279+
if branch == "main" && !is_solo_owner {
280+
Some(tool.to_string())
281+
} else {
282+
None
283+
}
284+
} else {
285+
None
286+
}
287+
} else {
288+
None
289+
}
290+
} else {
291+
None
292+
}
293+
} else {
294+
None
295+
}
296+
} else {
297+
None
298+
};
299+
229300
// JSON-RPC 2.0: a Notification is a *valid* Request without an "id" member.
230301
// The server MUST NOT reply to Notifications.
231302
if req.get("id").is_none() {
303+
// Write guard: per JSON-RPC 2.0 the server MUST NOT reply to Notifications,
304+
// so we silently drop blocked writes without dispatching.
305+
if blocked_tool.is_some() {
306+
report_stats(&track_path, false);
307+
state.call_log_batcher.record_rpc(
308+
user_id,
309+
"POST".to_string(),
310+
track_path,
311+
204,
312+
t.elapsed().as_millis() as u32,
313+
RpcMeta::err(-32001),
314+
);
315+
return StatusCode::NO_CONTENT.into_response();
316+
}
232317
let dispatch_result = memoria_mcp::dispatch_http(
233318
method.clone(),
234319
params,
@@ -262,6 +347,36 @@ pub async fn mcp_handler(
262347

263348
let id = req["id"].clone();
264349

350+
// Use the pre-computed write-guard decision (see above).
351+
if let Some(tool) = &blocked_tool {
352+
let err_body = Json(json!({
353+
"jsonrpc": "2.0",
354+
"id": id,
355+
"error": {
356+
"code": -32001,
357+
"message": format!(
358+
"Cannot call '{}' on 'main' in a shared space — \
359+
main is read-only in collaboration mode. \
360+
Your active branch is currently 'main'. \
361+
Call memory_checkout with a branch name \
362+
(e.g. {{\"name\": \"my-branch\"}}) to switch \
363+
to your own branch, then retry.",
364+
tool
365+
)
366+
}
367+
}));
368+
report_stats(&track_path, false);
369+
state.call_log_batcher.record_rpc(
370+
user_id,
371+
"POST".to_string(),
372+
track_path,
373+
200,
374+
t.elapsed().as_millis() as u32,
375+
RpcMeta::err(-32001),
376+
);
377+
return err_body.into_response();
378+
}
379+
265380
// JSON-RPC spec: the HTTP response is always 200 OK, even for RPC errors.
266381
// Business-level error tracking uses rpc_success / rpc_error_code in the call log.
267382
let (response, rpc) = match memoria_mcp::dispatch_http(

memoria/crates/memoria-api/src/routes/memory.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ async fn find_memory_any_user(
6767
pub struct ListQuery {
6868
pub memory_type: Option<String>,
6969
pub session_id: Option<String>,
70+
pub trust_tier: Option<String>,
7071
#[serde(default = "default_limit")]
7172
pub limit: i64,
7273
pub cursor: Option<String>,
@@ -114,6 +115,9 @@ pub async fn list_memories(
114115
.cursor
115116
.as_deref()
116117
.filter(|c| c.len() == 32 && c.chars().all(|ch| ch.is_ascii_hexdigit()));
118+
if let Some(tier) = q.trust_tier.as_deref() {
119+
parse_trust_tier(tier).map_err(|e| (StatusCode::UNPROCESSABLE_ENTITY, e))?;
120+
}
117121
let fetch_limit = limit + 1;
118122
let mut memories = state
119123
.service
@@ -122,6 +126,7 @@ pub async fn list_memories(
122126
fetch_limit,
123127
q.memory_type.as_deref(),
124128
q.session_id.as_deref(),
129+
q.trust_tier.as_deref(),
125130
cursor,
126131
)
127132
.await

memoria/crates/memoria-api/src/routes/snapshots.rs

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ async fn branch_table_name_raw(
214214
if branch_name == "main" {
215215
return Ok("mem_memories".to_string());
216216
}
217-
for (name, table_name) in sql.list_branches(scope_id).await.map_err(api_err)? {
217+
for (name, table_name, _created_at) in sql.list_branches(scope_id).await.map_err(api_err)? {
218218
if name == branch_name {
219219
return Ok(table_name);
220220
}
@@ -241,7 +241,26 @@ async fn branch_diff_items_payload(
241241
.diff_branch_rows(&branch_table_raw, "mem_memories", scope_id, limit * 3)
242242
.await
243243
.map_err(api_err)?;
244-
let classified = memoria_git::classify_diff_rows(raw_rows, &branch_table_raw);
244+
let mut classified = memoria_git::classify_diff_rows(raw_rows, &branch_table_raw);
245+
246+
// classify_diff_rows cannot distinguish "deleted from main on branch" from
247+
// "created-then-deleted on branch" without querying the main table (because
248+
// data branch diff only returns the branch-side row for deletions). Resolve
249+
// now: ghost removes (never in main) move to classified.ghost_removes.
250+
git.resolve_ghost_removes(&mut classified, "mem_memories", scope_id)
251+
.await
252+
.map_err(api_err)?;
253+
254+
tracing::debug!(
255+
branch = %branch_name,
256+
added = classified.added.len(),
257+
updated = classified.updated.len(),
258+
removed = classified.removed.len(),
259+
ghost_removes = classified.ghost_removes.len(),
260+
conflicts = classified.conflicts.len(),
261+
behind_main = classified.behind_main.len(),
262+
"branch_diff_items_payload: classified result"
263+
);
245264

246265
let item_to_json = |item: &memoria_git::DiffItem| -> Value {
247266
json!({
@@ -749,10 +768,12 @@ pub async fn list_branches(
749768
"name": "main",
750769
"active": active_branch == "main",
751770
})];
752-
for (name, _table_name) in sql.list_branches(auth.scope_id()).await.map_err(api_err)? {
771+
for (name, _table_name, created_at) in sql.list_branches(auth.scope_id()).await.map_err(api_err)? {
772+
let created_at_str = format_snapshot_timestamp(created_at);
753773
branches.push(json!({
754774
"name": name,
755775
"active": name == active_branch,
776+
"created_at": created_at_str,
756777
}));
757778
}
758779
if !branches

memoria/crates/memoria-git/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@ serde_json = { workspace = true }
1111
chrono = { workspace = true }
1212
thiserror = { workspace = true }
1313
tracing = { workspace = true }
14+
tokio = { workspace = true }
1415

1516
[dev-dependencies]
1617
memoria-storage = { path = "../memoria-storage" }
17-
tokio = { workspace = true }
1818
uuid = { workspace = true }

0 commit comments

Comments
 (0)