Skip to content

Commit 6ae5b83

Browse files
committed
Updated logic for payload exchange
1 parent c5def01 commit 6ae5b83

4 files changed

Lines changed: 67 additions & 26 deletions

File tree

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
send_response.txt
2222
MATCH_STATS.md
2323
server_log.txt
24+
repopack-output.txt
25+
repomix-output.txt
2426
# Test coverage reports
2527
coverage/
2628
*.profraw

src/queue/q_matching.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ impl UnifiedQueue {
108108
}
109109

110110
/// Helper method to atomically match two requests
111-
pub(super) fn atomic_match(&self, new_request: &QueuedRequest, matched_request_ref: &QueuedRequest) -> Result<MatchResult, BumpError> {
111+
pub(super) fn atomic_match(&self, new_request: &QueuedRequest, matched_request_ref: &QueuedRequest) -> Result<(MatchResult, MatchResult), BumpError> {
112112
// In this new implementation:
113113
// - new_request is always the request being added (has full channel)
114114
// - matched_request_ref is just a reference object with the ID of the matched request
@@ -388,12 +388,12 @@ impl UnifiedQueue {
388388
log::info!("Notifying send request {} of match", send_id);
389389
match tx {
390390
ResponseChannel::OneShot(tx) => {
391-
if let Err(e) = tx.send(send_match_result.clone()) {
391+
if let Err(e) = tx.send((send_match_result.clone(), receive_match_result.clone())) {
392392
log::error!("Failed to send match result to send request {}: {:?}", send_id, e);
393393
}
394394
},
395395
ResponseChannel::Broadcast(tx) => {
396-
if let Err(e) = tx.send(send_match_result.clone()) {
396+
if let Err(e) = tx.send((send_match_result.clone(), receive_match_result.clone())) {
397397
log::error!("Failed to broadcast match result to send request {}: {:?}", send_id, e);
398398
}
399399
}
@@ -404,18 +404,18 @@ impl UnifiedQueue {
404404
log::info!("Notifying receive request {} of match", receive_id);
405405
match tx {
406406
ResponseChannel::OneShot(tx) => {
407-
if let Err(e) = tx.send(receive_match_result.clone()) {
407+
if let Err(e) = tx.send((send_match_result.clone(), receive_match_result.clone())) {
408408
log::error!("Failed to send match result to receive request {}: {:?}", receive_id, e);
409409
}
410410
},
411411
ResponseChannel::Broadcast(tx) => {
412-
if let Err(e) = tx.send(receive_match_result.clone()) {
412+
if let Err(e) = tx.send((send_match_result.clone(), receive_match_result.clone())) {
413413
log::error!("Failed to broadcast match result to receive request {}: {:?}", receive_id, e);
414414
}
415415
}
416416
}
417417
}
418418

419-
Ok(send_match_result)
419+
Ok((send_match_result, receive_match_result))
420420
}
421421
}

src/queue/types.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,9 @@ pub struct QueuedRequest {
3131
#[derive(Debug)]
3232
pub enum ResponseChannel {
3333
/// Traditional oneshot channel for send/receive
34-
OneShot(tokio::sync::oneshot::Sender<MatchResult>),
34+
OneShot(tokio::sync::oneshot::Sender<(MatchResult, MatchResult)>),
3535
/// Broadcast channel for bump requests
36-
Broadcast(tokio::sync::broadcast::Sender<MatchResult>),
36+
Broadcast(tokio::sync::broadcast::Sender<(MatchResult, MatchResult)>),
3737
}
3838

3939
impl Clone for QueuedRequest {
@@ -119,7 +119,7 @@ pub enum RequestEventType {
119119
#[async_trait::async_trait]
120120
pub trait RequestQueue: Send + Sync + 'static {
121121
/// Add a new request to the queue
122-
async fn add_request(&self, request: QueuedRequest) -> Result<Option<MatchResult>, BumpError>;
122+
async fn add_request(&self, request: QueuedRequest) -> Result<Option<(MatchResult, MatchResult)>, BumpError>;
123123

124124
/// Remove a request from the queue
125125
async fn remove_request(&self, request_id: &str) -> Result<(), BumpError>;
@@ -149,5 +149,5 @@ pub trait RequestQueue: Send + Sync + 'static {
149149
/// Applications should now create a channel when adding the request and
150150
/// wait on that channel directly.
151151
#[allow(unused)]
152-
async fn wait_for_match(&self, request_id: &str, ttl_ms: u64) -> Result<Option<MatchResult>, BumpError>;
152+
async fn wait_for_match(&self, request_id: &str, ttl_ms: u64) -> Result<Option<(MatchResult, MatchResult)>, BumpError>;
153153
}

src/service.rs

Lines changed: 55 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -206,15 +206,22 @@ impl MatchingService {
206206
// Add the request to the queue
207207
log::info!("Adding send request {} to queue", request_id);
208208
match self.queue.add_request(queued_request).await {
209-
Ok(Some(match_result)) => {
209+
Ok(Some((send_result, recv_result))) => {
210210
// Immediate match found
211211
log::info!("Immediate match found for send request {}", request_id);
212212

213+
// For send requests, we want the send side result
214+
let our_match_result = if request_id == send_result.matched_with {
215+
recv_result // We're the receive side
216+
} else {
217+
send_result // We're the send side
218+
};
219+
213220
return Ok(MatchResponse {
214221
status: MatchStatus::Matched,
215222
sender_id: Some(request_id),
216-
receiver_id: Some(match_result.matched_with),
217-
timestamp: match_result.timestamp,
223+
receiver_id: Some(our_match_result.matched_with),
224+
timestamp: our_match_result.timestamp,
218225
payload: Some(request.payload),
219226
message: None,
220227
});
@@ -226,15 +233,22 @@ impl MatchingService {
226233
// Wait for a match with timeout
227234
let ttl = std::time::Duration::from_millis(request.ttl as u64);
228235
match tokio::time::timeout(ttl, rx).await {
229-
Ok(Ok(match_result)) => {
236+
Ok(Ok((send_result, recv_result))) => {
230237
// We got a match result from the oneshot channel
231238
log::info!("Match found for send request {}", request_id);
232239

240+
// For send requests, we want the send side result
241+
let our_match_result = if request_id == send_result.matched_with {
242+
recv_result // We're the receive side
243+
} else {
244+
send_result // We're the send side
245+
};
246+
233247
return Ok(MatchResponse {
234248
status: MatchStatus::Matched,
235249
sender_id: Some(request_id),
236-
receiver_id: Some(match_result.matched_with),
237-
timestamp: match_result.timestamp,
250+
receiver_id: Some(our_match_result.matched_with),
251+
timestamp: our_match_result.timestamp,
238252
payload: Some(request.payload),
239253
message: None,
240254
});
@@ -292,16 +306,23 @@ impl MatchingService {
292306
// Add the request to the unified queue
293307
log::info!("Adding receive request {} to queue", request_id);
294308
match self.queue.add_request(queued_request).await {
295-
Ok(Some(match_result)) => {
309+
Ok(Some((send_result, recv_result))) => {
296310
// Immediate match found
297311
log::info!("Immediate match found for receive request {}", request_id);
298312

313+
// For receive requests, we want the receive side result
314+
let our_match_result = if request_id == send_result.matched_with {
315+
recv_result // We're the receive side
316+
} else {
317+
send_result // We're the send side
318+
};
319+
299320
return Ok(MatchResponse {
300321
status: MatchStatus::Matched,
301-
sender_id: Some(match_result.matched_with),
322+
sender_id: Some(our_match_result.matched_with),
302323
receiver_id: Some(request_id),
303-
timestamp: match_result.timestamp,
304-
payload: match_result.payload,
324+
timestamp: our_match_result.timestamp,
325+
payload: our_match_result.payload,
305326
message: None,
306327
});
307328
},
@@ -400,12 +421,21 @@ impl MatchingService {
400421
// Immediate match found
401422
log::info!("Immediate match found for bump request {}", request_id);
402423

424+
// Choose the correct match result based on which side we are
425+
let our_match_result = if request_id == send_match_result.matched_with {
426+
// We are the receive side
427+
recv_match_result
428+
} else {
429+
// We are the send side
430+
send_match_result
431+
};
432+
403433
return Ok(MatchResponse {
404434
status: MatchStatus::Matched,
405435
sender_id: Some(request_id),
406-
receiver_id: Some(match_result.matched_with),
407-
timestamp: match_result.timestamp,
408-
payload: match_result.payload,
436+
receiver_id: Some(our_match_result.matched_with),
437+
timestamp: our_match_result.timestamp,
438+
payload: our_match_result.payload,
409439
message: None,
410440
});
411441
},
@@ -420,12 +450,21 @@ impl MatchingService {
420450
// We got a match result from the oneshot channel
421451
log::info!("Match found for bump request {}", request_id);
422452

453+
// Choose the correct match result based on which side we are
454+
let our_match_result = if request_id == send_match_result.matched_with {
455+
// We are the receive side
456+
recv_match_result
457+
} else {
458+
// We are the send side
459+
send_match_result
460+
};
461+
423462
return Ok(MatchResponse {
424463
status: MatchStatus::Matched,
425464
sender_id: Some(request_id),
426-
receiver_id: Some(match_result.matched_with),
427-
timestamp: match_result.timestamp,
428-
payload: match_result.payload,
465+
receiver_id: Some(our_match_result.matched_with),
466+
timestamp: our_match_result.timestamp,
467+
payload: our_match_result.payload,
429468
message: None,
430469
});
431470
},

0 commit comments

Comments
 (0)