Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions __tests__/cron/validateActiveUsers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,8 @@ describe('users for reactivation', () => {
'cio_subscription_preferences.topics.topic_28': true,
created_at: 1656427727,
first_name: 'Ido',
has_active_recruiter_subscription: false,
is_recruiter: false,
name: 'Ido',
permalink: 'http://localhost:5002/idoshamun',
referral_link: 'http://localhost:5002/join?cid=generic&userid=1',
Expand Down
81 changes: 81 additions & 0 deletions __tests__/workers/userUpdatedCio.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import worker from '../../src/workers/userUpdatedCio';
import { ChangeObject } from '../../src/types';
import { expectSuccessfulTypedBackground } from '../helpers';
import {
Feed,
Organization,
User,
UserPersonalizedDigest,
UserPersonalizedDigestType,
Expand All @@ -19,6 +21,14 @@ import createOrGetConnection from '../../src/db';
import { DataSource } from 'typeorm';
import { usersFixture } from '../fixture/user';
import { DEFAULT_NOTIFICATION_SETTINGS } from '../../src/notifications/common';
import { Opportunity } from '../../src/entity/opportunities/Opportunity';
import { OpportunityUser } from '../../src/entity/opportunities/user';
import { OpportunityUserType } from '../../src/entity/opportunities/types';
import { ContentPreferenceOrganization } from '../../src/entity/contentPreference/ContentPreferenceOrganization';
import {
ContentPreferenceStatus,
ContentPreferenceType,
} from '../../src/entity/contentPreference/types';

jest.mock('../../src/common', () => ({
...jest.requireActual('../../src/common'),
Expand Down Expand Up @@ -85,6 +95,8 @@ describe('userUpdatedCio', () => {
updated_at: 1714577744,
username: 'cio',
referral_link: referral,
is_recruiter: false,
has_active_recruiter_subscription: false,
email_confirmed: true,
'cio_subscription_preferences.topics.topic_1': true,
'cio_subscription_preferences.topics.topic_4': true,
Expand Down Expand Up @@ -228,4 +240,73 @@ describe('userUpdatedCio', () => {
'cio_subscription_preferences.topics.topic_8': true,
});
});

it('should identify user as recruiter with active subscription when both are true', async () => {
const referral = 'https://dly.dev/12345678';
mocked(getShortGenericInviteLink).mockResolvedValue(referral);

// Create user first
await con.getRepository(User).save({
...usersFixture[0],
id: 'uucu1',
github: 'uucu1',
hashnode: 'uucu1',
email: 'uucu1@daily.dev',
twitter: 'uucu1',
username: 'uucu1',
notificationFlags: DEFAULT_NOTIFICATION_SETTINGS,
});

// Create feed for the user
await con.getRepository(Feed).save({
id: 'uucu1',
userId: 'uucu1',
});

// Create opportunity and make user a recruiter
const opportunity = await con.getRepository(Opportunity).save({
title: 'Test Opportunity',
tldr: 'Test',
type: 1,
state: 1,
});

await con.getRepository(OpportunityUser).save({
opportunityId: opportunity.id,
userId: 'uucu1',
type: OpportunityUserType.Recruiter,
});

// Create organization with active subscription
const org = await con.getRepository(Organization).save({
name: 'Test Org',
handle: 'testorg2',
recruiterSubscriptionFlags: {
status: 'active',
subscriptionId: 'sub_123',
},
});

await con.getRepository(ContentPreferenceOrganization).save({
referenceId: org.id,
userId: 'uucu1',
organizationId: org.id,
feedId: 'uucu1',
type: ContentPreferenceType.Organization,
status: ContentPreferenceStatus.Follow,
});

await expectSuccessfulTypedBackground(worker, {
newProfile: base,
user: base,
} as unknown as PubSubSchema['user-updated']);

expect(cio.identify).toHaveBeenCalledWith(
'uucu1',
expect.objectContaining({
is_recruiter: true,
has_active_recruiter_subscription: true,
}),
);
});
});
55 changes: 53 additions & 2 deletions src/cio.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,13 @@ import type { Company } from './entity/Company';
import { DataSource, In } from 'typeorm';
import { logger } from './logger';
import { OpportunityMatch } from './entity/OpportunityMatch';
import { OpportunityMatchStatus } from './entity/opportunities/types';
import {
OpportunityMatchStatus,
OpportunityUserType,
} from './entity/opportunities/types';
import { OpportunityUser } from './entity/opportunities/user/OpportunityUser';
import { ContentPreferenceOrganization } from './entity/contentPreference/ContentPreferenceOrganization';
import { SubscriptionStatus } from './common/plus/subscription';

export const cio = new TrackClient(
process.env.CIO_SITE_ID,
Expand Down Expand Up @@ -211,6 +217,42 @@ export const identifyUserOpportunities = async ({
}
};

/**
* Checks if a user is a recruiter by checking if they have any recruiter
* records in the OpportunityUser table.
*/
export const isUserRecruiter = async (
con: ConnectionManager,
userId: string,
): Promise<boolean> => {
const recruiterRecord = await con.getRepository(OpportunityUser).findOne({
where: {
userId,
type: OpportunityUserType.Recruiter,
},
});

return !!recruiterRecord;
};

/**
* Checks if a user has an active recruiter subscription through their organization(s).
* This helps identify users who created opportunities but haven't completed payment.
*/
export const hasActiveRecruiterSubscription = async (
con: ConnectionManager,
userId: string,
): Promise<boolean> =>
await con
.getRepository(ContentPreferenceOrganization)
.createQueryBuilder('cpo')
.innerJoin('cpo.organization', 'org')
.where('cpo.userId = :userId', { userId })
.andWhere(`org."recruiterSubscriptionFlags"->>'status' = :status`, {
status: SubscriptionStatus.Active,
})
.getExists();

export const generateIdentifyObject = async (
con: ConnectionManager,
user: ChangeObject<User>,
Expand Down Expand Up @@ -283,7 +325,12 @@ export const getIdentifyAttributes = async (
delete dup[field];
}

const [genericInviteURL, personalizedDigest] = await Promise.all([
const [
genericInviteURL,
personalizedDigest,
isRecruiter,
hasActiveSubscription,
] = await Promise.all([
getShortGenericInviteLink(logger, id),
con.getRepository(UserPersonalizedDigest).findOne({
select: ['userId'],
Expand All @@ -295,6 +342,8 @@ export const getIdentifyAttributes = async (
]),
},
}),
isUserRecruiter(con, id),
hasActiveRecruiterSubscription(con, id),
]);

return {
Expand All @@ -305,6 +354,8 @@ export const getIdentifyAttributes = async (
? dateToCioTimestamp(getDateBaseFromType(dup.updatedAt))
: undefined,
referral_link: genericInviteURL,
is_recruiter: isRecruiter,
has_active_recruiter_subscription: hasActiveSubscription,
[`cio_subscription_preferences.topics.topic_${CioUnsubscribeTopic.Digest}`]:
!!personalizedDigest,
...(user.notificationFlags
Expand Down
83 changes: 83 additions & 0 deletions src/workers/cdc/primary.ts
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,17 @@ import { UserExperienceType } from '../../entity/user/experiences/types';
import { cio, identifyUserOpportunities } from '../../cio';
import { enrichCompanyForExperience } from '../../common/companyEnrichment';
import { Company } from '../../entity/Company';
import { OpportunityUser } from '../../entity/opportunities/user/OpportunityUser';
import { OpportunityUserType } from '../../entity/opportunities/types';

const convertUserToChangeObject = (user: User): ChangeObject<User> => ({
...user,
createdAt: user.createdAt.getTime() * 1000,
updatedAt: user.updatedAt ? user.updatedAt.getTime() * 1000 : undefined,
flags: JSON.stringify(user.flags || {}),
notificationFlags: JSON.stringify(user.notificationFlags || {}),
subscriptionFlags: JSON.stringify(user.subscriptionFlags || {}),
});

const isFreeformPostLongEnough = (
freeform: ChangeMessage<FreeformPost>,
Expand Down Expand Up @@ -1443,6 +1454,43 @@ const onOrganizationChange = async (
return;
}

// Sync organization members to Customer.io when recruiter subscription status changes
if (
isChanged(
data.payload.before!,
data.payload.after!,
'recruiterSubscriptionFlags',
)
) {
// Find all users who are recruiters for opportunities in this organization
const recruiters = await con
.getRepository(OpportunityUser)
.createQueryBuilder('ou')
.innerJoin('ou.opportunity', 'opp')
.where('opp.organizationId = :organizationId', {
organizationId: data.payload.after!.id,
})
.andWhere('ou.type = :type', { type: OpportunityUserType.Recruiter })
.select('DISTINCT ou.userId', 'userId')
.getRawMany<{ userId: string }>();

// Sync all organization recruiters to update their has_active_recruiter_subscription flag
await Promise.all(
recruiters.map(async (recruiter) => {
const user = await con
.getRepository(User)
.findOneBy({ id: recruiter.userId });
if (user && user.infoConfirmed && user.emailConfirmed) {
const userChangeObject = convertUserToChangeObject(user);
await triggerTypedEvent(logger, 'user-updated', {
user: userChangeObject,
newProfile: userChangeObject,
});
}
}),
);
}

if (
isChanged(data.payload.before!, data.payload.after!, [
'description',
Expand Down Expand Up @@ -1608,6 +1656,38 @@ const onUserExperienceChange = async (
}
};

const onOpportunityUserChange = async (
con: DataSource,
logger: FastifyBaseLogger,
data: ChangeMessage<OpportunityUser>,
) => {
// Get the userId from either the new or deleted record
const userId = data.payload.after?.userId || data.payload.before?.userId;

if (!userId) {
return;
}

// Sync to Customer.io when a recruiter record is created or deleted
// This ensures the is_recruiter flag is updated immediately
const shouldSync =
(data.payload.op === 'c' &&
data.payload.after?.type === OpportunityUserType.Recruiter) ||
(data.payload.op === 'd' &&
data.payload.before?.type === OpportunityUserType.Recruiter);

if (shouldSync) {
const user = await con.getRepository(User).findOneBy({ id: userId });
if (user && user.infoConfirmed && user.emailConfirmed) {
const userChangeObject = convertUserToChangeObject(user);
await triggerTypedEvent(logger, 'user-updated', {
user: userChangeObject,
newProfile: userChangeObject,
});
}
}
};

const worker: Worker = {
subscription: 'api-cdc',
maxMessages: parseInt(process.env.CDC_WORKER_MAX_MESSAGES) || undefined,
Expand Down Expand Up @@ -1739,6 +1819,9 @@ const worker: Worker = {
case getTableName(con, UserExperience):
await onUserExperienceChange(con, logger, data);
break;
case getTableName(con, OpportunityUser):
await onOpportunityUserChange(con, logger, data);
break;
}
} catch (err) {
logger.error(
Expand Down
Loading