Skip to content

Commit e4dd1f3

Browse files
authored
partition queues (#245)
fixes #206
1 parent 156e825 commit e4dd1f3

19 files changed

Lines changed: 1093 additions & 844 deletions

File tree

transact/src/main/java/dev/dbos/transact/DBOSClient.java

Lines changed: 56 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,8 @@ public record EnqueueOptions(
114114
Duration timeout,
115115
Instant deadline,
116116
String deduplicationId,
117-
Integer priority) {
117+
Integer priority,
118+
String queuePartitionKey) {
118119

119120
public EnqueueOptions {
120121
if (Objects.requireNonNull(workflowName, "EnqueueOptions workflowName must not be null")
@@ -132,6 +133,16 @@ public record EnqueueOptions(
132133
throw new IllegalArgumentException("EnqueueOptions className must not be empty");
133134
}
134135

136+
if (queuePartitionKey != null && queuePartitionKey.isEmpty()) {
137+
throw new IllegalArgumentException(
138+
"EnqueueOptions queuePartitionKey must not be empty if not null");
139+
}
140+
141+
if (deduplicationId != null && deduplicationId.isEmpty()) {
142+
throw new IllegalArgumentException(
143+
"EnqueueOptions deduplicationId must not be empty if not null");
144+
}
145+
135146
if (instanceName == null) instanceName = "";
136147

137148
if (timeout != null) {
@@ -149,7 +160,7 @@ public record EnqueueOptions(
149160

150161
/** Construct `EnqueueOptions` with a minimum set of required options */
151162
public EnqueueOptions(String className, String workflowName, String queueName) {
152-
this(workflowName, queueName, className, "", null, null, null, null, null, null);
163+
this(workflowName, queueName, className, "", null, null, null, null, null, null, null);
153164
}
154165

155166
/**
@@ -169,7 +180,8 @@ public EnqueueOptions withClassName(String className) {
169180
this.timeout,
170181
this.deadline,
171182
this.deduplicationId,
172-
this.priority);
183+
this.priority,
184+
this.queuePartitionKey);
173185
}
174186

175187
/**
@@ -190,7 +202,8 @@ public EnqueueOptions withWorkflowId(String workflowId) {
190202
this.timeout,
191203
this.deadline,
192204
this.deduplicationId,
193-
this.priority);
205+
this.priority,
206+
this.queuePartitionKey);
194207
}
195208

196209
/**
@@ -211,7 +224,8 @@ public EnqueueOptions withAppVersion(String appVersion) {
211224
this.timeout,
212225
this.deadline,
213226
this.deduplicationId,
214-
this.priority);
227+
this.priority,
228+
this.queuePartitionKey);
215229
}
216230

217231
/**
@@ -232,7 +246,8 @@ public EnqueueOptions withTimeout(Duration timeout) {
232246
timeout,
233247
this.deadline,
234248
this.deduplicationId,
235-
this.priority);
249+
this.priority,
250+
this.queuePartitionKey);
236251
}
237252

238253
/**
@@ -253,7 +268,8 @@ public EnqueueOptions withDeadline(Instant deadline) {
253268
this.timeout,
254269
deadline,
255270
this.deduplicationId,
256-
this.priority);
271+
this.priority,
272+
this.queuePartitionKey);
257273
}
258274

259275
/**
@@ -274,7 +290,8 @@ public EnqueueOptions withDeduplicationId(String deduplicationId) {
274290
this.timeout,
275291
this.deadline,
276292
deduplicationId,
277-
this.priority);
293+
this.priority,
294+
this.queuePartitionKey);
278295
}
279296

280297
/**
@@ -295,7 +312,8 @@ public EnqueueOptions withInstanceName(String instName) {
295312
this.timeout,
296313
this.deadline,
297314
this.deduplicationId,
298-
this.priority);
315+
this.priority,
316+
this.queuePartitionKey);
299317
}
300318

301319
/**
@@ -315,7 +333,31 @@ public EnqueueOptions withPriority(Integer priority) {
315333
this.timeout,
316334
this.deadline,
317335
this.deduplicationId,
318-
priority);
336+
priority,
337+
this.queuePartitionKey);
338+
}
339+
340+
/**
341+
* Creates a new EnqueueOptions instance with the specified queue partition key. The partition
342+
* key is used to determine which partition of the queue the workflow should be enqueued to,
343+
* allowing for better load distribution and ordering guarantees.
344+
*
345+
* @param partitionKey the partition key to use for queue partitioning, can be null
346+
* @return a new EnqueueOptions instance with the specified partition key
347+
*/
348+
public EnqueueOptions withQueuePartitionKey(String partitionKey) {
349+
return new EnqueueOptions(
350+
this.workflowName,
351+
this.queueName,
352+
this.className,
353+
this.instanceName,
354+
this.workflowId,
355+
this.appVersion,
356+
this.timeout,
357+
this.deadline,
358+
this.deduplicationId,
359+
this.priority,
360+
partitionKey);
319361
}
320362

321363
/**
@@ -356,6 +398,7 @@ public <T, E extends Exception> WorkflowHandle<T, E> enqueueWorkflow(
356398
options.queueName(), "EnqueueOptions queueName must not be null"),
357399
options.deduplicationId,
358400
options.priority,
401+
options.queuePartitionKey,
359402
false,
360403
false),
361404
null,
@@ -379,8 +422,9 @@ public void send(String destinationId, Object message, String topic, String idem
379422
var workflowId = "%s-%s".formatted(destinationId, idempotencyKey);
380423

381424
var status =
382-
new WorkflowStatusInternal(workflowId, WorkflowState.SUCCESS)
383-
.withName("temp_workflow-send-client");
425+
WorkflowStatusInternal.builder(workflowId, WorkflowState.SUCCESS)
426+
.name("temp_workflow-send-client")
427+
.build();
384428
systemDatabase.initWorkflowStatus(status, null, false, false);
385429
systemDatabase.send(status.workflowId(), 0, destinationId, message, topic);
386430
}

transact/src/main/java/dev/dbos/transact/StartWorkflowOptions.java

Lines changed: 47 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,17 @@
2323
* enqueued workflows.
2424
* @param priority If `queueName` is specified and refers to a queue with priority enabled, the
2525
* priority to assign.
26+
* @param queuePartitionKey If `queueName` is specified, an optional partition key used to
27+
* distribute workflows across queue partitions for load balancing and ordered processing.
2628
*/
2729
public record StartWorkflowOptions(
2830
String workflowId,
2931
Timeout timeout,
3032
Instant deadline,
3133
String queueName,
3234
String deduplicationId,
33-
Integer priority) {
35+
Integer priority,
36+
String queuePartitionKey) {
3437

3538
public StartWorkflowOptions {
3639
if (timeout instanceof Timeout.Explicit explicit) {
@@ -44,16 +47,26 @@ public record StartWorkflowOptions(
4447
"StartWorkflowOptions explicit timeout and deadline cannot both be set");
4548
}
4649
}
50+
51+
if (queuePartitionKey != null && queuePartitionKey.isEmpty()) {
52+
throw new IllegalArgumentException(
53+
"EnqueueOptions queuePartitionKey must not be empty if not null");
54+
}
55+
56+
if (deduplicationId != null && deduplicationId.isEmpty()) {
57+
throw new IllegalArgumentException(
58+
"EnqueueOptions deduplicationId must not be empty if not null");
59+
}
4760
}
4861

4962
/** Construct with default options */
5063
public StartWorkflowOptions() {
51-
this(null, null, null, null, null, null);
64+
this(null, null, null, null, null, null, null);
5265
}
5366

5467
/** Construct with a specified workflow ID */
5568
public StartWorkflowOptions(String workflowId) {
56-
this(workflowId, null, null, null, null, null);
69+
this(workflowId, null, null, null, null, null, null);
5770
}
5871

5972
/** Produces a new StartWorkflowOptions that overrides the ID assigned to the started workflow */
@@ -64,7 +77,8 @@ public StartWorkflowOptions withWorkflowId(String workflowId) {
6477
this.deadline,
6578
this.queueName,
6679
this.deduplicationId,
67-
this.priority);
80+
this.priority,
81+
this.queuePartitionKey);
6882
}
6983

7084
/** Produces a new StartWorkflowOptions that overrides timeout value for the started workflow */
@@ -75,7 +89,8 @@ public StartWorkflowOptions withTimeout(Timeout timeout) {
7589
this.deadline,
7690
this.queueName,
7791
this.deduplicationId,
78-
this.priority);
92+
this.priority,
93+
this.queuePartitionKey);
7994
}
8095

8196
/** Produces a new StartWorkflowOptions that overrides timeout value for the started workflow */
@@ -96,7 +111,8 @@ public StartWorkflowOptions withNoTimeout() {
96111
this.deadline,
97112
this.queueName,
98113
this.deduplicationId,
99-
this.priority);
114+
this.priority,
115+
this.queuePartitionKey);
100116
}
101117

102118
/** Produces a new StartWorkflowOptions that overrides deadline value for the started workflow */
@@ -107,13 +123,20 @@ public StartWorkflowOptions withDeadline(Instant deadline) {
107123
deadline,
108124
this.queueName,
109125
this.deduplicationId,
110-
this.priority);
126+
this.priority,
127+
this.queuePartitionKey);
111128
}
112129

113130
/** Produces a new StartWorkflowOptions that assigns the started workflow to a queue */
114131
public StartWorkflowOptions withQueue(String queue) {
115132
return new StartWorkflowOptions(
116-
this.workflowId, this.timeout, this.deadline, queue, this.deduplicationId, this.priority);
133+
this.workflowId,
134+
this.timeout,
135+
this.deadline,
136+
queue,
137+
this.deduplicationId,
138+
this.priority,
139+
this.queuePartitionKey);
117140
}
118141

119142
/** Produces a new StartWorkflowOptions that assigns the started workflow to a queue */
@@ -132,7 +155,8 @@ public StartWorkflowOptions withDeduplicationId(String deduplicationId) {
132155
this.deadline,
133156
this.queueName,
134157
deduplicationId,
135-
this.priority);
158+
this.priority,
159+
this.queuePartitionKey);
136160
}
137161

138162
/**
@@ -146,7 +170,20 @@ public StartWorkflowOptions withPriority(Integer priority) {
146170
this.deadline,
147171
this.queueName,
148172
this.deduplicationId,
149-
priority);
173+
priority,
174+
this.queuePartitionKey);
175+
}
176+
177+
/** Produces a new StartWorkflowOptions that assigns a queue partition key */
178+
public StartWorkflowOptions withQueuePartitionKey(String queuePartitionKey) {
179+
return new StartWorkflowOptions(
180+
this.workflowId,
181+
this.timeout,
182+
this.deadline,
183+
this.queueName,
184+
this.deduplicationId,
185+
this.priority,
186+
queuePartitionKey);
150187
}
151188

152189
/** Get the assigned workflow ID, replacing empty with null */

transact/src/main/java/dev/dbos/transact/admin/AdminServer.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,20 @@
99
import java.io.IOException;
1010
import java.io.OutputStream;
1111
import java.net.InetSocketAddress;
12+
import java.time.Duration;
1213
import java.util.HashMap;
1314
import java.util.List;
1415
import java.util.Map;
1516
import java.util.concurrent.atomic.AtomicBoolean;
1617
import java.util.regex.Pattern;
1718
import java.util.stream.Collectors;
1819

20+
import com.fasterxml.jackson.core.JsonGenerator;
1921
import com.fasterxml.jackson.core.type.TypeReference;
2022
import com.fasterxml.jackson.databind.ObjectMapper;
23+
import com.fasterxml.jackson.databind.SerializerProvider;
24+
import com.fasterxml.jackson.databind.module.SimpleModule;
25+
import com.fasterxml.jackson.databind.ser.std.StdSerializer;
2126
import com.sun.net.httpserver.HttpExchange;
2227
import com.sun.net.httpserver.HttpHandler;
2328
import com.sun.net.httpserver.HttpServer;
@@ -28,6 +33,25 @@ public class AdminServer implements AutoCloseable {
2833
private static final Logger logger = LoggerFactory.getLogger(AdminServer.class);
2934
private static final ObjectMapper mapper = new ObjectMapper();
3035

36+
static class DurationSerializer extends StdSerializer<Duration> {
37+
38+
public DurationSerializer() {
39+
super(Duration.class);
40+
}
41+
42+
@Override
43+
public void serialize(Duration value, JsonGenerator gen, SerializerProvider provider)
44+
throws IOException {
45+
gen.writeNumber(value.toMillis() / 1000.0);
46+
}
47+
}
48+
49+
static {
50+
var module = new SimpleModule();
51+
module.addSerializer(Duration.class, new DurationSerializer());
52+
mapper.registerModule(module);
53+
}
54+
3155
private HttpServer server;
3256
private final SystemDatabase systemDatabase;
3357
private final DBOSExecutor dbosExecutor;

transact/src/main/java/dev/dbos/transact/conductor/protocol/WorkflowsOutput.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public WorkflowsOutput(WorkflowStatus status) {
6767
status.deadlineEpochMs() == null ? null : status.deadlineEpochMs().toString();
6868
this.DeduplicationID = status.deduplicationId();
6969
this.Priority = Objects.requireNonNullElse(status.priority(), 0).toString();
70-
this.QueuePartitionKey = status.partitionKey();
70+
this.QueuePartitionKey = status.queuePartitionKey();
7171
this.ForkedFrom = status.forkedFrom();
7272
}
7373
}

0 commit comments

Comments
 (0)