Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 63 additions & 4 deletions transact/src/main/java/dev/dbos/transact/DBOS.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import dev.dbos.transact.workflow.QueueConflictResolution;
import dev.dbos.transact.workflow.QueueOptions;
import dev.dbos.transact.workflow.ScheduleStatus;
import dev.dbos.transact.workflow.SendMessage;
import dev.dbos.transact.workflow.SerializationStrategy;
import dev.dbos.transact.workflow.Step;
import dev.dbos.transact.workflow.StepInfo;
Expand Down Expand Up @@ -130,7 +131,7 @@ public static String version() {
/**
* Register a lifecycle listener that receives callbacks when DBOS is launched or shut down
*
* @param listener
* @param listener the lifecycle listener to register
*/
private void registerLifecycleListener(@NonNull DBOSLifecycleListener listener) {
if (dbosExecutor.get() != null) {
Expand Down Expand Up @@ -512,7 +513,7 @@ public void send(
@NonNull Object message,
@Nullable String topic,
@Nullable String idempotencyKey) {
send(destinationId, message, topic, idempotencyKey, null);
send(destinationId, message, topic, idempotencyKey, null, false);
}

/**
Expand All @@ -523,7 +524,7 @@ public void send(
* @param topic topic to which the message is send
*/
public void send(@NonNull String destinationId, @NonNull Object message, @Nullable String topic) {
send(destinationId, message, topic, null, null);
send(destinationId, message, topic, null, null, false);
}

/**
Expand All @@ -541,8 +542,66 @@ public void send(
@Nullable String topic,
@Nullable String idempotencyKey,
@Nullable SerializationStrategy serialization) {
send(destinationId, message, topic, idempotencyKey, serialization, false);
}

/**
* Send a message to a workflow with all options
*
* @param destinationId recipient of the message
* @param message message to be sent
* @param topic topic to which the message is sent
* @param idempotencyKey optional idempotency key for exactly-once send
* @param serialization serialization strategy to use (null for default)
* @param sendToForks if true, also deliver the message to any forked copies of the destination
* workflow
*/
public void send(
Comment thread
devhawk marked this conversation as resolved.
@NonNull String destinationId,
@NonNull Object message,
@Nullable String topic,
@Nullable String idempotencyKey,
@Nullable SerializationStrategy serialization,
boolean sendToForks) {
if (serialization == null) serialization = SerializationStrategy.DEFAULT;
ensureLaunched("send")
.send(destinationId, message, topic, idempotencyKey, serialization, sendToForks);
}

/**
* Send multiple messages to workflows using default options
*
* @param messages list of messages to send
*/
public void sendBulk(@NonNull List<SendMessage> messages) {
sendBulk(messages, false, null);
}

/**
* Send multiple messages to workflows
*
* @param messages list of messages to send
* @param sendToForks if true, also deliver each message to any forked copies of the destination
* workflow
*/
public void sendBulk(@NonNull List<SendMessage> messages, boolean sendToForks) {
sendBulk(messages, sendToForks, null);
}

/**
* Send multiple messages to workflows with full options
*
* @param messages list of messages to send
* @param sendToForks if true, also deliver each message to any forked copies of the destination
* workflow
* @param serialization serialization strategy to use (null for default)
*/
public void sendBulk(
@NonNull List<SendMessage> messages,
boolean sendToForks,
@Nullable SerializationStrategy serialization) {
if (serialization == null) serialization = SerializationStrategy.DEFAULT;
ensureLaunched("send").send(destinationId, message, topic, idempotencyKey, serialization);
ensureLaunched("sendBulk").sendBulk(messages, sendToForks, serialization);
}

/**
Expand Down
73 changes: 69 additions & 4 deletions transact/src/main/java/dev/dbos/transact/DBOSClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import dev.dbos.transact.workflow.QueueConflictResolution;
import dev.dbos.transact.workflow.QueueOptions;
import dev.dbos.transact.workflow.ScheduleStatus;
import dev.dbos.transact.workflow.SendMessage;
import dev.dbos.transact.workflow.SerializationStrategy;
import dev.dbos.transact.workflow.StepInfo;
import dev.dbos.transact.workflow.Timeout;
Expand Down Expand Up @@ -119,6 +120,16 @@ public DBOSClient(
this(url, user, password, schema, serializer, true);
}

/**
* Construct a DBOSClient, by providing system database access credentials
*
* @param url System database JDBC URL
* @param user System database user
* @param password System database credential / password
* @param schema Database schema for DBOS tables
* @param serializer Custom serializer for serialization/deserialization
* @param useListenNotify if true, use PostgreSQL LISTEN/NOTIFY for real-time event notifications
*/
public DBOSClient(
@NonNull String url,
@NonNull String user,
Expand Down Expand Up @@ -564,6 +575,18 @@ public EnqueueOptions(
}
}

/**
* Enqueue a workflow with explicit serialization format and support for both positional and named
* arguments.
*
* @param <T> Return type of workflow function
* @param <E> Exception thrown by workflow function
* @param options {@link EnqueueOptions} for configuring the workflow enqueue
* @param positionalArgs Positional arguments to pass to the workflow function
* @param namedArgs Named arguments to pass to the workflow function (e.g., for Python kwargs)
* @param serializationFormat Serialization format string to use (null for default)
* @return WorkflowHandle for retrieving workflow ID, status, and results
*/
public <T, E extends Exception> @NonNull WorkflowHandle<T, E> enqueueWorkflow(
@NonNull EnqueueOptions options,
@Nullable Object[] positionalArgs,
Expand Down Expand Up @@ -647,15 +670,15 @@ public EnqueueOptions(
}

/** Options for sending a message. */
public record SendOptions(@Nullable SerializationStrategy serialization) {
public record SendOptions(@Nullable SerializationStrategy serialization, boolean sendToForks) {
/**
* Create SendOptions with default serialization strategy. Uses the system's default
* serialization format for message encoding.
*
* @return SendOptions configured with default serialization
*/
public static SendOptions defaults() {
return new SendOptions(SerializationStrategy.DEFAULT);
return new SendOptions(SerializationStrategy.DEFAULT, false);
}

/**
Expand All @@ -665,7 +688,17 @@ public static SendOptions defaults() {
* @return SendOptions configured with portable JSON serialization
*/
public static SendOptions portable() {
return new SendOptions(SerializationStrategy.PORTABLE);
return new SendOptions(SerializationStrategy.PORTABLE, false);
}

/**
* Create a new SendOptions with the sendToForks flag set.
*
* @param v if true, deliver the message to any forked copies of the destination workflow
* @return new SendOptions with the sendToForks flag updated
*/
public SendOptions withSendToForks(boolean v) {
return new SendOptions(serialization, v);
}
}

Expand Down Expand Up @@ -705,8 +738,40 @@ public void send(
(options != null && options.serialization() != null)
? options.serialization().formatName()
: null;
boolean sendToForks = options != null && options.sendToForks();

systemDatabase.sendBulk(
List.of(new SendMessage(destinationId, message, topic, idempotencyKey)),
null,
-1,
"DBOS.send",
sendToForks,
serializationFormat);
}

/**
* Send multiple messages to workflows using default options
*
* @param messages list of messages to send
*/
public void sendBulk(@NonNull List<SendMessage> messages) {
sendBulk(messages, null);
}

systemDatabase.sendDirect(destinationId, message, topic, idempotencyKey, serializationFormat);
/**
* Send multiple messages to workflows with serialization options
*
* @param messages list of messages to send
* @param options optional send options including serialization type and fork delivery; null for
* defaults
*/
public void sendBulk(@NonNull List<SendMessage> messages, @Nullable SendOptions options) {
String serializationFormat =
(options != null && options.serialization() != null)
? options.serialization().formatName()
: null;
boolean sendToForks = options != null && options.sendToForks();
systemDatabase.sendBulk(messages, null, -1, "DBOS.sendBulk", sendToForks, serializationFormat);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import dev.dbos.transact.workflow.Queue;
import dev.dbos.transact.workflow.QueueOptions;
import dev.dbos.transact.workflow.ScheduleStatus;
import dev.dbos.transact.workflow.SendMessage;
import dev.dbos.transact.workflow.StepInfo;
import dev.dbos.transact.workflow.VersionInfo;
import dev.dbos.transact.workflow.WorkflowAggregateRow;
Expand Down Expand Up @@ -454,26 +455,17 @@ public Optional<String> checkChildWorkflow(String workflowUuid, int functionId)
return dbRetry(() -> WorkflowDAO.checkChildWorkflow(ctx, workflowUuid, functionId));
}

public void send(
public void sendBulk(
List<SendMessage> messages,
String workflowId,
int stepId,
String destinationId,
Object message,
String topic,
String messageId,
String functionName,
boolean sendToForks,
String serialization) {
dbRetry(
() ->
NotificationsDAO.send(
ctx, workflowId, stepId, destinationId, message, topic, messageId, serialization));
}

public void sendDirect(
String destinationId, Object message, String topic, String messageId, String serialization) {
dbRetry(
() ->
NotificationsDAO.sendDirect(
ctx, destinationId, message, topic, messageId, serialization));
NotificationsDAO.sendBulk(
ctx, messages, workflowId, stepId, functionName, sendToForks, serialization));
}

public Object recv(
Expand Down
Loading