diff --git a/__tests__/cron/validateActiveUsers.ts b/__tests__/cron/validateActiveUsers.ts index 8a1ec2bd1a..4f0be73985 100644 --- a/__tests__/cron/validateActiveUsers.ts +++ b/__tests__/cron/validateActiveUsers.ts @@ -296,6 +296,8 @@ describe('users for reactivation', () => { 'cio_subscription_preferences.topics.topic_28': true, created_at: 1656427727, first_name: 'Ido', + has_active_recruiter_subscription: false, + is_recruiter: false, name: 'Ido', permalink: 'http://localhost:5002/idoshamun', referral_link: 'http://localhost:5002/join?cid=generic&userid=1', diff --git a/__tests__/workers/userUpdatedCio.ts b/__tests__/workers/userUpdatedCio.ts index 2262c2fad7..fb7b2886ec 100644 --- a/__tests__/workers/userUpdatedCio.ts +++ b/__tests__/workers/userUpdatedCio.ts @@ -3,6 +3,8 @@ import worker from '../../src/workers/userUpdatedCio'; import { ChangeObject } from '../../src/types'; import { expectSuccessfulTypedBackground } from '../helpers'; import { + Feed, + Organization, User, UserPersonalizedDigest, UserPersonalizedDigestType, @@ -19,6 +21,14 @@ import createOrGetConnection from '../../src/db'; import { DataSource } from 'typeorm'; import { usersFixture } from '../fixture/user'; import { DEFAULT_NOTIFICATION_SETTINGS } from '../../src/notifications/common'; +import { Opportunity } from '../../src/entity/opportunities/Opportunity'; +import { OpportunityUser } from '../../src/entity/opportunities/user'; +import { OpportunityUserType } from '../../src/entity/opportunities/types'; +import { ContentPreferenceOrganization } from '../../src/entity/contentPreference/ContentPreferenceOrganization'; +import { + ContentPreferenceStatus, + ContentPreferenceType, +} from '../../src/entity/contentPreference/types'; jest.mock('../../src/common', () => ({ ...jest.requireActual('../../src/common'), @@ -85,6 +95,8 @@ describe('userUpdatedCio', () => { updated_at: 1714577744, username: 'cio', referral_link: referral, + is_recruiter: false, + has_active_recruiter_subscription: false, email_confirmed: true, 'cio_subscription_preferences.topics.topic_1': true, 'cio_subscription_preferences.topics.topic_4': true, @@ -228,4 +240,73 @@ describe('userUpdatedCio', () => { 'cio_subscription_preferences.topics.topic_8': true, }); }); + + it('should identify user as recruiter with active subscription when both are true', async () => { + const referral = 'https://dly.dev/12345678'; + mocked(getShortGenericInviteLink).mockResolvedValue(referral); + + // Create user first + await con.getRepository(User).save({ + ...usersFixture[0], + id: 'uucu1', + github: 'uucu1', + hashnode: 'uucu1', + email: 'uucu1@daily.dev', + twitter: 'uucu1', + username: 'uucu1', + notificationFlags: DEFAULT_NOTIFICATION_SETTINGS, + }); + + // Create feed for the user + await con.getRepository(Feed).save({ + id: 'uucu1', + userId: 'uucu1', + }); + + // Create opportunity and make user a recruiter + const opportunity = await con.getRepository(Opportunity).save({ + title: 'Test Opportunity', + tldr: 'Test', + type: 1, + state: 1, + }); + + await con.getRepository(OpportunityUser).save({ + opportunityId: opportunity.id, + userId: 'uucu1', + type: OpportunityUserType.Recruiter, + }); + + // Create organization with active subscription + const org = await con.getRepository(Organization).save({ + name: 'Test Org', + handle: 'testorg2', + recruiterSubscriptionFlags: { + status: 'active', + subscriptionId: 'sub_123', + }, + }); + + await con.getRepository(ContentPreferenceOrganization).save({ + referenceId: org.id, + userId: 'uucu1', + organizationId: org.id, + feedId: 'uucu1', + type: ContentPreferenceType.Organization, + status: ContentPreferenceStatus.Follow, + }); + + await expectSuccessfulTypedBackground(worker, { + newProfile: base, + user: base, + } as unknown as PubSubSchema['user-updated']); + + expect(cio.identify).toHaveBeenCalledWith( + 'uucu1', + expect.objectContaining({ + is_recruiter: true, + has_active_recruiter_subscription: true, + }), + ); + }); }); diff --git a/src/cio.ts b/src/cio.ts index 1022f2b841..d2d85f6112 100644 --- a/src/cio.ts +++ b/src/cio.ts @@ -51,7 +51,13 @@ import type { Company } from './entity/Company'; import { DataSource, In } from 'typeorm'; import { logger } from './logger'; import { OpportunityMatch } from './entity/OpportunityMatch'; -import { OpportunityMatchStatus } from './entity/opportunities/types'; +import { + OpportunityMatchStatus, + OpportunityUserType, +} from './entity/opportunities/types'; +import { OpportunityUser } from './entity/opportunities/user/OpportunityUser'; +import { ContentPreferenceOrganization } from './entity/contentPreference/ContentPreferenceOrganization'; +import { SubscriptionStatus } from './common/plus/subscription'; export const cio = new TrackClient( process.env.CIO_SITE_ID, @@ -211,6 +217,42 @@ export const identifyUserOpportunities = async ({ } }; +/** + * Checks if a user is a recruiter by checking if they have any recruiter + * records in the OpportunityUser table. + */ +export const isUserRecruiter = async ( + con: ConnectionManager, + userId: string, +): Promise => { + const recruiterRecord = await con.getRepository(OpportunityUser).findOne({ + where: { + userId, + type: OpportunityUserType.Recruiter, + }, + }); + + return !!recruiterRecord; +}; + +/** + * Checks if a user has an active recruiter subscription through their organization(s). + * This helps identify users who created opportunities but haven't completed payment. + */ +export const hasActiveRecruiterSubscription = async ( + con: ConnectionManager, + userId: string, +): Promise => + await con + .getRepository(ContentPreferenceOrganization) + .createQueryBuilder('cpo') + .innerJoin('cpo.organization', 'org') + .where('cpo.userId = :userId', { userId }) + .andWhere(`org."recruiterSubscriptionFlags"->>'status' = :status`, { + status: SubscriptionStatus.Active, + }) + .getExists(); + export const generateIdentifyObject = async ( con: ConnectionManager, user: ChangeObject, @@ -283,7 +325,12 @@ export const getIdentifyAttributes = async ( delete dup[field]; } - const [genericInviteURL, personalizedDigest] = await Promise.all([ + const [ + genericInviteURL, + personalizedDigest, + isRecruiter, + hasActiveSubscription, + ] = await Promise.all([ getShortGenericInviteLink(logger, id), con.getRepository(UserPersonalizedDigest).findOne({ select: ['userId'], @@ -295,6 +342,8 @@ export const getIdentifyAttributes = async ( ]), }, }), + isUserRecruiter(con, id), + hasActiveRecruiterSubscription(con, id), ]); return { @@ -305,6 +354,8 @@ export const getIdentifyAttributes = async ( ? dateToCioTimestamp(getDateBaseFromType(dup.updatedAt)) : undefined, referral_link: genericInviteURL, + is_recruiter: isRecruiter, + has_active_recruiter_subscription: hasActiveSubscription, [`cio_subscription_preferences.topics.topic_${CioUnsubscribeTopic.Digest}`]: !!personalizedDigest, ...(user.notificationFlags diff --git a/src/workers/cdc/primary.ts b/src/workers/cdc/primary.ts index 3339c0cfce..58bb77420b 100644 --- a/src/workers/cdc/primary.ts +++ b/src/workers/cdc/primary.ts @@ -158,6 +158,17 @@ import { UserExperienceType } from '../../entity/user/experiences/types'; import { cio, identifyUserOpportunities } from '../../cio'; import { enrichCompanyForExperience } from '../../common/companyEnrichment'; import { Company } from '../../entity/Company'; +import { OpportunityUser } from '../../entity/opportunities/user/OpportunityUser'; +import { OpportunityUserType } from '../../entity/opportunities/types'; + +const convertUserToChangeObject = (user: User): ChangeObject => ({ + ...user, + createdAt: user.createdAt.getTime() * 1000, + updatedAt: user.updatedAt ? user.updatedAt.getTime() * 1000 : undefined, + flags: JSON.stringify(user.flags || {}), + notificationFlags: JSON.stringify(user.notificationFlags || {}), + subscriptionFlags: JSON.stringify(user.subscriptionFlags || {}), +}); const isFreeformPostLongEnough = ( freeform: ChangeMessage, @@ -1443,6 +1454,43 @@ const onOrganizationChange = async ( return; } + // Sync organization members to Customer.io when recruiter subscription status changes + if ( + isChanged( + data.payload.before!, + data.payload.after!, + 'recruiterSubscriptionFlags', + ) + ) { + // Find all users who are recruiters for opportunities in this organization + const recruiters = await con + .getRepository(OpportunityUser) + .createQueryBuilder('ou') + .innerJoin('ou.opportunity', 'opp') + .where('opp.organizationId = :organizationId', { + organizationId: data.payload.after!.id, + }) + .andWhere('ou.type = :type', { type: OpportunityUserType.Recruiter }) + .select('DISTINCT ou.userId', 'userId') + .getRawMany<{ userId: string }>(); + + // Sync all organization recruiters to update their has_active_recruiter_subscription flag + await Promise.all( + recruiters.map(async (recruiter) => { + const user = await con + .getRepository(User) + .findOneBy({ id: recruiter.userId }); + if (user && user.infoConfirmed && user.emailConfirmed) { + const userChangeObject = convertUserToChangeObject(user); + await triggerTypedEvent(logger, 'user-updated', { + user: userChangeObject, + newProfile: userChangeObject, + }); + } + }), + ); + } + if ( isChanged(data.payload.before!, data.payload.after!, [ 'description', @@ -1608,6 +1656,38 @@ const onUserExperienceChange = async ( } }; +const onOpportunityUserChange = async ( + con: DataSource, + logger: FastifyBaseLogger, + data: ChangeMessage, +) => { + // Get the userId from either the new or deleted record + const userId = data.payload.after?.userId || data.payload.before?.userId; + + if (!userId) { + return; + } + + // Sync to Customer.io when a recruiter record is created or deleted + // This ensures the is_recruiter flag is updated immediately + const shouldSync = + (data.payload.op === 'c' && + data.payload.after?.type === OpportunityUserType.Recruiter) || + (data.payload.op === 'd' && + data.payload.before?.type === OpportunityUserType.Recruiter); + + if (shouldSync) { + const user = await con.getRepository(User).findOneBy({ id: userId }); + if (user && user.infoConfirmed && user.emailConfirmed) { + const userChangeObject = convertUserToChangeObject(user); + await triggerTypedEvent(logger, 'user-updated', { + user: userChangeObject, + newProfile: userChangeObject, + }); + } + } +}; + const worker: Worker = { subscription: 'api-cdc', maxMessages: parseInt(process.env.CDC_WORKER_MAX_MESSAGES) || undefined, @@ -1739,6 +1819,9 @@ const worker: Worker = { case getTableName(con, UserExperience): await onUserExperienceChange(con, logger, data); break; + case getTableName(con, OpportunityUser): + await onOpportunityUserChange(con, logger, data); + break; } } catch (err) { logger.error(