Skip to content

Commit e1cc7dc

Browse files
committed
in memory cache of dynamic queues for validation
1 parent 0e51c0b commit e1cc7dc

2 files changed

Lines changed: 15 additions & 4 deletions

File tree

transact/src/main/java/dev/dbos/transact/execution/DBOSExecutor.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -395,7 +395,9 @@ public Optional<RegisteredWorkflow> getRegisteredWorkflow(
395395
}
396396

397397
public Optional<Queue> findQueue(String queueName) {
398-
return findStaticQueue(queueName).or(() -> systemDatabase.findQueue(queueName));
398+
return findStaticQueue(queueName)
399+
.or(() -> queueService.findDynamicQueue(queueName))
400+
.or(() -> systemDatabase.findQueue(queueName));
399401
}
400402

401403
public Collection<Queue> getStaticQueues() {
@@ -1458,8 +1460,7 @@ private void validateWorkflow(String workflowName, String className, String inst
14581460

14591461
private void validateQueue(String queueName) {
14601462
if (queueName != null) {
1461-
findStaticQueue(queueName)
1462-
.or(() -> systemDatabase.findQueue(queueName))
1463+
findQueue(queueName)
14631464
.orElseThrow(
14641465
() -> new IllegalStateException("Queue %s is not registered".formatted(queueName)));
14651466
}
@@ -1472,7 +1473,7 @@ private void validateQueue(String queueName, String queuePartitionKey) {
14721473
"DBOS internal queue is not a partitioned queue, but a partition key was provided");
14731474
}
14741475
} else {
1475-
var queue = findStaticQueue(queueName).or(() -> systemDatabase.findQueue(queueName));
1476+
var queue = findQueue(queueName);
14761477
if (queue.isPresent()) {
14771478
if (queue.get().partitioningEnabled() && queuePartitionKey == null) {
14781479
throw new IllegalArgumentException(

transact/src/main/java/dev/dbos/transact/execution/QueueService.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@
66

77
import java.time.Duration;
88
import java.util.Collection;
9+
import java.util.Map;
910
import java.util.Objects;
11+
import java.util.Optional;
1012
import java.util.Set;
1113
import java.util.concurrent.ConcurrentHashMap;
1214
import java.util.concurrent.Executors;
@@ -15,6 +17,7 @@
1517
import java.util.concurrent.TimeUnit;
1618
import java.util.concurrent.atomic.AtomicBoolean;
1719
import java.util.concurrent.atomic.AtomicReference;
20+
import java.util.stream.Collectors;
1821

1922
import org.slf4j.Logger;
2023
import org.slf4j.LoggerFactory;
@@ -28,6 +31,7 @@ public class QueueService implements AutoCloseable {
2831
private final AtomicReference<ScheduledExecutorService> execServiceRef = new AtomicReference<>();
2932
private final AtomicBoolean paused = new AtomicBoolean(false);
3033
private final Set<String> dbListeningQueues = ConcurrentHashMap.newKeySet();
34+
private volatile Map<String, Queue> dynamicQueueMap = Map.of();
3135

3236
private final SystemDatabase systemDatabase;
3337
private final DBOSExecutor dbosExecutor;
@@ -80,6 +84,10 @@ public boolean isStopped() {
8084
return this.execServiceRef.get() == null;
8185
}
8286

87+
public Optional<Queue> findDynamicQueue(String queueName) {
88+
return Optional.ofNullable(dynamicQueueMap.get(queueName));
89+
}
90+
8391
private boolean isListening(String queueName) {
8492
return queueName.equals(Constants.DBOS_INTERNAL_QUEUE)
8593
|| listenQueues.isEmpty()
@@ -102,6 +110,8 @@ private void pollDynamicQueues() {
102110
if (execServiceRef.get() == null) return;
103111

104112
var dbQueues = systemDatabase.listQueues();
113+
dynamicQueueMap =
114+
dbQueues.stream().collect(Collectors.toUnmodifiableMap(Queue::name, q -> q));
105115
if (logger.isDebugEnabled()) {
106116
logger.debug("pollDynamicQueues found {} queues", dbQueues.size());
107117
for (var q : dbQueues) {

0 commit comments

Comments
 (0)