Skip to content

Commit 08b59c7

Browse files
Merge branch 'maz-mysql-debug-decode-err' of https://github.com/martykulma/materialize into patrick/mysql-fixes
2 parents 7dcefc4 + 8617fb8 commit 08b59c7

3 files changed

Lines changed: 168 additions & 80 deletions

File tree

src/mysql-util/src/decoding.rs

Lines changed: 139 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@
77
// the Business Source License, use of this software will be governed
88
// by the Apache License, Version 2.0.
99

10+
use std::fmt::Write;
1011
use std::str::FromStr;
1112

12-
use itertools::{EitherOrBoth, Itertools};
1313
use mysql_common::value::convert::from_value_opt;
1414
use mysql_common::{Row as MySqlRow, Value};
1515

@@ -28,6 +28,7 @@ pub fn pack_mysql_row(
2828
row_container: &mut Row,
2929
row: MySqlRow,
3030
table_desc: &MySqlTableDesc,
31+
gtid_set: Option<&str>,
3132
) -> Result<Row, MySqlError> {
3233
let mut packer = row_container.packer();
3334

@@ -37,72 +38,157 @@ pub fn pack_mysql_row(
3738
// This is a fallback for MySQL servers that do not have `binlog_row_metadata` set to
3839
// `FULL`. If the first column name does not begin with '@', then we can assume that
3940
// full metadata is available and we can match columns by name.
40-
let zip_values: Vec<EitherOrBoth<&MySqlColumnDesc, Value>> = if row
41+
let fallback_names = row
4142
.columns_ref()
4243
.first()
43-
.is_some_and(|col| col.name_ref().starts_with(b"@"))
44-
{
45-
table_desc
46-
.columns
47-
.iter()
48-
.zip_longest(row.unwrap())
49-
.collect()
50-
} else {
51-
table_desc
52-
.columns
53-
.iter()
54-
.filter(|col| col.column_type.is_some())
55-
.map(|col| {
56-
let pos = row
57-
.columns_ref()
58-
.iter()
59-
.position(|row_col| row_col.name_str() == col.name.as_str())
60-
.expect("column in table desc not found in row metadata");
61-
EitherOrBoth::Both(
62-
col,
63-
row.get(pos)
64-
.expect("Can't unwrap row if some of columns was taken"),
65-
)
66-
})
67-
.collect()
68-
};
44+
.is_some_and(|col| col.name_ref().starts_with(b"@"));
6945

70-
for values in zip_values {
71-
let (col_desc, value) = match values {
72-
EitherOrBoth::Both(col_desc, value) => (col_desc, value),
73-
EitherOrBoth::Left(col_desc) => {
74-
tracing::error!(
75-
"mysql: extra column description {col_desc:?} for table {}",
76-
table_desc.name
77-
);
78-
Err(MySqlError::ValueDecodeError {
79-
column_name: col_desc.name.clone(),
80-
qualified_table_name: format!("{}.{}", table_desc.schema_name, table_desc.name),
81-
error: "extra column description".to_string(),
82-
})?
83-
}
84-
EitherOrBoth::Right(_) => {
85-
// If there are extra columns on the upstream table we can safely ignore them
86-
break;
87-
}
46+
// For each column in `table_desc` (in descriptor order), resolve its wire
47+
// index. Non-fallback rows are matched by name so a reordered upstream
48+
// still decodes correctly; fallback rows have no names and are matched
49+
// positionally. A `None` here means the upstream row is missing this
50+
// column and is only tolerated for ignored columns.
51+
for (i, col_desc) in table_desc.columns.iter().enumerate() {
52+
let wire_idx = if fallback_names {
53+
(i < row.len()).then_some(i)
54+
} else {
55+
row.columns_ref()
56+
.iter()
57+
.position(|wc| wc.name_str() == col_desc.name.as_str())
8858
};
8959
if col_desc.column_type.is_none() {
9060
// This column is ignored, so don't decode it.
9161
continue;
9262
}
93-
match pack_val_as_datum(value, col_desc, &mut packer) {
94-
Err(err) => Err(MySqlError::ValueDecodeError {
95-
column_name: col_desc.name.clone(),
96-
qualified_table_name: format!("{}.{}", table_desc.schema_name, table_desc.name),
97-
error: err.to_string(),
98-
})?,
99-
Ok(()) => (),
63+
let wire_idx = match wire_idx {
64+
Some(idx) => idx,
65+
None => {
66+
return Err(decode_error(
67+
"upstream row is missing column",
68+
col_desc,
69+
table_desc,
70+
gtid_set,
71+
&row,
72+
));
73+
}
10074
};
75+
let value = row
76+
.as_ref(wire_idx)
77+
.expect("wire_idx resolved from row")
78+
.clone();
79+
if let Err(err) = pack_val_as_datum(value, col_desc, &mut packer) {
80+
return Err(decode_error(
81+
&err.to_string(),
82+
col_desc,
83+
table_desc,
84+
gtid_set,
85+
&row,
86+
));
87+
}
10188
}
10289

10390
Ok(row_container.clone())
10491
}
10592

93+
/// Build a `ValueDecodeError`, logging the schema, table, column, source
94+
/// gtid_set (if any), and a shape description of `row` at the same time.
95+
/// The shape string is only built here — pack_mysql_row's happy path does no
96+
/// per-row allocation beyond what decoding requires.
97+
fn decode_error(
98+
err_msg: &str,
99+
col_desc: &MySqlColumnDesc,
100+
table_desc: &MySqlTableDesc,
101+
gtid_set: Option<&str>,
102+
row: &MySqlRow,
103+
) -> MySqlError {
104+
let row_shape = describe_row_shape(row, table_desc);
105+
tracing::warn!(
106+
"mysql decode error for `{}`.`{}` column `{}`: {}; gtid_set={:?}; row_shape={}",
107+
table_desc.schema_name,
108+
table_desc.name,
109+
col_desc.name,
110+
err_msg,
111+
gtid_set,
112+
row_shape,
113+
);
114+
MySqlError::ValueDecodeError {
115+
column_name: col_desc.name.clone(),
116+
qualified_table_name: format!("{}.{}", table_desc.schema_name, table_desc.name),
117+
error: err_msg.to_string(),
118+
}
119+
}
120+
121+
/// Describes the structural shape of a row without revealing any data values.
122+
/// Iterates every wire column. For each, emits the wire name, the binlog
123+
/// wire type, the character-set id (or `binary`), a classification relative
124+
/// to `table_desc` (`expected=<scalar>` for active columns, `ignored` for
125+
/// columns excluded from the source, `extra` for upstream columns with no
126+
/// descriptor entry), and a value disposition (`null` or `bytes(len=N)` /
127+
/// primitive kind). Intended for diagnostic logging on decode errors: MySQL
128+
/// serializes CHAR, VARCHAR, TEXT, JSON, BLOB, etc. all as `Value::Bytes`,
129+
/// so the wire type tag and the expected scalar type are what distinguish
130+
/// them.
131+
fn describe_row_shape(row: &MySqlRow, table_desc: &MySqlTableDesc) -> String {
132+
// Binlogs without full row metadata use positional "@N" names, so we
133+
// have to match by wire position rather than by name.
134+
let fallback_names = row
135+
.columns_ref()
136+
.first()
137+
.is_some_and(|col| col.name_ref().starts_with(b"@"));
138+
139+
let mut out = String::new();
140+
out.push('[');
141+
for (i, wire_col) in row.columns_ref().iter().enumerate() {
142+
if i > 0 {
143+
out.push_str(", ");
144+
}
145+
let wire_name = wire_col.name_str();
146+
let cs = wire_col.character_set();
147+
// 63 = binary collation (binary/blob columns).
148+
let cs_str = if cs == 63 {
149+
"binary".to_string()
150+
} else {
151+
format!("charset={cs}")
152+
};
153+
let wire_type = format!("{:?}", wire_col.column_type());
154+
155+
let matched_col = if fallback_names {
156+
table_desc.columns.get(i)
157+
} else {
158+
table_desc
159+
.columns
160+
.iter()
161+
.find(|c| c.name.as_str() == wire_name)
162+
};
163+
let match_info = match matched_col {
164+
Some(col) => match &col.column_type {
165+
Some(ct) => format!("expected={:?}", ct.scalar_type),
166+
None => "ignored".to_string(),
167+
},
168+
None => "extra".to_string(),
169+
};
170+
171+
let val_desc = match row.as_ref(i) {
172+
None => "absent".to_string(),
173+
Some(Value::NULL) => "null".to_string(),
174+
Some(Value::Bytes(b)) => format!("bytes(len={})", b.len()),
175+
Some(Value::Int(_)) => "int".to_string(),
176+
Some(Value::UInt(_)) => "uint".to_string(),
177+
Some(Value::Float(_)) => "float".to_string(),
178+
Some(Value::Double(_)) => "double".to_string(),
179+
Some(Value::Date(..)) => "date".to_string(),
180+
Some(Value::Time(..)) => "time".to_string(),
181+
};
182+
183+
let _ = write!(
184+
out,
185+
"{{name={wire_name}, wire={wire_type}, {cs_str}, {match_info}, val={val_desc}}}"
186+
);
187+
}
188+
out.push(']');
189+
out
190+
}
191+
106192
// TODO(guswynn|roshan): This function has various `.to_string()` and `format!` calls that should
107193
// use a shared allocation if possible.
108194
fn pack_val_as_datum(

src/storage/src/source/mysql/replication/events.rs

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -298,21 +298,23 @@ pub(super) async fn handle_rows_event(
298298
before_row.map(|r| (r, Diff::MINUS_ONE)),
299299
after_row.map(|r| (r, Diff::ONE)),
300300
];
301+
let gtid_str = format!("{new_gtid:?}");
301302
for (binlog_row, diff) in updates.into_iter().flatten() {
302303
let row = mysql_async::Row::try_from(binlog_row)?;
303304
for (output, row_val) in outputs.iter().repeat_clone(row) {
304-
let event = match pack_mysql_row(&mut final_row, row_val, &output.desc) {
305-
Ok(row) => Ok(SourceMessage {
306-
key: Row::default(),
307-
value: row,
308-
metadata: Row::default(),
309-
}),
310-
// Produce a DefiniteError in the stream for any rows that fail to decode
311-
Err(err @ MySqlError::ValueDecodeError { .. }) => Err(DataflowError::from(
312-
DefiniteError::ValueDecodeError(err.to_string()),
313-
)),
314-
Err(err) => Err(err)?,
315-
};
305+
let event =
306+
match pack_mysql_row(&mut final_row, row_val, &output.desc, Some(&gtid_str)) {
307+
Ok(row) => Ok(SourceMessage {
308+
key: Row::default(),
309+
value: row,
310+
metadata: Row::default(),
311+
}),
312+
// Produce a DefiniteError in the stream for any rows that fail to decode
313+
Err(err @ MySqlError::ValueDecodeError { .. }) => Err(DataflowError::from(
314+
DefiniteError::ValueDecodeError(err.to_string()),
315+
)),
316+
Err(err) => Err(err)?,
317+
};
316318

317319
let data = (output.output_index, event);
318320

src/storage/src/source/mysql/snapshot.rs

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -420,21 +420,21 @@ pub(crate) fn render<'scope>(
420420
let row: MySqlRow = row;
421421
snapshot_staged += 1;
422422
for (output, row_val) in outputs.iter().repeat_clone(row) {
423-
let event = match pack_mysql_row(&mut final_row, row_val, &output.desc)
424-
{
425-
Ok(row) => Ok(SourceMessage {
426-
key: Row::default(),
427-
value: row,
428-
metadata: Row::default(),
429-
}),
430-
// Produce a DefiniteError in the stream for any rows that fail to decode
431-
Err(err @ MySqlError::ValueDecodeError { .. }) => {
432-
Err(DataflowError::from(DefiniteError::ValueDecodeError(
433-
err.to_string(),
434-
)))
435-
}
436-
Err(err) => Err(err)?,
437-
};
423+
let event =
424+
match pack_mysql_row(&mut final_row, row_val, &output.desc, None) {
425+
Ok(row) => Ok(SourceMessage {
426+
key: Row::default(),
427+
value: row,
428+
metadata: Row::default(),
429+
}),
430+
// Produce a DefiniteError in the stream for any rows that fail to decode
431+
Err(err @ MySqlError::ValueDecodeError { .. }) => {
432+
Err(DataflowError::from(DefiniteError::ValueDecodeError(
433+
err.to_string(),
434+
)))
435+
}
436+
Err(err) => Err(err)?,
437+
};
438438
raw_handle
439439
.give_fueled(
440440
&data_cap_set[0],

0 commit comments

Comments
 (0)