diff --git a/ci/nightly/pipeline.template.yml b/ci/nightly/pipeline.template.yml index 693d4baa0d857..231e9af1a9c8d 100644 --- a/ci/nightly/pipeline.template.yml +++ b/ci/nightly/pipeline.template.yml @@ -1970,6 +1970,18 @@ steps: composition: parallel-workload args: [--runtime=1200, --scenario=rename, --threads=8] + - id: parallel-workload-repeat-row + label: "Parallel Workload (repeat_row)" + depends_on: build-x86_64 + artifact_paths: [parallel-workload-queries.log.zst] + timeout_in_minutes: 90 + agents: + queue: hetzner-x86-64-12cpu-24gb + plugins: + - ./ci/plugins/mzcompose: + composition: parallel-workload + args: [--runtime=1200, --scenario=repeat-row, --threads=8] + - id: parallel-workload-cancel label: "Parallel Workload (cancel)" depends_on: build-x86_64 diff --git a/misc/python/materialize/parallel_workload/action.py b/misc/python/materialize/parallel_workload/action.py index f80884e9a1845..e25bde863d17b 100644 --- a/misc/python/materialize/parallel_workload/action.py +++ b/misc/python/materialize/parallel_workload/action.py @@ -82,6 +82,9 @@ ) from materialize.parallel_workload.executor import Executor, Http from materialize.parallel_workload.expression import ExprKind, expression +from materialize.parallel_workload.negative_accumulation_errors import ( + NEGATIVE_ACCUMULATION_ERRORS, +) from materialize.parallel_workload.settings import ( ADDITIONAL_SYSTEM_PARAMETER_DEFAULTS, Complexity, @@ -227,6 +230,13 @@ def errors_to_ignore(self, exe: Executor) -> list[str]: ) if exe.db.scenario == Scenario.Rename: result.extend(["unknown schema", "ambiguous reference to schema name"]) + if exe.db.scenario == Scenario.RepeatRow: + # Views that use `repeat_row` with a negative count can surface + # negative-accumulation errors both at DDL time (constant folding) + # and at read time (various operators checking multiplicities). + # The central list lives in `negative_accumulation_errors.py` so + # we can update it in one place when the error messages change. + result.extend(NEGATIVE_ACCUMULATION_ERRORS) if materialize.parallel_workload.column.NAUGHTY_IDENTIFIERS: result.extend(["identifier length exceeds 255 bytes"]) return result @@ -1887,6 +1897,7 @@ def run(self, exe: Executor) -> bool: base_object, base_object2, schema, + scenario=exe.db.scenario, temp=True, ) view.create(exe) @@ -1908,6 +1919,7 @@ def run(self, exe: Executor) -> bool: base_object, base_object2, schema, + scenario=exe.db.scenario, ) # Randomly make materialized views replica-targeted if ( diff --git a/misc/python/materialize/parallel_workload/database.py b/misc/python/materialize/parallel_workload/database.py index 8a5930e445c62..778505a014a7b 100644 --- a/misc/python/materialize/parallel_workload/database.py +++ b/misc/python/materialize/parallel_workload/database.py @@ -26,6 +26,7 @@ Bytea, DataType, Jsonb, + Long, Text, TextTextMap, ) @@ -43,6 +44,7 @@ from materialize.mzcompose.helpers.iceberg import setup_polaris_for_iceberg from materialize.mzcompose.services.mysql import MySql from materialize.mzcompose.services.sql_server import SqlServer +from materialize.parallel_workload.settings import Scenario from materialize.parallel_workload.column import ( Column, KafkaColumn, @@ -236,11 +238,28 @@ def __init__( base_object: DBObject, base_object2: DBObject | None, schema: Schema, + scenario: Scenario = Scenario.Regression, temp: bool = False, ): super().__init__() self.rename = 0 self.view_id = view_id + # In the `RepeatRow` scenario, ~5% of views wrap their body with a + # `repeat_row(-1)` cross join (constant-folded into a `Constant` MIR + # node with negative diffs), and another ~5% are entirely replaced by + # a hardcoded body that drives `repeat_row` from a column of the + # `repeat_row_source` table (so the count is non-constant and can be + # negative at runtime). The two modes are mutually exclusive. The + # matching ignore list for negative-accumulation errors is wired + # through `Action.errors_to_ignore`. + self.repeat_row_const = ( + scenario == Scenario.RepeatRow and rng.random() < 0.05 + ) + self.repeat_row_table = ( + scenario == Scenario.RepeatRow + and not self.repeat_row_const + and rng.random() < 0.05 + ) self.base_object = base_object self.base_object2 = base_object2 self.schema = schema @@ -289,6 +308,16 @@ def __init__( for data_type in self.data_types ] + if self.repeat_row_table: + # Replace the randomly generated shape with a hardcoded single + # `bigint` column matching the body emitted by `get_select`. The + # body reads `diff` from `repeat_row_source` and feeds it as the + # `repeat_row` count, so the view's accumulation depends on the + # current contents of that table. + self.data_types = [Long] + self.columns = [Column(rng, 0, Long, self)] + self.union = False + def name(self) -> str: if self.rename: return naughtify(f"v-{self.view_id}-{self.rename}") @@ -298,10 +327,25 @@ def __str__(self) -> str: return f"{self.schema}.{identifier(self.name())}" def get_select(self) -> str: + if self.repeat_row_table: + # Hardcoded body that drives `repeat_row` from a column whose + # value is determined at runtime, so the count can be negative + # depending on the current contents of `repeat_row_source`. + col = self.columns[0].name(True) + return ( + f"SELECT r.diff::bigint AS {col} " + "FROM repeat_row_source r, repeat_row(r.diff)" + ) + def select_str(exprs: str) -> str: select = f"SELECT {exprs} FROM {self.base_object}" if self.base_object2: select += f" JOIN {self.base_object2} ON {self.on_expr}" + if self.repeat_row_const: + # `repeat_row(-1)` retracts each input row exactly once, + # producing a collection with a net-negative accumulation. + # Hardcoded to `-1` to avoid blowing up the data size. + select = f"SELECT * FROM ({select}) AS rr_inner, repeat_row(-1)" return select expressions_str = ", ".join( @@ -1013,7 +1057,14 @@ def __init__( base_object2: Table | None = rng.choice(self.tables) if rng.choice([True, False]) or base_object2 == base_object: base_object2 = None - view = View(rng, i, base_object, base_object2, rng.choice(self.schemas)) + view = View( + rng, + i, + base_object, + base_object2, + rng.choice(self.schemas), + scenario=scenario, + ) self.views.append(view) self.view_id = len(self.views) self.roles = [Role(i) for i in range(rng.randint(0, MAX_INITIAL_ROLES))] @@ -1160,6 +1211,17 @@ def create(self, exe: Executor, composition: Composition) -> None: for relation in self: relation.create(exe) + if self.scenario == Scenario.RepeatRow: + # Hardcoded helper table for the table-driven `repeat_row(diff)` + # mode in `View`. The mix of small positives, zeros, and `-1`s + # ensures the function sees both insertion and retraction counts + # at runtime, exercising negative-multiplicity code paths. + exe.execute("DROP TABLE IF EXISTS repeat_row_source CASCADE") + exe.execute("CREATE TABLE repeat_row_source (diff bigint NOT NULL)") + exe.execute( + "INSERT INTO repeat_row_source VALUES (1), (1), (-1), (-1), (0)" + ) + # Questionable use # result = composition.run( # "sqlsmith", diff --git a/misc/python/materialize/parallel_workload/negative_accumulation_errors.py b/misc/python/materialize/parallel_workload/negative_accumulation_errors.py new file mode 100644 index 0000000000000..d9749507d468c --- /dev/null +++ b/misc/python/materialize/parallel_workload/negative_accumulation_errors.py @@ -0,0 +1,45 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +"""Error-message substrings produced when Materialize detects a negative +accumulation (or similar "non-positive multiplicity" condition). + +Parallel Workload intentionally generates queries (via `repeat_row`) that can +trigger these errors; this module is the single place to update when the +messages change. Source of truth: database-issues#9308 (section "For reference, +here is a list of places in the code / error msgs where we detect negative +accumulations or similar issues"). +""" + +# Keep this list in sync with database-issues#9308. Each entry is a substring +# that may appear in an error surfaced to the client. When the underlying +# error messages are reworded, update this list in a single commit so the +# rest of Parallel Workload picks up the new strings automatically. +NEGATIVE_ACCUMULATION_ERRORS: list[str] = [ + # Many places + "Non-monotonic input", + # TopK + "Negative multiplicities in TopK", + # Reduce + "Net-zero records with non-zero accumulation in ReduceAccumulable", + "Non-positive multiplicity in DistinctBy", + "Non-positive accumulation", + "Invalid negative unsigned aggregation in ReduceAccumulable", + "saw negative accumulation", + # Peek handling + "Invalid data in source, saw retractions", + "index peek encountered negative multiplicities in ok trace", + "index peek encountered negative multiplicities in error trace", + # S3 oneshot sink + "S3 oneshot sink encountered negative multiplicities", + # Constant folding + "Negative multiplicity in constant result", + # Scalar subquery guard + "negative number of rows produced in subquery", +] diff --git a/misc/python/materialize/parallel_workload/parallel_workload.py b/misc/python/materialize/parallel_workload/parallel_workload.py index 081a69208b962..cf860d978f188 100644 --- a/misc/python/materialize/parallel_workload/parallel_workload.py +++ b/misc/python/materialize/parallel_workload/parallel_workload.py @@ -299,7 +299,7 @@ def run( ) thread.start() threads.append(thread) - elif scenario in (Scenario.Regression, Scenario.Rename): + elif scenario in (Scenario.Regression, Scenario.Rename, Scenario.RepeatRow): pass else: raise ValueError(f"Unknown scenario {scenario}") diff --git a/misc/python/materialize/parallel_workload/settings.py b/misc/python/materialize/parallel_workload/settings.py index 1bdc631246a47..39894cd0dc536 100644 --- a/misc/python/materialize/parallel_workload/settings.py +++ b/misc/python/materialize/parallel_workload/settings.py @@ -30,6 +30,7 @@ class Scenario(Enum): Rename = "rename" BackupRestore = "backup-restore" ZeroDowntimeDeploy = "0dt-deploy" + RepeatRow = "repeat-row" @classmethod def _missing_(cls, value): @@ -44,4 +45,8 @@ def _missing_(cls, value): "persist_stats_filter_enabled": "false", # See https://materializeinc.slack.com/archives/CTESPM7FU/p1758195280629909, should reenable when it performs better "enable_compute_logical_backpressure": "false", + # Allows the `Scenario.RepeatRow` scenario to call `repeat_row`. Having + # it on outside that scenario is harmless: no Parallel Workload codegen + # emits `repeat_row` unless the scenario is active. + "enable_repeat_row": "true", }