Skip to content

Commit 7eeabf8

Browse files
authored
Queue HubSpot webhook events to QStash for async processing (dubinc#3785)
1 parent ce9e26a commit 7eeabf8

4 files changed

Lines changed: 237 additions & 238 deletions

File tree

Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
import { captureWebhookLog } from "@/lib/api-logs/capture-webhook-log";
2+
import { handleAndReturnErrorResponse } from "@/lib/api/errors";
3+
import { withAxiom } from "@/lib/axiom/server";
4+
import { verifyQstashSignature } from "@/lib/cron/verify-qstash";
5+
import { hubSpotOAuthProvider } from "@/lib/integrations/hubspot/oauth";
6+
import {
7+
hubSpotSettingsSchema,
8+
hubSpotWebhookSchema,
9+
} from "@/lib/integrations/hubspot/schema";
10+
import { trackHubSpotLeadEvent } from "@/lib/integrations/hubspot/track-lead";
11+
import { trackHubSpotSaleEvent } from "@/lib/integrations/hubspot/track-sale";
12+
import { WorkspaceProps } from "@/lib/types";
13+
import { prisma } from "@dub/prisma";
14+
import { logAndRespond } from "../../../cron/utils";
15+
16+
// POST /api/hubspot/webhook/process – process individual webhook event
17+
export const POST = withAxiom(async (req) => {
18+
const startTime = Date.now();
19+
20+
let body: any;
21+
let workspace:
22+
| Pick<WorkspaceProps, "id" | "stripeConnectId" | "webhookEnabled">
23+
| undefined;
24+
25+
try {
26+
const rawBody = await req.text();
27+
28+
await verifyQstashSignature({
29+
req,
30+
rawBody,
31+
});
32+
33+
body = JSON.parse(rawBody);
34+
35+
const { objectTypeId, portalId, subscriptionType } =
36+
hubSpotWebhookSchema.parse(body);
37+
38+
// Find the installation
39+
const installation = await prisma.installedIntegration.findFirst({
40+
where: {
41+
integration: {
42+
slug: "hubspot",
43+
},
44+
credentials: {
45+
path: "$.hub_id",
46+
equals: portalId,
47+
},
48+
},
49+
include: {
50+
project: {
51+
select: {
52+
id: true,
53+
stripeConnectId: true,
54+
webhookEnabled: true,
55+
},
56+
},
57+
},
58+
});
59+
60+
if (!installation) {
61+
return logAndRespond(
62+
`[HubSpot] Installation is not found for portalId ${portalId}.`,
63+
);
64+
}
65+
66+
workspace = installation.project;
67+
68+
// Refresh the access token if needed
69+
const authToken =
70+
await hubSpotOAuthProvider.refreshTokenForInstallation(installation);
71+
72+
if (!authToken) {
73+
return logAndRespond(
74+
`[HubSpot] Authentication token is not found or valid for portalId ${portalId}.`,
75+
);
76+
}
77+
78+
const settings = hubSpotSettingsSchema.parse(installation.settings ?? {});
79+
80+
console.log("[HubSpot] Integration settings", settings);
81+
82+
let response = "";
83+
84+
// Contact events
85+
if (objectTypeId === "0-1") {
86+
const isContactCreated = subscriptionType === "object.creation";
87+
88+
const isLifecycleStageChanged =
89+
subscriptionType === "object.propertyChange" &&
90+
settings.leadTriggerEvent === "lifecycleStageReached";
91+
92+
if (isContactCreated || isLifecycleStageChanged) {
93+
response = await trackHubSpotLeadEvent({
94+
payload: body,
95+
workspace,
96+
authToken,
97+
settings,
98+
});
99+
} else {
100+
response = `Skipping contact event: subscriptionType "${subscriptionType}" does not match the configured leadTriggerEvent "${settings.leadTriggerEvent}".`;
101+
}
102+
}
103+
104+
// Deal event
105+
else if (objectTypeId === "0-3") {
106+
const isDealCreated =
107+
subscriptionType === "object.creation" &&
108+
settings.leadTriggerEvent === "dealCreated";
109+
110+
const isDealUpdated = subscriptionType === "object.propertyChange";
111+
112+
// Track the final lead event
113+
if (isDealCreated) {
114+
response = await trackHubSpotLeadEvent({
115+
payload: body,
116+
workspace,
117+
authToken,
118+
settings,
119+
});
120+
}
121+
122+
// Track the sale event when deal is closed won
123+
else if (isDealUpdated) {
124+
response = await trackHubSpotSaleEvent({
125+
payload: body,
126+
workspace,
127+
authToken,
128+
settings,
129+
});
130+
}
131+
}
132+
133+
// Unknown object type
134+
else {
135+
response = `Unknown objectTypeId ${objectTypeId}.`;
136+
}
137+
138+
await captureWebhookLog({
139+
workspaceId: workspace.id,
140+
method: "POST",
141+
path: "/hubspot/webhook",
142+
statusCode: 200,
143+
duration: Date.now() - startTime,
144+
requestBody: body,
145+
responseBody: response,
146+
userAgent: req.headers.get("user-agent"),
147+
});
148+
149+
return logAndRespond(response);
150+
} catch (error) {
151+
const response = handleAndReturnErrorResponse(error);
152+
153+
if (workspace) {
154+
await captureWebhookLog({
155+
workspaceId: workspace.id,
156+
method: "POST",
157+
path: "/hubspot/webhook",
158+
statusCode: response.status,
159+
duration: Date.now() - startTime,
160+
requestBody: body,
161+
responseBody: response,
162+
userAgent: req.headers.get("user-agent"),
163+
});
164+
}
165+
166+
return response;
167+
}
168+
});
Lines changed: 22 additions & 168 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,16 @@
1-
import { captureWebhookLog } from "@/lib/api-logs/capture-webhook-log";
21
import { DubApiError, handleAndReturnErrorResponse } from "@/lib/api/errors";
32
import { withAxiom } from "@/lib/axiom/server";
4-
import { hubSpotOAuthProvider } from "@/lib/integrations/hubspot/oauth";
5-
import {
6-
hubSpotSettingsSchema,
7-
hubSpotWebhookSchema,
8-
} from "@/lib/integrations/hubspot/schema";
9-
import { trackHubSpotLeadEvent } from "@/lib/integrations/hubspot/track-lead";
10-
import { trackHubSpotSaleEvent } from "@/lib/integrations/hubspot/track-sale";
11-
import { prisma } from "@dub/prisma";
12-
import { waitUntil } from "@vercel/functions";
3+
import { qstash } from "@/lib/cron";
4+
import { APP_DOMAIN_WITH_NGROK } from "@dub/utils";
135
import crypto from "crypto";
14-
import { NextResponse } from "next/server";
6+
import { logAndRespond } from "../../cron/utils";
157

168
const HUBSPOT_CLIENT_SECRET = process.env.HUBSPOT_CLIENT_SECRET || "";
179

1810
// POST /api/hubspot/webhook – listen to webhook events from Hubspot
1911
export const POST = withAxiom(async (req) => {
20-
const startTime = Date.now();
21-
2212
try {
23-
const rawPayload = await req.text();
13+
const rawBody = await req.text();
2414
const signature = req.headers.get("X-HubSpot-Signature");
2515

2616
// Verify webhook signature
@@ -39,7 +29,7 @@ export const POST = withAxiom(async (req) => {
3929
}
4030

4131
// Create expected hash: client_secret + request_body
42-
const sourceString = HUBSPOT_CLIENT_SECRET + rawPayload;
32+
const sourceString = HUBSPOT_CLIENT_SECRET + rawBody;
4333
const expectedHash = crypto
4434
.createHash("sha256")
4535
.update(sourceString)
@@ -53,163 +43,27 @@ export const POST = withAxiom(async (req) => {
5343
});
5444
}
5545

56-
const payload = JSON.parse(rawPayload) as any[];
57-
58-
// HS send multiple events in the same request
59-
// so we need to process each event individually
60-
const results = await Promise.allSettled(payload.map(processWebhookEvent));
61-
62-
const responseBody = { message: "Webhook received." };
63-
const duration = Date.now() - startTime;
64-
65-
// Collect log entries from fulfilled results, including failures
66-
const logEntries: Array<{
67-
workspaceId: string;
68-
statusCode: number;
69-
responseBody: unknown;
70-
requestBody: unknown;
71-
}> = [];
72-
73-
for (let i = 0; i < results.length; i++) {
74-
const r = results[i];
75-
if (r.status !== "fulfilled" || !r.value) {
76-
continue;
77-
}
78-
79-
const { workspaceId, errorResponse } = r.value;
80-
81-
logEntries.push({
82-
workspaceId,
83-
statusCode: errorResponse ? errorResponse.status : 200,
84-
responseBody: errorResponse ?? responseBody,
85-
requestBody: payload[i],
86-
});
87-
}
46+
const events = JSON.parse(rawBody) as any[];
47+
const finalEvents = Array.isArray(events) ? events : [events];
48+
49+
// HubSpot can send multiple events in a single request, so we fan them out
50+
// to QStash and process each event independently in /api/hubspot/webhook/process.
51+
// This keeps the webhook handler fast and ensures a slow/failing event doesn't
52+
// block or fail the rest of the batch.
53+
const qstashResponse = await qstash.batchJSON(
54+
finalEvents.map((event) => ({
55+
url: `${APP_DOMAIN_WITH_NGROK}/api/hubspot/webhook/process`,
56+
body: event,
57+
})),
58+
);
8859

89-
waitUntil(
90-
Promise.allSettled(
91-
logEntries.map((entry) =>
92-
captureWebhookLog({
93-
workspaceId: entry.workspaceId,
94-
method: req.method,
95-
path: "/hubspot/webhook",
96-
statusCode: entry.statusCode,
97-
duration,
98-
requestBody: entry.requestBody,
99-
responseBody: entry.responseBody,
100-
userAgent: req.headers.get("user-agent"),
101-
}),
102-
),
103-
),
60+
console.log(
61+
`[hubspot/webhook] Enqueued ${finalEvents.length} webhook events to be processed.`,
62+
qstashResponse,
10463
);
10564

106-
return NextResponse.json(responseBody);
65+
return logAndRespond("Webhook received.");
10766
} catch (error) {
10867
return handleAndReturnErrorResponse(error);
10968
}
11069
});
111-
112-
// Process individual event, returns workspaceId and error response if failed
113-
async function processWebhookEvent(event: any) {
114-
const { objectTypeId, portalId, subscriptionType } =
115-
hubSpotWebhookSchema.parse(event);
116-
117-
// Find the installation
118-
const installation = await prisma.installedIntegration.findFirst({
119-
where: {
120-
integration: {
121-
slug: "hubspot",
122-
},
123-
credentials: {
124-
path: "$.hub_id",
125-
equals: portalId,
126-
},
127-
},
128-
include: {
129-
project: true,
130-
},
131-
});
132-
133-
if (!installation) {
134-
console.error(
135-
`[HubSpot] Installation is not found for portalId ${portalId}.`,
136-
);
137-
return;
138-
}
139-
140-
const { project: workspace } = installation;
141-
142-
// Refresh the access token if needed
143-
const authToken =
144-
await hubSpotOAuthProvider.refreshTokenForInstallation(installation);
145-
146-
if (!authToken) {
147-
console.error(
148-
`[HubSpot] Authentication token is not found or valid for portalId ${portalId}.`,
149-
);
150-
return;
151-
}
152-
153-
const settings = hubSpotSettingsSchema.parse(installation.settings ?? {});
154-
155-
console.log("[HubSpot] Event", event);
156-
console.log("[HubSpot] Integration settings", settings);
157-
158-
try {
159-
// Contact events
160-
if (objectTypeId === "0-1") {
161-
const isContactCreated = subscriptionType === "object.creation";
162-
163-
const isLifecycleStageChanged =
164-
subscriptionType === "object.propertyChange" &&
165-
settings.leadTriggerEvent === "lifecycleStageReached";
166-
167-
if (isContactCreated || isLifecycleStageChanged) {
168-
await trackHubSpotLeadEvent({
169-
payload: event,
170-
workspace,
171-
authToken,
172-
settings,
173-
});
174-
}
175-
}
176-
177-
// Deal event
178-
if (objectTypeId === "0-3") {
179-
const isDealCreated =
180-
subscriptionType === "object.creation" &&
181-
settings.leadTriggerEvent === "dealCreated";
182-
183-
const isDealUpdated = subscriptionType === "object.propertyChange";
184-
185-
// Track the final lead event
186-
if (isDealCreated) {
187-
await trackHubSpotLeadEvent({
188-
payload: event,
189-
workspace,
190-
authToken,
191-
settings,
192-
});
193-
}
194-
195-
// Track the sale event when deal is closed won
196-
else if (isDealUpdated) {
197-
await trackHubSpotSaleEvent({
198-
payload: event,
199-
workspace,
200-
authToken,
201-
settings,
202-
});
203-
}
204-
}
205-
} catch (error) {
206-
return {
207-
workspaceId: workspace.id,
208-
errorResponse: handleAndReturnErrorResponse(error),
209-
};
210-
}
211-
212-
return {
213-
workspaceId: workspace.id,
214-
};
215-
}

0 commit comments

Comments
 (0)