Skip to content

Commit 56b6485

Browse files
committed
Enhance AnySender trait with capacity and is_closed methods for better error handling
1 parent bc7205a commit 56b6485

4 files changed

Lines changed: 117 additions & 10 deletions

File tree

aimdb-core/src/emitter.rs

Lines changed: 64 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -318,14 +318,23 @@ impl Emitter {
318318
// Try to send via type-erased trait method
319319
let boxed_value = Box::new(value) as Box<dyn Any + Send>;
320320
sender.try_send_any(boxed_value).map_err(|_returned_value| {
321-
// We can't easily distinguish between Full and Closed in the type-erased interface
322-
// For now, assume Full (more common case). Capacity is unknown at this level.
323-
DbError::OutboxFull {
324-
capacity: 0, // Unknown at this level
325-
#[cfg(feature = "std")]
326-
type_name: core::any::type_name::<T>().to_string(),
327-
#[cfg(not(feature = "std"))]
328-
_type_name: (),
321+
// Check if channel is closed to distinguish error types
322+
if sender.is_closed() {
323+
DbError::OutboxClosed {
324+
#[cfg(feature = "std")]
325+
type_name: core::any::type_name::<T>().to_string(),
326+
#[cfg(not(feature = "std"))]
327+
_type_name: (),
328+
}
329+
} else {
330+
// Channel is full - now we can report the actual capacity
331+
DbError::OutboxFull {
332+
capacity: sender.capacity(),
333+
#[cfg(feature = "std")]
334+
type_name: core::any::type_name::<T>().to_string(),
335+
#[cfg(not(feature = "std"))]
336+
_type_name: (),
337+
}
329338
}
330339
})
331340
}
@@ -435,6 +444,16 @@ mod tests {
435444
Ok(())
436445
}
437446
}
447+
448+
fn capacity(&self) -> usize {
449+
// Mock capacity for testing
450+
100
451+
}
452+
453+
fn is_closed(&self) -> bool {
454+
// If should_fail is true, simulate closed channel
455+
self.should_fail
456+
}
438457
}
439458

440459
fn create_test_emitter() -> Emitter {
@@ -548,13 +567,48 @@ mod tests {
548567

549568
match result {
550569
Err(crate::DbError::OutboxFull { capacity, .. }) => {
551-
// Capacity is placeholder (0) at this level
552-
assert_eq!(capacity, 0);
570+
// Capacity should now be reported from the mock sender
571+
assert_eq!(capacity, 100);
553572
}
554573
_ => panic!("Expected OutboxFull error"),
555574
}
556575
}
557576

577+
#[test]
578+
fn test_try_enqueue_channel_closed() {
579+
let emitter = create_test_emitter();
580+
581+
// Register a mock sender that simulates closed channel (should_fail=true)
582+
let mock_sender = MockSender::<TestMessage>::new(true, false);
583+
let type_id = TypeId::of::<TestMessage>();
584+
585+
#[cfg(feature = "std")]
586+
{
587+
let mut outboxes = emitter.inner.outboxes.lock().unwrap();
588+
outboxes.insert(type_id, Box::new(mock_sender));
589+
}
590+
#[cfg(not(feature = "std"))]
591+
{
592+
let mut outboxes = emitter.inner.outboxes.lock();
593+
outboxes.insert(type_id, Box::new(mock_sender));
594+
}
595+
596+
let msg = TestMessage {
597+
id: 88,
598+
content: "closed",
599+
};
600+
601+
let result = emitter.try_enqueue(msg);
602+
assert!(result.is_err());
603+
604+
match result {
605+
Err(crate::DbError::OutboxClosed { .. }) => {
606+
// Expected - should detect closed channel via is_closed()
607+
}
608+
_ => panic!("Expected OutboxClosed error"),
609+
}
610+
}
611+
558612
// Tests for enqueue method (async)
559613
#[cfg(feature = "std")]
560614
#[tokio::test]

aimdb-core/src/outbox.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,26 @@ pub trait AnySender: Send + Sync {
209209
/// * `Ok(())` - Message sent successfully
210210
/// * `Err(value)` - Channel full or closed, value returned
211211
fn try_send_any(&self, value: Box<dyn Any + Send>) -> Result<(), Box<dyn Any + Send>>;
212+
213+
/// Returns the channel capacity
214+
///
215+
/// This provides information about the maximum number of messages
216+
/// the channel can buffer. Useful for error reporting and metrics.
217+
///
218+
/// # Returns
219+
///
220+
/// The channel capacity, or 0 if unknown/unbounded
221+
fn capacity(&self) -> usize;
222+
223+
/// Checks if the channel is closed
224+
///
225+
/// This allows distinguishing between a full channel (temporary backpressure)
226+
/// and a closed channel (permanent failure) when `try_send_any` fails.
227+
///
228+
/// # Returns
229+
///
230+
/// `true` if the channel is closed, `false` otherwise
231+
fn is_closed(&self) -> bool;
212232
}
213233

214234
/// Future type for async send operations
@@ -732,6 +752,16 @@ mod tests {
732752
self.values.lock().unwrap().push(value);
733753
Ok(())
734754
}
755+
756+
fn capacity(&self) -> usize {
757+
// Mock capacity for testing
758+
1000
759+
}
760+
761+
fn is_closed(&self) -> bool {
762+
// Mock: never closed
763+
false
764+
}
735765
}
736766

737767
#[derive(Debug, Clone, PartialEq)]

aimdb-embassy-adapter/src/outbox.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,19 @@ impl<T: Send + 'static, const N: usize> AnySender for EmbassySender<T, N> {
216216
Box::new(v) as Box<dyn Any + Send>
217217
})
218218
}
219+
220+
fn capacity(&self) -> usize {
221+
// Embassy channels have const generic capacity N
222+
N
223+
}
224+
225+
fn is_closed(&self) -> bool {
226+
// Embassy channels don't have a direct is_closed() check
227+
// They remain open as long as at least one sender exists
228+
// Since we hold a sender reference, it's not closed
229+
// This is a simplification - Embassy channels don't close in the same way
230+
false
231+
}
219232
}
220233

221234
// Implement Clone for EmbassySender since embassy Sender is Clone

aimdb-tokio-adapter/src/outbox.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,16 @@ impl<T: Send + 'static> AnySender for TokioSender<T> {
192192
mpsc::error::TrySendError::Closed(v) => Box::new(v) as Box<dyn Any + Send>,
193193
})
194194
}
195+
196+
fn capacity(&self) -> usize {
197+
// Tokio channels expose max_capacity() which returns the capacity
198+
self.inner.max_capacity()
199+
}
200+
201+
fn is_closed(&self) -> bool {
202+
// Tokio channels expose is_closed() method
203+
self.inner.is_closed()
204+
}
195205
}
196206

197207
// Implement Clone for TokioSender since mpsc::Sender is Clone

0 commit comments

Comments
 (0)