Skip to content

Commit a8186f7

Browse files
Copilotpinodeca
andauthored
Honor |=> captures on composite THEN / IF / LOOP nodes (#163)
Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: pinodeca <32303022+pinodeca@users.noreply.github.com>
1 parent 944f2bc commit a8186f7

4 files changed

Lines changed: 155 additions & 4 deletions

File tree

src/orchestrations/execute_function_graph.rs

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -332,6 +332,19 @@ async fn execute_sql_node(
332332
Ok(result)
333333
}
334334

335+
fn store_named_result(
336+
ctx: &OrchestrationContext,
337+
node: &FunctionNode,
338+
result: &str,
339+
results: &mut HashMap<String, String>,
340+
node_label: &str,
341+
) {
342+
if let Some(name) = &node.result_name {
343+
ctx.trace_info(format!("Storing {node_label} result as ${name}"));
344+
results.insert(name.clone(), result.to_string());
345+
}
346+
}
347+
335348
async fn execute_then_node(
336349
ctx: &OrchestrationContext,
337350
graph: &FunctionGraph,
@@ -364,6 +377,8 @@ async fn execute_then_node(
364377
))
365378
.await?;
366379

380+
store_named_result(ctx, node, &right_result, results, "THEN");
381+
367382
Ok(right_result)
368383
}
369384

@@ -474,6 +489,7 @@ async fn execute_loop_node(
474489
ctx.trace_info(format!(
475490
"Loop terminated by break with value: {break_value}"
476491
));
492+
store_named_result(ctx, node, &break_value, results, "LOOP");
477493
return Ok(break_value);
478494
}
479495

@@ -499,6 +515,7 @@ async fn execute_loop_node(
499515

500516
if !should_continue {
501517
ctx.trace_info("Loop condition false, exiting loop");
518+
store_named_result(ctx, node, &body_result, results, "LOOP");
502519
return Ok(body_result);
503520
}
504521
}
@@ -639,15 +656,19 @@ async fn execute_if_node(
639656
ctx.trace_info(format!("Condition evaluated to: {is_true}"));
640657

641658
if is_true {
642-
Box::pin(execute_function_node_with_vars(
659+
let result = Box::pin(execute_function_node_with_vars(
643660
ctx, graph, then_id, results, exec_ctx,
644661
))
645-
.await
662+
.await?;
663+
store_named_result(ctx, node, &result, results, "IF");
664+
Ok(result)
646665
} else {
647-
Box::pin(execute_function_node_with_vars(
666+
let result = Box::pin(execute_function_node_with_vars(
648667
ctx, graph, else_id, results, exec_ctx,
649668
))
650-
.await
669+
.await?;
670+
store_named_result(ctx, node, &result, results, "IF");
671+
Ok(result)
651672
}
652673
}
653674

tests/e2e/sql/02_conditionals.sql

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -299,5 +299,46 @@ END $$;
299299
DROP TABLE _test_state2;
300300
DROP TABLE test_if_rows_dot;
301301

302+
-- Test 4: named result on the IF node itself is accessible downstream
303+
DROP TABLE IF EXISTS test_if_named;
304+
CREATE TABLE test_if_named (id SERIAL, chosen INT);
305+
306+
CREATE TEMP TABLE _test_if_named (instance_id TEXT);
307+
308+
INSERT INTO _test_if_named SELECT df.start(
309+
df.if(
310+
'SELECT true',
311+
'SELECT 41 AS chosen',
312+
'SELECT 99 AS chosen'
313+
) |=> 'decision'
314+
~> $$INSERT INTO test_if_named (chosen) VALUES ($decision.chosen)$$,
315+
'test-if-named-result'
316+
);
317+
318+
DO $$
319+
DECLARE
320+
inst_id TEXT;
321+
status TEXT;
322+
chosen_val INT;
323+
BEGIN
324+
SELECT instance_id INTO inst_id FROM _test_if_named;
325+
SELECT df.wait_for_completion(inst_id, 10) INTO status;
326+
327+
IF status != 'completed' THEN
328+
RAISE EXCEPTION 'TEST FAILED [if-named]: status = %', status;
329+
END IF;
330+
331+
SELECT chosen INTO chosen_val FROM test_if_named ORDER BY id DESC LIMIT 1;
332+
333+
IF chosen_val != 41 THEN
334+
RAISE EXCEPTION 'TEST FAILED [if-named]: expected 41, got %', chosen_val;
335+
END IF;
336+
337+
RAISE NOTICE 'PASSED: named IF result stored';
338+
END $$;
339+
340+
DROP TABLE _test_if_named;
341+
DROP TABLE test_if_named;
342+
302343
RESET SESSION AUTHORIZATION;
303344
SELECT 'TEST PASSED' AS result;

tests/e2e/sql/03_loops.sql

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,44 @@ END $$;
201201
DROP TABLE _test3_state;
202202
DROP TABLE test_break_log;
203203

204+
-- Test 4: named result on the LOOP node itself is accessible downstream
205+
DROP TABLE IF EXISTS test_loop_named;
206+
CREATE TABLE test_loop_named (id SERIAL, status TEXT);
207+
208+
CREATE TEMP TABLE _test4_state AS
209+
SELECT df.start(
210+
(df.loop(df.break('{"status": "done"}')) |=> 'loop_result')
211+
~> $$INSERT INTO test_loop_named (status) VALUES ($loop_result::jsonb->>'status')$$,
212+
'test-loop-named-result'
213+
) AS instance_id;
214+
215+
DO $$
216+
DECLARE
217+
v_instance_id TEXT;
218+
v_status TEXT;
219+
v_loop_status TEXT;
220+
BEGIN
221+
SELECT instance_id INTO v_instance_id FROM _test4_state;
222+
RAISE NOTICE 'Test 4 - named loop result: instance %', v_instance_id;
223+
224+
SELECT df.wait_for_completion(v_instance_id, 20) INTO v_status;
225+
226+
IF v_status != 'completed' THEN
227+
RAISE EXCEPTION 'TEST FAILED [loop-named]: expected Completed, got %', v_status;
228+
END IF;
229+
230+
SELECT status INTO v_loop_status FROM test_loop_named ORDER BY id DESC LIMIT 1;
231+
232+
IF v_loop_status != 'done' THEN
233+
RAISE EXCEPTION 'TEST FAILED [loop-named]: expected done, got %', v_loop_status;
234+
END IF;
235+
236+
RAISE NOTICE 'PASSED: named LOOP result stored';
237+
END $$;
238+
239+
DROP TABLE _test4_state;
240+
DROP TABLE test_loop_named;
241+
204242
-- === Test: running_status_during_loop ===
205243
-- Verify that df.status() reports 'running' while a loop is actively executing
206244
-- (regression test for: loops reporting 'pending' instead of 'running')

tests/e2e/sql/07_signals.sql

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,57 @@ BEGIN
224224
END $$;
225225

226226
DROP TABLE _test_signal_text;
227+
DELETE FROM signal_test_log;
228+
229+
-- Test 5: |=> applied after a THEN composite still captures the final result
230+
CREATE TEMP TABLE _test_signal_then_named (instance_id TEXT);
231+
232+
INSERT INTO _test_signal_then_named SELECT df.start(
233+
'INSERT INTO signal_test_log (msg) VALUES (''waiting-for-decision'')'
234+
~> df.wait_for_signal('test_approval_then', 60) |=> 'decision'
235+
~> df.if(
236+
'SELECT ($decision::jsonb->''data''->>''approved'')::boolean',
237+
$$INSERT INTO signal_test_log (msg, data) VALUES ('approved-after-then', $decision::jsonb)$$,
238+
$$INSERT INTO signal_test_log (msg, data) VALUES ('rejected-after-then', $decision::jsonb)$$
239+
),
240+
'test-signal-then-named'
241+
);
242+
243+
SELECT pg_sleep(1);
244+
245+
DO $$
246+
DECLARE
247+
inst_id TEXT;
248+
BEGIN
249+
SELECT instance_id INTO inst_id FROM _test_signal_then_named;
250+
PERFORM df.signal(inst_id, 'test_approval_then', '{"approved": true}');
251+
END $$;
252+
253+
DO $$
254+
DECLARE
255+
inst_id TEXT;
256+
status TEXT;
257+
BEGIN
258+
SELECT instance_id INTO inst_id FROM _test_signal_then_named;
259+
SELECT df.wait_for_completion(inst_id, 10) INTO status;
260+
261+
IF status != 'completed' THEN
262+
RAISE EXCEPTION 'TEST FAILED: composite THEN capture status = %', status;
263+
END IF;
264+
265+
IF NOT EXISTS (
266+
SELECT 1
267+
FROM signal_test_log
268+
WHERE msg = 'approved-after-then'
269+
AND (data->'data'->>'approved')::boolean = true
270+
) THEN
271+
RAISE EXCEPTION 'TEST FAILED: THEN composite capture did not substitute $decision';
272+
END IF;
273+
274+
RAISE NOTICE 'TEST PASSED: composite THEN capture';
275+
END $$;
276+
277+
DROP TABLE _test_signal_then_named;
227278
DROP TABLE signal_test_log;
228279

229280
RESET SESSION AUTHORIZATION;

0 commit comments

Comments
 (0)