Wire .transform() and .transform_join() into stage profiling #109

@lxsaah

Issue #58 added automatic stage profiling for .source(), .tap(), and
.link() (RFC docs/design/014-M6-stage-profiling.md).
Transforms were intentionally scoped out and left as a stub:

  • aimdb-core/src/profiling/record_profiling.rs:
    /// Reserved for `.transform()` instrumentation (not yet wired).
    #[allow(dead_code)]
    transforms: Vec<StageEntry>,
  • aimdb-core/src/typed_api.rs: StageKind::Transform
    exists; .transform() / .transform_raw() / .transform_join() set
    last_stage = Some((StageKind::Transform, 0)) so .with_name(...) does not
    panic, but no metrics are recorded and the name has nowhere to land.
  • RecordProfilingMetrics::snapshot() iterates only sources / taps / links —
    transforms are never reported by get_stage_profiling.

Bottleneck detection in the get_stage_profiling MCP tool can therefore
mislead users: when a transform is the actual bottleneck, the tool cannot see
it and instead flags the slowest of the remaining source/tap/link stages.

Goal

Make .transform() and .transform_join() first-class stage-profiling
participants:

  • get_stage_profiling returns a stage_type: "transform" entry per
    registered transform, with the same call_count / avg / min / max
    shape as the other stages.
  • .with_name("...") after a .transform* registration is honored and the
    name surfaces in the MCP output.
  • Existing source / tap / link profiling behavior is unchanged.
  • Off by default, zero-cost when the profiling feature is disabled, and
    still builds on no_std + alloc + Embassy.

Design sketch

Single-input transforms (.transform() / .transform_raw()):

  • Instrument run_single_transform:
    time each transform_fn(&input_value, &mut state) call. Wall-clock
    Clock::now() before/after the closure invocation; on Some(output) (and
    on None, which still consumed an input) call
    StageMetrics::record(elapsed_ns). This matches the user's mental model:
    "time per input value processed."
  • Plumbing: extend run_single_transform to take an
    Option<Arc<StageMetrics>> populated from
    RecordProfilingMetrics::push_transform() at registration time, analogous
    to how taps thread the metrics in typed_record.rs.
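
A minimal sketch of the per-call timing described above, under stated assumptions: the StageMetrics type here is a hypothetical stand-in (only the record(elapsed_ns) call is taken from this issue), timed_step is an illustrative helper rather than the real run_single_transform, and std::time::Instant stands in for the runtime's Clock so the example runs on a host.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Instant;

/// Hypothetical stand-in for the real StageMetrics (fields are assumptions).
pub struct StageMetrics {
    call_count: AtomicU64,
    total_ns: AtomicU64,
    min_ns: AtomicU64,
    max_ns: AtomicU64,
}

impl StageMetrics {
    pub fn new() -> Self {
        Self {
            call_count: AtomicU64::new(0),
            total_ns: AtomicU64::new(0),
            min_ns: AtomicU64::new(u64::MAX),
            max_ns: AtomicU64::new(0),
        }
    }

    /// Record one timed call.
    pub fn record(&self, elapsed_ns: u64) {
        self.call_count.fetch_add(1, Ordering::Relaxed);
        self.total_ns.fetch_add(elapsed_ns, Ordering::Relaxed);
        self.min_ns.fetch_min(elapsed_ns, Ordering::Relaxed);
        self.max_ns.fetch_max(elapsed_ns, Ordering::Relaxed);
    }

    /// Returns (call_count, avg_ns, min_ns, max_ns), or None before the first call.
    pub fn snapshot(&self) -> Option<(u64, u64, u64, u64)> {
        let count = self.call_count.load(Ordering::Relaxed);
        if count == 0 {
            return None;
        }
        let avg = self.total_ns.load(Ordering::Relaxed) / count;
        let min = self.min_ns.load(Ordering::Relaxed);
        let max = self.max_ns.load(Ordering::Relaxed);
        Some((count, avg, min, max))
    }
}

/// Runs one transform step. The call is timed whether it yields Some or None,
/// because both outcomes consumed an input value.
pub fn timed_step<I, O, S>(
    transform_fn: &mut impl FnMut(&I, &mut S) -> Option<O>,
    input: &I,
    state: &mut S,
    metrics: Option<&Arc<StageMetrics>>,
) -> Option<O> {
    match metrics {
        // No metrics attached: call straight through without touching the clock.
        None => transform_fn(input, state),
        Some(m) => {
            let start = Instant::now();
            let out = transform_fn(input, state);
            m.record(start.elapsed().as_nanos() as u64);
            out
        }
    }
}
```

Passing None for the metrics skips the clock reads entirely, which is one way the un-profiled path could stay cheap; the real implementation may instead compile the timing out behind the profiling feature.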

Multi-input joins (.transform_join() / .transform_join_raw()):

  • The Design 027 task-model handler (on_triggers(JoinEventRx, Producer))
    owns its own event loop, so we can't directly time "per-trigger" work
    without intruding into user code.
  • Cleanest option: time intervals between Producer::produce() calls on the
    output producer (same trick ProducerProfilingState already uses for
    .source()). The first produce seeds the cursor; subsequent produces
    record the wall-clock gap since the previous one. Interpretation: "average
    time between successive join outputs", which is a useful proxy for join
    throughput.
  • Plumbing: attach ProducerProfilingState to the Producer<O, R> passed
    into the join task before run_join_transform is spawned.
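
The output-interval idea above can be sketched as follows. OutputCadence and its methods are hypothetical names for illustration only and do not reflect the actual ProducerProfilingState; the timestamp is passed in explicitly so the logic can be exercised without a real producer.

```rust
use std::time::Instant;

/// Hypothetical output-cadence tracker (not the real ProducerProfilingState).
#[derive(Default)]
pub struct OutputCadence {
    /// Timestamp of the previous produce, if any.
    last: Option<Instant>,
    /// Number of recorded gaps (one fewer than the number of produces).
    pub intervals: u64,
    /// Sum of all recorded gaps in nanoseconds.
    pub total_ns: u64,
}

impl OutputCadence {
    /// Call once per produce. The first call only seeds the cursor and
    /// returns None; later calls return the gap since the previous produce.
    pub fn on_produce(&mut self, now: Instant) -> Option<u64> {
        let gap = self
            .last
            .map(|prev| now.saturating_duration_since(prev).as_nanos() as u64);
        self.last = Some(now);
        if let Some(ns) = gap {
            self.intervals += 1;
            self.total_ns += ns;
        }
        gap
    }

    /// Average time between successive outputs, or None with fewer than two produces.
    pub fn avg_ns(&self) -> Option<u64> {
        if self.intervals == 0 {
            None
        } else {
            Some(self.total_ns / self.intervals)
        }
    }
}
```

Note that N produces yield N - 1 samples under this scheme, so a join that has emitted only once would report no timing yet.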

RecordProfilingMetrics:

  • Add push_transform() (sibling of push_source/tap/link).
  • Update snapshot() to include ("transform", self.transforms()) in the
    iteration tuple.
  • Drop the #[allow(dead_code)] on transforms.

RecordRegistrar:

  • In all four entry points (transform_raw, transform, transform_join_raw,
    transform_join), call self.rec.profiling_mut().push_transform() and
    store last_stage = Some((StageKind::Transform, idx)). Today these set
    (StageKind::Transform, 0) which makes .with_name(...) silently no-op
    past the first transform.
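
To illustrate why the real index matters, here is a simplified sketch of the registration flow. All types below are reduced stand-ins: the StageKind variants other than Transform, the StageEntry fields, and the Registrar and RecordProfiling shapes are assumptions, not the actual aimdb-core definitions.

```rust
/// Stand-in for StageKind; only Transform is exercised here.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum StageKind {
    Source,
    Tap,
    Link,
    Transform,
}

/// Stand-in for StageEntry; the real entry would also carry metrics.
#[derive(Default, Debug)]
pub struct StageEntry {
    pub name: Option<String>,
}

/// Stand-in for RecordProfilingMetrics, reduced to the transforms list.
#[derive(Default)]
pub struct RecordProfiling {
    transforms: Vec<StageEntry>,
}

impl RecordProfiling {
    /// Registers a transform stage and returns its index.
    pub fn push_transform(&mut self) -> usize {
        self.transforms.push(StageEntry::default());
        self.transforms.len() - 1
    }
}

/// Stand-in for RecordRegistrar.
#[derive(Default)]
pub struct Registrar {
    profiling: RecordProfiling,
    last_stage: Option<(StageKind, usize)>,
}

impl Registrar {
    pub fn new() -> Self {
        Self::default()
    }

    /// Registers a transform and remembers the real index, not a fixed 0.
    pub fn transform(&mut self) -> &mut Self {
        let idx = self.profiling.push_transform();
        self.last_stage = Some((StageKind::Transform, idx));
        self
    }

    /// Names the most recently registered transform stage.
    pub fn with_name(&mut self, name: &str) -> &mut Self {
        if let Some((StageKind::Transform, idx)) = self.last_stage {
            if let Some(entry) = self.profiling.transforms.get_mut(idx) {
                entry.name = Some(name.to_string());
            }
        }
        self
    }

    /// Names of all registered transforms, in registration order.
    pub fn transform_names(&self) -> Vec<Option<&str>> {
        self.profiling
            .transforms
            .iter()
            .map(|e| e.name.as_deref())
            .collect()
    }
}
```

With the index taken from push_transform(), two chained registrations such as transform().with_name("parse") followed by transform().with_name("scale") name entries 0 and 1 respectively instead of both targeting index 0.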

StageProfilingInfo::stage_type:

  • Add "transform" as the fourth value the MCP layer understands. The MCP
    tool already iterates whatever the snapshot returns, so once the snapshot
    emits transform entries the bottleneck detection picks them up for free.
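
As a rough illustration of "picks them up for free", the sketch below assumes bottleneck detection selects the stage with the highest average time among stages that have been called at least once. That selection rule, the bottleneck function, and the field types are assumptions; only the stage_type, call_count, and avg_time_ns names come from this issue.

```rust
/// Simplified stand-in for StageProfilingInfo (field types are assumptions).
#[derive(Debug, Clone, PartialEq)]
pub struct StageProfilingInfo {
    pub stage_type: &'static str,
    pub name: Option<String>,
    pub call_count: u64,
    pub avg_time_ns: u64,
}

/// Hypothetical bottleneck rule: the stage with the highest average time,
/// ignoring stages that have never been called.
pub fn bottleneck(stages: &[StageProfilingInfo]) -> Option<&StageProfilingInfo> {
    stages
        .iter()
        .filter(|s| s.call_count > 0)
        .max_by_key(|s| s.avg_time_ns)
}
```

Because the rule only looks at the entries it is given, a slow transform is found as soon as the snapshot includes it, and with the transform entry absent the same function falls back to the slowest remaining stage, which is the misleading behavior described at the top of this issue.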

Tasks

  • RecordProfilingMetrics::push_transform() + remove #[allow(dead_code)]
  • RecordProfilingMetrics::snapshot() includes transforms in the
    sources → taps → links → transforms order
  • RecordRegistrar::transform_raw / transform /
    transform_join_raw / transform_join register a transform stage and
    store the real index in last_stage
  • Thread Option<Arc<StageMetrics>> through run_single_transform;
    wrap the transform_fn call with Clock measurement
  • Attach ProducerProfilingState to the producer passed into
    run_join_transform so output cadence is timed
  • Unit tests: transform metrics record correctly, with_name lands on
    the right index, multiple transforms on adjacent records don't cross-talk
  • Integration test in aimdb-tokio-adapter/tests/: source → transform →
    tap pipeline; assert min ≤ avg ≤ max on the transform stage and that
    the slowest-by-design stage is reported as the bottleneck
  • examples/remote-access-demo: add one record driven by a transform so
    users can see get_stage_profiling returning a stage_type: "transform"
    entry; update the README's stage table
  • Embassy cross-compile: make test-embedded already gates
    embassy-runtime,profiling — verify it still passes
  • RFC update: amend docs/design/014-M6-stage-profiling.md
    "Implementation notes" section to describe transform timing semantics
    (per-call for single, output-interval for join)

Acceptance criteria

  1. With --features profiling, get_stage_profiling for a record with a
    transform returns a stage_type: "transform" entry containing
    call_count, avg_time_ns, min_time_ns, max_time_ns, plus the name
    set via .with_name(...).
  2. When the transform is the slowest stage, bottleneck in the MCP response
    points at it with the human-readable recommendation string.
  3. Without the profiling feature, no overhead and no API change.
  4. Embassy thumbv7em-none-eabihf build still passes with the feature on.
  5. All existing tests continue to pass.

Out of scope

  • CPU time tracking (still wall-clock only).
  • Histogram / percentile metrics.
  • Per-input-port timing for joins (single aggregate "output cadence" only —
    per-port timing would mean instrumenting the fan-in forwarders, which is a
    separate piece of work if it turns out to be needed).

Related
