Skip to content

Commit f873759

Browse files
committed
Merge branch 'deprecate-questdb-CM-420' of github.com:CrowdDotDev/crowd.dev into deprecate-questdb-CM-420
2 parents cc396e5 + 44044ce commit f873759

7 files changed

Lines changed: 235 additions & 143 deletions

File tree

backend/src/database/repositories/integrationProgressRepository.ts

Lines changed: 69 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,32 @@
11
import { QueryTypes } from 'sequelize'
22

3+
import { queryActivitiesCounter } from '@crowd/data-access-layer'
4+
import { Counter, TinybirdClient } from '@crowd/data-access-layer/src/database'
5+
36
import { Repos } from '@/serverless/integrations/types/regularTypes'
47
import { GitHubStats } from '@/serverless/integrations/usecases/github/rest/getRemoteStats'
58

69
import { IRepositoryOptions } from './IRepositoryOptions'
710
import SequelizeRepository from './sequelizeRepository'
811

912
class IntegrationProgressRepository {
13+
static createPayloadWithActivityType(
14+
activityTypes: string[],
15+
repos: Repos,
16+
segments: string[] = [],
17+
) {
18+
return {
19+
filter: {
20+
and: [
21+
{ platform: { in: ['github'] } },
22+
{ or: repos.map((repo) => ({ channel: { eq: repo.url } })) },
23+
{ type: { in: activityTypes } },
24+
],
25+
},
26+
segmentIds: segments,
27+
}
28+
}
29+
1030
static async getPendingStreamsCount(integrationId: string, options: IRepositoryOptions) {
1131
const transaction = options.transaction
1232
const seq = SequelizeRepository.getSequelize(options)
@@ -66,89 +86,56 @@ class IntegrationProgressRepository {
6686
return (result[0] as any).id as string
6787
}
6888

69-
static async getDbStatsForGithub(
70-
repos: Repos,
71-
options: IRepositoryOptions,
72-
): Promise<GitHubStats> {
73-
// TODO questdb to tinybird remove - it's here for linter to be happy
74-
options.log.info('getDbStatsForGithub', { repos })
75-
// TODO questdb to tinybird
76-
// const starsQuery = `
77-
// SELECT COUNT_DISTINCT("sourceId") AS count
78-
// FROM activities
79-
// WHERE platform = 'github'
80-
// AND type = 'star'
81-
// AND "deletedAt" IS NULL
82-
// AND channel IN ($(remotes:csv))
83-
// `
84-
85-
// const unstarsQuery = `
86-
// SELECT COUNT_DISTINCT("sourceId") AS count
87-
// FROM activities
88-
// WHERE platform = 'github'
89-
// AND type = 'unstar'
90-
// AND "deletedAt" IS NULL
91-
// AND channel IN ($(remotes:csv))
92-
// `
93-
94-
// const forksQuery = `
95-
// SELECT COUNT_DISTINCT("sourceId") AS count
96-
// FROM activities
97-
// WHERE platform = 'github'
98-
// AND type = 'fork'
99-
// AND "deletedAt" IS NULL
100-
// AND "gitIsIndirectFork" != TRUE
101-
// AND channel IN ($(remotes:csv))
102-
// `
103-
104-
// const issuesOpenedQuery = `
105-
// SELECT COUNT_DISTINCT("sourceId") AS count
106-
// FROM activities
107-
// WHERE platform = 'github'
108-
// AND type = 'issues-opened'
109-
// AND "deletedAt" IS NULL
110-
// AND channel IN ($(remotes:csv))
111-
// `
112-
113-
// const prOpenedQuery = `
114-
// SELECT COUNT_DISTINCT("sourceId") AS count
115-
// FROM activities
116-
// WHERE platform = 'github'
117-
// AND type = 'pull_request-opened'
118-
// AND "deletedAt" IS NULL
119-
// AND channel IN ($(remotes:csv))
120-
// `
121-
122-
// const remotes = repos.map((r) => r.url)
123-
124-
// const promises: Promise<any[]>[] = [
125-
// options.qdb.query(starsQuery, {
126-
// remotes,
127-
// }),
128-
// options.qdb.query(unstarsQuery, {
129-
// remotes,
130-
// }),
131-
// options.qdb.query(forksQuery, {
132-
// remotes,
133-
// }),
134-
// options.qdb.query(issuesOpenedQuery, {
135-
// remotes,
136-
// }),
137-
// options.qdb.query(prOpenedQuery, {
138-
// remotes,
139-
// }),
140-
// ]
141-
142-
// const results = await Promise.all(promises)
89+
static async getDbStatsForGithub({
90+
repos,
91+
segments,
92+
}: {
93+
repos: Repos
94+
segments: string[]
95+
}): Promise<GitHubStats> {
96+
const tb = new TinybirdClient()
97+
98+
const promises: Promise<{ data: Counter }>[] = [
99+
queryActivitiesCounter(
100+
IntegrationProgressRepository.createPayloadWithActivityType(['star'], repos, segments),
101+
tb,
102+
),
103+
queryActivitiesCounter(
104+
IntegrationProgressRepository.createPayloadWithActivityType(['unstar'], repos, segments),
105+
tb,
106+
),
107+
queryActivitiesCounter(
108+
{
109+
...IntegrationProgressRepository.createPayloadWithActivityType(['fork'], repos, segments),
110+
indirectFork: 1,
111+
},
112+
tb,
113+
),
114+
queryActivitiesCounter(
115+
IntegrationProgressRepository.createPayloadWithActivityType(
116+
['issues-opened'],
117+
repos,
118+
segments,
119+
),
120+
tb,
121+
),
122+
queryActivitiesCounter(
123+
IntegrationProgressRepository.createPayloadWithActivityType(
124+
['pull_request-opened'],
125+
repos,
126+
segments,
127+
),
128+
tb,
129+
),
130+
]
131+
132+
const result = await Promise.all(promises)
133+
143134
return {
144-
// stars: parseInt(results[0][0].count, 10) - parseInt(results[1][0].count, 10),
145-
// forks: parseInt(results[2][0].count, 10),
146-
// totalIssues: parseInt(results[3][0].count, 10),
147-
// totalPRs: parseInt(results[4][0].count, 10),
148-
stars: 0,
149-
forks: 0,
150-
totalIssues: 0,
151-
totalPRs: 0,
135+
stars: (result[0]?.data?.[0]?.count ?? 0) - (result[1]?.data?.[0]?.count ?? 0),
136+
forks: result[2]?.data?.[0]?.count ?? 0,
137+
totalIssues: result[3]?.data?.[0]?.count ?? 0,
138+
totalPRs: result[4]?.data?.[0]?.count ?? 0,
152139
}
153140
}
154141

backend/src/services/integrationService.ts

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2110,6 +2110,10 @@ export default class IntegrationService {
21102110
name: string
21112111
updatedAt: string
21122112
}[]
2113+
2114+
const githubRepos = await this.getGithubRepos(integrationId)
2115+
const mappedSegments = githubRepos.map((repo) => repo.segment.id)
2116+
21132117
const cacheRemote = new RedisCache(
21142118
'github-progress-remote',
21152119
this.options.redis,
@@ -2149,10 +2153,23 @@ export default class IntegrationService {
21492153
let cachedStats
21502154
cachedStats = await cacheDb.get(key)
21512155
if (!cachedStats) {
2152-
cachedStats = await IntegrationProgressRepository.getDbStatsForGithub(repos, this.options)
2156+
const segments = Array.from(
2157+
new Set([...(integration.segmentId ? [integration.segmentId] : []), ...mappedSegments]),
2158+
)
2159+
2160+
this.options.log.info(
2161+
`Evaluating cache for repos: ${repos.map((r) => r.name).join(',')} and segments: ${segments}`,
2162+
)
2163+
cachedStats = await IntegrationProgressRepository.getDbStatsForGithub({
2164+
repos,
2165+
segments,
2166+
})
2167+
2168+
this.options.log.info(`Caching data: ${JSON.stringify(cachedStats)}`)
21532169
// cache for 1 minute
21542170
await cacheDb.set(key, JSON.stringify(cachedStats), 60)
21552171
} else {
2172+
this.options.log.info(`Cache data found: ${JSON.stringify(cachedStats)}`)
21562173
cachedStats = JSON.parse(cachedStats)
21572174
}
21582175
return cachedStats as GitHubStats
@@ -2179,6 +2196,9 @@ export default class IntegrationService {
21792196
getDbStatsOrExitEarly(integrationId),
21802197
])
21812198

2199+
this.options.log.info('Remote stats:', remoteStats)
2200+
this.options.log.info('DB stats:', dbStats)
2201+
21822202
// this to prevent too long waiting time
21832203
if (remoteStats === undefined || dbStats === undefined) {
21842204
return {
@@ -2306,6 +2326,7 @@ export default class IntegrationService {
23062326
await IntegrationProgressRepository.getAllIntegrationsInProgressForSegment(this.options)
23072327
return Promise.all(integrationIds.map((id) => this.getIntegrationProgress(id)))
23082328
}
2329+
23092330
const integrationIds =
23102331
await IntegrationProgressRepository.getAllIntegrationsInProgressForMultipleSegments(
23112332
this.options,

services/apps/cache_worker/src/activities/dashboard-cache/refreshDashboardCache.ts

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import {
2+
activitiesTimeseries,
23
getTimeseriesOfActiveMembers,
34
getTimeseriesOfNewMembers,
45
queryActivityRelations,
@@ -130,17 +131,13 @@ export async function getActivitiesNumber(params: IQueryTimeseriesParams): Promi
130131
export async function getActivitiesTimeseries(
131132
params: IQueryTimeseriesParams,
132133
): Promise<ITimeseriesDatapoint[]> {
133-
// let result: ITimeseriesDatapoint[]
134-
135134
try {
136-
// TODO questdb to tinybird
137-
// result = await activitiesTimeseries(svc.questdbSQL, {
138-
// segmentIds: params.segmentIds,
139-
// after: params.startDate,
140-
// before: params.endDate,
141-
// platform: params.platform,
142-
// })
143-
return []
135+
return activitiesTimeseries({
136+
endDate: params.endDate,
137+
platform: params.platform,
138+
segmentIds: params.segmentIds,
139+
startDate: params.startDate,
140+
})
144141
} catch (err) {
145142
svc.log.error({ err, params }, 'Error getting activities timeseries')
146143
throw new Error(err)

services/libs/data-access-layer/src/activities/sql.ts

Lines changed: 73 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,15 @@
11
/* eslint-disable @typescript-eslint/no-explicit-any */
22
import max from 'lodash.max'
33
import min from 'lodash.min'
4+
import moment from 'moment'
45

5-
import { ActivityRelations, DbConnOrTx, TinybirdClient } from '@crowd/database'
6+
import {
7+
ActivityRelations,
8+
ActivityTimeseriesDatapoint,
9+
Counter,
10+
DbConnOrTx,
11+
TinybirdClient,
12+
} from '@crowd/database'
613
import { ActivityDisplayService } from '@crowd/integrations'
714
import { ActivityTypeSettings, ITimeseriesDatapoint, PageData } from '@crowd/types'
815

@@ -339,6 +346,19 @@ export async function queryActivities(
339346
}
340347
}
341348

349+
export async function queryActivitiesCounter(
350+
arg: IQueryActivitiesParameters & { indirectFork?: number },
351+
tbClient: TinybirdClient,
352+
): Promise<{ data: Counter }> {
353+
const payload = {
354+
...buildActivitiesParams(arg),
355+
...(arg.indirectFork && { indirectFork: arg.indirectFork }),
356+
countOnly: 1,
357+
}
358+
359+
return tbClient.pipe('activities_relations_filtered', payload)
360+
}
361+
342362
export function mapActivityRowToResult(a: any, columns: string[]): any {
343363
const sentiment: IActivitySentiment | null =
344364
a.sentimentLabel &&
@@ -377,46 +397,66 @@ export function mapActivityRowToResult(a: any, columns: string[]): any {
377397
return data
378398
}
379399

380-
// TODO questdb to tinybird
381-
export async function activitiesTimeseries(
382-
qdbConn: DbConnOrTx,
383-
arg: IQueryGroupedActivitiesParameters,
384-
): Promise<ITimeseriesDatapoint[]> {
385-
let query = `
386-
SELECT COUNT_DISTINCT(id) AS count, "timestamp" AS date
387-
FROM activities
388-
WHERE "deletedAt" IS NULL
389-
`
400+
function fillMissingDays(
401+
data: { date: string; count: number }[],
402+
startDate: Date,
403+
endDate: Date,
404+
): ITimeseriesDatapoint[] {
405+
const result: ITimeseriesDatapoint[] = []
406+
407+
//handles both "2025-09-18 00:00:00" and "2025-09-18T00:00:00" formats
408+
const dataMap = new Map(
409+
data.map((d) => {
410+
// Extract only day (YYYY-MM-DD)
411+
const dateOnly = d.date.includes('T')
412+
? d.date.split('T')[0] // ISO format: "2025-09-18T00:00:00"
413+
: d.date.split(' ')[0] // Space format: "2025-09-18 00:00:00"
414+
415+
return [dateOnly, d.count]
416+
}),
417+
)
390418

391-
if (arg.segmentIds) {
392-
query += ' AND "segmentId" IN ($(segmentIds:csv))'
393-
}
419+
const current = moment(startDate)
420+
const end = moment(endDate)
394421

395-
if (arg.platform) {
396-
query += ' AND "platform" = $(platform)'
397-
}
422+
while (current.isBefore(end, 'day')) {
423+
const dateKey = current.format('YYYY-MM-DD')
424+
const isoDate = current.format('YYYY-MM-DDTHH:mm:ss.SSS') + 'Z'
425+
426+
result.push({
427+
date: isoDate,
428+
count: dataMap.get(dateKey) || 0, // Use existing data or 0 if missing
429+
})
398430

399-
if (arg.after && arg.before) {
400-
query += ' AND "timestamp" BETWEEN $(after) AND $(before)'
431+
current.add(1, 'day')
401432
}
402433

403-
query += `
404-
SAMPLE BY 1d FILL(0) ALIGN TO CALENDAR
405-
ORDER BY "date" ASC;
406-
`
434+
return result
435+
}
407436

408-
const rows: ITimeseriesDatapoint[] = await qdbConn.query(query, {
409-
segmentIds: arg.segmentIds,
410-
platform: arg.platform,
411-
after: arg.after,
412-
before: arg.before,
413-
})
437+
export async function activitiesTimeseries(
438+
arg: IQueryGroupedActivitiesParameters,
439+
): Promise<ITimeseriesDatapoint[]> {
440+
const tb = new TinybirdClient()
414441

415-
rows.forEach((row) => {
416-
row.count = Number(row.count)
417-
})
442+
const timeseries = await tb.pipe<{ data: ActivityTimeseriesDatapoint[] }>(
443+
'activities_daily_counts',
444+
{
445+
after: arg.startDate,
446+
before: arg.endDate,
447+
platform: arg.platform,
448+
segmentIds: arg.segmentIds.join(','),
449+
},
450+
)
451+
452+
if (arg.startDate && arg.endDate) {
453+
timeseries.data = fillMissingDays(timeseries.data, arg.startDate, arg.endDate)
454+
}
418455

419-
return rows
456+
return timeseries.data.map((row) => ({
457+
count: Number(row.count),
458+
date: row.date,
459+
}))
420460
}
421461
export async function getNewActivityPlatforms(
422462
qdbConn: DbConnOrTx,

0 commit comments

Comments
 (0)