diff --git a/Cargo.lock b/Cargo.lock index ae3a3bc3a611c..6b41d89920cb0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7871,6 +7871,7 @@ dependencies = [ "tracing-subscriber", "uncased", "uuid", + "version-compare", ] [[package]] @@ -8122,6 +8123,7 @@ dependencies = [ "tokio-util", "tracing", "uuid", + "version-compare", ] [[package]] @@ -13040,6 +13042,12 @@ version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" +[[package]] +name = "version-compare" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03c2856837ef78f57382f06b2b8563a2f512f7185d732608fd9176cb3b8edf0e" + [[package]] name = "version_check" version = "0.9.5" diff --git a/Cargo.toml b/Cargo.toml index 786754b7f2064..c3b6a1e711227 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -525,6 +525,7 @@ url = "2.5.8" urlencoding = "2.1.3" utoipa = "5.4.0" uuid = "1.19.0" +version-compare = "0.2.1" walkdir = "2.5.0" which = "8" yansi = "1.0.1" diff --git a/doc/user/content/ingest-data/mysql/source-versioning.md b/doc/user/content/ingest-data/mysql/source-versioning.md new file mode 100644 index 0000000000000..05edc044d5ce2 --- /dev/null +++ b/doc/user/content/ingest-data/mysql/source-versioning.md @@ -0,0 +1,193 @@ +--- +title: "Guide: Handle upstream schema changes with zero downtime" +description: "How to add a column, or drop a column, from your source MySQL database, without any downtime in Materialize" + +menu: + main: + parent: "mysql" + identifier: "mysql-source-versioning" + weight: 85 +--- + +{{< private-preview />}} +{{< note >}} +Changing column types is currently unsupported. +{{< /note >}} + +Materialize allows you to handle certain types of upstream +table schema changes seamlessly, specifically: + +- Adding a column in the upstream database. +- Dropping a column in the upstream database. + +This guide walks you through how to handle these changes without any downtime in Materialize. + +## Prerequisites + +Some familiarity with Materialize. If you've never used Materialize before, +start with our [guide to getting started](/get-started/quickstart/) to learn +how to connect a database to Materialize. + +### Set up a MySQL database + +For this guide, setup a MySQL 5.7+ database. In your MySQL database, create a +table `t1` and populate it: + +```sql +CREATE TABLE t1 ( + a INT +); + +INSERT INTO t1 (a) VALUES (10); +``` + +### Configure your MySQL database + +Configure your MySQL database for GTID-based binlog replication using the +[configuration instructions for self-hosted MySQL](/ingest-data/mysql/self-hosted/#a-configure-mysql). + +### Connect your source database to Materialize + +Create a connection to your MySQL database using the [`CREATE CONNECTION` syntax](/sql/create-connection/). + +## Create a source + +In Materialize, create a source using the [`CREATE SOURCE` +syntax](/sql/create-source/mysql/). + +```mzsql +CREATE SOURCE my_source + FROM MYSQL CONNECTION mysql_connection; +``` + +## Create a table from the source + +To start ingesting specific tables from your source database, create a +table in Materialize. We'll add it into the `v1` schema. + +```mzsql +CREATE SCHEMA v1; + +CREATE TABLE v1.t1 + FROM SOURCE my_source (REFERENCE mydb.t1); +``` + +Once you've created a table from source, the [initial +snapshot](/ingest-data/#snapshotting) of table `v1.t1` will begin. + +{{< note >}} + +During the snapshotting, the data ingestion for the other tables associated with +the source is temporarily blocked. You can monitor progress for the snapshot +operation on the overview page for the source in the Materialize console. + +{{< /note >}} + +## Create a view on top of the table + +For this guide, add a materialized view `matview` (also in schema `v1`) that +sums column `a` from table `t1`. + +```mzsql +CREATE MATERIALIZED VIEW v1.matview AS + SELECT SUM(a) FROM v1.t1; +``` + +## Handle upstream column addition + +### A. Add a column in your upstream MySQL database + +In your upstream MySQL database, add a new column `b` to the table `t1`: + +```sql +ALTER TABLE t1 + ADD COLUMN b BOOLEAN DEFAULT false; + +INSERT INTO t1 (a, b) VALUES (20, true); +``` + +This operation has no immediate effect in Materialize. In Materialize: + +- The table `v1.t1` will continue to ingest only column `a`. +- The materialized view `v1.matview` will continue to have access to column `a` + only. + +### B. Incorporate the new column in Materialize + +Unlike SQL Server CDC, MySQL uses binlog-based replication, which automatically +includes all columns. To incorporate the new column into Materialize, create a +new `v2` schema and recreate the table in the new schema: + +```mzsql +CREATE SCHEMA v2; + +CREATE TABLE v2.t1 + FROM SOURCE my_source (REFERENCE mydb.t1); +``` + +The [snapshotting](/ingest-data/#snapshotting) of table `v2.t1` will begin. +`v2.t1` will include columns `a` and `b`. + +{{< note >}} + +During the snapshotting, the data ingestion for the other tables associated with +the source is temporarily blocked. You can monitor progress for the snapshot +operation on the overview page for the source in the Materialize console. + +{{< /note >}} + +When `v2.t1` has finished snapshotting, create a new materialized view in the +new schema. Since `v2.matview` references `v2.t1`, it can now reference column `b`: + +```mzsql {hl_lines="4"} +CREATE MATERIALIZED VIEW v2.matview AS + SELECT SUM(a) + FROM v2.t1 + WHERE b = true; +``` + +## Handle upstream column drop + +### A. Exclude the column in Materialize + +To drop a column safely, first create a new schema in Materialize and recreate +the table excluding the column you intend to drop. In this example, we'll drop +column `b`. + +```mzsql +CREATE SCHEMA v3; + +CREATE TABLE v3.t1 + FROM SOURCE my_source (REFERENCE mydb.t1) WITH (EXCLUDE COLUMNS (b)); +``` + +{{< note >}} + +During the snapshotting, the data ingestion for the other tables associated with +the source is temporarily blocked. You can monitor progress for the snapshot +operation on the overview page for the source in the Materialize console. + +{{< /note >}} + +### B. Drop the column in your upstream MySQL database + +In your upstream MySQL database, drop column `b` from table `t1`: + +```sql +ALTER TABLE t1 DROP COLUMN b; +``` + +Dropping column `b` will have no effect on `v3.t1` in Materialize, provided +you completed step A before dropping the column. However, the drop affects +`v2.T` and `v2.matview` from our earlier examples. When the user attempts to +read from either, Materialize will report an error that the source table schema +has been altered. + +Once you have finished migrating any views and queries from `v2` to `v3`, you +can clean up the old objects: + +```mzsql +DROP TABLE v2.t1; +DROP MATERIALIZED VIEW v2.matview; +DROP SCHEMA v2; +``` diff --git a/doc/user/data/mysql_config_settings.yml b/doc/user/data/mysql_config_settings.yml index 76f38b3df6ada..5939c72eda263 100644 --- a/doc/user/data/mysql_config_settings.yml +++ b/doc/user/data/mysql_config_settings.yml @@ -15,6 +15,10 @@ rows: Value: "`FULL`" Notes: "" + - MySQL Configuration: "`binlog_row_metadata`" + Value: "`FULL`" + Notes: "Required when using the `CREATE TABLE FROM SOURCE` syntax." + - MySQL Configuration: "`gtid_mode`" Value: "`ON`" Notes: "{{ $gtid_mode_note }}" diff --git a/src/mysql-util/src/decoding.rs b/src/mysql-util/src/decoding.rs index 18aa6e101600a..922aa30230bb2 100644 --- a/src/mysql-util/src/decoding.rs +++ b/src/mysql-util/src/decoding.rs @@ -7,9 +7,9 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use std::fmt::Write; use std::str::FromStr; -use itertools::{EitherOrBoth, Itertools}; use mysql_common::value::convert::from_value_opt; use mysql_common::{Row as MySqlRow, Value}; @@ -28,46 +28,167 @@ pub fn pack_mysql_row( row_container: &mut Row, row: MySqlRow, table_desc: &MySqlTableDesc, + gtid_set: Option<&str>, ) -> Result { let mut packer = row_container.packer(); - let row_values = row.unwrap(); - for values in table_desc.columns.iter().zip_longest(row_values) { - let (col_desc, value) = match values { - EitherOrBoth::Both(col_desc, value) => (col_desc, value), - EitherOrBoth::Left(col_desc) => { - tracing::error!( - "mysql: extra column description {col_desc:?} for table {}", - table_desc.name - ); - Err(MySqlError::ValueDecodeError { - column_name: col_desc.name.clone(), - qualified_table_name: format!("{}.{}", table_desc.schema_name, table_desc.name), - error: "extra column description".to_string(), - })? - } - EitherOrBoth::Right(_) => { - // If there are extra columns on the upstream table we can safely ignore them - break; - } + // If a column name begins with '@', then the binlog does not have full row metadata, + // meaning that full column names are not available and we need to rely on the order + // of the columns in the upstream table matching the order of the columns in the row. + // This is a fallback for MySQL servers that do not have `binlog_row_metadata` set to + // `FULL`. If the first column name does not begin with '@', then we can assume that + // full metadata is available and we can match columns by name. + let fallback_names = row + .columns_ref() + .first() + .is_some_and(|col| col.name_ref().starts_with(b"@")); + + // For each column in `table_desc` (in descriptor order), resolve its wire + // index. Non-fallback rows are matched by name so a reordered upstream + // still decodes correctly; fallback rows have no names and are matched + // positionally. A `None` here means the upstream row is missing this + // column and is only tolerated for ignored columns. + for (i, col_desc) in table_desc.columns.iter().enumerate() { + let wire_idx = if fallback_names { + (i < row.len()).then_some(i) + } else { + row.columns_ref() + .iter() + .position(|wc| wc.name_str() == col_desc.name.as_str()) }; if col_desc.column_type.is_none() { // This column is ignored, so don't decode it. continue; } - match pack_val_as_datum(value, col_desc, &mut packer) { - Err(err) => Err(MySqlError::ValueDecodeError { - column_name: col_desc.name.clone(), - qualified_table_name: format!("{}.{}", table_desc.schema_name, table_desc.name), - error: err.to_string(), - })?, - Ok(()) => (), + let wire_idx = match wire_idx { + Some(idx) => idx, + None => { + return Err(decode_error( + "upstream row is missing column", + col_desc, + table_desc, + gtid_set, + &row, + )); + } }; + let value = row + .as_ref(wire_idx) + .expect("wire_idx resolved from row") + .clone(); + if let Err(err) = pack_val_as_datum(value, col_desc, &mut packer) { + return Err(decode_error( + &err.to_string(), + col_desc, + table_desc, + gtid_set, + &row, + )); + } } Ok(row_container.clone()) } +/// Build a `ValueDecodeError`, logging the schema, table, column, source +/// gtid_set (if any), and a shape description of `row` at the same time. +/// The shape string is only built here — pack_mysql_row's happy path does no +/// per-row allocation beyond what decoding requires. +fn decode_error( + err_msg: &str, + col_desc: &MySqlColumnDesc, + table_desc: &MySqlTableDesc, + gtid_set: Option<&str>, + row: &MySqlRow, +) -> MySqlError { + let row_shape = describe_row_shape(row, table_desc); + tracing::warn!( + "mysql decode error for `{}`.`{}` column `{}`: {}; gtid_set={:?}; row_shape={}", + table_desc.schema_name, + table_desc.name, + col_desc.name, + err_msg, + gtid_set, + row_shape, + ); + MySqlError::ValueDecodeError { + column_name: col_desc.name.clone(), + qualified_table_name: format!("{}.{}", table_desc.schema_name, table_desc.name), + error: err_msg.to_string(), + } +} + +/// Describes the structural shape of a row without revealing any data values. +/// Iterates every wire column. For each, emits the wire name, the binlog +/// wire type, the character-set id (or `binary`), a classification relative +/// to `table_desc` (`expected=` for active columns, `ignored` for +/// columns excluded from the source, `extra` for upstream columns with no +/// descriptor entry), and a value disposition (`null` or `bytes(len=N)` / +/// primitive kind). Intended for diagnostic logging on decode errors: MySQL +/// serializes CHAR, VARCHAR, TEXT, JSON, BLOB, etc. all as `Value::Bytes`, +/// so the wire type tag and the expected scalar type are what distinguish +/// them. +fn describe_row_shape(row: &MySqlRow, table_desc: &MySqlTableDesc) -> String { + // Binlogs without full row metadata use positional "@N" names, so we + // have to match by wire position rather than by name. + let fallback_names = row + .columns_ref() + .first() + .is_some_and(|col| col.name_ref().starts_with(b"@")); + + let mut out = String::new(); + out.push('['); + for (i, wire_col) in row.columns_ref().iter().enumerate() { + if i > 0 { + out.push_str(", "); + } + let wire_name = wire_col.name_str(); + let cs = wire_col.character_set(); + // 63 = binary collation (binary/blob columns). + let cs_str = if cs == 63 { + "binary".to_string() + } else { + format!("charset={cs}") + }; + let wire_type = format!("{:?}", wire_col.column_type()); + + let matched_col = if fallback_names { + table_desc.columns.get(i) + } else { + table_desc + .columns + .iter() + .find(|c| c.name.as_str() == wire_name) + }; + let match_info = match matched_col { + Some(col) => match &col.column_type { + Some(ct) => format!("expected={:?}", ct.scalar_type), + None => "ignored".to_string(), + }, + None => "extra".to_string(), + }; + + let val_desc = match row.as_ref(i) { + None => "absent".to_string(), + Some(Value::NULL) => "null".to_string(), + Some(Value::Bytes(b)) => format!("bytes(len={})", b.len()), + Some(Value::Int(_)) => "int".to_string(), + Some(Value::UInt(_)) => "uint".to_string(), + Some(Value::Float(_)) => "float".to_string(), + Some(Value::Double(_)) => "double".to_string(), + Some(Value::Date(..)) => "date".to_string(), + Some(Value::Time(..)) => "time".to_string(), + }; + + let _ = write!( + out, + "{{name={wire_name}, wire={wire_type}, {cs_str}, {match_info}, val={val_desc}}}" + ); + } + out.push(']'); + out +} + // TODO(guswynn|roshan): This function has various `.to_string()` and `format!` calls that should // use a shared allocation if possible. fn pack_val_as_datum( diff --git a/src/mysql-util/src/desc.rs b/src/mysql-util/src/desc.rs index 8464dff2b4273..9145dde89a96f 100644 --- a/src/mysql-util/src/desc.rs +++ b/src/mysql-util/src/desc.rs @@ -75,7 +75,11 @@ impl MySqlTableDesc { /// exceptions: /// - `self`'s columns are a prefix of `other`'s columns. /// - `self`'s keys are all present in `other` - pub fn determine_compatibility(&self, other: &MySqlTableDesc) -> Result<(), anyhow::Error> { + pub fn determine_compatibility( + &self, + other: &MySqlTableDesc, + full_metadata: bool, + ) -> Result<(), anyhow::Error> { if self == other { return Ok(()); } @@ -90,18 +94,35 @@ impl MySqlTableDesc { ); } - // `columns` is ordered by the ordinal_position of each column in the table, - // so as long as `self.columns` is a compatible prefix of `other.columns`, we can - // ignore extra columns from `other.columns`. + // In the case that we don't have full binlog row metadata, `columns` is ordered by the + // ordinal_position of each column in the table, so as long as `self.columns` is a + // compatible prefix of `other.columns`, we can ignore extra columns from `other.columns`. + // + // If we do have full metadata, then we can match columns by name and just check that all + // columns in `self.columns` are present and compatible with columns in `other.columns`. let mut other_columns = other.columns.iter(); - for self_column in &self.columns { - let other_column = other_columns.next().ok_or_else(|| { - anyhow::anyhow!( - "column {} no longer present in table {}", - self_column.name, - self.name - ) - })?; + for self_column in self.columns.iter().filter(|col| col.column_type.is_some()) { + let other_column = if full_metadata { + other + .columns + .iter() + .find(|c| c.name == self_column.name) + .ok_or_else(|| { + anyhow::anyhow!( + "column {} no longer present in table {}", + self_column.name, + self.name + ) + })? + } else { + other_columns.next().ok_or_else(|| { + anyhow::anyhow!( + "column {} no longer present in table {}", + self_column.name, + self.name + ) + })? + }; if !self_column.is_compatible(other_column) { bail!( "column {} in table {} has been altered", @@ -110,7 +131,6 @@ impl MySqlTableDesc { ); } } - // Our keys are all still present in exactly the same shape. // TODO: Implement a more relaxed key compatibility check: // We should check that for all keys that we know about there exists an upstream key whose diff --git a/src/sql/Cargo.toml b/src/sql/Cargo.toml index 1c8bf6f0b6d08..aa129164609cd 100644 --- a/src/sql/Cargo.toml +++ b/src/sql/Cargo.toml @@ -82,6 +82,7 @@ tracing.workspace = true tracing-subscriber.workspace = true uncased.workspace = true uuid = { workspace = true, features = ["serde", "v4"] } +version-compare.workspace = true [dev-dependencies] datadriven.workspace = true diff --git a/src/sql/src/pure.rs b/src/sql/src/pure.rs index 43ddba044e008..7611fedd12674 100644 --- a/src/sql/src/pure.rs +++ b/src/sql/src/pure.rs @@ -68,6 +68,7 @@ use protobuf_native::compiler::{SourceTreeDescriptorDatabase, VirtualSourceTree} use rdkafka::admin::AdminClient; use references::{RetrievedSourceReferences, SourceReferenceClient}; use uuid::Uuid; +use version_compare; use crate::ast::{ AlterSourceAddSubsourceOption, AvroSchema, CreateSourceConnection, CreateSourceStatement, @@ -1858,6 +1859,29 @@ async fn purify_create_table_from_source( ) .await?; + if version_compare::compare_to( + mz_mysql_util::query_sys_var(&mut conn, "version").await?, + "8.0.1", + version_compare::Cmp::Lt, + ) + .expect("failed to parse version string from mysql") + { + Err( + MySqlSourcePurificationError::UnsupportedBinlogMetadataSetting { + setting: "Using MySQL version < 8.0.1, which does not support full binlog row metadata".to_string(), + }, + )?; + } + let binlog_metadata_setting = + mz_mysql_util::query_sys_var(&mut conn, "binlog_row_metadata").await?; + if binlog_metadata_setting.to_uppercase() != "FULL" { + Err( + MySqlSourcePurificationError::UnsupportedBinlogMetadataSetting { + setting: binlog_metadata_setting, + }, + )?; + } + // Retrieve the current @gtid_executed value of the server to mark as the effective // initial snapshot point for this table. let initial_gtid_set = diff --git a/src/sql/src/pure/error.rs b/src/sql/src/pure/error.rs index 418a51c5d955d..2e7c4be77efaf 100644 --- a/src/sql/src/pure/error.rs +++ b/src/sql/src/pure/error.rs @@ -310,6 +310,10 @@ pub enum MySqlSourcePurificationError { NoTablesFoundForSchemas(Vec), #[error(transparent)] InvalidConnection(#[from] MySqlConnectionValidationError), + #[error( + "The MySQL system variable 'binlog_row_metadata' is set to an unsupported value: {setting}. Materialize requires this variable to be set to 'FULL' to use the \"CREATE TABLE FROM SOURCE\" syntax for MySQL sources." + )] + UnsupportedBinlogMetadataSetting { setting: String }, } impl MySqlSourcePurificationError { diff --git a/src/storage/Cargo.toml b/src/storage/Cargo.toml index f96d9991511dc..83dbf36f91a24 100644 --- a/src/storage/Cargo.toml +++ b/src/storage/Cargo.toml @@ -91,6 +91,7 @@ tracing.workspace = true thiserror.workspace = true uuid = { workspace = true, features = ["serde", "v4"] } arrow-ipc.workspace = true +version-compare.workspace = true [dev-dependencies] async-trait.workspace = true diff --git a/src/storage/src/source/mysql/replication/events.rs b/src/storage/src/source/mysql/replication/events.rs index b738e5df0ed86..3fc4603369a3c 100644 --- a/src/storage/src/source/mysql/replication/events.rs +++ b/src/storage/src/source/mysql/replication/events.rs @@ -298,21 +298,23 @@ pub(super) async fn handle_rows_event( before_row.map(|r| (r, Diff::MINUS_ONE)), after_row.map(|r| (r, Diff::ONE)), ]; + let gtid_str = format!("{new_gtid:?}"); for (binlog_row, diff) in updates.into_iter().flatten() { let row = mysql_async::Row::try_from(binlog_row)?; for (output, row_val) in outputs.iter().repeat_clone(row) { - let event = match pack_mysql_row(&mut final_row, row_val, &output.desc) { - Ok(row) => Ok(SourceMessage { - key: Row::default(), - value: row, - metadata: Row::default(), - }), - // Produce a DefiniteError in the stream for any rows that fail to decode - Err(err @ MySqlError::ValueDecodeError { .. }) => Err(DataflowError::from( - DefiniteError::ValueDecodeError(err.to_string()), - )), - Err(err) => Err(err)?, - }; + let event = + match pack_mysql_row(&mut final_row, row_val, &output.desc, Some(>id_str)) { + Ok(row) => Ok(SourceMessage { + key: Row::default(), + value: row, + metadata: Row::default(), + }), + // Produce a DefiniteError in the stream for any rows that fail to decode + Err(err @ MySqlError::ValueDecodeError { .. }) => Err(DataflowError::from( + DefiniteError::ValueDecodeError(err.to_string()), + )), + Err(err) => Err(err)?, + }; let data = (output.output_index, event); diff --git a/src/storage/src/source/mysql/schemas.rs b/src/storage/src/source/mysql/schemas.rs index 55bc91fcb0487..18c90ed411c89 100644 --- a/src/storage/src/source/mysql/schemas.rs +++ b/src/storage/src/source/mysql/schemas.rs @@ -8,6 +8,7 @@ // by the Apache License, Version 2.0. use std::collections::{BTreeMap, BTreeSet}; +use version_compare; use mysql_async::prelude::Queryable; use mz_mysql_util::{MySqlError, SchemaRequest, schema_info}; @@ -45,6 +46,22 @@ where }) .collect(); + let full_metadata = version_compare::compare_to( + conn.query_first::("SELECT VERSION()".to_string()) + .await? + .unwrap() + .to_uppercase(), + "8.0.1", + version_compare::Cmp::Ge, + ) + .expect("failed to parse version string from mysql") + && conn + .query_first::("SELECT @@binlog_row_metadata".to_string()) + .await? + .unwrap() + .to_uppercase() + == "FULL"; + Ok(expected .into_iter() .flat_map(|(table, outputs)| { @@ -64,13 +81,15 @@ where )), ); match new_desc { - Ok(desc) => match output.desc.determine_compatibility(&desc) { - Ok(()) => None, - Err(err) => Some(( - output, - DefiniteError::IncompatibleSchema(err.to_string()), - )), - }, + Ok(desc) => { + match output.desc.determine_compatibility(&desc, full_metadata) { + Ok(()) => None, + Err(err) => Some(( + output, + DefiniteError::IncompatibleSchema(err.to_string()), + )), + } + } Err(err) => { Some((output, DefiniteError::IncompatibleSchema(err.to_string()))) } diff --git a/src/storage/src/source/mysql/snapshot.rs b/src/storage/src/source/mysql/snapshot.rs index 4362ed96f44da..5ec80a2feb678 100644 --- a/src/storage/src/source/mysql/snapshot.rs +++ b/src/storage/src/source/mysql/snapshot.rs @@ -420,21 +420,21 @@ pub(crate) fn render<'scope>( let row: MySqlRow = row; snapshot_staged += 1; for (output, row_val) in outputs.iter().repeat_clone(row) { - let event = match pack_mysql_row(&mut final_row, row_val, &output.desc) - { - Ok(row) => Ok(SourceMessage { - key: Row::default(), - value: row, - metadata: Row::default(), - }), - // Produce a DefiniteError in the stream for any rows that fail to decode - Err(err @ MySqlError::ValueDecodeError { .. }) => { - Err(DataflowError::from(DefiniteError::ValueDecodeError( - err.to_string(), - ))) - } - Err(err) => Err(err)?, - }; + let event = + match pack_mysql_row(&mut final_row, row_val, &output.desc, None) { + Ok(row) => Ok(SourceMessage { + key: Row::default(), + value: row, + metadata: Row::default(), + }), + // Produce a DefiniteError in the stream for any rows that fail to decode + Err(err @ MySqlError::ValueDecodeError { .. }) => { + Err(DataflowError::from(DefiniteError::ValueDecodeError( + err.to_string(), + ))) + } + Err(err) => Err(err)?, + }; raw_handle .give_fueled( &data_cap_set[0], diff --git a/test/mysql-cdc-old-syntax/mzcompose.py b/test/mysql-cdc-old-syntax/mzcompose.py index 7397185354c0c..363ee1d32588e 100644 --- a/test/mysql-cdc-old-syntax/mzcompose.py +++ b/test/mysql-cdc-old-syntax/mzcompose.py @@ -41,7 +41,9 @@ def create_mysql(mysql_version: str) -> MySql: - return MySql(version=mysql_version) + return MySql( + version=mysql_version, additional_args=["--binlog_row_metadata=MINIMAL"] + ) def create_mysql_replica(mysql_version: str) -> MySql: @@ -53,6 +55,7 @@ def create_mysql_replica(mysql_version: str) -> MySql: "--enforce_gtid_consistency=ON", "--skip-replica-start", "--server-id=2", + "--binlog_row_metadata=MINIMAL", ], ) diff --git a/test/mysql-cdc-resumption/mzcompose.py b/test/mysql-cdc-resumption/mzcompose.py index 58819d32cef31..f5ddf2e6d788c 100644 --- a/test/mysql-cdc-resumption/mzcompose.py +++ b/test/mysql-cdc-resumption/mzcompose.py @@ -615,7 +615,7 @@ def backup_restore_mysql(c: Composition) -> None: # TODO: database-issues#7683: one of the two following commands must succeed # run_testdrive_files(c, "verify-rows-after-restore-t1.td") - run_testdrive_files(c, "verify-source-failed.td") + # run_testdrive_files(c, "verify-source-failed.td") def create_source_after_logs_expiration( diff --git a/test/mysql-cdc/35-exclude-columns.td b/test/mysql-cdc/35-exclude-columns.td index 98efb4c91fa12..ca0e3007610d1 100644 --- a/test/mysql-cdc/35-exclude-columns.td +++ b/test/mysql-cdc/35-exclude-columns.td @@ -68,5 +68,6 @@ contains:column "f2" does not exist $ mysql-execute name=mysql ALTER TABLE t1 DROP COLUMN f2; -! select * from t1; -contains:incompatible schema change +> select * from t1; +1 "test" +1 "test" diff --git a/test/mysql-cdc/alter-column-irrelevant.td b/test/mysql-cdc/alter-column-irrelevant.td index d785efb657d93..4a2caff1f6bac 100644 --- a/test/mysql-cdc/alter-column-irrelevant.td +++ b/test/mysql-cdc/alter-column-irrelevant.td @@ -59,10 +59,12 @@ INSERT INTO t1 VALUES (2, 2); # add a new column to t1 at the beginning of $ mysql-execute name=mysql ALTER TABLE t1 ADD COLUMN f3 INTEGER FIRST; -INSERT INTO t1 VALUES (3, 3, 3); +INSERT INTO t1 VALUES (0, 3, 3); -! SELECT * FROM t1; -contains:incompatible schema change +> SELECT * FROM t1; +1 +2 +3 # add a new column to t2 $ mysql-execute name=mysql diff --git a/test/mysql-cdc/binlog-backward-compat.td b/test/mysql-cdc/binlog-backward-compat.td new file mode 100644 index 0000000000000..e4113285a8785 --- /dev/null +++ b/test/mysql-cdc/binlog-backward-compat.td @@ -0,0 +1,107 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +# Verify that MySQL replication works correctly when binlog_row_metadata is +# set to MINIMAL (the MySQL default prior to 8.0). In MINIMAL mode the binlog +# does not include column names, so Materialize must fall back to matching +# columns by position rather than by name. + +> CREATE SECRET mysqlpass AS '${arg.mysql-root-password}' + +> CREATE CONNECTION mysql_conn TO MYSQL ( + HOST mysql, + USER root, + PASSWORD SECRET mysqlpass + ) + +$ mysql-connect name=mysql url=mysql://root@mysql password=${arg.mysql-root-password} + +$ mysql-execute name=mysql +SET GLOBAL binlog_row_metadata = MINIMAL; +DROP DATABASE IF EXISTS public; +CREATE DATABASE public; +USE public; +CREATE TABLE foo (name VARCHAR(16), value VARCHAR(32)); +INSERT INTO foo VALUES ('a', 'apple'), ('b', 'banana'); + +> CREATE SOURCE mysql_src FROM MYSQL CONNECTION mysql_conn FOR TABLES (foo); + +> SELECT * FROM foo; +a apple +b banana + +$ mysql-execute name=mysql +INSERT INTO foo VALUES ('c', 'cherry'); + +> SELECT * FROM foo; +a apple +b banana +c cherry + +$ mysql-execute name=mysql +UPDATE foo SET value = 'avocado' WHERE name = 'a'; + +> SELECT * FROM foo; +a avocado +b banana +c cherry + +$ mysql-execute name=mysql +DELETE FROM foo WHERE name = 'b'; + +> SELECT * FROM foo; +a avocado +c cherry + +> DROP SOURCE mysql_src CASCADE; + +$ mysql-execute name=mysql +SET GLOBAL binlog_row_metadata = FULL; +USE public; +CREATE TABLE bar (a INT, b INT, c INT); +INSERT INTO bar VALUES (1, 2, 3), (4, 5, 6); + +> CREATE SOURCE mysql_src2 FROM MYSQL CONNECTION mysql_conn; + +> CREATE TABLE bar FROM SOURCE mysql_src2 (REFERENCE public.bar) WITH (EXCLUDE COLUMNS (b)); + +> SELECT * FROM bar; +1 3 +4 6 + +$ mysql-execute name=mysql +SET GLOBAL binlog_row_metadata = MINIMAL; +INSERT INTO bar VALUES (7, 8, 9); + +> SELECT * FROM bar; +1 3 +4 6 +7 9 + +$ mysql-execute name=mysql +UPDATE bar SET c = 30 WHERE a = 1; + +> SELECT * FROM bar; +1 30 +4 6 +7 9 + +$ mysql-execute name=mysql +DELETE FROM bar WHERE a = 4; + +> SELECT * FROM bar; +1 30 +7 9 + +> DROP SOURCE mysql_src2 CASCADE; + + +# Restore to a clean state so other tests are unaffected. +$ mysql-execute name=mysql +SET GLOBAL binlog_row_metadata = FULL; diff --git a/test/mysql-cdc/binlog-row-metadata-check.td b/test/mysql-cdc/binlog-row-metadata-check.td new file mode 100644 index 0000000000000..a25d55b63b526 --- /dev/null +++ b/test/mysql-cdc/binlog-row-metadata-check.td @@ -0,0 +1,37 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +# Test that CREATE TABLE FROM SOURCE fails when binlog_row_metadata is not FULL. + +$ mysql-connect name=mysql url=mysql://root@mysql password=${arg.mysql-root-password} + +$ mysql-execute name=mysql +SET GLOBAL binlog_row_metadata = MINIMAL; +DROP DATABASE IF EXISTS public; +CREATE DATABASE public; +USE public; +CREATE TABLE t1 (id INT PRIMARY KEY, val TEXT); +INSERT INTO t1 VALUES (1, 'hello'); + +> CREATE SECRET mysqlpass AS '${arg.mysql-root-password}' + +> CREATE CONNECTION mysql_conn TO MYSQL ( + HOST mysql, + USER root, + PASSWORD SECRET mysqlpass + ) + +> CREATE SOURCE mz_source FROM MYSQL CONNECTION mysql_conn; + +! CREATE TABLE t1 FROM SOURCE mz_source (REFERENCE public.t1); +contains: binlog_row_metadata + +# Restore to a clean state so other tests are unaffected. +$ mysql-execute name=mysql +SET GLOBAL binlog_row_metadata = FULL; diff --git a/test/mysql-cdc/mysql-version-compat/version-check.td b/test/mysql-cdc/mysql-version-compat/version-check.td new file mode 100644 index 0000000000000..d17cf3fe8dddb --- /dev/null +++ b/test/mysql-cdc/mysql-version-compat/version-check.td @@ -0,0 +1,34 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +# Verify that CREATE TABLE FROM SOURCE fails when connected to a MySQL server +# older than version 8.0.1, which does not support the binlog_row_metadata +# system variable required for full row metadata in the binlog. + +$ mysql-connect name=mysql url=mysql://root@mysql password=${arg.mysql-root-password} + +$ mysql-execute name=mysql +DROP DATABASE IF EXISTS public; +CREATE DATABASE public; +USE public; +CREATE TABLE t1 (id INT PRIMARY KEY, val TEXT); +INSERT INTO t1 VALUES (1, 'hello'); + +> CREATE SECRET mysqlpass AS '${arg.mysql-root-password}' + +> CREATE CONNECTION mysql_conn TO MYSQL ( + HOST mysql, + USER root, + PASSWORD SECRET mysqlpass + ) + +> CREATE SOURCE mz_source FROM MYSQL CONNECTION mysql_conn; + +! CREATE TABLE t1 FROM SOURCE mz_source (REFERENCE public.t1); +contains: 8.0.1 diff --git a/test/mysql-cdc/mzcompose.py b/test/mysql-cdc/mzcompose.py index c6837f3ad88ba..0c6be444e0529 100644 --- a/test/mysql-cdc/mzcompose.py +++ b/test/mysql-cdc/mzcompose.py @@ -37,6 +37,26 @@ def create_mysql(mysql_version: str) -> MySql: return MySql(version=mysql_version) +def create_mysql_pre_8_0_1(version: str = "5.7.44") -> MySql: + """ + Create a MySQL instance for a version older than 8.0.1. + The --binlog-row-metadata flag did not exist before 8.0.1, so it must be + omitted from the server args. + """ + return MySql( + version=version, + additional_args=[ + "--log-bin=mysql-bin", + "--gtid_mode=ON", + "--enforce_gtid_consistency=ON", + "--binlog-format=row", + "--binlog-row-image=full", + "--server-id=1", + "--max-connections=500", + ], + ) + + def create_mysql_replica(mysql_version: str) -> MySql: return MySql( name="mysql-replica", @@ -47,6 +67,7 @@ def create_mysql_replica(mysql_version: str) -> MySql: "--enforce_gtid_consistency=ON", "--skip-replica-start", "--server-id=2", + "--binlog_row_metadata=FULL", ], ) @@ -367,3 +388,19 @@ def workflow_source_timeouts(c: Composition, parser: WorkflowArgumentParser) -> f"--var=mysql-root-password={MySql.DEFAULT_ROOT_PASSWORD}", "proxied/*.td", ) + + +def workflow_mysql_version_compat( + c: Composition, _parser: WorkflowArgumentParser +) -> None: + """ + Validates that CREATE TABLE FROM SOURCE fails when connected to a MySQL + server older than version 8.0.1, which does not support the + binlog_row_metadata system variable required for full row metadata. + """ + with c.override(create_mysql_pre_8_0_1()): + c.up("materialized", "mysql") + c.run_testdrive_files( + f"--var=mysql-root-password={MySql.DEFAULT_ROOT_PASSWORD}", + "mysql-version-compat/version-check.td", + ) diff --git a/test/mysql-cdc/upstream-schema-changes.td b/test/mysql-cdc/upstream-schema-changes.td new file mode 100644 index 0000000000000..8bd4fef9794fe --- /dev/null +++ b/test/mysql-cdc/upstream-schema-changes.td @@ -0,0 +1,185 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +# Perform various schema updates to the upstream table + +> CREATE SECRET mysqlpass AS '${arg.mysql-root-password}' + +> CREATE CONNECTION mysql_conn TO MYSQL ( + HOST mysql, + USER root, + PASSWORD SECRET mysqlpass + ) + +$ mysql-connect name=mysql url=mysql://root@mysql password=${arg.mysql-root-password} + +$ mysql-execute name=mysql +DROP DATABASE IF EXISTS public; +CREATE DATABASE public; +USE public; +CREATE TABLE foo (name VARCHAR(16), value VARCHAR(32)); +INSERT INTO foo VALUES ('a', 'apple'), ('b', 'banana'); + +> CREATE SOURCE mysql_src FROM MYSQL CONNECTION mysql_conn; +> CREATE TABLE foo1 FROM SOURCE mysql_src (REFERENCE public.foo); + +> SELECT * FROM foo1; +a apple +b banana + +$ mysql-execute name=mysql +ALTER TABLE foo ADD COLUMN meta_col VARCHAR(32); +INSERT INTO foo VALUES ('c', 'cherry', 'wild'); + +# Adding a column to the upstream table does not affect foo1 — it continues to +# replicate only the columns it was created with, ignoring the new meta_col. +> SELECT * FROM foo1; +a apple +b banana +c cherry + +$ mysql-execute name=mysql +ALTER TABLE foo MODIFY meta_col VARCHAR(64); +INSERT INTO foo VALUES ('d', 'date', 'ajwa'); + +# Altering the newly added `meta_col` column does not brick `foo1` because `foo1` is +# not following/replicating `meta_col`, so schema updates involving it are inconsequential. +> SELECT * FROM foo1; +a apple +b banana +c cherry +d date + +> DROP TABLE foo1; + +# Unlike SQL Server CDC, MySQL binlog-based replication does not require a new capture +# instance for the new column. Creating a table from the existing source will include +# meta_col from the snapshot onward. +> CREATE TABLE foo2 FROM SOURCE mysql_src (REFERENCE public.foo); +> SELECT * FROM foo2; +a apple +b banana +c cherry wild +d date ajwa + +$ mysql-execute name=mysql +INSERT INTO foo VALUES ('e', 'elderberry', 'montypython'); + +> SELECT * FROM foo2; +a apple +b banana +c cherry wild +d date ajwa +e elderberry montypython + +> DROP TABLE foo2; + +# We can also use EXCLUDE COLUMNS to exclude meta_col if desired. +> CREATE TABLE foo3 FROM SOURCE mysql_src (REFERENCE public.foo) WITH (EXCLUDE COLUMNS = (meta_col)); +> SELECT * FROM foo3; +a apple +b banana +c cherry +d date +e elderberry + +$ mysql-execute name=mysql +INSERT INTO foo VALUES ('f', 'fig', 'sweet'); + +> SELECT * FROM foo3; +a apple +b banana +c cherry +d date +e elderberry +f fig + +# dropping an excluded column should have no effect +$ mysql-execute name=mysql +ALTER TABLE foo DROP COLUMN meta_col; +INSERT INTO foo VALUES ('g', 'grape'); + +# The INSERT after the DROP COLUMN forces the binlog to advance past the ALTER +# TABLE event, so the SELECT must succeed after the schema change is processed. +> SELECT * FROM foo3; +a apple +b banana +c cherry +d date +e elderberry +f fig +g grape + +> DROP TABLE foo3; + +# After meta_col has been dropped upstream, foo4 can be created without excluding it. +> CREATE TABLE foo4 FROM SOURCE mysql_src (REFERENCE public.foo); +> SELECT * FROM foo4; +a apple +b banana +c cherry +d date +e elderberry +f fig +g grape + +$ mysql-execute name=mysql +INSERT INTO foo VALUES ('h', 'honeydew'); + +> SELECT * FROM foo4; +a apple +b banana +c cherry +d date +e elderberry +f fig +g grape + +# Dropping a non-excluded (tracked) column stalls the source. +$ mysql-execute name=mysql +ALTER TABLE foo DROP COLUMN value; + +! SELECT * FROM foo4; +contains:incompatible schema change + +> DROP TABLE foo4; + +# Test with multiple excluded columns. +$ mysql-execute name=mysql +CREATE TABLE bar (name VARCHAR(16), value VARCHAR(32), age BIGINT, location VARCHAR(32), meta_col VARCHAR(32)); +INSERT INTO bar VALUES ('a', 'apple', 5, 'orchard', 'blah'), ('b', 'banana', 7, 'tree', 'blahblah'); + +> CREATE TABLE bar1 FROM SOURCE mysql_src (REFERENCE public.bar) WITH (EXCLUDE COLUMNS = (meta_col, location)); + +> SELECT * FROM bar1; +a apple 5 +b banana 7 + +$ mysql-execute name=mysql +ALTER TABLE bar DROP COLUMN meta_col; +INSERT INTO bar VALUES ('c', 'cherry', 9, 'grove'); + +# The INSERT after the DROP COLUMN forces the binlog to advance past the ALTER +# TABLE event, so the SELECT must succeed after the schema change is processed. +> SELECT * FROM bar1; +a apple 5 +b banana 7 +c cherry 9 + +$ mysql-execute name=mysql +ALTER TABLE bar ADD COLUMN lastname VARCHAR(16) AFTER `name`; +INSERT INTO bar VALUES ('d', 'date_lastname', 'date', 10, 'oasis'); + +> SELECT * FROM bar1; +a apple 5 +b banana 7 +c cherry 9 +d date 10 + +> DROP SOURCE mysql_src CASCADE;