Skip to content

Commit 0f10327

Browse files
committed
Changes for Reader to work
Signed-off-by: Arpit Bandejiya <abandeji@amazon.com>
1 parent e03114e commit 0f10327

6 files changed

Lines changed: 20 additions & 6 deletions

File tree

plugins/engine-datafusion/jni/src/lib.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,8 @@ pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_execute
209209
let table_path = shard_view.table_path();
210210
let files_meta = shard_view.files_meta();
211211

212+
println!("Table path: {}", table_path);
213+
println!("Files: {:?}", files_meta);
212214

213215
let list_file_cache = Arc::new(DefaultListFilesCache::default());
214216
list_file_cache.put(table_path.prefix(), files_meta);
@@ -252,7 +254,7 @@ pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_execute
252254
// Create a new TableProvider
253255
let provider = Arc::new(ListingTable::try_new(config).unwrap());
254256
let shard_id = table_path.prefix().filename().expect("error in fetching Path");
255-
ctx.register_table("logs", provider)
257+
ctx.register_table("hits", provider)
256258
.expect("Failed to attach the Table");
257259

258260
});

plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DatafusionEngine.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public class DatafusionEngine extends SearchExecEngine<DatafusionContext, Datafu
5656

5757
public DatafusionEngine(DataFormat dataFormat, Collection<FileMetadata> formatCatalogSnapshot, DataFusionService dataFusionService) throws IOException {
5858
this.dataFormat = dataFormat;
59-
this.datafusionReaderManager = new DatafusionReaderManager("TODO://FigureOutPath", formatCatalogSnapshot);
59+
this.datafusionReaderManager = new DatafusionReaderManager("/Users/abandeji/Public/workplace/OpenSearch/build/testclusters/runTask-0/data/nodes/0/indices/7xU89OS-Tn2_nO7CboVqMg/0/parquet", formatCatalogSnapshot);
6060
this.datafusionService = dataFusionService;
6161
}
6262

plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionReader.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,16 @@ public class DatafusionReader implements Closeable {
3131
public DatafusionReader(String directoryPath, Collection<FileMetadata> files) {
3232
this.directoryPath = directoryPath;
3333
this.files = files;
34-
String[] fileNames = Objects.isNull(files) ? new String[]{} : files.stream().map(FileMetadata::fileName).toArray(String[]::new);
34+
String[] fileNames = Objects.isNull(files) ? new String[]{"hits_data.parquet"} : files.stream().map(FileMetadata::fileName).toArray(String[]::new);
3535
this.cachePtr = DataFusionQueryJNI.createDatafusionReader(directoryPath, fileNames);
3636
incRef();
3737
}
3838

39+
/**
40+
* Gets the cache pointer.
41+
*
42+
* @return the cache pointer
43+
*/
3944
public long getCachePtr() {
4045
return cachePtr;
4146
}
@@ -44,6 +49,11 @@ public void incRef() {
4449
refCount.getAndIncrement();
4550
}
4651

52+
/**
53+
* Decrements the reference count.
54+
*
55+
* @throws IOException if an I/O error occurs
56+
*/
4757
public void decRef() throws IOException {
4858
if(refCount.get() == 0) {
4959
throw new IllegalStateException("Listing table has been already closed");

plugins/engine-datafusion/src/test/java/org/opensearch/datafusion/DataFusionServiceTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ public void testQueryPhaseExecutor() throws IOException {
101101
Map<String, Object[]> finalRes = new HashMap<>();
102102
DatafusionSearcher datafusionSearcher = null;
103103
try {
104-
DatafusionEngine engine = new DatafusionEngine(DataFormat.CSV, List.of(new FileMetadata(new TextDF(), "generation-1-optimized.parquet")), service);
104+
DatafusionEngine engine = new DatafusionEngine(DataFormat.CSV, List.of(new FileMetadata(new TextDF(), "hits_data.parquet")), service);
105105
datafusionSearcher = engine.acquireSearcher("Search");
106106

107107

server/src/main/java/org/opensearch/index/engine/exec/composite/CompositeIndexingExecutionEngine.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public CompositeIndexingExecutionEngine(PluginsService pluginsService, Any dataf
5454
public CompositeIndexingExecutionEngine(PluginsService pluginsService) {
5555
try {
5656
DataSourcePlugin plugin = pluginsService.filterPlugins(DataSourcePlugin.class).stream()
57-
.findFirst()
57+
.findAny()
5858
.orElseThrow(() -> new IllegalArgumentException("dataformat [" + DataFormat.TEXT + "] is not registered."));
5959
delegates.add(plugin.indexingEngine());
6060
} catch (NullPointerException e) {

server/src/main/java/org/opensearch/index/engine/exec/coord/CompositeEngine.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,9 @@ public CompositeEngine(MapperService mapperService, PluginsService pluginsServic
5757
for(org.opensearch.vectorized.execution.search.DataFormat dataFormat : searchEnginePlugin.getSupportedFormats()) {
5858
SearchExecEngine<?,?,?,?> searchExecEngine = searchEnginePlugin.createEngine(dataFormat,
5959
catalogSnapshot.getSearchableFiles(dataFormat.toString()));
60-
readEngines.getOrDefault(dataFormat, new ArrayList<>()).add(searchExecEngine);
60+
List<SearchExecEngine<?, ?, ?, ?>> readEngine = readEngines.getOrDefault(dataFormat, new ArrayList<>());
61+
readEngine.add(searchExecEngine);
62+
readEngines.put(dataFormat, readEngine);
6163
// TODO : figure out how to do internal and external refresh listeners
6264
// Maybe external refresh should be managed in opensearch core and plugins should always give
6365
// internal refresh managers

0 commit comments

Comments
 (0)