Skip to content

Commit bc58219

Browse files
authored
feat: snowflake export worker [CM-975] (#3844)
1 parent 4901723 commit bc58219

33 files changed

Lines changed: 1541 additions & 0 deletions

backend/src/database/migrations/U1771344764__addSnowflakeExportTable.sql

Whitespace-only changes.

backend/src/database/migrations/U1771497876__addCventActivityTypes.sql

Whitespace-only changes.
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
CREATE TABLE integration."snowflakeExportJobs" (
2+
id BIGSERIAL PRIMARY KEY,
3+
platform VARCHAR(100) NOT NULL,
4+
s3_path TEXT NOT NULL UNIQUE,
5+
metrics JSONB,
6+
"createdAt" TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
7+
"updatedAt" TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
8+
"processingStartedAt" TIMESTAMP WITH TIME ZONE, -- set when worker claims job (acts as lock)
9+
"exportStartedAt" TIMESTAMP WITH TIME ZONE,
10+
"completedAt" TIMESTAMP WITH TIME ZONE,
11+
"cleanedAt" TIMESTAMP WITH TIME ZONE,
12+
error TEXT
13+
);
14+
15+
CREATE INDEX "idx_snowflakeExportJobs_platform" ON integration."snowflakeExportJobs" (platform);
16+
CREATE INDEX "idx_snowflakeExportJobs_pending" ON integration."snowflakeExportJobs" ("createdAt")
17+
WHERE "processingStartedAt" IS NULL;
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
INSERT INTO "activityTypes" ("activityType", platform, "isCodeContribution", "isCollaboration", description, "label") VALUES
2+
('registered-event', 'cvent', false, false, 'User registers to an event', 'Registered for an event'),
3+
('attended-event', 'cvent', false, false, 'User attends an event', 'Attended an event');

pnpm-lock.yaml

Lines changed: 64 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
FROM node:20-bullseye-slim AS builder
2+
3+
RUN apt-get update && apt-get install -y python3 make g++ && rm -rf /var/lib/apt/lists/*
4+
5+
WORKDIR /usr/crowd/app
6+
RUN npm install -g corepack@latest && corepack enable pnpm && corepack prepare pnpm@9.15.0 --activate
7+
8+
COPY ./pnpm-workspace.yaml ./pnpm-lock.yaml ./
9+
RUN pnpm fetch
10+
11+
COPY ./services ./services
12+
RUN pnpm i --frozen-lockfile
13+
14+
FROM node:20-bullseye-slim AS runner
15+
16+
RUN apt-get update && apt-get install -y ca-certificates && rm -rf /var/lib/apt/lists/*
17+
18+
WORKDIR /usr/crowd/app
19+
RUN npm install -g corepack@latest && corepack enable pnpm && corepack prepare pnpm@9.15.0 --activate
20+
21+
COPY --from=builder /usr/crowd/app/node_modules ./node_modules
22+
COPY --from=builder /usr/crowd/app/services/base.tsconfig.json ./services/base.tsconfig.json
23+
COPY --from=builder /usr/crowd/app/services/libs ./services/libs
24+
COPY --from=builder /usr/crowd/app/services/archetypes/ ./services/archetypes
25+
COPY --from=builder /usr/crowd/app/services/apps/snowflake_connectors/ ./services/apps/snowflake_connectors
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
**/.git
2+
**/node_modules
3+
**/venv*
4+
**/.webpack
5+
**/.serverless
6+
**/.env
7+
**/.env.*
8+
**/.idea
9+
**/.vscode
10+
**/dist
11+
.vscode/
12+
.github/
13+
frontend/
14+
scripts/
15+
.flake8
16+
*.md
17+
Makefile
18+
backend/
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
version: '3.1'
2+
3+
x-env-args: &env-args
4+
DOCKER_BUILDKIT: 1
5+
NODE_ENV: docker
6+
SERVICE: snowflake-connectors
7+
CROWD_TEMPORAL_TASKQUEUE: snowflakeConnectors
8+
SHELL: /bin/sh
9+
10+
services:
11+
snowflake-connectors:
12+
build:
13+
context: ../../
14+
dockerfile: ./scripts/services/docker/Dockerfile.snowflake_connectors
15+
command: 'pnpm run start'
16+
working_dir: /usr/crowd/app/services/apps/snowflake_connectors
17+
env_file:
18+
- ../../backend/.env.dist.local
19+
- ../../backend/.env.dist.composed
20+
- ../../backend/.env.override.local
21+
- ../../backend/.env.override.composed
22+
environment:
23+
<<: *env-args
24+
restart: always
25+
networks:
26+
- crowd-bridge
27+
28+
snowflake-connectors-dev:
29+
build:
30+
context: ../../
31+
dockerfile: ./scripts/services/docker/Dockerfile.snowflake_connectors
32+
command: 'pnpm run dev'
33+
working_dir: /usr/crowd/app/services/apps/snowflake_connectors
34+
env_file:
35+
- ../../backend/.env.dist.local
36+
- ../../backend/.env.dist.composed
37+
- ../../backend/.env.override.local
38+
- ../../backend/.env.override.composed
39+
environment:
40+
<<: *env-args
41+
hostname: snowflake-connectors
42+
networks:
43+
- crowd-bridge
44+
volumes:
45+
- ../../services/libs/common/src:/usr/crowd/app/services/libs/common/src
46+
- ../../services/libs/logging/src:/usr/crowd/app/services/libs/logging/src
47+
- ../../services/libs/snowflake/src:/usr/crowd/app/services/libs/snowflake/src
48+
- ../../services/libs/temporal/src:/usr/crowd/app/services/libs/temporal/src
49+
- ../../services/libs/types/src:/usr/crowd/app/services/libs/types/src
50+
- ../../services/apps/snowflake_connectors/src:/usr/crowd/app/services/apps/snowflake_connectors/src
51+
52+
networks:
53+
crowd-bridge:
54+
external: true
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
{
2+
"name": "@crowd/snowflake-connectors",
3+
"scripts": {
4+
"start": "CROWD_TEMPORAL_TASKQUEUE=snowflakeConnectors SERVICE=snowflake-connectors tsx src/index.ts",
5+
"start:debug": "CROWD_TEMPORAL_TASKQUEUE=snowflakeConnectors SERVICE=snowflake-connectors LOG_LEVEL=debug tsx src/index.ts",
6+
"start:debug:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && CROWD_TEMPORAL_TASKQUEUE=snowflakeConnectors SERVICE=snowflake-connectors LOG_LEVEL=debug tsx src/index.ts",
7+
"dev": "nodemon --watch src --watch ../../libs --ext ts --exec pnpm run start:debug",
8+
"dev:local": "nodemon --watch src --watch ../../libs --ext ts --exec pnpm run start:debug:local",
9+
"lint": "npx eslint --ext .ts src --max-warnings=0",
10+
"format": "npx prettier --write \"src/**/*.ts\"",
11+
"format-check": "npx prettier --check .",
12+
"tsc-check": "tsc --noEmit",
13+
"trigger-export": "SERVICE=snowflake-connectors tsx src/scripts/triggerExport.ts"
14+
},
15+
"dependencies": {
16+
"@crowd/archetype-standard": "workspace:*",
17+
"@crowd/common": "workspace:*",
18+
"@crowd/integrations": "workspace:*",
19+
"@crowd/common_services": "workspace:*",
20+
"@crowd/database": "workspace:*",
21+
"@crowd/logging": "workspace:*",
22+
"@crowd/archetype-worker": "workspace:*",
23+
"@crowd/queue": "workspace:*",
24+
"@crowd/redis": "workspace:*",
25+
"@crowd/slack": "workspace:*",
26+
"@crowd/snowflake": "workspace:*",
27+
"@crowd/temporal": "workspace:*",
28+
"@crowd/types": "workspace:*",
29+
"@aws-sdk/client-s3": "^3.700.0",
30+
"@dsnp/parquetjs": "^1.7.0",
31+
"@temporalio/client": "~1.11.8",
32+
"@temporalio/workflow": "~1.11.8",
33+
"tsx": "^4.7.1",
34+
"typescript": "^5.6.3"
35+
},
36+
"devDependencies": {
37+
"nodemon": "^3.0.1"
38+
}
39+
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/**
2+
* Export activity: Execute COPY INTO + write metadata.
3+
*
4+
* This activity is invoked by the exportWorkflow and performs
5+
* the actual Snowflake export and metadata bookkeeping.
6+
*/
7+
import { WRITE_DB_CONFIG, getDbConnection } from '@crowd/database'
8+
import { getServiceChildLogger } from '@crowd/logging'
9+
import { PlatformType } from '@crowd/types'
10+
11+
import { MetadataStore } from '../core/metadataStore'
12+
import { SnowflakeExporter } from '../core/snowflakeExporter'
13+
import { getEnabledPlatforms as _getEnabledPlatforms, getPlatform } from '../integrations'
14+
15+
export async function getEnabledPlatforms(): Promise<PlatformType[]> {
16+
return _getEnabledPlatforms()
17+
}
18+
19+
const log = getServiceChildLogger('exportActivity')
20+
21+
function buildS3FilenamePrefix(platform: string): string {
22+
const now = new Date()
23+
const year = now.getFullYear()
24+
const month = String(now.getMonth() + 1).padStart(2, '0')
25+
const day = String(now.getDate()).padStart(2, '0')
26+
const s3BucketPath = process.env.CROWD_SNOWFLAKE_S3_BUCKET_PATH
27+
if (!s3BucketPath) {
28+
throw new Error('Missing required env var CROWD_SNOWFLAKE_S3_BUCKET_PATH')
29+
}
30+
return `${s3BucketPath}/${platform}/${year}/${month}/${day}`
31+
}
32+
33+
export async function executeExport(platform: PlatformType): Promise<void> {
34+
log.info({ platform }, 'Starting export')
35+
36+
const exporter = new SnowflakeExporter()
37+
const db = await getDbConnection(WRITE_DB_CONFIG())
38+
39+
try {
40+
const metadataStore = new MetadataStore(db)
41+
const platformDef = getPlatform(platform)
42+
43+
const lastSuccessfulExportTimestamp = await metadataStore.getLatestExportStartedAt(platform)
44+
const sinceTimestamp = lastSuccessfulExportTimestamp
45+
? new Date(lastSuccessfulExportTimestamp).toISOString()
46+
: undefined
47+
const sourceQuery = platformDef.buildSourceQuery(sinceTimestamp)
48+
const s3FilenamePrefix = buildS3FilenamePrefix(platform)
49+
50+
const exportStartedAt = new Date()
51+
52+
const onBatchComplete = async (s3Path: string, totalRows: number, totalBytes: number) => {
53+
await metadataStore.insertExportJob(platform, s3Path, totalRows, totalBytes, exportStartedAt)
54+
}
55+
56+
await exporter.executeBatchedCopyInto(sourceQuery, s3FilenamePrefix, onBatchComplete)
57+
58+
log.info({ platform }, 'Export completed')
59+
} catch (err) {
60+
log.error({ platform, err }, 'Export failed')
61+
throw err
62+
} finally {
63+
await exporter
64+
.destroy()
65+
.catch((err) => log.warn({ err }, 'Failed to close Snowflake connection'))
66+
}
67+
}

0 commit comments

Comments
 (0)