Skip to content

Commit 7a1fe5e

Browse files
committed
Merge remote-tracking branch 'apache/main' into native-physical-optimizer
2 parents c5d92f6 + 1e1b88d commit 7a1fe5e

52 files changed

Lines changed: 3201 additions & 1038 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.github/workflows/iceberg_spark_test.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,9 @@ jobs:
6969
~/.cargo/registry
7070
~/.cargo/git
7171
native/target
72-
key: ${{ runner.os }}-cargo-ci-${{ hashFiles('native/**/Cargo.lock', 'native/**/Cargo.toml') }}
72+
key: ${{ runner.os }}-cargo-ci-${{ hashFiles('native/**/Cargo.lock', 'native/**/Cargo.toml') }}-${{ hashFiles('native/**/*.rs') }}
7373
restore-keys: |
74-
${{ runner.os }}-cargo-ci-
74+
${{ runner.os }}-cargo-ci-${{ hashFiles('native/**/Cargo.lock', 'native/**/Cargo.toml') }}-
7575
7676
- name: Build native library
7777
# Use CI profile for faster builds (no LTO) and to share cache with pr_build_linux.yml.
@@ -88,7 +88,7 @@ jobs:
8888
~/.cargo/registry
8989
~/.cargo/git
9090
native/target
91-
key: ${{ runner.os }}-cargo-ci-${{ hashFiles('native/**/Cargo.lock', 'native/**/Cargo.toml') }}
91+
key: ${{ runner.os }}-cargo-ci-${{ hashFiles('native/**/Cargo.lock', 'native/**/Cargo.toml') }}-${{ hashFiles('native/**/*.rs') }}
9292

9393
- name: Upload native library
9494
uses: actions/upload-artifact@v6

.github/workflows/pr_build_linux.yml

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -84,9 +84,9 @@ jobs:
8484
~/.cargo/registry
8585
~/.cargo/git
8686
native/target
87-
key: ${{ runner.os }}-cargo-ci-${{ hashFiles('native/**/Cargo.lock', 'native/**/Cargo.toml') }}
87+
key: ${{ runner.os }}-cargo-ci-${{ hashFiles('native/**/Cargo.lock', 'native/**/Cargo.toml') }}-${{ hashFiles('native/**/*.rs') }}
8888
restore-keys: |
89-
${{ runner.os }}-cargo-ci-
89+
${{ runner.os }}-cargo-ci-${{ hashFiles('native/**/Cargo.lock', 'native/**/Cargo.toml') }}-
9090
9191
- name: Build native library (CI profile)
9292
run: |
@@ -112,7 +112,7 @@ jobs:
112112
~/.cargo/registry
113113
~/.cargo/git
114114
native/target
115-
key: ${{ runner.os }}-cargo-ci-${{ hashFiles('native/**/Cargo.lock', 'native/**/Cargo.toml') }}
115+
key: ${{ runner.os }}-cargo-ci-${{ hashFiles('native/**/Cargo.lock', 'native/**/Cargo.toml') }}-${{ hashFiles('native/**/*.rs') }}
116116

117117
# Run Rust tests (runs in parallel with build-native, uses debug builds)
118118
linux-test-rust:
@@ -138,9 +138,9 @@ jobs:
138138
~/.cargo/git
139139
native/target
140140
# Note: Java version intentionally excluded - Rust target is JDK-independent
141-
key: ${{ runner.os }}-cargo-debug-${{ hashFiles('native/**/Cargo.lock', 'native/**/Cargo.toml') }}
141+
key: ${{ runner.os }}-cargo-debug-${{ hashFiles('native/**/Cargo.lock', 'native/**/Cargo.toml') }}-${{ hashFiles('native/**/*.rs') }}
142142
restore-keys: |
143-
${{ runner.os }}-cargo-debug-
143+
${{ runner.os }}-cargo-debug-${{ hashFiles('native/**/Cargo.lock', 'native/**/Cargo.toml') }}-
144144
145145
- name: Rust test steps
146146
uses: ./.github/actions/rust-test
@@ -153,7 +153,7 @@ jobs:
153153
~/.cargo/registry
154154
~/.cargo/git
155155
native/target
156-
key: ${{ runner.os }}-cargo-debug-${{ hashFiles('native/**/Cargo.lock', 'native/**/Cargo.toml') }}
156+
key: ${{ runner.os }}-cargo-debug-${{ hashFiles('native/**/Cargo.lock', 'native/**/Cargo.toml') }}-${{ hashFiles('native/**/*.rs') }}
157157

158158
linux-test:
159159
needs: build-native

.github/workflows/pr_build_macos.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -84,9 +84,9 @@ jobs:
8484
~/.cargo/registry
8585
~/.cargo/git
8686
native/target
87-
key: ${{ runner.os }}-cargo-ci-${{ hashFiles('native/**/Cargo.lock', 'native/**/Cargo.toml') }}
87+
key: ${{ runner.os }}-cargo-ci-${{ hashFiles('native/**/Cargo.lock', 'native/**/Cargo.toml') }}-${{ hashFiles('native/**/*.rs') }}
8888
restore-keys: |
89-
${{ runner.os }}-cargo-ci-
89+
${{ runner.os }}-cargo-ci-${{ hashFiles('native/**/Cargo.lock', 'native/**/Cargo.toml') }}-
9090
9191
- name: Build native library (CI profile)
9292
run: |
@@ -112,7 +112,7 @@ jobs:
112112
~/.cargo/registry
113113
~/.cargo/git
114114
native/target
115-
key: ${{ runner.os }}-cargo-ci-${{ hashFiles('native/**/Cargo.lock', 'native/**/Cargo.toml') }}
115+
key: ${{ runner.os }}-cargo-ci-${{ hashFiles('native/**/Cargo.lock', 'native/**/Cargo.toml') }}-${{ hashFiles('native/**/*.rs') }}
116116

117117
macos-aarch64-test:
118118
needs: build-native

.github/workflows/spark_sql_test.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -75,9 +75,9 @@ jobs:
7575
~/.cargo/registry
7676
~/.cargo/git
7777
native/target
78-
key: ${{ runner.os }}-cargo-ci-${{ hashFiles('native/**/Cargo.lock', 'native/**/Cargo.toml') }}
78+
key: ${{ runner.os }}-cargo-ci-${{ hashFiles('native/**/Cargo.lock', 'native/**/Cargo.toml') }}-${{ hashFiles('native/**/*.rs') }}
7979
restore-keys: |
80-
${{ runner.os }}-cargo-ci-
80+
${{ runner.os }}-cargo-ci-${{ hashFiles('native/**/Cargo.lock', 'native/**/Cargo.toml') }}-
8181
8282
- name: Build native library (CI profile)
8383
run: |
@@ -101,7 +101,7 @@ jobs:
101101
~/.cargo/registry
102102
~/.cargo/git
103103
native/target
104-
key: ${{ runner.os }}-cargo-ci-${{ hashFiles('native/**/Cargo.lock', 'native/**/Cargo.toml') }}
104+
key: ${{ runner.os }}-cargo-ci-${{ hashFiles('native/**/Cargo.lock', 'native/**/Cargo.toml') }}-${{ hashFiles('native/**/*.rs') }}
105105

106106
spark-sql-test:
107107
needs: build-native
Lines changed: 255 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,255 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.comet.parquet;
21+
22+
import java.math.BigDecimal;
23+
24+
import org.apache.arrow.memory.BufferAllocator;
25+
import org.apache.arrow.memory.RootAllocator;
26+
import org.apache.arrow.vector.*;
27+
import org.apache.spark.sql.catalyst.InternalRow;
28+
import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns;
29+
import org.apache.spark.sql.types.*;
30+
import org.apache.spark.unsafe.types.UTF8String;
31+
32+
import org.apache.comet.vector.CometPlainVector;
33+
import org.apache.comet.vector.CometVector;
34+
35+
/**
36+
* A column reader that returns constant vectors using Arrow Java vectors directly (no native
37+
* mutable buffers). Used for partition columns and missing columns in the native_iceberg_compat
38+
* scan path.
39+
*
40+
* <p>The vector is filled with the constant value repeated for every row in the batch. This is
41+
* necessary because the underlying Arrow vector's buffers must be large enough to match the
42+
* reported value count — otherwise variable-width types (strings, binary) would have undersized
43+
* offset buffers, causing out-of-bounds reads on the native side.
44+
*/
45+
public class ArrowConstantColumnReader extends AbstractColumnReader {
46+
private final BufferAllocator allocator = new RootAllocator();
47+
48+
private boolean isNull;
49+
private Object value;
50+
private FieldVector fieldVector;
51+
private CometPlainVector vector;
52+
private int currentSize;
53+
54+
/** Constructor for missing columns (default values from schema). */
55+
ArrowConstantColumnReader(StructField field, int batchSize, boolean useDecimal128) {
56+
super(field.dataType(), TypeUtil.convertToParquet(field), useDecimal128, false);
57+
this.batchSize = batchSize;
58+
this.value =
59+
ResolveDefaultColumns.getExistenceDefaultValues(new StructType(new StructField[] {field}))[
60+
0];
61+
initVector(value, batchSize);
62+
}
63+
64+
/** Constructor for partition columns with values from a row. */
65+
ArrowConstantColumnReader(
66+
StructField field, int batchSize, InternalRow values, int index, boolean useDecimal128) {
67+
super(field.dataType(), TypeUtil.convertToParquet(field), useDecimal128, false);
68+
this.batchSize = batchSize;
69+
Object v = values.get(index, field.dataType());
70+
this.value = v;
71+
initVector(v, batchSize);
72+
}
73+
74+
@Override
75+
public void setBatchSize(int batchSize) {
76+
close();
77+
this.batchSize = batchSize;
78+
initVector(value, batchSize);
79+
}
80+
81+
@Override
82+
public void readBatch(int total) {
83+
if (total != currentSize) {
84+
close();
85+
initVector(value, total);
86+
}
87+
}
88+
89+
@Override
90+
public CometVector currentBatch() {
91+
return vector;
92+
}
93+
94+
@Override
95+
public void close() {
96+
if (vector != null) {
97+
vector.close();
98+
vector = null;
99+
}
100+
if (fieldVector != null) {
101+
fieldVector.close();
102+
fieldVector = null;
103+
}
104+
}
105+
106+
private void initVector(Object value, int count) {
107+
currentSize = count;
108+
if (value == null) {
109+
isNull = true;
110+
fieldVector = createNullVector(count);
111+
} else {
112+
isNull = false;
113+
fieldVector = createFilledVector(value, count);
114+
}
115+
vector = new CometPlainVector(fieldVector, useDecimal128, false, true);
116+
}
117+
118+
/** Creates a vector of the correct type with {@code count} null values. */
119+
private FieldVector createNullVector(int count) {
120+
String name = "constant";
121+
FieldVector v;
122+
if (type == DataTypes.BooleanType) {
123+
v = new BitVector(name, allocator);
124+
} else if (type == DataTypes.ByteType) {
125+
v = new TinyIntVector(name, allocator);
126+
} else if (type == DataTypes.ShortType) {
127+
v = new SmallIntVector(name, allocator);
128+
} else if (type == DataTypes.IntegerType || type == DataTypes.DateType) {
129+
v = new IntVector(name, allocator);
130+
} else if (type == DataTypes.LongType
131+
|| type == DataTypes.TimestampType
132+
|| type == TimestampNTZType$.MODULE$) {
133+
v = new BigIntVector(name, allocator);
134+
} else if (type == DataTypes.FloatType) {
135+
v = new Float4Vector(name, allocator);
136+
} else if (type == DataTypes.DoubleType) {
137+
v = new Float8Vector(name, allocator);
138+
} else if (type == DataTypes.BinaryType) {
139+
v = new VarBinaryVector(name, allocator);
140+
} else if (type == DataTypes.StringType) {
141+
v = new VarCharVector(name, allocator);
142+
} else if (type instanceof DecimalType) {
143+
DecimalType dt = (DecimalType) type;
144+
if (!useDecimal128 && dt.precision() <= Decimal.MAX_INT_DIGITS()) {
145+
v = new IntVector(name, allocator);
146+
} else if (!useDecimal128 && dt.precision() <= Decimal.MAX_LONG_DIGITS()) {
147+
v = new BigIntVector(name, allocator);
148+
} else {
149+
v = new DecimalVector(name, allocator, dt.precision(), dt.scale());
150+
}
151+
} else {
152+
throw new UnsupportedOperationException("Unsupported Spark type: " + type);
153+
}
154+
v.setValueCount(count);
155+
return v;
156+
}
157+
158+
/** Creates a vector filled with {@code count} copies of the given value. */
159+
private FieldVector createFilledVector(Object value, int count) {
160+
String name = "constant";
161+
if (type == DataTypes.BooleanType) {
162+
BitVector v = new BitVector(name, allocator);
163+
v.allocateNew(count);
164+
int bit = (boolean) value ? 1 : 0;
165+
for (int i = 0; i < count; i++) v.setSafe(i, bit);
166+
v.setValueCount(count);
167+
return v;
168+
} else if (type == DataTypes.ByteType) {
169+
TinyIntVector v = new TinyIntVector(name, allocator);
170+
v.allocateNew(count);
171+
byte val = (byte) value;
172+
for (int i = 0; i < count; i++) v.setSafe(i, val);
173+
v.setValueCount(count);
174+
return v;
175+
} else if (type == DataTypes.ShortType) {
176+
SmallIntVector v = new SmallIntVector(name, allocator);
177+
v.allocateNew(count);
178+
short val = (short) value;
179+
for (int i = 0; i < count; i++) v.setSafe(i, val);
180+
v.setValueCount(count);
181+
return v;
182+
} else if (type == DataTypes.IntegerType || type == DataTypes.DateType) {
183+
IntVector v = new IntVector(name, allocator);
184+
v.allocateNew(count);
185+
int val = (int) value;
186+
for (int i = 0; i < count; i++) v.setSafe(i, val);
187+
v.setValueCount(count);
188+
return v;
189+
} else if (type == DataTypes.LongType
190+
|| type == DataTypes.TimestampType
191+
|| type == TimestampNTZType$.MODULE$) {
192+
BigIntVector v = new BigIntVector(name, allocator);
193+
v.allocateNew(count);
194+
long val = (long) value;
195+
for (int i = 0; i < count; i++) v.setSafe(i, val);
196+
v.setValueCount(count);
197+
return v;
198+
} else if (type == DataTypes.FloatType) {
199+
Float4Vector v = new Float4Vector(name, allocator);
200+
v.allocateNew(count);
201+
float val = (float) value;
202+
for (int i = 0; i < count; i++) v.setSafe(i, val);
203+
v.setValueCount(count);
204+
return v;
205+
} else if (type == DataTypes.DoubleType) {
206+
Float8Vector v = new Float8Vector(name, allocator);
207+
v.allocateNew(count);
208+
double val = (double) value;
209+
for (int i = 0; i < count; i++) v.setSafe(i, val);
210+
v.setValueCount(count);
211+
return v;
212+
} else if (type == DataTypes.BinaryType) {
213+
VarBinaryVector v = new VarBinaryVector(name, allocator);
214+
v.allocateNew(count);
215+
byte[] bytes = (byte[]) value;
216+
for (int i = 0; i < count; i++) v.setSafe(i, bytes, 0, bytes.length);
217+
v.setValueCount(count);
218+
return v;
219+
} else if (type == DataTypes.StringType) {
220+
VarCharVector v = new VarCharVector(name, allocator);
221+
v.allocateNew(count);
222+
byte[] bytes = ((UTF8String) value).getBytes();
223+
for (int i = 0; i < count; i++) v.setSafe(i, bytes, 0, bytes.length);
224+
v.setValueCount(count);
225+
return v;
226+
} else if (type instanceof DecimalType) {
227+
DecimalType dt = (DecimalType) type;
228+
Decimal d = (Decimal) value;
229+
if (!useDecimal128 && dt.precision() <= Decimal.MAX_INT_DIGITS()) {
230+
IntVector v = new IntVector(name, allocator);
231+
v.allocateNew(count);
232+
int val = (int) d.toUnscaledLong();
233+
for (int i = 0; i < count; i++) v.setSafe(i, val);
234+
v.setValueCount(count);
235+
return v;
236+
} else if (!useDecimal128 && dt.precision() <= Decimal.MAX_LONG_DIGITS()) {
237+
BigIntVector v = new BigIntVector(name, allocator);
238+
v.allocateNew(count);
239+
long val = d.toUnscaledLong();
240+
for (int i = 0; i < count; i++) v.setSafe(i, val);
241+
v.setValueCount(count);
242+
return v;
243+
} else {
244+
DecimalVector v = new DecimalVector(name, allocator, dt.precision(), dt.scale());
245+
v.allocateNew(count);
246+
BigDecimal bd = d.toJavaBigDecimal();
247+
for (int i = 0; i < count; i++) v.setSafe(i, bd);
248+
v.setValueCount(count);
249+
return v;
250+
}
251+
} else {
252+
throw new UnsupportedOperationException("Unsupported Spark type: " + type);
253+
}
254+
}
255+
}

0 commit comments

Comments
 (0)