Skip to content

Commit 9f1a060

Browse files
authored
88 replace spawn trait with futuresunordered (#117)
* feat: add design document for removing `Spawn` trait and refactoring of futures handling in AimDB * Refactor AimDB to remove `Spawn` trait and collect futures in `build()` - Updated `WebSocketConnectorImpl` to collect outbound publisher futures instead of spawning them directly. - Refactored `ServerState` to build the WebSocket Axum server future and return it as a `BoxFuture`. - Revised design documentation to reflect the removal of the `Spawn` trait and the new approach for handling futures. - Modified examples to adapt to the new `build()` and `run()` pattern, ensuring proper initialization and concurrent execution of tasks. - Removed unnecessary `Spawn` trait bounds from various components and updated related code generation. - Cleaned up example projects to align with the new architecture, including adjustments to how the Embassy adapter is initialized and how the database runner is executed. * feat: add configurable command channel capacity for KNX connector * chore: update dependencies in Cargo.lock and embassy submodule * feat: remove `Spawn` trait and update `AimDbBuilder::build()` to return `(AimDb, AimDbRunner)` across the workspace * feat: Remove runtime parameter `R` from `Producer<T>` and `Consumer<T>` - Updated `Producer<T, R>` to `Producer<T>` and `Consumer<T, R>` to `Consumer<T>`, simplifying user-facing signatures. - Removed the `.key()` method from `Producer<T>`, as the record key is now captured at registration. - Adjusted tests and examples to reflect the new type signatures. - Updated documentation to describe the architectural changes and the motivation behind the removal of `R`. - Ensured backward compatibility for existing async function signatures while optimizing the hot path for production and consumption of records. * Refactor produce method to be synchronous - Changed the `produce` method in various modules from async to synchronous, removing unnecessary await calls. - Updated related documentation and examples to reflect the new synchronous behavior. - Ensured that all instances of `producer.produce(value).await` were replaced with `producer.produce(value)`. - This change simplifies the API and improves performance by eliminating the overhead of async handling where it is not needed. * feat: simplify DewPoint production by removing await and error handling * feat: add "macros" feature to Tokio dependency in Cargo.toml * feat: update documentation and design notes for removing `Spawn` trait and `R` parameter from `Producer<T>` and `Consumer<T>` * feat: add build command for remote-access-demo in Makefile and update config producer in server * feat: update dependencies and remove alloc feature checks across the codebase * feat: add "generic-queue-16" feature to embassy-time dependency in Cargo.toml * feat: update embassy-time dependency to include generic-queue-16 feature for FuturesUnordered compatibility
1 parent 9c5ea19 commit 9f1a060

92 files changed

Lines changed: 3408 additions & 2248 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
2727
2828
## [Unreleased]
2929

30+
### Changed (breaking)
31+
32+
- **M13 — `Spawn` trait removed across the workspace; `AimDbBuilder::build()` now returns `(AimDb, AimDbRunner)` (Issue #88, [Design 028](docs/design/028-M13-remove-spawn-trait.md)).** Every future the database drives — producer services, taps, transforms, join forwarders, connector loops, the remote-access supervisor, `on_start` tasks — is collected at build time and driven by a single `FuturesUnordered` inside `runner.run().await`. Adapter implementations (`TokioAdapter`, `EmbassyAdapter`, `WasmAdapter`) drop their `impl Spawn`. The `embassy-task-pool-8/16/32` features are deleted and `EmbassyAdapter::new_with_network` no longer takes a `Spawner`. Connector authors must update `ConnectorBuilder::build()` to return `Vec<BoxFuture>` instead of `Arc<dyn Connector>`. See each crate's CHANGELOG for the per-crate impact.
33+
3034
## [1.1.0] - 2026-05-22
3135

3236
### Added

Cargo.lock

Lines changed: 11 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ tracing = { version = "0.1", default-features = false }
7575

7676
# Async utilities
7777
futures = "0.3"
78+
futures-util = { version = "0.3", default-features = false, features = ["alloc"] }
7879

7980
# CLI (for aimdb-cli)
8081
clap = { version = "4.0", features = ["derive"] }

Makefile

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,10 +277,14 @@ examples:
277277
@printf "$(GREEN)Building all example projects...$(NC)\n"
278278
@printf "$(YELLOW) → Building sync-api-demo (synchronous API wrapper)$(NC)\n"
279279
cargo build --package sync-api-demo
280+
@printf "$(YELLOW) → Building mqtt-connector-demo-common (shared MQTT demo code, runtime-agnostic)$(NC)\n"
281+
cargo build --package mqtt-connector-demo-common
280282
@printf "$(YELLOW) → Building tokio-mqtt-connector-demo (native, tokio runtime)$(NC)\n"
281283
cargo build --package tokio-mqtt-connector-demo
282284
@printf "$(YELLOW) → Building embassy-mqtt-connector-demo (embedded, embassy runtime)$(NC)\n"
283285
cargo build --package embassy-mqtt-connector-demo --target thumbv7em-none-eabihf
286+
@printf "$(YELLOW) → Building knx-connector-demo-common (shared KNX demo code, runtime-agnostic)$(NC)\n"
287+
cargo build --package knx-connector-demo-common
284288
@printf "$(YELLOW) → Building tokio-knx-connector-demo (native, tokio runtime)$(NC)\n"
285289
cargo build --package tokio-knx-connector-demo
286290
@printf "$(YELLOW) → Building embassy-knx-connector-demo (embedded, embassy runtime)$(NC)\n"
@@ -295,6 +299,8 @@ examples:
295299
cargo build --package weather-station-beta
296300
@printf "$(YELLOW) → Building weather-station-gamma (embedded, embassy runtime)$(NC)\n"
297301
cargo build --package weather-station-gamma --target thumbv7em-none-eabihf
302+
@printf "$(YELLOW) → Building remote-access-demo (AimX server + client)$(NC)\n"
303+
cargo build --package remote-access-demo
298304
@printf "$(YELLOW) → Building hello-mailbox (sync)$(NC)\n"
299305
cargo build --package hello-mailbox
300306
@printf "$(YELLOW) → Building hello-single-latest-async$(NC)\n"

README.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
9393
.finish();
9494
});
9595

96-
builder.build()?.run().await?;
96+
// `.run()` builds the database, collects every producer/consumer/transform
97+
// future, and drives them all on a single `FuturesUnordered`. It blocks
98+
// until shutdown. For programmatic access to the `AimDb` handle, call
99+
// `.build().await?` directly — it returns `(AimDb, AimDbRunner)`.
100+
builder.run().await?;
97101
Ok(())
98102
}
99103
```

_external/embassy

Submodule embassy updated 181 files

aimdb-codegen/CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## [Unreleased]
99

10+
### Changed (breaking)
11+
12+
- Emitted task scaffolds now use `Producer<T>` / `Consumer<T>` (no `, TokioAdapter` second parameter) and emitted doc tables show the same form, matching the M14 cleanup in `aimdb-core` (Design 029). Regenerate downstream scaffolds after upgrading.
13+
- Emitted `configure_schema` signature changed from `<R: Spawn + 'static>` to `<R: RuntimeAdapter + 'static>`; emitted prelude now imports `aimdb_executor::RuntimeAdapter` instead of `Spawn` (Issue #88). Regenerate downstream schemas.
14+
1015
## [0.2.0] - 2026-05-22
1116

1217
### Changed

aimdb-codegen/src/rust.rs

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -387,12 +387,12 @@ pub fn generate_tasks_rs(state: &ArchitectureState, binary_name: &str) -> Option
387387
for input in &task.inputs {
388388
let arg_name = format_ident!("{}", to_snake_case(&input.record));
389389
let value_type = format_ident!("{}Value", input.record);
390-
params.push(quote! { #arg_name: Consumer<#value_type, TokioAdapter> });
390+
params.push(quote! { #arg_name: Consumer<#value_type> });
391391
}
392392
for output in &task.outputs {
393393
let arg_name = format_ident!("{}", to_snake_case(&output.record));
394394
let value_type = format_ident!("{}Value", output.record);
395-
params.push(quote! { #arg_name: Producer<#value_type, TokioAdapter> });
395+
params.push(quote! { #arg_name: Producer<#value_type> });
396396
}
397397

398398
let todo_msg = match &task.task_type {
@@ -557,7 +557,7 @@ fn emit_imports(state: &ArchitectureState) -> TokenStream {
557557
use aimdb_core::builder::AimDbBuilder;
558558
use aimdb_core::RecordKey;
559559
use aimdb_data_contracts::{#(#contract_traits),*};
560-
use aimdb_executor::Spawn;
560+
use aimdb_executor::RuntimeAdapter;
561561
use serde::{Deserialize, Serialize};
562562
}
563563
}
@@ -731,7 +731,7 @@ fn emit_configure_schema(state: &ArchitectureState) -> TokenStream {
731731
/// addresses. Producers, consumers, serializers, and deserializers contain
732732
/// business logic and must be provided by application code — they are not
733733
/// generated here.
734-
pub fn configure_schema<R: Spawn + 'static>(builder: &mut AimDbBuilder<R>) {
734+
pub fn configure_schema<R: RuntimeAdapter + 'static>(builder: &mut AimDbBuilder<R>) {
735735
#(#record_blocks)*
736736
}
737737
}
@@ -1250,7 +1250,7 @@ pub fn generate_hub_schema_rs(state: &ArchitectureState) -> String {
12501250
let file_tokens = quote! {
12511251
use aimdb_core::buffer::BufferCfg;
12521252
use aimdb_core::builder::AimDbBuilder;
1253-
use aimdb_executor::Spawn;
1253+
use aimdb_executor::RuntimeAdapter;
12541254
use #common_crate::*;
12551255

12561256
#configure_fn
@@ -1611,10 +1611,10 @@ fn build_transform_call(task: &TaskDef, variant_ident: &syn::Ident) -> TokenStre
16111611
///
16121612
/// | Inputs | Outputs | API | Generated stub |
16131613
/// |--------|---------|-----------------------|---------------------------|
1614-
/// | N > 1 | ≥ 1 | `.transform_join()` | `async fn task_handler(JoinEventRx, Producer<O, R>)` |
1614+
/// | N > 1 | ≥ 1 | `.transform_join()` | `async fn task_handler(JoinEventRx, Producer<O>)` |
16151615
/// | 1 | ≥ 1 | `.transform().map()` | `fn task_transform(&Input) -> Option<Output>` |
1616-
/// | 0 | ≥ 1 | `.source()` | `async fn task(RuntimeContext, Producer<O, R>)` |
1617-
/// | ≥ 1 | 0 | `.tap()` | `async fn task(RuntimeContext, Consumer<I, R>)` |
1616+
/// | 0 | ≥ 1 | `.source()` | `async fn task(RuntimeContext, Producer<O>)` |
1617+
/// | ≥ 1 | 0 | `.tap()` | `async fn task(RuntimeContext, Consumer<I>)` |
16181618
pub fn generate_hub_tasks_rs(state: &ArchitectureState) -> String {
16191619
let project = state
16201620
.project
@@ -1660,7 +1660,7 @@ pub fn generate_hub_tasks_rs(state: &ArchitectureState) -> String {
16601660
/// {inputs_doc}\n\
16611661
pub async fn {handler}(\n\
16621662
mut _rx: aimdb_core::transform::JoinEventRx,\n\
1663-
_producer: aimdb_core::Producer<{out_t}, TokioAdapter>,\n\
1663+
_producer: aimdb_core::Producer<{out_t}>,\n\
16641664
) {{\n\
16651665
while let Ok(_trigger) = _rx.recv().await {{\n\
16661666
todo!(\"implement {handler}\")\n\
@@ -1689,7 +1689,7 @@ pub fn {handler}(input: &{in_t}) -> Option<{out_t}> {{\n\
16891689
fns.push_str(&format!(
16901690
"pub async fn {}(\n\
16911691
_ctx: aimdb_core::RuntimeContext<TokioAdapter>,\n\
1692-
_producer: aimdb_core::Producer<{out_t}, TokioAdapter>,\n\
1692+
_producer: aimdb_core::Producer<{out_t}>,\n\
16931693
) {{\n\
16941694
todo!(\"implement {}\")\n\
16951695
}}\n\n",
@@ -1700,7 +1700,7 @@ pub fn {handler}(input: &{in_t}) -> Option<{out_t}> {{\n\
17001700
fns.push_str(&format!(
17011701
"pub async fn {}(\n\
17021702
_ctx: aimdb_core::RuntimeContext<TokioAdapter>,\n\
1703-
_consumer: aimdb_core::Consumer<{in_t}, TokioAdapter>,\n\
1703+
_consumer: aimdb_core::Consumer<{in_t}>,\n\
17041704
) {{\n\
17051705
todo!(\"implement {}\")\n\
17061706
}}\n\n",
@@ -1833,8 +1833,8 @@ url = "mqtt://ota/cmd/{variant}"
18331833
"Missing RecordKey import:\n{out}"
18341834
);
18351835
assert!(
1836-
out.contains("use aimdb_executor::Spawn;"),
1837-
"Missing Spawn import:\n{out}"
1836+
out.contains("use aimdb_executor::RuntimeAdapter;"),
1837+
"Missing RuntimeAdapter import:\n{out}"
18381838
);
18391839
assert!(
18401840
out.contains("use serde::{Deserialize, Serialize};"),
@@ -1938,7 +1938,7 @@ url = "mqtt://ota/cmd/{variant}"
19381938
let out = generated();
19391939
assert!(
19401940
out.contains(
1941-
"pub fn configure_schema<R: Spawn + 'static>(builder: &mut AimDbBuilder<R>)"
1941+
"pub fn configure_schema<R: RuntimeAdapter + 'static>(builder: &mut AimDbBuilder<R>)"
19421942
),
19431943
"Missing configure_schema function:\n{out}"
19441944
);

aimdb-core/CHANGELOG.md

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,38 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## [Unreleased]
99

10+
### Changed (breaking)
11+
12+
- **`Producer::produce` is now sync + infallible; `Consumer::subscribe` is now infallible (Design 029 follow-up, M14).** The pre-resolved `WriteHandle::push` cannot fail and the pre-resolved buffer Arc makes `subscribe()` infallible. Call sites collapse: `producer.produce(x).await?``producer.produce(x);` and `let Ok(reader) = consumer.subscribe() else { ... }``let reader = consumer.subscribe();`. The `ProducerTrait::produce_any` / `ConsumerTrait::subscribe_any` trait surfaces stay `Result`/`async` because the type-erasure downcast remains fallible.
13+
- `AimDb::produce<T>(key, value) -> DbResult<()>` is now sync; `.await` on the call site goes away. Only the key lookup can fail.
14+
- `Database::produce` likewise sync.
15+
- `TypedRecord::produce` is now `pub fn produce(&self, val: T)` (was `pub async fn produce`).
16+
- `aimdb-wasm-adapter`: `bindings::poll_sync` helper deleted — no remaining callers now that `TypedRecord::produce` is sync.
17+
- Dead `consumer.subscribe()` error arms in `transform/single.rs` and `transform/join.rs` removed (the `Err` branch was unreachable after M14).
18+
19+
- **`Producer<T>` / `Consumer<T>` drop the runtime parameter `R` and pre-resolve the record at build time (Design 029, M14).** Producer/Consumer become handles to a buffer rather than tickets to look one up: `produce()` is one virtual call (no `HashMap<key>` probe, no `TypeId` check, no downcast), and `subscribe()` collapses to `buffer.subscribe_boxed()`. The internal mechanic is a new crate-private `WriteHandle<T>` trait backed by `RecordWriter<T>` (in `aimdb-core/src/buffer/writer.rs`), pre-bound to the record's `Arc<dyn DynBuffer<T>>` + snapshot mutex + metadata tracker.
20+
- `Producer<T, R>``Producer<T>`; `Consumer<T, R>``Consumer<T>`. User code that names the two-parameter form must drop the trailing adapter arg.
21+
- `Producer::key(&self) -> &str` is **removed**. Capture the record key at the registration site instead.
22+
- `Producer::produce(value) -> ()` and `Consumer::subscribe() -> Box<dyn BufferReader<T> + Send>` (v0.4 revision — see the sync/infallible bullet above for the rationale and migration). The `ProducerTrait::produce_any` / `ConsumerTrait::subscribe_any` trait surfaces retain `async`/`Result` for the type-erased downcast that can still fail.
23+
- `AimDb::producer<T>(key)` / `AimDb::consumer<T>(key)` now return `DbResult<…>` (was infallible). They resolve the typed record up front, so callers that previously assumed inference must add `?`.
24+
- `Consumer<T>` cannot exist without a buffer: `.tap()` on a record with no `.buffer(...)` now surfaces as `MissingConfiguration` at build time (was a deferred subscribe-time error).
25+
- `TypedRecord::buffer` field is `Option<Arc<dyn DynBuffer<T>>>` (was `Box`); `TypedRecord::set_buffer(Box<…>)` keeps its public signature and converts via `Arc::from(box_)` internally.
26+
- `TypedRecord::create_producer_trait(&self)` no longer takes `db` / `record_key` — it uses the new `writer_handle()`.
27+
- `ConnectorBuilder<R>` cascade is zero-LOC: no connector struct carried `R` after M13. The outbound `consumer_factory` / inbound `producer_factory` callbacks now resolve the record once at link-startup time (via `db.inner().get_typed_record_by_key`) and construct the new handles.
28+
- Codegen-emitted task scaffolds use `Producer<T>` / `Consumer<T>` (no `, TokioAdapter`).
29+
- `data-contracts` `log_tap` parameter is `Consumer<T>`.
30+
31+
- **`Spawn` trait removed; `AimDbBuilder::build()` now returns `(AimDb<R>, AimDbRunner)` (Issue #88, Design 028).** Every future the database needs — `.source()`/`.tap()`/`.transform()` tasks, on_start hooks, connector loops, the remote-access supervisor — is collected at build time into the new `AimDbRunner`, then driven by a single `FuturesUnordered` from `runner.run().await`. No background work runs until the runner is polled.
32+
- `AimDb::spawn_task` is **deleted**. Migrate to `on_start()` (collected at build) or to a private `FuturesUnordered` inside your own future.
33+
- The `Runtime` bundle no longer supertrait-requires `Spawn`. Custom adapters drop `impl Spawn`.
34+
- `R: Spawn` bounds are gone everywhere in `aimdb-core` (`Producer`, `Consumer`, `TypedRecord`, `TransformDescriptor`, `RecordRegistrar`, `RecordT`, `AnyRecordExt::as_typed`, remote handler/supervisor, `Database<A>`) — replaced by `R: RuntimeAdapter`.
35+
- `RecordSpawner<T>` renamed to `RecordFutureCollector<T>`; its `spawn_all_tasks``collect_all_futures`. Internal `spawn_consumer_tasks`/`spawn_producer_service`/`spawn_transform_task` on `TypedRecord` become `collect_consumer_futures`/`collect_producer_future`/`collect_transform_futures`.
36+
- Join transforms now hoist their per-input forwarder construction to build time — `JoinPipeline::into_descriptor()` returns a `CollectedTransform { task_future, fanin_futures }` and the lazy `runtime.spawn(forwarder)` inside `run_join_transform` is gone.
37+
- `ConnectorBuilder::build()` now returns `Vec<BoxFuture<'static, ()>>` instead of `Arc<dyn Connector>` (which `AimDbBuilder` already discarded).
38+
- Unsafe `impl Send/Sync` blocks on `Producer<T, R>` / `Consumer<T, R>` deleted — they auto-derive now.
39+
- On the AimX remote-access path, three `runtime.spawn(...)` call sites bridge to `tokio::spawn` directly under `#[cfg(feature = "std")]`. These (per-connection handler, per-subscription event stream, `subscribe_record_updates`) are addressed in the AimX portability follow-up.
40+
- `on_start` no_std bifurcation collapsed: a single `StartFnType<R>` alias replaces the byte-identical std/no_std pair.
41+
1042
## [1.1.0] - 2026-05-22
1143

1244
### Added

aimdb-core/README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ Portable code uses `R: Runtime` — no platform imports needed:
5353

5454
```rust
5555
/// Producer: reads a sensor and pushes typed values into AimDB.
56-
async fn sensor_producer<R: Runtime>(ctx: RuntimeContext<R>, producer: Producer<Temperature, R>) {
56+
async fn sensor_producer<R: Runtime>(ctx: RuntimeContext<R>, producer: Producer<Temperature>) {
5757
loop {
5858
let reading = read_sensor().await;
5959
producer.produce(Temperature {
@@ -65,7 +65,7 @@ async fn sensor_producer<R: Runtime>(ctx: RuntimeContext<R>, producer: Producer<
6565
}
6666

6767
/// Consumer: subscribes to the buffer and reacts to every new value.
68-
async fn temp_logger<R: Runtime>(ctx: RuntimeContext<R>, consumer: Consumer<Temperature, R>) {
68+
async fn temp_logger<R: Runtime>(ctx: RuntimeContext<R>, consumer: Consumer<Temperature>) {
6969
let mut reader = consumer.subscribe().unwrap();
7070
while let Ok(temp) = reader.recv().await {
7171
ctx.log().info(&format!("{}: {:.1}°C", temp.sensor_id, temp.celsius));

0 commit comments

Comments
 (0)