Skip to content

Commit d41992f

Browse files
authored
Analytics engine - add support for index patterns, aliases, multi-index search (opensearch-project#21822)
* analytics: add alias, index pattern, and multi-index query support Enables queries against aliases, wildcard patterns, and comma-separated index expressions. The planner resolves these to concrete indices, validates schema compatibility, and builds a union row type. At the data node, Rust widens the registered ListingTable schema from the plan's base_schema so DataFusion null-fills columns this shard doesn't have. Key components: - IndexResolution: expands aliases/patterns to concrete indices, validates field type compatibility, rejects filter aliases and data streams - FieldStorageResolver.merged(): unions per-field storage across backing indices - ShardTargetResolver: fans out shard routing across all concrete indices - widen_schema_from_plan (Rust): appends missing nullable columns to the ListingTable using from_substrait_named_struct for type conversion - UnifiedQueryService: preserves lazy table resolution for wildcard support - Indexed execution (filter delegation): now passes plan bytes to Rust, enabling multi-index support on the delegation path Signed-off-by: Marc Handalian <marc.handalian@gmail.com> * spotless Signed-off-by: Marc Handalian <marc.handalian@gmail.com> * Add missing tests from PR review feedback - Case-insensitive table resolution test (OpenSearchSchemaBuilderTests) - findTableName on join/union shapes (RelNodeUtilsTests) + generalize findTableName to match any TableScan, not just OpenSearchTableScan - Field-conflict test for comma-separated sources (first-wins semantics) - IndexResolution: concrete name with resolver, exclusion pattern test - Rust: empty/garbage input tests for first_named_table_name - Rust: widen_schema_from_plan noop tests (empty plan, all cols present) Signed-off-by: Marc Handalian <marc.handalian@gmail.com> * Fix schema builder to be lazy. Add more tests Signed-off-by: Marc Handalian <marc.handalian@gmail.com> * Add support for datastreams Signed-off-by: Marc Handalian <marc.handalian@gmail.com> * add more datastream edge cases Signed-off-by: Marc Handalian <marc.handalian@gmail.com> * fix sql tests Signed-off-by: Marc Handalian <marc.handalian@gmail.com> * remove unintentional run.gradle update Signed-off-by: Marc Handalian <marc.handalian@gmail.com> --------- Signed-off-by: Marc Handalian <marc.handalian@gmail.com>
1 parent ba9befb commit d41992f

50 files changed

Lines changed: 3179 additions & 142 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

sandbox/libs/analytics-api/src/main/java/org/opensearch/analytics/schema/OpenSearchSchemaBuilder.java

Lines changed: 114 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -11,56 +11,151 @@
1111
import org.apache.calcite.jdbc.CalciteSchema;
1212
import org.apache.calcite.rel.type.RelDataType;
1313
import org.apache.calcite.rel.type.RelDataTypeFactory;
14+
import org.apache.calcite.schema.Schema;
1415
import org.apache.calcite.schema.SchemaPlus;
16+
import org.apache.calcite.schema.Table;
17+
import org.apache.calcite.schema.impl.AbstractSchema;
1518
import org.apache.calcite.schema.impl.AbstractTable;
1619
import org.apache.calcite.sql.type.SqlTypeName;
20+
import org.opensearch.action.support.IndicesOptions;
1721
import org.opensearch.cluster.ClusterState;
1822
import org.opensearch.cluster.metadata.IndexMetadata;
23+
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
1924
import org.opensearch.cluster.metadata.MappingMetadata;
25+
import org.opensearch.common.settings.Settings;
26+
import org.opensearch.common.util.concurrent.ThreadContext;
27+
import org.opensearch.core.common.Strings;
28+
import org.opensearch.index.IndexNotFoundException;
2029

30+
import java.util.HashMap;
31+
import java.util.LinkedHashMap;
32+
import java.util.List;
2133
import java.util.Map;
2234

2335
/**
2436
* Builds a Calcite {@link SchemaPlus} from OpenSearch {@link ClusterState} index mappings.
2537
*
2638
* <p>One Calcite table per index. Reads field types from index mapping properties.
2739
* Navigates: IndexMetadata -> MappingMetadata -> sourceAsMap() -> "properties" -> per-field "type".
28-
* // TODO: This is for illustation - use version sql plugin has built and re-purpose to not call node-client
2940
*/
3041
public class OpenSearchSchemaBuilder {
3142

3243
private OpenSearchSchemaBuilder() {}
3344

45+
public static SchemaPlus buildSchema(ClusterState clusterState) {
46+
return buildSchema(clusterState, new IndexNameExpressionResolver(new ThreadContext(Settings.EMPTY)));
47+
}
48+
3449
/**
3550
* Builds a Calcite SchemaPlus from the given ClusterState.
36-
* Each index becomes a table; each mapped field becomes a column.
3751
*
38-
* @param clusterState the current cluster state to derive schema from
52+
* <p>Tables are resolved lazily on first lookup, mirroring the sql-plugin
53+
* {@code OpenSearchSchema}. A requested name may be a concrete index, an alias, a comma list,
54+
* a wildcard, an exclusion, or date-math; it is resolved through {@code resolver} — the same
55+
* canonical resolution the execution-side {@code IndexResolution} uses — and the matching
56+
* indices' supported fields are unioned into one row type. No upfront enumeration of cluster
57+
* indices: construction is O(1) regardless of cluster size, and each referenced name costs
58+
* one {@code IndexNameExpressionResolver} call plus a single mapping union.
59+
*
60+
* <p>The lazy schema is wrapped in a NON-caching root: a caching root enumerates
61+
* {@code getTableNames()} and would never perform the implicit {@code getTable(name)} lookup
62+
* that drives lazy resolution of expressions.
3963
*/
40-
public static SchemaPlus buildSchema(ClusterState clusterState) {
41-
CalciteSchema rootSchema = CalciteSchema.createRootSchema(true);
42-
SchemaPlus schemaPlus = rootSchema.plus();
64+
public static SchemaPlus buildSchema(ClusterState clusterState, IndexNameExpressionResolver resolver) {
65+
Schema lazySchema = new AbstractSchema() {
66+
// Truly lazy table map, mirroring sql-plugin's OpenSearchSchema pattern: no upfront
67+
// enumeration of cluster indices. get() registers on first lookup and caches under the
68+
// lower-cased name. PPL's RelBuilder.scan and Calcite's case-sensitive validator both
69+
// reach the schema via getTable(name), which routes here directly — no entrySet /
70+
// keySet iteration needed for production resolution. Callers that need name
71+
// enumeration (Calcite's withCaseSensitive(false) parser path used in some unit tests)
72+
// get only resolved names back, which is fine when the lookup name is exact-case.
73+
private final Map<String, Table> tableMap = new HashMap<>() {
74+
@Override
75+
public Table get(Object key) {
76+
String name = ((String) key).toLowerCase(java.util.Locale.ROOT);
77+
if (!super.containsKey(name)) {
78+
Table resolved = resolveTable(clusterState, resolver, name);
79+
if (resolved != null) {
80+
super.put(name, resolved);
81+
}
82+
}
83+
return super.get(name);
84+
}
85+
};
4386

44-
for (Map.Entry<String, IndexMetadata> entry : clusterState.metadata().indices().entrySet()) {
45-
String indexName = entry.getKey();
46-
IndexMetadata indexMetadata = entry.getValue();
47-
MappingMetadata mapping = indexMetadata.mapping();
87+
@Override
88+
protected Map<String, Table> getTableMap() {
89+
return tableMap;
90+
}
91+
};
92+
93+
return CalciteSchema.createRootSchema(true, false, "", lazySchema).plus();
94+
}
95+
96+
/**
97+
* Resolves a source expression (concrete name, alias, comma list, wildcard, exclusion, or
98+
* date-math) to a single table whose row type unions the supported fields of all matching
99+
* concrete indices, or {@code null} when nothing matches (so Calcite reports a clean "table not
100+
* found"). Resolution goes through {@link IndexNameExpressionResolver} so schema membership
101+
* matches the execution-side {@code IndexResolution}. First-wins on field-name conflict across
102+
* the union; the planner's scan rule validates cross-index mapping compatibility when the table
103+
* is referenced.
104+
*/
105+
@SuppressWarnings("unchecked")
106+
private static Table resolveTable(ClusterState clusterState, IndexNameExpressionResolver resolver, String expression) {
107+
// Short-circuit literal alias / data stream names so the resolver's lenientExpandOpen
108+
// (which does not include hidden backings) doesn't filter out data stream backings. The
109+
// alias / data-stream abstraction already carries the full backing list — use it directly.
110+
java.util.SortedMap<String, org.opensearch.cluster.metadata.IndexAbstraction> lookup = clusterState.metadata().getIndicesLookup();
111+
org.opensearch.cluster.metadata.IndexAbstraction abstraction = lookup == null ? null : lookup.get(expression);
112+
List<IndexMetadata> backing;
113+
if (abstraction != null
114+
&& (abstraction.getType() == org.opensearch.cluster.metadata.IndexAbstraction.Type.ALIAS
115+
|| abstraction.getType() == org.opensearch.cluster.metadata.IndexAbstraction.Type.DATA_STREAM)) {
116+
backing = abstraction.getIndices();
117+
} else {
118+
String[] concrete;
119+
try {
120+
// Comma-split first: concreteIndexNames treats each vararg as one expression, and
121+
// splitting lets the resolver honor exclusions across tokens (e.g. "test*,-test1").
122+
// includeDataStreams=true so wildcards / comma-lists that match a data stream NAME
123+
// expand to its backings (the resolver normally excludes data streams from
124+
// wildcard expansion otherwise). Literal data stream / alias names take the
125+
// abstraction short-circuit above and skip the resolver entirely.
126+
concrete = resolver.concreteIndexNames(
127+
clusterState,
128+
IndicesOptions.lenientExpandOpen(),
129+
true,
130+
Strings.splitStringByCommaToArray(expression)
131+
);
132+
} catch (IndexNotFoundException e) {
133+
return null;
134+
}
135+
backing = new java.util.ArrayList<>(concrete.length);
136+
for (String name : concrete) {
137+
IndexMetadata index = clusterState.metadata().index(name);
138+
if (index != null) {
139+
backing.add(index);
140+
}
141+
}
142+
}
143+
LinkedHashMap<String, Object> merged = new LinkedHashMap<>();
144+
for (IndexMetadata index : backing) {
145+
MappingMetadata mapping = index.mapping();
48146
if (mapping == null) {
49147
continue;
50148
}
51-
52-
@SuppressWarnings("unchecked")
53-
Map<String, Object> sourceMap = mapping.sourceAsMap();
54-
@SuppressWarnings("unchecked")
55-
Map<String, Object> properties = (Map<String, Object>) sourceMap.get("properties");
149+
Map<String, Object> properties = (Map<String, Object>) mapping.sourceAsMap().get("properties");
56150
if (properties == null) {
57151
continue;
58152
}
59-
60-
schemaPlus.add(indexName, buildTable(properties));
153+
properties.forEach(merged::putIfAbsent);
61154
}
62-
63-
return schemaPlus;
155+
if (merged.isEmpty()) {
156+
return null;
157+
}
158+
return buildTable(merged);
64159
}
65160

66161
/**

sandbox/plugins/analytics-backend-datafusion/rust/src/api.rs

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -987,6 +987,40 @@ fn collect_reads(rel: &substrait::proto::Rel, out: &mut Vec<substrait::proto::Re
987987
}
988988
}
989989

990+
/// All `ReadRel`s reachable from the plan's roots.
991+
fn collect_plan_reads(plan: &substrait::proto::Plan) -> Vec<substrait::proto::ReadRel> {
992+
let mut reads = Vec::new();
993+
for plan_rel in &plan.relations {
994+
if let Some(rel) = root_rel(plan_rel) {
995+
collect_reads(&rel, &mut reads);
996+
}
997+
}
998+
reads
999+
}
1000+
1001+
/// Extracts the table name from the first NamedTable read in the plan bytes.
1002+
pub(crate) fn first_named_table_name(plan_bytes: &[u8]) -> Option<String> {
1003+
use substrait::proto::read_rel::ReadType;
1004+
let plan: substrait::proto::Plan = prost::Message::decode(plan_bytes).ok()?;
1005+
for read in collect_plan_reads(&plan) {
1006+
if let Some(ReadType::NamedTable(nt)) = read.read_type {
1007+
return nt.names.last().cloned();
1008+
}
1009+
}
1010+
None
1011+
}
1012+
1013+
/// Extracts the `base_schema` NamedStruct from the plan's first ReadRel matching `table_name`.
1014+
pub(crate) fn base_schema_for_table(plan: &substrait::proto::Plan, table_name: &str) -> Option<substrait::proto::NamedStruct> {
1015+
use substrait::proto::read_rel::ReadType;
1016+
for read in collect_plan_reads(plan) {
1017+
let Some(ReadType::NamedTable(nt)) = read.read_type.as_ref() else { continue };
1018+
if nt.names.last().map(String::as_str) != Some(table_name) { continue }
1019+
return read.base_schema.clone();
1020+
}
1021+
None
1022+
}
1023+
9901024
// ---------------------------------------------------------------------------
9911025
// Coordinator-reduce local execution API
9921026
//
@@ -1373,6 +1407,16 @@ mod tests {
13731407
);
13741408
}
13751409

1410+
#[test]
1411+
fn test_first_named_table_name_returns_none_on_empty() {
1412+
assert_eq!(super::first_named_table_name(&[]), None);
1413+
}
1414+
1415+
#[test]
1416+
fn test_first_named_table_name_returns_none_on_garbage() {
1417+
assert_eq!(super::first_named_table_name(&[0xFF, 0x00, 0x01]), None);
1418+
}
1419+
13761420
#[test]
13771421
fn view_needs_gc_detects_bloat() {
13781422
let strings: Vec<String> = (0..10_000)

sandbox/plugins/analytics-backend-datafusion/rust/src/ffm.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -649,11 +649,18 @@ pub unsafe extern "C" fn df_create_session_context(
649649
table_name_len: i64,
650650
context_id: i64,
651651
query_config_ptr: i64,
652+
plan_ptr: *const u8,
653+
plan_len: i64,
652654
) -> i64 {
653655
let table_name = str_from_raw(table_name_ptr, table_name_len)
654656
.map_err(|e| format!("df_create_session_context: {}", e))?;
655657
let query_config =
656658
crate::datafusion_query_config::DatafusionQueryConfig::from_ffm_ptr(query_config_ptr);
659+
let plan_bytes: &[u8] = if plan_len > 0 {
660+
slice::from_raw_parts(plan_ptr, plan_len as usize)
661+
} else {
662+
&[]
663+
};
657664
let mgr = get_rt_manager()?;
658665
mgr.io_runtime
659666
.block_on(crate::task_monitors::plan_setup_monitor().instrument(
@@ -663,6 +670,7 @@ pub unsafe extern "C" fn df_create_session_context(
663670
table_name,
664671
context_id,
665672
query_config,
673+
plan_bytes,
666674
)
667675
))
668676
.map_err(|e| e.to_string())
@@ -679,16 +687,23 @@ pub unsafe extern "C" fn df_create_session_context_indexed(
679687
tree_shape: i32,
680688
delegated_predicate_count: i32,
681689
query_config_ptr: i64,
690+
plan_ptr: *const u8,
691+
plan_len: i64,
682692
) -> i64 {
683693
let table_name = str_from_raw(table_name_ptr, table_name_len)
684694
.map_err(|e| format!("df_create_session_context_indexed: {}", e))?;
685695
let query_config =
686696
crate::datafusion_query_config::DatafusionQueryConfig::from_ffm_ptr(query_config_ptr);
697+
let plan_bytes: &[u8] = if plan_len > 0 {
698+
slice::from_raw_parts(plan_ptr, plan_len as usize)
699+
} else {
700+
&[]
701+
};
687702
let mgr = get_rt_manager()?;
688703
mgr.io_runtime
689704
.block_on(crate::task_monitors::plan_setup_monitor().instrument(
690705
crate::session_context::create_session_context_indexed(
691-
runtime_ptr, shard_view_ptr, table_name, context_id, tree_shape, delegated_predicate_count, query_config,
706+
runtime_ptr, shard_view_ptr, table_name, context_id, tree_shape, delegated_predicate_count, query_config, plan_bytes,
692707
)
693708
))
694709
.map_err(|e| e.to_string())

sandbox/plugins/analytics-backend-datafusion/rust/src/query_executor.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,7 @@ pub async fn execute_with_context(
217217
DataFusionError::Execution(format!("Failed to decode Substrait: {}", e))
218218
})?;
219219

220+
// Union schema widening was applied at table registration (session_context::widen_to_union_schema).
220221
let logical_plan = from_substrait_plan(&handle.ctx.state(), &substrait_plan).await?;
221222
log_debug!("DataFusion logical plan:\n{}", logical_plan.display_indent());
222223
let dataframe = handle.ctx.execute_logical_plan(logical_plan).await?;

sandbox/plugins/analytics-backend-datafusion/rust/src/schema_coerce.rs

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,10 +157,68 @@ fn rewrite_data_type(dt: &DataType) -> DataType {
157157
}
158158
}
159159

160+
/// Appends to `registered` any `expected` field whose name is absent, as a nullable column.
161+
/// `Some(augmented)` if anything was added, `None` if `registered` already covers `expected`.
162+
///
163+
/// The Substrait consumer binds `base_schema` to the provider BY NAME, so the registered schema
164+
/// only needs to *contain* every expected column — order is irrelevant and present columns keep
165+
/// their inferred (coerced) types. Appended columns are forced nullable; DataFusion's parquet
166+
/// `SchemaAdapter` null-fills them at read time.
167+
pub fn append_missing_nullable(registered: &Schema, expected: &Schema) -> Option<SchemaRef> {
168+
let mut added: Vec<Field> = Vec::new();
169+
for ef in expected.fields() {
170+
if registered.field_with_name(ef.name()).is_err() {
171+
added.push(
172+
Field::new(ef.name(), ef.data_type().clone(), true).with_metadata(ef.metadata().clone()),
173+
);
174+
}
175+
}
176+
if added.is_empty() {
177+
return None;
178+
}
179+
let mut fields: Vec<Field> = registered.fields().iter().map(|f| f.as_ref().clone()).collect();
180+
fields.extend(added);
181+
Some(Arc::new(Schema::new_with_metadata(fields, registered.metadata().clone())))
182+
}
183+
160184
#[cfg(test)]
161185
mod tests {
162186
use super::*;
163187

188+
#[test]
189+
fn append_missing_adds_absent_columns_as_nullable() {
190+
let registered = Schema::new(vec![
191+
Field::new("name", DataType::Utf8, true),
192+
Field::new("age", DataType::Int64, true),
193+
]);
194+
// `alias` is declared non-nullable in the plan; it must still be appended as nullable
195+
// since the shard has no data for it.
196+
let expected = Schema::new(vec![
197+
Field::new("name", DataType::Utf8, true),
198+
Field::new("age", DataType::Int64, true),
199+
Field::new("alias", DataType::Utf8, false),
200+
]);
201+
202+
let merged = append_missing_nullable(&registered, &expected).expect("alias missing → augmented");
203+
assert_eq!(merged.fields().len(), 3);
204+
let alias = merged.field_with_name("alias").unwrap();
205+
assert_eq!(alias.data_type(), &DataType::Utf8);
206+
assert!(alias.is_nullable(), "appended column must be nullable");
207+
assert!(merged.field_with_name("name").is_ok());
208+
assert!(merged.field_with_name("age").is_ok());
209+
}
210+
211+
#[test]
212+
fn append_missing_returns_none_when_registered_covers_expected() {
213+
let registered = Schema::new(vec![
214+
Field::new("name", DataType::Utf8, true),
215+
Field::new("age", DataType::Int64, true),
216+
]);
217+
// Registered may carry extra columns the plan doesn't reference — still nothing to add.
218+
let expected = Schema::new(vec![Field::new("name", DataType::Utf8, true)]);
219+
assert!(append_missing_nullable(&registered, &expected).is_none());
220+
}
221+
164222
#[test]
165223
fn top_level_binary_view_gets_rewritten() {
166224
let schema = Arc::new(Schema::new(vec![

0 commit comments

Comments
 (0)