1313import java .sql .SQLException ;
1414import java .time .Duration ;
1515import java .util .Objects ;
16+ import java .util .UUID ;
1617import java .util .concurrent .TimeUnit ;
1718
1819import javax .sql .DataSource ;
@@ -43,11 +44,12 @@ void speedUpPollingForTest() {
4344 }
4445
4546 void send (
46- String workflowUuid ,
47- int functionId ,
48- String destinationUuid ,
47+ String workflowId ,
48+ int stepId ,
49+ String destinationId ,
4950 Object message ,
5051 String topic ,
52+ String messageId ,
5153 String serialization )
5254 throws SQLException {
5355
@@ -61,52 +63,55 @@ void send(
6163 try {
6264 // Check if operation was already executed
6365 StepResult recordedOutput =
64- StepsDAO .checkStepExecutionTxn (
65- workflowUuid , functionId , functionName , conn , this .schema );
66+ StepsDAO .checkStepExecutionTxn (workflowId , stepId , functionName , conn , this .schema );
6667
6768 if (recordedOutput != null ) {
6869 logger .debug (
6970 "Replaying send, id: {}, destination_uuid: {}, topic: {}" ,
70- functionId ,
71- destinationUuid ,
71+ stepId ,
72+ destinationId ,
7273 finalTopic );
7374 conn .commit ();
7475 return ;
7576 } else {
7677 logger .debug (
7778 "Running send, id: {}, destination_uuid: {}, topic: {}" ,
78- functionId ,
79- destinationUuid ,
79+ stepId ,
80+ destinationId ,
8081 finalTopic );
8182 }
8283
83- // Serialize the message using the specified format
84- SerializationUtil . SerializedResult serializedMsg =
84+ var finalMessageId = ( messageId != null ) ? messageId : UUID . randomUUID (). toString ();
85+ var serializedMsg =
8586 SerializationUtil .serializeValue (message , serialization , this .serializer );
8687
8788 // Insert notification with serialization format
8889 final String sql =
8990 """
90- INSERT INTO "%s".notifications (destination_uuid, topic, message, serialization) VALUES (?, ?, ?, ?)
91+ INSERT INTO "%s".notifications
92+ (destination_uuid, topic, message, serialization, message_uuid)
93+ VALUES (?, ?, ?, ?, ?)
94+ ON CONFLICT (message_uuid) DO NOTHING
9195 """
9296 .formatted (this .schema );
9397
9498 try (PreparedStatement stmt = conn .prepareStatement (sql )) {
95- stmt .setString (1 , destinationUuid );
99+ stmt .setString (1 , destinationId );
96100 stmt .setString (2 , finalTopic );
97101 stmt .setString (3 , serializedMsg .serializedValue ());
98102 stmt .setString (4 , serializedMsg .serialization ());
103+ stmt .setString (5 , finalMessageId );
99104 stmt .executeUpdate ();
100105 } catch (SQLException e ) {
101106 // Foreign key violation
102107 if ("23503" .equals (e .getSQLState ())) {
103- throw new DBOSNonExistentWorkflowException (destinationUuid );
108+ throw new DBOSNonExistentWorkflowException (destinationId );
104109 }
105110 throw e ;
106111 }
107112
108113 // Record operation result
109- var output = new StepResult (workflowUuid , functionId , functionName , null , null , null , null );
114+ var output = new StepResult (workflowId , stepId , functionName , null , null , null , null );
110115 StepsDAO .recordStepResultTxn (
111116 output , startTime , System .currentTimeMillis (), conn , this .schema );
112117
@@ -123,8 +128,40 @@ void send(
123128 }
124129 }
125130
126- Object recv (
127- String workflowUuid , int functionId , int timeoutFunctionId , String topic , Duration timeout )
131+ void sendDirect (
132+ String destinationId , Object message , String topic , String messageId , String serialization )
133+ throws SQLException {
134+ String finalTopic = (topic != null ) ? topic : Constants .DBOS_NULL_TOPIC ;
135+ String finalMessageId = (messageId != null ) ? messageId : UUID .randomUUID ().toString ();
136+ var serializedMsg = SerializationUtil .serializeValue (message , serialization , this .serializer );
137+
138+ final String sql =
139+ """
140+ INSERT INTO "%s".notifications
141+ (destination_uuid, topic, message, message_uuid, serialization)
142+ VALUES (?, ?, ?, ?, ?)
143+ ON CONFLICT (message_uuid) DO NOTHING
144+ """
145+ .formatted (this .schema );
146+
147+ try (var conn = dataSource .getConnection ();
148+ var stmt = conn .prepareStatement (sql )) {
149+ stmt .setString (1 , destinationId );
150+ stmt .setString (2 , finalTopic );
151+ stmt .setString (3 , serializedMsg .serializedValue ());
152+ stmt .setString (4 , finalMessageId );
153+ stmt .setString (5 , serializedMsg .serialization ());
154+ stmt .executeUpdate ();
155+ } catch (SQLException e ) {
156+ // Foreign key violation
157+ if ("23503" .equals (e .getSQLState ())) {
158+ throw new DBOSNonExistentWorkflowException (destinationId );
159+ }
160+ throw e ;
161+ }
162+ }
163+
164+ Object recv (String workflowId , int stepId , int timeoutFunctionId , String topic , Duration timeout )
128165 throws SQLException {
129166
130167 var startTime = System .currentTimeMillis ();
@@ -133,28 +170,26 @@ Object recv(
133170
134171 // First, check for previous executions
135172 StepResult recordedOutput = null ;
136-
137173 try (Connection c = dataSource .getConnection ()) {
138174 recordedOutput =
139- StepsDAO .checkStepExecutionTxn (workflowUuid , functionId , functionName , c , this .schema );
175+ StepsDAO .checkStepExecutionTxn (workflowId , stepId , functionName , c , this .schema );
140176 }
141177
142178 if (recordedOutput != null ) {
143- logger .debug ("Replaying recv, id: {}, topic: {}" , functionId , finalTopic );
179+ logger .debug ("Replaying recv, id: {}, topic: {}" , stepId , finalTopic );
144180 if (recordedOutput .output () != null ) {
145181 return SerializationUtil .deserializeValue (
146182 recordedOutput .output (), recordedOutput .serialization (), this .serializer );
147183 } else {
148184 throw new RuntimeException ("No output recorded in the last recv" );
149185 }
150186 } else {
151- logger .debug (
152- "Running recv, wfid {}, id: {}, topic: {}" , workflowUuid , functionId , finalTopic );
187+ logger .debug ("Running recv, wfid {}, id: {}, topic: {}" , workflowId , stepId , finalTopic );
153188 }
154189
155190 // Insert a condition to the notifications map
156- String payload = workflowUuid + "::" + finalTopic ;
157- NotificationService . LockConditionPair lockPair = new NotificationService .LockConditionPair ();
191+ String payload = workflowId + "::" + finalTopic ;
192+ var lockPair = new NotificationService .LockConditionPair ();
158193
159194 // Timeout / deadline for the notification
160195 double actualTimeout = timeout .toMillis ();
@@ -166,7 +201,7 @@ Object recv(
166201 boolean success = notificationService .registerNotificationCondition (payload , lockPair );
167202 if (!success ) {
168203 // if this happens, the workflow is executing concurrently
169- throw new DBOSWorkflowExecutionConflictException (workflowUuid );
204+ throw new DBOSWorkflowExecutionConflictException (workflowId );
170205 }
171206
172207 while (true ) {
@@ -176,12 +211,13 @@ Object recv(
176211 try (Connection conn = dataSource .getConnection ()) {
177212 final String sql =
178213 """
179- SELECT topic FROM "%s".notifications WHERE destination_uuid = ? AND topic = ?
214+ SELECT topic FROM "%s".notifications
215+ WHERE destination_uuid = ? AND topic = ? AND consumed = FALSE
180216 """
181217 .formatted (this .schema );
182218
183219 try (PreparedStatement stmt = conn .prepareStatement (sql )) {
184- stmt .setString (1 , workflowUuid );
220+ stmt .setString (1 , workflowId );
185221 stmt .setString (2 , finalTopic );
186222 try (ResultSet rs = stmt .executeQuery ()) {
187223 hasExistingNotification = rs .next ();
@@ -199,7 +235,7 @@ Object recv(
199235 actualTimeout =
200236 StepsDAO .durableSleepDuration (
201237 dataSource ,
202- workflowUuid ,
238+ workflowId ,
203239 timeoutFunctionId ,
204240 timeout ,
205241 this .schema ,
@@ -223,63 +259,61 @@ Object recv(
223259 notificationService .unregisterNotificationCondition (payload );
224260 }
225261
226- // Transactionally consume and return the message if it's in the database
262+ // Transactionally consume and return the oldest unconsumed message, or null if none.
227263 try (Connection conn = dataSource .getConnection ()) {
228264 conn .setAutoCommit (false );
229265
230266 try {
231- // Find and delete the oldest entry for this workflow+topic
267+ // Find and mark consumed the oldest entry for this workflow+topic
232268 final String sql =
233269 """
234- WITH oldest_entry AS (
235- SELECT destination_uuid, topic, message, serialization, created_at_epoch_ms
236- FROM "%1$s".notifications
237- WHERE destination_uuid = ? AND topic = ?
238- ORDER BY created_at_epoch_ms ASC
239- LIMIT 1
270+ UPDATE "%1$s".notifications
271+ SET consumed = TRUE
272+ WHERE destination_uuid = ?
273+ AND topic = ?
274+ AND consumed = FALSE
275+ AND message_uuid = (
276+ SELECT message_uuid FROM "%1$s".notifications
277+ WHERE destination_uuid = ?
278+ AND topic = ?
279+ AND consumed = FALSE
280+ ORDER BY created_at_epoch_ms ASC
281+ LIMIT 1
240282 )
241- DELETE FROM "%1$s".notifications
242- WHERE destination_uuid = (SELECT destination_uuid FROM oldest_entry)
243- AND topic = (SELECT topic FROM oldest_entry)
244- AND created_at_epoch_ms = (SELECT created_at_epoch_ms FROM oldest_entry)
245- RETURNING message, serialization
283+ RETURNING message, serialization
246284 """
247285 .formatted (this .schema );
248286
249- Object recvdMessage = null ;
287+ String serializedMessage = null ;
288+ String serialization = null ;
250289 try (PreparedStatement stmt = conn .prepareStatement (sql )) {
251- stmt .setString (1 , workflowUuid );
290+ // JDBC uses positional parameters (?), and each placeholder must be bound explicitly,
291+ // so we need to set the same values again for the nested SELECT
292+ stmt .setString (1 , workflowId );
252293 stmt .setString (2 , finalTopic );
294+ stmt .setString (3 , workflowId );
295+ stmt .setString (4 , finalTopic );
253296
254297 try (ResultSet rs = stmt .executeQuery ()) {
255298 if (rs .next ()) {
256- String serializedMessage = rs .getString ("message" );
257- String serialization = rs .getString ("serialization" );
258- recvdMessage =
259- SerializationUtil .deserializeValue (
260- serializedMessage , serialization , this .serializer );
299+ serializedMessage = rs .getString ("message" );
300+ serialization = rs .getString ("serialization" );
261301 }
262302 }
263303 }
264304
305+ var recvdMessage =
306+ SerializationUtil .deserializeValue (serializedMessage , serialization , this .serializer );
307+
265308 // Record operation result
266- Object toSave = recvdMessage ;
267- var toSaveSer = SerializationUtil .serializeValue (toSave , null , this .serializer );
268309 StepResult output =
269310 new StepResult (
270- workflowUuid ,
271- functionId ,
272- functionName ,
273- null ,
274- null ,
275- null ,
276- toSaveSer .serialization ())
277- .withOutput (toSaveSer .serializedValue ());
311+ workflowId , stepId , functionName , serializedMessage , null , null , serialization );
278312 StepsDAO .recordStepResultTxn (
279313 output , startTime , System .currentTimeMillis (), conn , this .schema );
280314
281315 conn .commit ();
282- return toSave ;
316+ return recvdMessage ;
283317
284318 } catch (Exception e ) {
285319 conn .rollback ();
0 commit comments