Skip to content

Commit 8617fb8

Browse files
committed
Make column iteration based on table_desc
1 parent b870827 commit 8617fb8

1 file changed

Lines changed: 22 additions & 34 deletions

File tree

src/mysql-util/src/decoding.rs

Lines changed: 22 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
use std::fmt::Write;
1111
use std::str::FromStr;
1212

13-
use itertools::{EitherOrBoth, Itertools};
1413
use mysql_common::value::convert::from_value_opt;
1514
use mysql_common::{Row as MySqlRow, Value};
1615

@@ -43,50 +42,39 @@ pub fn pack_mysql_row(
4342
.columns_ref()
4443
.first()
4544
.is_some_and(|col| col.name_ref().starts_with(b"@"));
46-
// Wire indices of `row` to decode, in iteration order. Keeping indices
47-
// (rather than moving values out via `row.unwrap()`) lets us describe the
48-
// row's shape on a decode error without paying the allocation cost on the
49-
// happy path.
50-
let active_indices: Vec<usize> = if fallback_names {
51-
(0..row.len()).collect()
52-
} else {
53-
row.columns_ref()
54-
.iter()
55-
.enumerate()
56-
.filter(|(_, col)| {
57-
table_desc
58-
.columns
59-
.iter()
60-
.any(|c| c.name.as_str() == col.name_str())
61-
})
62-
.map(|(i, _)| i)
63-
.collect()
64-
};
6545

66-
for pair in table_desc.columns.iter().zip_longest(&active_indices) {
67-
let (col_desc, wire_idx) = match pair {
68-
EitherOrBoth::Both(col_desc, &idx) => (col_desc, idx),
69-
EitherOrBoth::Left(col_desc) => {
46+
// For each column in `table_desc` (in descriptor order), resolve its wire
47+
// index. Non-fallback rows are matched by name so a reordered upstream
48+
// still decodes correctly; fallback rows have no names and are matched
49+
// positionally. A `None` here means the upstream row is missing this
50+
// column and is only tolerated for ignored columns.
51+
for (i, col_desc) in table_desc.columns.iter().enumerate() {
52+
let wire_idx = if fallback_names {
53+
(i < row.len()).then_some(i)
54+
} else {
55+
row.columns_ref()
56+
.iter()
57+
.position(|wc| wc.name_str() == col_desc.name.as_str())
58+
};
59+
if col_desc.column_type.is_none() {
60+
// This column is ignored, so don't decode it.
61+
continue;
62+
}
63+
let wire_idx = match wire_idx {
64+
Some(idx) => idx,
65+
None => {
7066
return Err(decode_error(
71-
"extra column description",
67+
"upstream row is missing column",
7268
col_desc,
7369
table_desc,
7470
gtid_set,
7571
&row,
7672
));
7773
}
78-
EitherOrBoth::Right(_) => {
79-
// If there are extra columns on the upstream table we can safely ignore them
80-
break;
81-
}
8274
};
83-
if col_desc.column_type.is_none() {
84-
// This column is ignored, so don't decode it.
85-
continue;
86-
}
8775
let value = row
8876
.as_ref(wire_idx)
89-
.expect("active_indices is within row bounds")
77+
.expect("wire_idx resolved from row")
9078
.clone();
9179
if let Err(err) = pack_val_as_datum(value, col_desc, &mut packer) {
9280
return Err(decode_error(

0 commit comments

Comments
 (0)