Skip to content

Commit 4eaf653

Browse files
committed
chore(integrations): added segmentId to filters for populate activity relations workflow
Signed-off-by: Uroš Marolt <uros@marolt.me>
1 parent 2571d4a commit 4eaf653

4 files changed

Lines changed: 18 additions & 5 deletions

File tree

services/apps/script_executor_worker/src/activities/populate-activity-relations/index.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,16 @@ export async function markActivitiesAsIndexed(activitiesRedisKey: string): Promi
2828
return lastSyncedTimestamp
2929
}
3030

31-
export async function getActivitiesToCopy(latestSyncedActivityTimestamp: string, limit: number) {
31+
export async function getActivitiesToCopy(
32+
latestSyncedActivityTimestamp: string,
33+
limit: number,
34+
segmentIds?: string[],
35+
) {
3236
const activities = await getActivityRelationsSortedByTimestamp(
3337
svc.questdbSQL,
3438
latestSyncedActivityTimestamp,
3539
limit,
40+
segmentIds,
3641
)
3742

3843
if (activities.length === 0) {

services/apps/script_executor_worker/src/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ export interface IPopulateActivityRelationsArgs {
2727
batchSizePerRun: number
2828
deleteIndexedEntities?: boolean
2929
latestSyncedActivityTimestamp?: string
30+
segmentIds?: string[]
3031
}
3132

3233
export interface IScriptBatchTestArgs {

services/apps/script_executor_worker/src/workflows/populateActivityRelations.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ export async function populateActivityRelations(
2525
let { activitiesLength, activitiesRedisKey, lastTimestamp } = await activity.getActivitiesToCopy(
2626
latestSyncedActivityTimestamp ?? undefined,
2727
BATCH_SIZE_PER_RUN,
28+
args.segmentIds,
2829
)
2930

3031
if (activitiesLength === 0) {
@@ -44,6 +45,7 @@ export async function populateActivityRelations(
4445
const result = await activity.getActivitiesToCopy(
4546
lastTimestamp,
4647
BATCH_SIZE_PER_RUN * batchSizeMultiplier,
48+
args.segmentIds,
4749
)
4850
activitiesLength = result.activitiesLength
4951
activitiesRedisKey = result.activitiesRedisKey

services/libs/data-access-layer/src/activities/sql.ts

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1715,11 +1715,16 @@ export async function getActivityRelationsSortedByTimestamp(
17151715
qdbConn: DbConnOrTx,
17161716
cursorActivityTimestamp?: string,
17171717
limit = 100,
1718+
segmentIds?: string[],
17181719
): Promise<IActivityRelationsCreateData[]> {
1719-
let cursorQuery = ''
1720+
const conditions: string[] = [`"deletedAt" is null`]
17201721

17211722
if (cursorActivityTimestamp) {
1722-
cursorQuery = `AND "timestamp" >= $(cursorActivityTimestamp)`
1723+
conditions.push('timestamp >= $(cursorActivityTimestamp)')
1724+
}
1725+
1726+
if (segmentIds && segmentIds.length > 0) {
1727+
conditions.push('"segmentId" in ($(segmentIds:csv))')
17231728
}
17241729

17251730
const query = `
@@ -1737,14 +1742,14 @@ export async function getActivityRelationsSortedByTimestamp(
17371742
username,
17381743
"objectMemberUsername"
17391744
FROM activities
1740-
WHERE "deletedAt" IS NULL
1741-
${cursorQuery}
1745+
WHERE ${conditions.join(' AND ')}
17421746
ORDER BY "timestamp" asc
17431747
LIMIT ${limit}
17441748
`
17451749

17461750
const rows = await qdbConn.any(query, {
17471751
cursorActivityTimestamp,
1752+
segmentIds,
17481753
limit,
17491754
})
17501755

0 commit comments

Comments
 (0)