Skip to content

Commit 4ea428c

Browse files
[SS-193] Fix issue where copy into doesn't respect target table's numeric precision (#37276)
### Motivation Dennis filed an issue flagging a test failure in the iceberg sink where we failed to output the decimal value "62912915282.360725" to an iceberg sink where we expected precision of 3. The issue appears to be upstream, where the data being sinked doesn't match the schema. For parallel_workload it appears we use `COPY`. Issue https://linear.app/materializeinc/issue/SS-193/iceberg-failed-to-convert-row-to-recordbatch-failed-to-add-insert-row ### Description I added some tests for `COPY` behavior that failed initially and now pass, covering default and CSV input via STDIN and Parquet. Generally trying to mirror existing insert behavior that ultimately calls rescale. ### Verification I've compared the results for the basic tab-separated `COPY` command to postgres and confirmed that postgres rounds while materialize currently doesn't. I've added some testdrive tests and unit tests to prevent future regressions and explore edge cases like numbers overflowing max precision on rescale that wouldn't have before. Note: There are a variety of other types that may have similar issues, i.e. timestamp max precision, which I'm thinking I'll file a separate follow-up issue on. ### Potential Gotchas The fivetran sink uses the constraints from `Type::Numeric`, so would end up creating a `DECIMAL` with no options, see https://github.com/MaterializeInc/materialize/blob/4229f003472695da7f0fb9c8c1f2aecc3fa8e39e/src/fivetran-destination/src/utils.rs#L94. I think DECIMAL in the SDK maps to fivetran's BIGDECIMAL and so should be more correct by default.
1 parent 3d56dc0 commit 4ea428c

6 files changed

Lines changed: 277 additions & 25 deletions

File tree

src/arrow-util/src/reader.rs

Lines changed: 140 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use mz_ore::cast::CastFrom;
3030
use mz_repr::adt::date::Date;
3131
use mz_repr::adt::interval::Interval;
3232
use mz_repr::adt::jsonb::JsonbPacker;
33-
use mz_repr::adt::numeric::Numeric;
33+
use mz_repr::adt::numeric::{Numeric, rescale};
3434
use mz_repr::adt::range::{Range, RangeLowerBound, RangeUpperBound};
3535
use mz_repr::adt::timestamp::CheckedTimestamp;
3636
use mz_repr::{Datum, RelationDesc, Row, RowPacker, SharedRow, SqlScalarType};
@@ -188,8 +188,7 @@ fn scalar_type_and_array_to_reader(
188188
(SqlScalarType::Float64, DataType::Float64) => {
189189
Ok(ColReader::Float64(downcast_array::<Float64Array>(array)))
190190
}
191-
// TODO(cf3): Consider the max_scale for numeric.
192-
(SqlScalarType::Numeric { .. }, DataType::Decimal128(precision, scale)) => {
191+
(SqlScalarType::Numeric { max_scale }, DataType::Decimal128(precision, scale)) => {
193192
use num_traits::Pow;
194193

195194
let base = Numeric::from(10);
@@ -209,10 +208,10 @@ fn scalar_type_and_array_to_reader(
209208
array,
210209
scale_factor,
211210
precision,
211+
destination_max_scale: (*max_scale).map(|s| s.into_u8()),
212212
})
213213
}
214-
// TODO(cf3): Consider the max_scale for numeric.
215-
(SqlScalarType::Numeric { .. }, DataType::Decimal256(precision, scale)) => {
214+
(SqlScalarType::Numeric { max_scale }, DataType::Decimal256(precision, scale)) => {
216215
use num_traits::Pow;
217216

218217
let base = Numeric::from(10);
@@ -232,6 +231,7 @@ fn scalar_type_and_array_to_reader(
232231
array,
233232
scale_factor,
234233
precision,
234+
destination_max_scale: (*max_scale).map(|s| s.into_u8()),
235235
})
236236
}
237237
(SqlScalarType::Bytes, DataType::Binary) => {
@@ -489,11 +489,13 @@ enum ColReader {
489489
array: Decimal128Array,
490490
scale_factor: Numeric,
491491
precision: usize,
492+
destination_max_scale: Option<u8>,
492493
},
493494
Decimal256 {
494495
array: Decimal256Array,
495496
scale_factor: Numeric,
496497
precision: usize,
498+
destination_max_scale: Option<u8>,
497499
},
498500

499501
Binary(arrow::array::BinaryArray),
@@ -609,21 +611,31 @@ impl ColReader {
609611
array,
610612
scale_factor,
611613
precision,
612-
} => array.is_valid(idx).then(|| array.value(idx)).map(|x| {
613-
// Create a Numeric from our i128 with precision.
614-
let mut ctx = dec::Context::<Numeric>::default();
615-
ctx.set_precision(*precision).expect("checked before");
616-
let mut num = ctx.from_i128(x);
614+
destination_max_scale,
615+
} => array
616+
.is_valid(idx)
617+
.then(|| array.value(idx))
618+
.map(|x| {
619+
// Create a Numeric from our i128 with precision.
620+
let mut ctx = dec::Context::<Numeric>::default();
621+
ctx.set_precision(*precision).expect("checked before");
622+
let mut num = ctx.from_i128(x);
623+
624+
// Scale the number.
625+
ctx.div(&mut num, scale_factor);
617626

618-
// Scale the number.
619-
ctx.div(&mut num, scale_factor);
627+
if let Some(destination_max_scale) = destination_max_scale {
628+
rescale(&mut num, *destination_max_scale)?;
629+
}
620630

621-
Datum::Numeric(OrderedDecimal(num))
622-
}),
631+
Ok::<_, anyhow::Error>(Datum::Numeric(OrderedDecimal(num)))
632+
})
633+
.transpose()?,
623634
ColReader::Decimal256 {
624635
array,
625636
scale_factor,
626637
precision,
638+
destination_max_scale,
627639
} => array
628640
.is_valid(idx)
629641
.then(|| array.value(idx))
@@ -642,6 +654,10 @@ impl ColReader {
642654
// Scale the number.
643655
ctx.div(&mut num, scale_factor);
644656

657+
if let Some(destination_max_scale) = destination_max_scale {
658+
rescale(&mut num, *destination_max_scale)?;
659+
}
660+
645661
Ok::<_, anyhow::Error>(Datum::Numeric(OrderedDecimal(num)))
646662
})
647663
.transpose()?,
@@ -1183,6 +1199,116 @@ mod tests {
11831199
assert_eq!(num.0, Numeric::from(100000000.009f64));
11841200
}
11851201

1202+
/// Regression test for SS-193: when the destination column declares a
1203+
/// `max_scale`, the reader should round the decoded value to that scale
1204+
/// rather than preserving the source file's scale.
1205+
#[mz_ore::test]
1206+
#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decContextDefault` on OS `linux`
1207+
fn decimal_applies_destination_max_scale() {
1208+
use mz_repr::adt::numeric::NumericMaxScale;
1209+
1210+
// Destination column is numeric(10, 2): scale 2.
1211+
let desc = RelationDesc::builder()
1212+
.with_column(
1213+
"a",
1214+
SqlScalarType::Numeric {
1215+
max_scale: Some(NumericMaxScale::try_from(2_i64).unwrap()),
1216+
}
1217+
.nullable(true),
1218+
)
1219+
.finish();
1220+
1221+
let expected = Numeric::from(10.45f64);
1222+
1223+
// The source files carry scale-3 values (10.447) that don't round
1224+
// evenly to the destination's scale 2.
1225+
let mut dec128 = arrow::array::Decimal128Builder::new();
1226+
dec128 = dec128.with_precision_and_scale(12, 3).unwrap();
1227+
dec128.append_value(10447);
1228+
let dec128 = dec128.finish();
1229+
#[allow(clippy::as_conversions)]
1230+
let batch128 = StructArray::from(vec![(
1231+
Arc::new(Field::new("a", dec128.data_type().clone(), true)),
1232+
Arc::new(dec128) as arrow::array::ArrayRef,
1233+
)]);
1234+
1235+
let reader = ArrowReader::new(&desc, batch128).unwrap();
1236+
let mut rnd_row = Row::default();
1237+
reader.read(0, &mut rnd_row).unwrap();
1238+
let num = rnd_row.into_element().unwrap_numeric();
1239+
assert_eq!(num.0, expected, "Decimal128 did not round to max_scale");
1240+
1241+
let mut dec256 = arrow::array::Decimal256Builder::new();
1242+
dec256 = dec256.with_precision_and_scale(12, 3).unwrap();
1243+
dec256.append_value(arrow::datatypes::i256::from(10447));
1244+
let dec256 = dec256.finish();
1245+
#[allow(clippy::as_conversions)]
1246+
let batch256 = StructArray::from(vec![(
1247+
Arc::new(Field::new("a", dec256.data_type().clone(), true)),
1248+
Arc::new(dec256) as arrow::array::ArrayRef,
1249+
)]);
1250+
1251+
let reader = ArrowReader::new(&desc, batch256).unwrap();
1252+
let mut rnd_row = Row::default();
1253+
reader.read(0, &mut rnd_row).unwrap();
1254+
let num = rnd_row.into_element().unwrap_numeric();
1255+
assert_eq!(num.0, expected, "Decimal256 did not round to max_scale");
1256+
}
1257+
1258+
#[mz_ore::test]
1259+
#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decContextDefault` on OS `linux`
1260+
fn decimal_rescale_to_higher_scale_overflows() {
1261+
use mz_repr::adt::numeric::NumericMaxScale;
1262+
1263+
let value: i128 = 10_i128.pow(33) - 1;
1264+
1265+
let build_batch = || {
1266+
let mut dec128 = arrow::array::Decimal128Builder::new();
1267+
dec128 = dec128.with_precision_and_scale(38, 0).unwrap();
1268+
dec128.append_value(value);
1269+
let dec128 = dec128.finish();
1270+
#[allow(clippy::as_conversions)]
1271+
StructArray::from(vec![(
1272+
Arc::new(Field::new("a", dec128.data_type().clone(), true)),
1273+
Arc::new(dec128) as arrow::array::ArrayRef,
1274+
)])
1275+
};
1276+
1277+
let desc_with_scale = |scale: i64| {
1278+
RelationDesc::builder()
1279+
.with_column(
1280+
"a",
1281+
SqlScalarType::Numeric {
1282+
max_scale: Some(NumericMaxScale::try_from(scale).unwrap()),
1283+
}
1284+
.nullable(true),
1285+
)
1286+
.finish()
1287+
};
1288+
1289+
// Test working case with supportable scale then broken case.
1290+
let reader = ArrowReader::new(&desc_with_scale(2), build_batch()).unwrap();
1291+
let mut row = Row::default();
1292+
reader
1293+
.read(0, &mut row)
1294+
.expect("value must decode at destination scale 2");
1295+
let num = row.into_element().unwrap_numeric();
1296+
let mut expected = Numeric::try_from(value).unwrap();
1297+
rescale(&mut expected, 2).unwrap();
1298+
assert_eq!(num.0, expected, "value did not rescale to scale 2");
1299+
1300+
let reader = ArrowReader::new(&desc_with_scale(8), build_batch()).unwrap();
1301+
let mut row = Row::default();
1302+
let err = reader
1303+
.read(0, &mut row)
1304+
.expect_err("value must overflow at destination scale 8");
1305+
1306+
assert!(
1307+
format!("{err:#}").contains("exceed maximum precision"),
1308+
"unexpected error: {err:#}",
1309+
);
1310+
}
1311+
11861312
/// Regression test for database-issues#11330: when a Parquet file authored
11871313
/// by an external engine encodes a discrete range in non-canonical form
11881314
/// (e.g. `[1,10]` for `int4range`), the reader must canonicalize it to MZ's

src/environmentd/tests/testdata/http/post

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -331,7 +331,7 @@ http
331331
{"queries":[{"query":"select $1::decimal+2 as col","params":["nan"]}]}
332332
----
333333
200 OK
334-
{"results":[{"tag":"SELECT 1","rows":[["NaN"]],"desc":{"columns":[{"name":"col","type_oid":1700,"type_len":-1,"type_mod":2555947}]},"notices":[]}]}
334+
{"results":[{"tag":"SELECT 1","rows":[["NaN"]],"desc":{"columns":[{"name":"col","type_oid":1700,"type_len":-1,"type_mod":-1}]},"notices":[]}]}
335335

336336
# Null string value parameters
337337
http

src/pgrepr/src/types.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1257,12 +1257,9 @@ impl From<&SqlScalarType> for Type {
12571257
},
12581258
SqlScalarType::Uuid => Type::Uuid,
12591259
SqlScalarType::Numeric { max_scale } => Type::Numeric {
1260-
constraints: Some(NumericConstraints {
1260+
constraints: max_scale.map(|max_scale| NumericConstraints {
12611261
max_precision: i32::from(NUMERIC_DATUM_MAX_PRECISION),
1262-
max_scale: match max_scale {
1263-
Some(max_scale) => i32::from(max_scale.into_u8()),
1264-
None => i32::from(NUMERIC_DATUM_MAX_PRECISION),
1265-
},
1262+
max_scale: i32::from(max_scale.into_u8()),
12661263
}),
12671264
},
12681265
SqlScalarType::RegClass => Type::RegClass,

src/pgrepr/src/value.rs

Lines changed: 65 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use std::{io, str};
1313

1414
use bytes::{BufMut, BytesMut};
1515
use chrono::{DateTime, NaiveDateTime, NaiveTime, Utc};
16+
use dec::OrderedDecimal;
1617
use itertools::Itertools;
1718
use mz_ore::cast::ReinterpretCast;
1819
use mz_pgrepr_consts::oid::TYPE_INT2_OID;
@@ -22,6 +23,7 @@ use mz_repr::adt::char;
2223
use mz_repr::adt::date::Date;
2324
use mz_repr::adt::jsonb::JsonbRef;
2425
use mz_repr::adt::mz_acl_item::{AclItem, MzAclItem};
26+
use mz_repr::adt::numeric::{self as mz_repr_numeric, NumericMaxScale, rescale};
2527
use mz_repr::adt::pg_legacy_name::NAME_MAX_BYTES;
2628
use mz_repr::adt::range::{Range, RangeInner};
2729
use mz_repr::adt::timestamp::CheckedTimestamp;
@@ -30,7 +32,7 @@ use mz_repr::{Datum, RowArena, RowPacker, RowRef, SqlRelationType, SqlScalarType
3032
use postgres_types::{FromSql, IsNull, ToSql, Type as PgType};
3133
use uuid::Uuid;
3234

33-
use crate::types::{UINT2, UINT4, UINT8};
35+
use crate::types::{NumericConstraints, UINT2, UINT4, UINT8};
3436
use crate::value::error::IntoDatumError;
3537
use crate::{Interval, Jsonb, Numeric, Type, UInt2, UInt4, UInt8};
3638

@@ -711,7 +713,10 @@ impl Value {
711713
},
712714
)?),
713715
Type::Name => Value::Name(strconv::parse_pg_legacy_name(s)),
714-
Type::Numeric { .. } => Value::Numeric(Numeric(strconv::parse_numeric(s)?)),
716+
Type::Numeric { constraints } => Value::Numeric(Numeric(rescale_numeric(
717+
strconv::parse_numeric(s)?,
718+
constraints.as_ref(),
719+
)?)),
715720
Type::Oid | Type::RegClass | Type::RegProc | Type::RegType => {
716721
Value::Oid(strconv::parse_oid(s)?)
717722
}
@@ -815,7 +820,10 @@ impl Value {
815820
})?;
816821
}
817822
Type::Name => packer.push(Datum::String(&strconv::parse_pg_legacy_name(s))),
818-
Type::Numeric { .. } => packer.push(Datum::Numeric(strconv::parse_numeric(s)?)),
823+
Type::Numeric { constraints } => packer.push(Datum::Numeric(rescale_numeric(
824+
strconv::parse_numeric(s)?,
825+
constraints.as_ref(),
826+
)?)),
819827
Type::Oid | Type::RegClass | Type::RegProc | Type::RegType => {
820828
packer.push(Datum::UInt32(strconv::parse_oid(s)?))
821829
}
@@ -887,7 +895,13 @@ impl Value {
887895
}
888896
Ok(Value::Name(s))
889897
}
890-
Type::Numeric { .. } => Numeric::from_sql(ty.inner(), raw).map(Value::Numeric),
898+
Type::Numeric { constraints } => {
899+
let n = Numeric::from_sql(ty.inner(), raw)?;
900+
Ok(Value::Numeric(Numeric(rescale_numeric(
901+
n.0,
902+
constraints.as_ref(),
903+
)?)))
904+
}
891905
Type::Oid | Type::RegClass | Type::RegProc | Type::RegType => {
892906
u32::from_sql(ty.inner(), raw).map(Value::Oid)
893907
}
@@ -923,6 +937,20 @@ impl Value {
923937
}
924938
}
925939

940+
/// Rescales `n` to the scale required by `constraints`, if any.
941+
fn rescale_numeric(
942+
mut n: OrderedDecimal<mz_repr_numeric::Numeric>,
943+
constraints: Option<&NumericConstraints>,
944+
) -> Result<OrderedDecimal<mz_repr_numeric::Numeric>, Box<dyn Error + Sync + Send>> {
945+
if let Some(constraints) = constraints {
946+
rescale(
947+
&mut n.0,
948+
NumericMaxScale::try_from(i64::from(constraints.max_scale()))?.into_u8(),
949+
)?;
950+
}
951+
Ok(n)
952+
}
953+
926954
fn encode_element(buf: &mut BytesMut, elem: Option<&Value>, ty: &Type) -> Result<(), io::Error> {
927955
match elem {
928956
None => buf.put_i32(-1),
@@ -1020,4 +1048,37 @@ mod tests {
10201048
"invalid input syntax for type array: Specifying array lower bounds is not supported: \"[0:0]={t}\"".to_string()
10211049
);
10221050
}
1051+
1052+
/// Decoding a numeric must round it to the destination's declared scale,
1053+
/// and the text and binary paths must agree. `COPY ... FROM` relies on the
1054+
/// text/CSV side of this (SS-193); binary parameters rely on the binary
1055+
/// side. `COPY ... FORMAT BINARY` is unsupported, so the binary path is
1056+
/// exercised here rather than via mzcompose.
1057+
#[mz_ore::test]
1058+
#[cfg_attr(miri, ignore)] // numeric/decimal contexts unsupported under miri
1059+
fn decode_numeric_applies_destination_scale() {
1060+
// A `numeric(10, 2)` destination: scale 2.
1061+
let ty = Type::from(&SqlScalarType::Numeric {
1062+
max_scale: Some(NumericMaxScale::try_from(2_i64).unwrap()),
1063+
});
1064+
let expected = strconv::parse_numeric("10.45").unwrap();
1065+
1066+
// Encode an over-scale value (10.447, scale 3) to binary, then decode
1067+
// it back through the scale-2 type.
1068+
let input = Value::Numeric(Numeric(strconv::parse_numeric("10.447").unwrap()));
1069+
let mut buf = BytesMut::new();
1070+
input
1071+
.encode_binary(&ty, &mut buf)
1072+
.expect("encoding 10.447 as numeric must succeed");
1073+
let Value::Numeric(Numeric(binary)) = Value::decode_binary(&ty, &buf).unwrap() else {
1074+
panic!("decode_binary of a numeric must yield Value::Numeric");
1075+
};
1076+
assert_eq!(binary, expected, "binary decode did not rescale to scale 2");
1077+
1078+
// The text path must agree with the binary path.
1079+
let Value::Numeric(Numeric(text)) = Value::decode_text(&ty, b"10.447").unwrap() else {
1080+
panic!("decode_text of a numeric must yield Value::Numeric");
1081+
};
1082+
assert_eq!(text, expected, "text decode did not rescale to scale 2");
1083+
}
10231084
}

0 commit comments

Comments
 (0)