diff --git a/transact/src/main/java/dev/dbos/transact/DBOS.java b/transact/src/main/java/dev/dbos/transact/DBOS.java index e00c5dcdf..41d2670cd 100644 --- a/transact/src/main/java/dev/dbos/transact/DBOS.java +++ b/transact/src/main/java/dev/dbos/transact/DBOS.java @@ -17,6 +17,7 @@ import dev.dbos.transact.workflow.QueueConflictResolution; import dev.dbos.transact.workflow.QueueOptions; import dev.dbos.transact.workflow.ScheduleStatus; +import dev.dbos.transact.workflow.SendMessage; import dev.dbos.transact.workflow.SerializationStrategy; import dev.dbos.transact.workflow.Step; import dev.dbos.transact.workflow.StepInfo; @@ -130,7 +131,7 @@ public static String version() { /** * Register a lifecycle listener that receives callbacks when DBOS is launched or shut down * - * @param listener + * @param listener the lifecycle listener to register */ private void registerLifecycleListener(@NonNull DBOSLifecycleListener listener) { if (dbosExecutor.get() != null) { @@ -512,7 +513,7 @@ public void send( @NonNull Object message, @Nullable String topic, @Nullable String idempotencyKey) { - send(destinationId, message, topic, idempotencyKey, null); + send(destinationId, message, topic, idempotencyKey, null, false); } /** @@ -523,7 +524,7 @@ public void send( * @param topic topic to which the message is send */ public void send(@NonNull String destinationId, @NonNull Object message, @Nullable String topic) { - send(destinationId, message, topic, null, null); + send(destinationId, message, topic, null, null, false); } /** @@ -541,8 +542,66 @@ public void send( @Nullable String topic, @Nullable String idempotencyKey, @Nullable SerializationStrategy serialization) { + send(destinationId, message, topic, idempotencyKey, serialization, false); + } + + /** + * Send a message to a workflow with all options + * + * @param destinationId recipient of the message + * @param message message to be sent + * @param topic topic to which the message is sent + * @param idempotencyKey optional idempotency key for exactly-once send + * @param serialization serialization strategy to use (null for default) + * @param sendToForks if true, also deliver the message to any forked copies of the destination + * workflow + */ + public void send( + @NonNull String destinationId, + @NonNull Object message, + @Nullable String topic, + @Nullable String idempotencyKey, + @Nullable SerializationStrategy serialization, + boolean sendToForks) { + if (serialization == null) serialization = SerializationStrategy.DEFAULT; + ensureLaunched("send") + .send(destinationId, message, topic, idempotencyKey, serialization, sendToForks); + } + + /** + * Send multiple messages to workflows using default options + * + * @param messages list of messages to send + */ + public void sendBulk(@NonNull List messages) { + sendBulk(messages, false, null); + } + + /** + * Send multiple messages to workflows + * + * @param messages list of messages to send + * @param sendToForks if true, also deliver each message to any forked copies of the destination + * workflow + */ + public void sendBulk(@NonNull List messages, boolean sendToForks) { + sendBulk(messages, sendToForks, null); + } + + /** + * Send multiple messages to workflows with full options + * + * @param messages list of messages to send + * @param sendToForks if true, also deliver each message to any forked copies of the destination + * workflow + * @param serialization serialization strategy to use (null for default) + */ + public void sendBulk( + @NonNull List messages, + boolean sendToForks, + @Nullable SerializationStrategy serialization) { if (serialization == null) serialization = SerializationStrategy.DEFAULT; - ensureLaunched("send").send(destinationId, message, topic, idempotencyKey, serialization); + ensureLaunched("sendBulk").sendBulk(messages, sendToForks, serialization); } /** diff --git a/transact/src/main/java/dev/dbos/transact/DBOSClient.java b/transact/src/main/java/dev/dbos/transact/DBOSClient.java index a011b2064..5bffc3daa 100644 --- a/transact/src/main/java/dev/dbos/transact/DBOSClient.java +++ b/transact/src/main/java/dev/dbos/transact/DBOSClient.java @@ -17,6 +17,7 @@ import dev.dbos.transact.workflow.QueueConflictResolution; import dev.dbos.transact.workflow.QueueOptions; import dev.dbos.transact.workflow.ScheduleStatus; +import dev.dbos.transact.workflow.SendMessage; import dev.dbos.transact.workflow.SerializationStrategy; import dev.dbos.transact.workflow.StepInfo; import dev.dbos.transact.workflow.Timeout; @@ -119,6 +120,16 @@ public DBOSClient( this(url, user, password, schema, serializer, true); } + /** + * Construct a DBOSClient, by providing system database access credentials + * + * @param url System database JDBC URL + * @param user System database user + * @param password System database credential / password + * @param schema Database schema for DBOS tables + * @param serializer Custom serializer for serialization/deserialization + * @param useListenNotify if true, use PostgreSQL LISTEN/NOTIFY for real-time event notifications + */ public DBOSClient( @NonNull String url, @NonNull String user, @@ -564,6 +575,18 @@ public EnqueueOptions( } } + /** + * Enqueue a workflow with explicit serialization format and support for both positional and named + * arguments. + * + * @param Return type of workflow function + * @param Exception thrown by workflow function + * @param options {@link EnqueueOptions} for configuring the workflow enqueue + * @param positionalArgs Positional arguments to pass to the workflow function + * @param namedArgs Named arguments to pass to the workflow function (e.g., for Python kwargs) + * @param serializationFormat Serialization format string to use (null for default) + * @return WorkflowHandle for retrieving workflow ID, status, and results + */ public @NonNull WorkflowHandle enqueueWorkflow( @NonNull EnqueueOptions options, @Nullable Object[] positionalArgs, @@ -647,7 +670,7 @@ public EnqueueOptions( } /** Options for sending a message. */ - public record SendOptions(@Nullable SerializationStrategy serialization) { + public record SendOptions(@Nullable SerializationStrategy serialization, boolean sendToForks) { /** * Create SendOptions with default serialization strategy. Uses the system's default * serialization format for message encoding. @@ -655,7 +678,7 @@ public record SendOptions(@Nullable SerializationStrategy serialization) { * @return SendOptions configured with default serialization */ public static SendOptions defaults() { - return new SendOptions(SerializationStrategy.DEFAULT); + return new SendOptions(SerializationStrategy.DEFAULT, false); } /** @@ -665,7 +688,17 @@ public static SendOptions defaults() { * @return SendOptions configured with portable JSON serialization */ public static SendOptions portable() { - return new SendOptions(SerializationStrategy.PORTABLE); + return new SendOptions(SerializationStrategy.PORTABLE, false); + } + + /** + * Create a new SendOptions with the sendToForks flag set. + * + * @param v if true, deliver the message to any forked copies of the destination workflow + * @return new SendOptions with the sendToForks flag updated + */ + public SendOptions withSendToForks(boolean v) { + return new SendOptions(serialization, v); } } @@ -705,8 +738,40 @@ public void send( (options != null && options.serialization() != null) ? options.serialization().formatName() : null; + boolean sendToForks = options != null && options.sendToForks(); + + systemDatabase.sendBulk( + List.of(new SendMessage(destinationId, message, topic, idempotencyKey)), + null, + -1, + "DBOS.send", + sendToForks, + serializationFormat); + } + + /** + * Send multiple messages to workflows using default options + * + * @param messages list of messages to send + */ + public void sendBulk(@NonNull List messages) { + sendBulk(messages, null); + } - systemDatabase.sendDirect(destinationId, message, topic, idempotencyKey, serializationFormat); + /** + * Send multiple messages to workflows with serialization options + * + * @param messages list of messages to send + * @param options optional send options including serialization type and fork delivery; null for + * defaults + */ + public void sendBulk(@NonNull List messages, @Nullable SendOptions options) { + String serializationFormat = + (options != null && options.serialization() != null) + ? options.serialization().formatName() + : null; + boolean sendToForks = options != null && options.sendToForks(); + systemDatabase.sendBulk(messages, null, -1, "DBOS.sendBulk", sendToForks, serializationFormat); } /** diff --git a/transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java b/transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java index 0e01b1a67..54280eed4 100644 --- a/transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java +++ b/transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java @@ -24,6 +24,7 @@ import dev.dbos.transact.workflow.Queue; import dev.dbos.transact.workflow.QueueOptions; import dev.dbos.transact.workflow.ScheduleStatus; +import dev.dbos.transact.workflow.SendMessage; import dev.dbos.transact.workflow.StepInfo; import dev.dbos.transact.workflow.VersionInfo; import dev.dbos.transact.workflow.WorkflowAggregateRow; @@ -454,26 +455,17 @@ public Optional checkChildWorkflow(String workflowUuid, int functionId) return dbRetry(() -> WorkflowDAO.checkChildWorkflow(ctx, workflowUuid, functionId)); } - public void send( + public void sendBulk( + List messages, String workflowId, int stepId, - String destinationId, - Object message, - String topic, - String messageId, + String functionName, + boolean sendToForks, String serialization) { dbRetry( () -> - NotificationsDAO.send( - ctx, workflowId, stepId, destinationId, message, topic, messageId, serialization)); - } - - public void sendDirect( - String destinationId, Object message, String topic, String messageId, String serialization) { - dbRetry( - () -> - NotificationsDAO.sendDirect( - ctx, destinationId, message, topic, messageId, serialization)); + NotificationsDAO.sendBulk( + ctx, messages, workflowId, stepId, functionName, sendToForks, serialization)); } public Object recv( diff --git a/transact/src/main/java/dev/dbos/transact/database/dao/NotificationsDAO.java b/transact/src/main/java/dev/dbos/transact/database/dao/NotificationsDAO.java index e8ef29ac5..2ef277846 100644 --- a/transact/src/main/java/dev/dbos/transact/database/dao/NotificationsDAO.java +++ b/transact/src/main/java/dev/dbos/transact/database/dao/NotificationsDAO.java @@ -1,5 +1,7 @@ package dev.dbos.transact.database.dao; +import static java.util.stream.Collectors.joining; + import dev.dbos.transact.Constants; import dev.dbos.transact.database.DbContext; import dev.dbos.transact.database.GetEventCaller; @@ -10,6 +12,7 @@ import dev.dbos.transact.json.DBOSSerializer; import dev.dbos.transact.json.SerializationUtil; import dev.dbos.transact.workflow.NotificationInfo; +import dev.dbos.transact.workflow.SendMessage; import dev.dbos.transact.workflow.internal.StepResult; import java.sql.Connection; @@ -18,10 +21,17 @@ import java.sql.SQLException; import java.time.Duration; import java.time.Instant; +import java.util.ArrayDeque; import java.util.ArrayList; +import java.util.Deque; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.Set; import java.util.UUID; import org.jspecify.annotations.NonNull; @@ -35,48 +45,134 @@ private NotificationsDAO() {} private static final Logger logger = LoggerFactory.getLogger(NotificationsDAO.class); - public static void send( + private static Map> findForkDescendantsTxn( + Connection conn, String schema, List workflowIds) throws SQLException { + Map> children = new HashMap<>(); + Set seen = new LinkedHashSet<>(workflowIds); + List frontier = new ArrayList<>(new LinkedHashSet<>(workflowIds)); + + while (!frontier.isEmpty()) { + String placeholders = frontier.stream().map(x -> "?").collect(joining(",")); + String sql = + """ + SELECT workflow_uuid, forked_from FROM "%s".workflow_status + WHERE forked_from IN (%s) + """ + .formatted(schema, placeholders); + try (var stmt = conn.prepareStatement(sql)) { + for (int i = 0; i < frontier.size(); i++) stmt.setString(i + 1, frontier.get(i)); + List next = new ArrayList<>(); + try (var rs = stmt.executeQuery()) { + while (rs.next()) { + String forkedId = rs.getString("workflow_uuid"); + String forkedFrom = rs.getString("forked_from"); + children.computeIfAbsent(forkedFrom, k -> new ArrayList<>()).add(forkedId); + if (seen.add(forkedId)) next.add(forkedId); + } + } + frontier = next; + } + } + + Map> result = new LinkedHashMap<>(); + for (String root : workflowIds) { + if (result.containsKey(root)) continue; + Set descendants = new LinkedHashSet<>(); + Deque stack = new ArrayDeque<>(children.getOrDefault(root, List.of())); + while (!stack.isEmpty()) { + String node = stack.pop(); + if (!node.equals(root) && descendants.add(node)) { + stack.addAll(children.getOrDefault(node, List.of())); + } + } + result.put(root, descendants); + } + return result; + } + + public static void sendBulk( DbContext ctx, + List messages, String workflowId, int stepId, - String destinationId, - Object message, - String topic, - String messageId, + String functionName, + boolean sendToForks, String serialization) throws SQLException { + if (messages.isEmpty()) { + return; + } + + // Reject duplicate idempotency keys within the batch + var keys = messages.stream().map(SendMessage::idempotencyKey).filter(Objects::nonNull).toList(); + if (keys.size() != keys.stream().distinct().count()) { + throw new IllegalArgumentException("Duplicate idempotency keys within sendBulk batch"); + } + DBOSSerializer serializer = ctx.serializer(); var startTime = System.currentTimeMillis(); - String functionName = "DBOS.send"; - String finalTopic = (topic != null) ? topic : Constants.DBOS_NULL_TOPIC; + + // Serialize each message once + record SerializedPair(SendMessage msg, SerializationUtil.SerializedResult serialized) {} + List pairs = new ArrayList<>(messages.size()); + for (var msg : messages) { + pairs.add( + new SerializedPair( + msg, SerializationUtil.serializeValue(msg.message(), serialization, serializer))); + } try (Connection conn = ctx.getConnection()) { conn.setAutoCommit(false); - try { - StepResult recordedOutput = - StepsDAO.checkStepResult(conn, ctx.schema(), workflowId, stepId, functionName); - - if (recordedOutput != null) { - logger.debug( - "Replaying send, id: {}, destination_uuid: {}, topic: {}", - stepId, - destinationId, - finalTopic); - conn.commit(); - return; - } else { - logger.debug( - "Running send, id: {}, destination_uuid: {}, topic: {}", - stepId, - destinationId, - finalTopic); + // Check for replay if inside a workflow + if (workflowId != null) { + StepResult recorded = + StepsDAO.checkStepResult(conn, ctx.schema(), workflowId, stepId, functionName); + if (recorded != null) { + logger.debug("Replaying sendBulk, workflowId: {}, stepId: {}", workflowId, stepId); + conn.commit(); + return; + } } - var finalMessageId = (messageId != null) ? messageId : UUID.randomUUID().toString(); - var serializedMsg = SerializationUtil.serializeValue(message, serialization, serializer); + // Collect all destination IDs for fork resolution + Map> forkDescendants = Map.of(); + if (sendToForks) { + List destIds = + pairs.stream().map(p -> p.msg().destinationId()).distinct().toList(); + forkDescendants = findForkDescendantsTxn(conn, ctx.schema(), destIds); + } + + // Build insert rows: base dest + sorted descendants + record InsertRow( + String destId, + SerializationUtil.SerializedResult serialized, + String topic, + String messageUuid) {} + List rows = new ArrayList<>(); + for (var pair : pairs) { + var msg = pair.msg(); + String baseDest = msg.destinationId(); + String finalTopic = (msg.topic() != null) ? msg.topic() : Constants.DBOS_NULL_TOPIC; + + List destinations = new ArrayList<>(); + destinations.add(baseDest); + if (sendToForks) { + var desc = forkDescendants.getOrDefault(baseDest, Set.of()); + desc.stream().sorted().forEach(destinations::add); + } + for (String dest : destinations) { + var wfid = + msg.idempotencyKey() != null + ? msg.idempotencyKey() + "::" + dest + : UUID.randomUUID().toString(); + rows.add(new InsertRow(dest, pair.serialized(), finalTopic, wfid)); + } + } + + // Batch-insert all rows final String sql = """ INSERT INTO "%s".notifications @@ -87,25 +183,31 @@ ON CONFLICT (message_uuid) DO NOTHING .formatted(ctx.schema()); try (PreparedStatement stmt = conn.prepareStatement(sql)) { - stmt.setString(1, destinationId); - stmt.setString(2, finalTopic); - stmt.setString(3, serializedMsg.serializedValue()); - stmt.setString(4, serializedMsg.serialization()); - stmt.setString(5, finalMessageId); - stmt.executeUpdate(); + for (var row : rows) { + stmt.setString(1, row.destId()); + stmt.setString(2, row.topic()); + stmt.setString(3, row.serialized().serializedValue()); + stmt.setString(4, row.serialized().serialization()); + stmt.setString(5, row.messageUuid()); + stmt.addBatch(); + } + stmt.executeBatch(); } catch (SQLException e) { if ("23503".equals(e.getSQLState())) { - throw new DBOSNonExistentWorkflowException(destinationId); + var distinctDests = rows.stream().map(InsertRow::destId).distinct().toList(); + throw new DBOSNonExistentWorkflowException( + distinctDests.size() == 1 ? distinctDests.get(0) : null); } throw e; } - var output = new StepResult(workflowId, stepId, functionName, null, null, null, null); - StepsDAO.recordStepResult( - conn, ctx.schema(), output, startTime, System.currentTimeMillis()); + if (workflowId != null) { + var output = new StepResult(workflowId, stepId, functionName, null, null, null, null); + StepsDAO.recordStepResult( + conn, ctx.schema(), output, startTime, System.currentTimeMillis()); + } conn.commit(); - } catch (Exception e) { try { conn.rollback(); @@ -117,44 +219,6 @@ ON CONFLICT (message_uuid) DO NOTHING } } - public static void sendDirect( - DbContext ctx, - String destinationId, - Object message, - String topic, - String messageId, - String serialization) - throws SQLException { - DBOSSerializer serializer = ctx.serializer(); - String finalTopic = (topic != null) ? topic : Constants.DBOS_NULL_TOPIC; - String finalMessageId = (messageId != null) ? messageId : UUID.randomUUID().toString(); - var serializedMsg = SerializationUtil.serializeValue(message, serialization, serializer); - - final String sql = - """ - INSERT INTO "%s".notifications - (destination_uuid, topic, message, message_uuid, serialization) - VALUES (?, ?, ?, ?, ?) - ON CONFLICT (message_uuid) DO NOTHING - """ - .formatted(ctx.schema()); - - try (var conn = ctx.getConnection(); - var stmt = conn.prepareStatement(sql)) { - stmt.setString(1, destinationId); - stmt.setString(2, finalTopic); - stmt.setString(3, serializedMsg.serializedValue()); - stmt.setString(4, finalMessageId); - stmt.setString(5, serializedMsg.serialization()); - stmt.executeUpdate(); - } catch (SQLException e) { - if ("23503".equals(e.getSQLState())) { - throw new DBOSNonExistentWorkflowException(destinationId); - } - throw e; - } - } - public static Object recv( DbContext ctx, String workflowId, diff --git a/transact/src/main/java/dev/dbos/transact/exceptions/DBOSNonExistentWorkflowException.java b/transact/src/main/java/dev/dbos/transact/exceptions/DBOSNonExistentWorkflowException.java index 60c32b8a2..c6bf1f479 100644 --- a/transact/src/main/java/dev/dbos/transact/exceptions/DBOSNonExistentWorkflowException.java +++ b/transact/src/main/java/dev/dbos/transact/exceptions/DBOSNonExistentWorkflowException.java @@ -11,7 +11,10 @@ public class DBOSNonExistentWorkflowException extends RuntimeException { private final String workflowId; public DBOSNonExistentWorkflowException(String workflowId) { - super(String.format("Workflow does not exist %s", workflowId)); + super( + workflowId != null + ? String.format("Workflow does not exist %s", workflowId) + : "One or more destination workflows do not exist"); this.workflowId = workflowId; } diff --git a/transact/src/main/java/dev/dbos/transact/execution/DBOSExecutor.java b/transact/src/main/java/dev/dbos/transact/execution/DBOSExecutor.java index 0a5f5f6b1..5c73c4da3 100644 --- a/transact/src/main/java/dev/dbos/transact/execution/DBOSExecutor.java +++ b/transact/src/main/java/dev/dbos/transact/execution/DBOSExecutor.java @@ -33,6 +33,7 @@ import dev.dbos.transact.workflow.QueueConflictResolution; import dev.dbos.transact.workflow.QueueOptions; import dev.dbos.transact.workflow.ScheduleStatus; +import dev.dbos.transact.workflow.SendMessage; import dev.dbos.transact.workflow.SerializationStrategy; import dev.dbos.transact.workflow.StepInfo; import dev.dbos.transact.workflow.StepOptions; @@ -450,30 +451,56 @@ public void fireAlertHandler(String name, String message, Map me // DBOS / DBOSClient API methods - public void send( - String destinationId, - Object message, - String topic, - String idempotencyKey, - SerializationStrategy serialization) { + private void sendBulkInternal( + List messages, + boolean sendToForks, + SerializationStrategy serialization, + String functionName) { DBOSContext ctx = DBOSContextHolder.get(); if (ctx.isInWorkflow() && !ctx.isInStep()) { int stepId = ctx.getAndIncrementFunctionId(); - systemDatabase.send( + systemDatabase.sendBulk( + messages, ctx.getWorkflowId(), stepId, - destinationId, - message, - topic, - idempotencyKey, + functionName, + sendToForks, serialization.formatName()); } else { - systemDatabase.sendDirect( - destinationId, message, topic, idempotencyKey, serialization.formatName()); + systemDatabase.sendBulk( + messages, null, -1, functionName, sendToForks, serialization.formatName()); } } + public void sendBulk( + List messages, boolean sendToForks, SerializationStrategy serialization) { + sendBulkInternal(messages, sendToForks, serialization, "DBOS.sendBulk"); + } + + public void send( + String destinationId, + Object message, + String topic, + String idempotencyKey, + SerializationStrategy serialization, + boolean sendToForks) { + sendBulkInternal( + List.of(new SendMessage(destinationId, message, topic, idempotencyKey)), + sendToForks, + serialization, + "DBOS.send"); + } + + public void send( + String destinationId, + Object message, + String topic, + String idempotencyKey, + SerializationStrategy serialization) { + send(destinationId, message, topic, idempotencyKey, serialization, false); + } + public Object recv(String topic, Duration timeout) { DBOSContext ctx = DBOSContextHolder.get(); if (!ctx.isInWorkflow()) { diff --git a/transact/src/main/java/dev/dbos/transact/workflow/SendMessage.java b/transact/src/main/java/dev/dbos/transact/workflow/SendMessage.java new file mode 100644 index 000000000..c101a19f0 --- /dev/null +++ b/transact/src/main/java/dev/dbos/transact/workflow/SendMessage.java @@ -0,0 +1,20 @@ +package dev.dbos.transact.workflow; + +import org.jspecify.annotations.NonNull; +import org.jspecify.annotations.Nullable; + +public record SendMessage( + @NonNull String destinationId, + @NonNull Object message, + @Nullable String topic, + @Nullable String idempotencyKey) { + + public SendMessage(@NonNull String destinationId, @NonNull Object message) { + this(destinationId, message, null, null); + } + + public SendMessage( + @NonNull String destinationId, @NonNull Object message, @Nullable String topic) { + this(destinationId, message, topic, null); + } +} diff --git a/transact/src/test/java/dev/dbos/transact/client/ClientTest.java b/transact/src/test/java/dev/dbos/transact/client/ClientTest.java index caf4938a4..c2f3d2a7d 100644 --- a/transact/src/test/java/dev/dbos/transact/client/ClientTest.java +++ b/transact/src/test/java/dev/dbos/transact/client/ClientTest.java @@ -14,11 +14,14 @@ import dev.dbos.transact.exceptions.DBOSNonExistentWorkflowException; import dev.dbos.transact.utils.DBUtils; import dev.dbos.transact.utils.PgContainer; +import dev.dbos.transact.workflow.ForkOptions; import dev.dbos.transact.workflow.Queue; +import dev.dbos.transact.workflow.SendMessage; import dev.dbos.transact.workflow.WorkflowState; import java.time.Duration; import java.time.Instant; +import java.util.List; import java.util.UUID; import com.zaxxer.hikari.HikariDataSource; @@ -367,4 +370,69 @@ public void versionCrudCrossApiConsistency() throws Exception { assertEquals(dbosVersionNames, clientVersionNames); } } + + @Test + public void testClientSendBulk() throws Exception { + // Start two recv workflows to act as destinations + var handle1 = dbos.startWorkflow(() -> service.sendTest(1)); + var handle2 = dbos.startWorkflow(() -> service.sendTest(2)); + + try (var client = pgContainer.dbosClient()) { + client.sendBulk( + List.of( + new SendMessage(handle1.workflowId(), "msg1", "test-topic", null), + new SendMessage(handle2.workflowId(), "msg2", "test-topic", null))); + } + + assertEquals("1-msg1", handle1.getResult()); + assertEquals("2-msg2", handle2.getResult()); + } + + @Test + public void testClientSendBulkSendToForks() throws Exception { + // Parent is a running workflow; child is a fork (ENQUEUED, not executing) + var parent = dbos.startWorkflow(() -> service.sendTest(99)); + var childId = "fork-child-" + UUID.randomUUID(); + dbos.forkWorkflow(parent.workflowId(), 0, new ForkOptions(childId)); + + var sysdb = DBOSTestAccess.getSystemDatabase(dbos); + String key = UUID.randomUUID().toString(); + + try (var client = pgContainer.dbosClient()) { + client.sendBulk( + List.of(new SendMessage(parent.workflowId(), "fan-out", "test-topic", key)), + DBOSClient.SendOptions.defaults().withSendToForks(true)); + } + + // Parent receives the message and completes + assertEquals("99-fan-out", parent.getResult()); + + // Fork also received the notification (verified via DB since it is not executing) + assertEquals(1, sysdb.getAllNotifications(childId).size()); + + // Re-send with same key is idempotent — {key}::{dest} UUIDs dedup via ON CONFLICT DO NOTHING + try (var client = pgContainer.dbosClient()) { + client.sendBulk( + List.of(new SendMessage(parent.workflowId(), "fan-out", "test-topic", key)), + DBOSClient.SendOptions.defaults().withSendToForks(true)); + } + assertEquals(1, sysdb.getAllNotifications(childId).size()); + } + + @Test + public void testClientSendBulkPortableSerialization() throws Exception { + var handle1 = dbos.startWorkflow(() -> service.sendTest(1)); + var handle2 = dbos.startWorkflow(() -> service.sendTest(2)); + + try (var client = pgContainer.dbosClient()) { + client.sendBulk( + List.of( + new SendMessage(handle1.workflowId(), "hello", "test-topic", null), + new SendMessage(handle2.workflowId(), "world", "test-topic", null)), + DBOSClient.SendOptions.portable()); + } + + assertEquals("1-hello", handle1.getResult()); + assertEquals("2-world", handle2.getResult()); + } } diff --git a/transact/src/test/java/dev/dbos/transact/database/SystemDatabaseTest.java b/transact/src/test/java/dev/dbos/transact/database/SystemDatabaseTest.java index 0a7372765..f015b11b4 100644 --- a/transact/src/test/java/dev/dbos/transact/database/SystemDatabaseTest.java +++ b/transact/src/test/java/dev/dbos/transact/database/SystemDatabaseTest.java @@ -26,6 +26,7 @@ import dev.dbos.transact.workflow.Queue; import dev.dbos.transact.workflow.QueueOptions; import dev.dbos.transact.workflow.ScheduleStatus; +import dev.dbos.transact.workflow.SendMessage; import dev.dbos.transact.workflow.VersionInfo; import dev.dbos.transact.workflow.WorkflowDelay; import dev.dbos.transact.workflow.WorkflowSchedule; @@ -1547,8 +1548,20 @@ public void testGetAllNotifications() throws Exception { var wfId = "get-all-notifications-wf-1"; sysdb.importWorkflow(List.of(buildEmptyWorkflow(wfId))); - sysdb.sendDirect(wfId, "message1", "topic1", "notif-uuid-1", null); - sysdb.sendDirect(wfId, "message2", "topic2", "notif-uuid-2", null); + sysdb.sendBulk( + List.of(new SendMessage(wfId, "message1", "topic1", "notif-uuid-1")), + null, + -1, + "DBOS.send", + false, + null); + sysdb.sendBulk( + List.of(new SendMessage(wfId, "message2", "topic2", "notif-uuid-2")), + null, + -1, + "DBOS.send", + false, + null); var notifications = sysdb.getAllNotifications(wfId); diff --git a/transact/src/test/java/dev/dbos/transact/notifications/SendBulkTest.java b/transact/src/test/java/dev/dbos/transact/notifications/SendBulkTest.java new file mode 100644 index 000000000..c99839876 --- /dev/null +++ b/transact/src/test/java/dev/dbos/transact/notifications/SendBulkTest.java @@ -0,0 +1,220 @@ +package dev.dbos.transact.notifications; + +import static org.junit.jupiter.api.Assertions.*; + +import dev.dbos.transact.DBOS; +import dev.dbos.transact.DBOSTestAccess; +import dev.dbos.transact.StartWorkflowOptions; +import dev.dbos.transact.config.DBOSConfig; +import dev.dbos.transact.database.SystemDatabase; +import dev.dbos.transact.exceptions.DBOSNonExistentWorkflowException; +import dev.dbos.transact.utils.PgContainer; +import dev.dbos.transact.workflow.ForkOptions; +import dev.dbos.transact.workflow.SendMessage; +import dev.dbos.transact.workflow.StepInfo; +import dev.dbos.transact.workflow.Workflow; + +import java.time.Duration; +import java.util.List; +import java.util.UUID; + +import org.junit.jupiter.api.AutoClose; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +interface BulkService { + String block(String topic); + + void sendBulkWorkflow(List messages); +} + +class BulkServiceImpl implements BulkService { + private final DBOS dbos; + + BulkServiceImpl(DBOS dbos) { + this.dbos = dbos; + } + + @Workflow + @Override + public String block(String topic) { + return dbos.recv(topic, Duration.ofSeconds(30)).orElse(null); + } + + @Workflow + @Override + public void sendBulkWorkflow(List messages) { + dbos.sendBulk(messages); + } +} + +class SendBulkTest { + + @AutoClose final PgContainer pgContainer = new PgContainer(); + + private DBOSConfig dbosConfig; + @AutoClose private DBOS dbos; + private BulkService service; + + @BeforeEach + void setup() { + dbosConfig = pgContainer.dbosConfig(); + dbos = new DBOS(dbosConfig); + service = dbos.registerProxy(BulkService.class, new BulkServiceImpl(dbos)); + dbos.launch(); + } + + private SystemDatabase db() { + return DBOSTestAccess.getSystemDatabase(dbos); + } + + /** Starts a blocking recv workflow, providing a valid workflow_status row as a destination. */ + private String startDest(String id) throws Exception { + dbos.startWorkflow(() -> service.block(id), new StartWorkflowOptions(id)); + return id; + } + + // ------------------------------------------------------------------------- + // testSendBulkEmpty + // ------------------------------------------------------------------------- + + @Test + void testSendBulkEmpty() { + // Outside workflow: no-op + assertDoesNotThrow(() -> dbos.sendBulk(List.of())); + + // Inside workflow: also a no-op, no step recorded + String wfId = "empty-wf-" + UUID.randomUUID(); + dbos.startWorkflow(() -> service.sendBulkWorkflow(List.of()), new StartWorkflowOptions(wfId)) + .getResult(); + assertEquals(0, dbos.listWorkflowSteps(wfId).size()); + } + + // ------------------------------------------------------------------------- + // testSendBulk + // ------------------------------------------------------------------------- + + @Test + void testSendBulk() throws Exception { + String dest1 = startDest("dest1-" + UUID.randomUUID()); + String dest2 = startDest("dest2-" + UUID.randomUUID()); + + // Multi-message to one dest + one message to another, all atomic + dbos.sendBulk( + List.of( + new SendMessage(dest1, "hello-1"), + new SendMessage(dest1, "hello-2"), + new SendMessage(dest2, "world"))); + + assertEquals(2, db().getAllNotifications(dest1).size()); + assertEquals(1, db().getAllNotifications(dest2).size()); + assertEquals("world", db().getAllNotifications(dest2).get(0).message()); + + // Non-existent destination rolls back the entire batch + String validDest = startDest("valid-" + UUID.randomUUID()); + assertThrows( + DBOSNonExistentWorkflowException.class, + () -> + dbos.sendBulk( + List.of( + new SendMessage(validDest, "first"), + new SendMessage("nonexistent-" + UUID.randomUUID(), "second")))); + assertEquals(0, db().getAllNotifications(validDest).size()); + } + + // ------------------------------------------------------------------------- + // testSendBulkFromWorkflow + // ------------------------------------------------------------------------- + + @Test + void testSendBulkFromWorkflow() throws Exception { + String dest = startDest("wf-dest-" + UUID.randomUUID()); + + // Run the sendBulk workflow and wait for it to complete + String senderWfId = "sender-wf-" + UUID.randomUUID(); + var handle = + dbos.startWorkflow( + () -> service.sendBulkWorkflow(List.of(new SendMessage(dest, "from-workflow"))), + new StartWorkflowOptions(senderWfId)); + handle.getResult(); + + assertEquals(1, db().getAllNotifications(dest).size()); + + // The entire bulk send is recorded as exactly one step + List steps = dbos.listWorkflowSteps(senderWfId); + assertEquals(1, steps.size()); + assertEquals("DBOS.sendBulk", steps.get(0).functionName()); + } + + // ------------------------------------------------------------------------- + // testSendBulkIdempotencyKey + // ------------------------------------------------------------------------- + + @Test + void testSendBulkIdempotencyKey() throws Exception { + String dest = startDest("idem-dest-" + UUID.randomUUID()); + String key = UUID.randomUUID().toString(); + + var messages = List.of(new SendMessage(dest, "hello", null, key)); + dbos.sendBulk(messages); + dbos.sendBulk(messages); + + // Only one notification despite two calls + assertEquals(1, db().getAllNotifications(dest).size()); + } + + // ------------------------------------------------------------------------- + // testSendBulkDuplicateKeyWithinBatch + // ------------------------------------------------------------------------- + + @Test + void testSendBulkDuplicateKeyWithinBatch() throws Exception { + String dest = startDest("dupkey-dest-" + UUID.randomUUID()); + String key = UUID.randomUUID().toString(); + + assertThrows( + IllegalArgumentException.class, + () -> + dbos.sendBulk( + List.of( + new SendMessage(dest, "msg1", null, key), + new SendMessage(dest, "msg2", null, key)))); + + // Nothing was delivered + assertEquals(0, db().getAllNotifications(dest).size()); + } + + // ------------------------------------------------------------------------- + // testSendBulkSendToForks + // ------------------------------------------------------------------------- + + @Test + void testSendBulkSendToForks() throws Exception { + String parentId = "parent-" + UUID.randomUUID(); + String childId = "child-" + UUID.randomUUID(); + String grandchildId = "grandchild-" + UUID.randomUUID(); + String unrelatedId = "unrelated-" + UUID.randomUUID(); + + startDest(parentId); + startDest(unrelatedId); + dbos.forkWorkflow(parentId, 0, new ForkOptions(childId)); + dbos.forkWorkflow(childId, 0, new ForkOptions(grandchildId)); + + String key = UUID.randomUUID().toString(); + dbos.sendBulk(List.of(new SendMessage(parentId, "fan-out", null, key)), true); + + // Fan-out reaches all descendants + assertEquals(1, db().getAllNotifications(parentId).size()); + assertEquals(1, db().getAllNotifications(childId).size()); + assertEquals(1, db().getAllNotifications(grandchildId).size()); + + // Unrelated workflow untouched + assertEquals(0, db().getAllNotifications(unrelatedId).size()); + + // Re-send with same key is idempotent (ON CONFLICT DO NOTHING via {key}::{dest} UUIDs) + dbos.sendBulk(List.of(new SendMessage(parentId, "fan-out", null, key)), true); + assertEquals(1, db().getAllNotifications(parentId).size()); + assertEquals(1, db().getAllNotifications(childId).size()); + assertEquals(1, db().getAllNotifications(grandchildId).size()); + } +}