From f9a5518ea54f4286725d55d2239237f42bf2f8ca Mon Sep 17 00:00:00 2001 From: Harry Pierson Date: Wed, 27 May 2026 12:17:00 -0700 Subject: [PATCH 1/5] add sendBulk sysdb --- .../transact/database/SystemDatabase.java | 36 ++- .../database/dao/NotificationsDAO.java | 216 +++++++++------ .../dbos/transact/workflow/SendMessage.java | 19 ++ .../transact/notifications/SendBulkTest.java | 246 ++++++++++++++++++ 4 files changed, 431 insertions(+), 86 deletions(-) create mode 100644 transact/src/main/java/dev/dbos/transact/workflow/SendMessage.java create mode 100644 transact/src/test/java/dev/dbos/transact/notifications/SendBulkTest.java diff --git a/transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java b/transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java index 0e01b1a6..87a962ba 100644 --- a/transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java +++ b/transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java @@ -22,6 +22,7 @@ import dev.dbos.transact.workflow.ListWorkflowsInput; import dev.dbos.transact.workflow.NotificationInfo; import dev.dbos.transact.workflow.Queue; +import dev.dbos.transact.workflow.SendMessage; import dev.dbos.transact.workflow.QueueOptions; import dev.dbos.transact.workflow.ScheduleStatus; import dev.dbos.transact.workflow.StepInfo; @@ -454,6 +455,19 @@ public Optional checkChildWorkflow(String workflowUuid, int functionId) return dbRetry(() -> WorkflowDAO.checkChildWorkflow(ctx, workflowUuid, functionId)); } + public void sendBulk( + List messages, + String workflowId, + int stepId, + String functionName, + boolean sendToForks, + String serialization) { + dbRetry( + () -> + NotificationsDAO.sendBulk( + ctx, messages, workflowId, stepId, functionName, sendToForks, serialization)); + } + public void send( String workflowId, int stepId, @@ -462,18 +476,24 @@ public void send( String topic, String messageId, String serialization) { - dbRetry( - () -> - NotificationsDAO.send( - ctx, workflowId, stepId, destinationId, message, topic, messageId, serialization)); + sendBulk( + List.of(new SendMessage(destinationId, message, topic, messageId)), + workflowId, + stepId, + "DBOS.send", + false, + serialization); } public void sendDirect( String destinationId, Object message, String topic, String messageId, String serialization) { - dbRetry( - () -> - NotificationsDAO.sendDirect( - ctx, destinationId, message, topic, messageId, serialization)); + sendBulk( + List.of(new SendMessage(destinationId, message, topic, messageId)), + null, + -1, + "DBOS.send", + false, + serialization); } public Object recv( diff --git a/transact/src/main/java/dev/dbos/transact/database/dao/NotificationsDAO.java b/transact/src/main/java/dev/dbos/transact/database/dao/NotificationsDAO.java index e8ef29ac..a3d482ae 100644 --- a/transact/src/main/java/dev/dbos/transact/database/dao/NotificationsDAO.java +++ b/transact/src/main/java/dev/dbos/transact/database/dao/NotificationsDAO.java @@ -10,6 +10,7 @@ import dev.dbos.transact.json.DBOSSerializer; import dev.dbos.transact.json.SerializationUtil; import dev.dbos.transact.workflow.NotificationInfo; +import dev.dbos.transact.workflow.SendMessage; import dev.dbos.transact.workflow.internal.StepResult; import java.sql.Connection; @@ -18,12 +19,21 @@ import java.sql.SQLException; import java.time.Duration; import java.time.Instant; +import java.util.ArrayDeque; import java.util.ArrayList; +import java.util.Deque; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.Set; import java.util.UUID; +import static java.util.stream.Collectors.joining; + import org.jspecify.annotations.NonNull; import org.jspecify.annotations.Nullable; import org.slf4j.Logger; @@ -35,77 +45,165 @@ private NotificationsDAO() {} private static final Logger logger = LoggerFactory.getLogger(NotificationsDAO.class); - public static void send( + private static Map> findForkDescendantsTxn( + Connection conn, String schema, List workflowIds) throws SQLException { + Map> children = new HashMap<>(); + Set seen = new LinkedHashSet<>(workflowIds); + List frontier = new ArrayList<>(new LinkedHashSet<>(workflowIds)); + + while (!frontier.isEmpty()) { + String placeholders = frontier.stream().map(x -> "?").collect(joining(",")); + String sql = + """ + SELECT workflow_uuid, forked_from FROM "%s".workflow_status + WHERE forked_from IN (%s) + """.formatted(schema, placeholders); + try (var stmt = conn.prepareStatement(sql)) { + for (int i = 0; i < frontier.size(); i++) stmt.setString(i + 1, frontier.get(i)); + List next = new ArrayList<>(); + try (var rs = stmt.executeQuery()) { + while (rs.next()) { + String forkedId = rs.getString("workflow_uuid"); + String forkedFrom = rs.getString("forked_from"); + children.computeIfAbsent(forkedFrom, k -> new ArrayList<>()).add(forkedId); + if (seen.add(forkedId)) next.add(forkedId); + } + } + frontier = next; + } + } + + Map> result = new LinkedHashMap<>(); + for (String root : workflowIds) { + if (result.containsKey(root)) continue; + Set descendants = new LinkedHashSet<>(); + Deque stack = new ArrayDeque<>(children.getOrDefault(root, List.of())); + while (!stack.isEmpty()) { + String node = stack.pop(); + if (!node.equals(root) && descendants.add(node)) { + stack.addAll(children.getOrDefault(node, List.of())); + } + } + result.put(root, descendants); + } + return result; + } + + public static void sendBulk( DbContext ctx, + List messages, String workflowId, int stepId, - String destinationId, - Object message, - String topic, - String messageId, + String functionName, + boolean sendToForks, String serialization) throws SQLException { + if (messages.isEmpty()) { + return; + } + + // Reject duplicate idempotency keys within the batch + var keys = + messages.stream() + .map(SendMessage::idempotencyKey) + .filter(Objects::nonNull) + .toList(); + if (keys.size() != keys.stream().distinct().count()) { + throw new IllegalArgumentException("Duplicate idempotency keys within sendBulk batch"); + } + DBOSSerializer serializer = ctx.serializer(); var startTime = System.currentTimeMillis(); - String functionName = "DBOS.send"; - String finalTopic = (topic != null) ? topic : Constants.DBOS_NULL_TOPIC; + + // Serialize each message once + record SerializedPair(SendMessage msg, SerializationUtil.SerializedResult serialized) {} + List pairs = new ArrayList<>(messages.size()); + for (var msg : messages) { + pairs.add(new SerializedPair(msg, SerializationUtil.serializeValue(msg.message(), serialization, serializer))); + } try (Connection conn = ctx.getConnection()) { conn.setAutoCommit(false); - try { - StepResult recordedOutput = - StepsDAO.checkStepResult(conn, ctx.schema(), workflowId, stepId, functionName); - - if (recordedOutput != null) { - logger.debug( - "Replaying send, id: {}, destination_uuid: {}, topic: {}", - stepId, - destinationId, - finalTopic); - conn.commit(); - return; - } else { - logger.debug( - "Running send, id: {}, destination_uuid: {}, topic: {}", - stepId, - destinationId, - finalTopic); + // Check for replay if inside a workflow + if (workflowId != null) { + StepResult recorded = + StepsDAO.checkStepResult(conn, ctx.schema(), workflowId, stepId, functionName); + if (recorded != null) { + logger.debug("Replaying sendBulk, workflowId: {}, stepId: {}", workflowId, stepId); + conn.commit(); + return; + } + } + + // Collect all destination IDs for fork resolution + Map> forkDescendants = Map.of(); + if (sendToForks) { + List destIds = pairs.stream().map(p -> p.msg().destinationId()).distinct().toList(); + forkDescendants = findForkDescendantsTxn(conn, ctx.schema(), destIds); } - var finalMessageId = (messageId != null) ? messageId : UUID.randomUUID().toString(); - var serializedMsg = SerializationUtil.serializeValue(message, serialization, serializer); + // Build insert rows: base dest + sorted descendants + record InsertRow(String destId, SerializationUtil.SerializedResult serialized, String topic, String messageUuid) {} + List rows = new ArrayList<>(); + for (var pair : pairs) { + var msg = pair.msg(); + String baseDest = msg.destinationId(); + String finalTopic = (msg.topic() != null) ? msg.topic() : Constants.DBOS_NULL_TOPIC; + + List destinations = new ArrayList<>(); + destinations.add(baseDest); + if (sendToForks) { + var desc = forkDescendants.getOrDefault(baseDest, Set.of()); + desc.stream().sorted().forEach(destinations::add); + } + + for (String dest : destinations) { + String uuid; + if (msg.idempotencyKey() != null) { + uuid = sendToForks ? msg.idempotencyKey() + "::" + dest : msg.idempotencyKey(); + } else { + uuid = UUID.randomUUID().toString(); + } + rows.add(new InsertRow(dest, pair.serialized(), finalTopic, uuid)); + } + } + // Batch-insert all rows final String sql = """ INSERT INTO "%s".notifications (destination_uuid, topic, message, serialization, message_uuid) VALUES (?, ?, ?, ?, ?) ON CONFLICT (message_uuid) DO NOTHING - """ - .formatted(ctx.schema()); + """.formatted(ctx.schema()); try (PreparedStatement stmt = conn.prepareStatement(sql)) { - stmt.setString(1, destinationId); - stmt.setString(2, finalTopic); - stmt.setString(3, serializedMsg.serializedValue()); - stmt.setString(4, serializedMsg.serialization()); - stmt.setString(5, finalMessageId); - stmt.executeUpdate(); + for (var row : rows) { + stmt.setString(1, row.destId()); + stmt.setString(2, row.topic()); + stmt.setString(3, row.serialized().serializedValue()); + stmt.setString(4, row.serialized().serialization()); + stmt.setString(5, row.messageUuid()); + stmt.addBatch(); + } + stmt.executeBatch(); } catch (SQLException e) { if ("23503".equals(e.getSQLState())) { - throw new DBOSNonExistentWorkflowException(destinationId); + // Find which destination was missing + String missingDest = rows.stream().map(InsertRow::destId).findFirst().orElse("unknown"); + throw new DBOSNonExistentWorkflowException(missingDest); } throw e; } - var output = new StepResult(workflowId, stepId, functionName, null, null, null, null); - StepsDAO.recordStepResult( - conn, ctx.schema(), output, startTime, System.currentTimeMillis()); + if (workflowId != null) { + var output = new StepResult(workflowId, stepId, functionName, null, null, null, null); + StepsDAO.recordStepResult(conn, ctx.schema(), output, startTime, System.currentTimeMillis()); + } conn.commit(); - } catch (Exception e) { try { conn.rollback(); @@ -117,44 +215,6 @@ ON CONFLICT (message_uuid) DO NOTHING } } - public static void sendDirect( - DbContext ctx, - String destinationId, - Object message, - String topic, - String messageId, - String serialization) - throws SQLException { - DBOSSerializer serializer = ctx.serializer(); - String finalTopic = (topic != null) ? topic : Constants.DBOS_NULL_TOPIC; - String finalMessageId = (messageId != null) ? messageId : UUID.randomUUID().toString(); - var serializedMsg = SerializationUtil.serializeValue(message, serialization, serializer); - - final String sql = - """ - INSERT INTO "%s".notifications - (destination_uuid, topic, message, message_uuid, serialization) - VALUES (?, ?, ?, ?, ?) - ON CONFLICT (message_uuid) DO NOTHING - """ - .formatted(ctx.schema()); - - try (var conn = ctx.getConnection(); - var stmt = conn.prepareStatement(sql)) { - stmt.setString(1, destinationId); - stmt.setString(2, finalTopic); - stmt.setString(3, serializedMsg.serializedValue()); - stmt.setString(4, finalMessageId); - stmt.setString(5, serializedMsg.serialization()); - stmt.executeUpdate(); - } catch (SQLException e) { - if ("23503".equals(e.getSQLState())) { - throw new DBOSNonExistentWorkflowException(destinationId); - } - throw e; - } - } - public static Object recv( DbContext ctx, String workflowId, diff --git a/transact/src/main/java/dev/dbos/transact/workflow/SendMessage.java b/transact/src/main/java/dev/dbos/transact/workflow/SendMessage.java new file mode 100644 index 00000000..5402834a --- /dev/null +++ b/transact/src/main/java/dev/dbos/transact/workflow/SendMessage.java @@ -0,0 +1,19 @@ +package dev.dbos.transact.workflow; + +import org.jspecify.annotations.NonNull; +import org.jspecify.annotations.Nullable; + +public record SendMessage( + @NonNull String destinationId, + @NonNull Object message, + @Nullable String topic, + @Nullable String idempotencyKey) { + + public SendMessage(@NonNull String destinationId, @NonNull Object message) { + this(destinationId, message, null, null); + } + + public SendMessage(@NonNull String destinationId, @NonNull Object message, @Nullable String topic) { + this(destinationId, message, topic, null); + } +} diff --git a/transact/src/test/java/dev/dbos/transact/notifications/SendBulkTest.java b/transact/src/test/java/dev/dbos/transact/notifications/SendBulkTest.java new file mode 100644 index 00000000..94913cc6 --- /dev/null +++ b/transact/src/test/java/dev/dbos/transact/notifications/SendBulkTest.java @@ -0,0 +1,246 @@ +package dev.dbos.transact.notifications; + +import static org.junit.jupiter.api.Assertions.*; + +import dev.dbos.transact.DBOS; +import dev.dbos.transact.DBOSTestAccess; +import dev.dbos.transact.StartWorkflowOptions; +import dev.dbos.transact.config.DBOSConfig; +import dev.dbos.transact.database.SystemDatabase; +import dev.dbos.transact.exceptions.DBOSNonExistentWorkflowException; +import dev.dbos.transact.utils.PgContainer; +import dev.dbos.transact.workflow.ForkOptions; +import dev.dbos.transact.workflow.SendMessage; +import dev.dbos.transact.workflow.Workflow; + +import java.time.Duration; +import java.util.List; +import java.util.UUID; + +import org.junit.jupiter.api.AutoClose; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +interface BulkService { + String block(String topic); +} + +class BulkServiceImpl implements BulkService { + private final DBOS dbos; + + BulkServiceImpl(DBOS dbos) { + this.dbos = dbos; + } + + @Workflow + @Override + public String block(String topic) { + return dbos.recv(topic, Duration.ofSeconds(30)).orElse(null); + } +} + +class SendBulkTest { + + @AutoClose final PgContainer pgContainer = new PgContainer(); + + private DBOSConfig dbosConfig; + @AutoClose private DBOS dbos; + private BulkService service; + + @BeforeEach + void setup() { + dbosConfig = pgContainer.dbosConfig(); + dbos = new DBOS(dbosConfig); + service = dbos.registerProxy(BulkService.class, new BulkServiceImpl(dbos)); + dbos.launch(); + } + + private SystemDatabase db() { + return DBOSTestAccess.getSystemDatabase(dbos); + } + + /** Starts a blocking workflow that provides a valid workflow_status row. */ + private String startBlockingWorkflow(String id) throws Exception { + dbos.startWorkflow(() -> service.block(id), new StartWorkflowOptions(id)); + return id; + } + + // ------------------------------------------------------------------------- + // testSendBulkEmpty + // ------------------------------------------------------------------------- + + @Test + void testSendBulkEmpty() { + // Outside workflow: should be a complete no-op + assertDoesNotThrow(() -> db().sendBulk(List.of(), null, -1, "DBOS.sendBulk", false, null)); + } + + // ------------------------------------------------------------------------- + // testSendBulkBasic + // ------------------------------------------------------------------------- + + @Test + void testSendBulkBasic() throws Exception { + String dest1 = startBlockingWorkflow("dest1-" + UUID.randomUUID()); + String dest2 = startBlockingWorkflow("dest2-" + UUID.randomUUID()); + + // Two messages to dest1, one message to dest2, all in one call + db().sendBulk( + List.of( + new SendMessage(dest1, "hello-1"), + new SendMessage(dest1, "hello-2"), + new SendMessage(dest2, "world")), + null, + -1, + "DBOS.sendBulk", + false, + null); + + var notes1 = db().getAllNotifications(dest1); + assertEquals(2, notes1.size()); + + var notes2 = db().getAllNotifications(dest2); + assertEquals(1, notes2.size()); + assertEquals("world", notes2.get(0).message()); + } + + // ------------------------------------------------------------------------- + // testSendBulkNonExistentDest — entire tx rolls back + // ------------------------------------------------------------------------- + + @Test + void testSendBulkNonExistentDest() throws Exception { + String validDest = startBlockingWorkflow("valid-" + UUID.randomUUID()); + + assertThrows( + DBOSNonExistentWorkflowException.class, + () -> + db().sendBulk( + List.of( + new SendMessage(validDest, "first"), + new SendMessage("nonexistent-" + UUID.randomUUID(), "second")), + null, + -1, + "DBOS.sendBulk", + false, + null)); + + // Atomicity: valid dest must have received nothing + assertEquals(0, db().getAllNotifications(validDest).size()); + } + + // ------------------------------------------------------------------------- + // testSendBulkDuplicateKeyWithinBatch — rejected before transaction opens + // ------------------------------------------------------------------------- + + @Test + void testSendBulkDuplicateKeyWithinBatch() throws Exception { + String dest = startBlockingWorkflow("dupkey-dest-" + UUID.randomUUID()); + String key = UUID.randomUUID().toString(); + + assertThrows( + IllegalArgumentException.class, + () -> + db().sendBulk( + List.of( + new SendMessage(dest, "msg1", null, key), + new SendMessage(dest, "msg2", null, key)), + null, + -1, + "DBOS.sendBulk", + false, + null)); + + // Nothing was delivered + assertEquals(0, db().getAllNotifications(dest).size()); + } + + // ------------------------------------------------------------------------- + // testSendBulkIdempotencyKey — duplicate across calls is deduped + // ------------------------------------------------------------------------- + + @Test + void testSendBulkIdempotencyKey() throws Exception { + String dest = startBlockingWorkflow("idem-dest-" + UUID.randomUUID()); + String key = UUID.randomUUID().toString(); + + var messages = List.of(new SendMessage(dest, "hello", null, key)); + db().sendBulk(messages, null, -1, "DBOS.sendBulk", false, null); + db().sendBulk(messages, null, -1, "DBOS.sendBulk", false, null); + + // Only one notification despite two calls + assertEquals(1, db().getAllNotifications(dest).size()); + } + + // ------------------------------------------------------------------------- + // testSendBulkFromWorkflow — step is recorded; replay skips re-delivery + // ------------------------------------------------------------------------- + + @Test + void testSendBulkFromWorkflow() throws Exception { + // senderWf provides a real workflow_status row to act as the sender context + String senderWf = startBlockingWorkflow("sender-wf-" + UUID.randomUUID()); + String dest = startBlockingWorkflow("wf-dest-" + UUID.randomUUID()); + + var messages = List.of(new SendMessage(dest, "from-workflow")); + // Use stepId 100 to stay clear of the recv workflow's own step IDs + db().sendBulk(messages, senderWf, 100, "DBOS.sendBulk", false, null); + + assertEquals(1, db().getAllNotifications(dest).size()); + + // The sendBulk step should be recorded in the sender workflow's history + var steps = dbos.listWorkflowSteps(senderWf); + assertTrue(steps.stream().anyMatch(s -> "DBOS.sendBulk".equals(s.functionName()))); + + // Second call with same workflowId/stepId is a replay — no new delivery + db().sendBulk(messages, senderWf, 100, "DBOS.sendBulk", false, null); + assertEquals(1, db().getAllNotifications(dest).size()); + } + + // ------------------------------------------------------------------------- + // testSendBulkSendToForks — fan-out to recursive fork tree + // ------------------------------------------------------------------------- + + @Test + void testSendBulkSendToForks() throws Exception { + // Build the tree: parent -> child -> grandchild + String parentId = "parent-" + UUID.randomUUID(); + String childId = "child-" + UUID.randomUUID(); + String grandchildId = "grandchild-" + UUID.randomUUID(); + String unrelatedId = "unrelated-" + UUID.randomUUID(); + + startBlockingWorkflow(parentId); + startBlockingWorkflow(unrelatedId); + dbos.forkWorkflow(parentId, 0, new ForkOptions(childId)); + dbos.forkWorkflow(childId, 0, new ForkOptions(grandchildId)); + + String key = UUID.randomUUID().toString(); + db().sendBulk( + List.of(new SendMessage(parentId, "fan-out", null, key)), + null, + -1, + "DBOS.sendBulk", + true, + null); + + // All three should have received the message + assertEquals(1, db().getAllNotifications(parentId).size()); + assertEquals(1, db().getAllNotifications(childId).size()); + assertEquals(1, db().getAllNotifications(grandchildId).size()); + + // Unrelated workflow receives nothing + assertEquals(0, db().getAllNotifications(unrelatedId).size()); + + // Message UUIDs follow the {key}::{dest} pattern — sending again is idempotent + db().sendBulk( + List.of(new SendMessage(parentId, "fan-out", null, key)), + null, + -1, + "DBOS.sendBulk", + true, + null); + assertEquals(1, db().getAllNotifications(parentId).size()); + assertEquals(1, db().getAllNotifications(childId).size()); + assertEquals(1, db().getAllNotifications(grandchildId).size()); + } +} From d3f5adda2e435fbf92cd5bf22382f00ba4b9de0c Mon Sep 17 00:00:00 2001 From: Harry Pierson Date: Wed, 27 May 2026 12:44:18 -0700 Subject: [PATCH 2/5] add DBOS/DBOSClient api + tests --- .../src/main/java/dev/dbos/transact/DBOS.java | 34 +++- .../java/dev/dbos/transact/DBOSClient.java | 33 +++- .../transact/database/SystemDatabase.java | 30 +-- .../database/dao/NotificationsDAO.java | 32 ++-- .../dbos/transact/execution/DBOSExecutor.java | 53 ++++-- .../dbos/transact/workflow/SendMessage.java | 3 +- .../dev/dbos/transact/client/ClientTest.java | 19 ++ .../transact/database/SystemDatabaseTest.java | 17 +- .../transact/notifications/SendBulkTest.java | 175 ++++++++---------- 9 files changed, 230 insertions(+), 166 deletions(-) diff --git a/transact/src/main/java/dev/dbos/transact/DBOS.java b/transact/src/main/java/dev/dbos/transact/DBOS.java index e00c5dcd..c1709dd7 100644 --- a/transact/src/main/java/dev/dbos/transact/DBOS.java +++ b/transact/src/main/java/dev/dbos/transact/DBOS.java @@ -17,6 +17,7 @@ import dev.dbos.transact.workflow.QueueConflictResolution; import dev.dbos.transact.workflow.QueueOptions; import dev.dbos.transact.workflow.ScheduleStatus; +import dev.dbos.transact.workflow.SendMessage; import dev.dbos.transact.workflow.SerializationStrategy; import dev.dbos.transact.workflow.Step; import dev.dbos.transact.workflow.StepInfo; @@ -512,7 +513,7 @@ public void send( @NonNull Object message, @Nullable String topic, @Nullable String idempotencyKey) { - send(destinationId, message, topic, idempotencyKey, null); + send(destinationId, message, topic, idempotencyKey, null, false); } /** @@ -523,7 +524,7 @@ public void send( * @param topic topic to which the message is send */ public void send(@NonNull String destinationId, @NonNull Object message, @Nullable String topic) { - send(destinationId, message, topic, null, null); + send(destinationId, message, topic, null, null, false); } /** @@ -541,8 +542,35 @@ public void send( @Nullable String topic, @Nullable String idempotencyKey, @Nullable SerializationStrategy serialization) { + send(destinationId, message, topic, idempotencyKey, serialization, false); + } + + public void send( + @NonNull String destinationId, + @NonNull Object message, + @Nullable String topic, + @Nullable String idempotencyKey, + @Nullable SerializationStrategy serialization, + boolean sendToForks) { + if (serialization == null) serialization = SerializationStrategy.DEFAULT; + ensureLaunched("send") + .send(destinationId, message, topic, idempotencyKey, serialization, sendToForks); + } + + public void sendBulk(@NonNull List messages) { + sendBulk(messages, false, null); + } + + public void sendBulk(@NonNull List messages, boolean sendToForks) { + sendBulk(messages, sendToForks, null); + } + + public void sendBulk( + @NonNull List messages, + boolean sendToForks, + @Nullable SerializationStrategy serialization) { if (serialization == null) serialization = SerializationStrategy.DEFAULT; - ensureLaunched("send").send(destinationId, message, topic, idempotencyKey, serialization); + ensureLaunched("sendBulk").sendBulk(messages, sendToForks, serialization); } /** diff --git a/transact/src/main/java/dev/dbos/transact/DBOSClient.java b/transact/src/main/java/dev/dbos/transact/DBOSClient.java index a011b206..dcae6411 100644 --- a/transact/src/main/java/dev/dbos/transact/DBOSClient.java +++ b/transact/src/main/java/dev/dbos/transact/DBOSClient.java @@ -17,6 +17,7 @@ import dev.dbos.transact.workflow.QueueConflictResolution; import dev.dbos.transact.workflow.QueueOptions; import dev.dbos.transact.workflow.ScheduleStatus; +import dev.dbos.transact.workflow.SendMessage; import dev.dbos.transact.workflow.SerializationStrategy; import dev.dbos.transact.workflow.StepInfo; import dev.dbos.transact.workflow.Timeout; @@ -647,7 +648,7 @@ public EnqueueOptions( } /** Options for sending a message. */ - public record SendOptions(@Nullable SerializationStrategy serialization) { + public record SendOptions(@Nullable SerializationStrategy serialization, boolean sendToForks) { /** * Create SendOptions with default serialization strategy. Uses the system's default * serialization format for message encoding. @@ -655,7 +656,7 @@ public record SendOptions(@Nullable SerializationStrategy serialization) { * @return SendOptions configured with default serialization */ public static SendOptions defaults() { - return new SendOptions(SerializationStrategy.DEFAULT); + return new SendOptions(SerializationStrategy.DEFAULT, false); } /** @@ -665,7 +666,11 @@ public static SendOptions defaults() { * @return SendOptions configured with portable JSON serialization */ public static SendOptions portable() { - return new SendOptions(SerializationStrategy.PORTABLE); + return new SendOptions(SerializationStrategy.PORTABLE, false); + } + + public SendOptions withSendToForks(boolean v) { + return new SendOptions(serialization, v); } } @@ -705,8 +710,28 @@ public void send( (options != null && options.serialization() != null) ? options.serialization().formatName() : null; + boolean sendToForks = options != null && options.sendToForks(); + + systemDatabase.sendBulk( + List.of(new SendMessage(destinationId, message, topic, idempotencyKey)), + null, + -1, + "DBOS.send", + sendToForks, + serializationFormat); + } - systemDatabase.sendDirect(destinationId, message, topic, idempotencyKey, serializationFormat); + public void sendBulk(@NonNull List messages) { + sendBulk(messages, null); + } + + public void sendBulk(@NonNull List messages, @Nullable SendOptions options) { + String serializationFormat = + (options != null && options.serialization() != null) + ? options.serialization().formatName() + : null; + boolean sendToForks = options != null && options.sendToForks(); + systemDatabase.sendBulk(messages, null, -1, "DBOS.sendBulk", sendToForks, serializationFormat); } /** diff --git a/transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java b/transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java index 87a962ba..54280eed 100644 --- a/transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java +++ b/transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java @@ -22,9 +22,9 @@ import dev.dbos.transact.workflow.ListWorkflowsInput; import dev.dbos.transact.workflow.NotificationInfo; import dev.dbos.transact.workflow.Queue; -import dev.dbos.transact.workflow.SendMessage; import dev.dbos.transact.workflow.QueueOptions; import dev.dbos.transact.workflow.ScheduleStatus; +import dev.dbos.transact.workflow.SendMessage; import dev.dbos.transact.workflow.StepInfo; import dev.dbos.transact.workflow.VersionInfo; import dev.dbos.transact.workflow.WorkflowAggregateRow; @@ -468,34 +468,6 @@ public void sendBulk( ctx, messages, workflowId, stepId, functionName, sendToForks, serialization)); } - public void send( - String workflowId, - int stepId, - String destinationId, - Object message, - String topic, - String messageId, - String serialization) { - sendBulk( - List.of(new SendMessage(destinationId, message, topic, messageId)), - workflowId, - stepId, - "DBOS.send", - false, - serialization); - } - - public void sendDirect( - String destinationId, Object message, String topic, String messageId, String serialization) { - sendBulk( - List.of(new SendMessage(destinationId, message, topic, messageId)), - null, - -1, - "DBOS.send", - false, - serialization); - } - public Object recv( String workflowId, int stepId, int timeoutStepId, String topic, Duration timeout) { return dbRetry( diff --git a/transact/src/main/java/dev/dbos/transact/database/dao/NotificationsDAO.java b/transact/src/main/java/dev/dbos/transact/database/dao/NotificationsDAO.java index a3d482ae..3e2a904d 100644 --- a/transact/src/main/java/dev/dbos/transact/database/dao/NotificationsDAO.java +++ b/transact/src/main/java/dev/dbos/transact/database/dao/NotificationsDAO.java @@ -1,5 +1,7 @@ package dev.dbos.transact.database.dao; +import static java.util.stream.Collectors.joining; + import dev.dbos.transact.Constants; import dev.dbos.transact.database.DbContext; import dev.dbos.transact.database.GetEventCaller; @@ -32,8 +34,6 @@ import java.util.Set; import java.util.UUID; -import static java.util.stream.Collectors.joining; - import org.jspecify.annotations.NonNull; import org.jspecify.annotations.Nullable; import org.slf4j.Logger; @@ -57,7 +57,8 @@ private static Map> findForkDescendantsTxn( """ SELECT workflow_uuid, forked_from FROM "%s".workflow_status WHERE forked_from IN (%s) - """.formatted(schema, placeholders); + """ + .formatted(schema, placeholders); try (var stmt = conn.prepareStatement(sql)) { for (int i = 0; i < frontier.size(); i++) stmt.setString(i + 1, frontier.get(i)); List next = new ArrayList<>(); @@ -104,11 +105,7 @@ public static void sendBulk( } // Reject duplicate idempotency keys within the batch - var keys = - messages.stream() - .map(SendMessage::idempotencyKey) - .filter(Objects::nonNull) - .toList(); + var keys = messages.stream().map(SendMessage::idempotencyKey).filter(Objects::nonNull).toList(); if (keys.size() != keys.stream().distinct().count()) { throw new IllegalArgumentException("Duplicate idempotency keys within sendBulk batch"); } @@ -120,7 +117,9 @@ public static void sendBulk( record SerializedPair(SendMessage msg, SerializationUtil.SerializedResult serialized) {} List pairs = new ArrayList<>(messages.size()); for (var msg : messages) { - pairs.add(new SerializedPair(msg, SerializationUtil.serializeValue(msg.message(), serialization, serializer))); + pairs.add( + new SerializedPair( + msg, SerializationUtil.serializeValue(msg.message(), serialization, serializer))); } try (Connection conn = ctx.getConnection()) { @@ -140,12 +139,17 @@ record SerializedPair(SendMessage msg, SerializationUtil.SerializedResult serial // Collect all destination IDs for fork resolution Map> forkDescendants = Map.of(); if (sendToForks) { - List destIds = pairs.stream().map(p -> p.msg().destinationId()).distinct().toList(); + List destIds = + pairs.stream().map(p -> p.msg().destinationId()).distinct().toList(); forkDescendants = findForkDescendantsTxn(conn, ctx.schema(), destIds); } // Build insert rows: base dest + sorted descendants - record InsertRow(String destId, SerializationUtil.SerializedResult serialized, String topic, String messageUuid) {} + record InsertRow( + String destId, + SerializationUtil.SerializedResult serialized, + String topic, + String messageUuid) {} List rows = new ArrayList<>(); for (var pair : pairs) { var msg = pair.msg(); @@ -177,7 +181,8 @@ record InsertRow(String destId, SerializationUtil.SerializedResult serialized, S (destination_uuid, topic, message, serialization, message_uuid) VALUES (?, ?, ?, ?, ?) ON CONFLICT (message_uuid) DO NOTHING - """.formatted(ctx.schema()); + """ + .formatted(ctx.schema()); try (PreparedStatement stmt = conn.prepareStatement(sql)) { for (var row : rows) { @@ -200,7 +205,8 @@ ON CONFLICT (message_uuid) DO NOTHING if (workflowId != null) { var output = new StepResult(workflowId, stepId, functionName, null, null, null, null); - StepsDAO.recordStepResult(conn, ctx.schema(), output, startTime, System.currentTimeMillis()); + StepsDAO.recordStepResult( + conn, ctx.schema(), output, startTime, System.currentTimeMillis()); } conn.commit(); diff --git a/transact/src/main/java/dev/dbos/transact/execution/DBOSExecutor.java b/transact/src/main/java/dev/dbos/transact/execution/DBOSExecutor.java index 0a5f5f6b..5c73c4da 100644 --- a/transact/src/main/java/dev/dbos/transact/execution/DBOSExecutor.java +++ b/transact/src/main/java/dev/dbos/transact/execution/DBOSExecutor.java @@ -33,6 +33,7 @@ import dev.dbos.transact.workflow.QueueConflictResolution; import dev.dbos.transact.workflow.QueueOptions; import dev.dbos.transact.workflow.ScheduleStatus; +import dev.dbos.transact.workflow.SendMessage; import dev.dbos.transact.workflow.SerializationStrategy; import dev.dbos.transact.workflow.StepInfo; import dev.dbos.transact.workflow.StepOptions; @@ -450,30 +451,56 @@ public void fireAlertHandler(String name, String message, Map me // DBOS / DBOSClient API methods - public void send( - String destinationId, - Object message, - String topic, - String idempotencyKey, - SerializationStrategy serialization) { + private void sendBulkInternal( + List messages, + boolean sendToForks, + SerializationStrategy serialization, + String functionName) { DBOSContext ctx = DBOSContextHolder.get(); if (ctx.isInWorkflow() && !ctx.isInStep()) { int stepId = ctx.getAndIncrementFunctionId(); - systemDatabase.send( + systemDatabase.sendBulk( + messages, ctx.getWorkflowId(), stepId, - destinationId, - message, - topic, - idempotencyKey, + functionName, + sendToForks, serialization.formatName()); } else { - systemDatabase.sendDirect( - destinationId, message, topic, idempotencyKey, serialization.formatName()); + systemDatabase.sendBulk( + messages, null, -1, functionName, sendToForks, serialization.formatName()); } } + public void sendBulk( + List messages, boolean sendToForks, SerializationStrategy serialization) { + sendBulkInternal(messages, sendToForks, serialization, "DBOS.sendBulk"); + } + + public void send( + String destinationId, + Object message, + String topic, + String idempotencyKey, + SerializationStrategy serialization, + boolean sendToForks) { + sendBulkInternal( + List.of(new SendMessage(destinationId, message, topic, idempotencyKey)), + sendToForks, + serialization, + "DBOS.send"); + } + + public void send( + String destinationId, + Object message, + String topic, + String idempotencyKey, + SerializationStrategy serialization) { + send(destinationId, message, topic, idempotencyKey, serialization, false); + } + public Object recv(String topic, Duration timeout) { DBOSContext ctx = DBOSContextHolder.get(); if (!ctx.isInWorkflow()) { diff --git a/transact/src/main/java/dev/dbos/transact/workflow/SendMessage.java b/transact/src/main/java/dev/dbos/transact/workflow/SendMessage.java index 5402834a..c101a19f 100644 --- a/transact/src/main/java/dev/dbos/transact/workflow/SendMessage.java +++ b/transact/src/main/java/dev/dbos/transact/workflow/SendMessage.java @@ -13,7 +13,8 @@ public SendMessage(@NonNull String destinationId, @NonNull Object message) { this(destinationId, message, null, null); } - public SendMessage(@NonNull String destinationId, @NonNull Object message, @Nullable String topic) { + public SendMessage( + @NonNull String destinationId, @NonNull Object message, @Nullable String topic) { this(destinationId, message, topic, null); } } diff --git a/transact/src/test/java/dev/dbos/transact/client/ClientTest.java b/transact/src/test/java/dev/dbos/transact/client/ClientTest.java index caf4938a..c75dd18e 100644 --- a/transact/src/test/java/dev/dbos/transact/client/ClientTest.java +++ b/transact/src/test/java/dev/dbos/transact/client/ClientTest.java @@ -15,10 +15,12 @@ import dev.dbos.transact.utils.DBUtils; import dev.dbos.transact.utils.PgContainer; import dev.dbos.transact.workflow.Queue; +import dev.dbos.transact.workflow.SendMessage; import dev.dbos.transact.workflow.WorkflowState; import java.time.Duration; import java.time.Instant; +import java.util.List; import java.util.UUID; import com.zaxxer.hikari.HikariDataSource; @@ -367,4 +369,21 @@ public void versionCrudCrossApiConsistency() throws Exception { assertEquals(dbosVersionNames, clientVersionNames); } } + + @Test + public void testClientSendBulk() throws Exception { + // Start two recv workflows to act as destinations + var handle1 = dbos.startWorkflow(() -> service.sendTest(1)); + var handle2 = dbos.startWorkflow(() -> service.sendTest(2)); + + try (var client = pgContainer.dbosClient()) { + client.sendBulk( + List.of( + new SendMessage(handle1.workflowId(), "msg1", "test-topic", null), + new SendMessage(handle2.workflowId(), "msg2", "test-topic", null))); + } + + assertEquals("1-msg1", handle1.getResult()); + assertEquals("2-msg2", handle2.getResult()); + } } diff --git a/transact/src/test/java/dev/dbos/transact/database/SystemDatabaseTest.java b/transact/src/test/java/dev/dbos/transact/database/SystemDatabaseTest.java index 0a737276..f015b11b 100644 --- a/transact/src/test/java/dev/dbos/transact/database/SystemDatabaseTest.java +++ b/transact/src/test/java/dev/dbos/transact/database/SystemDatabaseTest.java @@ -26,6 +26,7 @@ import dev.dbos.transact.workflow.Queue; import dev.dbos.transact.workflow.QueueOptions; import dev.dbos.transact.workflow.ScheduleStatus; +import dev.dbos.transact.workflow.SendMessage; import dev.dbos.transact.workflow.VersionInfo; import dev.dbos.transact.workflow.WorkflowDelay; import dev.dbos.transact.workflow.WorkflowSchedule; @@ -1547,8 +1548,20 @@ public void testGetAllNotifications() throws Exception { var wfId = "get-all-notifications-wf-1"; sysdb.importWorkflow(List.of(buildEmptyWorkflow(wfId))); - sysdb.sendDirect(wfId, "message1", "topic1", "notif-uuid-1", null); - sysdb.sendDirect(wfId, "message2", "topic2", "notif-uuid-2", null); + sysdb.sendBulk( + List.of(new SendMessage(wfId, "message1", "topic1", "notif-uuid-1")), + null, + -1, + "DBOS.send", + false, + null); + sysdb.sendBulk( + List.of(new SendMessage(wfId, "message2", "topic2", "notif-uuid-2")), + null, + -1, + "DBOS.send", + false, + null); var notifications = sysdb.getAllNotifications(wfId); diff --git a/transact/src/test/java/dev/dbos/transact/notifications/SendBulkTest.java b/transact/src/test/java/dev/dbos/transact/notifications/SendBulkTest.java index 94913cc6..1fc44e29 100644 --- a/transact/src/test/java/dev/dbos/transact/notifications/SendBulkTest.java +++ b/transact/src/test/java/dev/dbos/transact/notifications/SendBulkTest.java @@ -11,6 +11,7 @@ import dev.dbos.transact.utils.PgContainer; import dev.dbos.transact.workflow.ForkOptions; import dev.dbos.transact.workflow.SendMessage; +import dev.dbos.transact.workflow.StepInfo; import dev.dbos.transact.workflow.Workflow; import java.time.Duration; @@ -23,6 +24,8 @@ interface BulkService { String block(String topic); + + void sendBulkWorkflow(List messages); } class BulkServiceImpl implements BulkService { @@ -37,6 +40,12 @@ class BulkServiceImpl implements BulkService { public String block(String topic) { return dbos.recv(topic, Duration.ofSeconds(30)).orElse(null); } + + @Workflow + @Override + public void sendBulkWorkflow(List messages) { + dbos.sendBulk(messages); + } } class SendBulkTest { @@ -59,8 +68,8 @@ private SystemDatabase db() { return DBOSTestAccess.getSystemDatabase(dbos); } - /** Starts a blocking workflow that provides a valid workflow_status row. */ - private String startBlockingWorkflow(String id) throws Exception { + /** Starts a blocking recv workflow, providing a valid workflow_status row as a destination. */ + private String startDest(String id) throws Exception { dbos.startWorkflow(() -> service.block(id), new StartWorkflowOptions(id)); return id; } @@ -71,174 +80,138 @@ private String startBlockingWorkflow(String id) throws Exception { @Test void testSendBulkEmpty() { - // Outside workflow: should be a complete no-op - assertDoesNotThrow(() -> db().sendBulk(List.of(), null, -1, "DBOS.sendBulk", false, null)); + // Outside workflow: no-op + assertDoesNotThrow(() -> dbos.sendBulk(List.of())); + + // Inside workflow: also a no-op, no step recorded + String wfId = "empty-wf-" + UUID.randomUUID(); + service.sendBulkWorkflow(List.of()); + assertEquals(0, dbos.listWorkflowSteps(wfId).size()); } // ------------------------------------------------------------------------- - // testSendBulkBasic + // testSendBulk // ------------------------------------------------------------------------- @Test - void testSendBulkBasic() throws Exception { - String dest1 = startBlockingWorkflow("dest1-" + UUID.randomUUID()); - String dest2 = startBlockingWorkflow("dest2-" + UUID.randomUUID()); + void testSendBulk() throws Exception { + String dest1 = startDest("dest1-" + UUID.randomUUID()); + String dest2 = startDest("dest2-" + UUID.randomUUID()); - // Two messages to dest1, one message to dest2, all in one call - db().sendBulk( + // Multi-message to one dest + one message to another, all atomic + dbos.sendBulk( List.of( new SendMessage(dest1, "hello-1"), new SendMessage(dest1, "hello-2"), - new SendMessage(dest2, "world")), - null, - -1, - "DBOS.sendBulk", - false, - null); - - var notes1 = db().getAllNotifications(dest1); - assertEquals(2, notes1.size()); - - var notes2 = db().getAllNotifications(dest2); - assertEquals(1, notes2.size()); - assertEquals("world", notes2.get(0).message()); - } + new SendMessage(dest2, "world"))); - // ------------------------------------------------------------------------- - // testSendBulkNonExistentDest — entire tx rolls back - // ------------------------------------------------------------------------- - - @Test - void testSendBulkNonExistentDest() throws Exception { - String validDest = startBlockingWorkflow("valid-" + UUID.randomUUID()); + assertEquals(2, db().getAllNotifications(dest1).size()); + assertEquals(1, db().getAllNotifications(dest2).size()); + assertEquals("world", db().getAllNotifications(dest2).get(0).message()); + // Non-existent destination rolls back the entire batch + String validDest = startDest("valid-" + UUID.randomUUID()); assertThrows( DBOSNonExistentWorkflowException.class, () -> - db().sendBulk( + dbos.sendBulk( List.of( new SendMessage(validDest, "first"), - new SendMessage("nonexistent-" + UUID.randomUUID(), "second")), - null, - -1, - "DBOS.sendBulk", - false, - null)); - - // Atomicity: valid dest must have received nothing + new SendMessage("nonexistent-" + UUID.randomUUID(), "second")))); assertEquals(0, db().getAllNotifications(validDest).size()); } // ------------------------------------------------------------------------- - // testSendBulkDuplicateKeyWithinBatch — rejected before transaction opens + // testSendBulkFromWorkflow // ------------------------------------------------------------------------- @Test - void testSendBulkDuplicateKeyWithinBatch() throws Exception { - String dest = startBlockingWorkflow("dupkey-dest-" + UUID.randomUUID()); - String key = UUID.randomUUID().toString(); + void testSendBulkFromWorkflow() throws Exception { + String dest = startDest("wf-dest-" + UUID.randomUUID()); - assertThrows( - IllegalArgumentException.class, - () -> - db().sendBulk( - List.of( - new SendMessage(dest, "msg1", null, key), - new SendMessage(dest, "msg2", null, key)), - null, - -1, - "DBOS.sendBulk", - false, - null)); + // Run the sendBulk workflow and wait for it to complete + String senderWfId = "sender-wf-" + UUID.randomUUID(); + var handle = + dbos.startWorkflow( + () -> service.sendBulkWorkflow(List.of(new SendMessage(dest, "from-workflow"))), + new StartWorkflowOptions(senderWfId)); + handle.getResult(); - // Nothing was delivered - assertEquals(0, db().getAllNotifications(dest).size()); + assertEquals(1, db().getAllNotifications(dest).size()); + + // The entire bulk send is recorded as exactly one step + List steps = dbos.listWorkflowSteps(senderWfId); + assertEquals(1, steps.size()); + assertEquals("DBOS.sendBulk", steps.get(0).functionName()); } // ------------------------------------------------------------------------- - // testSendBulkIdempotencyKey — duplicate across calls is deduped + // testSendBulkIdempotencyKey // ------------------------------------------------------------------------- @Test void testSendBulkIdempotencyKey() throws Exception { - String dest = startBlockingWorkflow("idem-dest-" + UUID.randomUUID()); + String dest = startDest("idem-dest-" + UUID.randomUUID()); String key = UUID.randomUUID().toString(); var messages = List.of(new SendMessage(dest, "hello", null, key)); - db().sendBulk(messages, null, -1, "DBOS.sendBulk", false, null); - db().sendBulk(messages, null, -1, "DBOS.sendBulk", false, null); + dbos.sendBulk(messages); + dbos.sendBulk(messages); // Only one notification despite two calls assertEquals(1, db().getAllNotifications(dest).size()); } // ------------------------------------------------------------------------- - // testSendBulkFromWorkflow — step is recorded; replay skips re-delivery + // testSendBulkDuplicateKeyWithinBatch // ------------------------------------------------------------------------- @Test - void testSendBulkFromWorkflow() throws Exception { - // senderWf provides a real workflow_status row to act as the sender context - String senderWf = startBlockingWorkflow("sender-wf-" + UUID.randomUUID()); - String dest = startBlockingWorkflow("wf-dest-" + UUID.randomUUID()); - - var messages = List.of(new SendMessage(dest, "from-workflow")); - // Use stepId 100 to stay clear of the recv workflow's own step IDs - db().sendBulk(messages, senderWf, 100, "DBOS.sendBulk", false, null); - - assertEquals(1, db().getAllNotifications(dest).size()); + void testSendBulkDuplicateKeyWithinBatch() throws Exception { + String dest = startDest("dupkey-dest-" + UUID.randomUUID()); + String key = UUID.randomUUID().toString(); - // The sendBulk step should be recorded in the sender workflow's history - var steps = dbos.listWorkflowSteps(senderWf); - assertTrue(steps.stream().anyMatch(s -> "DBOS.sendBulk".equals(s.functionName()))); + assertThrows( + IllegalArgumentException.class, + () -> + dbos.sendBulk( + List.of( + new SendMessage(dest, "msg1", null, key), + new SendMessage(dest, "msg2", null, key)))); - // Second call with same workflowId/stepId is a replay — no new delivery - db().sendBulk(messages, senderWf, 100, "DBOS.sendBulk", false, null); - assertEquals(1, db().getAllNotifications(dest).size()); + // Nothing was delivered + assertEquals(0, db().getAllNotifications(dest).size()); } // ------------------------------------------------------------------------- - // testSendBulkSendToForks — fan-out to recursive fork tree + // testSendBulkSendToForks // ------------------------------------------------------------------------- @Test void testSendBulkSendToForks() throws Exception { - // Build the tree: parent -> child -> grandchild String parentId = "parent-" + UUID.randomUUID(); String childId = "child-" + UUID.randomUUID(); String grandchildId = "grandchild-" + UUID.randomUUID(); String unrelatedId = "unrelated-" + UUID.randomUUID(); - startBlockingWorkflow(parentId); - startBlockingWorkflow(unrelatedId); + startDest(parentId); + startDest(unrelatedId); dbos.forkWorkflow(parentId, 0, new ForkOptions(childId)); dbos.forkWorkflow(childId, 0, new ForkOptions(grandchildId)); String key = UUID.randomUUID().toString(); - db().sendBulk( - List.of(new SendMessage(parentId, "fan-out", null, key)), - null, - -1, - "DBOS.sendBulk", - true, - null); - - // All three should have received the message + dbos.sendBulk(List.of(new SendMessage(parentId, "fan-out", null, key)), true); + + // Fan-out reaches all descendants assertEquals(1, db().getAllNotifications(parentId).size()); assertEquals(1, db().getAllNotifications(childId).size()); assertEquals(1, db().getAllNotifications(grandchildId).size()); - // Unrelated workflow receives nothing + // Unrelated workflow untouched assertEquals(0, db().getAllNotifications(unrelatedId).size()); - // Message UUIDs follow the {key}::{dest} pattern — sending again is idempotent - db().sendBulk( - List.of(new SendMessage(parentId, "fan-out", null, key)), - null, - -1, - "DBOS.sendBulk", - true, - null); + // Re-send with same key is idempotent (ON CONFLICT DO NOTHING via {key}::{dest} UUIDs) + dbos.sendBulk(List.of(new SendMessage(parentId, "fan-out", null, key)), true); assertEquals(1, db().getAllNotifications(parentId).size()); assertEquals(1, db().getAllNotifications(childId).size()); assertEquals(1, db().getAllNotifications(grandchildId).size()); From a84c04aae9a86dd71272639dfee1d165bfac20a7 Mon Sep 17 00:00:00 2001 From: Harry Pierson Date: Wed, 27 May 2026 12:58:35 -0700 Subject: [PATCH 3/5] copilot feedback --- .../database/dao/NotificationsDAO.java | 6 +-- .../DBOSNonExistentWorkflowException.java | 5 +- .../dev/dbos/transact/client/ClientTest.java | 49 +++++++++++++++++++ .../transact/notifications/SendBulkTest.java | 3 +- 4 files changed, 58 insertions(+), 5 deletions(-) diff --git a/transact/src/main/java/dev/dbos/transact/database/dao/NotificationsDAO.java b/transact/src/main/java/dev/dbos/transact/database/dao/NotificationsDAO.java index 3e2a904d..4e637d07 100644 --- a/transact/src/main/java/dev/dbos/transact/database/dao/NotificationsDAO.java +++ b/transact/src/main/java/dev/dbos/transact/database/dao/NotificationsDAO.java @@ -196,9 +196,9 @@ ON CONFLICT (message_uuid) DO NOTHING stmt.executeBatch(); } catch (SQLException e) { if ("23503".equals(e.getSQLState())) { - // Find which destination was missing - String missingDest = rows.stream().map(InsertRow::destId).findFirst().orElse("unknown"); - throw new DBOSNonExistentWorkflowException(missingDest); + var distinctDests = rows.stream().map(InsertRow::destId).distinct().toList(); + throw new DBOSNonExistentWorkflowException( + distinctDests.size() == 1 ? distinctDests.get(0) : null); } throw e; } diff --git a/transact/src/main/java/dev/dbos/transact/exceptions/DBOSNonExistentWorkflowException.java b/transact/src/main/java/dev/dbos/transact/exceptions/DBOSNonExistentWorkflowException.java index 60c32b8a..c6bf1f47 100644 --- a/transact/src/main/java/dev/dbos/transact/exceptions/DBOSNonExistentWorkflowException.java +++ b/transact/src/main/java/dev/dbos/transact/exceptions/DBOSNonExistentWorkflowException.java @@ -11,7 +11,10 @@ public class DBOSNonExistentWorkflowException extends RuntimeException { private final String workflowId; public DBOSNonExistentWorkflowException(String workflowId) { - super(String.format("Workflow does not exist %s", workflowId)); + super( + workflowId != null + ? String.format("Workflow does not exist %s", workflowId) + : "One or more destination workflows do not exist"); this.workflowId = workflowId; } diff --git a/transact/src/test/java/dev/dbos/transact/client/ClientTest.java b/transact/src/test/java/dev/dbos/transact/client/ClientTest.java index c75dd18e..c2f3d2a7 100644 --- a/transact/src/test/java/dev/dbos/transact/client/ClientTest.java +++ b/transact/src/test/java/dev/dbos/transact/client/ClientTest.java @@ -14,6 +14,7 @@ import dev.dbos.transact.exceptions.DBOSNonExistentWorkflowException; import dev.dbos.transact.utils.DBUtils; import dev.dbos.transact.utils.PgContainer; +import dev.dbos.transact.workflow.ForkOptions; import dev.dbos.transact.workflow.Queue; import dev.dbos.transact.workflow.SendMessage; import dev.dbos.transact.workflow.WorkflowState; @@ -386,4 +387,52 @@ public void testClientSendBulk() throws Exception { assertEquals("1-msg1", handle1.getResult()); assertEquals("2-msg2", handle2.getResult()); } + + @Test + public void testClientSendBulkSendToForks() throws Exception { + // Parent is a running workflow; child is a fork (ENQUEUED, not executing) + var parent = dbos.startWorkflow(() -> service.sendTest(99)); + var childId = "fork-child-" + UUID.randomUUID(); + dbos.forkWorkflow(parent.workflowId(), 0, new ForkOptions(childId)); + + var sysdb = DBOSTestAccess.getSystemDatabase(dbos); + String key = UUID.randomUUID().toString(); + + try (var client = pgContainer.dbosClient()) { + client.sendBulk( + List.of(new SendMessage(parent.workflowId(), "fan-out", "test-topic", key)), + DBOSClient.SendOptions.defaults().withSendToForks(true)); + } + + // Parent receives the message and completes + assertEquals("99-fan-out", parent.getResult()); + + // Fork also received the notification (verified via DB since it is not executing) + assertEquals(1, sysdb.getAllNotifications(childId).size()); + + // Re-send with same key is idempotent — {key}::{dest} UUIDs dedup via ON CONFLICT DO NOTHING + try (var client = pgContainer.dbosClient()) { + client.sendBulk( + List.of(new SendMessage(parent.workflowId(), "fan-out", "test-topic", key)), + DBOSClient.SendOptions.defaults().withSendToForks(true)); + } + assertEquals(1, sysdb.getAllNotifications(childId).size()); + } + + @Test + public void testClientSendBulkPortableSerialization() throws Exception { + var handle1 = dbos.startWorkflow(() -> service.sendTest(1)); + var handle2 = dbos.startWorkflow(() -> service.sendTest(2)); + + try (var client = pgContainer.dbosClient()) { + client.sendBulk( + List.of( + new SendMessage(handle1.workflowId(), "hello", "test-topic", null), + new SendMessage(handle2.workflowId(), "world", "test-topic", null)), + DBOSClient.SendOptions.portable()); + } + + assertEquals("1-hello", handle1.getResult()); + assertEquals("2-world", handle2.getResult()); + } } diff --git a/transact/src/test/java/dev/dbos/transact/notifications/SendBulkTest.java b/transact/src/test/java/dev/dbos/transact/notifications/SendBulkTest.java index 1fc44e29..c9983987 100644 --- a/transact/src/test/java/dev/dbos/transact/notifications/SendBulkTest.java +++ b/transact/src/test/java/dev/dbos/transact/notifications/SendBulkTest.java @@ -85,7 +85,8 @@ void testSendBulkEmpty() { // Inside workflow: also a no-op, no step recorded String wfId = "empty-wf-" + UUID.randomUUID(); - service.sendBulkWorkflow(List.of()); + dbos.startWorkflow(() -> service.sendBulkWorkflow(List.of()), new StartWorkflowOptions(wfId)) + .getResult(); assertEquals(0, dbos.listWorkflowSteps(wfId).size()); } From 198cf9872fb72a07be4fa2cb36f6cde265f71504 Mon Sep 17 00:00:00 2001 From: Harry Pierson Date: Wed, 27 May 2026 14:13:00 -0700 Subject: [PATCH 4/5] java docs --- .../src/main/java/dev/dbos/transact/DBOS.java | 33 ++++++++++++++- .../java/dev/dbos/transact/DBOSClient.java | 40 +++++++++++++++++++ 2 files changed, 72 insertions(+), 1 deletion(-) diff --git a/transact/src/main/java/dev/dbos/transact/DBOS.java b/transact/src/main/java/dev/dbos/transact/DBOS.java index c1709dd7..41d2670c 100644 --- a/transact/src/main/java/dev/dbos/transact/DBOS.java +++ b/transact/src/main/java/dev/dbos/transact/DBOS.java @@ -131,7 +131,7 @@ public static String version() { /** * Register a lifecycle listener that receives callbacks when DBOS is launched or shut down * - * @param listener + * @param listener the lifecycle listener to register */ private void registerLifecycleListener(@NonNull DBOSLifecycleListener listener) { if (dbosExecutor.get() != null) { @@ -545,6 +545,17 @@ public void send( send(destinationId, message, topic, idempotencyKey, serialization, false); } + /** + * Send a message to a workflow with all options + * + * @param destinationId recipient of the message + * @param message message to be sent + * @param topic topic to which the message is sent + * @param idempotencyKey optional idempotency key for exactly-once send + * @param serialization serialization strategy to use (null for default) + * @param sendToForks if true, also deliver the message to any forked copies of the destination + * workflow + */ public void send( @NonNull String destinationId, @NonNull Object message, @@ -557,14 +568,34 @@ public void send( .send(destinationId, message, topic, idempotencyKey, serialization, sendToForks); } + /** + * Send multiple messages to workflows using default options + * + * @param messages list of messages to send + */ public void sendBulk(@NonNull List messages) { sendBulk(messages, false, null); } + /** + * Send multiple messages to workflows + * + * @param messages list of messages to send + * @param sendToForks if true, also deliver each message to any forked copies of the destination + * workflow + */ public void sendBulk(@NonNull List messages, boolean sendToForks) { sendBulk(messages, sendToForks, null); } + /** + * Send multiple messages to workflows with full options + * + * @param messages list of messages to send + * @param sendToForks if true, also deliver each message to any forked copies of the destination + * workflow + * @param serialization serialization strategy to use (null for default) + */ public void sendBulk( @NonNull List messages, boolean sendToForks, diff --git a/transact/src/main/java/dev/dbos/transact/DBOSClient.java b/transact/src/main/java/dev/dbos/transact/DBOSClient.java index dcae6411..5bffc3da 100644 --- a/transact/src/main/java/dev/dbos/transact/DBOSClient.java +++ b/transact/src/main/java/dev/dbos/transact/DBOSClient.java @@ -120,6 +120,16 @@ public DBOSClient( this(url, user, password, schema, serializer, true); } + /** + * Construct a DBOSClient, by providing system database access credentials + * + * @param url System database JDBC URL + * @param user System database user + * @param password System database credential / password + * @param schema Database schema for DBOS tables + * @param serializer Custom serializer for serialization/deserialization + * @param useListenNotify if true, use PostgreSQL LISTEN/NOTIFY for real-time event notifications + */ public DBOSClient( @NonNull String url, @NonNull String user, @@ -565,6 +575,18 @@ public EnqueueOptions( } } + /** + * Enqueue a workflow with explicit serialization format and support for both positional and named + * arguments. + * + * @param Return type of workflow function + * @param Exception thrown by workflow function + * @param options {@link EnqueueOptions} for configuring the workflow enqueue + * @param positionalArgs Positional arguments to pass to the workflow function + * @param namedArgs Named arguments to pass to the workflow function (e.g., for Python kwargs) + * @param serializationFormat Serialization format string to use (null for default) + * @return WorkflowHandle for retrieving workflow ID, status, and results + */ public @NonNull WorkflowHandle enqueueWorkflow( @NonNull EnqueueOptions options, @Nullable Object[] positionalArgs, @@ -669,6 +691,12 @@ public static SendOptions portable() { return new SendOptions(SerializationStrategy.PORTABLE, false); } + /** + * Create a new SendOptions with the sendToForks flag set. + * + * @param v if true, deliver the message to any forked copies of the destination workflow + * @return new SendOptions with the sendToForks flag updated + */ public SendOptions withSendToForks(boolean v) { return new SendOptions(serialization, v); } @@ -721,10 +749,22 @@ public void send( serializationFormat); } + /** + * Send multiple messages to workflows using default options + * + * @param messages list of messages to send + */ public void sendBulk(@NonNull List messages) { sendBulk(messages, null); } + /** + * Send multiple messages to workflows with serialization options + * + * @param messages list of messages to send + * @param options optional send options including serialization type and fork delivery; null for + * defaults + */ public void sendBulk(@NonNull List messages, @Nullable SendOptions options) { String serializationFormat = (options != null && options.serialization() != null) From 67bf930218b631d5e62e7695a23bdea4eb6cecb6 Mon Sep 17 00:00:00 2001 From: Harry Pierson Date: Wed, 27 May 2026 14:15:09 -0700 Subject: [PATCH 5/5] always suffix idempotency keys --- .../dbos/transact/database/dao/NotificationsDAO.java | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/transact/src/main/java/dev/dbos/transact/database/dao/NotificationsDAO.java b/transact/src/main/java/dev/dbos/transact/database/dao/NotificationsDAO.java index 4e637d07..2ef27784 100644 --- a/transact/src/main/java/dev/dbos/transact/database/dao/NotificationsDAO.java +++ b/transact/src/main/java/dev/dbos/transact/database/dao/NotificationsDAO.java @@ -164,13 +164,11 @@ record InsertRow( } for (String dest : destinations) { - String uuid; - if (msg.idempotencyKey() != null) { - uuid = sendToForks ? msg.idempotencyKey() + "::" + dest : msg.idempotencyKey(); - } else { - uuid = UUID.randomUUID().toString(); - } - rows.add(new InsertRow(dest, pair.serialized(), finalTopic, uuid)); + var wfid = + msg.idempotencyKey() != null + ? msg.idempotencyKey() + "::" + dest + : UUID.randomUUID().toString(); + rows.add(new InsertRow(dest, pair.serialized(), finalTopic, wfid)); } }