Skip to content

Commit 0ef8fe7

Browse files
1 parent 2bbb5eb commit 0ef8fe7

2 files changed

Lines changed: 61 additions & 20 deletions

File tree

src/mysql-util/src/decoding.rs

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -37,37 +37,37 @@ pub fn pack_mysql_row(
3737
// This is a fallback for MySQL servers that do not have `binlog_row_metadata` set to
3838
// `FULL`. If the first column name does not begin with '@', then we can assume that
3939
// full metadata is available and we can match columns by name.
40-
let row_values: Vec<Value> = if row
40+
let zip_values: Vec<EitherOrBoth<&MySqlColumnDesc, Value>> = if row
4141
.columns_ref()
4242
.first()
4343
.is_some_and(|col| col.name_ref().starts_with(b"@"))
4444
{
45-
row.unwrap()
45+
table_desc
46+
.columns
47+
.iter()
48+
.zip_longest(row.unwrap())
49+
.collect()
4650
} else {
47-
row.columns_ref()
51+
table_desc
52+
.columns
4853
.iter()
49-
.enumerate()
50-
.filter(|(_, col)| {
51-
table_desc
52-
.columns
54+
.filter(|col| col.column_type.is_some())
55+
.map(|col| {
56+
let pos = row
57+
.columns_ref()
5358
.iter()
54-
.filter(|col| col.column_type.is_some())
55-
.any(|c| c.name.as_str() == col.name_str())
56-
})
57-
.map(|(i, _)| {
58-
row.as_ref(i)
59-
.expect("Can't unwrap row if some of columns was taken")
60-
.clone()
59+
.position(|row_col| row_col.name_str() == col.name.as_str())
60+
.expect("column in table desc not found in row metadata");
61+
EitherOrBoth::Both(
62+
col,
63+
row.get(pos)
64+
.expect("Can't unwrap row if some of columns was taken"),
65+
)
6166
})
6267
.collect()
6368
};
6469

65-
for values in table_desc
66-
.columns
67-
.iter()
68-
.filter(|col| col.column_type.is_some())
69-
.zip_longest(row_values)
70-
{
70+
for values in zip_values {
7171
let (col_desc, value) = match values {
7272
EitherOrBoth::Both(col_desc, value) => (col_desc, value),
7373
EitherOrBoth::Left(col_desc) => {

test/mysql-cdc/binlog-backward-compat.td

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,47 @@ c cherry
6161

6262
> DROP SOURCE mysql_src CASCADE;
6363

64+
$ mysql-execute name=mysql
65+
SET GLOBAL binlog_row_metadata = FULL;
66+
USE public;
67+
CREATE TABLE bar (a INT, b INT, c INT);
68+
INSERT INTO bar VALUES (1, 2, 3), (4, 5, 6);
69+
70+
> CREATE SOURCE mysql_src2 FROM MYSQL CONNECTION mysql_conn;
71+
72+
> CREATE TABLE bar FROM SOURCE mysql_src2 (REFERENCE public.bar) WITH (EXCLUDE COLUMNS (b));
73+
74+
> SELECT * FROM bar;
75+
1 3
76+
4 6
77+
78+
$ mysql-execute name=mysql
79+
SET GLOBAL binlog_row_metadata = MINIMAL;
80+
INSERT INTO bar VALUES (7, 8, 9);
81+
82+
> SELECT * FROM bar;
83+
1 3
84+
4 6
85+
7 9
86+
87+
$ mysql-execute name=mysql
88+
UPDATE bar SET c = 30 WHERE a = 1;
89+
90+
> SELECT * FROM bar;
91+
1 30
92+
4 6
93+
7 9
94+
95+
$ mysql-execute name=mysql
96+
DELETE FROM bar WHERE a = 4;
97+
98+
> SELECT * FROM bar;
99+
1 30
100+
7 9
101+
102+
> DROP SOURCE mysql_src2 CASCADE;
103+
104+
64105
# Restore to a clean state so other tests are unaffected.
65106
$ mysql-execute name=mysql
66107
SET GLOBAL binlog_row_metadata = FULL;

0 commit comments

Comments
 (0)