Skip to content

Commit d3f5add

Browse files
committed
add DBOS/DBOSClient api + tests
1 parent f9a5518 commit d3f5add

9 files changed

Lines changed: 230 additions & 166 deletions

File tree

transact/src/main/java/dev/dbos/transact/DBOS.java

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import dev.dbos.transact.workflow.QueueConflictResolution;
1818
import dev.dbos.transact.workflow.QueueOptions;
1919
import dev.dbos.transact.workflow.ScheduleStatus;
20+
import dev.dbos.transact.workflow.SendMessage;
2021
import dev.dbos.transact.workflow.SerializationStrategy;
2122
import dev.dbos.transact.workflow.Step;
2223
import dev.dbos.transact.workflow.StepInfo;
@@ -512,7 +513,7 @@ public void send(
512513
@NonNull Object message,
513514
@Nullable String topic,
514515
@Nullable String idempotencyKey) {
515-
send(destinationId, message, topic, idempotencyKey, null);
516+
send(destinationId, message, topic, idempotencyKey, null, false);
516517
}
517518

518519
/**
@@ -523,7 +524,7 @@ public void send(
523524
* @param topic topic to which the message is send
524525
*/
525526
public void send(@NonNull String destinationId, @NonNull Object message, @Nullable String topic) {
526-
send(destinationId, message, topic, null, null);
527+
send(destinationId, message, topic, null, null, false);
527528
}
528529

529530
/**
@@ -541,8 +542,35 @@ public void send(
541542
@Nullable String topic,
542543
@Nullable String idempotencyKey,
543544
@Nullable SerializationStrategy serialization) {
545+
send(destinationId, message, topic, idempotencyKey, serialization, false);
546+
}
547+
548+
public void send(
549+
@NonNull String destinationId,
550+
@NonNull Object message,
551+
@Nullable String topic,
552+
@Nullable String idempotencyKey,
553+
@Nullable SerializationStrategy serialization,
554+
boolean sendToForks) {
555+
if (serialization == null) serialization = SerializationStrategy.DEFAULT;
556+
ensureLaunched("send")
557+
.send(destinationId, message, topic, idempotencyKey, serialization, sendToForks);
558+
}
559+
560+
public void sendBulk(@NonNull List<SendMessage> messages) {
561+
sendBulk(messages, false, null);
562+
}
563+
564+
public void sendBulk(@NonNull List<SendMessage> messages, boolean sendToForks) {
565+
sendBulk(messages, sendToForks, null);
566+
}
567+
568+
public void sendBulk(
569+
@NonNull List<SendMessage> messages,
570+
boolean sendToForks,
571+
@Nullable SerializationStrategy serialization) {
544572
if (serialization == null) serialization = SerializationStrategy.DEFAULT;
545-
ensureLaunched("send").send(destinationId, message, topic, idempotencyKey, serialization);
573+
ensureLaunched("sendBulk").sendBulk(messages, sendToForks, serialization);
546574
}
547575

548576
/**

transact/src/main/java/dev/dbos/transact/DBOSClient.java

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import dev.dbos.transact.workflow.QueueConflictResolution;
1818
import dev.dbos.transact.workflow.QueueOptions;
1919
import dev.dbos.transact.workflow.ScheduleStatus;
20+
import dev.dbos.transact.workflow.SendMessage;
2021
import dev.dbos.transact.workflow.SerializationStrategy;
2122
import dev.dbos.transact.workflow.StepInfo;
2223
import dev.dbos.transact.workflow.Timeout;
@@ -647,15 +648,15 @@ public EnqueueOptions(
647648
}
648649

649650
/** Options for sending a message. */
650-
public record SendOptions(@Nullable SerializationStrategy serialization) {
651+
public record SendOptions(@Nullable SerializationStrategy serialization, boolean sendToForks) {
651652
/**
652653
* Create SendOptions with default serialization strategy. Uses the system's default
653654
* serialization format for message encoding.
654655
*
655656
* @return SendOptions configured with default serialization
656657
*/
657658
public static SendOptions defaults() {
658-
return new SendOptions(SerializationStrategy.DEFAULT);
659+
return new SendOptions(SerializationStrategy.DEFAULT, false);
659660
}
660661

661662
/**
@@ -665,7 +666,11 @@ public static SendOptions defaults() {
665666
* @return SendOptions configured with portable JSON serialization
666667
*/
667668
public static SendOptions portable() {
668-
return new SendOptions(SerializationStrategy.PORTABLE);
669+
return new SendOptions(SerializationStrategy.PORTABLE, false);
670+
}
671+
672+
public SendOptions withSendToForks(boolean v) {
673+
return new SendOptions(serialization, v);
669674
}
670675
}
671676

@@ -705,8 +710,28 @@ public void send(
705710
(options != null && options.serialization() != null)
706711
? options.serialization().formatName()
707712
: null;
713+
boolean sendToForks = options != null && options.sendToForks();
714+
715+
systemDatabase.sendBulk(
716+
List.of(new SendMessage(destinationId, message, topic, idempotencyKey)),
717+
null,
718+
-1,
719+
"DBOS.send",
720+
sendToForks,
721+
serializationFormat);
722+
}
708723

709-
systemDatabase.sendDirect(destinationId, message, topic, idempotencyKey, serializationFormat);
724+
public void sendBulk(@NonNull List<SendMessage> messages) {
725+
sendBulk(messages, null);
726+
}
727+
728+
public void sendBulk(@NonNull List<SendMessage> messages, @Nullable SendOptions options) {
729+
String serializationFormat =
730+
(options != null && options.serialization() != null)
731+
? options.serialization().formatName()
732+
: null;
733+
boolean sendToForks = options != null && options.sendToForks();
734+
systemDatabase.sendBulk(messages, null, -1, "DBOS.sendBulk", sendToForks, serializationFormat);
710735
}
711736

712737
/**

transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java

Lines changed: 1 addition & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,9 @@
2222
import dev.dbos.transact.workflow.ListWorkflowsInput;
2323
import dev.dbos.transact.workflow.NotificationInfo;
2424
import dev.dbos.transact.workflow.Queue;
25-
import dev.dbos.transact.workflow.SendMessage;
2625
import dev.dbos.transact.workflow.QueueOptions;
2726
import dev.dbos.transact.workflow.ScheduleStatus;
27+
import dev.dbos.transact.workflow.SendMessage;
2828
import dev.dbos.transact.workflow.StepInfo;
2929
import dev.dbos.transact.workflow.VersionInfo;
3030
import dev.dbos.transact.workflow.WorkflowAggregateRow;
@@ -468,34 +468,6 @@ public void sendBulk(
468468
ctx, messages, workflowId, stepId, functionName, sendToForks, serialization));
469469
}
470470

471-
public void send(
472-
String workflowId,
473-
int stepId,
474-
String destinationId,
475-
Object message,
476-
String topic,
477-
String messageId,
478-
String serialization) {
479-
sendBulk(
480-
List.of(new SendMessage(destinationId, message, topic, messageId)),
481-
workflowId,
482-
stepId,
483-
"DBOS.send",
484-
false,
485-
serialization);
486-
}
487-
488-
public void sendDirect(
489-
String destinationId, Object message, String topic, String messageId, String serialization) {
490-
sendBulk(
491-
List.of(new SendMessage(destinationId, message, topic, messageId)),
492-
null,
493-
-1,
494-
"DBOS.send",
495-
false,
496-
serialization);
497-
}
498-
499471
public Object recv(
500472
String workflowId, int stepId, int timeoutStepId, String topic, Duration timeout) {
501473
return dbRetry(

transact/src/main/java/dev/dbos/transact/database/dao/NotificationsDAO.java

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package dev.dbos.transact.database.dao;
22

3+
import static java.util.stream.Collectors.joining;
4+
35
import dev.dbos.transact.Constants;
46
import dev.dbos.transact.database.DbContext;
57
import dev.dbos.transact.database.GetEventCaller;
@@ -32,8 +34,6 @@
3234
import java.util.Set;
3335
import java.util.UUID;
3436

35-
import static java.util.stream.Collectors.joining;
36-
3737
import org.jspecify.annotations.NonNull;
3838
import org.jspecify.annotations.Nullable;
3939
import org.slf4j.Logger;
@@ -57,7 +57,8 @@ private static Map<String, Set<String>> findForkDescendantsTxn(
5757
"""
5858
SELECT workflow_uuid, forked_from FROM "%s".workflow_status
5959
WHERE forked_from IN (%s)
60-
""".formatted(schema, placeholders);
60+
"""
61+
.formatted(schema, placeholders);
6162
try (var stmt = conn.prepareStatement(sql)) {
6263
for (int i = 0; i < frontier.size(); i++) stmt.setString(i + 1, frontier.get(i));
6364
List<String> next = new ArrayList<>();
@@ -104,11 +105,7 @@ public static void sendBulk(
104105
}
105106

106107
// Reject duplicate idempotency keys within the batch
107-
var keys =
108-
messages.stream()
109-
.map(SendMessage::idempotencyKey)
110-
.filter(Objects::nonNull)
111-
.toList();
108+
var keys = messages.stream().map(SendMessage::idempotencyKey).filter(Objects::nonNull).toList();
112109
if (keys.size() != keys.stream().distinct().count()) {
113110
throw new IllegalArgumentException("Duplicate idempotency keys within sendBulk batch");
114111
}
@@ -120,7 +117,9 @@ public static void sendBulk(
120117
record SerializedPair(SendMessage msg, SerializationUtil.SerializedResult serialized) {}
121118
List<SerializedPair> pairs = new ArrayList<>(messages.size());
122119
for (var msg : messages) {
123-
pairs.add(new SerializedPair(msg, SerializationUtil.serializeValue(msg.message(), serialization, serializer)));
120+
pairs.add(
121+
new SerializedPair(
122+
msg, SerializationUtil.serializeValue(msg.message(), serialization, serializer)));
124123
}
125124

126125
try (Connection conn = ctx.getConnection()) {
@@ -140,12 +139,17 @@ record SerializedPair(SendMessage msg, SerializationUtil.SerializedResult serial
140139
// Collect all destination IDs for fork resolution
141140
Map<String, Set<String>> forkDescendants = Map.of();
142141
if (sendToForks) {
143-
List<String> destIds = pairs.stream().map(p -> p.msg().destinationId()).distinct().toList();
142+
List<String> destIds =
143+
pairs.stream().map(p -> p.msg().destinationId()).distinct().toList();
144144
forkDescendants = findForkDescendantsTxn(conn, ctx.schema(), destIds);
145145
}
146146

147147
// Build insert rows: base dest + sorted descendants
148-
record InsertRow(String destId, SerializationUtil.SerializedResult serialized, String topic, String messageUuid) {}
148+
record InsertRow(
149+
String destId,
150+
SerializationUtil.SerializedResult serialized,
151+
String topic,
152+
String messageUuid) {}
149153
List<InsertRow> rows = new ArrayList<>();
150154
for (var pair : pairs) {
151155
var msg = pair.msg();
@@ -177,7 +181,8 @@ record InsertRow(String destId, SerializationUtil.SerializedResult serialized, S
177181
(destination_uuid, topic, message, serialization, message_uuid)
178182
VALUES (?, ?, ?, ?, ?)
179183
ON CONFLICT (message_uuid) DO NOTHING
180-
""".formatted(ctx.schema());
184+
"""
185+
.formatted(ctx.schema());
181186

182187
try (PreparedStatement stmt = conn.prepareStatement(sql)) {
183188
for (var row : rows) {
@@ -200,7 +205,8 @@ ON CONFLICT (message_uuid) DO NOTHING
200205

201206
if (workflowId != null) {
202207
var output = new StepResult(workflowId, stepId, functionName, null, null, null, null);
203-
StepsDAO.recordStepResult(conn, ctx.schema(), output, startTime, System.currentTimeMillis());
208+
StepsDAO.recordStepResult(
209+
conn, ctx.schema(), output, startTime, System.currentTimeMillis());
204210
}
205211

206212
conn.commit();

transact/src/main/java/dev/dbos/transact/execution/DBOSExecutor.java

Lines changed: 40 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import dev.dbos.transact.workflow.QueueConflictResolution;
3434
import dev.dbos.transact.workflow.QueueOptions;
3535
import dev.dbos.transact.workflow.ScheduleStatus;
36+
import dev.dbos.transact.workflow.SendMessage;
3637
import dev.dbos.transact.workflow.SerializationStrategy;
3738
import dev.dbos.transact.workflow.StepInfo;
3839
import dev.dbos.transact.workflow.StepOptions;
@@ -450,30 +451,56 @@ public void fireAlertHandler(String name, String message, Map<String, String> me
450451

451452
// DBOS / DBOSClient API methods
452453

453-
public void send(
454-
String destinationId,
455-
Object message,
456-
String topic,
457-
String idempotencyKey,
458-
SerializationStrategy serialization) {
454+
private void sendBulkInternal(
455+
List<SendMessage> messages,
456+
boolean sendToForks,
457+
SerializationStrategy serialization,
458+
String functionName) {
459459

460460
DBOSContext ctx = DBOSContextHolder.get();
461461
if (ctx.isInWorkflow() && !ctx.isInStep()) {
462462
int stepId = ctx.getAndIncrementFunctionId();
463-
systemDatabase.send(
463+
systemDatabase.sendBulk(
464+
messages,
464465
ctx.getWorkflowId(),
465466
stepId,
466-
destinationId,
467-
message,
468-
topic,
469-
idempotencyKey,
467+
functionName,
468+
sendToForks,
470469
serialization.formatName());
471470
} else {
472-
systemDatabase.sendDirect(
473-
destinationId, message, topic, idempotencyKey, serialization.formatName());
471+
systemDatabase.sendBulk(
472+
messages, null, -1, functionName, sendToForks, serialization.formatName());
474473
}
475474
}
476475

476+
public void sendBulk(
477+
List<SendMessage> messages, boolean sendToForks, SerializationStrategy serialization) {
478+
sendBulkInternal(messages, sendToForks, serialization, "DBOS.sendBulk");
479+
}
480+
481+
public void send(
482+
String destinationId,
483+
Object message,
484+
String topic,
485+
String idempotencyKey,
486+
SerializationStrategy serialization,
487+
boolean sendToForks) {
488+
sendBulkInternal(
489+
List.of(new SendMessage(destinationId, message, topic, idempotencyKey)),
490+
sendToForks,
491+
serialization,
492+
"DBOS.send");
493+
}
494+
495+
public void send(
496+
String destinationId,
497+
Object message,
498+
String topic,
499+
String idempotencyKey,
500+
SerializationStrategy serialization) {
501+
send(destinationId, message, topic, idempotencyKey, serialization, false);
502+
}
503+
477504
public Object recv(String topic, Duration timeout) {
478505
DBOSContext ctx = DBOSContextHolder.get();
479506
if (!ctx.isInWorkflow()) {

transact/src/main/java/dev/dbos/transact/workflow/SendMessage.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ public SendMessage(@NonNull String destinationId, @NonNull Object message) {
1313
this(destinationId, message, null, null);
1414
}
1515

16-
public SendMessage(@NonNull String destinationId, @NonNull Object message, @Nullable String topic) {
16+
public SendMessage(
17+
@NonNull String destinationId, @NonNull Object message, @Nullable String topic) {
1718
this(destinationId, message, topic, null);
1819
}
1920
}

transact/src/test/java/dev/dbos/transact/client/ClientTest.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,12 @@
1515
import dev.dbos.transact.utils.DBUtils;
1616
import dev.dbos.transact.utils.PgContainer;
1717
import dev.dbos.transact.workflow.Queue;
18+
import dev.dbos.transact.workflow.SendMessage;
1819
import dev.dbos.transact.workflow.WorkflowState;
1920

2021
import java.time.Duration;
2122
import java.time.Instant;
23+
import java.util.List;
2224
import java.util.UUID;
2325

2426
import com.zaxxer.hikari.HikariDataSource;
@@ -367,4 +369,21 @@ public void versionCrudCrossApiConsistency() throws Exception {
367369
assertEquals(dbosVersionNames, clientVersionNames);
368370
}
369371
}
372+
373+
@Test
374+
public void testClientSendBulk() throws Exception {
375+
// Start two recv workflows to act as destinations
376+
var handle1 = dbos.startWorkflow(() -> service.sendTest(1));
377+
var handle2 = dbos.startWorkflow(() -> service.sendTest(2));
378+
379+
try (var client = pgContainer.dbosClient()) {
380+
client.sendBulk(
381+
List.of(
382+
new SendMessage(handle1.workflowId(), "msg1", "test-topic", null),
383+
new SendMessage(handle2.workflowId(), "msg2", "test-topic", null)));
384+
}
385+
386+
assertEquals("1-msg1", handle1.getResult());
387+
assertEquals("2-msg2", handle2.getResult());
388+
}
370389
}

0 commit comments

Comments
 (0)