|
1 | 1 | package dev.dbos.transact.queue; |
2 | 2 |
|
3 | 3 | import static org.junit.jupiter.api.Assertions.assertEquals; |
| 4 | +import static org.junit.jupiter.api.Assertions.assertNull; |
4 | 5 | import static org.junit.jupiter.api.Assertions.assertThrows; |
5 | 6 | import static org.junit.jupiter.api.Assertions.assertTrue; |
6 | 7 |
|
@@ -86,21 +87,67 @@ public void testQueuedWorkflow() throws Exception { |
86 | 87 | @Test |
87 | 88 | public void testDedupeId() throws Exception { |
88 | 89 |
|
89 | | - Queue firstQ = new Queue("firstQueue").withConcurrency(1).withWorkerConcurrency(1); |
| 90 | + Queue firstQ = new Queue("firstQueue"); |
90 | 91 | DBOS.registerQueue(firstQ); |
91 | 92 |
|
92 | 93 | ServiceQ serviceQ = DBOS.registerWorkflows(ServiceQ.class, new ServiceQImpl()); |
93 | 94 | DBOS.launch(); |
94 | 95 |
|
| 96 | + // pause queue service for test validation |
95 | 97 | var qs = DBOSTestAccess.getQueueService(); |
96 | 98 | qs.pause(); |
97 | 99 |
|
98 | | - var options = new StartWorkflowOptions().withQueue(firstQ).withDeduplicationId("dedupe"); |
99 | | - DBOS.startWorkflow(() -> serviceQ.simpleQWorkflow("inputq"), options); |
| 100 | + var options = new StartWorkflowOptions().withQueue(firstQ); |
| 101 | + var dedupeId = "dedupeId"; |
| 102 | + var h1 = |
| 103 | + DBOS.startWorkflow( |
| 104 | + () -> serviceQ.simpleQWorkflow("abc"), options.withDeduplicationId(dedupeId)); |
| 105 | + var s1 = h1.getStatus(); |
| 106 | + assertEquals(s1.queueName(), firstQ.name()); |
| 107 | + assertEquals(s1.deduplicationId(), dedupeId); |
| 108 | + |
| 109 | + // enqueue with different dedupe ID should be fine |
| 110 | + var dedupeId2 = "different-dedupeId"; |
| 111 | + var h2 = |
| 112 | + DBOS.startWorkflow( |
| 113 | + () -> serviceQ.simpleQWorkflow("def"), options.withDeduplicationId(dedupeId2)); |
| 114 | + var s2 = h2.getStatus(); |
| 115 | + assertEquals(s2.queueName(), firstQ.name()); |
| 116 | + assertEquals(s2.deduplicationId(), dedupeId2); |
| 117 | + |
| 118 | + // enqueue with no dedupe ID should be fine |
| 119 | + var h3 = DBOS.startWorkflow(() -> serviceQ.simpleQWorkflow("ghi"), options); |
| 120 | + var s3 = h3.getStatus(); |
| 121 | + assertEquals(s3.queueName(), firstQ.name()); |
| 122 | + assertNull(s3.deduplicationId()); |
100 | 123 |
|
101 | 124 | assertThrows( |
102 | 125 | RuntimeException.class, |
103 | | - () -> DBOS.startWorkflow(() -> serviceQ.simpleQWorkflow("id"), options)); |
| 126 | + () -> |
| 127 | + DBOS.startWorkflow( |
| 128 | + () -> serviceQ.simpleQWorkflow("jkl"), options.withDeduplicationId(dedupeId))); |
| 129 | + |
| 130 | + // enable queue service to run |
| 131 | + qs.unpause(); |
| 132 | + |
| 133 | + // wait for initial workflow with initial dedupe ID to finish |
| 134 | + h1.getResult(); |
| 135 | + h2.getResult(); |
| 136 | + h3.getResult(); |
| 137 | + |
| 138 | + var h4 = |
| 139 | + DBOS.startWorkflow( |
| 140 | + () -> serviceQ.simpleQWorkflow("jkl"), options.withDeduplicationId(dedupeId)); |
| 141 | + h4.getResult(); |
| 142 | + |
| 143 | + var rows = DBUtils.getWorkflowRows(dataSource); |
| 144 | + assertEquals(4, rows.size()); |
| 145 | + |
| 146 | + for (var row : rows) { |
| 147 | + assertEquals("SUCCESS", row.status()); |
| 148 | + assertEquals("firstQueue", row.queueName()); |
| 149 | + assertNull(row.deduplicationId()); |
| 150 | + } |
104 | 151 | } |
105 | 152 |
|
106 | 153 | @RetryingTest(3) |
|
0 commit comments