From af8b63c618c331a080f94e673a4bfa2359af384a Mon Sep 17 00:00:00 2001 From: joshua-spacetime Date: Thu, 12 Jun 2025 12:15:31 -0700 Subject: [PATCH] Always use identity when hashing queries from owners --- .../subscription/module_subscription_actor.rs | 77 +++++++++++++++++++ crates/core/src/subscription/query.rs | 6 +- crates/core/src/subscription/subscription.rs | 26 +++++-- crates/expr/src/rls.rs | 4 +- crates/lib/src/identity.rs | 4 + 5 files changed, 106 insertions(+), 11 deletions(-) diff --git a/crates/core/src/subscription/module_subscription_actor.rs b/crates/core/src/subscription/module_subscription_actor.rs index 3a6ef438284..7e76552b372 100644 --- a/crates/core/src/subscription/module_subscription_actor.rs +++ b/crates/core/src/subscription/module_subscription_actor.rs @@ -1534,6 +1534,83 @@ mod tests { Ok(()) } + /// Test that a client and the database owner can subscribe to the same query + #[tokio::test] + async fn test_rls_for_owner() -> anyhow::Result<()> { + // Establish a connection for owner and client + let (tx_for_a, mut rx_for_a) = client_connection(client_id_from_u8(0)); + let (tx_for_b, mut rx_for_b) = client_connection(client_id_from_u8(1)); + + let db = relational_db()?; + let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone()); + + // Create table `t` + let table_id = db.create_table_for_test("t", &[("id", AlgebraicType::identity())], &[0.into()])?; + + // Restrict access to `t` + insert_rls_rules(&db, [table_id], ["select * from t where id = :sender"])?; + + let mut query_ids = 0; + + // Have owner and client subscribe to `t` + subscribe_multi(&subs, &["select * from t"], tx_for_a, &mut query_ids)?; + subscribe_multi(&subs, &["select * from t"], tx_for_b, &mut query_ids)?; + + // Wait for both subscriptions + assert_matches!( + rx_for_a.recv().await, + Some(SerializableMessage::Subscription(SubscriptionMessage { + result: SubscriptionResult::SubscribeMulti(_), + .. + })) + ); + assert_matches!( + rx_for_b.recv().await, + Some(SerializableMessage::Subscription(SubscriptionMessage { + result: SubscriptionResult::SubscribeMulti(_), + .. + })) + ); + + let schema = ProductType::from([AlgebraicType::identity()]); + + let id_for_b = identity_from_u8(1); + let id_for_c = identity_from_u8(2); + + commit_tx( + &db, + &subs, + [], + [ + // Insert an identity for client `b` plus a random identity + (table_id, product![id_for_b]), + (table_id, product![id_for_c]), + ], + )?; + + assert_tx_update_for_table( + &mut rx_for_a, + table_id, + &schema, + // The owner should receive both identities + [product![id_for_b], product![id_for_c]], + [], + ) + .await; + + assert_tx_update_for_table( + &mut rx_for_b, + table_id, + &schema, + // Client `b` should only receive its identity + [product![id_for_b]], + [], + ) + .await; + + Ok(()) + } + /// Test that we do not send empty updates to clients #[tokio::test] async fn test_no_empty_updates() -> anyhow::Result<()> { diff --git a/crates/core/src/subscription/query.rs b/crates/core/src/subscription/query.rs index 7ba111592c3..d836a50122a 100644 --- a/crates/core/src/subscription/query.rs +++ b/crates/core/src/subscription/query.rs @@ -107,7 +107,11 @@ pub fn compile_query_with_hashes( let tx = SchemaViewer::new(tx, auth); let (plans, has_param) = SubscriptionPlan::compile(input, &tx, auth)?; - if has_param { + if auth.is_owner() || has_param { + // Note that when generating hashes for queries from owners, + // we always treat them as if they were parameterized by :sender. + // This is because RLS is not applicable to owners. + // Hence owner hashes must never overlap with client hashes. return Ok(Plan::new(plans, hash_with_param, input.to_owned())); } Ok(Plan::new(plans, hash, input.to_owned())) diff --git a/crates/core/src/subscription/subscription.rs b/crates/core/src/subscription/subscription.rs index 650a913d83b..e2523ae4f0e 100644 --- a/crates/core/src/subscription/subscription.rs +++ b/crates/core/src/subscription/subscription.rs @@ -614,13 +614,25 @@ pub(crate) fn get_all(relational_db: &RelationalDB, tx: &Tx, auth: &AuthCtx) -> .get_all_tables(tx)? .iter() .map(Deref::deref) - .filter(|t| { - t.table_type == StTableType::User && (auth.owner == auth.caller || t.table_access == StAccess::Public) - }) + .filter(|t| t.table_type == StTableType::User && (auth.is_owner() || t.table_access == StAccess::Public)) .map(|schema| { let sql = format!("SELECT * FROM {}", schema.table_name); - SubscriptionPlan::compile(&sql, &SchemaViewer::new(tx, auth), auth) - .map(|(plans, has_param)| Plan::new(plans, QueryHash::from_string(&sql, auth.caller, has_param), sql)) + let tx = SchemaViewer::new(tx, auth); + SubscriptionPlan::compile(&sql, &tx, auth).map(|(plans, has_param)| { + Plan::new( + plans, + QueryHash::from_string( + &sql, + auth.caller, + // Note that when generating hashes for queries from owners, + // we always treat them as if they were parameterized by :sender. + // This is because RLS is not applicable to owners. + // Hence owner hashes must never overlap with client hashes. + auth.is_owner() || has_param, + ), + sql, + ) + }) }) .collect::>()?) } @@ -638,9 +650,7 @@ pub(crate) fn legacy_get_all( .get_all_tables(tx)? .iter() .map(Deref::deref) - .filter(|t| { - t.table_type == StTableType::User && (auth.owner == auth.caller || t.table_access == StAccess::Public) - }) + .filter(|t| t.table_type == StTableType::User && (auth.is_owner() || t.table_access == StAccess::Public)) .map(|src| SupportedQuery { kind: query::Supported::Select, expr: QueryExpr::new(src), diff --git a/crates/expr/src/rls.rs b/crates/expr/src/rls.rs index cd6e835959e..89bdf7149f8 100644 --- a/crates/expr/src/rls.rs +++ b/crates/expr/src/rls.rs @@ -18,7 +18,7 @@ pub fn resolve_views_for_sub( has_param: &mut bool, ) -> anyhow::Result> { // RLS does not apply to the database owner - if auth.caller == auth.owner { + if auth.is_owner() { return Ok(vec![expr]); } @@ -56,7 +56,7 @@ pub fn resolve_views_for_sub( /// Mainly a wrapper around [resolve_views_for_expr]. pub fn resolve_views_for_sql(tx: &impl SchemaView, expr: ProjectList, auth: &AuthCtx) -> anyhow::Result { // RLS does not apply to the database owner - if auth.caller == auth.owner { + if auth.is_owner() { return Ok(expr); } // The subscription language is a subset of the sql language. diff --git a/crates/lib/src/identity.rs b/crates/lib/src/identity.rs index e4059931165..ccc5c8981c9 100644 --- a/crates/lib/src/identity.rs +++ b/crates/lib/src/identity.rs @@ -22,6 +22,10 @@ impl AuthCtx { pub fn for_current(owner: Identity) -> Self { Self { owner, caller: owner } } + /// Does `owner == caller` + pub fn is_owner(&self) -> bool { + self.owner == self.caller + } /// WARNING: Use this only for simple test were the `auth` don't matter pub fn for_testing() -> Self { AuthCtx {