Skip to content

Commit 360ef34

Browse files
expani authored and bharath-techie committed
Added Filter delegation E2E from Datafusion into Lucene
Signed-off-by: expani <anijainc@amazon.com>
1 parent 0cb2190 commit 360ef34

39 files changed

Lines changed: 695 additions & 629 deletions

File tree

sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/backend/ShardScanExecutionContext.java

Lines changed: 12 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -10,6 +10,7 @@
1010

1111
import org.apache.arrow.memory.BufferAllocator;
1212
import org.opensearch.analytics.spi.CommonExecutionContext;
13+
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
1314
import org.opensearch.index.IndexSettings;
1415
import org.opensearch.index.engine.exec.IndexReaderProvider.Reader;
1516
import org.opensearch.index.mapper.MapperService;
@@ -30,6 +31,7 @@ public class ShardScanExecutionContext implements CommonExecutionContext {
3031
private BufferAllocator allocator;
3132
private MapperService mapperService;
3233
private IndexSettings indexSettings;
34+
private NamedWriteableRegistry namedWriteableRegistry;
3335

3436
/**
3537
* Constructs an execution context.
@@ -97,4 +99,14 @@ public IndexSettings getIndexSettings() {
9799
public void setIndexSettings(IndexSettings indexSettings) {
98100
this.indexSettings = indexSettings;
99101
}
102+
103+
/** Returns the NamedWriteableRegistry for deserializing delegated expressions. */
104+
public NamedWriteableRegistry getNamedWriteableRegistry() {
105+
return namedWriteableRegistry;
106+
}
107+
108+
/** Sets the NamedWriteableRegistry. */
109+
public void setNamedWriteableRegistry(NamedWriteableRegistry namedWriteableRegistry) {
110+
this.namedWriteableRegistry = namedWriteableRegistry;
111+
}
100112
}

sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/DelegationDescriptor.java

Lines changed: 4 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -23,18 +23,12 @@
2323
*
2424
* @opensearch.internal
2525
*/
26-
public record DelegationDescriptor(
27-
FilterTreeShape treeShape,
28-
int delegatedPredicateCount,
29-
List<DelegatedExpression> delegatedExpressions
30-
) implements Writeable {
26+
public record DelegationDescriptor(FilterTreeShape treeShape, int delegatedPredicateCount, List<DelegatedExpression> delegatedExpressions)
27+
implements
28+
Writeable {
3129

3230
public DelegationDescriptor(StreamInput in) throws IOException {
33-
this(
34-
in.readEnum(FilterTreeShape.class),
35-
in.readVInt(),
36-
readExpressions(in)
37-
);
31+
this(in.readEnum(FilterTreeShape.class), in.readVInt(), readExpressions(in));
3832
}
3933

4034
@Override

sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/FragmentInstructionHandlerFactory.java

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -35,6 +35,9 @@ Optional<InstructionNode> createFilterDelegationNode(
3535
List<DelegatedExpression> delegatedQueries
3636
);
3737

38+
/** Creates a shard scan with delegation instruction node — combines scan setup with delegation config. */
39+
Optional<InstructionNode> createShardScanWithDelegationNode(FilterTreeShape treeShape, int delegatedPredicateCount);
40+
3841
/** Creates a partial aggregate instruction node. */
3942
Optional<InstructionNode> createPartialAggregateNode();
4043

sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_executor.rs

Lines changed: 59 additions & 290 deletions
Large diffs are not rendered by default.

sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/bool_tree.rs

Lines changed: 26 additions & 26 deletions
Original file line number | Diff line number | Diff line change
@@ -382,8 +382,8 @@ mod tests {
382382
#[test]
383383
fn leaf_count_counts_only_collectors() {
384384
let tree = BoolNode::And(vec![
385-
collector(b"a"),
386-
BoolNode::Or(vec![collector(b"b"), predicate("x", Operator::Eq, 1)]),
385+
collector(0),
386+
BoolNode::Or(vec![collector(1), predicate("x", Operator::Eq, 1)]),
387387
predicate("y", Operator::Eq, 2),
388388
]);
389389
assert_eq!(tree.collector_leaf_count(), 2);
@@ -392,30 +392,30 @@ mod tests {
392392
#[test]
393393
fn leaves_dfs_order() {
394394
let tree = BoolNode::And(vec![
395-
collector(b"x"),
396-
BoolNode::Or(vec![collector(b"y"), collector(b"z")]),
395+
collector(10),
396+
BoolNode::Or(vec![collector(11), collector(12)]),
397397
]);
398398
let leaves = tree.collector_leaves();
399399
assert_eq!(leaves.len(), 3);
400-
assert_eq!(&*leaves[0], b"x");
401-
assert_eq!(&*leaves[1], b"y");
402-
assert_eq!(&*leaves[2], b"z");
400+
assert_eq!(leaves[0], 10);
401+
assert_eq!(leaves[1], 11);
402+
assert_eq!(leaves[2], 12);
403403
}
404404

405405
// ── push_not_down (De Morgan) ─────────────────────────────────────
406406

407407
#[test]
408408
fn not_collector_stays_wrapped() {
409-
let tree = BoolNode::Not(Box::new(collector(b"x")));
409+
let tree = BoolNode::Not(Box::new(collector(10)));
410410
let n = tree.push_not_down();
411411
assert!(matches!(n, BoolNode::Not(b) if matches!(*b, BoolNode::Collector { .. })));
412412
}
413413

414414
#[test]
415415
fn de_morgan_not_and_to_or() {
416416
let tree = BoolNode::Not(Box::new(BoolNode::And(vec![
417-
collector(b"a"),
418-
collector(b"b"),
417+
collector(0),
418+
collector(1),
419419
])));
420420
match tree.push_not_down() {
421421
BoolNode::Or(children) => {
@@ -447,16 +447,16 @@ mod tests {
447447

448448
#[test]
449449
fn double_negation_cancels() {
450-
let tree = BoolNode::Not(Box::new(BoolNode::Not(Box::new(collector(b"x")))));
450+
let tree = BoolNode::Not(Box::new(BoolNode::Not(Box::new(collector(10)))));
451451
let n = tree.push_not_down();
452452
assert!(matches!(n, BoolNode::Collector { .. }));
453453
}
454454

455455
#[test]
456456
fn nested_not_recurses_through_and_or() {
457457
let tree = BoolNode::Not(Box::new(BoolNode::And(vec![
458-
BoolNode::Or(vec![collector(b"a"), collector(b"b")]),
459-
collector(b"c"),
458+
BoolNode::Or(vec![collector(0), collector(1)]),
459+
collector(2),
460460
])));
461461
match tree.push_not_down() {
462462
BoolNode::Or(outer) => {
@@ -473,8 +473,8 @@ mod tests {
473473
#[test]
474474
fn flatten_collapses_nested_and() {
475475
let tree = BoolNode::And(vec![
476-
BoolNode::And(vec![collector(b"a"), collector(b"b")]),
477-
collector(b"c"),
476+
BoolNode::And(vec![collector(0), collector(1)]),
477+
collector(2),
478478
]);
479479
match tree.flatten() {
480480
BoolNode::And(children) => {
@@ -490,10 +490,10 @@ mod tests {
490490
#[test]
491491
fn flatten_collapses_nested_or() {
492492
let tree = BoolNode::Or(vec![
493-
collector(b"a"),
493+
collector(0),
494494
BoolNode::Or(vec![
495-
collector(b"b"),
496-
BoolNode::Or(vec![collector(b"c"), collector(b"d")]),
495+
collector(1),
496+
BoolNode::Or(vec![collector(2), collector(3)]),
497497
]),
498498
]);
499499
match tree.flatten() {
@@ -505,9 +505,9 @@ mod tests {
505505
#[test]
506506
fn flatten_preserves_mixed_connectives() {
507507
let tree = BoolNode::And(vec![
508-
collector(b"a"),
509-
BoolNode::Or(vec![collector(b"b"), collector(b"c")]),
510-
BoolNode::And(vec![collector(b"d"), collector(b"e")]),
508+
collector(0),
509+
BoolNode::Or(vec![collector(1), collector(2)]),
510+
BoolNode::And(vec![collector(3), collector(4)]),
511511
]);
512512
match tree.flatten() {
513513
BoolNode::And(children) => {
@@ -521,8 +521,8 @@ mod tests {
521521
#[test]
522522
fn flatten_descends_into_not() {
523523
let tree = BoolNode::Not(Box::new(BoolNode::And(vec![
524-
BoolNode::And(vec![collector(b"a"), collector(b"b")]),
525-
collector(b"c"),
524+
BoolNode::And(vec![collector(0), collector(1)]),
525+
collector(2),
526526
])));
527527
match tree.flatten() {
528528
BoolNode::Not(inner) => match *inner {
@@ -537,7 +537,7 @@ mod tests {
537537

538538
#[test]
539539
fn resolve_replaces_collector_bytes_with_refs() {
540-
let tree = BoolNode::And(vec![collector(b"a"), collector(b"b")]);
540+
let tree = BoolNode::And(vec![collector(0), collector(1)]);
541541
let a: Arc<dyn RowGroupDocsCollector> = Arc::new(StubCollector(1));
542542
let b: Arc<dyn RowGroupDocsCollector> = Arc::new(StubCollector(2));
543543
let resolved = tree.resolve(&[(10, a), (20, b)]).unwrap();
@@ -572,14 +572,14 @@ mod tests {
572572

573573
#[test]
574574
fn resolve_out_of_range_errors() {
575-
let tree = collector(b"x");
575+
let tree = collector(10);
576576
let err = tree.resolve(&[]).unwrap_err();
577577
assert!(err.contains("out of range"), "got: {}", err);
578578
}
579579

580580
#[test]
581581
fn resolve_not_collector_still_wraps() {
582-
let tree = BoolNode::Not(Box::new(collector(b"x")));
582+
let tree = BoolNode::Not(Box::new(collector(10)));
583583
let c: Arc<dyn RowGroupDocsCollector> = Arc::new(StubCollector(0));
584584
let resolved = tree.resolve(&[(1, c)]).unwrap();
585585
match resolved {

sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/substrait_to_tree.rs

Lines changed: 26 additions & 26 deletions
Original file line number | Diff line number | Diff line change
@@ -411,25 +411,25 @@ mod tests {
411411
let udf = Arc::new(create_index_filter_udf());
412412
let expr = Expr::ScalarFunction(datafusion::logical_expr::expr::ScalarFunction::new_udf(
413413
udf,
414-
vec![lit(ScalarValue::Binary(Some(b"hello-query".to_vec())))],
414+
vec![lit(ScalarValue::Int32(Some(42)))],
415415
));
416416
let r = expr_to_bool_tree(&expr, &test_schema()).unwrap();
417417
match r.tree {
418-
BoolNode::Collector { query_bytes } => {
419-
assert_eq!(&*query_bytes, b"hello-query");
418+
BoolNode::Collector { annotation_id } => {
419+
assert_eq!(annotation_id, 42);
420420
}
421421
_ => panic!("expected Collector"),
422422
}
423423
}
424424

425425
#[test]
426426
fn mixed_tree() {
427-
// AND(collector(bytes), OR(price > 100, qty < 50))
427+
// AND(collector(annotationId), OR(price > 100, qty < 50))
428428
let udf = Arc::new(create_index_filter_udf());
429429
let collector_expr =
430430
Expr::ScalarFunction(datafusion::logical_expr::expr::ScalarFunction::new_udf(
431431
udf,
432-
vec![lit(ScalarValue::Binary(Some(b"mixed".to_vec())))],
432+
vec![lit(ScalarValue::Int32(Some(0)))],
433433
));
434434
let or_branch = col("price").gt(lit(100i32)).or(col("qty").lt(lit(50i32)));
435435
let expr = Expr::BinaryExpr(datafusion::logical_expr::BinaryExpr::new(
@@ -443,9 +443,9 @@ mod tests {
443443

444444
// ── classify_filter ──────────────────────────────────────────────
445445

446-
fn collector(tag: &[u8]) -> BoolNode {
446+
fn collector(id: i32) -> BoolNode {
447447
BoolNode::Collector {
448-
query_bytes: Arc::from(tag),
448+
annotation_id: id,
449449
}
450450
}
451451
fn dummy_predicate() -> BoolNode {
@@ -472,40 +472,40 @@ mod tests {
472472
#[test]
473473
fn classify_bare_collector_is_single() {
474474
assert_eq!(
475-
classify_filter(&collector(b"x")),
475+
classify_filter(&collector(10)),
476476
FilterClass::SingleCollector
477477
);
478478
}
479479

480480
#[test]
481481
fn classify_and_of_collector_and_predicates_is_single() {
482-
let tree = BoolNode::And(vec![collector(b"x"), dummy_predicate(), dummy_predicate()]);
482+
let tree = BoolNode::And(vec![collector(10), dummy_predicate(), dummy_predicate()]);
483483
assert_eq!(classify_filter(&tree), FilterClass::SingleCollector);
484484
}
485485

486486
#[test]
487487
fn classify_and_with_two_collectors_is_single() {
488488
// AND(C, C, P) — all collectors under AND-only path → SingleCollector.
489-
let tree = BoolNode::And(vec![collector(b"x"), collector(b"y"), dummy_predicate()]);
489+
let tree = BoolNode::And(vec![collector(10), collector(11), dummy_predicate()]);
490490
assert_eq!(classify_filter(&tree), FilterClass::SingleCollector);
491491
}
492492

493493
#[test]
494494
fn classify_or_containing_collector_is_tree() {
495-
let tree = BoolNode::Or(vec![collector(b"x"), dummy_predicate()]);
495+
let tree = BoolNode::Or(vec![collector(10), dummy_predicate()]);
496496
assert_eq!(classify_filter(&tree), FilterClass::Tree);
497497
}
498498

499499
#[test]
500500
fn classify_not_of_collector_is_tree() {
501-
let tree = BoolNode::Not(Box::new(collector(b"x")));
501+
let tree = BoolNode::Not(Box::new(collector(10)));
502502
assert_eq!(classify_filter(&tree), FilterClass::Tree);
503503
}
504504

505505
#[test]
506506
fn classify_and_with_nested_collector_is_tree() {
507507
let tree = BoolNode::And(vec![
508-
BoolNode::Or(vec![collector(b"x"), dummy_predicate()]),
508+
BoolNode::Or(vec![collector(10), dummy_predicate()]),
509509
dummy_predicate(),
510510
]);
511511
assert_eq!(classify_filter(&tree), FilterClass::Tree);
@@ -517,8 +517,8 @@ mod tests {
517517
fn classify_nested_and_collector_plus_predicate_is_single() {
518518
// AND(C₁, AND(C₂, P)) — nested AND, all collectors under AND-only path.
519519
let tree = BoolNode::And(vec![
520-
collector(b"x"),
521-
BoolNode::And(vec![collector(b"y"), dummy_predicate()]),
520+
collector(10),
521+
BoolNode::And(vec![collector(11), dummy_predicate()]),
522522
]);
523523
assert_eq!(classify_filter(&tree), FilterClass::SingleCollector);
524524
}
@@ -529,10 +529,10 @@ mod tests {
529529
let tree = BoolNode::And(vec![
530530
dummy_predicate(),
531531
BoolNode::And(vec![
532-
collector(b"a"),
532+
collector(0),
533533
BoolNode::And(vec![
534-
collector(b"b"),
535-
BoolNode::And(vec![collector(b"c"), dummy_predicate()]),
534+
collector(1),
535+
BoolNode::And(vec![collector(2), dummy_predicate()]),
536536
]),
537537
]),
538538
]);
@@ -543,8 +543,8 @@ mod tests {
543543
fn classify_nested_and_only_collectors_is_single() {
544544
// AND(AND(C₁, C₂), AND(C₃, C₄)) — nested AND of only collectors.
545545
let tree = BoolNode::And(vec![
546-
BoolNode::And(vec![collector(b"a"), collector(b"b")]),
547-
BoolNode::And(vec![collector(b"c"), collector(b"d")]),
546+
BoolNode::And(vec![collector(0), collector(1)]),
547+
BoolNode::And(vec![collector(2), collector(3)]),
548548
]);
549549
assert_eq!(classify_filter(&tree), FilterClass::SingleCollector);
550550
}
@@ -553,7 +553,7 @@ mod tests {
553553
fn classify_nested_and_with_or_predicate_is_single() {
554554
// AND(C, AND(P, OR(P, P))) — OR contains only predicates, no collectors.
555555
let tree = BoolNode::And(vec![
556-
collector(b"x"),
556+
collector(10),
557557
BoolNode::And(vec![
558558
dummy_predicate(),
559559
BoolNode::Or(vec![dummy_predicate(), dummy_predicate()]),
@@ -566,7 +566,7 @@ mod tests {
566566
fn classify_nested_and_with_not_predicate_is_single() {
567567
// AND(C, NOT(P)) — NOT wraps a predicate, not a collector.
568568
let tree = BoolNode::And(vec![
569-
collector(b"x"),
569+
collector(10),
570570
BoolNode::Not(Box::new(dummy_predicate())),
571571
]);
572572
assert_eq!(classify_filter(&tree), FilterClass::SingleCollector);
@@ -576,9 +576,9 @@ mod tests {
576576
fn classify_nested_and_or_containing_collector_is_tree() {
577577
// AND(C₁, AND(OR(C₂, P), P)) — OR above C₂ → Tree.
578578
let tree = BoolNode::And(vec![
579-
collector(b"x"),
579+
collector(10),
580580
BoolNode::And(vec![
581-
BoolNode::Or(vec![collector(b"y"), dummy_predicate()]),
581+
BoolNode::Or(vec![collector(11), dummy_predicate()]),
582582
dummy_predicate(),
583583
]),
584584
]);
@@ -589,9 +589,9 @@ mod tests {
589589
fn classify_nested_and_not_containing_collector_is_tree() {
590590
// AND(C₁, AND(NOT(C₂), P)) — NOT above C₂ → Tree.
591591
let tree = BoolNode::And(vec![
592-
collector(b"x"),
592+
collector(10),
593593
BoolNode::And(vec![
594-
BoolNode::Not(Box::new(collector(b"y"))),
594+
BoolNode::Not(Box::new(collector(11))),
595595
dummy_predicate(),
596596
]),
597597
]);

sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/tests_e2e/boolean_algebra.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -289,7 +289,7 @@ impl LeafId {
289289
// Collector leaves first.
290290
if let Some(provider_id) = self.as_collector_provider_id() {
291291
return BoolNode::Collector {
292-
query_bytes: Arc::from(&[provider_id][..]),
292+
annotation_id: provider_id as i32,
293293
};
294294
}
295295
// Simple comparison leaves via ReferencePred.

0 commit comments

Comments
 (0)