-
Notifications
You must be signed in to change notification settings - Fork 513
clusterd-test-driver: headless frontend to clusterd for scripted compute tests #37008
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
antiguru
merged 2 commits into
MaterializeInc:main
from
antiguru:headless-compute-driver
Jun 18, 2026
Merged
Changes from all commits
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
319 changes: 319 additions & 0 deletions
319
doc/developer/design/20260612_headless_clusterd_test_driver.md
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,63 @@ | ||
| [package] | ||
| name = "mz-clusterd-test-driver" | ||
| description = "Headless frontend to clusterd for scripted compute tests." | ||
| version = "0.0.0" | ||
| edition.workspace = true | ||
| rust-version.workspace = true | ||
| publish = false | ||
|
|
||
| [lints] | ||
| workspace = true | ||
|
|
||
| [dependencies] | ||
| anyhow.workspace = true | ||
| futures.workspace = true | ||
| mz-compute-client = { path = "../compute-client" } | ||
| mz-compute-types = { path = "../compute-types" } | ||
| mz-dyncfg = { path = "../dyncfg" } | ||
| mz-expr = { path = "../expr" } | ||
| # The repo's MzReflect-free MIR-from-text parser (the `.spec` test syntax), so | ||
| # `define` authors MIR in the readable pretty form rather than a hand-rolled | ||
| # JSON vocabulary. | ||
| mz-expr-parser = { path = "../expr-parser" } | ||
| # Used by the binary to configure tracing. It also keeps orchestrator-tracing's | ||
| # `tokio-console` feature enabled, which must stay consistent with mz-ore's | ||
| # `tokio-console` (pulled transitively via mz-compute-client) when this bin is | ||
| # built in the same cargo invocation as orchestratord — otherwise | ||
| # orchestrator-tracing fails to compile (E0063 on `TracingConfig::tokio_console`). | ||
| mz-orchestrator-tracing = { path = "../orchestrator-tracing" } | ||
| # default-features = false to avoid pulling mz-ore's default `tokio-console` | ||
| # feature: this bin can be grouped into a single cargo build with other images' | ||
| # bins (e.g. orchestratord), and enabling tokio-console there breaks | ||
| # orchestrator-tracing, which is built with its own tokio-console feature off. | ||
| mz-ore = { path = "../ore", default-features = false, features = [ | ||
| "async", | ||
| "tracing", | ||
| "metrics", | ||
| ] } | ||
| mz-persist-client = { path = "../persist-client" } | ||
| mz-persist-types = { path = "../persist-types" } | ||
| mz-repr = { path = "../repr" } | ||
| mz-service = { path = "../service" } | ||
| mz-storage-types = { path = "../storage-types" } | ||
| # Optional MIR dataflow optimizer, run only by `create-dataflow ... optimize`. | ||
| # Needed for shapes that don't lower from raw MIR (notably joins, whose | ||
| # `implementation` defaults to `Unimplemented`); the default path lowers faithfully | ||
| # without it. | ||
| mz-transform = { path = "../transform" } | ||
| serde = { workspace = true, features = ["derive"] } | ||
| timely.workspace = true | ||
| tokio = { workspace = true, features = ["full"] } | ||
| tokio-stream.workspace = true | ||
| tracing.workspace = true | ||
| uuid.workspace = true | ||
|
|
||
| [dev-dependencies] | ||
| # The `test` feature provides the `#[mz_ore::test]` macro; it is only needed | ||
| # when building tests, not the production binary, so keep it out of the bin's | ||
| # feature set (see the [dependencies] mz-ore comment). | ||
| mz-ore = { path = "../ore", features = ["test"] } | ||
| tempfile.workspace = true | ||
|
|
||
| [package.metadata.cargo-udeps.ignore] | ||
| normal = [] |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| /headless-driver |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,16 @@ | ||
| # Copyright Materialize, Inc. and contributors. All rights reserved. | ||
| # | ||
| # Use of this software is governed by the Business Source License | ||
| # included in the LICENSE file at the root of this repository. | ||
| # | ||
| # As of the Change Date specified in that file, in accordance with | ||
| # the Business Source License, use of this software will be governed | ||
| # by the Apache License, Version 2.0. | ||
|
|
||
| MZFROM prod-base | ||
|
|
||
| COPY headless-driver /usr/local/bin/ | ||
|
|
||
| USER materialize | ||
|
|
||
| ENTRYPOINT ["tini", "--", "headless-driver"] |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,14 @@ | ||
| # Copyright Materialize, Inc. and contributors. All rights reserved. | ||
| # | ||
| # Use of this software is governed by the Business Source License | ||
| # included in the LICENSE file at the root of this repository. | ||
| # | ||
| # As of the Change Date specified in that file, in accordance with | ||
| # the Business Source License, use of this software will be governed | ||
| # by the Apache License, Version 2.0. | ||
|
|
||
| name: clusterd-test-driver | ||
| description: Headless frontend to clusterd for scripted compute tests. | ||
| pre-image: | ||
| - type: cargo-build | ||
| bin: headless-driver |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,82 @@ | ||
| // Copyright Materialize, Inc. and contributors. All rights reserved. | ||
| // | ||
| // Use of this software is governed by the Business Source License | ||
| // included in the LICENSE file. | ||
| // | ||
| // As of the Change Date specified in that file, in accordance with | ||
| // the Business Source License, use of this software will be governed | ||
| // by the Apache License, Version 2.0. | ||
|
|
||
| //! Headless driver entry point for `mzcompose`. Connects to a running `clusterd`, | ||
| //! hosts persist PubSub, and runs a JSON command script against it (see | ||
| //! [`mz_clusterd_test_driver::script`]), exiting non-zero on assertion failure. | ||
| //! | ||
| //! The script source is the file named by `DRIVER_SCRIPT`, or stdin when that is | ||
| //! unset. Connection and persist configuration come from the environment: | ||
| //! `CLUSTERD_COMPUTE_ADDR`, `PERSIST_BLOB_URL`, `PERSIST_CONSENSUS_URL`, and | ||
| //! `DRIVER_PUBSUB_BIND`. | ||
|
|
||
| use std::net::SocketAddr; | ||
|
|
||
| use anyhow::Context; | ||
| use mz_clusterd_test_driver::driver::Driver; | ||
| use mz_clusterd_test_driver::persist_host::PersistHost; | ||
| use mz_clusterd_test_driver::script; | ||
| use mz_orchestrator_tracing::{StaticTracingConfig, TracingCliArgs}; | ||
| use mz_ore::metrics::MetricsRegistry; | ||
| use mz_persist_types::PersistLocation; | ||
| use tokio::io::AsyncReadExt; | ||
|
|
||
| /// Connect to `clusterd` and host persist PubSub, reading configuration from the | ||
| /// environment. Returns the persist location (for dataflow imports) and a | ||
| /// connected [`Driver`]. | ||
| async fn setup() -> anyhow::Result<(PersistLocation, Driver)> { | ||
| let compute_addr = | ||
| std::env::var("CLUSTERD_COMPUTE_ADDR").unwrap_or_else(|_| "clusterd:2101".to_string()); | ||
| let blob = std::env::var("PERSIST_BLOB_URL").expect("PERSIST_BLOB_URL"); | ||
| let consensus = std::env::var("PERSIST_CONSENSUS_URL").expect("PERSIST_CONSENSUS_URL"); | ||
| let pubsub_bind: SocketAddr = std::env::var("DRIVER_PUBSUB_BIND") | ||
| .unwrap_or_else(|_| "0.0.0.0:6879".to_string()) | ||
| .parse()?; | ||
|
|
||
| let loc = PersistLocation { | ||
| blob_uri: blob.parse()?, | ||
| consensus_uri: consensus.parse()?, | ||
| }; | ||
| let host = PersistHost::start_on(pubsub_bind, loc.clone()).await?; | ||
| let driver = Driver::connect(host, &compute_addr).await?; | ||
| Ok((loc, driver)) | ||
| } | ||
|
|
||
| #[tokio::main] | ||
| async fn main() -> anyhow::Result<()> { | ||
| // Configure tracing so the driver emits structured logs like the real | ||
| // Materialize binaries. | ||
| let _tracing_handle = TracingCliArgs::default() | ||
| .configure_tracing( | ||
| StaticTracingConfig { | ||
| service_name: "headless-driver", | ||
| build_info: mz_persist_client::BUILD_INFO, | ||
| }, | ||
| MetricsRegistry::new(), | ||
| ) | ||
| .await?; | ||
|
|
||
| let (loc, driver) = setup().await?; | ||
|
|
||
| // Read the script from `DRIVER_SCRIPT` if set, else stdin. The path is passed | ||
| // through so a `REWRITE` run can rewrite the file in place. | ||
| match std::env::var("DRIVER_SCRIPT") { | ||
| Ok(path) => { | ||
| let content = tokio::fs::read_to_string(&path) | ||
| .await | ||
| .with_context(|| format!("reading DRIVER_SCRIPT {path}"))?; | ||
| script::run(driver, loc, &content, Some(std::path::Path::new(&path))).await | ||
| } | ||
| Err(_) => { | ||
| let mut content = String::new(); | ||
| tokio::io::stdin().read_to_string(&mut content).await?; | ||
| script::run(driver, loc, &content, None).await | ||
| } | ||
| } | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,77 @@ | ||
| // Copyright Materialize, Inc. and contributors. All rights reserved. | ||
| // | ||
| // Use of this software is governed by the Business Source License | ||
| // included in the LICENSE file. | ||
| // | ||
| // As of the Change Date specified in that file, in accordance with | ||
| // the Business Source License, use of this software will be governed | ||
| // by the Apache License, Version 2.0. | ||
|
|
||
| //! Compute CTP connection and the `Hello` step of the controller handshake. | ||
| //! Generic: it sends any `ComputeCommand` and receives any `ComputeResponse`. | ||
| //! | ||
| //! Only `Hello` (the transport/version step) happens here; the controller | ||
| //! handshake proper — `CreateInstance`, `UpdateConfiguration`, | ||
| //! `InitializationComplete` — is driven by explicit caller commands, so the | ||
| //! caller controls the instance config, the peek-stash setting, and exactly when | ||
| //! the reconciliation window opens and closes. | ||
|
|
||
| use std::time::Duration; | ||
|
|
||
| use mz_compute_client::protocol::command::ComputeCommand; | ||
| use mz_compute_client::protocol::response::ComputeResponse; | ||
| use mz_service::client::GenericClient; | ||
| use mz_service::transport::{Client, NoopMetrics}; | ||
| use uuid::Uuid; | ||
|
|
||
| pub type ComputeCtpClient = Client<ComputeCommand, ComputeResponse>; | ||
|
|
||
| /// Connects to a clusterd compute controller address and sends `Hello`, leaving | ||
| /// the controller handshake (`CreateInstance` onward) to the caller. | ||
| /// | ||
| /// A reconnect re-runs exactly this: a fresh transport connection plus `Hello`. | ||
| /// The reconciliation window then opens when the script sends `CreateInstance` | ||
| /// and closes when it sends `InitializationComplete`; in between, the replica | ||
| /// reconciles the replayed dataflows against its live ones rather than | ||
| /// rehydrating. | ||
| pub async fn connect_and_hello(compute_addr: &str) -> anyhow::Result<ComputeCtpClient> { | ||
| // Use persist-client's BUILD_INFO: it is release-versioned (synced by | ||
| // bin/bump-version), so it matches the clusterd we connect to. Our own | ||
| // crate is `0.0.0`, which would fail the handshake's version check. | ||
| let version = mz_persist_client::BUILD_INFO.semver_version(); | ||
| let mut client = Client::<ComputeCommand, ComputeResponse>::connect( | ||
| compute_addr, | ||
| version, | ||
| Duration::from_secs(30), | ||
| Duration::from_secs(60), | ||
| NoopMetrics, | ||
| ) | ||
| .await?; | ||
|
|
||
| client | ||
| .send(ComputeCommand::Hello { | ||
| nonce: Uuid::new_v4(), | ||
| }) | ||
| .await?; | ||
|
|
||
| Ok(client) | ||
| } | ||
|
|
||
| #[cfg(test)] | ||
| mod tests { | ||
| use super::*; | ||
| use crate::target; | ||
|
|
||
| #[mz_ore::test(tokio::test)] | ||
| #[cfg_attr(miri, ignore)] | ||
| async fn hello_holds_connection() { | ||
| if !target::e2e_enabled() { | ||
| return; | ||
| } | ||
| let mut client = connect_and_hello(&target::compute_addr()) | ||
| .await | ||
| .expect("hello"); | ||
| let r = tokio::time::timeout(Duration::from_millis(500), client.recv()).await; | ||
| assert!(r.is_err() || r.unwrap().is_ok()); | ||
| } | ||
| } | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is only called from the driver. Might not need to be its own file
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's shared by two modules, not just the driver:
driver.rsusesconnect_and_hello, andresponses.rsuses theComputeCtpClienttype for the response pump. Folding it intodriver.rswould make the response pump depend on the driver just for the transport type, so I'd keep the CTP primitives in their own small module. Happy to revisit if you feel strongly.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missed it in my grep. My bad!