Skip to content

Commit fc8d9dc

Browse files
Revert MySQL source versioning changes (#36211)
These changes have been determined to be the cause of [incident-971](https://materializeinc.slack.com/archives/C0ATJEV0SSG). There is a fix for the issue in #36195, but to be safe, and give us more time to verify and test these changes, we will revert the original breaking changes as a mitigation, until we are confident in the fixes. --------- Co-authored-by: Dennis Felsing <dennis@felsing.org>
1 parent 4d48ae7 commit fc8d9dc

14 files changed

Lines changed: 26 additions & 582 deletions

File tree

doc/user/content/ingest-data/mysql/source-versioning.md

Lines changed: 0 additions & 193 deletions
This file was deleted.

doc/user/data/mysql_config_settings.yml

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,6 @@ rows:
1515
Value: "`FULL`"
1616
Notes: ""
1717

18-
- MySQL Configuration: "`binlog_row_metadata`"
19-
Value: "`FULL`"
20-
Notes: "Required when using the `CREATE TABLE FROM SOURCE` syntax."
21-
2218
- MySQL Configuration: "`gtid_mode`"
2319
Value: "`ON`"
2420
Notes: "{{ $gtid_mode_note }}"

src/mysql-util/src/decoding.rs

Lines changed: 2 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -30,44 +30,9 @@ pub fn pack_mysql_row(
3030
table_desc: &MySqlTableDesc,
3131
) -> Result<Row, MySqlError> {
3232
let mut packer = row_container.packer();
33+
let row_values = row.unwrap();
3334

34-
// If a column name begins with '@', then the binlog does not have full row metadata,
35-
// meaning that full column names are not available and we need to rely on the order
36-
// of the columns in the upstream table matching the order of the columns in the row.
37-
// This is a fallback for MySQL servers that do not have `binlog_row_metadata` set to
38-
// `FULL`. If the first column name does not begin with '@', then we can assume that
39-
// full metadata is available and we can match columns by name.
40-
let row_values: Vec<Value> = if row
41-
.columns_ref()
42-
.first()
43-
.is_some_and(|col| col.name_ref().starts_with(b"@"))
44-
{
45-
row.unwrap()
46-
} else {
47-
row.columns_ref()
48-
.iter()
49-
.enumerate()
50-
.filter(|(_, col)| {
51-
table_desc
52-
.columns
53-
.iter()
54-
.filter(|col| col.column_type.is_some())
55-
.any(|c| c.name.as_str() == col.name_str())
56-
})
57-
.map(|(i, _)| {
58-
row.as_ref(i)
59-
.expect("Can't unwrap row if some of columns was taken")
60-
.clone()
61-
})
62-
.collect()
63-
};
64-
65-
for values in table_desc
66-
.columns
67-
.iter()
68-
.filter(|col| col.column_type.is_some())
69-
.zip_longest(row_values)
70-
{
35+
for values in table_desc.columns.iter().zip_longest(row_values) {
7136
let (col_desc, value) = match values {
7237
EitherOrBoth::Both(col_desc, value) => (col_desc, value),
7338
EitherOrBoth::Left(col_desc) => {

src/mysql-util/src/desc.rs

Lines changed: 12 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -75,11 +75,7 @@ impl MySqlTableDesc {
7575
/// exceptions:
7676
/// - `self`'s columns are a prefix of `other`'s columns.
7777
/// - `self`'s keys are all present in `other`
78-
pub fn determine_compatibility(
79-
&self,
80-
other: &MySqlTableDesc,
81-
full_metadata: bool,
82-
) -> Result<(), anyhow::Error> {
78+
pub fn determine_compatibility(&self, other: &MySqlTableDesc) -> Result<(), anyhow::Error> {
8379
if self == other {
8480
return Ok(());
8581
}
@@ -94,34 +90,18 @@ impl MySqlTableDesc {
9490
);
9591
}
9692

97-
// In the case that we don't have full binlog row metadata, `columns` is ordered by the
98-
// ordinal_position of each column in the table, so as long as `self.columns` is a
99-
// compatible prefix of `other.columns`, we can ignore extra columns from `other.columns`.
100-
//
101-
// If we do have full metadata, then we can match columns by name and just check that all
102-
// columns in `self.columns` are present and compatible with columns in `other.columns`.
93+
// `columns` is ordered by the ordinal_position of each column in the table,
94+
// so as long as `self.columns` is a compatible prefix of `other.columns`, we can
95+
// ignore extra columns from `other.columns`.
10396
let mut other_columns = other.columns.iter();
10497
for self_column in &self.columns {
105-
let other_column = if full_metadata {
106-
other_columns
107-
.by_ref()
108-
.find(|c| c.name == self_column.name)
109-
.ok_or_else(|| {
110-
anyhow::anyhow!(
111-
"column {} no longer present in table {}",
112-
self_column.name,
113-
self.name
114-
)
115-
})?
116-
} else {
117-
other_columns.next().ok_or_else(|| {
118-
anyhow::anyhow!(
119-
"column {} no longer present in table {}",
120-
self_column.name,
121-
self.name
122-
)
123-
})?
124-
};
98+
let other_column = other_columns.next().ok_or_else(|| {
99+
anyhow::anyhow!(
100+
"column {} no longer present in table {}",
101+
self_column.name,
102+
self.name
103+
)
104+
})?;
125105
if !self_column.is_compatible(other_column) {
126106
bail!(
127107
"column {} in table {} has been altered",
@@ -130,6 +110,7 @@ impl MySqlTableDesc {
130110
);
131111
}
132112
}
113+
133114
// Our keys are all still present in exactly the same shape.
134115
// TODO: Implement a more relaxed key compatibility check:
135116
// We should check that for all keys that we know about there exists an upstream key whose

src/sql/src/pure.rs

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1858,16 +1858,6 @@ async fn purify_create_table_from_source(
18581858
)
18591859
.await?;
18601860

1861-
let binlog_metadata_setting =
1862-
mz_mysql_util::query_sys_var(&mut conn, "binlog_row_metadata").await?;
1863-
if binlog_metadata_setting != "FULL" {
1864-
Err(
1865-
MySqlSourcePurificationError::UnsupportedBinlogMetadataSetting {
1866-
setting: binlog_metadata_setting,
1867-
},
1868-
)?;
1869-
}
1870-
18711861
// Retrieve the current @gtid_executed value of the server to mark as the effective
18721862
// initial snapshot point for this table.
18731863
let initial_gtid_set =

src/sql/src/pure/error.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -310,10 +310,6 @@ pub enum MySqlSourcePurificationError {
310310
NoTablesFoundForSchemas(Vec<String>),
311311
#[error(transparent)]
312312
InvalidConnection(#[from] MySqlConnectionValidationError),
313-
#[error(
314-
"The MySQL system variable 'binlog_row_metadata' is set to an unsupported value: {setting}. Materialize requires this variable to be set to 'FULL' to use the \"CREATE TABLE FROM SOURCE\" syntax for MySQL sources."
315-
)]
316-
UnsupportedBinlogMetadataSetting { setting: String },
317313
}
318314

319315
impl MySqlSourcePurificationError {

src/storage/src/source/mysql/schemas.rs

Lines changed: 7 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -45,13 +45,6 @@ where
4545
})
4646
.collect();
4747

48-
let full_metadata = conn
49-
.query_first::<String, String>("SELECT @@binlog_row_metadata".to_string())
50-
.await?
51-
.unwrap()
52-
.to_uppercase()
53-
== "FULL";
54-
5548
Ok(expected
5649
.into_iter()
5750
.flat_map(|(table, outputs)| {
@@ -71,15 +64,13 @@ where
7164
)),
7265
);
7366
match new_desc {
74-
Ok(desc) => {
75-
match output.desc.determine_compatibility(&desc, full_metadata) {
76-
Ok(()) => None,
77-
Err(err) => Some((
78-
output,
79-
DefiniteError::IncompatibleSchema(err.to_string()),
80-
)),
81-
}
82-
}
67+
Ok(desc) => match output.desc.determine_compatibility(&desc) {
68+
Ok(()) => None,
69+
Err(err) => Some((
70+
output,
71+
DefiniteError::IncompatibleSchema(err.to_string()),
72+
)),
73+
},
8374
Err(err) => {
8475
Some((output, DefiniteError::IncompatibleSchema(err.to_string())))
8576
}

0 commit comments

Comments
 (0)