Skip to content

Commit 215f415

Browse files
authored
feat: partial segment cache infrastructure (#19496)
changes: * add `PartialSegmentMetadataCacheEntry`, a `CacheEntry` that range-reads the V10 header on mount, constructs `PartialSegmentFileMapperV10`, and shrinks its reservation to the actual on-disk size * add `PartialSegmentBundleCacheEntry` and `PartialSegmentBundleCacheEntryIdentifier`, the `CacheEntry` (and its identifier) associated with each file bundle of a V10 segment; the entry sparse-allocates and evicts its containers as a unit, places holds on the metadata entry and on transitive parent bundle entries via the `StorageLocation` methods (weak reference holds on the parent cache entries), and uses reference-counted usage references * add `PartialSegmentCacheBootstrap`, a helper that restores partial-format entries from the on-disk layout on historical startup (not wired up yet) and cleans up orphaned bundles * add `ResizableCacheEntry` interface and `StorageLocation.adjustReservation` (shrink-only) so the metadata entry can tighten its reservation post-mount * rename `SegmentFileBuilder.startFileGroup` → `startFileBundle`; introduce `ROOT_BUNDLE_NAME` as the default bundle for containers written without an explicit declaration * rename JSON field `SegmentFileContainerMetadata.fileGroup` → `bundle`; it is now non-null via the getter, is normalized to `ROOT_BUNDLE_NAME` in the constructor, and its default value is omitted from JSON using a custom `JsonInclude` filter * extract shared `DirectoryBackedRangeReader` and `CountingRangeReader` test helpers; consolidate duplicates across processing + server tests
1 parent f0d06c4 commit 215f415

25 files changed

Lines changed: 4352 additions & 303 deletions

processing/src/main/java/org/apache/druid/segment/IndexMergerBase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -630,7 +630,7 @@ protected Metadata makeProjections(
630630

631631
final String section2 = "build projection[" + projectionSchema.getName() + "] inverted index and columns";
632632
progress.startSection(section2);
633-
segmentFileBuilder.startFileGroup(projectionSchema.getName());
633+
segmentFileBuilder.startFileBundle(projectionSchema.getName());
634634
if (projectionSchema.getTimeColumnName() != null) {
635635
makeTimeColumn(
636636
segmentFileBuilder,

processing/src/main/java/org/apache/druid/segment/IndexMergerV10.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ protected File makeIndexFiles(
218218
/************ Create Inverted Indexes and Finalize Build Columns *************/
219219
final String section = "build inverted index and columns";
220220
progress.startSection(section);
221-
v10Smoosher.startFileGroup(Projections.BASE_TABLE_PROJECTION_NAME);
221+
v10Smoosher.startFileBundle(Projections.BASE_TABLE_PROJECTION_NAME);
222222
makeTimeColumn(v10Smoosher, progress, timeWriter, indexSpec, basePrefix + ColumnHolder.TIME_COLUMN_NAME);
223223
makeMetricsColumns(
224224
v10Smoosher,

processing/src/main/java/org/apache/druid/segment/file/PartialSegmentFileMapperV10.java

Lines changed: 215 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.druid.java.util.common.FileUtils;
2929
import org.apache.druid.java.util.common.StringUtils;
3030
import org.apache.druid.java.util.common.io.Closer;
31+
import org.apache.druid.java.util.common.logger.Logger;
3132
import org.apache.druid.segment.data.CompressionStrategy;
3233
import org.apache.druid.segment.loading.SegmentRangeReader;
3334
import org.apache.druid.utils.CloseableUtils;
@@ -83,7 +84,13 @@
8384
*/
8485
public class PartialSegmentFileMapperV10 implements SegmentFileMapper
8586
{
86-
static final String METADATA_HEADER_SUFFIX = ".header";
87+
private static final Logger LOG = new Logger(PartialSegmentFileMapperV10.class);
88+
89+
/**
90+
* Suffix appended to the target filename to form the local header file. Public so cache-manager components can
91+
* recognize the partial-download on-disk layout during bootstrap restore and reservation cleanup.
92+
*/
93+
public static final String METADATA_HEADER_SUFFIX = ".header";
8794

8895
/**
8996
* Create (or restore) a lazy mapper for the main segment file with attached external file mappers. If persisted state
@@ -146,9 +153,16 @@ static PartialSegmentFileMapperV10 createForFile(
146153
bitmapBuffer = mmapBitmap(headerFile, result);
147154
}
148155
catch (Exception e) {
149-
// corrupted file (partial write, truncated bitmap, bad JSON, etc.) delete and re-fetch
156+
// corrupted file (partial write, truncated bitmap, bad JSON, etc.), delete and re-fetch
150157
result = null;
151-
headerFile.delete();
158+
if (!headerFile.delete()) {
159+
LOG.warn(
160+
e,
161+
"Failed to delete corrupted header file[%s] for [%s]; will be overwritten by re-fetch",
162+
headerFile,
163+
targetFilename
164+
);
165+
}
152166
}
153167
}
154168

@@ -167,7 +181,32 @@ static PartialSegmentFileMapperV10 createForFile(
167181
bitmapBuffer
168182
);
169183

170-
// restore downloaded files from the bitmap
184+
// bitmap-vs-container repair pre-pass: if the bitmap claims a file is downloaded but its container file is
185+
// missing on disk, the bitmap is lying (e.g. partial-cache eviction that cleared containers but couldn't atomically
186+
// clear bits, or external file-system damage). Clear those bits before the restore loop so we don't spuriously
187+
// sparse-allocate empty containers in the restore loop's ensureContainerInitialized call and treat their files as
188+
// downloaded.
189+
for (int i = 0; i < mapper.sortedFileNames.size(); i++) {
190+
final int byteIndex = i / 8;
191+
final int bitMask = 1 << (i % 8);
192+
if ((bitmapBuffer.get(byteIndex) & bitMask) == 0) {
193+
continue;
194+
}
195+
final String name = mapper.sortedFileNames.get(i);
196+
final SegmentInternalFileMetadata fileMetadata = result.getMetadata().getFiles().get(name);
197+
if (fileMetadata == null) {
198+
continue;
199+
}
200+
final File containerFile = new File(
201+
localCacheDir,
202+
StringUtils.format("%s.container.%05d", targetFilename, fileMetadata.getContainer())
203+
);
204+
if (!containerFile.exists()) {
205+
bitmapBuffer.put(byteIndex, (byte) (bitmapBuffer.get(byteIndex) & ~bitMask));
206+
}
207+
}
208+
209+
// restore downloaded files from the (now-repaired) bitmap
171210
for (int i = 0; i < mapper.sortedFileNames.size(); i++) {
172211
final int byteIndex = i / 8;
173212
final int bitIndex = i % 8;
@@ -249,6 +288,57 @@ public SegmentFileMetadata getSegmentFileMetadata()
249288
return metadata;
250289
}
251290

291+
/**
292+
* Names of the external segment files attached to this mapper (each one is its own {@link PartialSegmentFileMapperV10}
293+
* accessible via {@link #getExternalMapper}). Empty for mappers with no externals.
294+
*/
295+
public Set<String> getExternalFilenames()
296+
{
297+
return externalMappers.keySet();
298+
}
299+
300+
/**
301+
* Look up the child mapper for an external segment file. Returns {@code null} if no external with that name is
302+
* attached. Cache-layer callers use this to walk external files' {@link SegmentFileMetadata} and route
303+
* {@link #initializeContainer} / {@link #evictContainer} calls to the right physical file.
304+
*/
305+
@Nullable
306+
public PartialSegmentFileMapperV10 getExternalMapper(String externalFilename)
307+
{
308+
return externalMappers.get(externalFilename);
309+
}
310+
311+
/**
312+
* Resolve {@code this} when {@code externalFilename} is null (main file), otherwise the named external child
313+
* mapper. Throws if the external is not attached. Useful for routing container operations from cache-layer code
314+
* that holds {@code (externalFilename, containerIndex)} refs.
315+
*/
316+
public PartialSegmentFileMapperV10 mapperForContainer(@Nullable String externalFilename)
317+
{
318+
if (externalFilename == null) {
319+
return this;
320+
}
321+
final PartialSegmentFileMapperV10 external = externalMappers.get(externalFilename);
322+
if (external == null) {
323+
throw DruidException.defensive(
324+
"External mapper[%s] is not attached to this mapper for [%s]",
325+
externalFilename,
326+
targetFilename
327+
);
328+
}
329+
return external;
330+
}
331+
332+
/**
333+
* The {@code targetFilename} this mapper writes/reads to/from inside the cache directory. For the entry-point
334+
* mapper this is e.g. {@link org.apache.druid.segment.IndexIO#V10_FILE_NAME}; for an external child mapper it's
335+
* the external file's name.
336+
*/
337+
public String getTargetFilename()
338+
{
339+
return targetFilename;
340+
}
341+
252342
@Override
253343
public Set<String> getInternalFilenames()
254344
{
@@ -290,8 +380,8 @@ public ByteBuffer mapExternalFile(String filename, String name) throws IOExcepti
290380

291381
/**
292382
* Pre-download a set of internal files so that subsequent {@link #mapFile(String)} calls for these files will not
293-
* trigger individual downloads. Files that are already downloaded are skipped. This is useful for batch-downloading
294-
* all files for a projection at once.
383+
* trigger individual downloads. Files that are already downloaded are skipped. Useful for batch-downloading all
384+
* files in a bundle at once (see {@link SegmentFileBuilder#startFileBundle}).
295385
*/
296386
public void ensureFilesAvailable(Set<String> fileNames) throws IOException
297387
{
@@ -303,6 +393,27 @@ public void ensureFilesAvailable(Set<String> fileNames) throws IOException
303393
}
304394
}
305395

396+
/**
397+
* Total on-disk size of the header file(s) backing this mapper, summed across the main file and any external file
398+
* mappers. This is the actual reservation size that should be charged against the local cache once the metadata has
399+
* been fetched and persisted; callers can compare it against an up-front pessimistic estimate to decide whether to
400+
* shrink the reservation.
401+
*/
402+
public long getOnDiskHeaderSize()
403+
{
404+
long total = headerFileSize(localCacheDir, targetFilename);
405+
for (PartialSegmentFileMapperV10 ext : externalMappers.values()) {
406+
total += headerFileSize(ext.localCacheDir, ext.targetFilename);
407+
}
408+
return total;
409+
}
410+
411+
private static long headerFileSize(File dir, String filename)
412+
{
413+
final File header = new File(dir, filename + METADATA_HEADER_SUFFIX);
414+
return header.exists() ? header.length() : 0;
415+
}
416+
306417
/**
307418
* Total bytes downloaded so far across all internal files, including external mappers.
308419
*/
@@ -384,6 +495,104 @@ private void ensureFileDownloaded(String name, SegmentInternalFileMetadata fileM
384495
}
385496
}
386497

498+
/**
499+
* Public entry point for cache-layer code that wants to ensure a container is materialized before any data is
500+
* downloaded into it (e.g. when a per-bundle cache entry is mounted, the entry pre-allocates its container files
501+
* so that subsequent {@link #mapFile} calls have somewhere to write into and the cache layer can charge the
502+
* reservation up front).
503+
*/
504+
public void initializeContainer(int containerIndex) throws IOException
505+
{
506+
checkClosed();
507+
ensureContainerInitialized(containerIndex);
508+
}
509+
510+
/**
511+
* Reverse of {@link #initializeContainer(int)}: unmap the in-memory view of the container, delete the local
512+
* container file, and clear the bitmap bits + {@link #downloadedFiles} entries for every internal file that lived
513+
* in this container.
514+
* <p>
515+
* Used by per-bundle cache entries on unmount/eviction to release the disk and memory footprint of one bundle
516+
* without affecting other bundles sharing the same {@link PartialSegmentFileMapperV10}. After eviction, subsequent
517+
* {@link #mapFile} calls for files in this container will re-trigger downloads via {@link #initializeContainer}
518+
* and the bitmap will be repopulated incrementally.
519+
* <p>
520+
* <b>Concurrency contract.</b> The caller is responsible for ensuring no concurrent {@link #mapFile} (or
521+
* {@link #ensureFilesAvailable}) call is in flight for any file in this container. This is enforced one layer up
522+
* by the cache-entry refcount: {@code PartialSegmentBundleCacheEntry} only invokes {@code evictContainer} from its
523+
* {@code doActualUnmount} callback, which fires only after every reference acquired via {@code acquireReference()}
524+
* has been closed. Bypassing that gate is dangerous: {@link ByteBufferUtils#unmap} frees the off-heap mapping, so a
525+
* {@link ByteBuffer#slice} from a concurrent reader is a JVM SIGSEGV, not a recoverable error.
526+
* <p>
527+
* No-op if the container has not been initialized.
528+
*/
529+
public void evictContainer(int containerIndex)
530+
{
531+
checkClosed();
532+
containerLocks[containerIndex].lock();
533+
try {
534+
final MappedByteBuffer existing = containers[containerIndex];
535+
if (existing != null) {
536+
ByteBufferUtils.unmap(existing);
537+
containers[containerIndex] = null;
538+
}
539+
// Try the cached containerFiles[i] first. If it's null, the container was never initialized in this mapper
540+
// instance (typical right after create() with an empty bitmap), but the on-disk file may still exist from a
541+
// previous run. Fall back to the deterministic path so eviction is always effective.
542+
File containerFile = containerFiles[containerIndex];
543+
if (containerFile == null) {
544+
containerFile = new File(
545+
localCacheDir,
546+
StringUtils.format("%s.container.%05d", targetFilename, containerIndex)
547+
);
548+
}
549+
if (containerFile.exists() && !containerFile.delete()) {
550+
LOG.warn(
551+
"Failed to delete container file[%s] during eviction of container[%d] for [%s]; leaking on disk",
552+
containerFile,
553+
containerIndex,
554+
targetFilename
555+
);
556+
}
557+
containerFiles[containerIndex] = null;
558+
}
559+
finally {
560+
containerLocks[containerIndex].unlock();
561+
}
562+
563+
// clear bitmap bits + downloadedFiles entries for files that lived in this container. Iterates
564+
// metadata.getFiles() without external synchronization: SegmentFileMetadata is constructed once at mapper
565+
// creation and its file map is effectively immutable for the mapper's lifetime, so concurrent iteration is safe.
566+
for (Map.Entry<String, SegmentInternalFileMetadata> entry : metadata.getFiles().entrySet()) {
567+
if (entry.getValue().getContainer() != containerIndex) {
568+
continue;
569+
}
570+
final String fileName = entry.getKey();
571+
if (downloadedFiles.remove(fileName)) {
572+
downloadedBytes.addAndGet(-entry.getValue().getSize());
573+
}
574+
clearBitmapBit(fileName);
575+
}
576+
}
577+
578+
private void clearBitmapBit(String name)
579+
{
580+
final Integer index = fileNameToIndex.get(name);
581+
if (index == null) {
582+
return;
583+
}
584+
final int byteIndex = index / 8;
585+
final int bitMask = 1 << (index % 8);
586+
bitmapLock.lock();
587+
try {
588+
final byte existing = bitmapBuffer.get(byteIndex);
589+
bitmapBuffer.put(byteIndex, (byte) (existing & ~bitMask));
590+
}
591+
finally {
592+
bitmapLock.unlock();
593+
}
594+
}
595+
387596
/**
388597
* Initialize a local container file if not already done. Creates a sparse file at the original container size
389598
* and memory-maps it. The channel is closed immediately after mapping, the mmap persists independently, backed by

processing/src/main/java/org/apache/druid/segment/file/SegmentFileBuilder.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,25 +41,33 @@
4141
*/
4242
public interface SegmentFileBuilder extends Closeable
4343
{
44+
/**
45+
* Default bundle name for containers written without an explicit {@link #startFileBundle} call. Thinking of file
46+
* bundles as directories, this is the root directory that sits above any named subdirectories the writer declares.
47+
* Containers always carry a non-null bundle name; if the writer never calls {@code startFileBundle}, they are
48+
* tagged with this default. Cache-layer readers treat all containers sharing this name as one mount/evict unit.
49+
*/
50+
String ROOT_BUNDLE_NAME = "__root__";
51+
4452
/**
4553
* Add a column to the metadata of this segment file
4654
*/
4755
void addColumn(String name, ColumnDescriptor columnDescriptor);
4856

4957
/**
50-
* Declare that subsequent writes belong to a named group of files that should be stored together. This is a hint
58+
* Declare that subsequent writes belong to a named bundle of files that should be stored together. This is a hint
5159
* about physical layout; it does not constrain the names of files subsequently added, and implementations are free
5260
* to ignore it entirely (the default is a no-op for formats that don't organize data into coarse-grained
5361
* groupings). Projections are the primary caller today, but the mechanism is generic; it's equally applicable to
5462
* grouping internal metadata, data shared across columns, etc.
5563
* <p>
56-
* Callers should invoke this before writing each group's files; passing {@code null} clears the current group.
57-
* Callers should not invoke this while a writer returned by {@link #addWithChannel} is still open (implementations
58-
* may reject such calls).
64+
* Callers should invoke this before writing each bundle's files; passing {@code null} resets the current bundle to
65+
* the {@link #ROOT_BUNDLE_NAME} default. Callers should not invoke this while a writer returned by
66+
* {@link #addWithChannel} is still open (implementations may reject such calls).
5967
*
60-
* @see SegmentFileBuilderV10#startFileGroup(String) for the V10 semantics
68+
* @see SegmentFileBuilderV10#startFileBundle(String) for the V10 semantics
6169
*/
62-
default void startFileGroup(@Nullable String groupName)
70+
default void startFileBundle(@Nullable String bundleName)
6371
{
6472
}
6573

0 commit comments

Comments
 (0)