Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
193 changes: 0 additions & 193 deletions doc/user/content/ingest-data/mysql/source-versioning.md

This file was deleted.

4 changes: 0 additions & 4 deletions doc/user/data/mysql_config_settings.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,6 @@ rows:
Value: "`FULL`"
Notes: ""

- MySQL Configuration: "`binlog_row_metadata`"
Value: "`FULL`"
Notes: "Required when using the `CREATE TABLE FROM SOURCE` syntax."

- MySQL Configuration: "`gtid_mode`"
Value: "`ON`"
Notes: "{{ $gtid_mode_note }}"
Expand Down
39 changes: 2 additions & 37 deletions src/mysql-util/src/decoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,44 +30,9 @@ pub fn pack_mysql_row(
table_desc: &MySqlTableDesc,
) -> Result<Row, MySqlError> {
let mut packer = row_container.packer();
let row_values = row.unwrap();

// If a column name begins with '@', then the binlog does not have full row metadata,
// meaning that full column names are not available and we need to rely on the order
// of the columns in the upstream table matching the order of the columns in the row.
// This is a fallback for MySQL servers that do not have `binlog_row_metadata` set to
// `FULL`. If the first column name does not begin with '@', then we can assume that
// full metadata is available and we can match columns by name.
let row_values: Vec<Value> = if row
.columns_ref()
.first()
.is_some_and(|col| col.name_ref().starts_with(b"@"))
{
row.unwrap()
} else {
row.columns_ref()
.iter()
.enumerate()
.filter(|(_, col)| {
table_desc
.columns
.iter()
.filter(|col| col.column_type.is_some())
.any(|c| c.name.as_str() == col.name_str())
})
.map(|(i, _)| {
row.as_ref(i)
.expect("Can't unwrap row if some of columns was taken")
.clone()
})
.collect()
};

for values in table_desc
.columns
.iter()
.filter(|col| col.column_type.is_some())
.zip_longest(row_values)
{
for values in table_desc.columns.iter().zip_longest(row_values) {
let (col_desc, value) = match values {
EitherOrBoth::Both(col_desc, value) => (col_desc, value),
EitherOrBoth::Left(col_desc) => {
Expand Down
43 changes: 12 additions & 31 deletions src/mysql-util/src/desc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,7 @@ impl MySqlTableDesc {
/// exceptions:
/// - `self`'s columns are a prefix of `other`'s columns.
/// - `self`'s keys are all present in `other`
pub fn determine_compatibility(
&self,
other: &MySqlTableDesc,
full_metadata: bool,
) -> Result<(), anyhow::Error> {
pub fn determine_compatibility(&self, other: &MySqlTableDesc) -> Result<(), anyhow::Error> {
if self == other {
return Ok(());
}
Expand All @@ -94,34 +90,18 @@ impl MySqlTableDesc {
);
}

// In the case that we don't have full binlog row metadata, `columns` is ordered by the
// ordinal_position of each column in the table, so as long as `self.columns` is a
// compatible prefix of `other.columns`, we can ignore extra columns from `other.columns`.
//
// If we do have full metadata, then we can match columns by name and just check that all
// columns in `self.columns` are present and compatible with columns in `other.columns`.
// `columns` is ordered by the ordinal_position of each column in the table,
// so as long as `self.columns` is a compatible prefix of `other.columns`, we can
// ignore extra columns from `other.columns`.
let mut other_columns = other.columns.iter();
for self_column in &self.columns {
let other_column = if full_metadata {
other_columns
.by_ref()
.find(|c| c.name == self_column.name)
.ok_or_else(|| {
anyhow::anyhow!(
"column {} no longer present in table {}",
self_column.name,
self.name
)
})?
} else {
other_columns.next().ok_or_else(|| {
anyhow::anyhow!(
"column {} no longer present in table {}",
self_column.name,
self.name
)
})?
};
let other_column = other_columns.next().ok_or_else(|| {
anyhow::anyhow!(
"column {} no longer present in table {}",
self_column.name,
self.name
)
})?;
if !self_column.is_compatible(other_column) {
bail!(
"column {} in table {} has been altered",
Expand All @@ -130,6 +110,7 @@ impl MySqlTableDesc {
);
}
}

// Our keys are all still present in exactly the same shape.
// TODO: Implement a more relaxed key compatibility check:
// We should check that for all keys that we know about there exists an upstream key whose
Expand Down
10 changes: 0 additions & 10 deletions src/sql/src/pure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1858,16 +1858,6 @@ async fn purify_create_table_from_source(
)
.await?;

let binlog_metadata_setting =
mz_mysql_util::query_sys_var(&mut conn, "binlog_row_metadata").await?;
if binlog_metadata_setting != "FULL" {
Err(
MySqlSourcePurificationError::UnsupportedBinlogMetadataSetting {
setting: binlog_metadata_setting,
},
)?;
}

// Retrieve the current @gtid_executed value of the server to mark as the effective
// initial snapshot point for this table.
let initial_gtid_set =
Expand Down
4 changes: 0 additions & 4 deletions src/sql/src/pure/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,10 +310,6 @@ pub enum MySqlSourcePurificationError {
NoTablesFoundForSchemas(Vec<String>),
#[error(transparent)]
InvalidConnection(#[from] MySqlConnectionValidationError),
#[error(
"The MySQL system variable 'binlog_row_metadata' is set to an unsupported value: {setting}. Materialize requires this variable to be set to 'FULL' to use the \"CREATE TABLE FROM SOURCE\" syntax for MySQL sources."
)]
UnsupportedBinlogMetadataSetting { setting: String },
}

impl MySqlSourcePurificationError {
Expand Down
23 changes: 7 additions & 16 deletions src/storage/src/source/mysql/schemas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,6 @@ where
})
.collect();

let full_metadata = conn
.query_first::<String, String>("SELECT @@binlog_row_metadata".to_string())
.await?
.unwrap()
.to_uppercase()
== "FULL";

Ok(expected
.into_iter()
.flat_map(|(table, outputs)| {
Expand All @@ -71,15 +64,13 @@ where
)),
);
match new_desc {
Ok(desc) => {
match output.desc.determine_compatibility(&desc, full_metadata) {
Ok(()) => None,
Err(err) => Some((
output,
DefiniteError::IncompatibleSchema(err.to_string()),
)),
}
}
Ok(desc) => match output.desc.determine_compatibility(&desc) {
Ok(()) => None,
Err(err) => Some((
output,
DefiniteError::IncompatibleSchema(err.to_string()),
)),
},
Err(err) => {
Some((output, DefiniteError::IncompatibleSchema(err.to_string())))
}
Expand Down
Loading
Loading