Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions ci/nightly/pipeline.template.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1970,6 +1970,18 @@ steps:
composition: parallel-workload
args: [--runtime=1200, --scenario=rename, --threads=8]

# Buildkite step for the `repeat-row` Parallel Workload scenario: 1200s
# runtime with 8 worker threads, mirroring the sibling parallel-workload-*
# steps (e.g. `rename` above, `cancel` below).
- id: parallel-workload-repeat-row
label: "Parallel Workload (repeat_row)"
depends_on: build-x86_64
# Keep the query log so failures can be replayed/triaged after the run.
artifact_paths: [parallel-workload-queries.log.zst]
timeout_in_minutes: 90
agents:
# NOTE(review): queue size (12cpu/24gb) copied from the neighboring
# parallel-workload steps — confirm it is sufficient for this scenario.
queue: hetzner-x86-64-12cpu-24gb
plugins:
- ./ci/plugins/mzcompose:
composition: parallel-workload
# `--scenario=repeat-row` must match `Scenario.RepeatRow`'s value in
# misc/python/materialize/parallel_workload/settings.py.
args: [--runtime=1200, --scenario=repeat-row, --threads=8]

- id: parallel-workload-cancel
label: "Parallel Workload (cancel)"
depends_on: build-x86_64
Expand Down
12 changes: 12 additions & 0 deletions misc/python/materialize/parallel_workload/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@
)
from materialize.parallel_workload.executor import Executor, Http
from materialize.parallel_workload.expression import ExprKind, expression
from materialize.parallel_workload.negative_accumulation_errors import (
NEGATIVE_ACCUMULATION_ERRORS,
)
from materialize.parallel_workload.settings import (
ADDITIONAL_SYSTEM_PARAMETER_DEFAULTS,
Complexity,
Expand Down Expand Up @@ -227,6 +230,13 @@ def errors_to_ignore(self, exe: Executor) -> list[str]:
)
if exe.db.scenario == Scenario.Rename:
result.extend(["unknown schema", "ambiguous reference to schema name"])
if exe.db.scenario == Scenario.RepeatRow:
# Views that use `repeat_row` with a negative count can surface
# negative-accumulation errors both at DDL time (constant folding)
# and at read time (various operators checking multiplicities).
# The central list lives in `negative_accumulation_errors.py` so
# we can update it in one place when the error messages change.
result.extend(NEGATIVE_ACCUMULATION_ERRORS)
if materialize.parallel_workload.column.NAUGHTY_IDENTIFIERS:
result.extend(["identifier length exceeds 255 bytes"])
return result
Expand Down Expand Up @@ -1887,6 +1897,7 @@ def run(self, exe: Executor) -> bool:
base_object,
base_object2,
schema,
scenario=exe.db.scenario,
temp=True,
)
view.create(exe)
Expand All @@ -1908,6 +1919,7 @@ def run(self, exe: Executor) -> bool:
base_object,
base_object2,
schema,
scenario=exe.db.scenario,
)
# Randomly make materialized views replica-targeted
if (
Expand Down
64 changes: 63 additions & 1 deletion misc/python/materialize/parallel_workload/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
Bytea,
DataType,
Jsonb,
Long,
Text,
TextTextMap,
)
Expand All @@ -43,6 +44,7 @@
from materialize.mzcompose.helpers.iceberg import setup_polaris_for_iceberg
from materialize.mzcompose.services.mysql import MySql
from materialize.mzcompose.services.sql_server import SqlServer
from materialize.parallel_workload.settings import Scenario
from materialize.parallel_workload.column import (
Column,
KafkaColumn,
Expand Down Expand Up @@ -236,11 +238,28 @@ def __init__(
base_object: DBObject,
base_object2: DBObject | None,
schema: Schema,
scenario: Scenario = Scenario.Regression,
temp: bool = False,
):
super().__init__()
self.rename = 0
self.view_id = view_id
# In the `RepeatRow` scenario, ~5% of views wrap their body with a
# `repeat_row(-1)` cross join (constant-folded into a `Constant` MIR
# node with negative diffs), and another ~5% are entirely replaced by
# a hardcoded body that drives `repeat_row` from a column of the
# `repeat_row_source` table (so the count is non-constant and can be
# negative at runtime). The two modes are mutually exclusive. The
# matching ignore list for negative-accumulation errors is wired
# through `Action.errors_to_ignore`.
self.repeat_row_const = (
scenario == Scenario.RepeatRow and rng.random() < 0.05
)
self.repeat_row_table = (
scenario == Scenario.RepeatRow
and not self.repeat_row_const
and rng.random() < 0.05
)
self.base_object = base_object
self.base_object2 = base_object2
self.schema = schema
Expand Down Expand Up @@ -289,6 +308,16 @@ def __init__(
for data_type in self.data_types
]

if self.repeat_row_table:
# Replace the randomly generated shape with a hardcoded single
# `bigint` column matching the body emitted by `get_select`. The
# body reads `diff` from `repeat_row_source` and feeds it as the
# `repeat_row` count, so the view's accumulation depends on the
# current contents of that table.
self.data_types = [Long]
self.columns = [Column(rng, 0, Long, self)]
self.union = False

def name(self) -> str:
if self.rename:
return naughtify(f"v-{self.view_id}-{self.rename}")
Expand All @@ -298,10 +327,25 @@ def __str__(self) -> str:
return f"{self.schema}.{identifier(self.name())}"

def get_select(self) -> str:
if self.repeat_row_table:
# Hardcoded body that drives `repeat_row` from a column whose
# value is determined at runtime, so the count can be negative
# depending on the current contents of `repeat_row_source`.
col = self.columns[0].name(True)
return (
f"SELECT r.diff::bigint AS {col} "
"FROM repeat_row_source r, repeat_row(r.diff)"
)

def select_str(exprs: str) -> str:
select = f"SELECT {exprs} FROM {self.base_object}"
if self.base_object2:
select += f" JOIN {self.base_object2} ON {self.on_expr}"
if self.repeat_row_const:
# `repeat_row(-1)` retracts each input row exactly once,
# producing a collection with a net-negative accumulation.
# Hardcoded to `-1` to avoid blowing up the data size.
select = f"SELECT * FROM ({select}) AS rr_inner, repeat_row(-1)"
return select

expressions_str = ", ".join(
Expand Down Expand Up @@ -1013,7 +1057,14 @@ def __init__(
base_object2: Table | None = rng.choice(self.tables)
if rng.choice([True, False]) or base_object2 == base_object:
base_object2 = None
view = View(rng, i, base_object, base_object2, rng.choice(self.schemas))
view = View(
rng,
i,
base_object,
base_object2,
rng.choice(self.schemas),
scenario=scenario,
)
self.views.append(view)
self.view_id = len(self.views)
self.roles = [Role(i) for i in range(rng.randint(0, MAX_INITIAL_ROLES))]
Expand Down Expand Up @@ -1160,6 +1211,17 @@ def create(self, exe: Executor, composition: Composition) -> None:
for relation in self:
relation.create(exe)

if self.scenario == Scenario.RepeatRow:
# Hardcoded helper table for the table-driven `repeat_row(diff)`
# mode in `View`. The mix of small positives, zeros, and `-1`s
# ensures the function sees both insertion and retraction counts
# at runtime, exercising negative-multiplicity code paths.
exe.execute("DROP TABLE IF EXISTS repeat_row_source CASCADE")
exe.execute("CREATE TABLE repeat_row_source (diff bigint NOT NULL)")
exe.execute(
"INSERT INTO repeat_row_source VALUES (1), (1), (-1), (-1), (0)"
)

# Questionable use
# result = composition.run(
# "sqlsmith",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

"""Error-message substrings produced when Materialize detects a negative
accumulation (or similar "non-positive multiplicity" condition).

Parallel Workload intentionally generates queries (via `repeat_row`) that can
trigger these errors; this module is the single place to update when the
messages change. Source of truth: database-issues#9308 (section "For reference,
here is a list of places in the code / error msgs where we detect negative
accumulations or similar issues").

Consumers (e.g. `Action.errors_to_ignore`) extend their ignore lists with
these entries, so each string only needs to be a distinctive substring of
the full error message, not the complete text.
"""

# Keep this list in sync with database-issues#9308. Each entry is a substring
# that may appear in an error surfaced to the client. When the underlying
# error messages are reworded, update this list in a single commit so the
# rest of Parallel Workload picks up the new strings automatically.
# Entries are grouped by the subsystem that emits them (see the inline
# section comments); the order of entries is otherwise not significant.
NEGATIVE_ACCUMULATION_ERRORS: list[str] = [
    # Many places
    "Non-monotonic input",
    # TopK
    "Negative multiplicities in TopK",
    # Reduce
    "Net-zero records with non-zero accumulation in ReduceAccumulable",
    "Non-positive multiplicity in DistinctBy",
    "Non-positive accumulation",
    "Invalid negative unsigned aggregation in ReduceAccumulable",
    "saw negative accumulation",
    # Peek handling
    "Invalid data in source, saw retractions",
    "index peek encountered negative multiplicities in ok trace",
    "index peek encountered negative multiplicities in error trace",
    # S3 oneshot sink
    "S3 oneshot sink encountered negative multiplicities",
    # Constant folding
    "Negative multiplicity in constant result",
    # Scalar subquery guard
    "negative number of rows produced in subquery",
]
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ def run(
)
thread.start()
threads.append(thread)
elif scenario in (Scenario.Regression, Scenario.Rename):
elif scenario in (Scenario.Regression, Scenario.Rename, Scenario.RepeatRow):
pass
else:
raise ValueError(f"Unknown scenario {scenario}")
Expand Down
5 changes: 5 additions & 0 deletions misc/python/materialize/parallel_workload/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class Scenario(Enum):
Rename = "rename"
BackupRestore = "backup-restore"
ZeroDowntimeDeploy = "0dt-deploy"
RepeatRow = "repeat-row"

@classmethod
def _missing_(cls, value):
Expand All @@ -44,4 +45,8 @@ def _missing_(cls, value):
"persist_stats_filter_enabled": "false",
# See https://materializeinc.slack.com/archives/CTESPM7FU/p1758195280629909, should reenable when it performs better
"enable_compute_logical_backpressure": "false",
# Allows the `Scenario.RepeatRow` scenario to call `repeat_row`. Having
# it on outside that scenario is harmless: no Parallel Workload codegen
# emits `repeat_row` unless the scenario is active.
"enable_repeat_row": "true",
}
Loading