Skip to content

Commit 4ba32b8

Browse files
committed
Replace Calcite PIVOT with post-processing pivot, add series parameter
- Remove Calcite PIVOT from timewrap: no more MAX_PERIODS limit or crash risk - Add post-processing pivot in execution engine: dynamically builds columns from actual data using HashMap grouping - Add series parameter: series=relative (default), series=short (s0, s1) - Add series=exact grammar support (falls back to short at runtime) - Add time_format grammar support for series=exact - Benchmark: 1000 period columns in ~50ms (Calcite PIVOT crashed at 1000) - 32 IT tests, all using verifySchema + verifyDataRows Signed-off-by: Jialiang Li <jialiang.li@hey.com> Signed-off-by: Kai Huang <ahkcs@amazon.com>
1 parent 6980ad9 commit 4ba32b8

9 files changed

Lines changed: 430 additions & 126 deletions

File tree

core/src/main/java/org/opensearch/sql/ast/tree/Timewrap.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ public class Timewrap extends UnresolvedPlan {
2424
private final SpanUnit unit;
2525
private final int value;
2626
private final String align; // "end" or "now"
27+
private final String series; // "relative", "short", or "exact"
28+
private final String timeFormat; // format string for series=exact, nullable
2729
private final Literal spanLiteral; // original span literal for display
2830

2931
private UnresolvedPlan child;

core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,12 @@ public class CalcitePlanContext {
5353
*/
5454
public static final ThreadLocal<String> timewrapUnitName = new ThreadLocal<>();
5555

56+
/** Timewrap series mode: "relative", "short", or "exact". */
57+
public static final ThreadLocal<String> timewrapSeries = new ThreadLocal<>();
58+
59+
/** Timewrap time_format string for series=exact (e.g., "%Y-%m-%d"). */
60+
public static final ThreadLocal<String> timewrapTimeFormat = new ThreadLocal<>();
61+
5662
/** Thread-local switch that tells whether the current query prefers legacy behavior. */
5763
private static final ThreadLocal<Boolean> legacyPreferredFlag =
5864
ThreadLocal.withInitial(() -> true);

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java

Lines changed: 6 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -3835,6 +3835,8 @@ public RelNode visitTimewrap(Timewrap node, CalcitePlanContext context) {
38353835
CalcitePlanContext.stripNullColumns.set(true);
38363836
CalcitePlanContext.timewrapUnitName.set(
38373837
TimewrapUtils.unitBaseName(node.getUnit(), node.getValue()) + "|_before");
3838+
CalcitePlanContext.timewrapSeries.set(node.getSeries());
3839+
CalcitePlanContext.timewrapTimeFormat.set(node.getTimeFormat());
38383840

38393841
RelBuilder b = context.relBuilder;
38403842
RexBuilder rx = context.rexBuilder;
@@ -3950,44 +3952,10 @@ public RelNode visitTimewrap(Timewrap node, CalcitePlanContext context) {
39503952
projections.add(b.alias(periodNum, "__period__"));
39513953
b.project(projections);
39523954

3953-
// Step 4: PIVOT on period, grouped by [offset, __base_offset__]
3954-
// __base_offset__ is constant across all rows so it doesn't affect grouping,
3955-
// but survives the PIVOT so the execution engine can use it for absolute column naming
3956-
b.pivot(
3957-
b.groupKey(b.field(tsFieldName), b.field("__base_offset__")),
3958-
valueFieldNames.stream().map(f -> (RelBuilder.AggCall) b.max(b.field(f)).as("")).toList(),
3959-
ImmutableList.of(b.field("__period__")),
3960-
IntStream.rangeClosed(1, TimewrapUtils.MAX_PERIODS)
3961-
.map(i -> TimewrapUtils.MAX_PERIODS + 1 - i) // reverse: oldest period first
3962-
.mapToObj(
3963-
i ->
3964-
Map.entry(
3965-
// Use placeholder relative names; execution engine renames to absolute
3966-
String.valueOf(i), ImmutableList.of((RexNode) b.literal((long) i))))
3967-
.collect(Collectors.toList()));
3968-
3969-
// Step 5: Rename columns — add agg name prefix, clean up pivot artifacts
3970-
// Use relative period numbers as temporary names; the execution engine will compute
3971-
// absolute offsets using __base_offset__ and rename accordingly
3972-
List<String> pivotColNames = b.peek().getRowType().getFieldNames();
3973-
List<String> cleanNames = new ArrayList<>();
3974-
cleanNames.add(tsFieldName);
3975-
cleanNames.add("__base_offset__");
3976-
for (int i = 2; i < pivotColNames.size(); i++) {
3977-
String name = pivotColNames.get(i);
3978-
if (name.endsWith("_")) {
3979-
name = name.substring(0, name.length() - 1);
3980-
}
3981-
// Prefix with agg name and _before suffix
3982-
if (valueFieldNames.size() == 1) {
3983-
name = valueFieldNames.get(0) + "_" + name + "_before";
3984-
}
3985-
cleanNames.add(name);
3986-
}
3987-
b.rename(cleanNames);
3988-
3989-
// Step 6: Sort by offset
3990-
b.sort(b.field(0));
3955+
// Step 4: Sort by offset, then period (execution engine will pivot)
3956+
// No Calcite PIVOT -- the execution engine pivots dynamically after reading all rows.
3957+
// Output schema: [display_timestamp, value_columns..., __base_offset__, __period__]
3958+
b.sort(b.field(tsFieldName), b.field("__period__"));
39913959

39923960
return b.peek();
39933961
}

core/src/main/java/org/opensearch/sql/calcite/utils/TimewrapUtils.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -276,18 +276,31 @@ public static long calendarUnitNumberFromEpoch(long epochSec, SpanUnit unit, int
276276
}
277277

278278
/**
279-
* Walk the AST from a Timewrap node to find a WHERE clause with an upper bound on the timestamp
280-
* field. Returns the upper bound as epoch seconds, or null if not found.
279+
* Walk the AST from a Timewrap node to the deepest Filter node and extract the timestamp upper
280+
* bound. The frontend time picker always appends the timestamp filter as the first pipe (closest
281+
* to source), making it the deepest Filter in the AST chain:
282+
*
283+
* <pre>
284+
* Timewrap -> Chart -> [user filters] -> Filter(@timestamp >= X AND @timestamp <= Y) -> Source
285+
* </pre>
286+
*
287+
* We walk all Filter nodes and return the last (deepest) timestamp upper bound found. This
288+
* ensures user filters like `where age > 30` between timechart and the time picker filter don't
289+
* interfere.
281290
*/
282291
public static Long extractTimestampUpperBound(Timewrap node) {
283292
Node current = node;
293+
Long lastBound = null;
284294
while (current != null && !current.getChild().isEmpty()) {
285295
current = current.getChild().get(0);
286296
if (current instanceof Filter filter) {
287-
return findUpperBound(filter.getCondition());
297+
Long bound = findUpperBound(filter.getCondition());
298+
if (bound != null) {
299+
lastBound = bound;
300+
}
288301
}
289302
}
290-
return null;
303+
return lastBound;
291304
}
292305

293306
private static Long findUpperBound(UnresolvedExpression expr) {

integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteTimewrapCommandIT.java

Lines changed: 186 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -143,23 +143,32 @@ public void testTimewrapIncompletePeriodNullFill() throws IOException {
143143
schema("sum(requests)_2days_before", "bigint"),
144144
schema("sum(requests)_1day_before", "bigint"),
145145
schema("sum(requests)_latest_day", "bigint"));
146-
verifyDataRowsSome(result, rows("2024-07-04 12:00:00", null, 310, 330, 285));
147-
verifyDataRowsSome(result, rows("2024-07-04 18:00:00", null, 190, 215, 165));
146+
verifyDataRowsInOrder(
147+
result,
148+
rows("2024-07-04 00:00:00", 180, 205, 165, 80),
149+
rows("2024-07-04 06:00:00", 240, 260, 225, 100),
150+
rows("2024-07-04 12:00:00", null, 310, 330, 285),
151+
rows("2024-07-04 18:00:00", null, 190, 215, 165));
148152
}
149153

150154
// --- Different timescales ---
151155

152156
@Test
153157
public void testTimewrapWeekSpanSinglePeriod() throws IOException {
158+
// 3 days of daily data in 1 week -> single period, 3 rows
154159
JSONObject result =
155160
executeQuery(
156161
"source=timewrap_test"
157-
+ WHERE_ALL
158-
+ " | timechart span=6h sum(requests) | timewrap 1week");
162+
+ WHERE_JUL1_TO_JUL3
163+
+ " | timechart span=1day sum(requests) | timewrap 1week");
159164

160165
verifySchema(
161166
result, schema("@timestamp", "timestamp"), schema("sum(requests)_latest_week", "bigint"));
162-
verifyDataRowsSome(result, rows("2024-07-04 00:00:00", 80), rows("2024-07-04 06:00:00", 100));
167+
verifyDataRowsInOrder(
168+
result,
169+
rows("2024-07-01 00:00:00", 920),
170+
rows("2024-07-02 00:00:00", 1010),
171+
rows("2024-07-03 00:00:00", 840));
163172
}
164173

165174
@Test
@@ -180,7 +189,10 @@ public void testTimewrapTwelveHourSpan() throws IOException {
180189
schema("sum(requests)_24hours_before", "bigint"),
181190
schema("sum(requests)_12hours_before", "bigint"),
182191
schema("sum(requests)_latest_hour", "bigint"));
183-
verifyDataRowsSome(result, rows("2024-07-04 00:00:00", 180, 310, 205, 330, 165, 285, 80));
192+
verifyDataRowsInOrder(
193+
result,
194+
rows("2024-07-04 00:00:00", 180, 310, 205, 330, 165, 285, 80),
195+
rows("2024-07-04 06:00:00", 240, 190, 260, 215, 225, 165, 100));
184196
}
185197

186198
@Test
@@ -531,15 +543,17 @@ public void testTimewrapDaySpan() throws IOException {
531543

532544
@Test
533545
public void testTimewrapWeekSpan() throws IOException {
546+
// 2 days of daily data in 1 week -> single period, 2 rows
534547
JSONObject result =
535548
executeQuery(
536549
"source=timewrap_test"
537-
+ WHERE_ALL
538-
+ " | timechart span=6h sum(requests) | timewrap 1week");
550+
+ WHERE_JUL1_TO_JUL2
551+
+ " | timechart span=1day sum(requests) | timewrap 1week");
539552

540553
verifySchema(
541554
result, schema("@timestamp", "timestamp"), schema("sum(requests)_latest_week", "bigint"));
542-
verifyDataRowsSome(result, rows("2024-07-04 00:00:00", 80), rows("2024-07-04 06:00:00", 100));
555+
verifyDataRowsInOrder(
556+
result, rows("2024-07-01 00:00:00", 920), rows("2024-07-02 00:00:00", 1010));
543557
}
544558

545559
@Test
@@ -552,7 +566,12 @@ public void testTimewrapMonthSpan() throws IOException {
552566

553567
verifySchema(
554568
result, schema("@timestamp", "timestamp"), schema("sum(requests)_latest_month", "bigint"));
555-
verifyDataRowsSome(result, rows("2024-07-01 00:00:00", 920), rows("2024-07-04 00:00:00", 180));
569+
verifyDataRowsInOrder(
570+
result,
571+
rows("2024-07-01 00:00:00", 920),
572+
rows("2024-07-02 00:00:00", 1010),
573+
rows("2024-07-03 00:00:00", 840),
574+
rows("2024-07-04 00:00:00", 180));
556575
}
557576

558577
@Test
@@ -570,24 +589,174 @@ public void testTimewrapQuarterSpan() throws IOException {
570589
schema("@timestamp", "timestamp"),
571590
schema("sum(requests)_1quarter_before", "bigint"),
572591
schema("sum(requests)_latest_quarter", "bigint"));
573-
// Day 15 of each quarter aligns both values on the same row
574-
verifyDataRowsSome(result, rows("2024-04-15 00:00:00", 300, 350));
592+
// Day 15 of each quarter aligns -- both values on the same row
593+
verifyDataRows(result, rows("2024-04-15 00:00:00", 300, 350));
575594
}
576595

577596
@Test
578597
public void testTimewrapYearSpan() throws IOException {
579-
// Jan 2024 to Jan 2025 → 2 year periods (2024 and 2025)
580-
// Jan 15 offset exists in both years: 2024=300, 2025=400
598+
// Jan 15 2024 (300) and Jan 15 2025 (400) -- 2 data points in 2 different years
599+
// timechart span=1year: 2 yearly buckets
600+
// timewrap 1year: 2 periods, 1 offset row
581601
JSONObject result =
582602
executeQuery(
583-
"source=timewrap_test | where @timestamp >= '2024-01-15 00:00:00' and @timestamp <="
584-
+ " '2025-01-15 12:00:00' | timechart span=1day sum(requests) | timewrap 1year");
603+
"source=timewrap_test | where @timestamp >= '2024-01-15 12:00:00' and @timestamp <="
604+
+ " '2025-01-15 12:00:00' | timechart span=1year sum(requests) | timewrap 1year");
585605

586606
verifySchema(
587607
result,
588608
schema("@timestamp", "timestamp"),
589609
schema("sum(requests)_1year_before", "bigint"),
590610
schema("sum(requests)_latest_year", "bigint"));
591-
verifyDataRowsSome(result, rows("2025-01-15 00:00:00", 300, 400));
611+
// 2024 yearly sum = all 2024 data in WHERE range; 2025 = Jan 15 only (400)
612+
verifyDataRows(result, rows("2025-01-01 00:00:00", 4050, 400));
613+
}
614+
615+
// --- series parameter ---
616+
617+
@Test
618+
public void testTimewrapSeriesRelativeIsDefault() throws IOException {
619+
// series=relative is the default — same as no series parameter
620+
JSONObject resultDefault =
621+
executeQuery(
622+
"source=timewrap_test"
623+
+ WHERE_JUL1_TO_JUL3
624+
+ " | timechart span=6h sum(requests) | timewrap 1day");
625+
JSONObject resultRelative =
626+
executeQuery(
627+
"source=timewrap_test"
628+
+ WHERE_JUL1_TO_JUL3
629+
+ " | timechart span=6h sum(requests) | timewrap 1day series=relative");
630+
631+
verifySchema(
632+
resultRelative,
633+
schema("@timestamp", "timestamp"),
634+
schema("sum(requests)_2days_before", "bigint"),
635+
schema("sum(requests)_1day_before", "bigint"),
636+
schema("sum(requests)_latest_day", "bigint"));
637+
verifyDataRowsInOrder(
638+
resultRelative,
639+
rows("2024-07-03 00:00:00", 180, 205, 165),
640+
rows("2024-07-03 06:00:00", 240, 260, 225),
641+
rows("2024-07-03 12:00:00", 310, 330, 285),
642+
rows("2024-07-03 18:00:00", 190, 215, 165));
643+
verifySchema(
644+
resultDefault,
645+
schema("@timestamp", "timestamp"),
646+
schema("sum(requests)_2days_before", "bigint"),
647+
schema("sum(requests)_1day_before", "bigint"),
648+
schema("sum(requests)_latest_day", "bigint"));
649+
verifyDataRowsInOrder(
650+
resultDefault,
651+
rows("2024-07-03 00:00:00", 180, 205, 165),
652+
rows("2024-07-03 06:00:00", 240, 260, 225),
653+
rows("2024-07-03 12:00:00", 310, 330, 285),
654+
rows("2024-07-03 18:00:00", 190, 215, 165));
592655
}
656+
657+
@Test
658+
public void testTimewrapSeriesShort() throws IOException {
659+
// series=short: columns named <agg>_s<absolutePeriod>
660+
// With align=end and WHERE upper bound = Jul 3 18:00, baseOffset=0
661+
// Periods: oldest=2, middle=1, newest=0
662+
JSONObject result =
663+
executeQuery(
664+
"source=timewrap_test"
665+
+ WHERE_JUL1_TO_JUL3
666+
+ " | timechart span=6h sum(requests) | timewrap 1day series=short");
667+
668+
verifySchema(
669+
result,
670+
schema("@timestamp", "timestamp"),
671+
schema("sum(requests)_s2", "bigint"),
672+
schema("sum(requests)_s1", "bigint"),
673+
schema("sum(requests)_s0", "bigint"));
674+
verifyDataRowsInOrder(
675+
result,
676+
rows("2024-07-03 00:00:00", 180, 205, 165),
677+
rows("2024-07-03 06:00:00", 240, 260, 225),
678+
rows("2024-07-03 12:00:00", 310, 330, 285),
679+
rows("2024-07-03 18:00:00", 190, 215, 165));
680+
}
681+
682+
@Test
683+
public void testTimewrapSeriesShortWithCount() throws IOException {
684+
// series=short with count aggregation
685+
JSONObject result =
686+
executeQuery(
687+
"source=timewrap_test"
688+
+ WHERE_JUL2_TO_JUL3
689+
+ " | timechart span=6h count() | timewrap 1day series=short");
690+
691+
verifySchema(
692+
result,
693+
schema("@timestamp", "timestamp"),
694+
schema("count()_s1", "bigint"),
695+
schema("count()_s0", "bigint"));
696+
verifyDataRowsInOrder(
697+
result,
698+
rows("2024-07-03 00:00:00", 2, 2),
699+
rows("2024-07-03 06:00:00", 2, 2),
700+
rows("2024-07-03 12:00:00", 2, 2),
701+
rows("2024-07-03 18:00:00", 2, 2));
702+
}
703+
704+
@Test
705+
public void testTimewrapSeriesShortWeekSpan() throws IOException {
706+
// series=short with week span, single period = s0, daily buckets
707+
JSONObject result =
708+
executeQuery(
709+
"source=timewrap_test"
710+
+ WHERE_JUL1_TO_JUL2
711+
+ " | timechart span=1day sum(requests) | timewrap 1week series=short");
712+
713+
verifySchema(result, schema("@timestamp", "timestamp"), schema("sum(requests)_s0", "bigint"));
714+
verifyDataRowsInOrder(
715+
result, rows("2024-07-01 00:00:00", 920), rows("2024-07-02 00:00:00", 1010));
716+
}
717+
718+
@Test
719+
public void testTimewrapSeriesShortWithAvg() throws IOException {
720+
JSONObject result =
721+
executeQuery(
722+
"source=timewrap_test"
723+
+ WHERE_JUL1_TO_JUL2
724+
+ " | timechart span=6h avg(requests) | timewrap 1day series=short");
725+
726+
verifySchema(
727+
result,
728+
schema("@timestamp", "timestamp"),
729+
schema("avg(requests)_s1", "double"),
730+
schema("avg(requests)_s0", "double"));
731+
verifyDataRowsInOrder(
732+
result,
733+
rows("2024-07-02 00:00:00", 90.0, 102.5),
734+
rows("2024-07-02 06:00:00", 120.0, 130.0),
735+
rows("2024-07-02 12:00:00", 155.0, 165.0),
736+
rows("2024-07-02 18:00:00", 95.0, 107.5));
737+
}
738+
739+
@Test
740+
public void testTimewrapSeriesShortWithErrors() throws IOException {
741+
JSONObject result =
742+
executeQuery(
743+
"source=timewrap_test"
744+
+ WHERE_JUL2_TO_JUL3
745+
+ " | timechart span=6h sum(errors) | timewrap 1day series=short");
746+
747+
verifySchema(
748+
result,
749+
schema("@timestamp", "timestamp"),
750+
schema("sum(errors)_s1", "bigint"),
751+
schema("sum(errors)_s0", "bigint"));
752+
verifyDataRowsInOrder(
753+
result,
754+
rows("2024-07-03 00:00:00", 4, 1),
755+
rows("2024-07-03 06:00:00", 6, 3),
756+
rows("2024-07-03 12:00:00", 9, 6),
757+
rows("2024-07-03 18:00:00", 3, 1));
758+
}
759+
760+
// BY clause tests are pending -- blocked by timechart BY output format gap.
761+
// See docs/dev/ppl-timewrap-command.md for design options.
593762
}

0 commit comments

Comments
 (0)