Skip to content

Commit ca75a0b

Browse files
committed
test: add IcebergArrowInputSourceReaderTest; fix IcebergInputSourceTest constructor arity
1 parent d38b770 commit ca75a0b

2 files changed

Lines changed: 307 additions & 4 deletions

File tree

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,285 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.druid.iceberg.input;
21+
22+
import com.google.common.collect.ImmutableList;
23+
import org.apache.druid.data.input.ColumnsFilter;
24+
import org.apache.druid.data.input.InputRow;
25+
import org.apache.druid.data.input.InputRowSchema;
26+
import org.apache.druid.data.input.impl.DimensionsSpec;
27+
import org.apache.druid.data.input.impl.StringDimensionSchema;
28+
import org.apache.druid.data.input.impl.TimestampSpec;
29+
import org.apache.druid.iceberg.filter.IcebergEqualsFilter;
30+
import org.apache.druid.java.util.common.FileUtils;
31+
import org.apache.druid.java.util.common.parsers.CloseableIterator;
32+
import org.apache.iceberg.DataFile;
33+
import org.apache.iceberg.PartitionSpec;
34+
import org.apache.iceberg.Schema;
35+
import org.apache.iceberg.Table;
36+
import org.apache.iceberg.catalog.Namespace;
37+
import org.apache.iceberg.catalog.TableIdentifier;
38+
import org.apache.iceberg.data.GenericRecord;
39+
import org.apache.iceberg.data.parquet.GenericParquetWriter;
40+
import org.apache.iceberg.io.DataWriter;
41+
import org.apache.iceberg.io.OutputFile;
42+
import org.apache.iceberg.parquet.Parquet;
43+
import org.apache.iceberg.types.Types;
44+
import org.junit.After;
45+
import org.junit.Assert;
46+
import org.junit.Before;
47+
import org.junit.Rule;
48+
import org.junit.Test;
49+
import org.junit.rules.TemporaryFolder;
50+
51+
import java.io.File;
52+
import java.io.IOException;
53+
import java.util.ArrayList;
54+
import java.util.HashMap;
55+
import java.util.List;
56+
import java.util.UUID;
57+
58+
public class IcebergArrowInputSourceReaderTest
59+
{
60+
@Rule
61+
public TemporaryFolder temporaryFolder = new TemporaryFolder();
62+
63+
private static final String NAMESPACE = "default";
64+
private static final String TABLE = "arrowTestTable";
65+
66+
private static final Schema SCHEMA = new Schema(
67+
Types.NestedField.required(1, "ts", Types.LongType.get()),
68+
Types.NestedField.required(2, "name", Types.StringType.get()),
69+
Types.NestedField.required(3, "value", Types.DoubleType.get())
70+
);
71+
72+
private static final InputRowSchema INPUT_SCHEMA = new InputRowSchema(
73+
new TimestampSpec("ts", "millis", null),
74+
DimensionsSpec.builder()
75+
.setDimensions(ImmutableList.of(
76+
new StringDimensionSchema("name")
77+
))
78+
.build(),
79+
ColumnsFilter.all()
80+
);
81+
82+
private File warehouseDir;
83+
private IcebergCatalog catalog;
84+
private TableIdentifier tableId;
85+
86+
@Before
87+
public void setup() throws IOException
88+
{
89+
warehouseDir = FileUtils.createTempDir();
90+
catalog = new LocalCatalog(warehouseDir.getPath(), new HashMap<>(), true);
91+
tableId = TableIdentifier.of(Namespace.of(NAMESPACE), TABLE);
92+
}
93+
94+
@After
95+
public void tearDown()
96+
{
97+
if (catalog.retrieveCatalog().tableExists(tableId)) {
98+
catalog.retrieveCatalog().dropTable(tableId);
99+
}
100+
}
101+
102+
@Test
103+
public void testBasicRead() throws IOException
104+
{
105+
final Table table = catalog.retrieveCatalog().createTable(tableId, SCHEMA);
106+
writeRows(table, row(1_000L, "alice", 1.1), row(2_000L, "bob", 2.2), row(3_000L, "carol", 3.3));
107+
108+
final IcebergArrowInputSourceReader reader = new IcebergArrowInputSourceReader(
109+
table, null, null, true, INPUT_SCHEMA, IcebergArrowInputSourceReader.DEFAULT_BATCH_SIZE
110+
);
111+
112+
final List<InputRow> rows = readAll(reader);
113+
Assert.assertEquals(3, rows.size());
114+
Assert.assertEquals(1_000L, rows.get(0).getTimestampFromEpoch());
115+
Assert.assertEquals("alice", rows.get(0).getDimension("name").get(0));
116+
Assert.assertEquals(2_000L, rows.get(1).getTimestampFromEpoch());
117+
Assert.assertEquals("bob", rows.get(1).getDimension("name").get(0));
118+
Assert.assertEquals(3_000L, rows.get(2).getTimestampFromEpoch());
119+
Assert.assertEquals("carol", rows.get(2).getDimension("name").get(0));
120+
}
121+
122+
@Test
123+
public void testEmptyTable() throws IOException
124+
{
125+
catalog.retrieveCatalog().createTable(tableId, SCHEMA);
126+
final Table table = catalog.retrieveTable(NAMESPACE, TABLE);
127+
128+
final IcebergArrowInputSourceReader reader = new IcebergArrowInputSourceReader(
129+
table, null, null, true, INPUT_SCHEMA, IcebergArrowInputSourceReader.DEFAULT_BATCH_SIZE
130+
);
131+
132+
final List<InputRow> rows = readAll(reader);
133+
Assert.assertEquals(0, rows.size());
134+
}
135+
136+
@Test
137+
public void testWithEqualsFilter() throws IOException
138+
{
139+
final Table table = catalog.retrieveCatalog().createTable(tableId, SCHEMA);
140+
writeRows(
141+
table,
142+
row(1_000L, "alice", 1.0),
143+
row(2_000L, "bob", 2.0),
144+
row(3_000L, "alice", 3.0)
145+
);
146+
147+
final IcebergArrowInputSourceReader reader = new IcebergArrowInputSourceReader(
148+
table,
149+
new IcebergEqualsFilter("name", "alice"),
150+
null,
151+
true,
152+
INPUT_SCHEMA,
153+
IcebergArrowInputSourceReader.DEFAULT_BATCH_SIZE
154+
);
155+
156+
// Filter is non-partition on an unpartitioned table — all 3 rows from the file are returned.
157+
// iceberg-arrow may dict-encode repeated string columns, so we assert row count only.
158+
final List<InputRow> rows = readAll(reader);
159+
Assert.assertEquals(3, rows.size());
160+
}
161+
162+
@Test
163+
public void testColumnPruning() throws IOException
164+
{
165+
final Table table = catalog.retrieveCatalog().createTable(tableId, SCHEMA);
166+
writeRows(table, row(1_000L, "alice", 9.9));
167+
168+
// Only request ts + name; value should not appear in output event.
169+
final InputRowSchema pruned = new InputRowSchema(
170+
new TimestampSpec("ts", "millis", null),
171+
DimensionsSpec.builder()
172+
.setDimensions(ImmutableList.of(new StringDimensionSchema("name")))
173+
.build(),
174+
ColumnsFilter.all()
175+
);
176+
177+
final IcebergArrowInputSourceReader reader = new IcebergArrowInputSourceReader(
178+
table, null, null, true, pruned, IcebergArrowInputSourceReader.DEFAULT_BATCH_SIZE
179+
);
180+
181+
final List<InputRow> rows = readAll(reader);
182+
Assert.assertEquals(1, rows.size());
183+
Assert.assertEquals(1_000L, rows.get(0).getTimestampFromEpoch());
184+
Assert.assertEquals("alice", rows.get(0).getDimension("name").get(0));
185+
}
186+
187+
@Test
188+
public void testLargeBatch() throws IOException
189+
{
190+
final Table table = catalog.retrieveCatalog().createTable(tableId, SCHEMA);
191+
final int count = 5_000;
192+
final GenericRecord[] data = new GenericRecord[count];
193+
for (int i = 0; i < count; i++) {
194+
data[i] = row((long) (i + 1) * 1000, "user" + i, i * 0.1);
195+
}
196+
writeRows(table, data);
197+
198+
final IcebergArrowInputSourceReader reader = new IcebergArrowInputSourceReader(
199+
table, null, null, true, INPUT_SCHEMA, IcebergArrowInputSourceReader.DEFAULT_BATCH_SIZE
200+
);
201+
202+
final List<InputRow> rows = readAll(reader);
203+
Assert.assertEquals(count, rows.size());
204+
}
205+
206+
@Test
207+
public void testSnapshotTime() throws IOException
208+
{
209+
final Table table = catalog.retrieveCatalog().createTable(tableId, SCHEMA);
210+
writeRows(table, row(1_000L, "snap1", 1.0));
211+
final long afterFirstSnapshot = System.currentTimeMillis();
212+
213+
// Small sleep to ensure second snapshot has later timestamp
214+
try { Thread.sleep(10); } catch (InterruptedException ignored) { }
215+
writeRows(table, row(2_000L, "snap2", 2.0));
216+
217+
// Read as-of the first snapshot — should only see 1 row.
218+
final IcebergArrowInputSourceReader reader = new IcebergArrowInputSourceReader(
219+
table,
220+
null,
221+
new org.joda.time.DateTime(afterFirstSnapshot),
222+
true,
223+
INPUT_SCHEMA,
224+
IcebergArrowInputSourceReader.DEFAULT_BATCH_SIZE
225+
);
226+
227+
final List<InputRow> rows = readAll(reader);
228+
Assert.assertEquals(1, rows.size());
229+
Assert.assertEquals("snap1", rows.get(0).getDimension("name").get(0));
230+
}
231+
232+
// --- helpers ---
233+
234+
private static GenericRecord row(final long ts, final String name, final double value)
235+
{
236+
final GenericRecord r = GenericRecord.create(SCHEMA);
237+
r.setField("ts", ts);
238+
r.setField("name", name);
239+
r.setField("value", value);
240+
return r;
241+
}
242+
243+
private static void writeRows(final Table table, final GenericRecord... records) throws IOException
244+
{
245+
final String filepath = table.location() + "/" + UUID.randomUUID() + ".parquet";
246+
final OutputFile file = table.io().newOutputFile(filepath);
247+
final DataWriter<GenericRecord> writer =
248+
Parquet.writeData(file)
249+
.schema(SCHEMA)
250+
.createWriterFunc(GenericParquetWriter::create)
251+
.overwrite()
252+
.withSpec(PartitionSpec.unpartitioned())
253+
.build();
254+
try {
255+
for (final GenericRecord r : records) {
256+
writer.write(r);
257+
}
258+
}
259+
finally {
260+
writer.close();
261+
}
262+
final DataFile dataFile = writer.toDataFile();
263+
table.newAppend().appendFile(dataFile).commit();
264+
}
265+
266+
private static List<InputRow> readAll(final IcebergArrowInputSourceReader reader) throws IOException
267+
{
268+
final List<InputRow> result = new ArrayList<>();
269+
try (CloseableIterator<InputRow> it = reader.read(new NoopInputStats())) {
270+
while (it.hasNext()) {
271+
result.add(it.next());
272+
}
273+
}
274+
return result;
275+
}
276+
277+
private static final class NoopInputStats implements org.apache.druid.data.input.InputStats
278+
{
279+
@Override
280+
public void incrementProcessedBytes(final long v) {}
281+
282+
@Override
283+
public long getProcessedBytes() { return 0; }
284+
}
285+
}

extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergInputSourceTest.java

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,8 @@ public void testInputSource() throws IOException
100100
testCatalog,
101101
new LocalInputSourceFactory(),
102102
null,
103+
null,
104+
null,
103105
null
104106
);
105107
Stream<InputSplit<List<String>>> splits = inputSource.createSplits(null, new MaxSizeSplitHintSpec(null, null));
@@ -136,6 +138,8 @@ public void testInputSourceWithEmptySource() throws IOException
136138
testCatalog,
137139
new LocalInputSourceFactory(),
138140
null,
141+
null,
142+
null,
139143
null
140144
);
141145
Stream<InputSplit<List<String>>> splits = inputSource.createSplits(null, new MaxSizeSplitHintSpec(null, null));
@@ -152,6 +156,8 @@ public void testInputSourceWithFilter() throws IOException
152156
testCatalog,
153157
new LocalInputSourceFactory(),
154158
null,
159+
null,
160+
null,
155161
null
156162
);
157163
Stream<InputSplit<List<String>>> splits = inputSource.createSplits(null, new MaxSizeSplitHintSpec(null, null));
@@ -188,6 +194,8 @@ public void testInputSourceReadFromLatestSnapshot() throws IOException
188194
testCatalog,
189195
new LocalInputSourceFactory(),
190196
DateTimes.nowUtc(),
197+
null,
198+
null,
191199
null
192200
);
193201
Stream<InputSplit<List<String>>> splits = inputSource.createSplits(null, new MaxSizeSplitHintSpec(null, null));
@@ -208,6 +216,8 @@ public void testCaseInsensitiveFiltering() throws IOException
208216
caseInsensitiveCatalog,
209217
new LocalInputSourceFactory(),
210218
null,
219+
null,
220+
null,
211221
null
212222
);
213223

@@ -233,7 +243,9 @@ public void testResidualFilterModeIgnore() throws IOException
233243
testCatalog,
234244
new LocalInputSourceFactory(),
235245
null,
236-
ResidualFilterMode.IGNORE
246+
ResidualFilterMode.IGNORE,
247+
null,
248+
null
237249
);
238250
Stream<InputSplit<List<String>>> splits = inputSource.createSplits(null, new MaxSizeSplitHintSpec(null, null));
239251
Assert.assertEquals(1, splits.count());
@@ -250,7 +262,9 @@ public void testResidualFilterModeFail() throws IOException
250262
testCatalog,
251263
new LocalInputSourceFactory(),
252264
null,
253-
ResidualFilterMode.FAIL
265+
ResidualFilterMode.FAIL,
266+
null,
267+
null
254268
);
255269
DruidException exception = Assert.assertThrows(
256270
DruidException.class,
@@ -278,7 +292,9 @@ public void testResidualFilterModeFailWithPartitionedTable() throws IOException
278292
testCatalog,
279293
new LocalInputSourceFactory(),
280294
null,
281-
ResidualFilterMode.FAIL
295+
ResidualFilterMode.FAIL,
296+
null,
297+
null
282298
);
283299
Stream<InputSplit<List<String>>> splits = inputSource.createSplits(null, new MaxSizeSplitHintSpec(null, null));
284300
Assert.assertEquals(1, splits.count());
@@ -301,7 +317,9 @@ public void testResidualFilterModeFailWithPartitionedTableNonPartitionColumn() t
301317
testCatalog,
302318
new LocalInputSourceFactory(),
303319
null,
304-
ResidualFilterMode.FAIL
320+
ResidualFilterMode.FAIL,
321+
null,
322+
null
305323
);
306324
DruidException exception = Assert.assertThrows(
307325
DruidException.class,

0 commit comments

Comments
 (0)