Skip to content

Commit fce4386

Browse files
divboksan81
authored andcommitted
Handling mysql decimal data types with precision 19 or higher (opensearch-project#6369)
Signed-off-by: Divyansh Bokadia <dbokadia@amazon.com>
1 parent 21f3df1 commit fce4386

2 files changed

Lines changed: 84 additions & 3 deletions

File tree

data-prepper-plugins/parquet-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/GenericRecordJsonEncoder.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.avro.generic.GenericContainer;
2222
import org.apache.avro.generic.GenericData;
2323
import org.apache.avro.generic.GenericEnumSymbol;
24+
import org.apache.avro.generic.GenericFixed;
2425
import org.apache.avro.generic.GenericRecord;
2526
import org.apache.avro.generic.IndexedRecord;
2627
import org.apache.commons.text.StringEscapeUtils;
@@ -84,8 +85,8 @@ private void serialize(final Object datum, final StringBuilder buffer,
8485
if (fieldSchema.getType() == Schema.Type.UNION) {
8586
for (Schema s : fieldSchema.getTypes()) {
8687
if (s.getType() != Schema.Type.NULL) {
87-
if (s.getType() == Schema.Type.BYTES &&
88-
s.getLogicalType() instanceof LogicalTypes.Decimal) {
88+
if ((s.getType() == Schema.Type.BYTES || s.getType() == Schema.Type.FIXED)
89+
&& s.getLogicalType() instanceof LogicalTypes.Decimal) {
8990
serialize(logicalTypeConverter.apply(getField(datum, f.name(), f.pos())), buffer, seenObjects, ((LogicalTypes.Decimal) s.getLogicalType()).getScale());
9091
serializedDecimal = true;
9192
break;
@@ -172,7 +173,22 @@ private void serialize(final Object datum, final StringBuilder buffer,
172173
buffer.append("\"");
173174
buffer.append(datum);
174175
buffer.append("\"");
175-
} else if (datum instanceof GenericData) {
176+
} else if (datum instanceof GenericFixed ) {
177+
GenericFixed fixed = (GenericFixed) datum;
178+
byte[] bytes = fixed.bytes();
179+
if (decimalScale != null) {
180+
BigInteger unscaledValue = new BigInteger(bytes);
181+
BigDecimal decimal = new BigDecimal(unscaledValue, decimalScale);
182+
buffer.append(decimal.toString());
183+
}
184+
//Fallback mechanism
185+
else {
186+
buffer.append("{\"bytes\": \"");
187+
writeEscapedString(new String(bytes, StandardCharsets.ISO_8859_1), buffer);
188+
buffer.append("\"}");
189+
}
190+
}
191+
else if (datum instanceof GenericData) {
176192
if (seenObjects.containsKey(datum)) {
177193
buffer.append(TOSTRING_CIRCULAR_REFERENCE_ERROR_TEXT);
178194
return;

data-prepper-plugins/parquet-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/parquet/GenericRecordJsonEncoderTest.java

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -316,4 +316,69 @@ void serialize_WithNullDecimalLogicalType_ReturnsNull() {
316316
assertEquals("{\"amount\": null}", json);
317317
}
318318

319+
@Test
320+
void serialize_WithFixedDecimalLogicalType_UsesScaleFromSchema() {
321+
BigDecimal value = new BigDecimal("12.34").setScale(2);
322+
byte[] decimalBytes = value.unscaledValue().toByteArray();
323+
324+
Schema decimalSchema = new Schema.Parser().parse(
325+
"{ \"type\": \"record\", \"name\": \"DecimalRecord\", \"fields\": [" +
326+
"{\"name\": \"amount\", \"type\": [\"null\", {\"type\":\"fixed\",\"size\":" + decimalBytes.length +
327+
",\"name\":\"DecimalFixed\",\"logicalType\":\"decimal\",\"precision\":4,\"scale\":2}]}" +
328+
"] }"
329+
);
330+
331+
GenericRecord record = new GenericData.Record(decimalSchema);
332+
333+
Schema fixedSchema = decimalSchema.getField("amount").schema().getTypes().get(1);
334+
GenericData.Fixed fixedValue = new GenericData.Fixed(fixedSchema, decimalBytes);
335+
record.put("amount", fixedValue);
336+
337+
String json = encoder.serialize(record);
338+
339+
assertEquals("{\"amount\": 12.34}", json);
340+
}
341+
342+
@Test
343+
void serialize_WithNonNullableFixedDecimalLogicalType_UsesScaleFromSchema() {
344+
BigDecimal value = new BigDecimal("12.3456").setScale(4);
345+
byte[] decimalBytes = value.unscaledValue().toByteArray();
346+
347+
Schema decimalSchema = new Schema.Parser().parse(
348+
"{ \"type\": \"record\", \"name\": \"DecimalRecord\", \"fields\": [" +
349+
"{\"name\": \"amount\", \"type\": {\"type\":\"fixed\",\"size\":" + decimalBytes.length +
350+
",\"name\":\"DecimalFixed\",\"logicalType\":\"decimal\",\"precision\":6,\"scale\":4}}" +
351+
"] }"
352+
);
353+
354+
GenericRecord record = new GenericData.Record(decimalSchema);
355+
356+
Schema fixedSchema = decimalSchema.getField("amount").schema();
357+
GenericData.Fixed fixedValue = new GenericData.Fixed(fixedSchema, decimalBytes);
358+
record.put("amount", fixedValue);
359+
360+
String json = encoder.serialize(record);
361+
362+
assertEquals("{\"amount\": 12.3456}", json);
363+
}
364+
365+
@Test
366+
void serialize_WithNonDecimalFixedType_ReturnsEscapedString() {
367+
byte[] tokenBytes = new byte[] { 34, 92, 13, 10, 9};
368+
Schema tokenSchema = new Schema.Parser().parse(
369+
"{ \"type\": \"record\", \"name\": \"MyRecord\", \"fields\": [" +
370+
"{\"name\": \"token\", \"type\": {\"type\":\"fixed\",\"name\":\"TokenFixed\",\"size\":5}}" +
371+
"] }"
372+
);
373+
374+
GenericRecord record = new GenericData.Record(tokenSchema);
375+
376+
Schema fixedSchema = tokenSchema.getField("token").schema();
377+
GenericData.Fixed fixedValue = new GenericData.Fixed(fixedSchema, tokenBytes);
378+
record.put("token", fixedValue);
379+
380+
String json = encoder.serialize(record);
381+
382+
assertEquals("{\"token\": {\"bytes\": \"\\\"\\\\\\r\\n\\t\"}}", json);
383+
}
319384
}

0 commit comments

Comments
 (0)