Skip to content

Commit 06ef24c

Browse files
authored
feat: ingest support for numeric typed ExpressionLambdaAggregatorFactory (#19508)
1 parent 215f415 commit 06ef24c

5 files changed

Lines changed: 696 additions & 1 deletion

File tree

docs/querying/aggregations.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -471,7 +471,11 @@ For these reasons, we have deprecated this aggregator and recommend using the Da
471471

472472
### Expression aggregator
473473

474-
Aggregator applicable only at query time. Aggregates results using [Druid expressions](./math-expr.md) functions to facilitate building custom functions.
474+
Aggregates results using [Druid expression](./math-expr.md) functions, which makes it possible to build custom aggregations.
475+
476+
The expression aggregator can be used at query time with any intermediate type. It can also be used at ingest time, but
477+
only when the type of `initialValue` is a primitive numeric type (`LONG` or `DOUBLE`) and matches the type of
478+
`initialCombineValue`. Other intermediate types, such as strings, arrays, and complex types, are query-time only.
475479

476480
| Property | Description | Required |
477481
| --- | --- | --- |

embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionTaskTest.java

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,12 @@
3333
import org.apache.druid.java.util.common.granularity.Granularity;
3434
import org.apache.druid.java.util.common.jackson.JacksonUtils;
3535
import org.apache.druid.query.Druids;
36+
import org.apache.druid.query.aggregation.CountAggregatorFactory;
37+
import org.apache.druid.query.aggregation.ExpressionLambdaAggregatorFactory;
3638
import org.apache.druid.query.aggregation.datasketches.hll.HllSketchModule;
3739
import org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchModule;
3840
import org.apache.druid.query.aggregation.datasketches.theta.SketchModule;
41+
import org.apache.druid.query.expression.TestExprMacroTable;
3942
import org.apache.druid.query.metadata.metadata.SegmentMetadataQuery;
4043
import org.apache.druid.segment.TestHelper;
4144
import org.apache.druid.testing.embedded.EmbeddedClusterApis;
@@ -55,6 +58,7 @@
5558
import java.util.ArrayList;
5659
import java.util.List;
5760
import java.util.Map;
61+
import java.util.Set;
5862
import java.util.function.Supplier;
5963
import java.util.stream.Collectors;
6064

@@ -107,6 +111,65 @@ public class CompactionTaskTest extends CompactionTestBase
107111
"namespace", "continent", "country", "region", "city", "timestamp"
108112
);
109113

114+
/**
115+
* Index task identical in shape to {@link MoreResources.Task#INDEX_TASK_WITH_AGGREGATORS} but with a pair of
116+
* {@link ExpressionLambdaAggregatorFactory} metrics over the {@code added} long field. Used by
117+
* {@link #testCompactionWithExpressionLambdaAggregator} to verify that an expression aggregator works correctly.
118+
*/
119+
private static final Supplier<TaskBuilder.Index> INDEX_TASK_WITH_EXPR_AGG = () ->
120+
TaskBuilder
121+
.ofTypeIndex()
122+
.jsonInputFormat()
123+
.localInputSourceWithFiles(
124+
Resources.DataFile.tinyWiki1Json(),
125+
Resources.DataFile.tinyWiki2Json(),
126+
Resources.DataFile.tinyWiki3Json()
127+
)
128+
.timestampColumn("timestamp")
129+
.dimensions(
130+
"page",
131+
"language", "tags", "user", "unpatrolled", "newPage", "robot",
132+
"anonymous", "namespace", "continent", "country", "region", "city"
133+
)
134+
.metricAggregates(
135+
new CountAggregatorFactory("ingested_events"),
136+
new ExpressionLambdaAggregatorFactory(
137+
"added_sum_expr",
138+
Set.of("added"),
139+
null,
140+
"0",
141+
null,
142+
null,
143+
false,
144+
false,
145+
"__acc + added",
146+
null,
147+
null,
148+
null,
149+
null,
150+
TestExprMacroTable.INSTANCE
151+
),
152+
new ExpressionLambdaAggregatorFactory(
153+
"added_or_expr",
154+
Set.of("added"),
155+
null,
156+
"0",
157+
null,
158+
null,
159+
false,
160+
false,
161+
"bitwiseOr(\"__acc\", \"added\")",
162+
null,
163+
null,
164+
null,
165+
null,
166+
TestExprMacroTable.INSTANCE
167+
)
168+
)
169+
.dynamicPartitionWithMaxRows(3)
170+
.granularitySpec("DAY", "SECOND", true)
171+
.appendToExisting(false);
172+
110173
private String fullDatasourceName;
111174

112175
@BeforeEach
@@ -259,6 +322,33 @@ public void testCompactionWithTimestampDimension() throws Exception
259322
loadDataAndCompact(INDEX_TASK_WITH_TIMESTAMP.get(), COMPACTION_TASK.get(), null);
260323
}
261324

325+
@Test
326+
public void testCompactionWithExpressionLambdaAggregator() throws Exception
327+
{
328+
try (final Closeable ignored = unloader(fullDatasourceName)) {
329+
runTask(INDEX_TASK_WITH_EXPR_AGG.get());
330+
verifySegmentsCount(4);
331+
332+
// Snapshot metric values prior to compaction.
333+
final String preCompact = cluster.runSql(
334+
"SELECT SUM(added_sum_expr), SUM(added_or_expr) FROM %s",
335+
fullDatasourceName
336+
);
337+
338+
// Compact 4 segments -> 2; this performs cross-segment rollup which drives RowCombiningTimeAndDimsIterator
339+
// into ExpressionLambdaAggregatorFactory.makeAggregateCombiner().
340+
compactData(COMPACTION_TASK.get(), null, null);
341+
verifySegmentsCount(2);
342+
343+
// Metric values must round-trip through compaction unchanged.
344+
final String postCompact = cluster.runSql(
345+
"SELECT SUM(added_sum_expr), SUM(added_or_expr) FROM %s",
346+
fullDatasourceName
347+
);
348+
Assertions.assertEquals(preCompact, postCompact);
349+
}
350+
}
351+
262352
private void loadDataAndCompact(
263353
TaskBuilder.Index indexTask,
264354
TaskBuilder.Compact compactionResource,

processing/src/main/java/org/apache/druid/query/aggregation/ExpressionLambdaAggregatorFactory.java

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,11 @@
4040
import org.apache.druid.query.cache.CacheKeyBuilder;
4141
import org.apache.druid.segment.ColumnInspector;
4242
import org.apache.druid.segment.ColumnSelectorFactory;
43+
import org.apache.druid.segment.ColumnValueSelector;
4344
import org.apache.druid.segment.column.ColumnCapabilities;
4445
import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
4546
import org.apache.druid.segment.column.ColumnType;
47+
import org.apache.druid.segment.column.ValueType;
4648
import org.apache.druid.segment.virtual.ExpressionPlan;
4749
import org.apache.druid.segment.virtual.ExpressionPlanner;
4850
import org.apache.druid.segment.virtual.ExpressionSelectors;
@@ -347,6 +349,86 @@ public Object combine(@Nullable Object lhs, @Nullable Object rhs)
347349
).value();
348350
}
349351

352+
@Override
353+
public AggregateCombiner makeAggregateCombiner()
354+
{
355+
final ColumnType intermediateType = getIntermediateType();
356+
// The combiner delegates to combine(), which feeds inputs into combineExpression typed against initialCombineValue.
357+
// If the fold-side intermediate type (what's stored in the segment column) differs from the combine-side type,
358+
// the primitive selector would silently feed wrong-typed values into the expression. Fall through to UOE.
359+
if (!intermediateType.equals(ExpressionType.toColumnType(initialCombineValue.get().type()))) {
360+
return super.makeAggregateCombiner();
361+
}
362+
if (intermediateType.is(ValueType.LONG)) {
363+
return new LongAggregateCombiner()
364+
{
365+
private long state;
366+
private boolean isNull;
367+
368+
@Override
369+
public void reset(ColumnValueSelector selector)
370+
{
371+
state = selector.getLong();
372+
isNull = selector.isNull();
373+
}
374+
375+
@Override
376+
public void fold(ColumnValueSelector selector)
377+
{
378+
final Object combined = combine(isNull ? null : state, selector.getObject());
379+
isNull = combined == null;
380+
state = combined == null ? 0L : ((Number) combined).longValue();
381+
}
382+
383+
@Override
384+
public long getLong()
385+
{
386+
return state;
387+
}
388+
389+
@Override
390+
public boolean isNull()
391+
{
392+
return isNull;
393+
}
394+
};
395+
} else if (intermediateType.is(ValueType.DOUBLE)) {
396+
return new DoubleAggregateCombiner()
397+
{
398+
private double state;
399+
private boolean isNull;
400+
401+
@Override
402+
public void reset(ColumnValueSelector selector)
403+
{
404+
state = selector.getDouble();
405+
isNull = selector.isNull();
406+
}
407+
408+
@Override
409+
public void fold(ColumnValueSelector selector)
410+
{
411+
final Object combined = combine(isNull ? null : state, selector.getObject());
412+
isNull = combined == null;
413+
state = combined == null ? 0.0 : ((Number) combined).doubleValue();
414+
}
415+
416+
@Override
417+
public double getDouble()
418+
{
419+
return state;
420+
}
421+
422+
@Override
423+
public boolean isNull()
424+
{
425+
return isNull;
426+
}
427+
};
428+
}
429+
return super.makeAggregateCombiner();
430+
}
431+
350432
@Override
351433
public Object deserialize(Object object)
352434
{

0 commit comments

Comments
 (0)