Skip to content

Commit b8312bc

Browse files
kevin-dpclaudeautofix-ci[bot]
committed
feat: support aggregates in subqueries (#1298)
* feat: add toArray() for includes subqueries toArray() wraps an includes subquery so the parent row contains Array<T> instead of Collection<T>. When children change, the parent row is re-emitted with a fresh array snapshot. - Add ToArrayWrapper class and toArray() function - Add materializeAsArray flag to IncludesSubquery IR node - Detect ToArrayWrapper in builder, pass flag through compiler - Re-emit parent rows on child changes for toArray entries - Add SelectValue type support for ToArrayWrapper - Add tests for basic toArray, reactivity, ordering, and limits Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * Removed obsolete test * Tests for changes to deeply nested queries * Add type-level tests for toArray() includes subqueries Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * Add change propagation tests for includes subqueries Test the reactive model difference between Collection and toArray includes: - Collection includes: child change does NOT re-emit the parent row (the child Collection updates in place) - toArray includes: child change DOES re-emit the parent row (the parent row is re-emitted with the updated array snapshot) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * Test aggregates inside subqueries * Take into account correlation key when aggregating in subqueries * changeset * ci: apply automated fixes --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com> Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
1 parent 2271ca1 commit b8312bc

File tree

4 files changed

+1162
-30
lines changed

4 files changed

+1162
-30
lines changed

.changeset/includes-aggregates.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@tanstack/db': patch
3+
---
4+
5+
fix: support aggregates (e.g. count) in child/includes subqueries with per-parent scoping

packages/db/src/query/compiler/group-by.ts

Lines changed: 60 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ export function processGroupBy(
8080
havingClauses?: Array<Having>,
8181
selectClause?: Select,
8282
fnHavingClauses?: Array<(row: any) => any>,
83+
mainSource?: string,
8384
): NamespacedAndKeyedStream {
8485
// Handle empty GROUP BY (single-group aggregation)
8586
if (groupByClause.length === 0) {
@@ -110,8 +111,15 @@ export function processGroupBy(
110111
}
111112
}
112113

113-
// Use a constant key for single group
114-
const keyExtractor = () => ({ __singleGroup: true })
114+
// Use a constant key for single group.
115+
// When mainSource is set (includes mode), include __correlationKey so that
116+
// rows from different parents aggregate separately.
117+
const keyExtractor = mainSource
118+
? ([, row]: [string, NamespacedRow]) => ({
119+
__singleGroup: true,
120+
__correlationKey: (row as any)?.[mainSource]?.__correlationKey,
121+
})
122+
: () => ({ __singleGroup: true })
115123

116124
// Apply the groupBy operator with single group
117125
pipeline = pipeline.pipe(
@@ -139,14 +147,24 @@ export function processGroupBy(
139147
)
140148
}
141149

142-
// Use a single key for the result and update $selected
143-
return [
144-
`single_group`,
145-
{
146-
...aggregatedRow,
147-
$selected: finalResults,
148-
},
149-
] as [unknown, Record<string, any>]
150+
// Use a single key for the result and update $selected.
151+
// When in includes mode, restore the namespaced source structure with
152+
// __correlationKey so output extraction can route results per-parent.
153+
const correlationKey = mainSource
154+
? (aggregatedRow as any).__correlationKey
155+
: undefined
156+
const resultKey =
157+
correlationKey !== undefined
158+
? `single_group_${serializeValue(correlationKey)}`
159+
: `single_group`
160+
const resultRow: Record<string, any> = {
161+
...aggregatedRow,
162+
$selected: finalResults,
163+
}
164+
if (mainSource && correlationKey !== undefined) {
165+
resultRow[mainSource] = { __correlationKey: correlationKey }
166+
}
167+
return [resultKey, resultRow] as [unknown, Record<string, any>]
150168
}),
151169
)
152170

@@ -196,7 +214,9 @@ export function processGroupBy(
196214
compileExpression(e),
197215
)
198216

199-
// Create a key extractor function using simple __key_X format
217+
// Create a key extractor function using simple __key_X format.
218+
// When mainSource is set (includes mode), include __correlationKey so that
219+
// rows from different parents with the same group key aggregate separately.
200220
const keyExtractor = ([, row]: [
201221
string,
202222
NamespacedRow & { $selected?: any },
@@ -214,6 +234,10 @@ export function processGroupBy(
214234
key[`__key_${i}`] = value
215235
}
216236

237+
if (mainSource) {
238+
key.__correlationKey = (row as any)?.[mainSource]?.__correlationKey
239+
}
240+
217241
return key
218242
}
219243

@@ -278,25 +302,32 @@ export function processGroupBy(
278302
}
279303
}
280304

281-
// Generate a simple key for the live collection using group values
282-
let finalKey: unknown
283-
if (groupByClause.length === 1) {
284-
finalKey = aggregatedRow[`__key_0`]
285-
} else {
286-
const keyParts: Array<unknown> = []
287-
for (let i = 0; i < groupByClause.length; i++) {
288-
keyParts.push(aggregatedRow[`__key_${i}`])
289-
}
290-
finalKey = serializeValue(keyParts)
305+
// Generate a simple key for the live collection using group values.
306+
// When in includes mode, include the correlation key so that groups
307+
// from different parents don't collide.
308+
const correlationKey = mainSource
309+
? (aggregatedRow as any).__correlationKey
310+
: undefined
311+
const keyParts: Array<unknown> = []
312+
for (let i = 0; i < groupByClause.length; i++) {
313+
keyParts.push(aggregatedRow[`__key_${i}`])
291314
}
292-
293-
return [
294-
finalKey,
295-
{
296-
...aggregatedRow,
297-
$selected: finalResults,
298-
},
299-
] as [unknown, Record<string, any>]
315+
if (correlationKey !== undefined) {
316+
keyParts.push(correlationKey)
317+
}
318+
const finalKey =
319+
keyParts.length === 1 ? keyParts[0] : serializeValue(keyParts)
320+
321+
// When in includes mode, restore the namespaced source structure with
322+
// __correlationKey so output extraction can route results per-parent.
323+
const resultRow: Record<string, any> = {
324+
...aggregatedRow,
325+
$selected: finalResults,
326+
}
327+
if (mainSource && correlationKey !== undefined) {
328+
resultRow[mainSource] = { __correlationKey: correlationKey }
329+
}
330+
return [finalKey, resultRow] as [unknown, Record<string, any>]
300331
}),
301332
)
302333

packages/db/src/query/compiler/index.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -529,14 +529,18 @@ export function compileQuery(
529529
)
530530
}
531531

532-
// Process the GROUP BY clause if it exists
532+
// Process the GROUP BY clause if it exists.
533+
// When in includes mode (parentKeyStream), pass mainSource so that groupBy
534+
// preserves __correlationKey for per-parent aggregation.
535+
const groupByMainSource = parentKeyStream ? mainSource : undefined
533536
if (query.groupBy && query.groupBy.length > 0) {
534537
pipeline = processGroupBy(
535538
pipeline,
536539
query.groupBy,
537540
query.having,
538541
query.select,
539542
query.fnHaving,
543+
groupByMainSource,
540544
)
541545
} else if (query.select) {
542546
// Check if SELECT contains aggregates but no GROUP BY (implicit single-group aggregation)
@@ -551,6 +555,7 @@ export function compileQuery(
551555
query.having,
552556
query.select,
553557
query.fnHaving,
558+
groupByMainSource,
554559
)
555560
}
556561
}

0 commit comments

Comments
 (0)