Skip to content

Commit 60e3487

Browse files
authored
feat: unnest and join makeCursorHolderAsync (#19600)
1 parent 2c634fc commit 60e3487

13 files changed

Lines changed: 560 additions & 141 deletions

processing/src/main/java/org/apache/druid/frame/segment/columnar/ColumnarFrameCursorFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.apache.druid.segment.CursorFactory;
4141
import org.apache.druid.segment.CursorHolder;
4242
import org.apache.druid.segment.QueryableIndexColumnSelectorFactory;
43+
import org.apache.druid.segment.ResidentCursorFactory;
4344
import org.apache.druid.segment.SimpleAscendingOffset;
4445
import org.apache.druid.segment.SimpleSettableOffset;
4546
import org.apache.druid.segment.VirtualColumns;
@@ -64,7 +65,7 @@
6465
*
6566
* @see RowFrameCursorFactory the row-based version
6667
*/
67-
public class ColumnarFrameCursorFactory implements CursorFactory
68+
public class ColumnarFrameCursorFactory implements ResidentCursorFactory
6869
{
6970
private final Frame frame;
7071
private final RowSignature signature;

processing/src/main/java/org/apache/druid/frame/segment/row/RowFrameCursorFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.druid.segment.CursorBuildSpec;
3434
import org.apache.druid.segment.CursorFactory;
3535
import org.apache.druid.segment.CursorHolder;
36+
import org.apache.druid.segment.ResidentCursorFactory;
3637
import org.apache.druid.segment.SimpleAscendingOffset;
3738
import org.apache.druid.segment.SimpleSettableOffset;
3839
import org.apache.druid.segment.column.ColumnCapabilities;
@@ -49,7 +50,7 @@
4950
*
5051
* @see ColumnarFrameCursorFactory the columnar version
5152
*/
52-
public class RowFrameCursorFactory implements CursorFactory
53+
public class RowFrameCursorFactory implements ResidentCursorFactory
5354
{
5455
private final Frame frame;
5556
private final FrameReader frameReader;

processing/src/main/java/org/apache/druid/segment/CursorFactory.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.apache.druid.segment;
2121

22+
import org.apache.druid.error.DruidException;
2223
import org.apache.druid.segment.column.ColumnCapabilities;
2324
import org.apache.druid.segment.column.RowSignature;
2425

@@ -35,14 +36,22 @@ public interface CursorFactory extends ColumnInspector
3536
/**
3637
* Asynchronous variant of {@link #makeCursorHolder(CursorBuildSpec)} for cursor factories that may need to do I/O
3738
* (e.g. download column data from deep storage) before they can serve a cursor. Callers running on threads that
38-
* must not block should use this.
39+
* must not block use this rather than {@link #makeCursorHolder}.
3940
* <p>
40-
* The default implementation completes synchronously by delegating to {@link #makeCursorHolder(CursorBuildSpec)},
41-
* which keeps every existing implementation async-correct without changes.
41+
* There is intentionally no working default: this method must be explicitly implemented to participate in
42+
* async-aware engines (MSQ). A factory whose source is always fully-resident and never needs to block while waiting
43+
* on some other thread to perform work can implement {@link ResidentCursorFactory} instead of {@link CursorFactory}
44+
* directly, which provides a default implementation of this method that wraps
45+
* {@link #makeCursorHolder(CursorBuildSpec)}.
4246
*/
4347
default AsyncCursorHolder makeCursorHolderAsync(CursorBuildSpec spec)
4448
{
45-
return AsyncCursorHolder.completed(makeCursorHolder(spec));
49+
throw DruidException.defensive(
50+
"makeCursorHolderAsync is not implemented by [%s]. Override it (or implement ResidentCursorFactory): return "
51+
+ "AsyncCursorHolder.completed(makeCursorHolder(spec)) if the source is always fully resident, or build/await "
52+
+ "the cursor holder asynchronously if it supports load on demand.",
53+
getClass().getName()
54+
);
4655
}
4756

4857
/**

processing/src/main/java/org/apache/druid/segment/QueryableIndexCursorFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@
5858
import java.util.LinkedHashSet;
5959
import java.util.List;
6060

61-
public class QueryableIndexCursorFactory implements CursorFactory
61+
public class QueryableIndexCursorFactory implements ResidentCursorFactory
6262
{
6363
private final QueryableIndex index;
6464
private final TimeBoundaryInspector timeBoundaryInspector;
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.druid.segment;
21+
22+
/**
23+
* A {@link CursorFactory} whose {@link #makeCursorHolder} never blocks on I/O, i.e. a fully-resident / in-memory
24+
* source with no on-demand loading. Implementing this interface, rather than {@link CursorFactory} directly, declares
25+
* that intent and supplies the trivial {@link #makeCursorHolderAsync} implementation: the holder is built synchronously
26+
* and returned already completed.
27+
* <p>
28+
* Factories backed by, or wrapping, an on-demand source must implement {@link CursorFactory} directly and provide a
29+
* real {@link CursorFactory#makeCursorHolderAsync} that builds/awaits the holder asynchronously so they never block the
30+
* calling thread.
31+
*/
32+
public interface ResidentCursorFactory extends CursorFactory
33+
{
34+
@Override
35+
default AsyncCursorHolder makeCursorHolderAsync(CursorBuildSpec spec)
36+
{
37+
return AsyncCursorHolder.completed(makeCursorHolder(spec));
38+
}
39+
}

processing/src/main/java/org/apache/druid/segment/RowBasedCursorFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
import javax.annotation.Nullable;
3333
import java.util.List;
3434

35-
public class RowBasedCursorFactory<RowType> implements CursorFactory
35+
public class RowBasedCursorFactory<RowType> implements ResidentCursorFactory
3636
{
3737
private final Sequence<RowType> rowSequence;
3838
private final RowAdapter<RowType> rowAdapter;

processing/src/main/java/org/apache/druid/segment/UnnestCursorFactory.java

Lines changed: 66 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -76,33 +76,86 @@ public UnnestCursorFactory(
7676

7777
@Override
7878
public CursorHolder makeCursorHolder(CursorBuildSpec spec)
79+
{
80+
final UnnestFilterSplit filterSplit = computeFilterSplit(spec);
81+
final CursorBuildSpec unnestBuildSpec = transformCursorBuildSpec(spec, unnestColumn, filterSplit.getBaseTableFilter());
82+
83+
final Closer closer = Closer.create();
84+
// base holder is built lazily on first asCursor()/getOrdering() and registered for close at that point
85+
final Supplier<CursorHolder> baseHolderSupplier = Suppliers.memoize(
86+
() -> closer.register(baseCursorFactory.makeCursorHolder(unnestBuildSpec))
87+
);
88+
return unnestCursorHolder(spec, filterSplit, closer, baseHolderSupplier);
89+
}
90+
91+
@Override
92+
public AsyncCursorHolder makeCursorHolderAsync(CursorBuildSpec spec)
93+
{
94+
final UnnestFilterSplit filterSplit = computeFilterSplit(spec);
95+
final CursorBuildSpec unnestBuildSpec = transformCursorBuildSpec(spec, unnestColumn, filterSplit.getBaseTableFilter());
96+
97+
// Build the base-table holder asynchronously (a partial base segment downloads its required columns here), then
98+
// wrap the ready holder in the unnest holder. Closing the returned holder before it's ready cancels the base load.
99+
final AsyncCursorHolder baseAsync = baseCursorFactory.makeCursorHolderAsync(unnestBuildSpec);
100+
final AsyncCursorHolder asyncHolder = new AsyncCursorHolder(baseAsync::close);
101+
baseAsync.addReadyCallback(() -> {
102+
final CursorHolder unnestHolder;
103+
try {
104+
// release() transfers ownership of the base holder to us (and surfaces a base-load failure as its cause); from
105+
// here the unnest holder owns closing it. Construction below can't throw, so the catch only fires on a base
106+
// failure or a cancel race (baseAsync already closed), neither of which leaves a base holder to leak.
107+
final CursorHolder baseHolder = baseAsync.release();
108+
final Closer closer = Closer.create();
109+
closer.register(baseHolder);
110+
unnestHolder = unnestCursorHolder(spec, filterSplit, closer, Suppliers.ofInstance(baseHolder));
111+
}
112+
catch (Throwable t) {
113+
asyncHolder.setException(t);
114+
return;
115+
}
116+
if (!asyncHolder.set(unnestHolder)) {
117+
// awaiter closed the wrapper while we were producing the holder; close it so the base holder doesn't leak
118+
unnestHolder.close();
119+
}
120+
});
121+
return asyncHolder;
122+
}
123+
124+
/**
125+
* Split the spec's filters into base-table and post-unnest filters (see
126+
* {@link #computeBaseAndPostUnnestFilters}). Cheap and metadata-only; shared by the sync and async holder paths.
127+
*/
128+
private UnnestFilterSplit computeFilterSplit(CursorBuildSpec spec)
79129
{
80130
final String input = getUnnestInputIfDirectAccess(unnestColumn);
81-
final UnnestFilterSplit filterSplit = computeBaseAndPostUnnestFilters(
131+
return computeBaseAndPostUnnestFilters(
82132
spec.getFilter(),
83133
filter != null ? filter.toFilter() : null,
84134
spec.getVirtualColumns(),
85135
unnestColumn,
86136
input,
87137
input == null ? null : spec.getVirtualColumns().getColumnCapabilitiesWithFallback(baseCursorFactory, input)
88138
);
89-
final CursorBuildSpec unnestBuildSpec = transformCursorBuildSpec(
90-
spec,
91-
unnestColumn,
92-
filterSplit.getBaseTableFilter()
93-
);
139+
}
94140

141+
/**
142+
* Build the unnest {@link CursorHolder} on top of a base-table holder. {@code baseHolderSupplier} provides the base
143+
* holder: the sync path builds it lazily on first use and registers it with {@code closer}, while the async path
144+
* supplies an already-materialized holder pre-registered with {@code closer}.
145+
*/
146+
private CursorHolder unnestCursorHolder(
147+
CursorBuildSpec spec,
148+
UnnestFilterSplit filterSplit,
149+
Closer closer,
150+
Supplier<CursorHolder> baseHolderSupplier
151+
)
152+
{
95153
return new CursorHolder()
96154
{
97-
final Closer closer = Closer.create();
98-
final Supplier<CursorHolder> cursorHolderSupplier = Suppliers.memoize(
99-
() -> closer.register(baseCursorFactory.makeCursorHolder(unnestBuildSpec))
100-
);
101-
102155
@Override
103156
public Cursor asCursor()
104157
{
105-
final Cursor cursor = cursorHolderSupplier.get().asCursor();
158+
final Cursor cursor = baseHolderSupplier.get().asCursor();
106159
if (cursor == null) {
107160
return null;
108161
}
@@ -135,7 +188,7 @@ public Cursor asCursor()
135188
@Override
136189
public List<OrderBy> getOrdering()
137190
{
138-
return computeOrdering(cursorHolderSupplier.get().getOrdering());
191+
return computeOrdering(baseHolderSupplier.get().getOrdering());
139192
}
140193

141194
@Override

processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexCursorFactory.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,9 @@
3030
import org.apache.druid.segment.ConcatenatingCursor;
3131
import org.apache.druid.segment.Cursor;
3232
import org.apache.druid.segment.CursorBuildSpec;
33-
import org.apache.druid.segment.CursorFactory;
3433
import org.apache.druid.segment.CursorHolder;
3534
import org.apache.druid.segment.EmptyCursorHolder;
35+
import org.apache.druid.segment.ResidentCursorFactory;
3636
import org.apache.druid.segment.column.ColumnCapabilities;
3737
import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
3838
import org.apache.druid.segment.column.ColumnType;
@@ -50,7 +50,7 @@
5050
import java.util.Arrays;
5151
import java.util.List;
5252

53-
public class IncrementalIndexCursorFactory implements CursorFactory
53+
public class IncrementalIndexCursorFactory implements ResidentCursorFactory
5454
{
5555
private static final ColumnCapabilities.CoercionLogic COERCE_LOGIC =
5656
new ColumnCapabilities.CoercionLogic()

0 commit comments

Comments
 (0)