Skip to content

Commit 44044ce

Browse files
authored
feat: migrate github stats (#3447)
1 parent 501a9dc commit 44044ce

5 files changed

Lines changed: 159 additions & 95 deletions

File tree

backend/src/database/repositories/integrationProgressRepository.ts

Lines changed: 69 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,32 @@
11
import { QueryTypes } from 'sequelize'
22

3+
import { queryActivitiesCounter } from '@crowd/data-access-layer'
4+
import { Counter, TinybirdClient } from '@crowd/data-access-layer/src/database'
5+
36
import { Repos } from '@/serverless/integrations/types/regularTypes'
47
import { GitHubStats } from '@/serverless/integrations/usecases/github/rest/getRemoteStats'
58

69
import { IRepositoryOptions } from './IRepositoryOptions'
710
import SequelizeRepository from './sequelizeRepository'
811

912
class IntegrationProgressRepository {
13+
/**
 * Builds the filter payload used to count GitHub activities of the given types.
 *
 * The returned object has two keys:
 * - `filter.and`: three conditions combined with AND — platform is in `['github']`,
 *   channel equals the `url` of any of the given repos (an `or` list with one
 *   `eq` entry per repo), and type is in `activityTypes`.
 * - `segmentIds`: the `segments` argument passed through unchanged.
 *
 * NOTE(review): when `repos` is empty the `or` list is empty; how the consumer
 * treats an empty `or` is not visible here — confirm.
 *
 * @param activityTypes activity type names to match
 * @param repos repositories whose `url` is matched against the activity channel
 * @param segments segment ids to forward as `segmentIds`; defaults to an empty array
 * @returns an object with `filter` and `segmentIds`
 */
static createPayloadWithActivityType(
14+
activityTypes: string[],
15+
repos: Repos,
16+
segments: string[] = [],
17+
) {
18+
return {
19+
filter: {
20+
and: [
21+
{ platform: { in: ['github'] } },
22+
{ or: repos.map((repo) => ({ channel: { eq: repo.url } })) },
23+
{ type: { in: activityTypes } },
24+
],
25+
},
26+
segmentIds: segments,
27+
}
28+
}
29+
1030
static async getPendingStreamsCount(integrationId: string, options: IRepositoryOptions) {
1131
const transaction = options.transaction
1232
const seq = SequelizeRepository.getSequelize(options)
@@ -66,89 +86,56 @@ class IntegrationProgressRepository {
6686
return (result[0] as any).id as string
6787
}
6888

69-
static async getDbStatsForGithub(
70-
repos: Repos,
71-
options: IRepositoryOptions,
72-
): Promise<GitHubStats> {
73-
// TODO questdb to tinybird remove - it's here for linter to be happy
74-
options.log.info('getDbStatsForGithub', { repos })
75-
// TODO questdb to tinybird
76-
// const starsQuery = `
77-
// SELECT COUNT_DISTINCT("sourceId") AS count
78-
// FROM activities
79-
// WHERE platform = 'github'
80-
// AND type = 'star'
81-
// AND "deletedAt" IS NULL
82-
// AND channel IN ($(remotes:csv))
83-
// `
84-
85-
// const unstarsQuery = `
86-
// SELECT COUNT_DISTINCT("sourceId") AS count
87-
// FROM activities
88-
// WHERE platform = 'github'
89-
// AND type = 'unstar'
90-
// AND "deletedAt" IS NULL
91-
// AND channel IN ($(remotes:csv))
92-
// `
93-
94-
// const forksQuery = `
95-
// SELECT COUNT_DISTINCT("sourceId") AS count
96-
// FROM activities
97-
// WHERE platform = 'github'
98-
// AND type = 'fork'
99-
// AND "deletedAt" IS NULL
100-
// AND "gitIsIndirectFork" != TRUE
101-
// AND channel IN ($(remotes:csv))
102-
// `
103-
104-
// const issuesOpenedQuery = `
105-
// SELECT COUNT_DISTINCT("sourceId") AS count
106-
// FROM activities
107-
// WHERE platform = 'github'
108-
// AND type = 'issues-opened'
109-
// AND "deletedAt" IS NULL
110-
// AND channel IN ($(remotes:csv))
111-
// `
112-
113-
// const prOpenedQuery = `
114-
// SELECT COUNT_DISTINCT("sourceId") AS count
115-
// FROM activities
116-
// WHERE platform = 'github'
117-
// AND type = 'pull_request-opened'
118-
// AND "deletedAt" IS NULL
119-
// AND channel IN ($(remotes:csv))
120-
// `
121-
122-
// const remotes = repos.map((r) => r.url)
123-
124-
// const promises: Promise<any[]>[] = [
125-
// options.qdb.query(starsQuery, {
126-
// remotes,
127-
// }),
128-
// options.qdb.query(unstarsQuery, {
129-
// remotes,
130-
// }),
131-
// options.qdb.query(forksQuery, {
132-
// remotes,
133-
// }),
134-
// options.qdb.query(issuesOpenedQuery, {
135-
// remotes,
136-
// }),
137-
// options.qdb.query(prOpenedQuery, {
138-
// remotes,
139-
// }),
140-
// ]
141-
142-
// const results = await Promise.all(promises)
89+
/**
 * Computes GitHub stats (stars, forks, issues, PRs) from stored activities.
 *
 * Issues five `queryActivitiesCounter` calls in parallel against one
 * `TinybirdClient`, in this fixed order: `star`, `unstar`, `fork`,
 * `issues-opened`, `pull_request-opened`. The results are read by index, so the
 * order of the `promises` array must match the indexes used in the return value.
 *
 * - `stars` is the `star` count minus the `unstar` count.
 * - Any count that is missing from a response falls back to 0.
 *
 * NOTE(review): the promises are typed `Promise<{ data: Counter }>` but the
 * result is read as `data?.[0]?.count`, i.e. as if `data` were an array of
 * `Counter` — confirm the actual response shape and align the type.
 *
 * NOTE(review): the fork query passes `indirectFork: 1`. In the
 * `activities_relations_filtered` pipe that value keeps only activities whose
 * `isIndirectFork` attribute is true, whereas the removed QuestDB query used
 * `"gitIsIndirectFork" != TRUE` (excluded indirect forks) — confirm which is intended.
 *
 * @param repos repositories to count activities for
 * @param segments segment ids forwarded to every counter query
 * @returns the computed `GitHubStats`
 */
static async getDbStatsForGithub({
90+
repos,
91+
segments,
92+
}: {
93+
repos: Repos
94+
segments: string[]
95+
}): Promise<GitHubStats> {
96+
const tb = new TinybirdClient()
97+
98+
const promises: Promise<{ data: Counter }>[] = [
99+
queryActivitiesCounter(
100+
IntegrationProgressRepository.createPayloadWithActivityType(['star'], repos, segments),
101+
tb,
102+
),
103+
queryActivitiesCounter(
104+
IntegrationProgressRepository.createPayloadWithActivityType(['unstar'], repos, segments),
105+
tb,
106+
),
107+
queryActivitiesCounter(
108+
{
109+
...IntegrationProgressRepository.createPayloadWithActivityType(['fork'], repos, segments),
110+
indirectFork: 1,
111+
},
112+
tb,
113+
),
114+
queryActivitiesCounter(
115+
IntegrationProgressRepository.createPayloadWithActivityType(
116+
['issues-opened'],
117+
repos,
118+
segments,
119+
),
120+
tb,
121+
),
122+
queryActivitiesCounter(
123+
IntegrationProgressRepository.createPayloadWithActivityType(
124+
['pull_request-opened'],
125+
repos,
126+
segments,
127+
),
128+
tb,
129+
),
130+
]
131+
132+
const result = await Promise.all(promises)
133+
143134
return {
144-
// stars: parseInt(results[0][0].count, 10) - parseInt(results[1][0].count, 10),
145-
// forks: parseInt(results[2][0].count, 10),
146-
// totalIssues: parseInt(results[3][0].count, 10),
147-
// totalPRs: parseInt(results[4][0].count, 10),
148-
stars: 0,
149-
forks: 0,
150-
totalIssues: 0,
151-
totalPRs: 0,
135+
stars: (result[0]?.data?.[0]?.count ?? 0) - (result[1]?.data?.[0]?.count ?? 0),
136+
forks: result[2]?.data?.[0]?.count ?? 0,
137+
totalIssues: result[3]?.data?.[0]?.count ?? 0,
138+
totalPRs: result[4]?.data?.[0]?.count ?? 0,
152139
}
153140
}
154141

backend/src/services/integrationService.ts

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2110,6 +2110,10 @@ export default class IntegrationService {
21102110
name: string
21112111
updatedAt: string
21122112
}[]
2113+
2114+
const githubRepos = await this.getGithubRepos(integrationId)
2115+
const mappedSegments = githubRepos.map((repo) => repo.segment.id)
2116+
21132117
const cacheRemote = new RedisCache(
21142118
'github-progress-remote',
21152119
this.options.redis,
@@ -2149,10 +2153,23 @@ export default class IntegrationService {
21492153
let cachedStats
21502154
cachedStats = await cacheDb.get(key)
21512155
if (!cachedStats) {
2152-
cachedStats = await IntegrationProgressRepository.getDbStatsForGithub(repos, this.options)
2156+
const segments = Array.from(
2157+
new Set([...(integration.segmentId ? [integration.segmentId] : []), ...mappedSegments]),
2158+
)
2159+
2160+
this.options.log.info(
2161+
`Evaluating cache for repos: ${repos.map((r) => r.name).join(',')} and segments: ${segments}`,
2162+
)
2163+
cachedStats = await IntegrationProgressRepository.getDbStatsForGithub({
2164+
repos,
2165+
segments,
2166+
})
2167+
2168+
this.options.log.info(`Caching data: ${JSON.stringify(cachedStats)}`)
21532169
// cache for 1 minute
21542170
await cacheDb.set(key, JSON.stringify(cachedStats), 60)
21552171
} else {
2172+
this.options.log.info(`Cache data found: ${JSON.stringify(cachedStats)}`)
21562173
cachedStats = JSON.parse(cachedStats)
21572174
}
21582175
return cachedStats as GitHubStats
@@ -2179,6 +2196,9 @@ export default class IntegrationService {
21792196
getDbStatsOrExitEarly(integrationId),
21802197
])
21812198

2199+
this.options.log.info('Remote stats:', remoteStats)
2200+
this.options.log.info('DB stats:', dbStats)
2201+
21822202
// this to prevent too long waiting time
21832203
if (remoteStats === undefined || dbStats === undefined) {
21842204
return {
@@ -2306,6 +2326,7 @@ export default class IntegrationService {
23062326
await IntegrationProgressRepository.getAllIntegrationsInProgressForSegment(this.options)
23072327
return Promise.all(integrationIds.map((id) => this.getIntegrationProgress(id)))
23082328
}
2329+
23092330
const integrationIds =
23102331
await IntegrationProgressRepository.getAllIntegrationsInProgressForMultipleSegments(
23112332
this.options,

services/libs/data-access-layer/src/activities/sql.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import moment from 'moment'
66
import {
77
ActivityRelations,
88
ActivityTimeseriesDatapoint,
9+
Counter,
910
DbConnOrTx,
1011
TinybirdClient,
1112
} from '@crowd/database'
@@ -345,6 +346,19 @@ export async function queryActivities(
345346
}
346347
}
347348

349+
/**
 * Runs the `activities_relations_filtered` Tinybird pipe in count-only mode.
 *
 * The pipe payload is the output of `buildActivitiesParams(arg)` plus
 * `countOnly: 1`. `indirectFork` is forwarded only when it is truthy, so
 * `0` and `undefined` are both omitted from the payload.
 *
 * NOTE(review): the declared return type is `{ data: Counter }` (a single
 * object); confirm against the real pipe response, which may be a list of rows.
 *
 * @param arg activity query parameters, optionally with an `indirectFork` flag
 * @param tbClient Tinybird client used to call the pipe
 * @returns whatever `tbClient.pipe` resolves to for this pipe and payload
 */
export async function queryActivitiesCounter(
350+
arg: IQueryActivitiesParameters & { indirectFork?: number },
351+
tbClient: TinybirdClient,
352+
): Promise<{ data: Counter }> {
353+
const payload = {
354+
...buildActivitiesParams(arg),
355+
...(arg.indirectFork && { indirectFork: arg.indirectFork }),
356+
countOnly: 1,
357+
}
358+
359+
return tbClient.pipe('activities_relations_filtered', payload)
360+
}
361+
348362
export function mapActivityRowToResult(a: any, columns: string[]): any {
349363
const sentiment: IActivitySentiment | null =
350364
a.sentimentLabel &&

services/libs/database/src/tinybirdClient.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@ export type ActivityTimeseriesDatapoint = {
3030
count: number
3131
}
3232

33+
/**
 * A record carrying a single numeric `count`.
 * Presumably one row of a count-only pipe response — verify against callers.
 */
export type Counter = {
34+
count: number
35+
}
36+
3337
export class TinybirdClient {
3438
private host: string
3539
private token: string

services/libs/tinybird/pipes/activities_relations_filtered.pipe

Lines changed: 50 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,22 @@ DESCRIPTION >
1515
- `onlyContributions`: Optional boolean, defaults to 1 (contributions only), set to 0 for all activities.
1616
- `page`: Optional integer page index for pagination (OFFSET-based), defaults to 0.
1717
- `pageSize`: Optional integer page size, defaults to 10.
18-
- Response (final node): all relation fields from Node 1 plus `url`, `body`, `title` from activities.
18+
- `orderBy`: Optional; one of `timestamp_ASC`, `createdAt_ASC`, `createdAt_DESC`. Any other value, or omitting it, orders by timestamp DESC.
19+
- `searchTerm`: Optional case-insensitive search in channel/type/title/body.
20+
- Dynamic OR blocks via `G1..G5_*` as in query below (include/exclude groups).
21+
- Response (final node): all relation fields from Node 1 plus `url`, `body`, `title`, `attributes` from activities.
1922
- Performance:
20-
- The enrichment only scans the subset of `activities_deduplicated_ds` whose `id` is found in the filtered page from Node 1, minimizing I/O.
23+
- The enrichment only scans the subset of `activities_deduplicated_ds` whose `id` is found in the filtered (and paginated) set from Node 1, minimizing I/O.
2124
- Keep page sizes reasonable (50–200) for consistent latency.
2225
- Ensure `activityId` and `id` types are aligned (both UUID or both String). If they differ, this pipe casts to String at join time.
2326

2427
NODE filtered_relations
2528
SQL >
2629
%
30+
{% set is_count = 0 %}
31+
{% if defined(countOnly) %}
32+
{% if String(countOnly) == '1' or Int8(countOnly, 0) == 1 %} {% set is_count = 1 %} {% end %}
33+
{% end %}
2734
SELECT
2835
ar.activityId AS id,
2936
ar.channel,
@@ -45,6 +52,35 @@ SQL >
4552
{% if defined(endDate) %}
4653
AND ar.timestamp < parseDateTimeBestEffort({{ String(endDate) }})
4754
{% end %}
55+
{% if defined(platform) %} AND ar.platform = {{ String(platform) }} {% end %}
56+
{% if defined(activity_type) %} AND ar.type = {{ String(activity_type) }} {% end %}
57+
{% if defined(activity_types) %} AND ar.type IN {{ Array(activity_types, 'String') }} {% end %}
58+
-- repos filter: semi-join to activities on URL contains any repo string
59+
{% if defined(repos) %}
60+
AND CAST(ar.activityId AS String) IN (
61+
SELECT CAST(id AS String)
62+
FROM activities_deduplicated_ds
63+
WHERE
64+
{% for r in repos %}
65+
{% if not loop.first %} OR {% end %}
66+
positionCaseInsensitive(url, {{ String(r) }}) > 0
67+
{% end %}
68+
)
69+
{% end %}
70+
-- indirectFork filter: semi-join on activities.attributes->isIndirectFork
71+
{% if defined(indirectFork) %}
72+
AND CAST(ar.activityId AS String) IN (
73+
SELECT CAST(id AS String)
74+
FROM activities_deduplicated_ds
75+
WHERE
76+
{% if Int8(indirectFork) == 1 %}
77+
(
78+
JSONExtractBool(attributes, 'isIndirectFork') = 1
79+
OR lower(JSONExtractString(attributes, 'isIndirectFork')) = 'true'
80+
)
81+
{% end %}
82+
)
83+
{% end %}
4884
-- Dynamic OR block with include/exclude (up to 5 groups)
4985
{% set opened = 0 %} {% set need_or = 0 %}
5086
{% if defined(G1_memberIds) or defined(G1_memberIds_exclude) or defined(
@@ -384,19 +420,21 @@ SQL >
384420
)
385421
)
386422
{% end %}
387-
{% if defined(orderBy) %}
388-
{% if orderBy == 'timestamp_ASC' %}
389-
ORDER BY ar.timestamp ASC, ar.activityId ASC
390-
{% elif orderBy == 'createdAt_ASC' %}
391-
ORDER BY ar.timestamp ASC, ar.activityId ASC
392-
{% elif orderBy == 'createdAt_DESC' %}
393-
ORDER BY ar.timestamp DESC, ar.activityId DESC
423+
{% if not is_count %}
424+
{% if defined(orderBy) %}
425+
{% if orderBy == 'timestamp_ASC' %}
426+
ORDER BY ar.timestamp ASC, ar.activityId ASC
427+
{% elif orderBy == 'createdAt_ASC' %}
428+
ORDER BY ar.timestamp ASC, ar.activityId ASC
429+
{% elif orderBy == 'createdAt_DESC' %}
430+
ORDER BY ar.timestamp DESC, ar.activityId DESC
431+
{% else %} ORDER BY ar.timestamp DESC, ar.activityId DESC
432+
{% end %}
394433
{% else %} ORDER BY ar.timestamp DESC, ar.activityId DESC
395434
{% end %}
396-
{% else %} ORDER BY ar.timestamp DESC, ar.activityId DESC
435+
LIMIT {{ Int32(pageSize, 10) }}
436+
OFFSET {{ Int32(page, 0) * Int32(pageSize, 10) }}
397437
{% end %}
398-
LIMIT {{ Int32(pageSize, 10) }}
399-
OFFSET {{ Int32(page, 0) * Int32(pageSize, 10) }}
400438

401439
NODE activities_enriched_v1
402440
SQL >

0 commit comments

Comments
 (0)