11DESCRIPTION >
2- Compacts activities from same PR into one, keeping necessary information in a single row. Helps to serve PR-wide widgets in the development tab.
3- Uses existing pull_requests_analyzed data as baseline and merges new events from activityRelations on top.
4- Only updates fields that have new data, preserving existing data for unchanged PRs.
2+ Compacts activities from same PR into one, keeping necessary information in a single row.
3+ Uses existing pull_requests_analyzed data as baseline and merges new events on top.
4+
5+ NODE snapshot_resolver
6+ DESCRIPTION >
7+ Resolve latest snapshot ID once for reuse across all nodes.
8+
9+ SQL >
10+ SELECT max(snapshotId) as latestSnapshotId FROM activityRelations_enrich_snapshot_MV_ds
511
612NODE new_pull_request_related_activity
713SQL >
814 SELECT
915 activityId as id,
1016 sourceId,
17+ sourceParentId,
1118 channel,
12- timestamp AS openedAt ,
19+ timestamp as ts ,
1320 segmentId,
1421 gitChangedLinesBucket,
1522 memberId,
1623 organizationId,
1724 platform,
18- updatedAt
25+ updatedAt,
26+ type,
27+ pullRequestReviewState
1928 FROM activityRelations_enrich_snapshot_MV_ds
20- where
21- snapshotId = (select max(snapshotId) from activityRelations_enrich_snapshot_MV_ds )
22- AND type in (
29+ WHERE
30+ snapshotId = (SELECT latestSnapshotId FROM snapshot_resolver )
31+ AND type IN (
2332 'pull_request-opened',
2433 'merge_request-opened',
2534 'changeset-created',
3039 'pull_request-reviewed',
3140 'merge_request-review-changes-requested',
3241 'patchset-created',
33- 'pull_request-reviewed',
3442 'merge_request-review-approved',
3543 'patchset_approval-created',
3644 'pull_request-closed',
4452
4553NODE new_events_aggregated
4654DESCRIPTION >
47- Aggregates ONLY the new events from the latest snapshot in activityRelations.
48- This represents the "delta" - what's changed since the last update.
55+ Aggregates new events from the latest snapshot by PR source ID.
4956
5057SQL >
5158 SELECT
@@ -61,71 +68,49 @@ SQL >
6168 sourceParentId
6269 ) AS prSourceId,
6370 argMinIf(
64- activityId,
65- timestamp,
66- type IN ('pull_request-opened', 'merge_request-opened', 'changeset-created')
71+ id, ts, type IN ('pull_request-opened', 'merge_request-opened', 'changeset-created')
6772 ) AS id,
6873 argMinIf(
69- channel,
70- timestamp,
71- type IN ('pull_request-opened', 'merge_request-opened', 'changeset-created')
74+ channel, ts, type IN ('pull_request-opened', 'merge_request-opened', 'changeset-created')
7275 ) AS channel,
7376 argMinIf(
74- segmentId,
75- timestamp,
76- type IN ('pull_request-opened', 'merge_request-opened', 'changeset-created')
77+ segmentId, ts, type IN ('pull_request-opened', 'merge_request-opened', 'changeset-created')
7778 ) AS segmentId,
7879 argMinIf(
7980 gitChangedLinesBucket,
80- timestamp ,
81+ ts ,
8182 type IN ('pull_request-opened', 'merge_request-opened', 'changeset-created')
8283 ) AS gitChangedLinesBucket,
8384 argMinIf(
84- memberId,
85- timestamp,
86- type IN ('pull_request-opened', 'merge_request-opened', 'changeset-created')
85+ memberId, ts, type IN ('pull_request-opened', 'merge_request-opened', 'changeset-created')
8786 ) AS memberId,
8887 argMinIf(
8988 organizationId,
90- timestamp ,
89+ ts ,
9190 type IN ('pull_request-opened', 'merge_request-opened', 'changeset-created')
9291 ) AS organizationId,
9392 argMinIf(
94- platform,
95- timestamp,
96- type IN ('pull_request-opened', 'merge_request-opened', 'changeset-created')
93+ platform, ts, type IN ('pull_request-opened', 'merge_request-opened', 'changeset-created')
9794 ) AS platform,
9895 minIf(
99- timestamp , type IN ('pull_request-opened', 'merge_request-opened', 'changeset-created')
96+ ts , type IN ('pull_request-opened', 'merge_request-opened', 'changeset-created')
10097 ) AS openedAt,
10198 argMinIf(
102- updatedAt,
103- timestamp,
104- type IN ('pull_request-opened', 'merge_request-opened', 'changeset-created')
99+ updatedAt, ts, type IN ('pull_request-opened', 'merge_request-opened', 'changeset-created')
105100 ) AS openedUpdatedAt,
106101 toInt64(
107102 countIf(type = 'patchset-created' AND sourceParentId = prSourceId)
108103 ) AS numberOfPatchsets,
109104 argMin(
110- updatedAt, if(type IN ('pull_request-assigned', 'merge_request-assigned'), timestamp , NULL)
105+ updatedAt, if(type IN ('pull_request-assigned', 'merge_request-assigned'), ts , NULL)
111106 ) AS assignedUpdatedAt,
112- min(
113- if(type IN ('pull_request-assigned', 'merge_request-assigned'), timestamp, NULL)
114- ) AS assignedAt,
107+ min(if(type IN ('pull_request-assigned', 'merge_request-assigned'), ts, NULL)) AS assignedAt,
115108 argMin(
116109 updatedAt,
117- if(
118- type IN ('pull_request-review-requested', 'merge_request-review-requested'),
119- timestamp,
120- NULL
121- )
110+ if(type IN ('pull_request-review-requested', 'merge_request-review-requested'), ts, NULL)
122111 ) AS reviewRequestedUpdatedAt,
123112 min(
124- if(
125- type IN ('pull_request-review-requested', 'merge_request-review-requested'),
126- timestamp,
127- NULL
128- )
113+ if(type IN ('pull_request-review-requested', 'merge_request-review-requested'), ts, NULL)
129114 ) AS reviewRequestedAt,
130115 argMin(
131116 updatedAt,
@@ -135,7 +120,7 @@ SQL >
135120 type = 'patchset_approval-created'
136121 AND splitByChar('-', sourceParentId)[1] = prSourceId
137122 ),
138- timestamp ,
123+ ts ,
139124 NULL
140125 )
141126 ) AS reviewedUpdatedAt,
@@ -146,7 +131,7 @@ SQL >
146131 type = 'patchset_approval-created'
147132 AND splitByChar('-', sourceParentId)[1] = prSourceId
148133 ),
149- timestamp ,
134+ ts ,
150135 NULL
151136 )
152137 ) AS reviewedAt,
@@ -159,7 +144,7 @@ SQL >
159144 type = 'patchset_approval-created'
160145 AND splitByChar('-', sourceParentId)[1] = prSourceId
161146 ),
162- timestamp ,
147+ ts ,
163148 NULL
164149 )
165150 ) AS approvedUpdatedAt,
@@ -171,7 +156,7 @@ SQL >
171156 type = 'patchset_approval-created'
172157 AND splitByChar('-', sourceParentId)[1] = prSourceId
173158 ),
174- timestamp ,
159+ ts ,
175160 NULL
176161 )
177162 ) AS approvedAt,
@@ -184,7 +169,7 @@ SQL >
184169 'changeset-closed',
185170 'changeset-abandoned'
186171 ),
187- timestamp ,
172+ ts ,
188173 NULL
189174 )
190175 ) AS closedUpdatedAt,
@@ -196,24 +181,16 @@ SQL >
196181 'changeset-closed',
197182 'changeset-abandoned'
198183 ),
199- timestamp ,
184+ ts ,
200185 NULL
201186 )
202187 ) AS closedAt,
203188 argMin(
204189 updatedAt,
205- if(
206- type IN ('pull_request-merged', 'merge_request-merged', 'changeset-merged'),
207- timestamp,
208- NULL
209- )
190+ if(type IN ('pull_request-merged', 'merge_request-merged', 'changeset-merged'), ts, NULL)
210191 ) AS mergedUpdatedAt,
211192 min(
212- if(
213- type IN ('pull_request-merged', 'merge_request-merged', 'changeset-merged'),
214- timestamp,
215- NULL
216- )
193+ if(type IN ('pull_request-merged', 'merge_request-merged', 'changeset-merged'), ts, NULL)
217194 ) AS mergedAt,
218195 argMin(
219196 updatedAt,
@@ -227,7 +204,7 @@ SQL >
227204 'changeset-closed',
228205 'changeset-abandoned'
229206 ),
230- timestamp ,
207+ ts ,
231208 NULL
232209 )
233210 ) AS resolvedUpdatedAt,
@@ -242,42 +219,28 @@ SQL >
242219 'changeset-closed',
243220 'changeset-abandoned'
244221 ),
245- timestamp ,
222+ ts ,
246223 NULL
247224 )
248225 ) AS resolvedAt
249- FROM activityRelations_enrich_snapshot_MV_ds
250- WHERE
251- snapshotId = (select max(snapshotId) from activityRelations_enrich_snapshot_MV_ds)
252- AND type IN (
253- 'pull_request-opened',
254- 'merge_request-opened',
255- 'changeset-created',
256- 'pull_request-assigned',
257- 'merge_request-assigned',
258- 'pull_request-review-requested',
259- 'merge_request-review-requested',
260- 'pull_request-reviewed',
261- 'merge_request-review-changes-requested',
262- 'patchset-created',
263- 'merge_request-review-approved',
264- 'patchset_approval-created',
265- 'pull_request-closed',
266- 'merge_request-closed',
267- 'changeset-closed',
268- 'changeset-abandoned',
269- 'pull_request-merged',
270- 'merge_request-merged',
271- 'changeset-merged'
272- )
226+ FROM new_pull_request_related_activity
273227 GROUP BY prSourceId
274228 HAVING prSourceId != ''
275229
230+ NODE baseline_filtered
231+ DESCRIPTION >
232+ Pre-filter baseline using semi-join pattern.
233+ This significantly reduces the data scanned in the final JOIN.
234+
235+ SQL >
236+ SELECT *
237+ FROM pull_requests_analyzed
238+ WHERE sourceId IN (SELECT prSourceId FROM new_events_aggregated)
239+
276240NODE pull_request_analysis_results_merged
277241DESCRIPTION >
278- Takes existing pull_requests_analyzed data as baseline (LEFT side).
279- Merges new event data on top using COALESCE - new data overwrites existing data if present.
280- For new PRs (not in baseline), uses the new data directly.
242+ Merge new events with filtered baseline using ANY LEFT JOIN.
243+ New data overwrites existing data where present, preserves existing for unchanged fields.
281244
282245SQL >
283246 SELECT
@@ -389,8 +352,8 @@ SQL >
389352 INTERVAL 1 hour
390353 )
391354 + INTERVAL 1 hour as snapshotId
392- FROM new_events_aggregated new
393- LEFT JOIN pull_requests_analyzed existing ON new.prSourceId = existing.sourceId
355+ FROM new_events_aggregated new ANY
356+ LEFT JOIN baseline_filtered existing ON new.prSourceId = existing.sourceId
394357 WHERE if(existing.id != '', existing.id, new.id) != ''
395358
396359TYPE MATERIALIZED
0 commit comments