Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.BitVector;
import org.apache.arrow.vector.DateDayVector;
import org.apache.arrow.vector.Decimal256Vector;
import org.apache.arrow.vector.DecimalVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.FixedSizeBinaryVector;
Expand Down Expand Up @@ -219,6 +220,7 @@ public void convertArrowToRowBatch() throws DorisException {
FieldVector fieldVector = fieldVectors.get(col);
MinorType minorType = fieldVector.getMinorType();
final String currentType = schema.get(col).getType();
final String colName = schema.get(col).getName();
for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) {
boolean passed = doConvert(col, rowIndex, minorType, currentType, fieldVector);
if (!passed) {
Expand All @@ -227,7 +229,8 @@ public void convertArrowToRowBatch() throws DorisException {
+ currentType
+ ", but arrow type is "
+ minorType.name()
+ ".");
+ ", column Name is "
+ colName);
}
}
}
Expand Down Expand Up @@ -360,6 +363,18 @@ public boolean doConvert(
BigDecimal value = decimalVector.getObject(rowIndex).stripTrailingZeros();
addValueToRow(rowIndex, value);
break;
case "DECIMAL256":
if (!minorType.equals(MinorType.DECIMAL256)) {
return false;
}
Decimal256Vector decimal256Vector = (Decimal256Vector) fieldVector;
if (decimal256Vector.isNull(rowIndex)) {
addValueToRow(rowIndex, null);
break;
}
BigDecimal value256 = decimal256Vector.getObject(rowIndex).stripTrailingZeros();
addValueToRow(rowIndex, value256);
break;
case "DATE":
case "DATEV2":
if (!minorType.equals(MinorType.DATEDAY) && !minorType.equals(MinorType.VARCHAR)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.BitVector;
import org.apache.arrow.vector.DateDayVector;
import org.apache.arrow.vector.Decimal256Vector;
import org.apache.arrow.vector.DecimalVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.FixedSizeBinaryVector;
Expand Down Expand Up @@ -486,6 +487,84 @@ public void testDecimalV2() throws Exception {
rowBatch.next();
}

@Test
public void testDecimal256() throws Exception {
List<Field> childrenBuilder = new ArrayList<>();
childrenBuilder.add(
new Field("k8", FieldType.nullable(new ArrowType.Decimal(38, 18, 256)), null));

VectorSchemaRoot root =
VectorSchemaRoot.create(
new org.apache.arrow.vector.types.pojo.Schema(childrenBuilder, null),
new RootAllocator(Integer.MAX_VALUE));
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
ArrowStreamWriter arrowStreamWriter =
new ArrowStreamWriter(
root, new DictionaryProvider.MapDictionaryProvider(), outputStream);

arrowStreamWriter.start();
root.setRowCount(3);

FieldVector vector = root.getVector("k8");
Decimal256Vector decimal256Vector = (Decimal256Vector) vector;
decimal256Vector.setInitialCapacity(3);
decimal256Vector.allocateNew();
decimal256Vector.setIndexDefined(0);
decimal256Vector.setSafe(0, new BigDecimal("123456789012345678.123456789012345678"));
decimal256Vector.setIndexDefined(1);
decimal256Vector.setSafe(1, new BigDecimal("987654321098765432.987654321098765432"));
decimal256Vector.setIndexDefined(2);
decimal256Vector.setSafe(2, new BigDecimal("111111111111111111.111111111111111111"));
vector.setValueCount(3);

arrowStreamWriter.writeBatch();

arrowStreamWriter.end();
arrowStreamWriter.close();

TStatus status = new TStatus();
status.setStatusCode(TStatusCode.OK);
TScanBatchResult scanBatchResult = new TScanBatchResult();
scanBatchResult.setStatus(status);
scanBatchResult.setEos(false);
scanBatchResult.setRows(outputStream.toByteArray());

String schemaStr =
"{\"properties\":[{\"type\":\"DECIMAL256\",\"scale\": 18,"
+ "\"precision\": 38, \"name\":\"k8\",\"comment\":\"\"}], "
+ "\"status\":200}";

Schema schema = RestService.parseSchema(schemaStr, logger);

RowBatch rowBatch = new RowBatch(scanBatchResult, schema).readArrow();

Assert.assertTrue(rowBatch.hasNext());
List<Object> actualRow0 = rowBatch.next();
Assert.assertEquals(
DecimalData.fromBigDecimal(
new BigDecimal("123456789012345678.123456789012345678"), 38, 18),
DecimalData.fromBigDecimal((BigDecimal) actualRow0.get(0), 38, 18));

Assert.assertTrue(rowBatch.hasNext());
List<Object> actualRow1 = rowBatch.next();
Assert.assertEquals(
DecimalData.fromBigDecimal(
new BigDecimal("987654321098765432.987654321098765432"), 38, 18),
DecimalData.fromBigDecimal((BigDecimal) actualRow1.get(0), 38, 18));

Assert.assertTrue(rowBatch.hasNext());
List<Object> actualRow2 = rowBatch.next();
Assert.assertEquals(
DecimalData.fromBigDecimal(
new BigDecimal("111111111111111111.111111111111111111"), 38, 18),
DecimalData.fromBigDecimal((BigDecimal) actualRow2.get(0), 38, 18));

Assert.assertFalse(rowBatch.hasNext());
thrown.expect(NoSuchElementException.class);
thrown.expectMessage(startsWith("Get row offset:"));
rowBatch.next();
}

@Test
public void testMap() throws IOException, DorisException {

Expand Down