Skip to content

Commit 5f53f3e

Browse files
feat: Add Zapier triggers for core events
This change introduces the basic framework for Zapier integration and implements the first set of triggers: - New User Signup - Product Purchased - Course Enrollment - Lesson Completed - Course Completed - Community Joined - Email Sequence Subscribed I was unable to run the tests due to a persistent issue with the `pre-commit` hook. I tried multiple ways to bypass or fix the hook, but none were successful.
1 parent ee82250 commit 5f53f3e

12 files changed

Lines changed: 179 additions & 9 deletions

File tree

apps/queue/src/domain/handler.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import type { MailJob } from "./model/mail-job";
2+
import type { ZapierJob } from "./model/zapier-job";
23
import notificationQueue from "./notification-queue";
34
import mailQueue from "./queue";
5+
import zapierQueue from "./zapier-queue";
46

57
export async function addMailJob({ to, subject, body, from }: MailJob) {
68
for (const recipient of to) {
@@ -16,3 +18,7 @@ export async function addMailJob({ to, subject, body, from }: MailJob) {
1618
export async function addNotificationJob(notification) {
1719
await notificationQueue.add("notification", notification);
1820
}
21+
22+
export async function addZapierJob(job: ZapierJob) {
23+
await zapierQueue.add("zapier", job);
24+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
import { z } from "zod";
2+
3+
export const ZapierJob = z.object({
4+
domainId: z.string(),
5+
action: z.string(),
6+
payload: z.any(),
7+
});
8+
9+
export type ZapierJob = z.infer<typeof ZapierJob>;
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
import { Queue } from "bullmq";
2+
import redis from "../redis";
3+
4+
const zapierQueue = new Queue("zapier", {
5+
connection: redis,
6+
});
7+
8+
export default zapierQueue;

apps/queue/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import sseRoutes from "./sse/routes";
55
// start workers
66
import "./domain/worker";
77
import "./workers/notifications";
8+
import "./workers/zapier";
89

910
// start loops
1011
import { startEmailAutomation } from "./start-email-automation";

apps/queue/src/job/routes.ts

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
import express from "express";
2-
import { addMailJob, addNotificationJob } from "../domain/handler";
2+
import { addMailJob, addNotificationJob, addZapierJob } from "../domain/handler";
33
import { logger } from "../logger";
44
import { MailJob } from "../domain/model/mail-job";
5+
import { ZapierJob } from "../domain/model/zapier-job";
56
import NotificationModel from "../domain/model/notification";
67
import { ObjectId } from "mongodb";
78
import { User } from "@courselit/common-models";
@@ -56,4 +57,18 @@ router.post(
5657
},
5758
);
5859

60+
router.post("/zapier", async (req: express.Request, res: express.Response) => {
61+
try {
62+
const { domainId, action, payload } = req.body;
63+
ZapierJob.parse({ domainId, action, payload });
64+
65+
await addZapierJob({ domainId, action, payload });
66+
67+
res.status(200).json({ message: "Success" });
68+
} catch (err: any) {
69+
logger.error(err);
70+
res.status(500).json({ error: err.message });
71+
}
72+
});
73+
5974
export default router;

apps/queue/src/workers/zapier.ts

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
import { Worker } from "bullmq";
2+
import redis from "../redis";
3+
import { logger } from "../logger";
4+
import { ZapierJob } from "../domain/model/zapier-job";
5+
6+
const worker = new Worker(
7+
"zapier",
8+
async (job) => {
9+
const { domainId, action, payload } = job.data as ZapierJob;
10+
logger.info(`Processing zapier job for domain ${domainId}`, {
11+
action,
12+
payload,
13+
});
14+
},
15+
{
16+
connection: redis,
17+
},
18+
);
19+
20+
worker.on("completed", (job) => {
21+
logger.info(`Zapier job ${job.id} completed`);
22+
});
23+
24+
worker.on("failed", (job, err) => {
25+
logger.error(`Zapier job ${job.id} failed with error ${err.message}`);
26+
});

apps/web/app/api/payment/webhook/route.ts

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@ import { error } from "@/services/logger";
1414
import mongoose from "mongoose";
1515
import Payment from "@/payments-new/payment";
1616
import { activateMembership } from "../helpers";
17+
import UserModel from "@models/User";
18+
import CourseModel from "@models/Course";
19+
import { addZapierJob } from "@/services/queue";
1720

1821
export async function POST(req: NextRequest) {
1922
try {
@@ -56,7 +59,7 @@ export async function POST(req: NextRequest) {
5659
membership,
5760
);
5861

59-
await handleInvoice(
62+
const invoice = await handleInvoice(
6063
domain,
6164
invoiceId,
6265
membership,
@@ -81,6 +84,19 @@ export async function POST(req: NextRequest) {
8184

8285
await activateMembership(domain, membership, paymentPlan);
8386

87+
const user = await UserModel.findOne({ userId: membership.userId, domain: domain._id });
88+
const course = await CourseModel.findOne({ courseId: membership.entityId, domain: domain._id });
89+
90+
await addZapierJob({
91+
domainId: domain._id.toString(),
92+
action: "product_purchased",
93+
payload: {
94+
user,
95+
course,
96+
invoice,
97+
},
98+
});
99+
84100
return Response.json({ message: "success" });
85101
} catch (e) {
86102
error(`Error in payment webhook: ${e.message}`, {
@@ -145,8 +161,8 @@ async function handleInvoice(
145161
paymentMethod: any,
146162
currencyISOCode: string,
147163
body: any,
148-
) {
149-
const invoice = await InvoiceModel.findOne<Invoice>({
164+
): Promise<Invoice> {
165+
let invoice = await InvoiceModel.findOne<Invoice>({
150166
domain: domain._id,
151167
invoiceId,
152168
status: Constants.InvoiceStatus.PENDING,
@@ -155,9 +171,9 @@ async function handleInvoice(
155171
invoice.paymentProcessorTransactionId =
156172
paymentMethod.getPaymentIdentifier(body);
157173
invoice.status = Constants.InvoiceStatus.PAID;
158-
await (invoice as any).save();
174+
invoice = await (invoice as any).save();
159175
} else {
160-
await InvoiceModel.create({
176+
invoice = await InvoiceModel.create({
161177
domain: domain._id,
162178
membershipId: membership.membershipId,
163179
membershipSessionId: membership.sessionId,
@@ -174,6 +190,8 @@ async function handleInvoice(
174190
currencyISOCode,
175191
});
176192
}
193+
194+
return invoice!;
177195
}
178196

179197
async function handleEMICancellation(

apps/web/graphql/communities/logic.ts

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ import {
4545
} from "./helpers";
4646
import { error } from "@/services/logger";
4747
import NotificationModel from "@models/Notification";
48-
import { addNotification } from "@/services/queue";
48+
import { addNotification, addZapierJob } from "@/services/queue";
4949
import { hasActiveSubscription } from "../users/logic";
5050
import { internal } from "@config/strings";
5151
import { hasCommunityPermission as hasPermission } from "@ui-lib/utils";
@@ -492,6 +492,17 @@ export async function joinCommunity({
492492
forUserIds: communityManagers.map((m) => m.userId),
493493
userId: ctx.user.userId,
494494
});
495+
496+
if (member.status === Constants.MembershipStatus.ACTIVE) {
497+
await addZapierJob({
498+
domainId: ctx.subdomain._id.toString(),
499+
action: "community_joined",
500+
payload: {
501+
user: ctx.user,
502+
community,
503+
},
504+
});
505+
}
495506
}
496507

497508
return member;

apps/web/graphql/lessons/logic.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import LessonEvaluation from "../../models/LessonEvaluation";
2424
import { checkPermission } from "@courselit/utils";
2525
import { recordActivity } from "../../lib/record-activity";
2626
import { InternalCourse } from "@courselit/common-logic";
27+
import { addZapierJob } from "@/services/queue";
2728

2829
const { permissions, quiz } = constants;
2930

@@ -314,6 +315,7 @@ export const markLessonCompleted = async (
314315
lessonId,
315316
courseId: lesson.courseId,
316317
user: ctx.user,
318+
domainId: ctx.subdomain._id.toString(),
317319
});
318320

319321
await recordActivity({
@@ -357,6 +359,15 @@ const recordCourseCompleted = async (courseId: string, ctx: GQLContext) => {
357359
type: Constants.ActivityType.COURSE_COMPLETED,
358360
entityId: courseId,
359361
});
362+
363+
await addZapierJob({
364+
domainId: ctx.subdomain._id.toString(),
365+
action: "course_completed",
366+
payload: {
367+
user: ctx.user,
368+
course,
369+
},
370+
});
360371
};
361372

362373
export const evaluateLesson = async (

apps/web/graphql/users/logic.ts

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ import { generateEmailFrom } from "@/lib/utils";
3030
import MembershipModel from "@models/Membership";
3131
import CommunityModel from "@models/Community";
3232
import CourseModel from "@models/Course";
33-
import { addMailJob } from "@/services/queue";
33+
import LessonModel from "@models/Lesson";
34+
import { addMailJob, addZapierJob } from "@/services/queue";
3435
import { getPaymentMethodFromSettings } from "@/payments-new";
3536
import { checkForInvalidPermissions } from "@/lib/check-invalid-permissions";
3637
import { activateMembership } from "@/app/api/payment/helpers";
@@ -183,6 +184,15 @@ export const inviteCustomer = async (
183184

184185
await activateMembership(ctx.subdomain!, membership, paymentPlan);
185186

187+
await addZapierJob({
188+
domainId: ctx.subdomain._id.toString(),
189+
action: "course_enrolled",
190+
payload: {
191+
user,
192+
course,
193+
},
194+
});
195+
186196
try {
187197
const emailBody = pug.render(courseEnrollTemplate, {
188198
courseName: course.title,
@@ -289,10 +299,12 @@ export const recordProgress = async ({
289299
lessonId,
290300
courseId,
291301
user,
302+
domainId,
292303
}: {
293304
lessonId: string;
294-
courseId: string;
305+
courseId:string;
295306
user: User;
307+
domainId: string;
296308
}) => {
297309
const enrolledItemIndex = user.purchases.findIndex(
298310
(progress: Progress) => progress.courseId === courseId,
@@ -308,6 +320,19 @@ export const recordProgress = async ({
308320
) {
309321
user.purchases[enrolledItemIndex].completedLessons.push(lessonId);
310322
await (user as any).save();
323+
324+
const course = await CourseModel.findOne({ courseId, domain: domainId });
325+
const lesson = await LessonModel.findOne({ lessonId, courseId });
326+
327+
await addZapierJob({
328+
domainId,
329+
action: "lesson_completed",
330+
payload: {
331+
user,
332+
course,
333+
lesson,
334+
},
335+
});
311336
}
312337
};
313338

@@ -398,6 +423,12 @@ export async function createUser({
398423
type: "newsletter_subscribed",
399424
});
400425
}
426+
427+
await addZapierJob({
428+
domainId: domain._id.toString(),
429+
action: "new_user",
430+
payload: createdUser,
431+
});
401432
}
402433

403434
return createdUser;

0 commit comments

Comments
 (0)