Skip to content

Commit a84c04a

Browse files
committed
copilot feedback
1 parent d3f5add commit a84c04a

4 files changed

Lines changed: 58 additions & 5 deletions

File tree

transact/src/main/java/dev/dbos/transact/database/dao/NotificationsDAO.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -196,9 +196,9 @@ ON CONFLICT (message_uuid) DO NOTHING
196196
stmt.executeBatch();
197197
} catch (SQLException e) {
198198
if ("23503".equals(e.getSQLState())) {
199-
// Find which destination was missing
200-
String missingDest = rows.stream().map(InsertRow::destId).findFirst().orElse("unknown");
201-
throw new DBOSNonExistentWorkflowException(missingDest);
199+
var distinctDests = rows.stream().map(InsertRow::destId).distinct().toList();
200+
throw new DBOSNonExistentWorkflowException(
201+
distinctDests.size() == 1 ? distinctDests.get(0) : null);
202202
}
203203
throw e;
204204
}

transact/src/main/java/dev/dbos/transact/exceptions/DBOSNonExistentWorkflowException.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,10 @@ public class DBOSNonExistentWorkflowException extends RuntimeException {
1111
private final String workflowId;
1212

1313
public DBOSNonExistentWorkflowException(String workflowId) {
14-
super(String.format("Workflow does not exist %s", workflowId));
14+
super(
15+
workflowId != null
16+
? String.format("Workflow does not exist %s", workflowId)
17+
: "One or more destination workflows do not exist");
1518
this.workflowId = workflowId;
1619
}
1720

transact/src/test/java/dev/dbos/transact/client/ClientTest.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import dev.dbos.transact.exceptions.DBOSNonExistentWorkflowException;
1515
import dev.dbos.transact.utils.DBUtils;
1616
import dev.dbos.transact.utils.PgContainer;
17+
import dev.dbos.transact.workflow.ForkOptions;
1718
import dev.dbos.transact.workflow.Queue;
1819
import dev.dbos.transact.workflow.SendMessage;
1920
import dev.dbos.transact.workflow.WorkflowState;
@@ -386,4 +387,52 @@ public void testClientSendBulk() throws Exception {
386387
assertEquals("1-msg1", handle1.getResult());
387388
assertEquals("2-msg2", handle2.getResult());
388389
}
390+
391+
@Test
392+
public void testClientSendBulkSendToForks() throws Exception {
393+
// Parent is a running workflow; child is a fork (ENQUEUED, not executing)
394+
var parent = dbos.startWorkflow(() -> service.sendTest(99));
395+
var childId = "fork-child-" + UUID.randomUUID();
396+
dbos.forkWorkflow(parent.workflowId(), 0, new ForkOptions(childId));
397+
398+
var sysdb = DBOSTestAccess.getSystemDatabase(dbos);
399+
String key = UUID.randomUUID().toString();
400+
401+
try (var client = pgContainer.dbosClient()) {
402+
client.sendBulk(
403+
List.of(new SendMessage(parent.workflowId(), "fan-out", "test-topic", key)),
404+
DBOSClient.SendOptions.defaults().withSendToForks(true));
405+
}
406+
407+
// Parent receives the message and completes
408+
assertEquals("99-fan-out", parent.getResult());
409+
410+
// Fork also received the notification (verified via DB since it is not executing)
411+
assertEquals(1, sysdb.getAllNotifications(childId).size());
412+
413+
// Re-send with same key is idempotent — {key}::{dest} UUIDs dedup via ON CONFLICT DO NOTHING
414+
try (var client = pgContainer.dbosClient()) {
415+
client.sendBulk(
416+
List.of(new SendMessage(parent.workflowId(), "fan-out", "test-topic", key)),
417+
DBOSClient.SendOptions.defaults().withSendToForks(true));
418+
}
419+
assertEquals(1, sysdb.getAllNotifications(childId).size());
420+
}
421+
422+
@Test
423+
public void testClientSendBulkPortableSerialization() throws Exception {
424+
var handle1 = dbos.startWorkflow(() -> service.sendTest(1));
425+
var handle2 = dbos.startWorkflow(() -> service.sendTest(2));
426+
427+
try (var client = pgContainer.dbosClient()) {
428+
client.sendBulk(
429+
List.of(
430+
new SendMessage(handle1.workflowId(), "hello", "test-topic", null),
431+
new SendMessage(handle2.workflowId(), "world", "test-topic", null)),
432+
DBOSClient.SendOptions.portable());
433+
}
434+
435+
assertEquals("1-hello", handle1.getResult());
436+
assertEquals("2-world", handle2.getResult());
437+
}
389438
}

transact/src/test/java/dev/dbos/transact/notifications/SendBulkTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,8 @@ void testSendBulkEmpty() {
8585

8686
// Inside workflow: also a no-op, no step recorded
8787
String wfId = "empty-wf-" + UUID.randomUUID();
88-
service.sendBulkWorkflow(List.of());
88+
dbos.startWorkflow(() -> service.sendBulkWorkflow(List.of()), new StartWorkflowOptions(wfId))
89+
.getResult();
8990
assertEquals(0, dbos.listWorkflowSteps(wfId).size());
9091
}
9192

0 commit comments

Comments
 (0)