From 41a1d042c0cec8d5b961eca46ea1df7bd139413a Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Tue, 30 Dec 2025 16:47:32 +0800 Subject: [PATCH] add decimla256 --- .../doris/flink/serialization/RowBatch.java | 17 +++- .../flink/serialization/TestRowBatch.java | 79 +++++++++++++++++++ 2 files changed, 95 insertions(+), 1 deletion(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java index f012b9c67..21a42614b 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java @@ -24,6 +24,7 @@ import org.apache.arrow.vector.BigIntVector; import org.apache.arrow.vector.BitVector; import org.apache.arrow.vector.DateDayVector; +import org.apache.arrow.vector.Decimal256Vector; import org.apache.arrow.vector.DecimalVector; import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.FixedSizeBinaryVector; @@ -219,6 +220,7 @@ public void convertArrowToRowBatch() throws DorisException { FieldVector fieldVector = fieldVectors.get(col); MinorType minorType = fieldVector.getMinorType(); final String currentType = schema.get(col).getType(); + final String colName = schema.get(col).getName(); for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) { boolean passed = doConvert(col, rowIndex, minorType, currentType, fieldVector); if (!passed) { @@ -227,7 +229,8 @@ public void convertArrowToRowBatch() throws DorisException { + currentType + ", but arrow type is " + minorType.name() - + "."); + + ", column Name is " + + colName); } } } @@ -360,6 +363,18 @@ public boolean doConvert( BigDecimal value = decimalVector.getObject(rowIndex).stripTrailingZeros(); addValueToRow(rowIndex, value); break; + case "DECIMAL256": + if (!minorType.equals(MinorType.DECIMAL256)) { + return false; + } + Decimal256Vector decimal256Vector = (Decimal256Vector) fieldVector; + if (decimal256Vector.isNull(rowIndex)) { + addValueToRow(rowIndex, null); + break; + } + BigDecimal value256 = decimal256Vector.getObject(rowIndex).stripTrailingZeros(); + addValueToRow(rowIndex, value256); + break; case "DATE": case "DATEV2": if (!minorType.equals(MinorType.DATEDAY) && !minorType.equals(MinorType.VARCHAR)) { diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java index b13cfe157..a740b2adb 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java @@ -26,6 +26,7 @@ import org.apache.arrow.vector.BigIntVector; import org.apache.arrow.vector.BitVector; import org.apache.arrow.vector.DateDayVector; +import org.apache.arrow.vector.Decimal256Vector; import org.apache.arrow.vector.DecimalVector; import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.FixedSizeBinaryVector; @@ -486,6 +487,84 @@ public void testDecimalV2() throws Exception { rowBatch.next(); } + @Test + public void testDecimal256() throws Exception { + List childrenBuilder = new ArrayList<>(); + childrenBuilder.add( + new Field("k8", FieldType.nullable(new ArrowType.Decimal(38, 18, 256)), null)); + + VectorSchemaRoot root = + VectorSchemaRoot.create( + new org.apache.arrow.vector.types.pojo.Schema(childrenBuilder, null), + new RootAllocator(Integer.MAX_VALUE)); + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + ArrowStreamWriter arrowStreamWriter = + new ArrowStreamWriter( + root, new DictionaryProvider.MapDictionaryProvider(), outputStream); + + arrowStreamWriter.start(); + root.setRowCount(3); + + FieldVector vector = root.getVector("k8"); + Decimal256Vector decimal256Vector = (Decimal256Vector) vector; + decimal256Vector.setInitialCapacity(3); + decimal256Vector.allocateNew(); + decimal256Vector.setIndexDefined(0); + decimal256Vector.setSafe(0, new BigDecimal("123456789012345678.123456789012345678")); + decimal256Vector.setIndexDefined(1); + decimal256Vector.setSafe(1, new BigDecimal("987654321098765432.987654321098765432")); + decimal256Vector.setIndexDefined(2); + decimal256Vector.setSafe(2, new BigDecimal("111111111111111111.111111111111111111")); + vector.setValueCount(3); + + arrowStreamWriter.writeBatch(); + + arrowStreamWriter.end(); + arrowStreamWriter.close(); + + TStatus status = new TStatus(); + status.setStatusCode(TStatusCode.OK); + TScanBatchResult scanBatchResult = new TScanBatchResult(); + scanBatchResult.setStatus(status); + scanBatchResult.setEos(false); + scanBatchResult.setRows(outputStream.toByteArray()); + + String schemaStr = + "{\"properties\":[{\"type\":\"DECIMAL256\",\"scale\": 18," + + "\"precision\": 38, \"name\":\"k8\",\"comment\":\"\"}], " + + "\"status\":200}"; + + Schema schema = RestService.parseSchema(schemaStr, logger); + + RowBatch rowBatch = new RowBatch(scanBatchResult, schema).readArrow(); + + Assert.assertTrue(rowBatch.hasNext()); + List actualRow0 = rowBatch.next(); + Assert.assertEquals( + DecimalData.fromBigDecimal( + new BigDecimal("123456789012345678.123456789012345678"), 38, 18), + DecimalData.fromBigDecimal((BigDecimal) actualRow0.get(0), 38, 18)); + + Assert.assertTrue(rowBatch.hasNext()); + List actualRow1 = rowBatch.next(); + Assert.assertEquals( + DecimalData.fromBigDecimal( + new BigDecimal("987654321098765432.987654321098765432"), 38, 18), + DecimalData.fromBigDecimal((BigDecimal) actualRow1.get(0), 38, 18)); + + Assert.assertTrue(rowBatch.hasNext()); + List actualRow2 = rowBatch.next(); + Assert.assertEquals( + DecimalData.fromBigDecimal( + new BigDecimal("111111111111111111.111111111111111111"), 38, 18), + DecimalData.fromBigDecimal((BigDecimal) actualRow2.get(0), 38, 18)); + + Assert.assertFalse(rowBatch.hasNext()); + thrown.expect(NoSuchElementException.class); + thrown.expectMessage(startsWith("Get row offset:")); + rowBatch.next(); + } + @Test public void testMap() throws IOException, DorisException {