Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.apache.calcite.plan.RelOptSchema;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.RelOptTable.ViewExpander;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.plan.hep.HepPlanner;
import org.apache.calcite.plan.hep.HepProgram;
import org.apache.calcite.plan.hep.HepProgramBuilder;
Expand All @@ -74,6 +75,7 @@
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.RelShuttle;
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.hint.HintStrategyTable;
import org.apache.calcite.rel.logical.LogicalTableScan;
import org.apache.calcite.rel.rules.FilterMergeRule;
import org.apache.calcite.rel.type.RelDataType;
Expand Down Expand Up @@ -367,6 +369,36 @@ protected SqlToRelConverter getSqlToRelConverter(
return new OpenSearchSqlToRelConverter(
this, validator, catalogReader, this.cluster, convertletTable, config);
}

@Override
protected RelRoot trimUnusedFields(RelRoot root) {
final SqlToRelConverter.Config config =
SqlToRelConverter.config()
.withTrimUnusedFields(shouldTrim(root.rel))
.withExpand(THREAD_EXPAND.get())
.withInSubQueryThreshold(requireNonNull(THREAD_INSUBQUERY_THRESHOLD.get()));
// PPL analyzes into a pre-built RelNode before prepareStatement(rel). Reuse the incoming
// RelNode's cluster here so prepare-time trimming does not create replacement nodes under a
// different planner than the rest of the tree.
final SqlToRelConverter converter =
new OpenSearchSqlToRelConverter(
this,
getSqlValidator(),
catalogReader,
root.rel.getCluster(),
convertletTable,
config);
final boolean ordered = !root.collation.getFieldCollations().isEmpty();
final boolean dml = SqlKind.DML.contains(root.kind);
return root.withRel(converter.trimUnusedFields(dml || ordered, root.rel));
}

private static boolean shouldTrim(RelNode rootRel) {
// For now, don't trim if there are more than 3 joins. The projects
// near the leaves created by trim migrate past joins and seem to
// prevent join-reordering.
return THREAD_TRIM.get() || RelOptUtil.countJoins(rootRel) < 2;
}
}

public static class OpenSearchSqlToRelConverter extends SqlToRelConverter {
Expand All @@ -379,22 +411,51 @@ public OpenSearchSqlToRelConverter(
RelOptCluster cluster,
SqlRexConvertletTable convertletTable,
Config config) {
super(viewExpander, validator, catalogReader, cluster, convertletTable, config);
this(
viewExpander,
validator,
catalogReader,
cluster,
convertletTable,
preserveHintStrategies(cluster, config),
true);
}

private OpenSearchSqlToRelConverter(
ViewExpander viewExpander,
@Nullable SqlValidator validator,
CatalogReader catalogReader,
RelOptCluster cluster,
SqlRexConvertletTable convertletTable,
Config effectiveConfig,
boolean ignored) {
super(viewExpander, validator, catalogReader, cluster, convertletTable, effectiveConfig);
this.relBuilder =
config
effectiveConfig
.getRelBuilderFactory()
.create(
cluster,
validator != null
? validator.getCatalogReader().unwrap(RelOptSchema.class)
: null)
.transform(config.getRelBuilderConfigTransform());
.transform(effectiveConfig.getRelBuilderConfigTransform());
}

@Override
protected RelFieldTrimmer newFieldTrimmer() {
return new OpenSearchRelFieldTrimmer(validator, this.relBuilder);
}

// SqlToRelConverter always installs the hint strategy table from its config onto the cluster.
// When prepare-time trimming reuses an incoming RelNode cluster, preserve any PPL-specific
// aggregate hint strategies that were already registered during analysis.
private static Config preserveHintStrategies(RelOptCluster cluster, Config config) {
if (config.getHintStrategyTable() == HintStrategyTable.EMPTY
&& cluster.getHintStrategies() != HintStrategyTable.EMPTY) {
return config.withHintStrategyTable(cluster.getHintStrategies());
}
return config;
}
}

public static class OpenSearchRelRunners {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,4 +87,74 @@ public void testAppendpipeWithConflictTypeColumn() throws IOException {
TEST_INDEX_ACCOUNT)));
assertTrue(exception.getMessage().contains("due to incompatible types"));
}

/** Regression test: double appendpipe with different aggregations (issue #5173). */
@Test
public void testDoubleAppendPipe() throws IOException {
JSONObject actual =
executeQuery(
String.format(
Locale.ROOT,
"source=%s | stats sum(age) as sum_age by gender"
+ " | appendpipe [ stats avg(sum_age) as avg_sum_age ]"
+ " | appendpipe [ stats max(sum_age) as max_sum_age ]",
TEST_INDEX_ACCOUNT));
verifySchemaInOrder(
actual,
schema("sum_age", "bigint"),
schema("gender", "string"),
schema("avg_sum_age", "double"),
schema("max_sum_age", "bigint"));
// 2 original rows + 1 avg row + 1 max row
verifyDataRows(
actual,
rows(14947, "F", null, null),
rows(15224, "M", null, null),
rows(null, null, 15085.5, null),
rows(null, null, null, 15224));
}

/** Regression test: triple appendpipe with different aggregations (issue #5173). */
@Test
public void testTripleAppendPipe() throws IOException {
JSONObject actual =
executeQuery(
String.format(
Locale.ROOT,
"source=%s | stats sum(age) as sum_age by gender"
+ " | appendpipe [ stats avg(sum_age) as avg_sum_age ]"
+ " | appendpipe [ stats max(sum_age) as max_sum_age ]"
+ " | appendpipe [ stats min(sum_age) as min_sum_age ]",
TEST_INDEX_ACCOUNT));
verifySchemaInOrder(
actual,
schema("sum_age", "bigint"),
schema("gender", "string"),
schema("avg_sum_age", "double"),
schema("max_sum_age", "bigint"),
schema("min_sum_age", "bigint"));
// 2 original rows + 1 avg + 1 max + 1 min
verifyDataRows(
actual,
rows(14947, "F", null, null, null),
rows(15224, "M", null, null, null),
rows(null, null, 15085.5, null, null),
rows(null, null, null, 15224, null),
rows(null, null, null, null, 14947));
}

/** Regression test: double appendpipe with non-aggregation (filter) subpipeline. */
@Test
public void testDoubleAppendPipeWithFilter() throws IOException {
JSONObject actual =
executeQuery(
String.format(
Locale.ROOT,
"source=%s | stats sum(age) as sum_age by gender"
+ " | appendpipe [ where gender = 'F' ]"
+ " | appendpipe [ where gender = 'M' ]",
TEST_INDEX_ACCOUNT));
// 2 original + 1 (F filter from original) + 1 (M filter from cumulative 3 rows)
verifyDataRows(actual, rows(14947, "F"), rows(15224, "M"), rows(14947, "F"), rows(15224, "M"));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
setup:
- do:
query.settings:
body:
transient:
plugins.calcite.enabled: true

- do:
indices.create:
index: issue5173
body:
settings:
number_of_shards: 1
number_of_replicas: 0
mappings:
properties:
gender:
type: keyword
age:
type: integer

- do:
bulk:
refresh: true
body:
- '{"index": {"_index": "issue5173", "_id": "1"}}'
- '{"gender": "F", "age": 10}'
- '{"index": {"_index": "issue5173", "_id": "2"}}'
- '{"gender": "F", "age": 20}'
- '{"index": {"_index": "issue5173", "_id": "3"}}'
- '{"gender": "M", "age": 30}'
- '{"index": {"_index": "issue5173", "_id": "4"}}'
- '{"gender": "M", "age": 40}'

---
teardown:
- do:
indices.delete:
index: issue5173
ignore_unavailable: true
- do:
query.settings:
body:
transient:
plugins.calcite.enabled: false

---
"Issue 5173: double appendpipe with different aggregations should succeed":
- skip:
features:
- headers
- do:
headers:
Content-Type: 'application/json'
ppl:
body:
query: "source=issue5173 | stats sum(age) as sum_age by gender | appendpipe [ stats avg(sum_age) as avg_sum_age ] | appendpipe [ stats max(sum_age) as max_sum_age ]"

- match: { total: 4 }
- match:
schema:
- { name: sum_age, type: bigint }
- { name: gender, type: string }
- { name: avg_sum_age, type: double }
- { name: max_sum_age, type: bigint }
- match:
datarows:
- [ 30, "F", null, null ]
- [ 70, "M", null, null ]
- [ null, null, 50.0, null ]
- [ null, null, null, 70 ]

---
"Issue 5173: triple appendpipe with different aggregations should succeed":
- skip:
features:
- headers
- do:
headers:
Content-Type: 'application/json'
ppl:
body:
query: "source=issue5173 | stats sum(age) as sum_age by gender | appendpipe [ stats avg(sum_age) as avg_sum_age ] | appendpipe [ stats max(sum_age) as max_sum_age ] | appendpipe [ stats min(sum_age) as min_sum_age ]"

- match: { total: 5 }
- match:
schema:
- { name: sum_age, type: bigint }
- { name: gender, type: string }
- { name: avg_sum_age, type: double }
- { name: max_sum_age, type: bigint }
- { name: min_sum_age, type: bigint }
- match:
datarows:
- [ 30, "F", null, null, null ]
- [ 70, "M", null, null, null ]
- [ null, null, 50.0, null, null ]
- [ null, null, null, 70, null ]
- [ null, null, null, null, 30 ]
Loading
Loading