Skip to content

Commit 8de9b62

Browse files
committed
feat(webapp): add Attio CRM sync client + worker jobs
1 parent 6afc9bf commit 8de9b62

3 files changed

Lines changed: 148 additions & 19 deletions

File tree

apps/webapp/app/env.server.ts

Lines changed: 12 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -8,18 +8,13 @@ import { isValidDuration } from "./services/realtime/duration.server";
88
// `z.string()` constrained to a `parseDuration`-parseable string (e.g.
99
// `7d`, `1h`). Validated at boot so a typo'd duration fails fast.
1010
function durationString() {
11-
return z
12-
.string()
13-
.refine(isValidDuration, "must be a duration like 7d, 30d, 365d, 1h, 1y");
11+
return z.string().refine(isValidDuration, "must be a duration like 7d, 30d, 365d, 1h, 1y");
1412
}
1513

1614
// Parses a CSV of machine preset names (e.g. "small-1x,small-2x") into a
1715
// non-empty array of MachinePresetName. Used by COMPUTE_TEMPLATE_MACHINE_PRESETS
1816
// and its _REQUIRED variant. Adds zod issues for empty input or unknown names.
19-
const parseMachinePresetCsv = (
20-
raw: string,
21-
ctx: z.RefinementCtx
22-
): MachinePresetName[] => {
17+
const parseMachinePresetCsv = (raw: string, ctx: z.RefinementCtx): MachinePresetName[] => {
2318
const names = raw
2419
.split(",")
2520
.map((s) => s.trim())
@@ -427,10 +422,7 @@ const EnvironmentSchema = z
427422
.string()
428423
.optional()
429424
.transform((v, ctx) =>
430-
parseMachinePresetCsv(
431-
v ?? process.env.COMPUTE_TEMPLATE_MACHINE_PRESETS ?? "small-1x",
432-
ctx
433-
)
425+
parseMachinePresetCsv(v ?? process.env.COMPUTE_TEMPLATE_MACHINE_PRESETS ?? "small-1x", ctx)
434426
),
435427

436428
DEPLOY_IMAGE_PLATFORM: z.string().default("linux/amd64"),
@@ -602,6 +594,7 @@ const EnvironmentSchema = z
602594
ALERT_RATE_LIMITER_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"),
603595

604596
LOOPS_API_KEY: z.string().optional(),
597+
ATTIO_API_KEY: z.string().optional(),
605598
MARQS_DISABLE_REBALANCING: BoolEnv.default(false),
606599
MARQS_VISIBILITY_TIMEOUT_MS: z.coerce
607600
.number()
@@ -1085,7 +1078,9 @@ const EnvironmentSchema = z
10851078
// setting this to "1" while `TRIGGER_MOLLIFIER_ENABLED` is "0" is a
10861079
// no-op because the gate-side singleton refuses to construct a buffer
10871080
// when the system is off.
1088-
TRIGGER_MOLLIFIER_DRAINER_ENABLED: z.string().default(process.env.TRIGGER_MOLLIFIER_ENABLED ?? "0"),
1081+
TRIGGER_MOLLIFIER_DRAINER_ENABLED: z
1082+
.string()
1083+
.default(process.env.TRIGGER_MOLLIFIER_ENABLED ?? "0"),
10891084
TRIGGER_MOLLIFIER_SHADOW_MODE: z.string().default("0"),
10901085
TRIGGER_MOLLIFIER_REDIS_HOST: z
10911086
.string()
@@ -1095,7 +1090,7 @@ const EnvironmentSchema = z
10951090
.number()
10961091
.optional()
10971092
.transform(
1098-
(v) => v ?? (process.env.REDIS_PORT ? parseInt(process.env.REDIS_PORT) : undefined),
1093+
(v) => v ?? (process.env.REDIS_PORT ? parseInt(process.env.REDIS_PORT) : undefined)
10991094
),
11001095
TRIGGER_MOLLIFIER_REDIS_USERNAME: z
11011096
.string()
@@ -1105,7 +1100,9 @@ const EnvironmentSchema = z
11051100
.string()
11061101
.optional()
11071102
.transform((v) => v ?? process.env.REDIS_PASSWORD),
1108-
TRIGGER_MOLLIFIER_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"),
1103+
TRIGGER_MOLLIFIER_REDIS_TLS_DISABLED: z
1104+
.string()
1105+
.default(process.env.REDIS_TLS_DISABLED ?? "false"),
11091106
TRIGGER_MOLLIFIER_TRIP_WINDOW_MS: z.coerce.number().int().positive().default(200),
11101107
TRIGGER_MOLLIFIER_TRIP_THRESHOLD: z.coerce.number().int().positive().default(100),
11111108
TRIGGER_MOLLIFIER_HOLD_MS: z.coerce.number().int().positive().default(500),
@@ -1169,11 +1166,7 @@ const EnvironmentSchema = z
11691166
// (retrieve, trace) have a safety net while PG replica lag settles.
11701167
TRIGGER_MOLLIFIER_ACK_GRACE_TTL_SECONDS: z.coerce.number().int().positive().default(30),
11711168
// ioredis per-request retry limit on the buffer's Redis client.
1172-
TRIGGER_MOLLIFIER_REDIS_MAX_RETRIES_PER_REQUEST: z.coerce
1173-
.number()
1174-
.int()
1175-
.positive()
1176-
.default(20),
1169+
TRIGGER_MOLLIFIER_REDIS_MAX_RETRIES_PER_REQUEST: z.coerce.number().int().positive().default(20),
11771170
// ioredis reconnect backoff envelope for the buffer client: the base
11781171
// grows by `STEP_MS` per attempt, capped at `MAX_MS`, then equal-jittered.
11791172
TRIGGER_MOLLIFIER_REDIS_RECONNECT_STEP_MS: z.coerce.number().int().positive().default(50),
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
import { z } from "zod";
2+
import { env } from "~/env.server";
3+
import { logger } from "./logger.server";
4+
5+
// Syncs new orgs/users into Attio (workspaces/users objects) at signup, via the
6+
// common worker so a slow Attio never blocks signup. Ongoing field updates are
7+
// handled by the scheduled sync, not here. No-op without ATTIO_API_KEY.
8+
9+
const ATTIO_API = "https://api.attio.com/v2";
10+
const IS_TEST = env.APP_ENV !== "production";
11+
12+
export const AttioWorkspaceSyncSchema = z.object({
13+
orgId: z.string(),
14+
title: z.string(),
15+
slug: z.string(),
16+
companySize: z.string().nullish(),
17+
createdAt: z.coerce.date(),
18+
});
19+
export type AttioWorkspaceSync = z.infer<typeof AttioWorkspaceSyncSchema>;
20+
21+
export const AttioUserSyncSchema = z.object({
22+
userId: z.string(),
23+
email: z.string(),
24+
referralSource: z.string().nullish(),
25+
marketingEmails: z.boolean(),
26+
createdAt: z.coerce.date(),
27+
});
28+
export type AttioUserSync = z.infer<typeof AttioUserSyncSchema>;
29+
30+
class AttioClient {
31+
constructor(private readonly apiKey: string) {}
32+
33+
// Create-or-update by unique attribute; throws on failure so the worker retries.
34+
async #assert(object: string, matchingAttribute: string, values: Record<string, unknown>) {
35+
const url = `${ATTIO_API}/objects/${object}/records?matching_attribute=${matchingAttribute}`;
36+
const response = await fetch(url, {
37+
method: "PUT",
38+
headers: { Authorization: `Bearer ${this.apiKey}`, "Content-Type": "application/json" },
39+
body: JSON.stringify({ data: { values } }),
40+
});
41+
42+
if (!response.ok) {
43+
const body = await response.text();
44+
logger.error("Attio assert failed", { object, matchingAttribute, status: response.status, body });
45+
throw new Error(`Attio assert ${object} failed with status ${response.status}`);
46+
}
47+
}
48+
49+
async upsertWorkspace(payload: AttioWorkspaceSync) {
50+
await this.#assert("workspaces", "workspace_id", {
51+
workspace_id: payload.orgId,
52+
name: payload.title,
53+
org_slug: payload.slug,
54+
company_size: payload.companySize ?? undefined,
55+
signup_date: toDate(payload.createdAt),
56+
plan: "Free",
57+
account_status: "Active",
58+
is_test: IS_TEST,
59+
});
60+
}
61+
62+
async upsertUser(payload: AttioUserSync) {
63+
await this.#assert("users", "user_id", {
64+
user_id: payload.userId,
65+
primary_email_address: payload.email,
66+
marketing_opt_in: payload.marketingEmails,
67+
referral_source: payload.referralSource ?? undefined,
68+
signup_date: toDate(payload.createdAt),
69+
is_test: IS_TEST,
70+
});
71+
}
72+
}
73+
74+
// Attio `date` attributes want a bare YYYY-MM-DD value.
75+
function toDate(date: Date): string {
76+
return date.toISOString().slice(0, 10);
77+
}
78+
79+
export const attioClient = env.ATTIO_API_KEY ? new AttioClient(env.ATTIO_API_KEY) : null;
80+
81+
export async function enqueueAttioWorkspaceSync(payload: AttioWorkspaceSync) {
82+
if (!attioClient) return;
83+
try {
84+
// Lazy import to avoid a circular dependency with commonWorker (which imports this module's schemas).
85+
const { commonWorker } = await import("~/v3/commonWorker.server");
86+
await commonWorker.enqueue({ id: `attio:workspace:${payload.orgId}`, job: "attio.syncWorkspace", payload });
87+
} catch (error) {
88+
logger.error("Failed to enqueue Attio workspace sync", { orgId: payload.orgId, error });
89+
}
90+
}
91+
92+
export async function enqueueAttioUserSync(payload: AttioUserSync) {
93+
if (!attioClient) return;
94+
try {
95+
const { commonWorker } = await import("~/v3/commonWorker.server");
96+
await commonWorker.enqueue({ id: `attio:user:${payload.userId}`, job: "attio.syncUser", payload });
97+
} catch (error) {
98+
logger.error("Failed to enqueue Attio user sync", { userId: payload.userId, error });
99+
}
100+
}
101+
102+
export async function runAttioWorkspaceSync(payload: AttioWorkspaceSync) {
103+
if (!attioClient) return;
104+
await attioClient.upsertWorkspace(payload);
105+
}
106+
107+
export async function runAttioUserSync(payload: AttioUserSync) {
108+
if (!attioClient) return;
109+
await attioClient.upsertUser(payload);
110+
}

apps/webapp/app/v3/commonWorker.server.ts

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,12 @@ import { z } from "zod";
55
import { env } from "~/env.server";
66
import { RunEngineBatchTriggerService } from "~/runEngine/services/batchTrigger.server";
77
import { sendEmail } from "~/services/email.server";
8+
import {
9+
AttioUserSyncSchema,
10+
AttioWorkspaceSyncSchema,
11+
runAttioUserSync,
12+
runAttioWorkspaceSync,
13+
} from "~/services/attio.server";
814
import { logger } from "~/services/logger.server";
915
import { singleton } from "~/utils/singleton";
1016
import { DeliverAlertService } from "./services/alerts/deliverAlert.server";
@@ -46,6 +52,20 @@ function initializeWorker() {
4652
maxAttempts: 3,
4753
},
4854
},
55+
"attio.syncWorkspace": {
56+
schema: AttioWorkspaceSyncSchema,
57+
visibilityTimeoutMs: 30_000,
58+
retry: {
59+
maxAttempts: 3,
60+
},
61+
},
62+
"attio.syncUser": {
63+
schema: AttioUserSyncSchema,
64+
visibilityTimeoutMs: 30_000,
65+
retry: {
66+
maxAttempts: 3,
67+
},
68+
},
4969
"v3.resumeBatchRun": {
5070
schema: z.object({
5171
batchRunId: z.string(),
@@ -213,6 +233,12 @@ function initializeWorker() {
213233
scheduleEmail: async ({ payload }) => {
214234
await sendEmail(payload);
215235
},
236+
"attio.syncWorkspace": async ({ payload }) => {
237+
await runAttioWorkspaceSync(payload);
238+
},
239+
"attio.syncUser": async ({ payload }) => {
240+
await runAttioUserSync(payload);
241+
},
216242
"v3.resumeBatchRun": async ({ payload }) => {
217243
const service = new ResumeBatchRunService();
218244
await service.call(payload.batchRunId);

0 commit comments

Comments
 (0)