Skip to content

Commit d38b770

Browse files
committed
feat: wire useArrowReader + arrowBatchSize into IcebergInputSource
1 parent 973c702 commit d38b770

2 files changed

Lines changed: 36 additions & 2 deletions

File tree

extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceReader.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -190,7 +190,7 @@ private InputRow batchRowToInputRow(final ColumnarBatch batch, final int rowIdx)
190190
event.put(vec.getName(), extractValue(vec, rowIdx));
191191
}
192192
}
193-
final long timestamp = schema.getTimestampSpec().extractTimestamp(event);
193+
final long timestamp = schema.getTimestampSpec().extractTimestamp(event).getMillis();
194194
final List<String> dimensions = resolveDimensions(batch);
195195
return new MapBasedInputRow(timestamp, dimensions, event);
196196
}

extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergInputSource.java

Lines changed: 35 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -76,6 +76,12 @@ public class IcebergInputSource implements SplittableInputSource<List<String>>
7676
@JsonProperty
7777
private final ResidualFilterMode residualFilterMode;
7878

79+
@JsonProperty
80+
private final boolean useArrowReader;
81+
82+
@JsonProperty
83+
private final int arrowBatchSize;
84+
7985
private boolean isLoaded = false;
8086

8187
private SplittableInputSource delegateInputSource;
@@ -88,7 +94,9 @@ public IcebergInputSource(
8894
@JsonProperty("icebergCatalog") IcebergCatalog icebergCatalog,
8995
@JsonProperty("warehouseSource") InputSourceFactory warehouseSource,
9096
@JsonProperty("snapshotTime") @Nullable DateTime snapshotTime,
91-
@JsonProperty("residualFilterMode") @Nullable ResidualFilterMode residualFilterMode
97+
@JsonProperty("residualFilterMode") @Nullable ResidualFilterMode residualFilterMode,
98+
@JsonProperty("useArrowReader") @Nullable Boolean useArrowReader,
99+
@JsonProperty("arrowBatchSize") @Nullable Integer arrowBatchSize
92100
)
93101
{
94102
this.tableName = Preconditions.checkNotNull(tableName, "tableName cannot be null");
@@ -98,6 +106,10 @@ public IcebergInputSource(
98106
this.warehouseSource = Preconditions.checkNotNull(warehouseSource, "warehouseSource cannot be null");
99107
this.snapshotTime = snapshotTime;
100108
this.residualFilterMode = Configs.valueOrDefault(residualFilterMode, ResidualFilterMode.IGNORE);
109+
this.useArrowReader = useArrowReader != null && useArrowReader;
110+
this.arrowBatchSize = arrowBatchSize != null && arrowBatchSize > 0
111+
? arrowBatchSize
112+
: IcebergArrowInputSourceReader.DEFAULT_BATCH_SIZE;
101113
}
102114

103115
@Override
@@ -113,6 +125,16 @@ public InputSourceReader reader(
113125
File temporaryDirectory
114126
)
115127
{
128+
if (useArrowReader) {
129+
return new IcebergArrowInputSourceReader(
130+
icebergCatalog.retrieveTable(namespace, tableName),
131+
icebergFilter,
132+
snapshotTime,
133+
icebergCatalog.isCaseSensitive(),
134+
inputRowSchema,
135+
arrowBatchSize
136+
);
137+
}
116138
if (!isLoaded) {
117139
retrieveIcebergDatafiles();
118140
}
@@ -189,6 +211,18 @@ public ResidualFilterMode getResidualFilterMode()
189211
return residualFilterMode;
190212
}
191213

214+
@JsonProperty
215+
public boolean isUseArrowReader()
216+
{
217+
return useArrowReader;
218+
}
219+
220+
@JsonProperty
221+
public int getArrowBatchSize()
222+
{
223+
return arrowBatchSize;
224+
}
225+
192226
public SplittableInputSource getDelegateInputSource()
193227
{
194228
return delegateInputSource;

0 commit comments

Comments
 (0)