Skip to content

Commit 8dcf6d3

Browse files
committed
fix: json interal type
1 parent 88d2e0a commit 8dcf6d3

8 files changed

Lines changed: 67 additions & 135 deletions

File tree

ingester-example/src/main/java/io/greptime/BulkWriteApiQuickStart.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -82,19 +82,25 @@ public static void main(String[] args) throws Exception {
8282

8383
Config cfg = Config.newBuilder()
8484
.allocatorInitReservation(0)
85-
.allocatorMaxAllocation(1024 * 1024 * 1024)
86-
.timeoutMsPerMessage(10000)
85+
.allocatorMaxAllocation(1024 * 1024 * 1024L)
86+
.timeoutMsPerMessage(30000)
8787
.maxRequestsInFlight(8)
8888
.build();
8989
Context ctx = Context.newDefault().withCompression(Compression.None);
9090

91+
// Bulk write api cannot auto create table
92+
Table toCreate = Table.from(schema);
93+
toCreate.addRow(generateOneRow(100000));
94+
toCreate.complete();
95+
greptimeDB.write(toCreate).get();
96+
9197
try (BulkStreamWriter bulkStreamWriter = greptimeDB.bulkStreamWriter(schema, cfg, ctx)) {
9298

93-
// Write 100 times, each time write 100000 rows
99+
// Write 100 times, each time write 10000 rows
94100
for (int i = 0; i < 100; i++) {
95101
long start = System.currentTimeMillis();
96-
Table.TableBufferRoot table = bulkStreamWriter.tableBufferRoot();
97-
for (int j = 0; j < 100000; j++) {
102+
Table.TableBufferRoot table = bulkStreamWriter.tableBufferRoot(1024);
103+
for (int j = 0; j < 10000; j++) {
98104
// with 100000 cardinality
99105
Object[] row = generateOneRow(100000);
100106
table.addRow(row);
@@ -150,7 +156,7 @@ private static Object[] generateOneRow(int cardinality) {
150156
System.currentTimeMillis(), // ts
151157
random.nextInt(127), // field_int8
152158
random.nextInt(32767), // field_int16
153-
random.nextInt(), // field_int32
159+
null, // field_int32
154160
random.nextLong(), // field_int64
155161
random.nextInt(255), // field_uint8
156162
random.nextInt(65535), // field_uint16

ingester-example/src/main/java/io/greptime/bench/RandomTableDataProvider.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,8 +92,20 @@ public Object[] next() {
9292
String traceId = "trace_" + random.nextLong(1000000);
9393
String spanId = "span_" + random.nextLong(1000000);
9494
String errno = "errno_" + random.nextInt(1000);
95-
String traceFlags = "trace_flags_" + random.nextInt(1000);
96-
String traceState = "trace_state_" + random.nextInt(1000);
95+
String traceFlags;
96+
int flags = random.nextInt(1000);
97+
if (flags % 2 == 0) {
98+
traceFlags = "trace_flags_" + flags;
99+
} else {
100+
traceFlags = null;
101+
}
102+
int state = random.nextInt(1000);
103+
String traceState;
104+
if (state % 3 == 0) {
105+
traceState = "trace_state_" + state;
106+
} else {
107+
traceState = null;
108+
}
97109
String podName = "pod_" + random.nextInt(1000);
98110
timerContext.stop();
99111
MetricsUtil.histogram("random_table_data_provider.log_message_length")

ingester-protocol/src/main/java/io/greptime/models/ArrowHelper.java

Lines changed: 3 additions & 118 deletions
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ public static void addValues(
188188
if (value == null) {
189189
vector.setNull(startRowIndex++);
190190
} else {
191-
((UInt4Vector) vector).setSafe(startRowIndex++, ((Long) value).intValue());
191+
((UInt4Vector) vector).setSafe(startRowIndex++, ValueUtil.getIntValue(value));
192192
}
193193
}
194194
break;
@@ -385,7 +385,7 @@ public static void addValues(
385385
vector.setNull(startRowIndex++);
386386
} else {
387387
byte[] jsonBytes = ValueUtil.getJsonString(value).getBytes(StandardCharsets.UTF_8);
388-
((VarCharVector) vector).setSafe(startRowIndex++, jsonBytes);
388+
((VarBinaryVector) vector).setSafe(startRowIndex++, jsonBytes);
389389
}
390390
}
391391
break;
@@ -394,121 +394,6 @@ public static void addValues(
394394
}
395395
}
396396

397-
public static void addValue(
398-
FieldVector vector,
399-
int rowIndex,
400-
Common.ColumnDataType dataType,
401-
Common.ColumnDataTypeExtension dataTypeExtension,
402-
Object value) {
403-
if (value == null) {
404-
vector.setNull(rowIndex);
405-
return;
406-
}
407-
408-
switch (dataType) {
409-
case INT8:
410-
((TinyIntVector) vector).setSafe(rowIndex, (int) value);
411-
break;
412-
case INT16:
413-
((SmallIntVector) vector).setSafe(rowIndex, (int) value);
414-
break;
415-
case INT32:
416-
((IntVector) vector).setSafe(rowIndex, (int) value);
417-
break;
418-
case INT64:
419-
((BigIntVector) vector).setSafe(rowIndex, (long) value);
420-
break;
421-
case UINT8:
422-
((UInt1Vector) vector).setSafe(rowIndex, (int) value);
423-
break;
424-
case UINT16:
425-
((UInt2Vector) vector).setSafe(rowIndex, (int) value);
426-
break;
427-
case UINT32:
428-
((UInt4Vector) vector).setSafe(rowIndex, ((Long) value).intValue());
429-
break;
430-
case UINT64:
431-
((UInt8Vector) vector).setSafe(rowIndex, (long) value);
432-
break;
433-
case FLOAT32:
434-
((Float4Vector) vector).setSafe(rowIndex, (float) value);
435-
break;
436-
case FLOAT64:
437-
((Float8Vector) vector).setSafe(rowIndex, (double) value);
438-
break;
439-
case BOOLEAN:
440-
((BitVector) vector).setSafe(rowIndex, (boolean) value ? 1 : 0);
441-
break;
442-
case BINARY:
443-
((VarBinaryVector) vector).setSafe(rowIndex, (byte[]) value);
444-
break;
445-
case STRING:
446-
((VarCharVector) vector).setSafe(rowIndex, ((String) value).getBytes(StandardCharsets.UTF_8));
447-
break;
448-
case DATE:
449-
((DateDayVector) vector).setSafe(rowIndex, ValueUtil.getDateValue(value));
450-
break;
451-
case TIMESTAMP_SECOND: {
452-
TimeStampSecHolder holder = new TimeStampSecHolder();
453-
holder.value = ValueUtil.getTimestamp(value, java.util.concurrent.TimeUnit.SECONDS);
454-
((TimeStampSecVector) vector).setSafe(rowIndex, holder);
455-
break;
456-
}
457-
case TIMESTAMP_MILLISECOND: {
458-
TimeStampMilliHolder holder = new TimeStampMilliHolder();
459-
holder.value = ValueUtil.getTimestamp(value, java.util.concurrent.TimeUnit.MILLISECONDS);
460-
((TimeStampMilliVector) vector).setSafe(rowIndex, holder);
461-
break;
462-
}
463-
case TIMESTAMP_MICROSECOND: {
464-
TimeStampMicroHolder holder = new TimeStampMicroHolder();
465-
holder.value = ValueUtil.getTimestamp(value, java.util.concurrent.TimeUnit.MICROSECONDS);
466-
((TimeStampMicroVector) vector).setSafe(rowIndex, holder);
467-
break;
468-
}
469-
case TIMESTAMP_NANOSECOND: {
470-
TimeStampNanoHolder holder = new TimeStampNanoHolder();
471-
holder.value = ValueUtil.getTimestamp(value, java.util.concurrent.TimeUnit.NANOSECONDS);
472-
((TimeStampNanoVector) vector).setSafe(rowIndex, holder);
473-
break;
474-
}
475-
case TIME_SECOND: {
476-
TimeSecHolder holder = new TimeSecHolder();
477-
holder.value = (int) ValueUtil.getLongValue(value);
478-
((TimeSecVector) vector).setSafe(rowIndex, holder);
479-
break;
480-
}
481-
case TIME_MILLISECOND: {
482-
TimeMilliHolder holder = new TimeMilliHolder();
483-
holder.value = (int) ValueUtil.getLongValue(value);
484-
((TimeMilliVector) vector).setSafe(rowIndex, holder);
485-
break;
486-
}
487-
case TIME_MICROSECOND: {
488-
TimeMicroHolder holder = new TimeMicroHolder();
489-
holder.value = ValueUtil.getLongValue(value);
490-
((TimeMicroVector) vector).setSafe(rowIndex, holder);
491-
break;
492-
}
493-
case TIME_NANOSECOND: {
494-
TimeNanoHolder holder = new TimeNanoHolder();
495-
holder.value = ValueUtil.getLongValue(value);
496-
((TimeNanoVector) vector).setSafe(rowIndex, holder);
497-
break;
498-
}
499-
case DECIMAL128:
500-
byte[] bytes = ValueUtil.getDecimal128BigEndianBytes(dataTypeExtension, value);
501-
((DecimalVector) vector).setBigEndianSafe(rowIndex, bytes);
502-
break;
503-
case JSON:
504-
byte[] jsonBytes = ValueUtil.getJsonString(value).getBytes(StandardCharsets.UTF_8);
505-
((VarCharVector) vector).setSafe(rowIndex, jsonBytes);
506-
break;
507-
default:
508-
throw new IllegalArgumentException("Unsupported data type: " + dataType);
509-
}
510-
}
511-
512397
static ArrowType convertToArrowType(
513398
Common.ColumnDataType dataType, Common.ColumnDataTypeExtension dataTypeExtension) {
514399
switch (dataType) {
@@ -565,7 +450,7 @@ static ArrowType convertToArrowType(
565450
Ensures.ensureNonNull(decimalTypeExtension, "decimalTypeExtension is null");
566451
return new ArrowType.Decimal(decimalTypeExtension.getPrecision(), decimalTypeExtension.getScale(), 128);
567452
case JSON:
568-
return new ArrowType.Utf8();
453+
return new ArrowType.Binary();
569454
default:
570455
throw new IllegalArgumentException("Unsupported data type: " + dataType);
571456
}

ingester-protocol/src/main/java/io/greptime/models/RowHelper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public static void addValue(
5757
valueBuilder.setU16Value((int) value);
5858
break;
5959
case UINT32:
60-
valueBuilder.setU32Value((int) value);
60+
valueBuilder.setU32Value(ValueUtil.getIntValue(value));
6161
break;
6262
case UINT64:
6363
valueBuilder.setU64Value(ValueUtil.getLongValue(value));

ingester-protocol/src/main/java/io/greptime/models/Table.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -218,8 +218,11 @@ private static Table buildTable(
218218
RowData.ColumnSchema.Builder builder = RowData.ColumnSchema.newBuilder();
219219
builder.setColumnName(columnNames.get(i))
220220
.setSemanticType(semanticTypes.get(i))
221-
.setDatatype(dataTypes.get(i))
222-
.setDatatypeExtension(dataTypeExtensions.get(i));
221+
.setDatatype(dataTypes.get(i));
222+
Common.ColumnDataTypeExtension ext = dataTypeExtensions.get(i);
223+
if (ext != null) {
224+
builder.setDatatypeExtension(ext);
225+
}
223226
table.columnSchemas.add(builder.build());
224227
}
225228
return table;

ingester-protocol/src/main/java/io/greptime/models/TableSchema.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -189,14 +189,23 @@ public Builder addColumn(
189189
this.columnNames.add(name);
190190
this.semanticTypes.add(semanticType.toProtoValue());
191191
this.dataTypes.add(dataType.toProtoValue());
192-
if (decimalTypeExtension == null) {
193-
this.dataTypeExtensions.add(Common.ColumnDataTypeExtension.getDefaultInstance());
194-
} else {
195-
Ensures.ensure(dataType == DataType.Decimal128, "Only decimal type can have decimal type extension");
192+
193+
if (dataType == DataType.Json) {
194+
Common.ColumnDataTypeExtension ext = Common.ColumnDataTypeExtension.newBuilder()
195+
.setJsonType(Common.JsonTypeExtension.JSON_BINARY)
196+
.build();
197+
this.dataTypeExtensions.add(ext);
198+
} else if (dataType == DataType.Decimal128) {
199+
if (decimalTypeExtension == null) {
200+
decimalTypeExtension = DataType.DecimalTypeExtension.DEFAULT;
201+
}
196202
Common.ColumnDataTypeExtension ext = Common.ColumnDataTypeExtension.newBuilder()
197203
.setDecimalType(decimalTypeExtension.into())
198204
.build();
199205
this.dataTypeExtensions.add(ext);
206+
} else {
207+
Ensures.ensure(decimalTypeExtension == null, "Only decimal type can have decimal type extension");
208+
this.dataTypeExtensions.add(null);
200209
}
201210
return this;
202211
}

ingester-protocol/src/main/java/io/greptime/models/ValueUtil.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,23 @@ public class ValueUtil {
3434

3535
static int ONE_DAY_IN_SECONDS = 86400;
3636

37+
static int getIntValue(Object value) {
38+
if (value instanceof Integer) {
39+
return (int) value;
40+
}
41+
42+
if (value instanceof Long) {
43+
return ((Long) value).intValue();
44+
}
45+
46+
if (value instanceof Number) {
47+
return ((Number) value).intValue();
48+
}
49+
50+
// Not null
51+
throw new IllegalArgumentException("Cannot convert value of type " + value.getClass() + " to int");
52+
}
53+
3754
static long getLongValue(Object value) {
3855
if (value instanceof Integer) {
3956
return (int) value;
@@ -48,7 +65,7 @@ static long getLongValue(Object value) {
4865
}
4966

5067
// Not null
51-
throw new IllegalArgumentException("Unsupported value type: " + value.getClass());
68+
throw new IllegalArgumentException("Cannot convert value of type " + value.getClass() + " to long");
5269
}
5370

5471
static int getDateValue(Object value) {

ingester-protocol/src/test/java/io/greptime/models/ArrowHelperTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ public void testCreateSchema() {
147147
new ArrowType.Decimal(38, 18, 128).getTypeID(),
148148
schema.getFields().get(21).getType().getTypeID());
149149
Assert.assertEquals(
150-
new ArrowType.Utf8().getTypeID(),
150+
new ArrowType.Binary().getTypeID(),
151151
schema.getFields().get(22).getType().getTypeID());
152152
}
153153
}

0 commit comments

Comments
 (0)