Background
After M14 (Design 029), Producer::produce is synchronous and infallible:
impl<T> Producer<T> {
    pub fn produce(&self, value: T);
}
That's correct for today's buffers (SpmcRing, SingleLatest, Mailbox):
they all overwrite on overflow, so the caller can't observe a "full" condition
and there is never a rejected value to hand back. A write either lands or is
later overwritten by a newer one; both outcomes are acceptable for the
fire-and-forget telemetry workloads we target today.
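To make the overwrite-on-overflow behavior concrete, here is a minimal sketch of a fixed-capacity ring whose push evicts the oldest entry instead of failing. ToyRing and its methods are hypothetical stand-ins written for this illustration, not AimDB's SpmcRing; the only point is that such a push has nothing to report back to the caller.

```rust
use std::collections::VecDeque;

/// Hypothetical toy ring, for illustration only (not AimDB's `SpmcRing`).
pub struct ToyRing<T> {
    slots: VecDeque<T>,
    capacity: usize,
}

impl<T> ToyRing<T> {
    pub fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be non-zero");
        Self { slots: VecDeque::with_capacity(capacity), capacity }
    }

    /// Infallible push: when the ring is full, the oldest value is evicted.
    pub fn push(&mut self, value: T) {
        if self.slots.len() == self.capacity {
            self.slots.pop_front(); // silently drop the oldest entry
        }
        self.slots.push_back(value);
    }

    /// Snapshot of the retained values, oldest first.
    pub fn snapshot(&self) -> Vec<T>
    where
        T: Clone,
    {
        self.slots.iter().cloned().collect()
    }
}
```

Pushing 1, 2, 3 into a ring of capacity 2 leaves 2 and 3 behind, and the caller never learns that 1 was evicted.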
Problem
For some upcoming buffer types, overwrite is the wrong default:
- Bounded, non-overwriting — e.g. a command queue where dropping the
oldest command is unsafe. The producer needs to know the push failed so it can
retry, escalate, or back off.
- Hot loops with explicit drop policy — telemetry producers that want
"if the consumer is behind, drop this sample and log a counter" rather
than the implicit silent-overwrite of SpmcRing.
- Caller-side rate limiting — applications that want to react when
pressure builds up (e.g. switch to a slower mode, raise an alert) instead
of being shielded from overflow.
Producer::produce(value) swallows the value into the buffer unconditionally;
the caller has no way to recover the value or even know it didn't land cleanly.
Note on AimDB semantics: Full here reflects consumer lag on a
bounded buffer, not multi-producer contention. AimDB's single-writer-per-key
invariant is unchanged — only one Producer<T> writes a given record.
Proposed API
Add a non-blocking, fallible variant on Producer<T> alongside today's
produce. The error must carry the value back so the caller doesn't lose it:
impl<T> Producer<T> {
    /// Sync, infallible. Today's API. Best for overwriting / fire-and-forget
    /// buffers (`SpmcRing`, `SingleLatest`, `Mailbox`).
    pub fn produce(&self, value: T);

    /// Non-blocking. Returns the value back if the buffer is full or closed,
    /// so the caller can decide what to do (retry, drop, escalate, log).
    /// Maps to `tokio::sync::mpsc::Sender::try_send` semantics.
    pub fn try_produce(&self, value: T) -> Result<(), TryProduceError<T>>;
}

/// Non-blocking push error; carries the value back to the caller.
pub enum TryProduceError<T> {
    /// Buffer is at capacity and configured not to overwrite.
    Full(T),
    /// Buffer / record has been torn down (e.g. shutdown).
    Closed(T),
}
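The value-returning error shape is the same one the standard library uses for bounded channels. The sketch below uses std::sync::mpsc::sync_channel (standard library code, not AimDB) to show a try_send-style call handing the rejected value back in both failure modes. Note that std names the terminal case Disconnected, where this proposal uses Closed; the function name demo_try_send is made up for the example.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Fills a bounded channel of capacity 1, then shows both failure modes
/// returning ownership of the rejected value to the caller.
pub fn demo_try_send() -> (String, String) {
    let (tx, rx) = sync_channel::<String>(1);

    // The first send fits in the single slot.
    tx.try_send("first".to_string()).expect("slot is free");

    // The second send is rejected; the value comes back inside `Full`.
    let full = match tx.try_send("second".to_string()) {
        Err(TrySendError::Full(v)) => v,
        other => panic!("expected Full, got {:?}", other),
    };

    // Once the receiver is gone, the value comes back inside `Disconnected`.
    drop(rx);
    let closed = match tx.try_send("third".to_string()) {
        Err(TrySendError::Disconnected(v)) => v,
        other => panic!("expected Disconnected, got {:?}", other),
    };

    (full, closed)
}
```

In both arms the caller still owns the String afterwards, which is the property the proposed TryProduceError<T> is meant to preserve.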
Example usage (illustrative)
// Bounded command queue: caller decides what to do on backpressure.
match producer.try_produce(cmd) {
    Ok(()) => {}
    Err(TryProduceError::Full(cmd)) => retry_queue.push(cmd),
    Err(TryProduceError::Closed(_)) => return Err(Shutdown),
}
The consumer side already covers this asymmetry via BufferReader::try_recv()
— no consumer-side change needed.
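The explicit drop policy from the Problem section (drop the sample and bump a counter when the consumer is behind) could look roughly like this on the caller side. ToyProducer is a hypothetical bounded stand-in so the sketch compiles on its own, and produce_or_drop is an invented helper name; only try_produce and TryProduceError mirror the proposal. The counter is plain caller state, not the dropped_count metric that this issue leaves out of scope.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Mutex;

/// Mirrors the proposed error type: both arms hand the value back.
#[derive(Debug, PartialEq)]
pub enum TryProduceError<T> {
    Full(T),
    Closed(T),
}

/// Hypothetical bounded, non-overwriting producer (illustration only).
pub struct ToyProducer<T> {
    queue: Mutex<VecDeque<T>>,
    capacity: usize,
}

impl<T> ToyProducer<T> {
    pub fn new(capacity: usize) -> Self {
        Self { queue: Mutex::new(VecDeque::new()), capacity }
    }

    /// Non-blocking push that refuses, rather than overwrites, when full.
    pub fn try_produce(&self, value: T) -> Result<(), TryProduceError<T>> {
        let mut q = self.queue.lock().unwrap();
        if q.len() >= self.capacity {
            return Err(TryProduceError::Full(value));
        }
        q.push_back(value);
        Ok(())
    }
}

/// Explicit drop policy: on `Full`, discard the sample and count it.
/// Returns `false` once the record is closed so the caller can stop.
pub fn produce_or_drop<T>(p: &ToyProducer<T>, sample: T, dropped: &AtomicU64) -> bool {
    match p.try_produce(sample) {
        Ok(()) => true,
        Err(TryProduceError::Full(_sample)) => {
            dropped.fetch_add(1, Ordering::Relaxed);
            true
        }
        Err(TryProduceError::Closed(_)) => false,
    }
}
```

With a capacity of 2 and no consumer draining the queue, five calls accept two samples and count three drops, and the decision to drop is visible in the caller's code rather than hidden in the buffer.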
WriteHandle trait extension
Extend the crate-private trait in aimdb-core/src/buffer/traits.rs with a
default-implemented non-blocking variant so existing buffer impls compile
unchanged:
pub(crate) trait WriteHandle<T: Clone + Send + 'static>: Send + Sync {
    fn push(&self, value: T);

    /// Default: delegate to `push` and return `Ok(())`. Overwriting buffers
    /// cannot fail, so the value is always accepted. Bounded / non-overwriting
    /// buffers override this to return `Full(value)` or `Closed(value)`.
    fn try_push(&self, value: T) -> Result<(), TryProduceError<T>> {
        self.push(value);
        Ok(())
    }
}
This means SpmcRing / SingleLatest / Mailbox need no changes —
they get try_produce for free, always succeeding (which is the correct
semantics for overwrite-on-overflow).
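As a sketch of how the default method plays out, the following standalone example pairs one buffer that relies on the default try_push with one that overrides it. ToyLatest and ToyBounded are hypothetical types invented for this illustration, and the trait is declared pub (rather than pub(crate)) only so the snippet compiles by itself. What the infallible push should do on a bounded buffer is not specified here; dropping on overflow in ToyBounded::push is an assumption of the sketch.

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

#[derive(Debug, PartialEq)]
pub enum TryProduceError<T> {
    Full(T),
    Closed(T),
}

/// Same shape as the trait above; `pub` here only for the standalone sketch.
pub trait WriteHandle<T: Clone + Send + 'static>: Send + Sync {
    fn push(&self, value: T);

    fn try_push(&self, value: T) -> Result<(), TryProduceError<T>> {
        self.push(value);
        Ok(())
    }
}

/// Hypothetical overwriting buffer: keeps only the latest value.
pub struct ToyLatest<T>(pub Mutex<Option<T>>);

impl<T: Clone + Send + 'static> WriteHandle<T> for ToyLatest<T> {
    fn push(&self, value: T) {
        *self.0.lock().unwrap() = Some(value);
    }
    // No `try_push` override: the default always returns `Ok(())`.
}

/// Hypothetical bounded buffer that rejects instead of overwriting.
pub struct ToyBounded<T> {
    pub queue: Mutex<VecDeque<T>>,
    pub capacity: usize,
}

impl<T: Clone + Send + 'static> WriteHandle<T> for ToyBounded<T> {
    /// Assumption of this sketch: the infallible path drops on overflow.
    fn push(&self, value: T) {
        let _ = self.try_push(value);
    }

    fn try_push(&self, value: T) -> Result<(), TryProduceError<T>> {
        let mut q = self.queue.lock().unwrap();
        if q.len() >= self.capacity {
            return Err(TryProduceError::Full(value));
        }
        q.push_back(value);
        Ok(())
    }
}
```

ToyLatest never sees an error because it inherits the default, while ToyBounded returns the rejected value once its single queue is at capacity.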
Decisions (final — please don't relitigate in the PR)
| # | Decision |
|---|----------|
| 1 | Naming: try_produce (mirrors mpsc::Sender::try_send). |
| 2 | Error type: dedicated TryProduceError<T> enum, not (DbError, T). The Full vs Closed distinction is the whole point. |
| 3 | Location: TryProduceError<T> lives in aimdb-core::buffer (next to the traits that produce it), re-exported from the crate root alongside the existing Producer re-export. |
| 4 | No changes to ProducerTrait::produce_any or any connector-side code in this PR. |
| 5 | No metrics wiring in this PR. Auto-incrementing dropped_count on Full is a clean follow-up; first land the API. |
Doc-comment guidance
The PR should include short rustdoc on both produce and try_produce so
callers can pick the right one without reading the impl. Suggested phrasing:
- produce: "Push a value. Infallible: overwrite-on-overflow buffers
cannot reject. Use this for fire-and-forget telemetry."
- try_produce: "Non-blocking push. Returns the value back via
TryProduceError::Full if a bounded buffer is at capacity, or
TryProduceError::Closed if the record is shutting down. Use when the
caller has a meaningful response to backpressure."
Out of scope
- Implementing an actual non-overwriting buffer (separate milestone, driven
by a concrete need: command queue / bounded fan-in / explicit drop policy).
- produce_async: covered by the sibling issue (backpressure-aware variant).
- Consumer<T>::try_subscribe: BufferReader::try_recv already covers
non-blocking consumption.
- Any change to ProducerTrait::produce_any or connector ingress paths.
- Wiring Full returns into BufferMetricsSnapshot::dropped_count.
Depends on / blocks
- Depends on: Design 029, "Remove R from Producer<T> / Consumer<T>"
(M14, ✅ Implemented). The WriteHandle<T> indirection is what makes this
cheap to add.
- Blocks: any new buffer impl with bounded non-overwriting semantics,
and any caller that wants to react to backpressure explicitly without
going async.
References
- docs/design/029-M14-remove-r-from-typed-handles.md: Decisions item 2
set up this issue.
- aimdb-core/src/buffer/traits.rs: WriteHandle<T> trait to extend
(line 108); test WriteHandle impls already live in the #[cfg(test)]
module at the bottom of the file. Add the new one there.
- aimdb-core/src/typed_api.rs: Producer<T>::produce at line 139, extend here.
- aimdb-core/src/buffer/traits.rs: BufferReader::try_recv is the symmetric
consumer-side API; mirror its doc-comment style.
Acceptance criteria
- Producer::produce unchanged: no call-site breakage anywhere in the workspace.
- Producer::try_produce exists with signature
fn try_produce(&self, value: T) -> Result<(), TryProduceError<T>>.
- TryProduceError<T> exists in aimdb-core::buffer, distinguishes Full(T)
(transient) from Closed(T) (terminal), and carries the value back in both arms.
- WriteHandle::try_push is added with a default impl that delegates to push
and returns Ok(()).
- Existing buffers (SpmcRing, SingleLatest, Mailbox) compile and pass their
existing tests with no changes to their files.
- A new test WriteHandle impl in the existing #[cfg(test)] module in
aimdb-core/src/buffer/traits.rs overrides try_push to return Full(value), and
a test verifies the value round-trips intact through Producer::try_produce.
- produce and try_produce carry rustdoc explaining when to pick which
(see "Doc-comment guidance" above).
- cargo fmt --check
- cargo clippy --all-targets --all-features -- -D warnings
- cargo test -p aimdb-core
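The round-trip acceptance criterion could be exercised along these lines. This is a sketch under stated assumptions: the real Producer<T> internals are not shown in this issue, so the Arc<dyn WriteHandle<T>> field, the Producer::new constructor, the AlwaysFull test handle, and the round_trip function are all hypothetical shapes chosen to keep the example standalone.

```rust
use std::sync::Arc;

#[derive(Debug, PartialEq)]
pub enum TryProduceError<T> {
    Full(T),
    Closed(T),
}

pub trait WriteHandle<T: Clone + Send + 'static>: Send + Sync {
    fn push(&self, value: T);

    fn try_push(&self, value: T) -> Result<(), TryProduceError<T>> {
        self.push(value);
        Ok(())
    }
}

/// Assumed shape only: how the real `Producer<T>` stores its handle
/// is not specified in this issue.
pub struct Producer<T: Clone + Send + 'static> {
    handle: Arc<dyn WriteHandle<T>>,
}

impl<T: Clone + Send + 'static> Producer<T> {
    pub fn new(handle: Arc<dyn WriteHandle<T>>) -> Self {
        Self { handle }
    }

    /// Forwards to the handle so the rejected value flows back unchanged.
    pub fn try_produce(&self, value: T) -> Result<(), TryProduceError<T>> {
        self.handle.try_push(value)
    }
}

/// Hypothetical test-only handle that rejects every write.
pub struct AlwaysFull;

impl<T: Clone + Send + 'static> WriteHandle<T> for AlwaysFull {
    fn push(&self, _value: T) {}

    fn try_push(&self, value: T) -> Result<(), TryProduceError<T>> {
        Err(TryProduceError::Full(value))
    }
}

/// Pushes a payload through the producer and returns whatever comes back.
pub fn round_trip() -> Result<(), TryProduceError<Vec<u8>>> {
    let producer: Producer<Vec<u8>> = Producer::new(Arc::new(AlwaysFull));
    producer.try_produce(vec![1, 2, 3])
}
```

A test built this way would assert that the returned error is Full and that the payload inside it equals the one that was submitted.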