Skip to content

Commit 633ccd4

Browse files
authored
feat: on-demand partial segment loading in SegmentLocalCacheManager (#19535)
changes: * adds `acquirePartialSegment` / `acquireCachedPartialSegment` to `SegmentCacheManager` and `SegmentManager` to allow callers to opt in to async partial segments; MSQ `RegularLoadableSegment` uses the new partial path * `PartialSegmentMetadataCacheEntry`, `PartialSegmentBundleCacheEntry` are now wired into `SegmentLocalCacheManager`, along with added `PartialBundleAcquirer` helper to pass things like download thread pool and the ability to acquire reference holds on cache entries * adds `PartialQueryableIndexSegment`, `PartialQueryableIndexCursorFactory`, `V10TimeBoundaryInspector` for references acquired from the now wired up metadata cache entry, implementing the async cursor holder contract * moved `DirectoryBackedRangeReader` out of tests to use as the implementation for `LocalLoadSpec` range reader. * adds `FilteredCursorFactory`/`RestrictedCursorFactory` async cursor implementations * adds config `druid.segmentCache.virtualStoragePartialDownloadsEnabled` to enable the feature, off by default
1 parent 2e61c71 commit 633ccd4

64 files changed

Lines changed: 5748 additions & 618 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryVirtualStorageTest.java

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ class QueryVirtualStorageTest extends EmbeddedClusterTestBase
7373
private static final long SIZE_BYTES = 3778338L;
7474
private static final long CACHE_SIZE = HumanReadableBytes.parse("1MiB");
7575
private static final long MAX_SIZE = HumanReadableBytes.parse("100MiB");
76+
private static final long ESTIMATE_SIZE = HumanReadableBytes.parse("2KiB");
7677

7778
private final EmbeddedBroker broker = new EmbeddedBroker();
7879
private final EmbeddedIndexer indexer = new EmbeddedIndexer();
@@ -89,6 +90,11 @@ public EmbeddedDruidCluster createCluster()
8990
{
9091
historical.setServerMemory(500_000_000)
9192
.addProperty("druid.segmentCache.virtualStorage", "true")
93+
.addProperty("druid.segmentCache.virtualStoragePartialDownloadsEnabled", "true")
94+
.addProperty(
95+
"druid.segmentCache.virtualStorageMetadataReservationEstimate",
96+
String.valueOf(ESTIMATE_SIZE)
97+
)
9298
.addProperty("druid.segmentCache.virtualStorageLoadThreads", String.valueOf(Runtime.getRuntime().availableProcessors()))
9399
.addBeforeStartHook(
94100
(cluster, self) -> self.addProperty(
@@ -155,8 +161,10 @@ void testQueryTooMuchData()
155161
RuntimeException.class,
156162
() -> cluster.runSql("select count(*) from \"%s\"", dataSource)
157163
);
158-
Assertions.assertTrue(t.getMessage().contains("Unable to load segment"));
159-
Assertions.assertTrue(t.getMessage().contains("] on demand, ensure enough disk space has been allocated to load all segments involved in the query"));
164+
Assertions.assertTrue(t.getMessage().contains("Unable to reserve bundle"));
165+
Assertions.assertTrue(t.getMessage()
166+
.contains(
167+
"ensure enough disk space has been allocated to load all segments involved in the query"));
160168
}
161169

162170
@Test
@@ -296,11 +304,13 @@ void testQueryTooMuchDataButWithDart()
296304
MatcherAssert.assertThat(inputChannelSums.bytes(), Matchers.lessThanOrEqualTo(SIZE_BYTES));
297305

298306
// Verify stage 0 (segment read) VSF load counters
299-
MatcherAssert.assertThat(inputChannelSums.loadFiles(), Matchers.greaterThan(0L));
307+
// partial loading is only partially metered at the moment, so depending on how stuff landed in and was evicted
308+
// from the cache, there can be 0 loads (because loads are currently only counted when the metadata entry is mounted)
309+
MatcherAssert.assertThat(inputChannelSums.loadFiles(), Matchers.greaterThanOrEqualTo(0L));
300310
MatcherAssert.assertThat(inputChannelSums.loadFiles(), Matchers.lessThanOrEqualTo(24L));
301-
MatcherAssert.assertThat(inputChannelSums.loadTime(), Matchers.greaterThan(0L));
302-
MatcherAssert.assertThat(inputChannelSums.loadWait(), Matchers.greaterThan(0L));
303-
MatcherAssert.assertThat(inputChannelSums.loadBytes(), Matchers.greaterThan(0L));
311+
MatcherAssert.assertThat(inputChannelSums.loadTime(), Matchers.greaterThanOrEqualTo(0L));
312+
MatcherAssert.assertThat(inputChannelSums.loadWait(), Matchers.greaterThanOrEqualTo(0L));
313+
MatcherAssert.assertThat(inputChannelSums.loadBytes(), Matchers.greaterThanOrEqualTo(0L));
304314
MatcherAssert.assertThat(inputChannelSums.loadBytes(), Matchers.lessThanOrEqualTo(SIZE_BYTES));
305315
}
306316

@@ -322,7 +332,7 @@ private void assertQueryMetrics(int expectedEventCount, @Nullable Long expectedL
322332

323333
long loadCount = getMetricLatestValue(emitter, DefaultQueryMetrics.QUERY_ON_DEMAND_LOAD_COUNT, expectedEventCount);
324334
if (expectedLoadCount != null) {
325-
Assertions.assertEquals(expectedLoadCount, loadCount);
335+
MatcherAssert.assertThat(loadCount, Matchers.lessThanOrEqualTo(expectedLoadCount));
326336
}
327337
boolean hasLoads = loadCount > 0;
328338

embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/ServerManagerForQueryErrorTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import org.apache.druid.segment.Segment;
5656
import org.apache.druid.segment.SegmentMapFunction;
5757
import org.apache.druid.segment.SegmentReference;
58+
import org.apache.druid.segment.loading.AcquireMode;
5859
import org.apache.druid.server.SegmentManager;
5960
import org.apache.druid.server.ServerManager;
6061
import org.apache.druid.server.initialization.ServerConfig;
@@ -225,7 +226,7 @@ protected LeafSegmentsBundle getLeafSegmentsBundle(Query<T> query, SegmentMapFun
225226
missingSegments.add(segment.getDescriptor());
226227
continue;
227228
}
228-
Optional<Segment> ref = segmentManager.acquireCachedSegment(dataSegment);
229+
Optional<Segment> ref = segmentManager.acquireCachedSegment(dataSegment, AcquireMode.FULL);
229230
if (ref.isPresent()) {
230231
segmentReferences.add(
231232
new SegmentReference(

indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@
9595
import org.apache.druid.segment.indexing.CombinedDataSchema;
9696
import org.apache.druid.segment.indexing.DataSchema;
9797
import org.apache.druid.segment.indexing.TuningConfig;
98+
import org.apache.druid.segment.loading.AcquireMode;
9899
import org.apache.druid.segment.loading.AcquireSegmentAction;
99100
import org.apache.druid.segment.loading.SegmentCacheManager;
100101
import org.apache.druid.segment.projections.AggregateProjectionSchema;
@@ -890,7 +891,8 @@ private static ResourceHolder<QueryableIndex> fetchSegmentInternal(
890891
{
891892
final Closer closer = Closer.create();
892893
try {
893-
final AcquireSegmentAction acquireAction = closer.register(segmentCacheManager.acquireSegment(dataSegment));
894+
final AcquireSegmentAction acquireAction =
895+
closer.register(segmentCacheManager.acquireSegment(dataSegment, AcquireMode.FULL));
894896
final ReferenceCountedObjectProvider<Segment> segmentProvider =
895897
FutureUtils.getUnchecked(acquireAction.getSegmentFuture(), true).getReferenceProvider();
896898
final Segment segment = segmentProvider.acquireReference().map(closer::register).get();

indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@
132132
import org.apache.druid.segment.indexing.DataSchema;
133133
import org.apache.druid.segment.indexing.TuningConfig;
134134
import org.apache.druid.segment.join.NoopJoinableFactory;
135+
import org.apache.druid.segment.loading.AcquireMode;
135136
import org.apache.druid.segment.loading.AcquireSegmentAction;
136137
import org.apache.druid.segment.loading.AcquireSegmentResult;
137138
import org.apache.druid.segment.loading.NoopSegmentCacheManager;
@@ -1978,7 +1979,7 @@ public void load(DataSegment segment)
19781979
}
19791980

19801981
@Override
1981-
public Optional<Segment> acquireCachedSegment(SegmentId segmentId)
1982+
public Optional<Segment> acquireCachedSegment(SegmentId segmentId, AcquireMode acquireMode)
19821983
{
19831984
for (Map.Entry<DataSegment, File> entry : segments.entrySet()) {
19841985
if (entry.getKey().getId().equals(segmentId)) {
@@ -1991,7 +1992,7 @@ public Optional<Segment> acquireCachedSegment(SegmentId segmentId)
19911992
}
19921993

19931994
@Override
1994-
public AcquireSegmentAction acquireSegment(DataSegment dataSegment)
1995+
public AcquireSegmentAction acquireSegment(DataSegment dataSegment, AcquireMode acquireMode)
19951996
{
19961997
final Segment segment =
19971998
new QueryableIndexSegment(indexIO.loadIndex(segments.get(dataSegment)), dataSegment.getId());

indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import org.apache.druid.segment.Segment;
5454
import org.apache.druid.segment.TestIndex;
5555
import org.apache.druid.segment.indexing.DataSchema;
56+
import org.apache.druid.segment.loading.AcquireMode;
5657
import org.apache.druid.segment.loading.SegmentCacheManager;
5758
import org.apache.druid.segment.loading.SegmentLoadingException;
5859
import org.apache.druid.segment.loading.TombstoneLoadSpec;
@@ -260,7 +261,7 @@ private Segment loadSegment(DataSegment dataSegment, File tempSegmentDir)
260261
.manufacturate(tempSegmentDir, null, false);
261262
try {
262263
cacheManager.load(dataSegment);
263-
return cacheManager.acquireCachedSegment(dataSegment.getId()).orElseThrow();
264+
return cacheManager.acquireCachedSegment(dataSegment.getId(), AcquireMode.FULL).orElseThrow();
264265
}
265266
catch (SegmentLoadingException e) {
266267
throw new RuntimeException(e);

multi-stage-query/src/main/java/org/apache/druid/msq/input/AdaptedLoadableSegment.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.druid.query.SegmentDescriptor;
3232
import org.apache.druid.segment.ReferenceCountedSegmentProvider;
3333
import org.apache.druid.segment.Segment;
34+
import org.apache.druid.segment.loading.AcquireMode;
3435
import org.apache.druid.segment.loading.AcquireSegmentAction;
3536
import org.apache.druid.segment.loading.AcquireSegmentResult;
3637
import org.apache.druid.timeline.DataSegment;
@@ -129,16 +130,21 @@ public String description()
129130
}
130131

131132
/**
132-
* Adapted segments are not managed by SegmentManager, so they are never cached.
133+
* Adapted segments are not managed by SegmentManager, so they are never cached. The {@code acquireMode} is ignored:
134+
* an adapted segment is produced by its own async supplier, not the cache manager's full-vs-partial machinery.
133135
*/
134136
@Override
135-
public Optional<Segment> acquireIfCached()
137+
public Optional<Segment> acquireIfCached(AcquireMode acquireMode)
136138
{
137139
return Optional.empty();
138140
}
139141

142+
/**
143+
* The {@code acquireMode} is ignored: an adapted segment is produced by its own async supplier, not the cache
144+
* manager's full-vs-partial machinery.
145+
*/
140146
@Override
141-
public AcquireSegmentAction acquire()
147+
public AcquireSegmentAction acquire(AcquireMode acquireMode)
142148
{
143149
if (!acquired.compareAndSet(false, true)) {
144150
throw DruidException.defensive("Segment with descriptor[%s] is already acquired", descriptor);

multi-stage-query/src/main/java/org/apache/druid/msq/input/LoadableSegment.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.druid.error.DruidException;
2525
import org.apache.druid.query.SegmentDescriptor;
2626
import org.apache.druid.segment.Segment;
27+
import org.apache.druid.segment.loading.AcquireMode;
2728
import org.apache.druid.segment.loading.AcquireSegmentAction;
2829
import org.apache.druid.server.SegmentManager;
2930
import org.apache.druid.timeline.DataSegment;
@@ -37,7 +38,8 @@
3738
* {@link PhysicalInputSlice#getLoadableSegments()}, which are in turn generated by {@link InputSliceReader#attach}.
3839
*
3940
* These objects do not hold resources and do not have a lifecycle. They are just pointers to data that hasn't
40-
* been acquired yet. Once {@link #acquire()} or {@link #acquireIfCached()} is called, the lifecycle begins.
41+
* been acquired yet. Once {@link #acquire(AcquireMode)} or {@link #acquireIfCached(AcquireMode)} is called, the
42+
* lifecycle begins.
4143
*
4244
* @see RegularLoadableSegment for segments loaded via {@link SegmentManager}
4345
* @see AdaptedLoadableSegment for segments adapted from other data sources
@@ -61,23 +63,35 @@ public interface LoadableSegment
6163
*
6264
* If the returned {@link Optional} is present, the caller is responsible for closing the {@link Segment}
6365
* when finished with it. If the returned {@link Optional} is empty, the segment is not cached and the caller
64-
* should use {@link #acquire()} to load it.
66+
* should use {@link #acquire(AcquireMode)} to load it.
67+
*
68+
* The {@code acquireMode} selects what counts as a cached hit and how the segment is mounted; see {@link AcquireMode}.
69+
* With {@link AcquireMode#PARTIAL} the returned {@link Segment} is mounted but, for a partial-download (virtual
70+
* storage) segment, may not be fully downloaded yet: its data is fetched on demand at cursor-build time. Callers must
71+
* therefore access it through the async API ({@link org.apache.druid.segment.CursorFactory#makeCursorHolderAsync})
72+
* rather than the synchronous {@code makeCursorHolder}, which refuses to download on a processing thread.
6573
*
6674
* @return an Optional containing the cached Segment if available, or empty if not cached
6775
*/
68-
Optional<Segment> acquireIfCached();
76+
Optional<Segment> acquireIfCached(AcquireMode acquireMode);
6977

7078
/**
7179
* Acquire the actual segment. Non-blocking operation. Once this is called, callers are responsible for closing the
7280
* {@link AcquireSegmentAction}.
7381
*
82+
* The {@code acquireMode} selects how the segment is loaded; see {@link AcquireMode}. With {@link AcquireMode#PARTIAL}
83+
* the returned {@link Segment} is mounted but, for a partial-download (virtual storage) segment, may not be fully
84+
* downloaded yet: its data is fetched on demand at cursor-build time. Callers must therefore access it through the
85+
* async API ({@link org.apache.druid.segment.CursorFactory#makeCursorHolderAsync}) rather than the
86+
* synchronous {@code makeCursorHolder}, which refuses to download on a processing thread.
87+
*
7488
* @throws DruidException if the segment has already been acquired
7589
*/
76-
AcquireSegmentAction acquire();
90+
AcquireSegmentAction acquire(AcquireMode acquireMode);
7791

7892
/**
7993
* Returns a future for the {@link DataSegment} object. For {@link RegularLoadableSegment}, the future is created
80-
* lazily when this method is first called, or when a segment is acquired through {@link #acquire()}.
94+
* lazily when this method is first called, or when a segment is acquired through {@link #acquire(AcquireMode)}.
8195
* For {@link AdaptedLoadableSegment}, this returns a failed future.
8296
*/
8397
ListenableFuture<DataSegment> dataSegmentFuture();

multi-stage-query/src/main/java/org/apache/druid/msq/input/RegularLoadableSegment.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.druid.query.SegmentDescriptor;
3333
import org.apache.druid.query.TableDataSource;
3434
import org.apache.druid.segment.Segment;
35+
import org.apache.druid.segment.loading.AcquireMode;
3536
import org.apache.druid.segment.loading.AcquireSegmentAction;
3637
import org.apache.druid.server.SegmentManager;
3738
import org.apache.druid.timeline.DataSegment;
@@ -127,13 +128,13 @@ public String description()
127128
}
128129

129130
@Override
130-
public synchronized Optional<Segment> acquireIfCached()
131+
public synchronized Optional<Segment> acquireIfCached(AcquireMode acquireMode)
131132
{
132133
if (acquired) {
133134
throw DruidException.defensive("Segment with descriptor[%s] is already acquired", descriptor);
134135
}
135136

136-
final Optional<Segment> cachedSegment = segmentManager.acquireCachedSegment(segmentId);
137+
final Optional<Segment> cachedSegment = segmentManager.acquireCachedSegment(segmentId, acquireMode);
137138
if (cachedSegment.isPresent()) {
138139
acquired = true;
139140

@@ -148,7 +149,7 @@ public synchronized Optional<Segment> acquireIfCached()
148149
}
149150

150151
@Override
151-
public synchronized AcquireSegmentAction acquire()
152+
public synchronized AcquireSegmentAction acquire(AcquireMode acquireMode)
152153
{
153154
if (acquired) {
154155
throw DruidException.defensive("Segment with descriptor[%s] is already acquired", descriptor);
@@ -157,7 +158,7 @@ public synchronized AcquireSegmentAction acquire()
157158
acquired = true;
158159

159160
if (cachedDataSegment != null) {
160-
final AcquireSegmentAction action = segmentManager.acquireSegment(cachedDataSegment);
161+
final AcquireSegmentAction action = segmentManager.acquireSegment(cachedDataSegment, acquireMode);
161162
return new AcquireSegmentAction(
162163
() -> LoadableSegmentUtils.countedLoad(
163164
action.getSegmentFuture(),
@@ -177,7 +178,7 @@ public synchronized AcquireSegmentAction acquire()
177178
Suppliers.memoize(() -> FutureUtils.transformAsync(
178179
dataSegmentFutureSupplier.get(),
179180
dataSegment -> LoadableSegmentUtils.countedLoad(
180-
closer.register(segmentManager.acquireSegment(dataSegment)).getSegmentFuture(),
181+
closer.register(segmentManager.acquireSegment(dataSegment, acquireMode)).getSegmentFuture(),
181182
dataSegment.getSize(),
182183
inputCounters
183184
)

multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafStageProcessor.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import org.apache.druid.query.filter.SegmentPruner;
5656
import org.apache.druid.query.planning.ExecutionVertex;
5757
import org.apache.druid.segment.SegmentMapFunction;
58+
import org.apache.druid.segment.loading.AcquireMode;
5859
import org.apache.druid.utils.CollectionUtils;
5960

6061
import javax.annotation.Nullable;
@@ -263,6 +264,23 @@ protected List<PhysicalInputSlice> filterBaseInput(final List<PhysicalInputSlice
263264
return slices;
264265
}
265266

267+
268+
/**
269+
* Create the {@link ReadableInputQueue} for this stage's base inputs. Override to change how the queue acquires
270+
* segments: the default acquires them fully up front ({@link AcquireMode#FULL}), which is safe for any processor.
271+
* Leaf processors that read segments through the async cursor API
272+
* ({@link org.apache.druid.segment.CursorFactory#makeCursorHolderAsync}) can override to {@link AcquireMode#PARTIAL}
273+
* so only the columns a query actually touches are downloaded.
274+
*/
275+
protected ReadableInputQueue makeReadableInputQueue(
276+
final StandardPartitionReader partitionReader,
277+
final List<PhysicalInputSlice> slices,
278+
final int loadahead
279+
)
280+
{
281+
return new ReadableInputQueue(partitionReader, slices, loadahead, AcquireMode.FULL);
282+
}
283+
266284
/**
267285
* Read base inputs, where "base" is meant in the same sense as in {@link ExecutionVertex}: the primary datasource
268286
* that drives query processing.
@@ -293,7 +311,7 @@ private ReadableInputQueue makeBaseInputQueue(
293311
}
294312

295313
final List<PhysicalInputSlice> filteredSlices = filterBaseInput(physicalInputSlices);
296-
return new ReadableInputQueue(
314+
return makeReadableInputQueue(
297315
new StandardPartitionReader(context),
298316
filteredSlices,
299317
context.segmentLoadAheadCount()

multi-stage-query/src/main/java/org/apache/druid/msq/querykit/ReadableInputQueue.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.apache.druid.msq.input.stage.ReadablePartition;
3535
import org.apache.druid.segment.Segment;
3636
import org.apache.druid.segment.SegmentReference;
37+
import org.apache.druid.segment.loading.AcquireMode;
3738
import org.apache.druid.segment.loading.AcquireSegmentAction;
3839
import org.apache.druid.utils.CloseableUtils;
3940

@@ -98,16 +99,25 @@ public class ReadableInputQueue implements Closeable
9899

99100
private final StandardPartitionReader partitionReader;
100101
private final int loadahead;
102+
103+
/**
104+
* How segments in this queue are acquired (fully up front, or partially with on-demand column loading at query
105+
* time). Threaded through to {@link LoadableSegment#acquire(AcquireMode)} and
106+
* {@link LoadableSegment#acquireIfCached(AcquireMode)}.
107+
*/
108+
private final AcquireMode acquireMode;
101109
private final AtomicBoolean started = new AtomicBoolean(false);
102110

103111
public ReadableInputQueue(
104112
final StandardPartitionReader partitionReader,
105113
final List<PhysicalInputSlice> slices,
106-
final int loadahead
114+
final int loadahead,
115+
final AcquireMode acquireMode
107116
)
108117
{
109118
this.partitionReader = partitionReader;
110119
this.loadahead = loadahead;
120+
this.acquireMode = acquireMode;
111121

112122
for (final PhysicalInputSlice slice : slices) {
113123
loadableSegments.addAll(slice.getLoadableSegments());
@@ -131,7 +141,7 @@ public void start()
131141
final List<LoadableSegment> toLoad = new ArrayList<>(); // Temporarily store all non-cached segments
132142
LoadableSegment loadableSegment;
133143
while ((loadableSegment = loadableSegments.poll()) != null) {
134-
final Optional<Segment> cachedSegment = loadableSegment.acquireIfCached();
144+
final Optional<Segment> cachedSegment = loadableSegment.acquireIfCached(acquireMode);
135145
if (cachedSegment.isPresent()) {
136146
final SegmentReferenceHolder holder = new SegmentReferenceHolder(
137147
new SegmentReference(loadableSegment.descriptor(), cachedSegment, null),
@@ -288,7 +298,7 @@ private ListenableFuture<ReadableInput> loadNextSegment()
288298
return null;
289299
}
290300

291-
final AcquireSegmentAction acquireSegmentAction = nextLoadableSegment.acquire();
301+
final AcquireSegmentAction acquireSegmentAction = nextLoadableSegment.acquire(acquireMode);
292302
loadingSegments.add(acquireSegmentAction);
293303
return FutureUtils.transform(
294304
acquireSegmentAction.getSegmentFuture(),

0 commit comments

Comments
 (0)