1818//! (which covers the simpler two-input, one-metric-per-input case) with
1919//! the harder scenarios:
2020//!
21- //! - **Three inputs**, each carrying **three metrics** (`aaa.alpha`, `bbb.beta`,
22- //! `ccc.gamma`). Across inputs, the metrics overlap and the per-metric
23- //! timeseries IDs collide (each row's `timeseries_id` is derived from the
24- //! metric name, so input-x, input-y, input-z all share the same set of IDs
25- //! per metric). Timestamps within each (metric, timeseries) overlap across
26- //! inputs but are unique — the merge must interleave rows from all three
27- //! inputs heavily, not concatenate them.
28- //! - **Multi-row-group output** via `ParquetWriterConfig::row_group_size = 50`
29- //! on the n=1 tests, so the 180-row merge output breaks into 4 row groups.
30- //! Exercises the writer's multi-RG path in both engines.
31- //! - **Multi-row-group inputs with `rg_partition_prefix_len = 1`** in the
32- //! bonus tests (`write_prefix_aligned_input`): the writer flushes one row
33- //! group per distinct `metric_name`, so each input file carries three row
34- //! groups in alignment with the sort prefix. The streaming engine reads
35- //! these through its prefix-aware fast path.
21+ //! - **Three inputs**, each carrying **three metrics** (`aaa.alpha`, `bbb.beta`, `ccc.gamma`).
22+ //! Across inputs, the metrics overlap and the per-metric timeseries IDs collide (each row's
23+ //! `timeseries_id` is derived from the metric name, so input-x, input-y, input-z all share the
24+ //! same set of IDs per metric). Timestamps within each (metric, timeseries) overlap across inputs
25+ //! but are unique — the merge must interleave rows from all three inputs heavily, not concatenate
26+ //! them.
27+ //! - **Multi-row-group output** via `ParquetWriterConfig::row_group_size = 50` on the n=1 tests, so
28+ //! the 180-row merge output breaks into 4 row groups. Exercises the writer's multi-RG path in
29+ //! both engines.
30+ //! - **Multi-row-group inputs with `rg_partition_prefix_len = 1`** in the bonus tests
31+ //! (`write_prefix_aligned_input`): the writer flushes one row group per distinct `metric_name`,
32+ //! so each input file carries three row groups in alignment with the sort prefix. The streaming
33+ //! engine reads these through its prefix-aware fast path.
3634//! - **m:n merges** in the bonus tests: a small
37- //! `ParquetMergePipelineParams::target_split_size_bytes` forces the
38- //! executor to ask the engine for `num_outputs > 1`. The bonus
39- //! assertions cover the multi-output contract — sum-equals-total,
40- //! internal monotonicity, inter-output `sorted_series` disjointness,
41- //! and union-equals-full-set on metrics/services.
35+ //! `ParquetMergePipelineParams::target_split_size_bytes` forces the executor to ask the engine
36+ //! for `num_outputs > 1`. The bonus assertions cover the multi-output contract —
37+ //! sum-equals-total, internal monotonicity, inter-output `sorted_series` disjointness, and
38+ //! union-equals-full-set on metrics/services.
4239//!
4340//! Both `ParquetMergePipelineParams::use_streaming_engine = false` (the
4441//! in-memory engine) and `= true` (the streaming engine) are exercised
@@ -689,14 +686,11 @@ async fn assert_three_input_three_metric_single_output_correct(
689686/// - The pipeline staged at least two output splits (proves splitting happened).
690687/// - The sum of per-output row counts equals the total input row count.
691688/// - Each output is internally sorted on `sorted_series`.
692- /// - Across outputs, the `sorted_series` partition is **disjoint** (no two
693- /// outputs share any `sorted_series` value — the merge engine splits at
694- /// series boundaries, never inside).
695- /// - The union of metric_names / services across outputs covers the full
696- /// input set.
697- /// - Every output declares `num_merge_ops = 1` (first merge over level-0
698- /// inputs) and has `row_keys_proto` + `metric_name` zonemap regex
699- /// populated.
689+ /// - Across outputs, the `sorted_series` partition is **disjoint** (no two outputs share any
690+ /// `sorted_series` value — the merge engine splits at series boundaries, never inside).
691+ /// - The union of metric_names / services across outputs covers the full input set.
692+ /// - Every output declares `num_merge_ops = 1` (first merge over level-0 inputs) and has
693+ /// `row_keys_proto` + `metric_name` zonemap regex populated.
700694async fn assert_three_input_three_metric_multi_output_correct (
701695 staged_metadata : & Arc < std:: sync:: Mutex < Vec < ParquetSplitMetadata > > > ,
702696 replaced_ids : & Arc < std:: sync:: Mutex < Vec < String > > > ,
@@ -749,8 +743,8 @@ async fn assert_three_input_three_metric_multi_output_correct(
749743 let series = extract_binary_column ( & batch, SORTED_SERIES_COLUMN ) ;
750744 assert ! (
751745 !series. is_empty( ) ,
752- "every output must have at least one row (empty outputs should be dropped \
753- by the engine)"
746+ "every output must have at least one row (empty outputs should be dropped by the \
747+ engine)"
754748 ) ;
755749 for i in 1 ..series. len ( ) {
756750 assert ! (
@@ -773,8 +767,7 @@ async fn assert_three_input_three_metric_multi_output_correct(
773767 let ( right_min, _, right_file) = & window[ 1 ] ;
774768 assert ! (
775769 left_max < right_min,
776- "outputs {} and {} overlap on sorted_series: \
777- left max = {:?}, right min = {:?}",
770+ "outputs {} and {} overlap on sorted_series: left max = {:?}, right min = {:?}" ,
778771 left_file,
779772 right_file,
780773 left_max,
@@ -929,8 +922,8 @@ async fn test_multi_metric_three_input_single_output_streaming_engine() {
929922 |staged, replaced, storage| async move {
930923 assert ! (
931924 PEAK_BODY_COL_PAGE_CACHE_LEN . load( Ordering :: Relaxed ) > 0 ,
932- "streaming engine did not write to PEAK_BODY_COL_PAGE_CACHE_LEN — \
933- routing may have silently fallen back to the in-memory engine",
925+ "streaming engine did not write to PEAK_BODY_COL_PAGE_CACHE_LEN — routing may \
926+ have silently fallen back to the in-memory engine",
934927 ) ;
935928 assert_three_input_three_metric_single_output_correct ( & staged, & replaced, & storage)
936929 . await ;
@@ -1052,8 +1045,8 @@ async fn test_prefix_aligned_multi_metric_three_input_multi_output_streaming_eng
10521045 |staged, replaced, storage| async move {
10531046 assert ! (
10541047 PEAK_BODY_COL_PAGE_CACHE_LEN . load( Ordering :: Relaxed ) > 0 ,
1055- "streaming engine did not write to PEAK_BODY_COL_PAGE_CACHE_LEN — \
1056- routing may have silently fallen back to the in-memory engine",
1048+ "streaming engine did not write to PEAK_BODY_COL_PAGE_CACHE_LEN — routing may \
1049+ have silently fallen back to the in-memory engine",
10571050 ) ;
10581051 assert_three_input_three_metric_multi_output_correct (
10591052 & staged,
0 commit comments