Skip to content

Commit c522c0f

Browse files
Prune more queries when evaluating subscription updates (#2855)
Signed-off-by: joshua-spacetime <josh@clockworklabs.io> Co-authored-by: Mario Montoya <mamcx@elmalabarista.com>
1 parent 70df4fb commit c522c0f

5 files changed

Lines changed: 559 additions & 79 deletions

File tree

crates/core/src/subscription/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ where
179179
})
180180
.map(|(sql, plan)| (sql, plan, plan.subscribed_table_id(), plan.subscribed_table_name()))
181181
.map(|(sql, plan, table_id, table_name)| {
182-
plan.physical_plan()
182+
plan.optimized_physical_plan()
183183
.clone()
184184
.optimize()
185185
.map(|plan| (sql, PipelinedProject::from(plan)))

crates/core/src/subscription/module_subscription_actor.rs

Lines changed: 260 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@ impl ModuleSubscriptions {
224224
tx,
225225
|plan, tx| {
226226
plan.plans_fragments()
227-
.map(|plan_fragment| estimate_rows_scanned(tx, plan_fragment.physical_plan()))
227+
.map(|plan_fragment| estimate_rows_scanned(tx, plan_fragment.optimized_physical_plan()))
228228
.fold(0, |acc, rows_scanned| acc.saturating_add(rows_scanned))
229229
},
230230
auth,
@@ -235,7 +235,7 @@ impl ModuleSubscriptions {
235235

236236
let plans = query
237237
.plans_fragments()
238-
.map(|fragment| fragment.physical_plan())
238+
.map(|fragment| fragment.optimized_physical_plan())
239239
.cloned()
240240
.map(|plan| plan.optimize())
241241
.collect::<Result<Vec<_>, _>>()?
@@ -267,7 +267,7 @@ impl ModuleSubscriptions {
267267
tx,
268268
|plan, tx| {
269269
plan.plans_fragments()
270-
.map(|plan_fragment| estimate_rows_scanned(tx, plan_fragment.physical_plan()))
270+
.map(|plan_fragment| estimate_rows_scanned(tx, plan_fragment.optimized_physical_plan()))
271271
.fold(0, |acc, rows_scanned| acc.saturating_add(rows_scanned))
272272
},
273273
auth,
@@ -778,7 +778,7 @@ impl ModuleSubscriptions {
778778
&tx,
779779
|plan, tx| {
780780
plan.plans_fragments()
781-
.map(|plan_fragment| estimate_rows_scanned(tx, plan_fragment.physical_plan()))
781+
.map(|plan_fragment| estimate_rows_scanned(tx, plan_fragment.optimized_physical_plan()))
782782
.fold(0, |acc, rows_scanned| acc.saturating_add(rows_scanned))
783783
},
784784
&auth,
@@ -2169,6 +2169,262 @@ mod tests {
21692169
Ok(())
21702170
}
21712171

2172+
/// Test that we do not evaluate queries that we know will not match row updates
2173+
#[tokio::test]
2174+
async fn test_join_pruning() -> anyhow::Result<()> {
2175+
let (tx, mut rx) = client_connection(client_id_from_u8(1));
2176+
2177+
let db = relational_db()?;
2178+
let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone());
2179+
2180+
let u_id = db.create_table_for_test_with_the_works(
2181+
"u",
2182+
&[
2183+
("i", AlgebraicType::U64),
2184+
("a", AlgebraicType::U64),
2185+
("b", AlgebraicType::U64),
2186+
],
2187+
&[0.into()],
2188+
&[0.into()],
2189+
StAccess::Public,
2190+
)?;
2191+
let v_id = db.create_table_for_test_with_the_works(
2192+
"v",
2193+
&[
2194+
("i", AlgebraicType::U64),
2195+
("x", AlgebraicType::U64),
2196+
("y", AlgebraicType::U64),
2197+
],
2198+
&[0.into(), 1.into()],
2199+
&[0.into()],
2200+
StAccess::Public,
2201+
)?;
2202+
2203+
let schema = ProductType::from([AlgebraicType::U64, AlgebraicType::U64, AlgebraicType::U64]);
2204+
2205+
commit_tx(
2206+
&db,
2207+
&subs,
2208+
[],
2209+
[
2210+
(v_id, product![1u64, 1u64, 1u64]),
2211+
(v_id, product![2u64, 2u64, 2u64]),
2212+
(v_id, product![3u64, 3u64, 3u64]),
2213+
(v_id, product![4u64, 4u64, 4u64]),
2214+
(v_id, product![5u64, 5u64, 5u64]),
2215+
],
2216+
)?;
2217+
2218+
let mut query_ids = 0;
2219+
2220+
subscribe_multi(
2221+
&subs,
2222+
&[
2223+
"select u.* from u join v on u.i = v.i where v.x = 1",
2224+
"select u.* from u join v on u.i = v.i where v.x = 2",
2225+
"select u.* from u join v on u.i = v.i where v.x = 3",
2226+
"select u.* from u join v on u.i = v.i where v.x = 4",
2227+
"select u.* from u join v on u.i = v.i where v.x = 5",
2228+
],
2229+
tx,
2230+
&mut query_ids,
2231+
)?;
2232+
2233+
assert_matches!(
2234+
rx.recv().await,
2235+
Some(SerializableMessage::Subscription(SubscriptionMessage {
2236+
result: SubscriptionResult::SubscribeMulti(_),
2237+
..
2238+
}))
2239+
);
2240+
2241+
// Insert a new row into `u` that joins with `x = 1`
2242+
let metrics = commit_tx(&db, &subs, [], [(u_id, product![1u64, 2u64, 3u64])])?;
2243+
2244+
assert_tx_update_for_table(&mut rx, u_id, &schema, [product![1u64, 2u64, 3u64]], []).await;
2245+
2246+
// We should only have evaluated a single query
2247+
assert_eq!(metrics.delta_queries_evaluated, 1);
2248+
assert_eq!(metrics.delta_queries_matched, 1);
2249+
2250+
// UPDATE v SET y = 2 WHERE i = 1
2251+
let metrics = commit_tx(
2252+
&db,
2253+
&subs,
2254+
[(v_id, product![1u64, 1u64, 1u64])],
2255+
[(v_id, product![1u64, 1u64, 2u64])],
2256+
)?;
2257+
2258+
// We should only have evaluated a single query
2259+
assert_eq!(metrics.delta_queries_evaluated, 1);
2260+
assert_eq!(metrics.delta_queries_matched, 0);
2261+
2262+
// UPDATE v SET x = 2 WHERE i = 1
2263+
let metrics = commit_tx(
2264+
&db,
2265+
&subs,
2266+
[(v_id, product![1u64, 1u64, 2u64])],
2267+
[(v_id, product![1u64, 2u64, 2u64])],
2268+
)?;
2269+
2270+
// Results in a no-op for the client: the joined `u` row leaves the `x = 1` query and enters the `x = 2` query
2271+
assert_tx_update_for_table(&mut rx, u_id, &schema, [], []).await;
2272+
2273+
// We should have evaluated queries for `x = 1` and `x = 2`
2274+
assert_eq!(metrics.delta_queries_evaluated, 2);
2275+
assert_eq!(metrics.delta_queries_matched, 2);
2276+
2277+
// Insert new row into `u` that joins with `x = 3`
2278+
// UPDATE v SET x = 4 WHERE i = 3
2279+
let metrics = commit_tx(
2280+
&db,
2281+
&subs,
2282+
[(v_id, product![3u64, 3u64, 3u64])],
2283+
[(v_id, product![3u64, 4u64, 3u64]), (u_id, product![3u64, 4u64, 5u64])],
2284+
)?;
2285+
2286+
assert_tx_update_for_table(&mut rx, u_id, &schema, [product![3u64, 4u64, 5u64]], []).await;
2287+
2288+
// We should have evaluated queries for `x = 3` and `x = 4`
2289+
assert_eq!(metrics.delta_queries_evaluated, 2);
2290+
assert_eq!(metrics.delta_queries_matched, 2);
2291+
2292+
// UPDATE v SET x = 0 WHERE i = 3
2293+
let metrics = commit_tx(
2294+
&db,
2295+
&subs,
2296+
[(v_id, product![3u64, 4u64, 3u64])],
2297+
[(v_id, product![3u64, 0u64, 3u64])],
2298+
)?;
2299+
2300+
assert_tx_update_for_table(&mut rx, u_id, &schema, [], [product![3u64, 4u64, 5u64]]).await;
2301+
2302+
// We should only have evaluated the query for `x = 4`
2303+
assert_eq!(metrics.delta_queries_evaluated, 1);
2304+
assert_eq!(metrics.delta_queries_matched, 1);
2305+
2306+
// Insert new row into `u` that joins with `x = 5`
2307+
// UPDATE v SET x = 6, y = 6 WHERE i = 5
2308+
let metrics = commit_tx(
2309+
&db,
2310+
&subs,
2311+
[(v_id, product![5u64, 5u64, 5u64])],
2312+
[(v_id, product![5u64, 6u64, 6u64]), (u_id, product![5u64, 6u64, 7u64])],
2313+
)?;
2314+
2315+
// Results in a no-op: the new `u` row joins a `v` row whose `x` is now 6, which no subscribed query selects
2316+
assert_tx_update_for_table(&mut rx, u_id, &schema, [], []).await;
2317+
2318+
// We should only have evaluated the query for `x = 5`
2319+
assert_eq!(metrics.delta_queries_evaluated, 1);
2320+
assert_eq!(metrics.delta_queries_matched, 1);
2321+
2322+
Ok(())
2323+
}
2324+
2325+
/// Test that one client unsubscribing does not affect another
2326+
#[tokio::test]
2327+
async fn test_unsubscribe() -> anyhow::Result<()> {
2328+
// Establish a connection for each client
2329+
let (tx_for_a, mut rx_for_a) = client_connection(client_id_from_u8(1));
2330+
let (tx_for_b, mut rx_for_b) = client_connection(client_id_from_u8(2));
2331+
2332+
let db = relational_db()?;
2333+
let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone());
2334+
2335+
let u_id = db.create_table_for_test(
2336+
"u",
2337+
&[
2338+
("i", AlgebraicType::U64),
2339+
("a", AlgebraicType::U64),
2340+
("b", AlgebraicType::U64),
2341+
],
2342+
&[0.into()],
2343+
)?;
2344+
let v_id = db.create_table_for_test(
2345+
"v",
2346+
&[
2347+
("i", AlgebraicType::U64),
2348+
("x", AlgebraicType::U64),
2349+
("y", AlgebraicType::U64),
2350+
],
2351+
&[0.into(), 1.into()],
2352+
)?;
2353+
2354+
commit_tx(&db, &subs, [], [(v_id, product![1u64, 1u64, 1u64])])?;
2355+
2356+
let mut query_ids = 0;
2357+
2358+
subscribe_multi(
2359+
&subs,
2360+
&["select u.* from u join v on u.i = v.i where v.x = 1"],
2361+
tx_for_a,
2362+
&mut query_ids,
2363+
)?;
2364+
subscribe_multi(
2365+
&subs,
2366+
&["select u.* from u join v on u.i = v.i where v.x = 1"],
2367+
tx_for_b.clone(),
2368+
&mut query_ids,
2369+
)?;
2370+
2371+
// Wait for both subscriptions
2372+
assert_matches!(
2373+
rx_for_a.recv().await,
2374+
Some(SerializableMessage::Subscription(SubscriptionMessage {
2375+
result: SubscriptionResult::SubscribeMulti(_),
2376+
..
2377+
}))
2378+
);
2379+
assert_matches!(
2380+
rx_for_b.recv().await,
2381+
Some(SerializableMessage::Subscription(SubscriptionMessage {
2382+
result: SubscriptionResult::SubscribeMulti(_),
2383+
..
2384+
}))
2385+
);
2386+
2387+
unsubscribe_multi(&subs, tx_for_b, query_ids)?;
2388+
2389+
assert_matches!(
2390+
rx_for_b.recv().await,
2391+
Some(SerializableMessage::Subscription(SubscriptionMessage {
2392+
result: SubscriptionResult::UnsubscribeMulti(_),
2393+
..
2394+
}))
2395+
);
2396+
2397+
// Insert a new row into `u`
2398+
let metrics = commit_tx(&db, &subs, [], [(u_id, product![1u64, 0u64, 0u64])])?;
2399+
2400+
assert_tx_update_for_table(
2401+
&mut rx_for_a,
2402+
u_id,
2403+
&ProductType::from([AlgebraicType::U64, AlgebraicType::U64, AlgebraicType::U64]),
2404+
[product![1u64, 0u64, 0u64]],
2405+
[],
2406+
)
2407+
.await;
2408+
2409+
// We should only have evaluated a single query
2410+
assert_eq!(metrics.delta_queries_evaluated, 1);
2411+
assert_eq!(metrics.delta_queries_matched, 1);
2412+
2413+
// Modify a matching row in `v`
2414+
let metrics = commit_tx(
2415+
&db,
2416+
&subs,
2417+
[(v_id, product![1u64, 1u64, 1u64])],
2418+
[(v_id, product![1u64, 2u64, 2u64])],
2419+
)?;
2420+
2421+
// We should only have evaluated a single query
2422+
assert_eq!(metrics.delta_queries_evaluated, 1);
2423+
assert_eq!(metrics.delta_queries_matched, 1);
2424+
2425+
Ok(())
2426+
}
2427+
21722428
/// Test that we do not evaluate queries that return trivially empty results
21732429
///
21742430
/// Needs a multi-threaded tokio runtime so that the module subscription worker can run in parallel.

0 commit comments

Comments
 (0)