Skip to content

Commit 95bd360

Browse files
authored
feat: activityRelations and PRs lambda architecture (#3624)
1 parent e4914f8 commit 95bd360

49 files changed

Lines changed: 1849 additions & 115 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

services/libs/tinybird/LAMBDA_ARCHITECTURE.md

Lines changed: 558 additions & 0 deletions
Large diffs are not rendered by default.

services/libs/tinybird/README.md

Lines changed: 6 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,7 @@
33
[This image](https://uploads.linear.app/aebec7ad-5649-4758-9bed-061f7228a879/b72d9f55-8f27-4c57-81fe-729807c12ffb/36c116c2-0f88-4735-a932-0c3e6bf8ea45) shows how data flows from CM to Insights.
44

55
## Activity Preprocessing Pipeline
6-
7-
1. **New activities land** on `activities` and `activityRelations` datasources
8-
2. **Deduplication** of activities via copy pipe:
9-
- `activities_deduplicated_copy_pipe (every hour at minute 0)`
10-
2.1. `activities``activities_deduplicated_ds`
11-
3. **Preprocessing pipeline for activityRelations - Deduplicates, filters and sorts data for performant queries**:
12-
- `activityRelations (every hour at minute 0)``activityRelations_deduplicated_cleaned_ds`
13-
14-
## Other Copy Pipes
15-
16-
1. **pull_request_analysis_copy_pipe (every hour at minute 15)**: Compacts activities from same PR into one, keeping state change times in the same row. Helps with serving PR related metrics
17-
2. **issue_analysis_copy_pipe (every hour at minute 15)**: Similar to pr analysis, this time we compact issue related information into one row.
6+
See LAMBDA_ARCHITECTURE.md for details
187

198
---
209

@@ -63,18 +52,6 @@ Since `activities` **don’t exist in Postgres**, schema iteration must be done
6352

6453
### Iterating on Datasources Replicated by Sequin
6554

66-
These sources exist in Postgres (i.e., all Tinybird datasources **except `activities`**):
67-
68-
- `activityRelations`
69-
- `collections`
70-
- `insightsProjects`
71-
- `collectionsInsightsProjects`
72-
- `members`
73-
- `organizations`
74-
- `segments`
75-
- `securityInsightsEvaluationSuiteControlEvaluations`
76-
- `securityInsightsEvaluationSuiteControlEvaluationAssessments`
77-
7855
**Steps:**
7956
1. **Pause** the related Sequin sink
8057
2. **Run Postgres migration** to add/update/remove fields
@@ -83,7 +60,6 @@ These sources exist in Postgres (i.e., all Tinybird datasources **except `activi
8360
4. **Backfill** the resource from Sequin
8461
5. **Restart** the paused sink
8562

86-
8763
---
8864

8965
### Add new tables to sequin and tinybird
@@ -95,7 +71,7 @@ These sources exist in Postgres (i.e., all Tinybird datasources **except `activi
9571
ALTER PUBLICATION sequin_pub ADD TABLE "tableName";
9672
ALTER TABLE public."tableName" REPLICA IDENTITY FULL;
9773
```
98-
3. (only for PROD) u need to create the topic in oracle kafka, it doesn't get created automaticly
74+
3. (only for PROD) You need to create the topic in oracle kafka, it doesn't get created automaticly
9975
4. Update tinybird kafka connect plugin env ( it's under crowd-kube/lf-prod-oracle(lf-staging-oracle)/kafka-connect/tinybird-sink.properties.enc ), there are list of tracked files in the decrypted file.
10076
5. Restart kafka-connect
10177
6. Create tinybird datasource schema and push it to tinybird
@@ -111,11 +87,11 @@ GRANT SELECT ON "tableName" to sequin;
11187

11288
### Downtime Consideration
11389

114-
Switching between old and new datasources can lead to **temporary downtime**, but only for **endpoint pipes that consume raw datasources directly**.
90+
Switching between old and new datasources can lead to **temporary downtime**, but only for **endpoint pipes that consume raw datasources directly**.
11591

116-
**No Downtime** if the endpoint pipe uses a **deduplication copy pipe**:
117-
- You can safely remove the raw datasource
118-
- The deduplicated datasource will continue to serve data
92+
**No Downtime** if the endpoint pipe uses a **copy pipe result**:
93+
- You can safely remove the raw datasource after stopping the copy job
94+
- The copy pipe result datasource will continue to serve data
11995
- New fields will be included in the **next copy run**
12096

12197
**Only consider the following tips if your pipe is consuming raw datasources directly**:
@@ -127,34 +103,6 @@ Switching between old and new datasources can lead to **temporary downtime**, bu
127103

128104
---
129105

130-
### Alternative Way to Handle Datasource Iterations
131-
132-
You can avoid downtime entirely by **not deleting the old datasource**.
133-
134-
Instead of renaming the new datasource to the old one,
135-
**Update each endpoint pipe to use the new datasource directly**
136-
137-
This allows your pipelines to stay active without interruption.
138-
139-
#### Pros:
140-
- No downtime at all
141-
- Safer testing of the new datasource before retiring the old one
142-
143-
#### Cons:
144-
- Every pipe using the old datasource must be updated manually
145-
- Easy to miss a reference if not done carefully
146-
147-
---
148-
149-
### Choosing the Right Approach
150-
151-
Until we move fully to **Tinybird Forward** (which will support migration scripts), the best practice is to **find a balance** between these two approaches:
152-
153-
1. **Quick rename strategy** is best when the raw datasource is only consumed by deduplication copy pipes, but no endpoints
154-
2. **Pipe-by-pipe updates** for zero downtime where #1 is not enough
155-
156-
Pick the method that best fits your workflow and datasource complexity.
157-
158106
# Testing Tinybird Pipes Locally
159107

160108
This guide explains how to test a Tinybird data pipeline ("pipe") on your local Tinybird environment. We will fetch sample data (fixtures) from a staging Tinybird workspace and use it to run and verify a pipe locally. The steps below are written for a developer who may not be familiar with Tinybird, and they are organized in a clear, numbered format for easy follow-up.

services/libs/tinybird/datasources/activityRelations_deduplicated_cleaned_ds.datasource

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,10 @@ SCHEMA >
5454
`gitChangedLines` UInt64,
5555
`gitChangedLinesBucket` String,
5656
`organizationCountryCode` LowCardinality(String),
57-
`organizationName` String
57+
`organizationName` String,
58+
`snapshotId` DateTime
5859

5960
ENGINE MergeTree
60-
ENGINE_PARTITION_KEY toYear(timestamp)
61+
ENGINE_PARTITION_KEY snapshotId
6162
ENGINE_SORTING_KEY segmentId, timestamp, type, platform, memberId, organizationId
63+
ENGINE_TTL toDateTime(snapshotId) + toIntervalHour(6)
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
SCHEMA >
2+
`activityId` String,
3+
`conversationId` String,
4+
`createdAt` DateTime64(3),
5+
`updatedAt` DateTime64(3),
6+
`memberId` String,
7+
`objectMemberId` String,
8+
`objectMemberUsername` String,
9+
`organizationId` String,
10+
`parentId` String,
11+
`platform` LowCardinality(String),
12+
`segmentId` String,
13+
`username` String,
14+
`sourceId` String,
15+
`type` LowCardinality(String),
16+
`timestamp` DateTime64(3),
17+
`sourceParentId` String,
18+
`channel` String,
19+
`sentimentScore` Int8,
20+
`gitInsertions` UInt32,
21+
`gitDeletions` UInt32,
22+
`score` Int8,
23+
`isContribution` UInt8,
24+
`pullRequestReviewState` LowCardinality(String),
25+
`gitChangedLines` UInt64,
26+
`gitChangedLinesBucket` String,
27+
`organizationCountryCode` LowCardinality(String),
28+
`organizationName` String,
29+
`snapshotId` DateTime
30+
31+
ENGINE ReplacingMergeTree
32+
ENGINE_PARTITION_KEY toYYYYMM(snapshotId)
33+
ENGINE_SORTING_KEY snapshotId, segmentId, timestamp, type, platform, channel, sourceId
34+
ENGINE_TTL toDateTime(snapshotId) + toIntervalDay(1)

services/libs/tinybird/datasources/activityRelations_sorted.datasource

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,6 @@ DESCRIPTION >
1313
- `segmentId` links to the subproject-level segment for filtering and organization.
1414
- `username` contains the username of the member who performed the activity.
1515

16-
TAGS "Activity preprocessing pipeline", "Query optimization"
17-
1816
SCHEMA >
1917
`activityId` String,
2018
`conversationId` String,

services/libs/tinybird/datasources/contributions_with_local_time_ds.datasource

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,6 @@ DESCRIPTION >
1010
- `weekday` represents the day of the week (0-6, UInt8) in local time.
1111
- `two_hours_block` represents 2-hour time blocks (0-11, UInt16) for temporal pattern analysis.
1212

13-
TAGS "Contribution patterns", "Temporal analysis"
14-
1513
SCHEMA >
1614
`id` String,
1715
`timestamp` DateTime64(3),

services/libs/tinybird/datasources/maintainers_roles_copy_ds.datasource

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ DESCRIPTION >
88
- `memberId` links to the member who has this maintainer role.
99
- `insightsProjectId` links to the insights project this repository belongs to.
1010

11-
TAGS "Maintainer roles", "Project governance"
11+
TAGS "Project governance"
1212

1313
SCHEMA >
1414
`id` String,

services/libs/tinybird/datasources/pull_requests_analyzed.datasource

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ SCHEMA >
3434
`reviewedInSeconds` Nullable(Int64),
3535
`closedInSeconds` Nullable(Int64),
3636
`mergedInSeconds` Nullable(Int64),
37-
`resolvedInSeconds` Nullable(Int64)
37+
`resolvedInSeconds` Nullable(Int64),
38+
`snapshotId` DateTime
3839

3940
ENGINE MergeTree
4041
ENGINE_PARTITION_KEY toYear(openedAt)

services/libs/tinybird/datasources/segmentsAggregatedMV.datasource

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,6 @@ DESCRIPTION >
66
- `contributorCount` is an aggregate function storing distinct contributor counts for the segment.
77
- `organizationCount` is an aggregate function storing distinct organization counts for the segment.
88

9-
TAGS "Segment aggregates", "Materialized view"
10-
119
SCHEMA >
1210
`segmentId` String,
1311
`contributorCount` AggregateFunction(countDistinct, String),

services/libs/tinybird/pipes/activities_filtered.pipe

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,14 @@ DESCRIPTION >
1717
- This pipe is consumed by many of downstream pipes and widgets across the platform for consistent activity filtering.
1818
- Performance is optimized through proper sorting keys on `segmentId`, `timestamp`, `type`, `platform`, and `memberId` in the source datasource.
1919

20-
NODE activities_filtered_v2_1
20+
NODE activities_filtered_LAMBDA
2121
SQL >
2222
%
2323
SELECT activityId as id, timestamp, type, platform, memberId, organizationId, segmentId
2424
FROM activityRelations_deduplicated_cleaned_ds a
2525
where
26-
segmentId = (SELECT segmentId FROM segments_filtered)
26+
snapshotId = (select max(snapshotId) from activityRelations_deduplicated_cleaned_ds)
27+
AND segmentId = (SELECT segmentId FROM segments_filtered)
2728
{% if defined(startDate) %}
2829
AND a.timestamp
2930
> {{ DateTime(startDate, description="Filter activity timestamp after", required=False) }}

0 commit comments

Comments
 (0)