Skip to content

Commit 513464f

Browse files
author
Kavya Aggarwal
committed
Add skeleton structure for WritableWarm Directories and IndexInputs
Signed-off-by: Kavya Aggarwal <kavyaagg@amazon.com>
1 parent 4b0dc3d commit 513464f

19 files changed

Lines changed: 996 additions & 0 deletions

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
3131

3232
- Add warmup phase to wait for lag to catch up in pull-based ingestion before serving ([#20526](https://github.com/opensearch-project/OpenSearch/pull/20526))
3333
- Add a new static method to IndicesOptions API to expose `STRICT_EXPAND_OPEN_HIDDEN_FORBID_CLOSED` index option ([#20980](https://github.com/opensearch-project/OpenSearch/pull/20980))
34+
- Add skeleton structure for tiered-storage module ([#21017](https://github.com/opensearch-project/OpenSearch/pull/21017))
35+
- Add skeleton structure for WritableWarm Directories and IndexInputs ([#21082](https://github.com/opensearch-project/OpenSearch/pull/21082))
3436

3537
### Changed
3638
- Make telemetry `Tags` immutable ([#20788](https://github.com/opensearch-project/OpenSearch/pull/20788))
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.storage.directory;
10+
11+
import org.apache.logging.log4j.Logger;
12+
import org.apache.lucene.store.Directory;
13+
import org.apache.lucene.store.FilterDirectory;
14+
import org.opensearch.index.IndexSettings;
15+
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
16+
import org.opensearch.threadpool.ThreadPool;
17+
18+
import java.io.IOException;
19+
import java.nio.file.Path;
20+
import java.util.Map;
21+
import java.util.concurrent.ConcurrentHashMap;
22+
import java.util.function.Supplier;
23+
24+
/**
25+
* A directory implementation that stores files as blocks.
26+
* This directory manages operations on block files and exposes an interface to read
27+
* these block files in non block manner.
28+
* If a file is present as is, in a non block manner, this directory can manage such a file as well.
29+
* Non block files can get created if the files are created without the provided interfaces.
30+
*
31+
* FilterDirectory overrides (listAll, fileLength, deleteFile, openInput, copyFrom, etc.),
32+
* block file management, and metadata initialization will be added in the implementation PR.
33+
*/
34+
public class OSBlockHotDirectory extends FilterDirectory {
35+
36+
private Logger logger;
37+
private final RemoteSegmentStoreDirectory remoteSegmentStoreDirectory;
38+
private final IndexSettings indexSettings;
39+
private final Path directoryPath;
40+
// BlockTransferManager field will be added in the implementation PR
41+
private final int blockSizeShift;
42+
43+
/**
44+
* This map stores file length for logical files which are backed by block files.
45+
* Writes can happen to this directory outside the block context using createOutput or directly
46+
* on the FSDirectory path. Those file names will not be present in this map and will not be
47+
* treated as block files.
48+
*/
49+
protected final Map<String, Long> logicalFileLengthMap = new ConcurrentHashMap<>();
50+
51+
/**
52+
* Creates a new OSBlockHotDirectory instance with default block transfer manager.
53+
*
54+
* @param delegate The underlying directory to delegate operations to
55+
* @param remoteDirectory The remote directory for segment files
56+
* @param indexSettings Index-specific settings
57+
* @param threadPoolSupplier Supplier for the thread pool used for async block downloads
58+
* @throws IllegalArgumentException if no FSDirectory is found in delegate chain
59+
*/
60+
public OSBlockHotDirectory(
61+
Directory delegate,
62+
Directory remoteDirectory,
63+
IndexSettings indexSettings,
64+
Supplier<ThreadPool> threadPoolSupplier
65+
) throws IOException {
66+
super(delegate);
67+
throw new UnsupportedOperationException("Not yet implemented");
68+
}
69+
70+
/**
71+
* Creates a new OSBlockHotDirectory instance with a custom block transfer manager.
72+
*
73+
* @param delegate The underlying directory to delegate operations to
74+
* @param remoteDirectory The remote directory for storing segments
75+
* @param indexSettings Index-specific settings
76+
* @throws IllegalArgumentException if validation fails
77+
*/
78+
public OSBlockHotDirectory(
79+
Directory delegate,
80+
Directory remoteDirectory,
81+
IndexSettings indexSettings
82+
) throws IOException {
83+
super(delegate);
84+
throw new UnsupportedOperationException("Not yet implemented");
85+
}
86+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.storage.directory;
10+
11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
13+
import org.apache.lucene.store.Directory;
14+
import org.opensearch.index.IndexSettings;
15+
import org.opensearch.index.shard.ShardPath;
16+
import org.opensearch.index.store.remote.filecache.FileCache;
17+
import org.opensearch.plugins.IndexStorePlugin;
18+
import org.opensearch.threadpool.ThreadPool;
19+
20+
import java.io.IOException;
21+
import java.util.function.Supplier;
22+
23+
/**
24+
* Factory to create OSBlockHotDirectory.
25+
* The newDirectory implementation will be added in the implementation PR.
26+
*/
27+
public class OSBlockHotDirectoryFactory implements IndexStorePlugin.CompositeDirectoryFactory {
28+
29+
private static final Logger logger = LogManager.getLogger(OSBlockHotDirectoryFactory.class);
30+
private final Supplier<ThreadPool> threadPoolSupplier;
31+
32+
/**
33+
* Constructs a new OSBlockHotDirectoryFactory.
34+
* @param threadPoolSupplier supplier for the thread pool
35+
*/
36+
public OSBlockHotDirectoryFactory(Supplier<ThreadPool> threadPoolSupplier) {
37+
this.threadPoolSupplier = threadPoolSupplier;
38+
}
39+
40+
@Override
41+
public Directory newDirectory(
42+
IndexSettings indexSettings,
43+
ShardPath shardPath,
44+
IndexStorePlugin.DirectoryFactory directoryFactory,
45+
Directory directory,
46+
FileCache fileCache,
47+
ThreadPool threadPool
48+
) throws IOException {
49+
throw new UnsupportedOperationException("Not yet implemented");
50+
}
51+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.storage.directory;
10+
11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
13+
import org.apache.lucene.store.Directory;
14+
import org.opensearch.index.store.CompositeDirectory;
15+
import org.opensearch.index.store.remote.filecache.FileCache;
16+
import org.opensearch.storage.prefetch.TieredStoragePrefetchSettings;
17+
import org.opensearch.threadpool.ThreadPool;
18+
19+
import java.util.function.Supplier;
20+
21+
/**
22+
* Extension of CompositeDirectory to support writable warm and other related features.
23+
* Directory overrides (listAll, deleteFile, rename, openInput, close, sync, afterSyncToRemote),
24+
* file caching, and full-file-to-block switching logic will be added in the implementation PR.
25+
*/
26+
public class TieredDirectory extends CompositeDirectory {
27+
28+
private static final Logger logger = LogManager.getLogger(TieredDirectory.class);
29+
private final Supplier<TieredStoragePrefetchSettings> tieredStoragePrefetchSettingsSupplier;
30+
31+
/**
32+
* Constructs a new TieredDirectory.
33+
* @param localDirectory the local directory
34+
* @param remoteDirectory the remote directory
35+
* @param fileCache the file cache
36+
* @param threadPool the thread pool
37+
* @param tieredStoragePrefetchSettingsSupplier supplier for prefetch settings
38+
*/
39+
public TieredDirectory(
40+
Directory localDirectory,
41+
Directory remoteDirectory,
42+
FileCache fileCache,
43+
ThreadPool threadPool,
44+
Supplier<TieredStoragePrefetchSettings> tieredStoragePrefetchSettingsSupplier
45+
) {
46+
super(localDirectory, remoteDirectory, fileCache, threadPool);
47+
this.tieredStoragePrefetchSettingsSupplier = tieredStoragePrefetchSettingsSupplier;
48+
}
49+
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.storage.directory;
10+
11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
13+
import org.apache.lucene.store.Directory;
14+
import org.opensearch.index.IndexSettings;
15+
import org.opensearch.index.shard.ShardPath;
16+
import org.opensearch.index.store.remote.filecache.FileCache;
17+
import org.opensearch.plugins.IndexStorePlugin;
18+
import org.opensearch.storage.prefetch.TieredStoragePrefetchSettings;
19+
import org.opensearch.threadpool.ThreadPool;
20+
21+
import java.io.IOException;
22+
import java.util.function.Supplier;
23+
24+
/**
25+
* Factory to create TieredDirectory.
26+
* The newDirectory implementation will be added in the implementation PR.
27+
*/
28+
public class TieredDirectoryFactory implements IndexStorePlugin.CompositeDirectoryFactory {
29+
30+
private static final Logger logger = LogManager.getLogger(TieredDirectoryFactory.class);
31+
private final Supplier<TieredStoragePrefetchSettings> tieredStoragePrefetchSettingsSupplier;
32+
33+
/**
34+
* Constructs a new TieredDirectoryFactory.
35+
* @param tieredStoragePrefetchSettingsSupplier supplier for prefetch settings
36+
*/
37+
public TieredDirectoryFactory(Supplier<TieredStoragePrefetchSettings> tieredStoragePrefetchSettingsSupplier) {
38+
this.tieredStoragePrefetchSettingsSupplier = tieredStoragePrefetchSettingsSupplier;
39+
}
40+
41+
@Override
42+
public Directory newDirectory(
43+
IndexSettings indexSettings,
44+
ShardPath shardPath,
45+
IndexStorePlugin.DirectoryFactory directoryFactory,
46+
Directory directory,
47+
FileCache fileCache,
48+
ThreadPool threadPool
49+
) throws IOException {
50+
throw new UnsupportedOperationException("Not yet implemented");
51+
}
52+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
/** Directory implementations for tiered storage. */
10+
package org.opensearch.storage.directory;

0 commit comments

Comments
 (0)