Skip to content

Commit 150d3fe

Browse files
Working changes (#44)
* Changes in dataformat for CSVEngine Signed-off-by: Arpit Bandejiya <abandeji@amazon.com> * Changes for Reader to work Signed-off-by: Arpit Bandejiya <abandeji@amazon.com> --------- Signed-off-by: Arpit Bandejiya <abandeji@amazon.com> Co-authored-by: Bharathwaj G <bharath78910@gmail.com> Signed-off-by: bharath-techie <bharath78910@gmail.com>
1 parent 53cfcaf commit 150d3fe

11 files changed

Lines changed: 68 additions & 25 deletions

File tree

gradle/missing-javadoc.gradle

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,9 @@ configure([
160160
project(":test:fixtures:hdfs-fixture"),
161161
project(":test:fixtures:s3-fixture"),
162162
project(":test:framework"),
163-
project(":test:logger-usage")
163+
project(":test:logger-usage"),
164+
project(":libs:opensearch-vectorized-exec-spi"), // TODO
165+
project(":plugins:engine-datafusion"), //TODO
164166
]) {
165167
project.tasks.withType(MissingJavadocTask) {
166168
isExcluded = true

plugins/engine-datafusion/jni/src/lib.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,8 @@ pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_execute
209209
let table_path = shard_view.table_path();
210210
let files_meta = shard_view.files_meta();
211211

212+
println!("Table path: {}", table_path);
213+
println!("Files: {:?}", files_meta);
212214

213215
let list_file_cache = Arc::new(DefaultListFilesCache::default());
214216
list_file_cache.put(table_path.prefix(), files_meta);
@@ -252,7 +254,7 @@ pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_execute
252254
// Create a new TableProvider
253255
let provider = Arc::new(ListingTable::try_new(config).unwrap());
254256
let shard_id = table_path.prefix().filename().expect("error in fetching Path");
255-
ctx.register_table("logs", provider)
257+
ctx.register_table("hits", provider)
256258
.expect("Failed to attach the Table");
257259

258260
});

plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DatafusionEngine.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.opensearch.index.engine.EngineSearcherSupplier;
3131
import org.opensearch.index.engine.SearchExecEngine;
3232
import org.opensearch.index.engine.exec.FileMetadata;
33+
import org.opensearch.search.SearchShardTarget;
3334
import org.opensearch.search.aggregations.SearchResultsCollector;
3435
import org.opensearch.search.internal.ReaderContext;
3536
import org.opensearch.search.internal.ShardSearchRequest;
@@ -65,7 +66,7 @@ public class DatafusionEngine extends SearchExecEngine<DatafusionContext, Datafu
6566
*/
6667
public DatafusionEngine(DataFormat dataFormat, Collection<FileMetadata> formatCatalogSnapshot, DataFusionService dataFusionService) throws IOException {
6768
this.dataFormat = dataFormat;
68-
this.datafusionReaderManager = new DatafusionReaderManager("TODO://FigureOutPath", formatCatalogSnapshot);
69+
this.datafusionReaderManager = new DatafusionReaderManager("/Users/gbh/Downloads/res", formatCatalogSnapshot);
6970
this.datafusionService = dataFusionService;
7071
}
7172

@@ -80,8 +81,8 @@ public QueryPhaseExecutor<DatafusionContext> getQueryPhaseExecutor() {
8081
}
8182

8283
@Override
83-
public DatafusionContext createContext(ReaderContext readerContext, ShardSearchRequest request, SearchShardTask task) throws IOException {
84-
DatafusionContext datafusionContext = new DatafusionContext(readerContext, request, task, this);
84+
public DatafusionContext createContext(ReaderContext readerContext, ShardSearchRequest request, SearchShardTarget searchShardTarget, SearchShardTask task) throws IOException {
85+
DatafusionContext datafusionContext = new DatafusionContext(readerContext, request, searchShardTarget, task, this);
8586
// Parse source
8687
datafusionContext.datafusionQuery(new DatafusionQuery(request.source().queryPlanIR(), new ArrayList<>()));
8788
return datafusionContext;

plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionContext.java

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.opensearch.action.search.SearchType;
1717
import org.opensearch.common.unit.TimeValue;
1818
import org.opensearch.common.util.BigArrays;
19+
import org.opensearch.index.IndexService;
1920
import org.opensearch.index.cache.bitset.BitsetFilterCache;
2021
import org.opensearch.index.engine.EngineSearcher;
2122
import org.opensearch.index.mapper.MappedFieldType;
@@ -68,8 +69,13 @@ public class DatafusionContext extends SearchContext {
6869
private final SearchShardTask task;
6970
private final DatafusionEngine readEngine;
7071
private final DatafusionSearcher engineSearcher;
72+
private final IndexShard indexShard;
73+
private final QuerySearchResult queryResult;
74+
private final FetchSearchResult fetchResult;
75+
private final IndexService indexService;
76+
private final QueryShardContext queryShardContext;
7177
private DatafusionQuery datafusionQuery;
72-
78+
private Map<String, Object[]> dfResults;
7379
/**
7480
* Constructor
7581
* @param readerContext The reader context
@@ -80,13 +86,26 @@ public class DatafusionContext extends SearchContext {
8086
public DatafusionContext(
8187
ReaderContext readerContext,
8288
ShardSearchRequest request,
89+
SearchShardTarget searchShardTarget,
8390
SearchShardTask task,
8491
DatafusionEngine engine) {
8592
this.readerContext = readerContext;
93+
this.indexShard = readerContext.indexShard();
8694
this.request = request;
8795
this.task = task;
8896
this.readEngine = engine;
8997
this.engineSearcher = engine.acquireSearcher("search");//null;//TODO readerContext.contextEngineSearcher();
98+
this.queryResult = new QuerySearchResult(readerContext.id(), searchShardTarget, request);
99+
this.fetchResult = new FetchSearchResult(readerContext.id(), searchShardTarget);
100+
this.indexService = readerContext.indexService();
101+
this.queryShardContext = indexService.newQueryShardContext(
102+
request.shardId().id(),
103+
null, // TODO : index searcher is null
104+
request::nowInMillis,
105+
searchShardTarget.getClusterAlias(),
106+
false, // reevaluate the usage
107+
false // specific to lucene
108+
);
90109
}
91110

92111
/**
@@ -175,7 +194,7 @@ public String source() {
175194

176195
@Override
177196
public ShardSearchRequest request() {
178-
return null;
197+
return request;
179198
}
180199

181200
@Override
@@ -346,7 +365,7 @@ public ContextIndexSearcher searcher() {
346365

347366
@Override
348367
public IndexShard indexShard() {
349-
return null;
368+
return this.indexShard;
350369
}
351370

352371
@Override
@@ -671,7 +690,7 @@ public DfsSearchResult dfsResult() {
671690

672691
@Override
673692
public QuerySearchResult queryResult() {
674-
return null;
693+
return this.queryResult;
675694
}
676695

677696
@Override
@@ -681,7 +700,7 @@ public FetchPhase fetchPhase() {
681700

682701
@Override
683702
public FetchSearchResult fetchResult() {
684-
return null;
703+
return this.fetchResult;
685704
}
686705

687706
@Override
@@ -719,7 +738,7 @@ public Map<Class<?>, CollectorManager<? extends Collector, ReduceableSearchResul
719738

720739
@Override
721740
public QueryShardContext getQueryShardContext() {
722-
return null;
741+
return queryShardContext;
723742
}
724743

725744
@Override
@@ -763,4 +782,12 @@ public boolean shouldUseTimeSeriesDescSortOptimization() {
763782
public ContextEngineSearcher<DatafusionQuery, RecordBatchStream> contextEngineSearcher() {
764783
return new ContextEngineSearcher<>(this.engineSearcher, this);
765784
}
785+
786+
public void setDFResults(Map<String, Object[]> dfResults) {
787+
this.dfResults = dfResults;
788+
}
789+
790+
public Map<String, Object[]> getDFResults() {
791+
return dfResults;
792+
}
766793
}

plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionReader.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,14 +45,13 @@ public class DatafusionReader implements Closeable {
4545
public DatafusionReader(String directoryPath, Collection<FileMetadata> files) {
4646
this.directoryPath = directoryPath;
4747
this.files = files;
48-
String[] fileNames = Objects.isNull(files) ? new String[]{} : files.stream().map(FileMetadata::fileName).toArray(String[]::new);
48+
String[] fileNames = Objects.isNull(files) ? new String[]{"hits_data.parquet"} : files.stream().map(FileMetadata::fileName).toArray(String[]::new);
4949
this.cachePtr = DataFusionQueryJNI.createDatafusionReader(directoryPath, fileNames);
5050
incRef();
5151
}
5252

5353
/**
5454
* Gets the cache pointer.
55-
*
5655
* @return the cache pointer
5756
*/
5857
public long getCachePtr() {
@@ -68,7 +67,6 @@ public void incRef() {
6867

6968
/**
7069
* Decrements the reference count.
71-
*
7270
* @throws IOException if an I/O error occurs
7371
*/
7472
public void decRef() throws IOException {

plugins/engine-datafusion/src/test/java/org/opensearch/datafusion/DataFusionServiceTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ public void testQueryPhaseExecutor() throws IOException {
101101
Map<String, Object[]> finalRes = new HashMap<>();
102102
DatafusionSearcher datafusionSearcher = null;
103103
try {
104-
DatafusionEngine engine = new DatafusionEngine(DataFormat.CSV, List.of(new FileMetadata(new TextDF(), "generation-1-optimized.parquet")), service);
104+
DatafusionEngine engine = new DatafusionEngine(DataFormat.CSV, List.of(new FileMetadata(new TextDF(), "hits_data.parquet")), service);
105105
datafusionSearcher = engine.acquireSearcher("Search");
106106

107107

server/src/main/java/org/opensearch/index/engine/SearchExecEngine.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
import org.opensearch.action.search.SearchShardTask;
1212
import org.opensearch.common.annotation.ExperimentalApi;
13+
import org.opensearch.search.SearchShardTarget;
1314
import org.opensearch.search.internal.ReaderContext;
1415
import org.opensearch.search.internal.SearchContext;
1516
import org.opensearch.search.internal.ShardSearchRequest;
@@ -43,7 +44,7 @@ public abstract class SearchExecEngine<C extends SearchContext, S extends Engine
4344
/**
4445
* Create a search context for this engine
4546
*/
46-
public abstract C createContext(ReaderContext readerContext, ShardSearchRequest request, SearchShardTask task) throws IOException;
47+
public abstract C createContext(ReaderContext readerContext, ShardSearchRequest request, SearchShardTarget searchShardTarget, SearchShardTask task) throws IOException;
4748

4849
/**
4950
* execute

server/src/main/java/org/opensearch/index/engine/exec/composite/CompositeIndexingExecutionEngine.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public CompositeIndexingExecutionEngine(PluginsService pluginsService, Any dataf
5555
public CompositeIndexingExecutionEngine(PluginsService pluginsService) {
5656
try {
5757
DataSourcePlugin plugin = pluginsService.filterPlugins(DataSourcePlugin.class).stream()
58-
.findFirst()
58+
.findAny()
5959
.orElseThrow(() -> new IllegalArgumentException("dataformat [" + DataFormat.TEXT + "] is not registered."));
6060
delegates.add(plugin.indexingEngine());
6161
} catch (NullPointerException e) {

server/src/main/java/org/opensearch/index/engine/exec/coord/CompositeEngine.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,17 +58,21 @@ public CompositeEngine(MapperService mapperService, PluginsService pluginsServic
5858
// Create read specific engines for each format which is associated with shard
5959
for(SearchEnginePlugin searchEnginePlugin : searchEnginePlugins) {
6060
for(org.opensearch.vectorized.execution.search.DataFormat dataFormat : searchEnginePlugin.getSupportedFormats()) {
61-
SearchExecEngine<?,?,?,?> searchExecEngine = searchEnginePlugin.createEngine(dataFormat,
61+
List<SearchExecEngine<?, ?, ?, ?>> currentSearchEngines = readEngines.getOrDefault(dataFormat, new ArrayList<>());
62+
SearchExecEngine<?,?,?,?> newSearchEngine = searchEnginePlugin.createEngine(dataFormat,
6263
catalogSnapshot.getSearchableFiles(dataFormat.toString()));
63-
readEngines.getOrDefault(dataFormat, new ArrayList<>()).add(searchExecEngine);
64+
65+
currentSearchEngines.add(newSearchEngine);
66+
readEngines.put(dataFormat, currentSearchEngines);
67+
6468
// TODO : figure out how to do internal and external refresh listeners
6569
// Maybe external refresh should be managed in opensearch core and plugins should always give
6670
// internal refresh managers
6771
// 60s as refresh interval -> ExternalReaderManager acquires a view every 60 seconds
6872
// InternalReaderManager -> IndexingMemoryController , it keeps on refreshing internal maanger
6973
//
70-
if(searchExecEngine.getRefreshListener(Engine.SearcherScope.INTERNAL) != null) {
71-
catalogSnapshotAwareRefreshListeners.add(searchExecEngine.getRefreshListener(Engine.SearcherScope.INTERNAL));
74+
if(newSearchEngine.getRefreshListener(Engine.SearcherScope.INTERNAL) != null) {
75+
catalogSnapshotAwareRefreshListeners.add(newSearchEngine.getRefreshListener(Engine.SearcherScope.INTERNAL));
7276
}
7377
}
7478
}

server/src/main/java/org/opensearch/search/SearchService.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -817,14 +817,20 @@ private SearchPhaseResult executeQueryPhase(
817817
SearchExecEngine searchExecEngine = readerContext.indexShard()
818818
.getIndexingExecutionCoordinator()
819819
.getPrimaryReadEngine();
820-
820+
SearchShardTarget shardTarget = new SearchShardTarget(
821+
clusterService.localNode().getId(),
822+
readerContext.indexShard().shardId(),
823+
request.getClusterAlias(),
824+
OriginalIndices.NONE
825+
);
821826
try (
822827
Releasable ignored = readerContext.markAsUsed(getKeepAlive(request));
823828
//SearchContext context = createContext(readerContext, request, task, true, isStreamSearch)
824829

825830
// Get engine-specific executor and context
826831
// TODO : move this logic to work with Lucene
827-
SearchContext context = searchExecEngine.createContext(readerContext, request, task);
832+
833+
SearchContext context = searchExecEngine.createContext(readerContext, request, shardTarget, task);
828834
//SearchContext context = createContext(readerContext, request, task, true)
829835
) {
830836

@@ -850,7 +856,7 @@ private SearchPhaseResult executeQueryPhase(
850856
//QueryPhaseExecutor<?> queryPhaseExecutor = readEngine.getQueryPhaseExecutor();
851857
// boolean success = queryPhaseExecutor.execute(context);
852858
loadOrExecuteQueryPhase(request, context);
853-
queryPhase.execute(context);
859+
//queryPhase.execute(context);
854860
// loadOrExecuteQueryPhase(request, context);
855861
if (context.queryResult().hasSearchContext() == false && readerContext.singleSession()) {
856862
freeReaderContext(readerContext.id());

0 commit comments

Comments
 (0)