Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 168 additions & 0 deletions apps/web/app/(ee)/api/hubspot/webhook/process/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
import { captureWebhookLog } from "@/lib/api-logs/capture-webhook-log";
import { handleAndReturnErrorResponse } from "@/lib/api/errors";
import { withAxiom } from "@/lib/axiom/server";
import { verifyQstashSignature } from "@/lib/cron/verify-qstash";
import { hubSpotOAuthProvider } from "@/lib/integrations/hubspot/oauth";
import {
hubSpotSettingsSchema,
hubSpotWebhookSchema,
} from "@/lib/integrations/hubspot/schema";
import { trackHubSpotLeadEvent } from "@/lib/integrations/hubspot/track-lead";
import { trackHubSpotSaleEvent } from "@/lib/integrations/hubspot/track-sale";
import { WorkspaceProps } from "@/lib/types";
import { prisma } from "@dub/prisma";
import { logAndRespond } from "../../../cron/utils";

// POST /api/hubspot/webhook/process – process individual webhook event
export const POST = withAxiom(async (req) => {
const startTime = Date.now();

let body: any;
let workspace:
| Pick<WorkspaceProps, "id" | "stripeConnectId" | "webhookEnabled">
| undefined;

try {
const rawBody = await req.text();

await verifyQstashSignature({
req,
rawBody,
});

body = JSON.parse(rawBody);

const { objectTypeId, portalId, subscriptionType } =
hubSpotWebhookSchema.parse(body);

// Find the installation
const installation = await prisma.installedIntegration.findFirst({
where: {
integration: {
slug: "hubspot",
},
credentials: {
path: "$.hub_id",
equals: portalId,
},
},
include: {
project: {
select: {
id: true,
stripeConnectId: true,
webhookEnabled: true,
},
},
},
});

if (!installation) {
return logAndRespond(
`[HubSpot] Installation is not found for portalId ${portalId}.`,
);
}

workspace = installation.project;

// Refresh the access token if needed
const authToken =
await hubSpotOAuthProvider.refreshTokenForInstallation(installation);

if (!authToken) {
return logAndRespond(
`[HubSpot] Authentication token is not found or valid for portalId ${portalId}.`,
);
}

const settings = hubSpotSettingsSchema.parse(installation.settings ?? {});

console.log("[HubSpot] Integration settings", settings);

let response = "";

// Contact events
if (objectTypeId === "0-1") {
const isContactCreated = subscriptionType === "object.creation";

const isLifecycleStageChanged =
subscriptionType === "object.propertyChange" &&
settings.leadTriggerEvent === "lifecycleStageReached";

if (isContactCreated || isLifecycleStageChanged) {
response = await trackHubSpotLeadEvent({
payload: body,
workspace,
authToken,
settings,
});
} else {
response = `Skipping contact event: subscriptionType "${subscriptionType}" does not match the configured leadTriggerEvent "${settings.leadTriggerEvent}".`;
}
}

// Deal event
else if (objectTypeId === "0-3") {
const isDealCreated =
subscriptionType === "object.creation" &&
settings.leadTriggerEvent === "dealCreated";

const isDealUpdated = subscriptionType === "object.propertyChange";

// Track the final lead event
if (isDealCreated) {
response = await trackHubSpotLeadEvent({
payload: body,
workspace,
authToken,
settings,
});
}

// Track the sale event when deal is closed won
else if (isDealUpdated) {
response = await trackHubSpotSaleEvent({
payload: body,
workspace,
authToken,
settings,
});
}
}

// Unknown object type
else {
response = `Unknown objectTypeId ${objectTypeId}.`;
}

await captureWebhookLog({
workspaceId: workspace.id,
method: "POST",
path: "/hubspot/webhook",
statusCode: 200,
duration: Date.now() - startTime,
requestBody: body,
responseBody: response,
userAgent: req.headers.get("user-agent"),
});

return logAndRespond(response);
} catch (error) {
const response = handleAndReturnErrorResponse(error);

if (workspace) {
await captureWebhookLog({
workspaceId: workspace.id,
method: "POST",
path: "/hubspot/webhook",
statusCode: response.status,
duration: Date.now() - startTime,
requestBody: body,
responseBody: response,
userAgent: req.headers.get("user-agent"),
});
}

return response;
}
});
190 changes: 22 additions & 168 deletions apps/web/app/(ee)/api/hubspot/webhook/route.ts
Original file line number Diff line number Diff line change
@@ -1,26 +1,16 @@
import { captureWebhookLog } from "@/lib/api-logs/capture-webhook-log";
import { DubApiError, handleAndReturnErrorResponse } from "@/lib/api/errors";
import { withAxiom } from "@/lib/axiom/server";
import { hubSpotOAuthProvider } from "@/lib/integrations/hubspot/oauth";
import {
hubSpotSettingsSchema,
hubSpotWebhookSchema,
} from "@/lib/integrations/hubspot/schema";
import { trackHubSpotLeadEvent } from "@/lib/integrations/hubspot/track-lead";
import { trackHubSpotSaleEvent } from "@/lib/integrations/hubspot/track-sale";
import { prisma } from "@dub/prisma";
import { waitUntil } from "@vercel/functions";
import { qstash } from "@/lib/cron";
import { APP_DOMAIN_WITH_NGROK } from "@dub/utils";
import crypto from "crypto";
import { NextResponse } from "next/server";
import { logAndRespond } from "../../cron/utils";

const HUBSPOT_CLIENT_SECRET = process.env.HUBSPOT_CLIENT_SECRET || "";

// POST /api/hubspot/webhook – listen to webhook events from Hubspot
export const POST = withAxiom(async (req) => {
const startTime = Date.now();

try {
const rawPayload = await req.text();
const rawBody = await req.text();
const signature = req.headers.get("X-HubSpot-Signature");

// Verify webhook signature
Expand All @@ -39,7 +29,7 @@ export const POST = withAxiom(async (req) => {
}

// Create expected hash: client_secret + request_body
const sourceString = HUBSPOT_CLIENT_SECRET + rawPayload;
const sourceString = HUBSPOT_CLIENT_SECRET + rawBody;
const expectedHash = crypto
.createHash("sha256")
.update(sourceString)
Expand All @@ -53,163 +43,27 @@ export const POST = withAxiom(async (req) => {
});
}

const payload = JSON.parse(rawPayload) as any[];

// HS send multiple events in the same request
// so we need to process each event individually
const results = await Promise.allSettled(payload.map(processWebhookEvent));

const responseBody = { message: "Webhook received." };
const duration = Date.now() - startTime;

// Collect log entries from fulfilled results, including failures
const logEntries: Array<{
workspaceId: string;
statusCode: number;
responseBody: unknown;
requestBody: unknown;
}> = [];

for (let i = 0; i < results.length; i++) {
const r = results[i];
if (r.status !== "fulfilled" || !r.value) {
continue;
}

const { workspaceId, errorResponse } = r.value;

logEntries.push({
workspaceId,
statusCode: errorResponse ? errorResponse.status : 200,
responseBody: errorResponse ?? responseBody,
requestBody: payload[i],
});
}
const events = JSON.parse(rawBody) as any[];
const finalEvents = Array.isArray(events) ? events : [events];

// HubSpot can send multiple events in a single request, so we fan them out
// to QStash and process each event independently in /api/hubspot/webhook/process.
// This keeps the webhook handler fast and ensures a slow/failing event doesn't
// block or fail the rest of the batch.
const qstashResponse = await qstash.batchJSON(
finalEvents.map((event) => ({
url: `${APP_DOMAIN_WITH_NGROK}/api/hubspot/webhook/process`,
body: event,
})),
);

waitUntil(
Promise.allSettled(
logEntries.map((entry) =>
captureWebhookLog({
workspaceId: entry.workspaceId,
method: req.method,
path: "/hubspot/webhook",
statusCode: entry.statusCode,
duration,
requestBody: entry.requestBody,
responseBody: entry.responseBody,
userAgent: req.headers.get("user-agent"),
}),
),
),
console.log(
`[hubspot/webhook] Enqueued ${finalEvents.length} webhook events to be processed.`,
qstashResponse,
);

return NextResponse.json(responseBody);
return logAndRespond("Webhook received.");
} catch (error) {
return handleAndReturnErrorResponse(error);
}
});

// Process individual event, returns workspaceId and error response if failed
async function processWebhookEvent(event: any) {
const { objectTypeId, portalId, subscriptionType } =
hubSpotWebhookSchema.parse(event);

// Find the installation
const installation = await prisma.installedIntegration.findFirst({
where: {
integration: {
slug: "hubspot",
},
credentials: {
path: "$.hub_id",
equals: portalId,
},
},
include: {
project: true,
},
});

if (!installation) {
console.error(
`[HubSpot] Installation is not found for portalId ${portalId}.`,
);
return;
}

const { project: workspace } = installation;

// Refresh the access token if needed
const authToken =
await hubSpotOAuthProvider.refreshTokenForInstallation(installation);

if (!authToken) {
console.error(
`[HubSpot] Authentication token is not found or valid for portalId ${portalId}.`,
);
return;
}

const settings = hubSpotSettingsSchema.parse(installation.settings ?? {});

console.log("[HubSpot] Event", event);
console.log("[HubSpot] Integration settings", settings);

try {
// Contact events
if (objectTypeId === "0-1") {
const isContactCreated = subscriptionType === "object.creation";

const isLifecycleStageChanged =
subscriptionType === "object.propertyChange" &&
settings.leadTriggerEvent === "lifecycleStageReached";

if (isContactCreated || isLifecycleStageChanged) {
await trackHubSpotLeadEvent({
payload: event,
workspace,
authToken,
settings,
});
}
}

// Deal event
if (objectTypeId === "0-3") {
const isDealCreated =
subscriptionType === "object.creation" &&
settings.leadTriggerEvent === "dealCreated";

const isDealUpdated = subscriptionType === "object.propertyChange";

// Track the final lead event
if (isDealCreated) {
await trackHubSpotLeadEvent({
payload: event,
workspace,
authToken,
settings,
});
}

// Track the sale event when deal is closed won
else if (isDealUpdated) {
await trackHubSpotSaleEvent({
payload: event,
workspace,
authToken,
settings,
});
}
}
} catch (error) {
return {
workspaceId: workspace.id,
errorResponse: handleAndReturnErrorResponse(error),
};
}

return {
workspaceId: workspace.id,
};
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { getPostbackOrThrow } from "@/lib/api/postbacks/get-postback-or-throw";
import { withPartnerProfile } from "@/lib/auth/partner";
import { getPostbackEvents } from "@/lib/postback/api/get-postback-events";
import { getPostbackEvents } from "@/lib/postback/get-postback-events";
import { getPostbackOrThrow } from "@/lib/postback/get-postback-or-throw";
import { NextResponse } from "next/server";

// GET /api/partner-profile/postbacks/[postbackId]/events
Expand Down
Loading
Loading