Skip to content

Commit 45780a2

Browse files
antiguruclaude
andcommitted
clusterd-test-driver: text script runner and scenarios
Drives the headless harness from text command scripts instead of recompiled Rust scenarios. A script is a sequence of commands, each with a `----` golden block that is the assertion; `create-dataflow` carries arbitrary MIR (parsed by `mz-expr-parser`, the `.spec` syntax) over the full `DataflowBuilder` surface, including index / materialized-view / subscribe exports and an opt-in `optimize`. Adds the `text` parser, the `script` interpreter, the `headless-driver` binary, the mzbuild image and CI pipeline entry, the mzcompose composition and a host-local runner (`run-local.py`), and the scenario scripts (index, deep-history, side-effects, reconciliation, error-behavior, reduce, materialized-view, subscribe, join, index-and-mv, custom-schema). `Composition.run` gains `use_aliases` so the one-shot driver container can host the PubSub that clusterd dials. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
1 parent 0a8dd9e commit 45780a2

26 files changed

Lines changed: 3433 additions & 9 deletions

Cargo.lock

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

ci/test/pipeline.template.yml

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -549,6 +549,22 @@ steps:
549549
agents:
550550
queue: hetzner-aarch64-4cpu-8gb
551551

552+
- id: clusterd-test-driver
553+
label: Clusterd test driver
554+
# Depend on both builds: the `clusterd-test-driver` image changes every commit,
555+
# so its arch-specific tag is pushed late in its build. `mkpipeline` may switch
556+
# this aarch64 job to an x86_64 agent (Hetzner aarch64 availability), so waiting
557+
# for only one build races the other arch's freshly-built image (it would fail
558+
# to pull `clusterd-test-driver:mzbuild-…: not found`). Waiting for both is safe.
559+
depends_on: [build-aarch64, build-x86_64]
560+
timeout_in_minutes: 20
561+
inputs: [test/clusterd-test-driver]
562+
plugins:
563+
- ./ci/plugins/mzcompose:
564+
composition: clusterd-test-driver
565+
agents:
566+
queue: hetzner-aarch64-4cpu-8gb
567+
552568
- id: dbt-materialize
553569
label: dbt-materialize
554570
depends_on: build-aarch64

misc/python/materialize/mzcompose/composition.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -913,6 +913,7 @@ def run(
913913
entrypoint: str | None = None,
914914
check: bool = True,
915915
silent: bool = False,
916+
use_aliases: bool = False,
916917
) -> subprocess.CompletedProcess:
917918
"""Run a one-off command in a service.
918919
@@ -929,6 +930,9 @@ def run(
929930
stdin: read STDIN from a string.
930931
env_extra: Additional environment variables to set in the container.
931932
rm: Remove container after run.
933+
use_aliases: Connect the container to the network(s) using the
934+
service's network aliases, so other services can reach it by
935+
its service name (`docker compose run` omits aliases by default).
932936
capture: Capture the stdout of the `docker compose` invocation.
933937
capture_stderr: Capture the stderr of the `docker compose` invocation.
934938
capture_and_print: Print during execution and capture the
@@ -940,6 +944,7 @@ def run(
940944
*(f"-e{k}" for k in env_extra.keys()),
941945
*(["--detach"] if detach else []),
942946
*(["--rm"] if rm else []),
947+
*(["--use-aliases"] if use_aliases else []),
943948
service,
944949
*args,
945950
capture=capture,

misc/python/materialize/mzcompose/services/clusterd.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,12 @@
1515
ServiceConfig,
1616
)
1717

18+
# Arrangement merge effort (`arrangement_exert_proportionality`) for the compute
19+
# and storage timely clusters. Kept as named constants so other launchers (e.g.
20+
# the clusterd-test-driver local runner) reuse the same defaults.
21+
DEFAULT_COMPUTE_EXERT_PROPORTIONALITY = 16
22+
DEFAULT_STORAGE_EXERT_PROPORTIONALITY = 1337
23+
1824

1925
class Clusterd(Service):
2026
def __init__(
@@ -59,8 +65,12 @@ def __init__(
5965

6066
process_names = process_names if process_names else [name]
6167
process_index = process_names.index(name)
62-
compute_timely_config = timely_config(process_names, 2102, workers, 16)
63-
storage_timely_config = timely_config(process_names, 2103, workers, 1337)
68+
compute_timely_config = timely_config(
69+
process_names, 2102, workers, DEFAULT_COMPUTE_EXERT_PROPORTIONALITY
70+
)
71+
storage_timely_config = timely_config(
72+
process_names, 2103, workers, DEFAULT_STORAGE_EXERT_PROPORTIONALITY
73+
)
6474

6575
environment += [
6676
f"CLUSTERD_PROCESS={process_index}",

src/clusterd-test-driver/Cargo.toml

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,20 @@ mz-compute-client = { path = "../compute-client" }
1616
mz-compute-types = { path = "../compute-types" }
1717
mz-dyncfg = { path = "../dyncfg" }
1818
mz-expr = { path = "../expr" }
19+
# The repo's MzReflect-free MIR-from-text parser (the `.spec` test syntax), so
20+
# `define` authors MIR in the readable pretty form rather than a hand-rolled
21+
# JSON vocabulary.
22+
mz-expr-parser = { path = "../expr-parser" }
23+
# Used by the binary to configure tracing. It also keeps orchestrator-tracing's
24+
# `tokio-console` feature enabled, which must stay consistent with mz-ore's
25+
# `tokio-console` (pulled transitively via mz-compute-client) when this bin is
26+
# built in the same cargo invocation as orchestratord — otherwise
27+
# orchestrator-tracing fails to compile (E0063 on `TracingConfig::tokio_console`).
28+
mz-orchestrator-tracing = { path = "../orchestrator-tracing" }
1929
# default-features = false to avoid pulling mz-ore's default `tokio-console`
20-
# feature (pulled transitively via mz-compute-client); the harness only needs the
21-
# async, tracing, and metrics helpers.
30+
# feature: this bin can be grouped into a single cargo build with other images'
31+
# bins (e.g. orchestratord), and enabling tokio-console there breaks
32+
# orchestrator-tracing, which is built with its own tokio-console feature off.
2233
mz-ore = { path = "../ore", default-features = false, features = [
2334
"async",
2435
"tracing",
@@ -29,10 +40,12 @@ mz-persist-types = { path = "../persist-types" }
2940
mz-repr = { path = "../repr" }
3041
mz-service = { path = "../service" }
3142
mz-storage-types = { path = "../storage-types" }
32-
# Optional MIR dataflow optimizer, run only by `DataflowBuilder::optimize`. Needed
33-
# for shapes that don't lower from raw MIR (notably joins, whose `implementation`
34-
# defaults to `Unimplemented`); the default path lowers faithfully without it.
43+
# Optional MIR dataflow optimizer, run only by `create-dataflow ... optimize`.
44+
# Needed for shapes that don't lower from raw MIR (notably joins, whose
45+
# `implementation` defaults to `Unimplemented`); the default path lowers faithfully
46+
# without it.
3547
mz-transform = { path = "../transform" }
48+
serde = { workspace = true, features = ["derive"] }
3649
timely.workspace = true
3750
tokio = { workspace = true, features = ["full"] }
3851
tokio-stream.workspace = true
@@ -41,8 +54,8 @@ uuid.workspace = true
4154

4255
[dev-dependencies]
4356
# The `test` feature provides the `#[mz_ore::test]` macro; it is only needed
44-
# when building tests, so keep it out of the non-test feature set (see the
45-
# [dependencies] mz-ore comment).
57+
# when building tests, not the production binary, so keep it out of the bin's
58+
# feature set (see the [dependencies] mz-ore comment).
4659
mz-ore = { path = "../ore", features = ["test"] }
4760
tempfile.workspace = true
4861

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
/headless-driver
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
# Copyright Materialize, Inc. and contributors. All rights reserved.
2+
#
3+
# Use of this software is governed by the Business Source License
4+
# included in the LICENSE file at the root of this repository.
5+
#
6+
# As of the Change Date specified in that file, in accordance with
7+
# the Business Source License, use of this software will be governed
8+
# by the Apache License, Version 2.0.
9+
10+
MZFROM prod-base
11+
12+
COPY headless-driver /usr/local/bin/
13+
14+
USER materialize
15+
16+
ENTRYPOINT ["tini", "--", "headless-driver"]
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
# Copyright Materialize, Inc. and contributors. All rights reserved.
2+
#
3+
# Use of this software is governed by the Business Source License
4+
# included in the LICENSE file at the root of this repository.
5+
#
6+
# As of the Change Date specified in that file, in accordance with
7+
# the Business Source License, use of this software will be governed
8+
# by the Apache License, Version 2.0.
9+
10+
name: clusterd-test-driver
11+
description: Headless frontend to clusterd for scripted compute tests.
12+
pre-image:
13+
- type: cargo-build
14+
bin: headless-driver
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
// Copyright Materialize, Inc. and contributors. All rights reserved.
2+
//
3+
// Use of this software is governed by the Business Source License
4+
// included in the LICENSE file.
5+
//
6+
// As of the Change Date specified in that file, in accordance with
7+
// the Business Source License, use of this software will be governed
8+
// by the Apache License, Version 2.0.
9+
10+
//! Headless driver entry point for `mzcompose`. Connects to a running `clusterd`,
11+
//! hosts persist PubSub, and runs a JSON command script against it (see
12+
//! [`mz_clusterd_test_driver::script`]), exiting non-zero on assertion failure.
13+
//!
14+
//! The script source is the file named by `DRIVER_SCRIPT`, or stdin when that is
15+
//! unset. Connection and persist configuration come from the environment:
16+
//! `CLUSTERD_COMPUTE_ADDR`, `PERSIST_BLOB_URL`, `PERSIST_CONSENSUS_URL`, and
17+
//! `DRIVER_PUBSUB_BIND`.
18+
19+
use std::net::SocketAddr;
20+
21+
use anyhow::Context;
22+
use mz_clusterd_test_driver::driver::Driver;
23+
use mz_clusterd_test_driver::persist_host::PersistHost;
24+
use mz_clusterd_test_driver::script;
25+
use mz_orchestrator_tracing::{StaticTracingConfig, TracingCliArgs};
26+
use mz_ore::metrics::MetricsRegistry;
27+
use mz_persist_types::PersistLocation;
28+
use tokio::io::AsyncReadExt;
29+
30+
/// Connect to `clusterd` and host persist PubSub, reading configuration from the
31+
/// environment. Returns the persist location (for dataflow imports) and a
32+
/// connected [`Driver`].
33+
async fn setup() -> anyhow::Result<(PersistLocation, Driver)> {
34+
let compute_addr =
35+
std::env::var("CLUSTERD_COMPUTE_ADDR").unwrap_or_else(|_| "clusterd:2101".to_string());
36+
let blob = std::env::var("PERSIST_BLOB_URL").expect("PERSIST_BLOB_URL");
37+
let consensus = std::env::var("PERSIST_CONSENSUS_URL").expect("PERSIST_CONSENSUS_URL");
38+
let pubsub_bind: SocketAddr = std::env::var("DRIVER_PUBSUB_BIND")
39+
.unwrap_or_else(|_| "0.0.0.0:6879".to_string())
40+
.parse()?;
41+
42+
let loc = PersistLocation {
43+
blob_uri: blob.parse()?,
44+
consensus_uri: consensus.parse()?,
45+
};
46+
let host = PersistHost::start_on(pubsub_bind, loc.clone()).await?;
47+
let driver = Driver::connect(host, &compute_addr).await?;
48+
Ok((loc, driver))
49+
}
50+
51+
#[tokio::main]
52+
async fn main() -> anyhow::Result<()> {
53+
// Configure tracing so the driver emits structured logs like the real
54+
// Materialize binaries.
55+
let _tracing_handle = TracingCliArgs::default()
56+
.configure_tracing(
57+
StaticTracingConfig {
58+
service_name: "headless-driver",
59+
build_info: mz_persist_client::BUILD_INFO,
60+
},
61+
MetricsRegistry::new(),
62+
)
63+
.await?;
64+
65+
let (loc, driver) = setup().await?;
66+
67+
// Read the script from `DRIVER_SCRIPT` if set, else stdin. The path is passed
68+
// through so a `REWRITE` run can rewrite the file in place.
69+
match std::env::var("DRIVER_SCRIPT") {
70+
Ok(path) => {
71+
let content = tokio::fs::read_to_string(&path)
72+
.await
73+
.with_context(|| format!("reading DRIVER_SCRIPT {path}"))?;
74+
script::run(driver, loc, &content, Some(std::path::Path::new(&path))).await
75+
}
76+
Err(_) => {
77+
let mut content = String::new();
78+
tokio::io::stdin().read_to_string(&mut content).await?;
79+
script::run(driver, loc, &content, None).await
80+
}
81+
}
82+
}

src/clusterd-test-driver/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,4 +17,6 @@ pub mod dataflow;
1717
pub mod driver;
1818
pub mod persist_host;
1919
pub mod responses;
20+
pub mod script;
2021
pub mod target;
22+
pub mod text;

0 commit comments

Comments
 (0)