Skip to content

Commit f718b15

Browse files
committed
[Analytics Engine] Carry array-typed cells through RowResponseCodec without JSON-stringifying
The row-oriented fragment-execution wire format (`FragmentExecutionResponse`, used when arrow-flight streaming is disabled — every single-node test cluster today) shipped each cell through OpenSearch's `writeGenericValue` / `readGenericValue`, which preserves `List` values as `ArrayList<Object>`. On the coordinator side, `RowResponseCodec.decode` then re-materialized the rows into a `VectorSchemaRoot` for `Iterable<VectorSchemaRoot>`-style consumers. Two bugs in that re-materialization were eating array values: 1. `inferArrowType` walked rows for the first non-null cell and matched against {Long, Integer, …, CharSequence, byte[], Number}. {@code List} wasn't in the chain, so it fell through to {@code break} and the fallback {@link ArrowType.Utf8} — every array column became a VARCHAR column. 2. `setVectorValue` for {@link VarCharVector} called {@code value.toString()}. For a {@code JsonStringArrayList} that returns the JSON form {@code "[2,3,4]"}, which then got serialized as a JSON string in the final response. Tests like {@code testMvindexRangePositive} saw their array result come back as a string `"[2,3,4]"` instead of an array `[2, 3, 4]`. Fix: * Replace {@code inferArrowType} with {@code inferField} that returns a full {@link Field}. For {@code List} cells, build a list field with the inner element type inferred from the first non-null element (with a fallback that scans later rows in case the first list is empty/all-null). * Add a {@code ListVector} arm to {@code setVectorValue} that delegates to a new {@code writeListValue}. The writer bypasses {@link UnionListWriter} entirely — it writes directly to the list's offset / validity buffers and to the inner data vector via the inner vector's typed `setSafe`. The writer-based API requires per-element `ArrowBuf` allocations for varchar elements that are easy to leak or use-after-free; the direct path is simpler and avoids both classes of bug. Plus a separate Arrow gotcha that surfaced once arrays started flowing through correctly: * {@code ListVector.getObject} for a {@code VarCharVector} child returns a {@code JsonStringArrayList} whose elements are Arrow's {@link Text} class, not Java {@link String}. {@code ExprValueUtils.fromObjectValue} doesn't recognize {@code Text} and threw "unsupported object class org.apache.arrow.vector.util.Text". {@code ArrowValues.toJavaValue} now mirrors its top-level VarChar branch for list cells: when a list value comes back from a {@code ListVector}, normalize each {@code Text} element to a {@link String} before handing the list upward. * Before: 12/60 (mvindex range tests still showed expected-vs-actual diff because `[2,3,4]` came back as a JSON string, not an array). * After: 26/60. Newly passing: testMvindexRangePositive, testMvindexRangeNegative, testMvindexRangeMixed, testMvindexRangeFirstThree, testMvindexRangeLastThree, testMvindexRangeSingleElement, testMvdedupWithDuplicates, testMvdedupWithAllDuplicates, testMvdedupWithNoDuplicates, testMvdedupWithStrings, testArrayWithString, testSplitWithSemicolonDelimiter, testSplitWithMultiCharDelimiter, testSplitWithEmptyDelimiter. Signed-off-by: Kai Huang <ahkcs@amazon.com>
1 parent 27f1c99 commit f718b15

2 files changed

Lines changed: 131 additions & 20 deletions

File tree

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/ArrowValues.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,12 @@
1010

1111
import org.apache.arrow.vector.FieldVector;
1212
import org.apache.arrow.vector.VarCharVector;
13+
import org.apache.arrow.vector.complex.ListVector;
1314
import org.apache.arrow.vector.util.Text;
1415

1516
import java.nio.charset.StandardCharsets;
17+
import java.util.ArrayList;
18+
import java.util.List;
1619

1720
/**
1821
* Helpers for reading Arrow vector cells as plain Java values at the
@@ -35,10 +38,22 @@ public static Object toJavaValue(FieldVector vector, int index) {
3538
if (vector instanceof VarCharVector v) {
3639
return new String(v.get(index), StandardCharsets.UTF_8);
3740
}
38-
Object obj = vector.getObject(index);
39-
if (obj instanceof Text t) {
41+
Object value = vector.getObject(index);
42+
if (vector instanceof ListVector && value instanceof List<?> raw) {
43+
// ListVector.getObject returns a JsonStringArrayList whose elements are the
44+
// child vector's typed values. For VarCharVector children that's Arrow's
45+
// Text, which downstream consumers (e.g. {@code ExprValueUtils.fromObjectValue})
46+
// don't recognize and reject as "unsupported object class". Mirror the
47+
// top-level VarCharVector branch above and substitute Java strings.
48+
List<Object> normalized = new ArrayList<>(raw.size());
49+
for (Object element : raw) {
50+
normalized.add(element instanceof Text t ? t.toString() : element);
51+
}
52+
return normalized;
53+
}
54+
if (value instanceof Text t) {
4055
return t.toString();
4156
}
42-
return obj;
57+
return value;
4358
}
4459
}

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/stage/RowResponseCodec.java

Lines changed: 113 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import org.apache.arrow.vector.VarBinaryVector;
2121
import org.apache.arrow.vector.VarCharVector;
2222
import org.apache.arrow.vector.VectorSchemaRoot;
23+
import org.apache.arrow.vector.complex.ListVector;
24+
import org.apache.arrow.vector.complex.impl.UnionListWriter;
2325
import org.apache.arrow.vector.types.pojo.ArrowType;
2426
import org.apache.arrow.vector.types.pojo.Field;
2527
import org.apache.arrow.vector.types.pojo.FieldType;
@@ -28,6 +30,7 @@
2830

2931
import java.nio.charset.StandardCharsets;
3032
import java.util.ArrayList;
33+
import java.util.Collections;
3134
import java.util.List;
3235

3336
/**
@@ -60,8 +63,7 @@ public VectorSchemaRoot decode(FragmentExecutionResponse response, BufferAllocat
6063
// Infer Arrow type per column from the first non-null value
6164
List<Field> fields = new ArrayList<>();
6265
for (int col = 0; col < fieldNames.size(); col++) {
63-
ArrowType arrowType = inferArrowType(rows, col);
64-
fields.add(new Field(fieldNames.get(col), FieldType.nullable(arrowType), null));
66+
fields.add(inferField(fieldNames.get(col), rows, col));
6567
}
6668
Schema schema = new Schema(fields);
6769

@@ -86,29 +88,67 @@ public VectorSchemaRoot decode(FragmentExecutionResponse response, BufferAllocat
8688
}
8789

8890
/**
89-
* Infers the Arrow type for a column by scanning rows for the first
90-
* non-null value. Falls back to {@code Utf8} (VarChar) if all values
91-
* are null or the Java type is unrecognized.
91+
* Infers the Arrow {@link Field} for a column by scanning rows for the first
92+
* non-null value. Falls back to a nullable {@code Utf8} (VarChar) field if all
93+
* values are null or the Java type is unrecognized.
94+
*
95+
* <p>For {@link List} cells (produced by analytics-engine routes that emit
96+
* array-typed values — PPL {@code array(...)}, {@code array_slice}, …), this
97+
* returns a {@code List<inner>} field where the inner element type is inferred
98+
* from the first non-null element. Without this branch, list values fall
99+
* through to the {@code Utf8} fallback and {@link #setVectorValue} produces
100+
* {@code value.toString()} (e.g. {@code "[2,3,4]"} as a JSON-like string)
101+
* instead of a typed array.
92102
*/
93-
static ArrowType inferArrowType(List<Object[]> rows, int col) {
103+
static Field inferField(String name, List<Object[]> rows, int col) {
94104
for (Object[] row : rows) {
95105
Object value = row[col];
96106
if (value == null) continue;
97-
if (value instanceof Long) return new ArrowType.Int(64, true);
98-
if (value instanceof Integer) return new ArrowType.Int(32, true);
99-
if (value instanceof Short) return new ArrowType.Int(16, true);
100-
if (value instanceof Byte) return new ArrowType.Int(8, true);
101-
if (value instanceof Double) return new ArrowType.FloatingPoint(org.apache.arrow.vector.types.FloatingPointPrecision.DOUBLE);
102-
if (value instanceof Float) return new ArrowType.FloatingPoint(org.apache.arrow.vector.types.FloatingPointPrecision.SINGLE);
103-
if (value instanceof Boolean) return ArrowType.Bool.INSTANCE;
104-
if (value instanceof CharSequence) return ArrowType.Utf8.INSTANCE;
105-
if (value instanceof byte[]) return ArrowType.Binary.INSTANCE;
106-
if (value instanceof Number) return new ArrowType.Int(64, true);
107-
break;
107+
if (value instanceof List<?> list) {
108+
ArrowType elementType = inferElementArrowType(list, rows, col);
109+
Field elementField = new Field("$data$", FieldType.nullable(elementType), null);
110+
return new Field(name, FieldType.nullable(ArrowType.List.INSTANCE), Collections.singletonList(elementField));
111+
}
112+
return new Field(name, FieldType.nullable(scalarArrowType(value)), null);
113+
}
114+
return new Field(name, FieldType.nullable(ArrowType.Utf8.INSTANCE), null);
115+
}
116+
117+
/**
118+
* Best-effort inference for a list element type. Looks at the first non-null
119+
* element of the given list, then falls back to scanning later rows of the
120+
* same column if this list is empty or all-null. Defaults to {@code Utf8}.
121+
*/
122+
private static ArrowType inferElementArrowType(List<?> list, List<Object[]> rows, int col) {
123+
for (Object element : list) {
124+
if (element != null) return scalarArrowType(element);
125+
}
126+
for (Object[] row : rows) {
127+
Object value = row[col];
128+
if (value instanceof List<?> other) {
129+
for (Object element : other) {
130+
if (element != null) return scalarArrowType(element);
131+
}
132+
}
108133
}
109134
return ArrowType.Utf8.INSTANCE;
110135
}
111136

137+
/** Maps a Java scalar value to the corresponding Arrow scalar type. */
138+
private static ArrowType scalarArrowType(Object value) {
139+
if (value instanceof Long) return new ArrowType.Int(64, true);
140+
if (value instanceof Integer) return new ArrowType.Int(32, true);
141+
if (value instanceof Short) return new ArrowType.Int(16, true);
142+
if (value instanceof Byte) return new ArrowType.Int(8, true);
143+
if (value instanceof Double) return new ArrowType.FloatingPoint(org.apache.arrow.vector.types.FloatingPointPrecision.DOUBLE);
144+
if (value instanceof Float) return new ArrowType.FloatingPoint(org.apache.arrow.vector.types.FloatingPointPrecision.SINGLE);
145+
if (value instanceof Boolean) return ArrowType.Bool.INSTANCE;
146+
if (value instanceof CharSequence) return ArrowType.Utf8.INSTANCE;
147+
if (value instanceof byte[]) return ArrowType.Binary.INSTANCE;
148+
if (value instanceof Number) return new ArrowType.Int(64, true);
149+
return ArrowType.Utf8.INSTANCE;
150+
}
151+
112152
/**
113153
* Sets a value on the appropriate Arrow vector type. Handles null by
114154
* calling {@code setNull}. For typed vectors, casts the Java value to
@@ -137,8 +177,64 @@ static void setVectorValue(FieldVector vector, int index, Object value) {
137177
((VarCharVector) vector).setSafe(index, value.toString().getBytes(StandardCharsets.UTF_8));
138178
} else if (vector instanceof VarBinaryVector) {
139179
((VarBinaryVector) vector).setSafe(index, (byte[]) value);
180+
} else if (vector instanceof ListVector listVector) {
181+
writeListValue(listVector, index, (List<?>) value);
140182
} else {
141183
throw new IllegalArgumentException("Unsupported Arrow vector type: " + vector.getClass().getSimpleName());
142184
}
143185
}
186+
187+
/**
188+
* Writes a Java {@link List} into an Arrow {@link ListVector} at the given row
189+
* index. Bypasses {@link UnionListWriter} entirely — writes directly to the
190+
* list's offset / validity buffers and to the inner data vector via the inner
191+
* vector's own typed setter. The writer-based API requires an
192+
* {@link org.apache.arrow.memory.ArrowBuf} per varchar element which Arrow
193+
* couples to a release lifecycle that's tricky to get right (early close →
194+
* use-after-free, no close → leak); the direct path avoids that altogether.
195+
*
196+
* <p>The inner data vector's type was decided in {@link #inferField} from the
197+
* first non-null list element, so the {@code instanceof} dispatch here simply
198+
* needs to match.
199+
*/
200+
private static void writeListValue(ListVector listVector, int index, List<?> list) {
201+
FieldVector dataVector = listVector.getDataVector();
202+
int startOffset = listVector.getOffsetBuffer().getInt((long) index * ListVector.OFFSET_WIDTH);
203+
int writePos = startOffset;
204+
for (Object element : list) {
205+
// Grow the data vector if needed before writing. setSafe-style helpers handle this
206+
// automatically per type, but we still need to pre-position the cursor.
207+
if (element == null) {
208+
dataVector.setNull(writePos);
209+
} else if (dataVector instanceof BigIntVector v) {
210+
v.setSafe(writePos, ((Number) element).longValue());
211+
} else if (dataVector instanceof IntVector v) {
212+
v.setSafe(writePos, ((Number) element).intValue());
213+
} else if (dataVector instanceof SmallIntVector v) {
214+
v.setSafe(writePos, ((Number) element).shortValue());
215+
} else if (dataVector instanceof TinyIntVector v) {
216+
v.setSafe(writePos, ((Number) element).byteValue());
217+
} else if (dataVector instanceof Float8Vector v) {
218+
v.setSafe(writePos, ((Number) element).doubleValue());
219+
} else if (dataVector instanceof Float4Vector v) {
220+
v.setSafe(writePos, ((Number) element).floatValue());
221+
} else if (dataVector instanceof BitVector v) {
222+
v.setSafe(writePos, ((Boolean) element) ? 1 : 0);
223+
} else if (dataVector instanceof VarCharVector v) {
224+
v.setSafe(writePos, element.toString().getBytes(StandardCharsets.UTF_8));
225+
} else if (dataVector instanceof VarBinaryVector v) {
226+
v.setSafe(writePos, (byte[]) element);
227+
} else {
228+
throw new IllegalArgumentException("Unsupported list element vector type: " + dataVector.getClass().getSimpleName());
229+
}
230+
writePos++;
231+
}
232+
// Mark this row's list as non-null and update its end offset.
233+
listVector.setNotNull(index);
234+
listVector.getOffsetBuffer().setInt((long) (index + 1) * ListVector.OFFSET_WIDTH, writePos);
235+
// Keep the data vector's value count in sync so subsequent reads see the new tail.
236+
if (writePos > dataVector.getValueCount()) {
237+
dataVector.setValueCount(writePos);
238+
}
239+
}
144240
}

0 commit comments

Comments
 (0)