Issue #58 added automatic stage profiling for `.source()`, `.tap()`, and `.link()` (RFC `docs/design/014-M6-stage-profiling.md`). Transforms were intentionally scoped out and left as a stub:

- `aimdb-core/src/profiling/record_profiling.rs`:

      /// Reserved for `.transform()` instrumentation (not yet wired).
      #[allow(dead_code)]
      transforms: Vec<StageEntry>,

- `aimdb-core/src/typed_api.rs`: `StageKind::Transform` exists; `.transform()` / `.transform_raw()` / `.transform_join()` set `last_stage = Some((StageKind::Transform, 0))`, so `.with_name(...)` does not panic, but no metrics are recorded and the name has nowhere to land.
- `RecordProfilingMetrics::snapshot()` iterates only sources / taps / links, so transforms are never reported by `get_stage_profiling`.
The bottleneck detection in the `get_stage_profiling` MCP tool can therefore mislead users: when a transform is the actual bottleneck, the tool does not see it and instead flags the slowest of the remaining source/tap/link stages.
Goal
Make `.transform()` and `.transform_join()` first-class stage-profiling participants:
- `get_stage_profiling` returns a `stage_type: "transform"` entry per registered transform, with the same `call_count` / avg / min / max shape as the other stages.
- `.with_name("...")` after a `.transform*` registration is honored and the name surfaces in the MCP output.
- Existing source / tap / link profiling behavior is unchanged.
- Off by default, zero-cost when the `profiling` feature is disabled, and still builds on `no_std` + `alloc` + Embassy.
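One way to keep the "zero-cost when disabled" goal honest is to make the per-stage handle a zero-sized type unless the feature is on, so call sites keep one API in both configurations. This is a minimal sketch under assumptions: `StageHandle` and `Metrics` are hypothetical names, not aimdb types, and the real plumbing described below uses `Option<Arc<StageMetrics>>` instead.

```rust
use core::sync::atomic::AtomicU64;

/// Hypothetical stand-in for aimdb's `StageMetrics`.
pub struct Metrics {
    pub calls: AtomicU64,
}

/// With `profiling` enabled, the handle points at shared metrics.
#[cfg(feature = "profiling")]
pub struct StageHandle<'a>(&'a Metrics);

/// With `profiling` disabled (the default), the handle is zero-sized.
#[cfg(not(feature = "profiling"))]
pub struct StageHandle<'a>(core::marker::PhantomData<&'a Metrics>);

#[cfg(feature = "profiling")]
impl<'a> StageHandle<'a> {
    pub fn new(m: &'a Metrics) -> Self {
        StageHandle(m)
    }

    #[inline]
    pub fn record(&self, _elapsed_ns: u64) {
        self.0.calls.fetch_add(1, core::sync::atomic::Ordering::Relaxed);
    }
}

#[cfg(not(feature = "profiling"))]
impl<'a> StageHandle<'a> {
    pub fn new(_m: &'a Metrics) -> Self {
        StageHandle(core::marker::PhantomData)
    }

    /// Empty body: the optimizer removes the call entirely.
    #[inline(always)]
    pub fn record(&self, _elapsed_ns: u64) {}
}
```

Only `core` is used, so the pattern itself does not get in the way of `no_std` builds.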
Design sketch
Single-input transforms (`.transform()` / `.transform_raw()`):
- Instrument `run_single_transform`: time each `transform_fn(&input_value, &mut state)` call. Take a wall-clock `Clock::now()` before and after the closure invocation, and call `StageMetrics::record(elapsed_ns)` on `Some(output)` and also on `None`, since a `None` result still consumed an input. This matches the user's mental model: "time per input value processed."
- Plumbing: extend `run_single_transform` to take an `Option<Arc<StageMetrics>>` populated from `RecordProfilingMetrics::push_transform()` at registration time, analogous to how taps thread the metrics through in `typed_record.rs`.
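A minimal sketch of the per-call timing, assuming `std::time::Instant` as a stand-in for aimdb's `Clock::now()` (which must also work on Embassy) and a simplified `StageMetrics` built on atomics. `run_single_transform_step` is a hypothetical helper, not the real `run_single_transform` signature.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Instant;

/// Simplified stand-in for aimdb's `StageMetrics` (field layout is an assumption).
pub struct StageMetrics {
    pub call_count: AtomicU64,
    pub total_ns: AtomicU64,
    pub min_ns: AtomicU64,
    pub max_ns: AtomicU64,
}

impl StageMetrics {
    pub fn new() -> Self {
        Self {
            call_count: AtomicU64::new(0),
            total_ns: AtomicU64::new(0),
            min_ns: AtomicU64::new(u64::MAX),
            max_ns: AtomicU64::new(0),
        }
    }

    /// Fold one observation into count / total / min / max.
    pub fn record(&self, elapsed_ns: u64) {
        self.call_count.fetch_add(1, Ordering::Relaxed);
        self.total_ns.fetch_add(elapsed_ns, Ordering::Relaxed);
        self.min_ns.fetch_min(elapsed_ns, Ordering::Relaxed);
        self.max_ns.fetch_max(elapsed_ns, Ordering::Relaxed);
    }
}

/// Hypothetical single step of the transform loop. The closure call is timed and
/// recorded whether it yields `Some(output)` or `None`; no clock read happens
/// when metrics are absent.
pub fn run_single_transform_step<I, O, S>(
    input_value: &I,
    state: &mut S,
    transform_fn: &mut impl FnMut(&I, &mut S) -> Option<O>,
    metrics: Option<&Arc<StageMetrics>>,
) -> Option<O> {
    let start = metrics.map(|_| Instant::now());
    let out = transform_fn(input_value, state);
    if let (Some(m), Some(t0)) = (metrics, start) {
        m.record(t0.elapsed().as_nanos() as u64);
    }
    out
}
```

Recording on `None` as well is what makes `call_count` equal to "inputs processed" rather than "outputs produced".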
Multi-input joins (`.transform_join()` / `.transform_join_raw()`):
- The Design 027 task-model handler (`on_triggers(JoinEventRx, Producer)`) owns its own event loop, so we can't directly time "per-trigger" work without intruding into user code.
- Cleanest option: time the intervals between `Producer::produce()` calls on the output producer (the same trick `ProducerProfilingState` already uses for `.source()`). The first produce seeds the cursor; each subsequent produce records the wall-clock gap since the previous one. Interpretation: "average time between successive join outputs", which is a useful proxy for join throughput.
- Plumbing: attach `ProducerProfilingState` to the `Producer<O, R>` passed into the join task before `run_join_transform` is spawned.
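The output-cadence idea can be sketched as a small cursor fed one timestamp per `produce()`. This is a hypothetical stand-in for what `ProducerProfilingState` does, not its actual code; timestamps are plain nanosecond values so the logic is deterministic.

```rust
/// Hypothetical cadence tracker: records the gap between successive outputs.
#[derive(Default)]
pub struct OutputCadence {
    last_ns: Option<u64>, // cursor: timestamp of the previous produce
    pub call_count: u64,
    pub total_ns: u64,
    pub min_ns: Option<u64>,
    pub max_ns: u64,
}

impl OutputCadence {
    /// Call on every `produce()`. The first call only seeds the cursor.
    pub fn on_produce(&mut self, now_ns: u64) {
        if let Some(prev) = self.last_ns {
            let gap = now_ns.saturating_sub(prev);
            self.call_count += 1;
            self.total_ns += gap;
            self.min_ns = Some(self.min_ns.map_or(gap, |m| m.min(gap)));
            self.max_ns = self.max_ns.max(gap);
        }
        self.last_ns = Some(now_ns);
    }

    /// Average time between successive join outputs; `None` until two outputs exist.
    pub fn avg_ns(&self) -> Option<u64> {
        if self.call_count == 0 {
            None
        } else {
            Some(self.total_ns / self.call_count)
        }
    }
}
```

One consequence worth documenting: N join outputs yield N - 1 samples, so a join that has produced a single value reports no timing yet.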
`RecordProfilingMetrics`:
- Add `push_transform()` (sibling of `push_source/tap/link`).
- Update `snapshot()` to include `("transform", self.transforms())` in the iteration tuple.
- Drop the `#[allow(dead_code)]` on `transforms`.
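A trimmed-down sketch of the `RecordProfilingMetrics` changes, assuming each `push_*` returns the new stage's index and that `StageEntry` only needs a name slot here (the real struct also carries the metrics). It shows `push_transform()` next to its siblings and `snapshot()` emitting stages in sources → taps → links → transforms order.

```rust
/// Simplified stage entry (hypothetical: the metrics field is omitted).
#[derive(Default)]
pub struct StageEntry {
    pub name: Option<String>,
}

#[derive(Default)]
pub struct RecordProfilingMetrics {
    sources: Vec<StageEntry>,
    taps: Vec<StageEntry>,
    links: Vec<StageEntry>,
    transforms: Vec<StageEntry>, // no longer needs #[allow(dead_code)]
}

impl RecordProfilingMetrics {
    pub fn push_source(&mut self) -> usize { push(&mut self.sources) }
    pub fn push_tap(&mut self) -> usize { push(&mut self.taps) }
    pub fn push_link(&mut self) -> usize { push(&mut self.links) }
    /// New sibling: registers a transform and returns its index.
    pub fn push_transform(&mut self) -> usize { push(&mut self.transforms) }

    pub fn transforms(&self) -> &[StageEntry] { &self.transforms }

    /// Flatten to (stage_type, index, name), transforms last.
    pub fn snapshot(&self) -> Vec<(&'static str, usize, Option<&str>)> {
        let mut out = Vec::new();
        for (kind, stages) in [
            ("source", &self.sources[..]),
            ("tap", &self.taps[..]),
            ("link", &self.links[..]),
            ("transform", self.transforms()),
        ] {
            for (i, s) in stages.iter().enumerate() {
                out.push((kind, i, s.name.as_deref()));
            }
        }
        out
    }
}

/// Append an empty entry and return its index.
fn push(v: &mut Vec<StageEntry>) -> usize {
    v.push(StageEntry::default());
    v.len() - 1
}
```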
`RecordRegistrar`:
- In all four entry points (`transform_raw`, `transform`, `transform_join_raw`, `transform_join`), call `self.rec.profiling_mut().push_transform()` and store `last_stage = Some((StageKind::Transform, idx))`. Today these set `(StageKind::Transform, 0)`, which makes `.with_name(...)` silently no-op past the first transform.
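The index bookkeeping can be illustrated with a deliberately tiny, hypothetical `Registrar` (not the real `RecordRegistrar`); a `Vec` of names stands in for `push_transform()` and the profiling storage.

```rust
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum StageKind {
    Source,
    Tap,
    Link,
    Transform,
}

/// Hypothetical registrar with just enough state to show the fix.
#[derive(Default)]
pub struct Registrar {
    transform_names: Vec<Option<String>>,
    last_stage: Option<(StageKind, usize)>,
}

impl Registrar {
    /// Mirrors each `.transform*` entry point: register, then remember the real index.
    pub fn transform(&mut self) -> &mut Self {
        self.transform_names.push(None); // stand-in for push_transform()
        let idx = self.transform_names.len() - 1;
        self.last_stage = Some((StageKind::Transform, idx));
        self
    }

    /// Names the most recently registered stage.
    pub fn with_name(&mut self, name: &str) -> &mut Self {
        if let Some((StageKind::Transform, idx)) = self.last_stage {
            self.transform_names[idx] = Some(name.to_string());
        }
        self
    }
}
```

Storing `idx` rather than a hard-coded `0` is what keeps the second `.with_name(...)` from targeting the first transform's slot.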
`StageProfilingInfo::stage_type`:
- Add `"transform"` as the fourth value the MCP layer understands. The MCP tool already iterates whatever the snapshot returns, so once the snapshot emits transform entries the bottleneck detection picks them up for free.
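Why no special-casing is needed can be sketched with a type-agnostic selection, assuming the bottleneck is simply the stage with the highest `avg_time_ns`; the MCP tool's actual rule and recommendation text may differ, and this `StageProfilingInfo` keeps only the fields the sketch needs.

```rust
/// Hypothetical, trimmed mirror of `StageProfilingInfo`.
pub struct StageProfilingInfo {
    pub stage_type: &'static str, // "source" | "tap" | "link" | "transform"
    pub name: Option<String>,
    pub avg_time_ns: u64,
}

/// Pick the slowest stage by average time, whatever its type.
pub fn find_bottleneck(stages: &[StageProfilingInfo]) -> Option<&StageProfilingInfo> {
    stages.iter().max_by_key(|s| s.avg_time_ns)
}
```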
Acceptance criteria
- With `--features profiling`, `get_stage_profiling` for a record with a transform returns a `stage_type: "transform"` entry containing `call_count`, `avg_time_ns`, `min_time_ns`, `max_time_ns`, plus the name set via `.with_name(...)`.
- When the transform is the slowest stage, `bottleneck` in the MCP response points at it with the human-readable recommendation string.
- Without the `profiling` feature, no overhead and no API change.
- Embassy `thumbv7em-none-eabihf` build still passes with the feature on.
- All existing tests continue to pass.
Out of scope
- CPU time tracking (still wall-clock only).
- Histogram / percentile metrics.
- Per-input-port timing for joins (single aggregate "output cadence" only; per-port timing would mean instrumenting the fan-in forwarders, which is a separate piece of work if it turns out to be needed).
Tasks
- `RecordProfilingMetrics::push_transform()` + remove `#[allow(dead_code)]`
- `RecordProfilingMetrics::snapshot()` includes transforms in the sources → taps → links → transforms order
- `RecordRegistrar::transform_raw` / `transform` / `transform_join_raw` / `transform_join` register a transform stage and store the real index in `last_stage`
- Thread `Option<Arc<StageMetrics>>` through `run_single_transform`; wrap the `transform_fn` call with `Clock` measurement
- Attach `ProducerProfilingState` to the producer passed into `run_join_transform` so output cadence is timed
- Verify that `with_name` lands on the right index and that multiple transforms on adjacent records don't cross-talk
- `aimdb-tokio-adapter/tests/`: source → transform → tap pipeline; assert `min ≤ avg ≤ max` on the transform stage and that the slowest-by-design stage is reported as the bottleneck
- `examples/remote-access-demo`: add one record driven by a transform so users can see `get_stage_profiling` returning a `stage_type: "transform"` entry; update the README's stage table
- `make test-embedded` already gates `embassy-runtime,profiling`; verify it still passes
- Update the "Implementation notes" section of `docs/design/014-M6-stage-profiling.md` to describe transform timing semantics (per-call for single-input transforms, output interval for joins)
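The consistency assertion for the tokio-adapter test could be factored into a small helper like the one below. `StageStats` is a hypothetical flattened view of one `get_stage_profiling` entry, not an existing type. The check holds for integer averages too, since a floored mean of values that are all at least `min` and at most `max` stays within that range.

```rust
/// Hypothetical flattened view of one stage entry from `get_stage_profiling`.
pub struct StageStats {
    pub call_count: u64,
    pub avg_time_ns: u64,
    pub min_time_ns: u64,
    pub max_time_ns: u64,
}

/// True if the stage saw at least one call and min <= avg <= max.
pub fn stats_are_consistent(s: &StageStats) -> bool {
    s.call_count > 0 && s.min_time_ns <= s.avg_time_ns && s.avg_time_ns <= s.max_time_ns
}
```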
Related
- `.transform()` follow-up left explicitly out of scope by Integrate Stage Profiling #58.
- `docs/design/014-M6-stage-profiling.md`
- Design 027 (`no_std` Transform API) defines the join task model this builds on.