Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ members = [
"src/cluster",
"src/cluster-client",
"src/clusterd",
"src/clusterd-test-driver",
"src/compute",
"src/compute-client",
"src/compute-types",
Expand Down Expand Up @@ -155,6 +156,7 @@ default-members = [
"src/cluster",
"src/cluster-client",
"src/clusterd",
"src/clusterd-test-driver",
"src/compute",
"src/compute-client",
"src/compute-types",
Expand Down
16 changes: 16 additions & 0 deletions ci/test/pipeline.template.yml
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,22 @@ steps:
agents:
queue: hetzner-aarch64-4cpu-8gb

- id: clusterd-test-driver
label: Clusterd test driver
# Depend on both builds: the `clusterd-test-driver` image changes every commit,
# so its arch-specific tag is pushed late in its build. `mkpipeline` may switch
# this aarch64 job to an x86_64 agent (Hetzner aarch64 availability), so waiting
# for only one build races the other arch's freshly-built image (it would fail
# to pull `clusterd-test-driver:mzbuild-…: not found`). Waiting for both is safe.
depends_on: [build-aarch64, build-x86_64]
timeout_in_minutes: 20
inputs: [test/clusterd-test-driver]
plugins:
- ./ci/plugins/mzcompose:
composition: clusterd-test-driver
agents:
queue: hetzner-aarch64-4cpu-8gb

- id: dbt-materialize
label: dbt-materialize
depends_on: build-aarch64
Expand Down
319 changes: 319 additions & 0 deletions doc/developer/design/20260612_headless_clusterd_test_driver.md

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions misc/python/materialize/mzcompose/composition.py
Original file line number Diff line number Diff line change
Expand Up @@ -913,6 +913,7 @@ def run(
entrypoint: str | None = None,
check: bool = True,
silent: bool = False,
use_aliases: bool = False,
) -> subprocess.CompletedProcess:
"""Run a one-off command in a service.

Expand All @@ -929,6 +930,9 @@ def run(
stdin: read STDIN from a string.
env_extra: Additional environment variables to set in the container.
rm: Remove container after run.
use_aliases: Connect the container to the network(s) using the
service's network aliases, so other services can reach it by
its service name (`docker compose run` omits aliases by default).
capture: Capture the stdout of the `docker compose` invocation.
capture_stderr: Capture the stderr of the `docker compose` invocation.
capture_and_print: Print during execution and capture the
Expand All @@ -940,6 +944,7 @@ def run(
*(f"-e{k}" for k in env_extra.keys()),
*(["--detach"] if detach else []),
*(["--rm"] if rm else []),
*(["--use-aliases"] if use_aliases else []),
service,
*args,
capture=capture,
Expand Down
14 changes: 12 additions & 2 deletions misc/python/materialize/mzcompose/services/clusterd.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@
ServiceConfig,
)

# Arrangement merge effort (`arrangement_exert_proportionality`) for the compute
# and storage timely clusters. Kept as named constants so other launchers (e.g.
# the clusterd-test-driver local runner) reuse the same defaults.
DEFAULT_COMPUTE_EXERT_PROPORTIONALITY = 16
DEFAULT_STORAGE_EXERT_PROPORTIONALITY = 1337


class Clusterd(Service):
def __init__(
Expand Down Expand Up @@ -59,8 +65,12 @@ def __init__(

process_names = process_names if process_names else [name]
process_index = process_names.index(name)
compute_timely_config = timely_config(process_names, 2102, workers, 16)
storage_timely_config = timely_config(process_names, 2103, workers, 1337)
compute_timely_config = timely_config(
process_names, 2102, workers, DEFAULT_COMPUTE_EXERT_PROPORTIONALITY
)
storage_timely_config = timely_config(
process_names, 2103, workers, DEFAULT_STORAGE_EXERT_PROPORTIONALITY
)

environment += [
f"CLUSTERD_PROCESS={process_index}",
Expand Down
63 changes: 63 additions & 0 deletions src/clusterd-test-driver/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
[package]
name = "mz-clusterd-test-driver"
description = "Headless frontend to clusterd for scripted compute tests."
version = "0.0.0"
edition.workspace = true
rust-version.workspace = true
publish = false

[lints]
workspace = true

[dependencies]
anyhow.workspace = true
futures.workspace = true
mz-compute-client = { path = "../compute-client" }
mz-compute-types = { path = "../compute-types" }
mz-dyncfg = { path = "../dyncfg" }
mz-expr = { path = "../expr" }
# The repo's MzReflect-free MIR-from-text parser (the `.spec` test syntax), so
# `define` authors MIR in the readable pretty form rather than a hand-rolled
# JSON vocabulary.
mz-expr-parser = { path = "../expr-parser" }
# Used by the binary to configure tracing. It also keeps orchestrator-tracing's
# `tokio-console` feature enabled, which must stay consistent with mz-ore's
# `tokio-console` (pulled transitively via mz-compute-client) when this bin is
# built in the same cargo invocation as orchestratord — otherwise
# orchestrator-tracing fails to compile (E0063 on `TracingConfig::tokio_console`).
mz-orchestrator-tracing = { path = "../orchestrator-tracing" }
# default-features = false to avoid pulling mz-ore's default `tokio-console`
# feature: this bin can be grouped into a single cargo build with other images'
# bins (e.g. orchestratord), and enabling tokio-console there breaks
# orchestrator-tracing, which is built with its own tokio-console feature off.
mz-ore = { path = "../ore", default-features = false, features = [
"async",
"tracing",
"metrics",
] }
mz-persist-client = { path = "../persist-client" }
mz-persist-types = { path = "../persist-types" }
mz-repr = { path = "../repr" }
mz-service = { path = "../service" }
mz-storage-types = { path = "../storage-types" }
# Optional MIR dataflow optimizer, run only by `create-dataflow ... optimize`.
# Needed for shapes that don't lower from raw MIR (notably joins, whose
# `implementation` defaults to `Unimplemented`); the default path lowers faithfully
# without it.
mz-transform = { path = "../transform" }
serde = { workspace = true, features = ["derive"] }
timely.workspace = true
tokio = { workspace = true, features = ["full"] }
tokio-stream.workspace = true
tracing.workspace = true
uuid.workspace = true

[dev-dependencies]
# The `test` feature provides the `#[mz_ore::test]` macro; it is only needed
# when building tests, not the production binary, so keep it out of the bin's
# feature set (see the [dependencies] mz-ore comment).
mz-ore = { path = "../ore", features = ["test"] }
tempfile.workspace = true

[package.metadata.cargo-udeps.ignore]
normal = []
1 change: 1 addition & 0 deletions src/clusterd-test-driver/ci/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
/headless-driver
16 changes: 16 additions & 0 deletions src/clusterd-test-driver/ci/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

MZFROM prod-base

COPY headless-driver /usr/local/bin/

USER materialize

ENTRYPOINT ["tini", "--", "headless-driver"]
14 changes: 14 additions & 0 deletions src/clusterd-test-driver/ci/mzbuild.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

name: clusterd-test-driver
description: Headless frontend to clusterd for scripted compute tests.
pre-image:
- type: cargo-build
bin: headless-driver
82 changes: 82 additions & 0 deletions src/clusterd-test-driver/src/bin/headless-driver.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Headless driver entry point for `mzcompose`. Connects to a running `clusterd`,
//! hosts persist PubSub, and runs a JSON command script against it (see
//! [`mz_clusterd_test_driver::script`]), exiting non-zero on assertion failure.
//!
//! The script source is the file named by `DRIVER_SCRIPT`, or stdin when that is
//! unset. Connection and persist configuration come from the environment:
//! `CLUSTERD_COMPUTE_ADDR`, `PERSIST_BLOB_URL`, `PERSIST_CONSENSUS_URL`, and
//! `DRIVER_PUBSUB_BIND`.

use std::net::SocketAddr;

use anyhow::Context;
use mz_clusterd_test_driver::driver::Driver;
use mz_clusterd_test_driver::persist_host::PersistHost;
use mz_clusterd_test_driver::script;
use mz_orchestrator_tracing::{StaticTracingConfig, TracingCliArgs};
use mz_ore::metrics::MetricsRegistry;
use mz_persist_types::PersistLocation;
use tokio::io::AsyncReadExt;

/// Connect to `clusterd` and host persist PubSub, reading configuration from the
/// environment. Returns the persist location (for dataflow imports) and a
/// connected [`Driver`].
async fn setup() -> anyhow::Result<(PersistLocation, Driver)> {
let compute_addr =
std::env::var("CLUSTERD_COMPUTE_ADDR").unwrap_or_else(|_| "clusterd:2101".to_string());
let blob = std::env::var("PERSIST_BLOB_URL").expect("PERSIST_BLOB_URL");
let consensus = std::env::var("PERSIST_CONSENSUS_URL").expect("PERSIST_CONSENSUS_URL");
let pubsub_bind: SocketAddr = std::env::var("DRIVER_PUBSUB_BIND")
.unwrap_or_else(|_| "0.0.0.0:6879".to_string())
.parse()?;

let loc = PersistLocation {
blob_uri: blob.parse()?,
consensus_uri: consensus.parse()?,
};
let host = PersistHost::start_on(pubsub_bind, loc.clone()).await?;
let driver = Driver::connect(host, &compute_addr).await?;
Ok((loc, driver))
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
// Configure tracing so the driver emits structured logs like the real
// Materialize binaries.
let _tracing_handle = TracingCliArgs::default()
.configure_tracing(
StaticTracingConfig {
service_name: "headless-driver",
build_info: mz_persist_client::BUILD_INFO,
},
MetricsRegistry::new(),
)
.await?;

let (loc, driver) = setup().await?;

// Read the script from `DRIVER_SCRIPT` if set, else stdin. The path is passed
// through so a `REWRITE` run can rewrite the file in place.
match std::env::var("DRIVER_SCRIPT") {
Ok(path) => {
let content = tokio::fs::read_to_string(&path)
.await
.with_context(|| format!("reading DRIVER_SCRIPT {path}"))?;
script::run(driver, loc, &content, Some(std::path::Path::new(&path))).await
}
Err(_) => {
let mut content = String::new();
tokio::io::stdin().read_to_string(&mut content).await?;
script::run(driver, loc, &content, None).await
}
}
}
77 changes: 77 additions & 0 deletions src/clusterd-test-driver/src/ctp.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Compute CTP connection and the `Hello` step of the controller handshake.
//! Generic: it sends any `ComputeCommand` and receives any `ComputeResponse`.
//!
//! Only `Hello` (the transport/version step) happens here; the controller
//! handshake proper — `CreateInstance`, `UpdateConfiguration`,
//! `InitializationComplete` — is driven by explicit caller commands, so the
//! caller controls the instance config, the peek-stash setting, and exactly when
//! the reconciliation window opens and closes.

use std::time::Duration;

use mz_compute_client::protocol::command::ComputeCommand;
use mz_compute_client::protocol::response::ComputeResponse;
use mz_service::client::GenericClient;
use mz_service::transport::{Client, NoopMetrics};
use uuid::Uuid;

pub type ComputeCtpClient = Client<ComputeCommand, ComputeResponse>;

/// Connects to a clusterd compute controller address and sends `Hello`, leaving
/// the controller handshake (`CreateInstance` onward) to the caller.
///
/// A reconnect re-runs exactly this: a fresh transport connection plus `Hello`.
/// The reconciliation window then opens when the script sends `CreateInstance`
/// and closes when it sends `InitializationComplete`; in between, the replica
/// reconciles the replayed dataflows against its live ones rather than
/// rehydrating.
pub async fn connect_and_hello(compute_addr: &str) -> anyhow::Result<ComputeCtpClient> {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is only called from the driver. Might not need to be its own file

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's shared by two modules, not just the driver: driver.rs uses connect_and_hello, and responses.rs uses the ComputeCtpClient type for the response pump. Folding it into driver.rs would make the response pump depend on the driver just for the transport type, so I'd keep the CTP primitives in their own small module. Happy to revisit if you feel strongly.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missed it in my grep. My bad!

// Use persist-client's BUILD_INFO: it is release-versioned (synced by
// bin/bump-version), so it matches the clusterd we connect to. Our own
// crate is `0.0.0`, which would fail the handshake's version check.
let version = mz_persist_client::BUILD_INFO.semver_version();
let mut client = Client::<ComputeCommand, ComputeResponse>::connect(
compute_addr,
version,
Duration::from_secs(30),
Duration::from_secs(60),
NoopMetrics,
)
.await?;

client
.send(ComputeCommand::Hello {
nonce: Uuid::new_v4(),
})
.await?;

Ok(client)
}

#[cfg(test)]
mod tests {
use super::*;
use crate::target;

#[mz_ore::test(tokio::test)]
#[cfg_attr(miri, ignore)]
async fn hello_holds_connection() {
if !target::e2e_enabled() {
return;
}
let mut client = connect_and_hello(&target::compute_addr())
.await
.expect("hello");
let r = tokio::time::timeout(Duration::from_millis(500), client.recv()).await;
assert!(r.is_err() || r.unwrap().is_ok());
}
}
Loading
Loading