Skip to content

Commit 7dcefc4

Browse files
1 parent 1b5481c commit 7dcefc4

6 files changed

Lines changed: 42 additions & 7 deletions

File tree

Cargo.lock

Lines changed: 8 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -525,6 +525,7 @@ url = "2.5.8"
525525
urlencoding = "2.1.3"
526526
utoipa = "5.4.0"
527527
uuid = "1.19.0"
528+
version-compare = "0.2.1"
528529
walkdir = "2.5.0"
529530
which = "8"
530531
yansi = "1.0.1"

src/sql/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ tracing.workspace = true
8282
tracing-subscriber.workspace = true
8383
uncased.workspace = true
8484
uuid = { workspace = true, features = ["serde", "v4"] }
85+
version-compare.workspace = true
8586

8687
[dev-dependencies]
8788
datadriven.workspace = true

src/sql/src/pure.rs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ use protobuf_native::compiler::{SourceTreeDescriptorDatabase, VirtualSourceTree}
6868
use rdkafka::admin::AdminClient;
6969
use references::{RetrievedSourceReferences, SourceReferenceClient};
7070
use uuid::Uuid;
71+
use version_compare;
7172

7273
use crate::ast::{
7374
AlterSourceAddSubsourceOption, AvroSchema, CreateSourceConnection, CreateSourceStatement,
@@ -1858,9 +1859,22 @@ async fn purify_create_table_from_source(
18581859
)
18591860
.await?;
18601861

1862+
if version_compare::compare_to(
1863+
mz_mysql_util::query_sys_var(&mut conn, "version").await?,
1864+
"8.0.1",
1865+
version_compare::Cmp::Lt,
1866+
)
1867+
.expect("failed to parse version string from mysql")
1868+
{
1869+
Err(
1870+
MySqlSourcePurificationError::UnsupportedBinlogMetadataSetting {
1871+
setting: "Using MySQL version < 8.0.1, which does not support full binlog row metadata".to_string(),
1872+
},
1873+
)?;
1874+
}
18611875
let binlog_metadata_setting =
18621876
mz_mysql_util::query_sys_var(&mut conn, "binlog_row_metadata").await?;
1863-
if binlog_metadata_setting != "FULL" {
1877+
if binlog_metadata_setting.to_uppercase() != "FULL" {
18641878
Err(
18651879
MySqlSourcePurificationError::UnsupportedBinlogMetadataSetting {
18661880
setting: binlog_metadata_setting,

src/storage/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ tracing.workspace = true
9191
thiserror.workspace = true
9292
uuid = { workspace = true, features = ["serde", "v4"] }
9393
arrow-ipc.workspace = true
94+
version-compare.workspace = true
9495

9596
[dev-dependencies]
9697
async-trait.workspace = true

src/storage/src/source/mysql/schemas.rs

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
// by the Apache License, Version 2.0.
99

1010
use std::collections::{BTreeMap, BTreeSet};
11+
use version_compare;
1112

1213
use mysql_async::prelude::Queryable;
1314
use mz_mysql_util::{MySqlError, SchemaRequest, schema_info};
@@ -45,12 +46,21 @@ where
4546
})
4647
.collect();
4748

48-
let full_metadata = conn
49-
.query_first::<String, String>("SELECT @@binlog_row_metadata".to_string())
50-
.await?
51-
.unwrap()
52-
.to_uppercase()
53-
== "FULL";
49+
let full_metadata = version_compare::compare_to(
50+
conn.query_first::<String, String>("SELECT VERSION()".to_string())
51+
.await?
52+
.unwrap()
53+
.to_uppercase(),
54+
"8.0.1",
55+
version_compare::Cmp::Ge,
56+
)
57+
.expect("failed to parse version string from mysql")
58+
&& conn
59+
.query_first::<String, String>("SELECT @@binlog_row_metadata".to_string())
60+
.await?
61+
.unwrap()
62+
.to_uppercase()
63+
== "FULL";
5464

5565
Ok(expected
5666
.into_iter()

0 commit comments

Comments
 (0)