Skip to content

Commit bc136ee

Browse files
authored
[Feature](type) support doris decimla256 type (#628)
1 parent 11080a6 commit bc136ee

2 files changed

Lines changed: 95 additions & 1 deletion

File tree

flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.arrow.vector.BigIntVector;
2525
import org.apache.arrow.vector.BitVector;
2626
import org.apache.arrow.vector.DateDayVector;
27+
import org.apache.arrow.vector.Decimal256Vector;
2728
import org.apache.arrow.vector.DecimalVector;
2829
import org.apache.arrow.vector.FieldVector;
2930
import org.apache.arrow.vector.FixedSizeBinaryVector;
@@ -219,6 +220,7 @@ public void convertArrowToRowBatch() throws DorisException {
219220
FieldVector fieldVector = fieldVectors.get(col);
220221
MinorType minorType = fieldVector.getMinorType();
221222
final String currentType = schema.get(col).getType();
223+
final String colName = schema.get(col).getName();
222224
for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) {
223225
boolean passed = doConvert(col, rowIndex, minorType, currentType, fieldVector);
224226
if (!passed) {
@@ -227,7 +229,8 @@ public void convertArrowToRowBatch() throws DorisException {
227229
+ currentType
228230
+ ", but arrow type is "
229231
+ minorType.name()
230-
+ ".");
232+
+ ", column Name is "
233+
+ colName);
231234
}
232235
}
233236
}
@@ -360,6 +363,18 @@ public boolean doConvert(
360363
BigDecimal value = decimalVector.getObject(rowIndex).stripTrailingZeros();
361364
addValueToRow(rowIndex, value);
362365
break;
366+
case "DECIMAL256":
367+
if (!minorType.equals(MinorType.DECIMAL256)) {
368+
return false;
369+
}
370+
Decimal256Vector decimal256Vector = (Decimal256Vector) fieldVector;
371+
if (decimal256Vector.isNull(rowIndex)) {
372+
addValueToRow(rowIndex, null);
373+
break;
374+
}
375+
BigDecimal value256 = decimal256Vector.getObject(rowIndex).stripTrailingZeros();
376+
addValueToRow(rowIndex, value256);
377+
break;
363378
case "DATE":
364379
case "DATEV2":
365380
if (!minorType.equals(MinorType.DATEDAY) && !minorType.equals(MinorType.VARCHAR)) {

flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.arrow.vector.BigIntVector;
2727
import org.apache.arrow.vector.BitVector;
2828
import org.apache.arrow.vector.DateDayVector;
29+
import org.apache.arrow.vector.Decimal256Vector;
2930
import org.apache.arrow.vector.DecimalVector;
3031
import org.apache.arrow.vector.FieldVector;
3132
import org.apache.arrow.vector.FixedSizeBinaryVector;
@@ -486,6 +487,84 @@ public void testDecimalV2() throws Exception {
486487
rowBatch.next();
487488
}
488489

490+
@Test
491+
public void testDecimal256() throws Exception {
492+
List<Field> childrenBuilder = new ArrayList<>();
493+
childrenBuilder.add(
494+
new Field("k8", FieldType.nullable(new ArrowType.Decimal(38, 18, 256)), null));
495+
496+
VectorSchemaRoot root =
497+
VectorSchemaRoot.create(
498+
new org.apache.arrow.vector.types.pojo.Schema(childrenBuilder, null),
499+
new RootAllocator(Integer.MAX_VALUE));
500+
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
501+
ArrowStreamWriter arrowStreamWriter =
502+
new ArrowStreamWriter(
503+
root, new DictionaryProvider.MapDictionaryProvider(), outputStream);
504+
505+
arrowStreamWriter.start();
506+
root.setRowCount(3);
507+
508+
FieldVector vector = root.getVector("k8");
509+
Decimal256Vector decimal256Vector = (Decimal256Vector) vector;
510+
decimal256Vector.setInitialCapacity(3);
511+
decimal256Vector.allocateNew();
512+
decimal256Vector.setIndexDefined(0);
513+
decimal256Vector.setSafe(0, new BigDecimal("123456789012345678.123456789012345678"));
514+
decimal256Vector.setIndexDefined(1);
515+
decimal256Vector.setSafe(1, new BigDecimal("987654321098765432.987654321098765432"));
516+
decimal256Vector.setIndexDefined(2);
517+
decimal256Vector.setSafe(2, new BigDecimal("111111111111111111.111111111111111111"));
518+
vector.setValueCount(3);
519+
520+
arrowStreamWriter.writeBatch();
521+
522+
arrowStreamWriter.end();
523+
arrowStreamWriter.close();
524+
525+
TStatus status = new TStatus();
526+
status.setStatusCode(TStatusCode.OK);
527+
TScanBatchResult scanBatchResult = new TScanBatchResult();
528+
scanBatchResult.setStatus(status);
529+
scanBatchResult.setEos(false);
530+
scanBatchResult.setRows(outputStream.toByteArray());
531+
532+
String schemaStr =
533+
"{\"properties\":[{\"type\":\"DECIMAL256\",\"scale\": 18,"
534+
+ "\"precision\": 38, \"name\":\"k8\",\"comment\":\"\"}], "
535+
+ "\"status\":200}";
536+
537+
Schema schema = RestService.parseSchema(schemaStr, logger);
538+
539+
RowBatch rowBatch = new RowBatch(scanBatchResult, schema).readArrow();
540+
541+
Assert.assertTrue(rowBatch.hasNext());
542+
List<Object> actualRow0 = rowBatch.next();
543+
Assert.assertEquals(
544+
DecimalData.fromBigDecimal(
545+
new BigDecimal("123456789012345678.123456789012345678"), 38, 18),
546+
DecimalData.fromBigDecimal((BigDecimal) actualRow0.get(0), 38, 18));
547+
548+
Assert.assertTrue(rowBatch.hasNext());
549+
List<Object> actualRow1 = rowBatch.next();
550+
Assert.assertEquals(
551+
DecimalData.fromBigDecimal(
552+
new BigDecimal("987654321098765432.987654321098765432"), 38, 18),
553+
DecimalData.fromBigDecimal((BigDecimal) actualRow1.get(0), 38, 18));
554+
555+
Assert.assertTrue(rowBatch.hasNext());
556+
List<Object> actualRow2 = rowBatch.next();
557+
Assert.assertEquals(
558+
DecimalData.fromBigDecimal(
559+
new BigDecimal("111111111111111111.111111111111111111"), 38, 18),
560+
DecimalData.fromBigDecimal((BigDecimal) actualRow2.get(0), 38, 18));
561+
562+
Assert.assertFalse(rowBatch.hasNext());
563+
thrown.expect(NoSuchElementException.class);
564+
thrown.expectMessage(startsWith("Get row offset:"));
565+
rowBatch.next();
566+
}
567+
489568
@Test
490569
public void testMap() throws IOException, DorisException {
491570

0 commit comments

Comments
 (0)