Skip to content

Commit 3e6369f

Browse files
authored
java docs & nullability annotations for QueueOptions (#407)
Also adds java/dbos-toolbox to test_demo_apps
1 parent af8ba4d commit 3e6369f

2 files changed

Lines changed: 167 additions & 29 deletions

File tree

.github/workflows/test_demo_apps.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ jobs:
5454
include:
5555
- app: java/dbos-starter
5656
build_file: app/build.gradle.kts
57+
- app: java/dbos-toolbox
58+
build_file: app/build.gradle.kts
5759
- app: java/widget-store
5860
build_file: build.gradle.kts
5961
- app: kotlin/dbos-starter

transact/src/main/java/dev/dbos/transact/workflow/QueueOptions.java

Lines changed: 165 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import java.util.Optional;
55
import java.util.concurrent.TimeUnit;
66

7+
import org.jspecify.annotations.NonNull;
78
import org.jspecify.annotations.Nullable;
89

910
/**
@@ -17,15 +18,32 @@
1718
* <p>The non-nullable queue properties ({@code priorityEnabled}, {@code partitionQueue}, {@code
1819
* pollingInterval}) use {@link Optional} — {@link Optional#empty()} means use the default on
1920
* creation or leave unchanged on update; a present value sets the column.
21+
*
22+
* @param concurrency max concurrent executions of this queue across all workers; {@link
23+
* Field#absent()} means no limit on creation or leave unchanged on update; {@code
24+
* Field.of(null)} clears the column
25+
* @param workerConcurrency max concurrent executions of this queue per worker process; {@link
26+
* Field#absent()} means no limit on creation or leave unchanged on update; {@code
27+
* Field.of(null)} clears the column
28+
* @param rateLimitMax maximum number of starts allowed in each rate-limit window; must be paired
29+
* with {@code rateLimitPeriod}; {@link Field#absent()} means no rate limit or leave unchanged
30+
* @param rateLimitPeriod duration of the rolling rate-limit window; must be paired with {@code
31+
* rateLimitMax}; {@link Field#absent()} means no rate limit or leave unchanged
32+
* @param priorityEnabled whether priority-based ordering is enabled for this queue; {@link
33+
* Optional#empty()} means use the default on creation or leave unchanged on update
34+
* @param partitionQueue whether to partition queue entries by workflow class so each class gets its
35+
* own concurrency slot; {@link Optional#empty()} means use the default or leave unchanged
36+
* @param pollingInterval how often workers poll the database for new queue entries; {@link
37+
* Optional#empty()} means use the default or leave unchanged
2038
*/
2139
public record QueueOptions(
22-
Field<Integer> concurrency,
23-
Field<Integer> workerConcurrency,
24-
Field<Integer> rateLimitMax,
25-
Field<Duration> rateLimitPeriod,
26-
Optional<Boolean> priorityEnabled,
27-
Optional<Boolean> partitionQueue,
28-
Optional<Duration> pollingInterval) {
40+
@NonNull Field<Integer> concurrency,
41+
@NonNull Field<Integer> workerConcurrency,
42+
@NonNull Field<Integer> rateLimitMax,
43+
@NonNull Field<Duration> rateLimitPeriod,
44+
@NonNull Optional<Boolean> priorityEnabled,
45+
@NonNull Optional<Boolean> partitionQueue,
46+
@NonNull Optional<Duration> pollingInterval) {
2947

3048
private static final QueueOptions EMPTY =
3149
new QueueOptions(
@@ -37,10 +55,12 @@ public record QueueOptions(
3755
Optional.empty(),
3856
Optional.empty());
3957

40-
public static QueueOptions empty() {
58+
/** Returns the shared all-absent instance; no queue property will be set or changed. */
59+
public static @NonNull QueueOptions empty() {
4160
return EMPTY;
4261
}
4362

63+
/** Returns {@code true} if all fields are absent — no property will be set or changed. */
4464
public boolean isEmpty() {
4565
return !concurrency.isPresent()
4666
&& !workerConcurrency.isPresent()
@@ -53,37 +73,82 @@ public boolean isEmpty() {
5373

5474
// ── Static factories ──────────────────────────────────────────────────────
5575

56-
public static QueueOptions setConcurrency(@Nullable Integer value) {
76+
/**
77+
* Creates options that set only {@code concurrency}; all other fields are absent.
78+
*
79+
* @param value the concurrency limit, or {@code null} to clear the column
80+
*/
81+
public static @NonNull QueueOptions setConcurrency(@Nullable Integer value) {
5782
return EMPTY.withConcurrency(Field.of(value));
5883
}
5984

60-
public static QueueOptions setWorkerConcurrency(@Nullable Integer value) {
85+
/**
86+
* Creates options that set only {@code workerConcurrency}; all other fields are absent.
87+
*
88+
* @param value the per-worker concurrency limit, or {@code null} to clear the column
89+
*/
90+
public static @NonNull QueueOptions setWorkerConcurrency(@Nullable Integer value) {
6191
return EMPTY.withWorkerConcurrency(Field.of(value));
6292
}
6393

64-
public static QueueOptions setRateLimit(@Nullable Integer max, @Nullable Duration period) {
94+
/**
95+
* Creates options that set only the rate limit; all other fields are absent.
96+
*
97+
* @param max max starts per window, or {@code null} to clear
98+
* @param period length of the rolling window, or {@code null} to clear
99+
*/
100+
public static @NonNull QueueOptions setRateLimit(
101+
@Nullable Integer max, @Nullable Duration period) {
65102
return EMPTY.withRateLimitMax(Field.of(max)).withRateLimitPeriod(Field.of(period));
66103
}
67104

68-
public static QueueOptions setRateLimit(int limit, long period, TimeUnit unit) {
105+
/**
106+
* Creates options that set only the rate limit; all other fields are absent.
107+
*
108+
* @param limit max starts per window
109+
* @param period length of the rolling window
110+
* @param unit time unit for {@code period}
111+
*/
112+
public static @NonNull QueueOptions setRateLimit(int limit, long period, @NonNull TimeUnit unit) {
69113
return setRateLimit(limit, Duration.of(period, unit.toChronoUnit()));
70114
}
71115

72-
public static QueueOptions setPriorityEnabled(boolean value) {
116+
/**
117+
* Creates options that set only {@code priorityEnabled}; all other fields are absent.
118+
*
119+
* @param value {@code true} to enable priority ordering, {@code false} to disable
120+
*/
121+
public static @NonNull QueueOptions setPriorityEnabled(boolean value) {
73122
return EMPTY.withPriorityEnabled(Optional.of(value));
74123
}
75124

76-
public static QueueOptions setPartitionQueue(boolean value) {
125+
/**
126+
* Creates options that set only {@code partitionQueue}; all other fields are absent.
127+
*
128+
* @param value {@code true} to enable per-class queue partitioning, {@code false} to disable
129+
*/
130+
public static @NonNull QueueOptions setPartitionQueue(boolean value) {
77131
return EMPTY.withPartitionQueue(Optional.of(value));
78132
}
79133

80-
public static QueueOptions setPollingInterval(Duration value) {
134+
/**
135+
* Creates options that set only {@code pollingInterval}; all other fields are absent.
136+
*
137+
* @param value the interval at which workers poll for new queue entries
138+
*/
139+
public static @NonNull QueueOptions setPollingInterval(@NonNull Duration value) {
81140
return EMPTY.withPollingInterval(Optional.of(value));
82141
}
83142

84143
// ── Builders for chaining ─────────────────────────────────────────────────
85144

86-
public QueueOptions withConcurrency(Field<Integer> concurrency) {
145+
/**
146+
* Returns a copy of these options with {@code concurrency} replaced.
147+
*
148+
* @param concurrency the new value; use {@link Field#absent()} to leave unchanged, or {@code
149+
* Field.of(null)} to clear the column
150+
*/
151+
public @NonNull QueueOptions withConcurrency(@NonNull Field<Integer> concurrency) {
87152
return new QueueOptions(
88153
concurrency,
89154
workerConcurrency,
@@ -94,7 +159,13 @@ public QueueOptions withConcurrency(Field<Integer> concurrency) {
94159
pollingInterval);
95160
}
96161

97-
public QueueOptions withWorkerConcurrency(Field<Integer> workerConcurrency) {
162+
/**
163+
* Returns a copy of these options with {@code workerConcurrency} replaced.
164+
*
165+
* @param workerConcurrency the new value; use {@link Field#absent()} to leave unchanged, or
166+
* {@code Field.of(null)} to clear the column
167+
*/
168+
public @NonNull QueueOptions withWorkerConcurrency(@NonNull Field<Integer> workerConcurrency) {
98169
return new QueueOptions(
99170
concurrency,
100171
workerConcurrency,
@@ -105,7 +176,13 @@ public QueueOptions withWorkerConcurrency(Field<Integer> workerConcurrency) {
105176
pollingInterval);
106177
}
107178

108-
public QueueOptions withRateLimitMax(Field<Integer> rateLimitMax) {
179+
/**
180+
* Returns a copy of these options with {@code rateLimitMax} replaced.
181+
*
182+
* @param rateLimitMax the new value; use {@link Field#absent()} to leave unchanged, or {@code
183+
* Field.of(null)} to clear the column
184+
*/
185+
public @NonNull QueueOptions withRateLimitMax(@NonNull Field<Integer> rateLimitMax) {
109186
return new QueueOptions(
110187
concurrency,
111188
workerConcurrency,
@@ -116,7 +193,13 @@ public QueueOptions withRateLimitMax(Field<Integer> rateLimitMax) {
116193
pollingInterval);
117194
}
118195

119-
public QueueOptions withRateLimitPeriod(Field<Duration> rateLimitPeriod) {
196+
/**
197+
* Returns a copy of these options with {@code rateLimitPeriod} replaced.
198+
*
199+
* @param rateLimitPeriod the new value; use {@link Field#absent()} to leave unchanged, or {@code
200+
* Field.of(null)} to clear the column
201+
*/
202+
public @NonNull QueueOptions withRateLimitPeriod(@NonNull Field<Duration> rateLimitPeriod) {
120203
return new QueueOptions(
121204
concurrency,
122205
workerConcurrency,
@@ -127,7 +210,12 @@ public QueueOptions withRateLimitPeriod(Field<Duration> rateLimitPeriod) {
127210
pollingInterval);
128211
}
129212

130-
public QueueOptions withPriorityEnabled(Optional<Boolean> priorityEnabled) {
213+
/**
214+
* Returns a copy of these options with {@code priorityEnabled} replaced.
215+
*
216+
* @param priorityEnabled the new value; use {@link Optional#empty()} to leave unchanged
217+
*/
218+
public @NonNull QueueOptions withPriorityEnabled(@NonNull Optional<Boolean> priorityEnabled) {
131219
return new QueueOptions(
132220
concurrency,
133221
workerConcurrency,
@@ -138,7 +226,12 @@ public QueueOptions withPriorityEnabled(Optional<Boolean> priorityEnabled) {
138226
pollingInterval);
139227
}
140228

141-
public QueueOptions withPartitionQueue(Optional<Boolean> partitionQueue) {
229+
/**
230+
* Returns a copy of these options with {@code partitionQueue} replaced.
231+
*
232+
* @param partitionQueue the new value; use {@link Optional#empty()} to leave unchanged
233+
*/
234+
public @NonNull QueueOptions withPartitionQueue(@NonNull Optional<Boolean> partitionQueue) {
142235
return new QueueOptions(
143236
concurrency,
144237
workerConcurrency,
@@ -149,7 +242,12 @@ public QueueOptions withPartitionQueue(Optional<Boolean> partitionQueue) {
149242
pollingInterval);
150243
}
151244

152-
public QueueOptions withPollingInterval(Optional<Duration> pollingInterval) {
245+
/**
246+
* Returns a copy of these options with {@code pollingInterval} replaced.
247+
*
248+
* @param pollingInterval the new value; use {@link Optional#empty()} to leave unchanged
249+
*/
250+
public @NonNull QueueOptions withPollingInterval(@NonNull Optional<Duration> pollingInterval) {
153251
return new QueueOptions(
154252
concurrency,
155253
workerConcurrency,
@@ -162,31 +260,69 @@ public QueueOptions withPollingInterval(Optional<Duration> pollingInterval) {
162260

163261
// ── Convenience chaining methods ──────────────────────────────────────────
164262

165-
public QueueOptions andConcurrency(@Nullable Integer value) {
263+
/**
264+
* Returns a copy of these options with {@code concurrency} set to the given value.
265+
*
266+
* @param value the concurrency limit, or {@code null} to clear the column
267+
*/
268+
public @NonNull QueueOptions andConcurrency(@Nullable Integer value) {
166269
return withConcurrency(Field.of(value));
167270
}
168271

169-
public QueueOptions andWorkerConcurrency(@Nullable Integer value) {
272+
/**
273+
* Returns a copy of these options with {@code workerConcurrency} set to the given value.
274+
*
275+
* @param value the per-worker concurrency limit, or {@code null} to clear the column
276+
*/
277+
public @NonNull QueueOptions andWorkerConcurrency(@Nullable Integer value) {
170278
return withWorkerConcurrency(Field.of(value));
171279
}
172280

173-
public QueueOptions andRateLimit(@Nullable Integer max, @Nullable Duration period) {
281+
/**
282+
* Returns a copy of these options with the rate limit set.
283+
*
284+
* @param max max starts per window, or {@code null} to clear
285+
* @param period length of the rolling window, or {@code null} to clear
286+
*/
287+
public @NonNull QueueOptions andRateLimit(@Nullable Integer max, @Nullable Duration period) {
174288
return withRateLimitMax(Field.of(max)).withRateLimitPeriod(Field.of(period));
175289
}
176290

177-
public QueueOptions andRateLimit(int max, long period, TimeUnit unit) {
291+
/**
292+
* Returns a copy of these options with the rate limit set.
293+
*
294+
* @param max max starts per window
295+
* @param period length of the rolling window
296+
* @param unit time unit for {@code period}
297+
*/
298+
public @NonNull QueueOptions andRateLimit(int max, long period, @NonNull TimeUnit unit) {
178299
return andRateLimit(max, Duration.of(period, unit.toChronoUnit()));
179300
}
180301

181-
public QueueOptions andPriorityEnabled(boolean value) {
302+
/**
303+
* Returns a copy of these options with {@code priorityEnabled} set to the given value.
304+
*
305+
* @param value {@code true} to enable priority ordering, {@code false} to disable
306+
*/
307+
public @NonNull QueueOptions andPriorityEnabled(boolean value) {
182308
return withPriorityEnabled(Optional.of(value));
183309
}
184310

185-
public QueueOptions andPartitionQueue(boolean value) {
311+
/**
312+
* Returns a copy of these options with {@code partitionQueue} set to the given value.
313+
*
314+
* @param value {@code true} to enable per-class queue partitioning, {@code false} to disable
315+
*/
316+
public @NonNull QueueOptions andPartitionQueue(boolean value) {
186317
return withPartitionQueue(Optional.of(value));
187318
}
188319

189-
public QueueOptions andPollingInterval(Duration value) {
320+
/**
321+
* Returns a copy of these options with {@code pollingInterval} set to the given value.
322+
*
323+
* @param value the interval at which workers poll for new queue entries
324+
*/
325+
public @NonNull QueueOptions andPollingInterval(@NonNull Duration value) {
190326
return withPollingInterval(Optional.of(value));
191327
}
192328
}

0 commit comments

Comments
 (0)