diff --git a/.infra/common.ts b/.infra/common.ts index 04ca07453b..6e6e5444ae 100644 --- a/.infra/common.ts +++ b/.infra/common.ts @@ -464,6 +464,10 @@ export const workers: Worker[] = [ topic: 'api.v1.opportunity-feedback-submitted', subscription: 'api.parse-opportunity-feedback', }, + { + topic: 'api.v1.opportunity-parse', + subscription: 'api.opportunity-parse', + }, ]; export const personalizedDigestWorkers: Worker[] = [ diff --git a/AGENTS.md b/AGENTS.md index 4e7b53ada4..067ac36aa6 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -21,6 +21,8 @@ This file provides guidance to coding agents when working with code in this repo - `pnpm run db:seed:import` - Import seed data for local development - `pnpm run db:migrate:make src/migration/MigrationName` - Generate new migration based on entity changes - `pnpm run db:migrate:create src/migration/MigrationName` - Create empty migration file +- **Never use raw SQL queries** (`con.query()`) - always use TypeORM repository methods or query builder +- If raw SQL is absolutely necessary, explain the reason and ask for permission before implementing **Migration Generation:** When adding or modifying entity columns, **always generate a migration** using: @@ -108,6 +110,7 @@ The migration generator compares entities against the local database schema. Ens - Fixtures in `__tests__/fixture/` for test data - Mercurius integration testing for GraphQL endpoints - Avoid creating multiple overlapping tests for the same scenario; a single test per key scenario is preferred +- When evaluating response objects (GraphQL, API), prefer `toEqual` and `toMatchObject` over multiple `expect().toBe()` lines **Infrastructure Concerns:** - OpenTelemetry for distributed tracing and metrics @@ -131,6 +134,12 @@ The migration generator compares entities against the local database schema. Ens - Extract repeated patterns into small inline helpers (e.g., `const respond = (text) => ...`) - Combine related checks (e.g., `if (!match || match.status !== X)` instead of separate blocks) +**Function style:** +- Prefer const arrow functions over function declarations: `const foo = () => {}` instead of `function foo() {}` +- Prefer single props-style argument over multiple arguments: `const foo = ({ a, b }) => {}` instead of `const foo = (a, b) => {}` +- Don't extract single-use code into separate functions - keep logic inline where it's used +- Only extract functions when the same logic is needed in multiple places + **PubSub topics should be general-purpose:** - Topics should contain only essential identifiers (e.g., `{ opportunityId, userId }`) - Subscribers fetch their own data - don't optimize topic payloads for specific consumers @@ -142,6 +151,11 @@ The migration generator compares entities against the local database schema. Ens - Example: Use `2 * ONE_DAY_IN_MINUTES` instead of `2 * 24 * 60` - Add new constants to `src/common/constants.ts` if needed (they are re-exported from `src/common/index.ts`) +**Type declarations:** +- Only create separate exported types if they are used in multiple places +- For single-use types, define them inline within the parent type +- Example: Instead of `export type FileData = {...}; type Flags = { file: FileData }`, use `type Flags = { file: { ... 
} }` + ## Best Practices & Lessons Learned **Avoiding Code Duplication:** diff --git a/__tests__/schema/opportunity.ts b/__tests__/schema/opportunity.ts index d66d837551..33d7d47799 100644 --- a/__tests__/schema/opportunity.ts +++ b/__tests__/schema/opportunity.ts @@ -57,7 +57,6 @@ import { OpportunityType, SalaryPeriod, SeniorityLevel, - Location, } from '@dailydotdev/schema'; import { UserCandidatePreference } from '../../src/entity/user/UserCandidatePreference'; import { QuestionScreening } from '../../src/entity/questions/QuestionScreening'; @@ -87,6 +86,7 @@ import { updateRecruiterSubscriptionFlags } from '../../src/common'; import { SubscriptionStatus } from '../../src/common/plus'; import { OpportunityPreviewStatus } from '../../src/common/opportunity/types'; import { unsupportedOpportunityDomains } from '../../src/common/schema/opportunities'; +import * as typedPubsub from '../../src/common/typedPubsub'; // Mock Slack WebClient const mockConversationsCreate = jest.fn(); @@ -680,6 +680,129 @@ describe('query opportunities', () => { expect(secondPage.data.opportunities.pageInfo.hasNextPage).toBe(false); expect(secondPage.data.opportunities.pageInfo.hasPreviousPage).toBe(true); }); + + it('should not return opportunities in PARSING state', async () => { + loggedUser = '1'; + isTeamMember = true; + + // Insert opportunity in PARSING state + await saveFixtures(con, OpportunityJob, [ + { + id: '550e8400-e29b-41d4-a716-446655440100', + type: OpportunityType.JOB, + state: OpportunityState.PARSING, + title: 'Parsing Opportunity', + tldr: 'This opportunity is being parsed', + organizationId: organizationsFixture[0].id, + createdAt: new Date('2023-01-10'), + updatedAt: new Date('2023-01-10'), + }, + ]); + await saveFixtures(con, OpportunityUser, [ + { + opportunityId: '550e8400-e29b-41d4-a716-446655440100', + userId: usersFixture[0].id, + type: OpportunityUserType.Recruiter, + }, + ]); + + const res = await client.query(GET_OPPORTUNITIES_QUERY, { + variables: { state: OpportunityState.PARSING, first: 10 }, + }); + + expect(res.errors).toBeFalsy(); + expect(res.data.opportunities.edges).toHaveLength(0); + + isTeamMember = false; + }); + + it('should not return opportunities in ERROR state', async () => { + loggedUser = '1'; + isTeamMember = true; + + // Insert opportunity in ERROR state + await saveFixtures(con, OpportunityJob, [ + { + id: '550e8400-e29b-41d4-a716-446655440101', + type: OpportunityType.JOB, + state: OpportunityState.ERROR, + title: 'Error Opportunity', + tldr: 'This opportunity encountered an error', + organizationId: organizationsFixture[0].id, + createdAt: new Date('2023-01-11'), + updatedAt: new Date('2023-01-11'), + }, + ]); + await saveFixtures(con, OpportunityUser, [ + { + opportunityId: '550e8400-e29b-41d4-a716-446655440101', + userId: usersFixture[0].id, + type: OpportunityUserType.Recruiter, + }, + ]); + + const res = await client.query(GET_OPPORTUNITIES_QUERY, { + variables: { state: OpportunityState.ERROR, first: 10 }, + }); + + expect(res.errors).toBeFalsy(); + expect(res.data.opportunities.edges).toHaveLength(0); + + isTeamMember = false; + }); + + it('should not return PARSING or ERROR opportunities when querying LIVE state', async () => { + loggedUser = '1'; + + // Insert opportunities in PARSING and ERROR states + await saveFixtures(con, OpportunityJob, [ + { + id: '550e8400-e29b-41d4-a716-446655440102', + type: OpportunityType.JOB, + state: OpportunityState.PARSING, + title: 'Parsing Opportunity 2', + tldr: 'This opportunity is being parsed', + 
organizationId: organizationsFixture[0].id, + createdAt: new Date('2023-01-12'), + updatedAt: new Date('2023-01-12'), + }, + { + id: '550e8400-e29b-41d4-a716-446655440103', + type: OpportunityType.JOB, + state: OpportunityState.ERROR, + title: 'Error Opportunity 2', + tldr: 'This opportunity encountered an error', + organizationId: organizationsFixture[0].id, + createdAt: new Date('2023-01-13'), + updatedAt: new Date('2023-01-13'), + }, + ]); + await saveFixtures(con, OpportunityUser, [ + { + opportunityId: '550e8400-e29b-41d4-a716-446655440102', + userId: usersFixture[0].id, + type: OpportunityUserType.Recruiter, + }, + { + opportunityId: '550e8400-e29b-41d4-a716-446655440103', + userId: usersFixture[0].id, + type: OpportunityUserType.Recruiter, + }, + ]); + + const res = await client.query(GET_OPPORTUNITIES_QUERY, { + variables: { state: OpportunityState.LIVE, first: 10 }, + }); + + expect(res.errors).toBeFalsy(); + // Should only return the 3 LIVE opportunities from fixtures, not the PARSING or ERROR ones + expect(res.data.opportunities.edges).toHaveLength(3); + const states = res.data.opportunities.edges.map( + (e: { node: { state: string } }) => e.node.state, + ); + expect(states).not.toContain(OpportunityState.PARSING); + expect(states).not.toContain(OpportunityState.ERROR); + }); }); describe('query getOpportunityMatch', () => { @@ -5104,6 +5227,7 @@ describe('mutation parseOpportunity', () => { mutation ParseOpportunity($payload: ParseOpportunityInput!) { parseOpportunity(payload: $payload) { id + state title tldr content { @@ -5186,6 +5310,16 @@ describe('mutation parseOpportunity', () => { .mockImplementation((): ServiceClient => { return serviceClient; }); + + // Mock GCS upload + jest + .spyOn(googleCloud, 'uploadResumeFromBuffer') + .mockResolvedValue( + 'https://storage.cloud.google.com/bucket/opportunity-123.pdf', + ); + + // Mock PubSub event trigger + jest.spyOn(typedPubsub, 'triggerTypedEvent').mockResolvedValue(undefined); }); it('should parse opportunity from file', async () => { @@ -5218,57 +5352,28 @@ describe('mutation parseOpportunity', () => { const body = res.body; expect(body.errors).toBeFalsy(); - expect(body.data.parseOpportunity).toMatchObject({ - title: 'Mocked Opportunity Title', - tldr: 'This is a mocked TL;DR of the opportunity.', - keywords: [ - { keyword: 'mock' }, - { keyword: 'opportunity' }, - { keyword: 'test' }, - ], - meta: { - employmentType: EmploymentType.FULL_TIME, - seniorityLevel: SeniorityLevel.SENIOR, - roleType: RoleType.Auto, - salary: { - min: 1000, - max: 2000, - period: SalaryPeriod.MONTHLY, - }, - }, - content: { - overview: { - content: 'This is the overview of the mocked opportunity.', - html: '
<p>This is the overview of the mocked opportunity.</p>\n', - }, - responsibilities: { - content: 'These are the responsibilities of the mocked opportunity.', - html: '<p>These are the responsibilities of the mocked opportunity.</p>\n', - }, - requirements: { - content: 'These are the requirements of the mocked opportunity.', - html: '<p>These are the requirements of the mocked opportunity.</p>
\n', - }, - }, - locations: [ - { - type: LocationType.REMOTE, - location: { - city: null, - country: 'USA', - subdivision: null, - }, - }, - ], - questions: [], - feedbackQuestions: [ - { - title: 'Why did you reject this opportunity?', - placeholder: `E.g., Not interested in the tech stack, location doesn't work for me, compensation too low...`, - }, - ], + // Verify opportunity is in PARSING state + expect(body.data.parseOpportunity.state).toBe(OpportunityState.PARSING); + expect(body.data.parseOpportunity.title).toBe('Processing...'); + expect(body.data.parseOpportunity.id).toBeDefined(); + + // Verify uploadResumeFromBuffer was called + expect(googleCloud.uploadResumeFromBuffer).toHaveBeenCalledWith( + expect.stringContaining('opportunity-'), + expect.any(Buffer), + { contentType: 'application/pdf' }, + ); + + // Verify triggerTypedEvent was called with opportunityId + expect(typedPubsub.triggerTypedEvent).toHaveBeenCalled(); + const triggerCall = (typedPubsub.triggerTypedEvent as jest.Mock).mock + .calls[0]; + expect(triggerCall[1]).toBe('api.v1.opportunity-parse'); + expect(triggerCall[2]).toEqual({ + opportunityId: body.data.parseOpportunity.id, }); + // Verify opportunity in database has file data in flags const opportunity = await con.getRepository(OpportunityJob).findOne({ where: { id: body.data.parseOpportunity.id, @@ -5276,7 +5381,14 @@ describe('mutation parseOpportunity', () => { }); expect(opportunity).toBeDefined(); - expect(opportunity!.state).toBe(OpportunityState.DRAFT); + expect(opportunity!.state).toBe(OpportunityState.PARSING); + expect(opportunity!.flags?.file).toMatchObject({ + blobName: expect.stringContaining('opportunity-'), + bucketName: expect.any(String), + mimeType: 'application/pdf', + extension: 'pdf', + trackingId: 'anon1', + }); }); it('should parse opportunity from URL', async () => { @@ -5317,49 +5429,42 @@ describe('mutation parseOpportunity', () => { const body = res.body; expect(body.errors).toBeFalsy(); - expect(body.data.parseOpportunity).toMatchObject({ - title: 'Mocked Opportunity Title', - tldr: 'This is a mocked TL;DR of the opportunity.', - keywords: [ - { keyword: 'mock' }, - { keyword: 'opportunity' }, - { keyword: 'test' }, - ], - meta: { - employmentType: EmploymentType.FULL_TIME, - seniorityLevel: SeniorityLevel.SENIOR, - roleType: RoleType.Auto, - salary: { - min: 1000, - max: 2000, - period: SalaryPeriod.MONTHLY, - }, - }, - content: { - overview: { - content: 'This is the overview of the mocked opportunity.', - html: '
<p>This is the overview of the mocked opportunity.</p>\n', - }, - responsibilities: { - content: 'These are the responsibilities of the mocked opportunity.', - html: '<p>These are the responsibilities of the mocked opportunity.</p>\n', - }, - requirements: { - content: 'These are the requirements of the mocked opportunity.', - html: '<p>These are the requirements of the mocked opportunity.</p>
\n', - }, + // Verify opportunity is in PARSING state + expect(body.data.parseOpportunity.state).toBe(OpportunityState.PARSING); + expect(body.data.parseOpportunity.title).toBe('Processing...'); + expect(body.data.parseOpportunity.id).toBeDefined(); + + // Verify uploadResumeFromBuffer was called + expect(googleCloud.uploadResumeFromBuffer).toHaveBeenCalledWith( + expect.stringContaining('opportunity-'), + expect.any(Buffer), + { contentType: 'application/pdf' }, + ); + + // Verify triggerTypedEvent was called with opportunityId + expect(typedPubsub.triggerTypedEvent).toHaveBeenCalled(); + const triggerCall = (typedPubsub.triggerTypedEvent as jest.Mock).mock + .calls[0]; + expect(triggerCall[1]).toBe('api.v1.opportunity-parse'); + expect(triggerCall[2]).toEqual({ + opportunityId: body.data.parseOpportunity.id, + }); + + // Verify opportunity in database has file data in flags + const opportunity = await con.getRepository(OpportunityJob).findOne({ + where: { + id: body.data.parseOpportunity.id, }, - locations: [ - { - type: LocationType.REMOTE, - location: { - city: null, - country: 'USA', - subdivision: null, - }, - }, - ], - questions: [], + }); + + expect(opportunity).toBeDefined(); + expect(opportunity!.state).toBe(OpportunityState.PARSING); + expect(opportunity!.flags?.file).toMatchObject({ + blobName: expect.stringContaining('opportunity-'), + bucketName: expect.any(String), + mimeType: 'application/pdf', + extension: 'pdf', + trackingId: 'anon1', }); }); @@ -5479,57 +5584,28 @@ describe('mutation parseOpportunity', () => { const body = res.body; expect(body.errors).toBeFalsy(); - expect(body.data.parseOpportunity).toMatchObject({ - title: 'Mocked Opportunity Title', - tldr: 'This is a mocked TL;DR of the opportunity.', - keywords: [ - { keyword: 'mock' }, - { keyword: 'opportunity' }, - { keyword: 'test' }, - ], - meta: { - employmentType: EmploymentType.FULL_TIME, - seniorityLevel: SeniorityLevel.SENIOR, - roleType: RoleType.Auto, - salary: { - min: 1000, - max: 2000, - period: SalaryPeriod.MONTHLY, - }, - }, - content: { - overview: { - content: 'This is the overview of the mocked opportunity.', - html: '
<p>This is the overview of the mocked opportunity.</p>\n', - }, - responsibilities: { - content: 'These are the responsibilities of the mocked opportunity.', - html: '<p>These are the responsibilities of the mocked opportunity.</p>\n', - }, - requirements: { - content: 'These are the requirements of the mocked opportunity.', - html: '<p>These are the requirements of the mocked opportunity.</p>
\n', - }, - }, - locations: [ - { - type: LocationType.REMOTE, - location: { - city: null, - country: 'USA', - subdivision: null, - }, - }, - ], - questions: [], - feedbackQuestions: [ - { - title: 'Why did you reject this opportunity?', - placeholder: `E.g., Not interested in the tech stack, location doesn't work for me, compensation too low...`, - }, - ], + // Verify opportunity is in PARSING state + expect(body.data.parseOpportunity.state).toBe(OpportunityState.PARSING); + expect(body.data.parseOpportunity.title).toBe('Processing...'); + expect(body.data.parseOpportunity.id).toBeDefined(); + + // Verify uploadResumeFromBuffer was called + expect(googleCloud.uploadResumeFromBuffer).toHaveBeenCalledWith( + expect.stringContaining('opportunity-'), + expect.any(Buffer), + { contentType: 'application/pdf' }, + ); + + // Verify triggerTypedEvent was called with opportunityId + expect(typedPubsub.triggerTypedEvent).toHaveBeenCalled(); + const triggerCall = (typedPubsub.triggerTypedEvent as jest.Mock).mock + .calls[0]; + expect(triggerCall[1]).toBe('api.v1.opportunity-parse'); + expect(triggerCall[2]).toEqual({ + opportunityId: body.data.parseOpportunity.id, }); + // Verify opportunity in database has file data in flags const opportunity = await con.getRepository(OpportunityJob).findOne({ where: { id: body.data.parseOpportunity.id, @@ -5537,7 +5613,7 @@ describe('mutation parseOpportunity', () => { }); expect(opportunity).toBeDefined(); - expect(opportunity!.state).toBe(OpportunityState.DRAFT); + expect(opportunity!.state).toBe(OpportunityState.PARSING); const opportunityRecruiter = await con .getRepository(OpportunityUserRecruiter) @@ -5549,6 +5625,14 @@ describe('mutation parseOpportunity', () => { }); expect(opportunityRecruiter).toBeDefined(); + + expect(opportunity!.flags?.file).toMatchObject({ + blobName: expect.stringContaining('opportunity-'), + bucketName: expect.any(String), + mimeType: 'application/pdf', + extension: 'pdf', + userId: loggedUser, + }); }); it('should assign opportunity to existing organization of authenticated user', async () => { @@ -5583,10 +5667,9 @@ describe('mutation parseOpportunity', () => { const body = res.body; expect(body.errors).toBeFalsy(); - expect(body.data.parseOpportunity).toMatchObject({ - title: 'Mocked Opportunity Title', - organization: { id: '550e8400-e29b-41d4-a716-446655440000' }, - }); + // Verify opportunity is in PARSING state (organization will be linked by worker) + expect(body.data.parseOpportunity.state).toBe(OpportunityState.PARSING); + expect(body.data.parseOpportunity.title).toBe('Processing...'); const opportunity = await con.getRepository(OpportunityJob).findOne({ where: { @@ -5594,91 +5677,10 @@ describe('mutation parseOpportunity', () => { }, }); - expect(opportunity!.organizationId).toBe( - '550e8400-e29b-41d4-a716-446655440000', - ); - }); - - it('should assign opportunity location to Europe when no country specified', async () => { - loggedUser = '1'; - - fileTypeFromBuffer.mockResolvedValue({ - ext: 'pdf', - mime: 'application/pdf', - }); - - const transport = createMockBrokkrTransport({ - opportunity: { - location: [ - new Location({ - continent: 'Europe', - type: 1, - }), - ], - }, - }); - - const serviceClient = { - instance: createClient(BrokkrService, transport), - garmr: createGarmrMock(), - }; - - jest - .spyOn(brokkrCommon, 'getBrokkrClient') - .mockImplementation((): ServiceClient => { - return serviceClient; - }); - - await saveFixtures(con, DatasetLocation, [ - { - continent: 'Europe', - }, - ]); - - // Execute the 
mutation with a file upload - const res = await authorizeRequest( - request(app.server) - .post('/graphql') - .field( - 'operations', - JSON.stringify({ - query: MUTATION, - variables: { - payload: { - file: null, - }, - }, - }), - ) - .field('map', JSON.stringify({ '0': ['variables.payload.file'] })) - .attach('0', './__tests__/fixture/screen.pdf'), - ).expect(200); - - const body = res.body; - expect(body.errors).toBeFalsy(); - - expect(body.data.parseOpportunity).toMatchObject({ - locations: [ - { - location: { - city: null, - country: 'Europe', - subdivision: null, - }, - type: 1, - }, - ], - }); - - const opportunity = await con.getRepository(OpportunityJob).findOne({ - where: { - id: body.data.parseOpportunity.id, - }, - }); - - expect(opportunity!.organizationId).toBe( - '550e8400-e29b-41d4-a716-446655440000', - ); + expect(opportunity).toBeDefined(); + expect(opportunity!.state).toBe(OpportunityState.PARSING); + // Organization will be linked by the worker, not the mutation + expect(opportunity!.organizationId).toBeNull(); }); it('should throw when trying to parse opportunities from unsupported domain', async () => { @@ -6399,6 +6401,50 @@ describe('query opportunityPreview', () => { location: [{ iso2: 'US', country: 'United States' }], }); }); + + it('should throw conflict error when opportunity is in parsing state', async () => { + await con.getRepository(OpportunityJob).update( + { id: opportunitiesFixture[0].id }, + { + state: OpportunityState.PARSING, + flags: { + anonUserId: 'test-anon-user-123', + }, + }, + ); + + await testQueryErrorCode( + client, + { + query: OPPORTUNITY_PREVIEW_QUERY, + variables: { first: 10 }, + }, + 'CONFLICT', + 'Opportunity is not ready for preview yet', + ); + }); + + it('should throw conflict error when opportunity is in error state', async () => { + await con.getRepository(OpportunityJob).update( + { id: opportunitiesFixture[0].id }, + { + state: OpportunityState.ERROR, + flags: { + anonUserId: 'test-anon-user-123', + }, + }, + ); + + await testQueryErrorCode( + client, + { + query: OPPORTUNITY_PREVIEW_QUERY, + variables: { first: 10 }, + }, + 'CONFLICT', + 'Opportunity is not ready for preview yet', + ); + }); }); describe('query opportunityStats', () => { diff --git a/__tests__/workers/opportunity/parseOpportunity.ts b/__tests__/workers/opportunity/parseOpportunity.ts new file mode 100644 index 0000000000..2c63e975c4 --- /dev/null +++ b/__tests__/workers/opportunity/parseOpportunity.ts @@ -0,0 +1,666 @@ +import { + expectSuccessfulTypedBackground, + saveFixtures, + createMockBrokkrTransport, + createGarmrMock, +} from '../../helpers'; +import { parseOpportunityWorker as worker } from '../../../src/workers/opportunity/parseOpportunity'; +import { DataSource } from 'typeorm'; +import createOrGetConnection from '../../../src/db'; +import { OpportunityJob } from '../../../src/entity/opportunities/OpportunityJob'; +import { OpportunityKeyword } from '../../../src/entity/OpportunityKeyword'; +import { OpportunityLocation } from '../../../src/entity/opportunities/OpportunityLocation'; +import { OpportunityUserRecruiter } from '../../../src/entity/opportunities/user/OpportunityUserRecruiter'; +import { DatasetLocation } from '../../../src/entity/dataset/DatasetLocation'; +import { User } from '../../../src/entity'; +import { usersFixture } from '../../fixture'; +import { + datasetLocationsFixture, + organizationsFixture, +} from '../../fixture/opportunity'; +import { + OpportunityState, + OpportunityType, + OpportunityContent, + EmploymentType, + 
SeniorityLevel, + LocationType, + SalaryPeriod, + BrokkrService, + Location, +} from '@dailydotdev/schema'; +import { RoleType } from '../../../src/common/schema/userCandidate'; +import { Storage } from '@google-cloud/storage'; +import { Organization } from '../../../src/entity/Organization'; +import { RESUME_BUCKET_NAME } from '../../../src/config'; +import { createClient } from '@connectrpc/connect'; +import type { ServiceClient } from '../../../src/types'; +import * as brokkrCommon from '../../../src/common/brokkr'; + +const mockStorageDownload = jest.fn(); +const mockStorageDelete = jest.fn(); +const mockStorageExists = jest.fn(); + +// Mock GCS Storage +jest.mock('@google-cloud/storage'); + +let con: DataSource; +const testOpportunityId = '550e8400-e29b-41d4-a716-446655440000'; +const testBlobName = 'opportunity-test-123.pdf'; +const testUserId = '1'; + +beforeAll(async () => { + con = await createOrGetConnection(); +}); + +beforeEach(async () => { + jest.resetAllMocks(); + + await saveFixtures(con, User, usersFixture); + await saveFixtures(con, DatasetLocation, datasetLocationsFixture); + await saveFixtures(con, Organization, organizationsFixture); + + // Mock Brokkr client + const transport = createMockBrokkrTransport(); + + const serviceClient = { + instance: createClient(BrokkrService, transport), + garmr: createGarmrMock(), + }; + + jest + .spyOn(brokkrCommon, 'getBrokkrClient') + .mockImplementation((): ServiceClient => { + return serviceClient; + }); + + // Mock GCS Storage + const mockFile = { + download: mockStorageDownload, + delete: mockStorageDelete, + exists: mockStorageExists, + }; + const mockBucket = { + file: jest.fn().mockReturnValue(mockFile), + }; + (Storage as unknown as jest.Mock).mockImplementation(() => ({ + bucket: jest.fn().mockReturnValue(mockBucket), + })); + + // Mock GCS operations + mockStorageDownload.mockResolvedValue([Buffer.from('mock-pdf-content')]); + mockStorageDelete.mockResolvedValue([]); + mockStorageExists.mockResolvedValue([true]); +}); + +afterEach(async () => { + await con.getRepository(OpportunityJob).delete({ id: testOpportunityId }); +}); + +describe('parseOpportunity worker', () => { + it('should process opportunity successfully', async () => { + // Spy on Brokkr parseOpportunity + const parseOpportunitySpy = jest.spyOn( + brokkrCommon.getBrokkrClient().instance, + 'parseOpportunity', + ); + + // Create opportunity in PARSING state with file data + await con.getRepository(OpportunityJob).save({ + id: testOpportunityId, + type: OpportunityType.JOB, + state: OpportunityState.PARSING, + title: 'Processing...', + tldr: '', + content: new OpportunityContent({}), + flags: { + batchSize: 100, + file: { + blobName: testBlobName, + bucketName: RESUME_BUCKET_NAME, + mimeType: 'application/pdf', + extension: 'pdf', + userId: testUserId, + trackingId: 'anon1', + }, + }, + }); + + await expectSuccessfulTypedBackground<'api.v1.opportunity-parse'>(worker, { + opportunityId: testOpportunityId, + }); + + // Verify GCS download was called + expect(mockStorageDownload).toHaveBeenCalled(); + + // Verify Brokkr was called + expect(parseOpportunitySpy).toHaveBeenCalledWith( + expect.objectContaining({ + blobName: expect.stringContaining('job-opportunity-parse'), + blob: expect.objectContaining({ + mime: 'application/pdf', + ext: 'pdf', + content: expect.any(Buffer), + }), + }), + ); + + // Verify opportunity was updated with parsed data + const opportunity = await con.getRepository(OpportunityJob).findOne({ + where: { id: testOpportunityId }, + relations: [ 
+ 'keywords', + 'locations', + 'users', + 'questions', + 'feedbackQuestions', + ], + }); + + expect(opportunity).toBeDefined(); + expect(opportunity!.state).toBe(OpportunityState.DRAFT); + + // Verify basic fields + expect(opportunity!.title).toBe('Mocked Opportunity Title'); + expect(opportunity!.tldr).toBe( + 'This is a mocked TL;DR of the opportunity.', + ); + + // Verify keywords + const keywords = await con.getRepository(OpportunityKeyword).find({ + where: { opportunityId: testOpportunityId }, + }); + expect(keywords).toHaveLength(3); + expect(keywords.map((k) => k.keyword).sort()).toEqual([ + 'mock', + 'opportunity', + 'test', + ]); + + // Verify meta + expect(opportunity!.meta).toMatchObject({ + employmentType: EmploymentType.FULL_TIME, + seniorityLevel: SeniorityLevel.SENIOR, + roleType: RoleType.Auto, + salary: { + min: 1000, + max: 2000, + period: SalaryPeriod.MONTHLY, + }, + }); + + // Verify content + expect(opportunity!.content).toMatchObject({ + overview: { + content: 'This is the overview of the mocked opportunity.', + html: '
<p>This is the overview of the mocked opportunity.</p>\n', + }, + responsibilities: { + content: 'These are the responsibilities of the mocked opportunity.', + html: '<p>These are the responsibilities of the mocked opportunity.</p>\n', + }, + requirements: { + content: 'These are the requirements of the mocked opportunity.', + html: '<p>These are the requirements of the mocked opportunity.</p>
\n', + }, + }); + + // Verify locations + const locations = await con.getRepository(OpportunityLocation).find({ + where: { opportunityId: testOpportunityId }, + relations: ['location'], + }); + expect(locations).toHaveLength(1); + expect(locations[0].type).toBe(LocationType.REMOTE); + const locationData = await locations[0].location; + expect(locationData).toMatchObject({ + country: 'USA', + }); + + // Verify questions (empty by default for new opportunities) + const questions = await opportunity!.questions; + expect(questions).toHaveLength(0); + + // Verify feedback questions + const feedbackQuestions = await opportunity!.feedbackQuestions; + expect(feedbackQuestions).toHaveLength(1); + expect(feedbackQuestions[0]).toMatchObject({ + title: 'Why did you reject this opportunity?', + placeholder: `E.g., Not interested in the tech stack, location doesn't work for me, compensation too low...`, + }); + + // Verify file was deleted from GCS and flags.file was cleared + expect(mockStorageDelete).toHaveBeenCalled(); + expect(opportunity!.flags?.file).toBeNull(); + + // Verify recruiter was assigned + const recruiter = await con + .getRepository(OpportunityUserRecruiter) + .findOne({ + where: { opportunityId: testOpportunityId, userId: testUserId }, + }); + expect(recruiter).toBeDefined(); + }); + + it('should set ERROR state on Brokkr failure', async () => { + // Spy on Brokkr parseOpportunity and make it fail + const parseOpportunitySpy = jest.spyOn( + brokkrCommon.getBrokkrClient().instance, + 'parseOpportunity', + ); + parseOpportunitySpy.mockRejectedValue(new Error('Brokkr parsing failed')); + + await con.getRepository(OpportunityJob).save({ + id: testOpportunityId, + type: OpportunityType.JOB, + state: OpportunityState.PARSING, + title: 'Processing...', + tldr: '', + content: new OpportunityContent({}), + flags: { + batchSize: 100, + file: { + blobName: testBlobName, + bucketName: RESUME_BUCKET_NAME, + mimeType: 'application/pdf', + extension: 'pdf', + trackingId: 'anon1', + }, + }, + }); + + await expectSuccessfulTypedBackground<'api.v1.opportunity-parse'>(worker, { + opportunityId: testOpportunityId, + }); + + const opportunity = await con.getRepository(OpportunityJob).findOne({ + where: { id: testOpportunityId }, + }); + + expect(opportunity!.state).toBe(OpportunityState.ERROR); + expect(opportunity!.flags?.parseError).toContain('Brokkr parsing failed'); + }); + + it('should skip if state is not PARSING', async () => { + await con.getRepository(OpportunityJob).save({ + id: testOpportunityId, + type: OpportunityType.JOB, + state: OpportunityState.DRAFT, + title: 'Already processed', + tldr: 'Test', + content: new OpportunityContent({}), + flags: { + batchSize: 100, + }, + }); + + await expectSuccessfulTypedBackground<'api.v1.opportunity-parse'>(worker, { + opportunityId: testOpportunityId, + }); + + // Verify Brokkr was NOT called + const parseOpportunitySpy = jest.spyOn( + brokkrCommon.getBrokkrClient().instance, + 'parseOpportunity', + ); + expect(parseOpportunitySpy).not.toHaveBeenCalled(); + expect(mockStorageDownload).not.toHaveBeenCalled(); + + // Verify opportunity unchanged + const opportunity = await con.getRepository(OpportunityJob).findOne({ + where: { id: testOpportunityId }, + }); + expect(opportunity!.state).toBe(OpportunityState.DRAFT); + expect(opportunity!.title).toBe('Already processed'); + }); + + it('should clean up GCS file when state is not PARSING', async () => { + // Create opportunity in DRAFT state but WITH file data + await con.getRepository(OpportunityJob).save({ 
+ id: testOpportunityId, + type: OpportunityType.JOB, + state: OpportunityState.DRAFT, + title: 'Already processed', + tldr: 'Test', + content: new OpportunityContent({}), + flags: { + batchSize: 100, + file: { + blobName: testBlobName, + bucketName: RESUME_BUCKET_NAME, + mimeType: 'application/pdf', + extension: 'pdf', + trackingId: 'anon1', + }, + }, + }); + + await expectSuccessfulTypedBackground<'api.v1.opportunity-parse'>(worker, { + opportunityId: testOpportunityId, + }); + + // Verify Brokkr was NOT called (early return) + const parseOpportunitySpy = jest.spyOn( + brokkrCommon.getBrokkrClient().instance, + 'parseOpportunity', + ); + expect(parseOpportunitySpy).not.toHaveBeenCalled(); + + // Verify GCS file was still cleaned up via finally block + expect(mockStorageExists).toHaveBeenCalled(); + expect(mockStorageDelete).toHaveBeenCalled(); + }); + + it('should clean up GCS file on Brokkr error', async () => { + // Spy on Brokkr parseOpportunity and make it fail + const parseOpportunitySpy = jest.spyOn( + brokkrCommon.getBrokkrClient().instance, + 'parseOpportunity', + ); + parseOpportunitySpy.mockRejectedValue(new Error('Brokkr parsing failed')); + + await con.getRepository(OpportunityJob).save({ + id: testOpportunityId, + type: OpportunityType.JOB, + state: OpportunityState.PARSING, + title: 'Processing...', + tldr: '', + content: new OpportunityContent({}), + flags: { + batchSize: 100, + file: { + blobName: testBlobName, + bucketName: RESUME_BUCKET_NAME, + mimeType: 'application/pdf', + extension: 'pdf', + trackingId: 'anon1', + }, + }, + }); + + await expectSuccessfulTypedBackground<'api.v1.opportunity-parse'>(worker, { + opportunityId: testOpportunityId, + }); + + // Verify error state was set + const opportunity = await con.getRepository(OpportunityJob).findOne({ + where: { id: testOpportunityId }, + }); + expect(opportunity!.state).toBe(OpportunityState.ERROR); + + // Verify GCS file was cleaned up via finally block despite error + expect(mockStorageExists).toHaveBeenCalled(); + expect(mockStorageDelete).toHaveBeenCalled(); + }); + + it('should handle missing opportunity', async () => { + // Spy on Brokkr parseOpportunity + const parseOpportunitySpy = jest.spyOn( + brokkrCommon.getBrokkrClient().instance, + 'parseOpportunity', + ); + + // Don't create opportunity - use valid UUID that doesn't exist + await expectSuccessfulTypedBackground<'api.v1.opportunity-parse'>(worker, { + opportunityId: '550e8400-e29b-41d4-a716-446655440099', + }); + + // Worker should log error and return gracefully + expect(parseOpportunitySpy).not.toHaveBeenCalled(); + }); + + it('should set ERROR state on GCS download failure', async () => { + mockStorageDownload.mockRejectedValue(new Error('GCS download failed')); + + await con.getRepository(OpportunityJob).save({ + id: testOpportunityId, + type: OpportunityType.JOB, + state: OpportunityState.PARSING, + title: 'Processing...', + tldr: '', + content: new OpportunityContent({}), + flags: { + batchSize: 100, + file: { + blobName: testBlobName, + bucketName: RESUME_BUCKET_NAME, + mimeType: 'application/pdf', + extension: 'pdf', + trackingId: 'anon1', + }, + }, + }); + + await expectSuccessfulTypedBackground<'api.v1.opportunity-parse'>(worker, { + opportunityId: testOpportunityId, + }); + + const opportunity = await con.getRepository(OpportunityJob).findOne({ + where: { id: testOpportunityId }, + }); + + expect(opportunity!.state).toBe(OpportunityState.ERROR); + expect(opportunity!.flags?.parseError).toContain('GCS download failed'); + }); + + it('should 
set ERROR state on missing flags.file data', async () => { + await con.getRepository(OpportunityJob).save({ + id: testOpportunityId, + type: OpportunityType.JOB, + state: OpportunityState.PARSING, + title: 'Processing...', + tldr: '', + content: new OpportunityContent({}), + flags: { + batchSize: 100, + // No file data + }, + }); + + await expectSuccessfulTypedBackground<'api.v1.opportunity-parse'>(worker, { + opportunityId: testOpportunityId, + }); + + const opportunity = await con.getRepository(OpportunityJob).findOne({ + where: { id: testOpportunityId }, + }); + + expect(opportunity!.state).toBe(OpportunityState.ERROR); + expect(opportunity!.flags?.parseError).toContain('Missing file data'); + }); + + it('should assign recruiter for authenticated user', async () => { + await con.getRepository(OpportunityJob).save({ + id: testOpportunityId, + type: OpportunityType.JOB, + state: OpportunityState.PARSING, + title: 'Processing...', + tldr: '', + content: new OpportunityContent({}), + flags: { + batchSize: 100, + file: { + blobName: testBlobName, + bucketName: RESUME_BUCKET_NAME, + mimeType: 'application/pdf', + extension: 'pdf', + userId: testUserId, + }, + }, + }); + + await expectSuccessfulTypedBackground<'api.v1.opportunity-parse'>(worker, { + opportunityId: testOpportunityId, + }); + + const recruiter = await con + .getRepository(OpportunityUserRecruiter) + .findOne({ + where: { opportunityId: testOpportunityId, userId: testUserId }, + }); + + expect(recruiter).toBeDefined(); + }); + + it('should link organization if user has one', async () => { + // Use the organization from the fixture + const existingOrgId = '550e8400-e29b-41d4-a716-446655440000'; + + // First create an opportunity with organization for the user + const existingOppId = '550e8400-e29b-41d4-a716-446655440001'; + await con.getRepository(OpportunityJob).save({ + id: existingOppId, + type: OpportunityType.JOB, + state: OpportunityState.LIVE, + title: 'Existing opportunity', + tldr: 'Test', + content: new OpportunityContent({}), + organizationId: existingOrgId, + flags: { batchSize: 100 }, + }); + + await con.getRepository(OpportunityUserRecruiter).save({ + opportunityId: existingOppId, + userId: testUserId, + }); + + // Now create new opportunity + await con.getRepository(OpportunityJob).save({ + id: testOpportunityId, + type: OpportunityType.JOB, + state: OpportunityState.PARSING, + title: 'Processing...', + tldr: '', + content: new OpportunityContent({}), + flags: { + batchSize: 100, + file: { + blobName: testBlobName, + bucketName: RESUME_BUCKET_NAME, + mimeType: 'application/pdf', + extension: 'pdf', + userId: testUserId, + }, + }, + }); + + await expectSuccessfulTypedBackground<'api.v1.opportunity-parse'>(worker, { + opportunityId: testOpportunityId, + }); + + const opportunity = await con.getRepository(OpportunityJob).findOne({ + where: { id: testOpportunityId }, + }); + + expect(opportunity!.organizationId).toBe(existingOrgId); + + // Cleanup + await con.getRepository(OpportunityJob).delete({ id: existingOppId }); + }); + + it('should handle anonymous user (trackingId only)', async () => { + await con.getRepository(OpportunityJob).save({ + id: testOpportunityId, + type: OpportunityType.JOB, + state: OpportunityState.PARSING, + title: 'Processing...', + tldr: '', + content: new OpportunityContent({}), + flags: { + batchSize: 100, + anonUserId: 'anon1', + file: { + blobName: testBlobName, + bucketName: RESUME_BUCKET_NAME, + mimeType: 'application/pdf', + extension: 'pdf', + trackingId: 'anon1', + }, + }, + }); + + 
await expectSuccessfulTypedBackground<'api.v1.opportunity-parse'>(worker, { + opportunityId: testOpportunityId, + }); + + const opportunity = await con.getRepository(OpportunityJob).findOne({ + where: { id: testOpportunityId }, + }); + + expect(opportunity!.state).toBe(OpportunityState.DRAFT); + expect(opportunity!.flags?.anonUserId).toBe('anon1'); + + // Verify no recruiter was assigned + const recruiter = await con + .getRepository(OpportunityUserRecruiter) + .findOne({ + where: { opportunityId: testOpportunityId }, + }); + expect(recruiter).toBeNull(); + }); + + it('should assign Europe as continent when no country specified', async () => { + // Create a custom mock with just continent: 'Europe' (no country) + const transport = createMockBrokkrTransport({ + opportunity: { + location: [ + new Location({ + continent: 'Europe', + type: LocationType.REMOTE, + }), + ], + }, + }); + + const serviceClient = { + instance: createClient(BrokkrService, transport), + garmr: createGarmrMock(), + }; + + jest + .spyOn(brokkrCommon, 'getBrokkrClient') + .mockImplementation((): ServiceClient => { + return serviceClient; + }); + + // Add Europe dataset location + await con.getRepository(DatasetLocation).save({ + continent: 'Europe', + }); + + await con.getRepository(OpportunityJob).save({ + id: testOpportunityId, + type: OpportunityType.JOB, + state: OpportunityState.PARSING, + title: 'Processing...', + tldr: '', + content: new OpportunityContent({}), + flags: { + batchSize: 100, + file: { + blobName: testBlobName, + bucketName: RESUME_BUCKET_NAME, + mimeType: 'application/pdf', + extension: 'pdf', + userId: testUserId, + }, + }, + }); + + await expectSuccessfulTypedBackground<'api.v1.opportunity-parse'>(worker, { + opportunityId: testOpportunityId, + }); + + // Verify location was assigned with Europe as continent + const locations = await con.getRepository(OpportunityLocation).find({ + where: { opportunityId: testOpportunityId }, + relations: ['location'], + }); + + expect(locations).toHaveLength(1); + expect(locations[0].type).toBe(LocationType.REMOTE); + + const locationData = await locations[0].location; + expect(locationData.continent).toBe('Europe'); + }); +}); diff --git a/package.json b/package.json index 1edf7203e7..63c7737e47 100644 --- a/package.json +++ b/package.json @@ -36,7 +36,7 @@ "@connectrpc/connect-fastify": "^1.6.1", "@connectrpc/connect-node": "^1.6.1", "@dailydotdev/graphql-redis-subscriptions": "^2.4.3", - "@dailydotdev/schema": "0.2.65", + "@dailydotdev/schema": "0.2.66", "@dailydotdev/ts-ioredis-pool": "^1.0.2", "@fastify/cookie": "^11.0.2", "@fastify/cors": "^11.2.0", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 0c48f81a02..550c2dab57 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -35,8 +35,8 @@ importers: specifier: ^2.4.3 version: 2.4.3(graphql-subscriptions@3.0.0(graphql@16.12.0)) '@dailydotdev/schema': - specifier: 0.2.65 - version: 0.2.65(@bufbuild/protobuf@1.10.0) + specifier: 0.2.66 + version: 0.2.66(@bufbuild/protobuf@1.10.0) '@dailydotdev/ts-ioredis-pool': specifier: ^1.0.2 version: 1.0.2 @@ -771,8 +771,8 @@ packages: peerDependencies: graphql-subscriptions: ^1.0.0 || ^2.0.0 - '@dailydotdev/schema@0.2.65': - resolution: {integrity: sha512-9Mil946IhS7TQGnXvKl+TJdEBnUvOjyjfYYKpGXTWhAhkNMp6lwcPYlaeG62qSiwAT9vr4/vond/IrOnU2gjTQ==} + '@dailydotdev/schema@0.2.66': + resolution: {integrity: sha512-txFsQYRd3uxpDRPGrwBMa5l0kOaTXXoD0dm51FgPjaCGE4UN0uqg8GxzU+BLHuaBPIYmJ4HEu7LEwlMHVUhySg==} peerDependencies: '@bufbuild/protobuf': 1.x @@ -5686,7 +5686,7 @@ snapshots: 
transitivePeerDependencies: - supports-color - '@dailydotdev/schema@0.2.65(@bufbuild/protobuf@1.10.0)': + '@dailydotdev/schema@0.2.66(@bufbuild/protobuf@1.10.0)': dependencies: '@bufbuild/protobuf': 1.10.0 diff --git a/src/common/opportunity/parse.ts b/src/common/opportunity/parse.ts index 1c94021648..0a992d3eae 100644 --- a/src/common/opportunity/parse.ts +++ b/src/common/opportunity/parse.ts @@ -23,7 +23,6 @@ import { markdown } from '../markdown'; import { OpportunityJob } from '../../entity/opportunities/OpportunityJob'; import { OpportunityLocation } from '../../entity/opportunities/OpportunityLocation'; import { OpportunityKeyword } from '../../entity/OpportunityKeyword'; -import { OpportunityUserRecruiter } from '../../entity/opportunities/user/OpportunityUserRecruiter'; import { findDatasetLocation } from '../../entity/dataset/utils'; import { addOpportunityDefaultQuestionFeedback } from './question'; import type { Opportunity } from '../../entity/opportunities/Opportunity'; @@ -172,10 +171,12 @@ function renderOpportunityMarkdownContent( * buffer: Buffer; * mime: string; * extension: string; + * opportunityId?: string; * }} { * buffer, * mime, * extension, + * opportunityId * } * @return {*} {Promise} */ @@ -183,12 +184,14 @@ export async function parseOpportunityWithBrokkr({ buffer, mime, extension, + opportunityId, }: { buffer: Buffer; mime: string; extension: string; + opportunityId?: string; }): Promise { - const filename = `job-opportunity-parse-${randomUUID()}.pdf`; + const filename = `job-opportunity-parse-${opportunityId || randomUUID()}.pdf`; const brokkrClient = getBrokkrClient(); @@ -265,10 +268,10 @@ export async function parseOpportunityWithBrokkr({ } /** - * Creates an opportunity and all related entities from parsed data + * Creates or updates an opportunity and all related entities from parsed data * * Handles: - * - Creating the opportunity record + * - Creating or updating the opportunity record * - Creating location relationships * - Creating keywords * - Adding default feedback questions @@ -277,11 +280,13 @@ export async function parseOpportunityWithBrokkr({ * * @param ctx - Context with database connection and user info * @param parsedData - The parsed opportunity data from Brokkr - * @returns The created opportunity + * @param opportunityId - Optional ID of existing opportunity to update (for async worker flow) + * @returns The created or updated opportunity */ export async function createOpportunityFromParsedData( ctx: ParseOpportunityContext, parsedData: ParsedOpportunityResult, + opportunityId?: string, ): Promise { const { opportunity: parsedOpportunity, content } = parsedData; const locationData = parsedOpportunity.location || []; @@ -321,6 +326,7 @@ export async function createOpportunityFromParsedData( const opportunity = await entityManager.getRepository(OpportunityJob).save( entityManager.getRepository(OpportunityJob).create({ + id: opportunityId, ...opportunityData, state: OpportunityState.DRAFT, content, @@ -355,15 +361,6 @@ export async function createOpportunityFromParsedData( ); } - if (ctx.userId) { - await entityManager.getRepository(OpportunityUserRecruiter).insert( - entityManager.getRepository(OpportunityUserRecruiter).create({ - opportunityId: opportunity.id, - userId: ctx.userId, - }), - ); - } - return opportunity; }); } diff --git a/src/common/typedPubsub.ts b/src/common/typedPubsub.ts index 90acbebe37..8e103baff1 100644 --- a/src/common/typedPubsub.ts +++ b/src/common/typedPubsub.ts @@ -250,6 +250,9 @@ export type PubSubSchema = { 
opportunityId: string; userId: string; }; + 'api.v1.opportunity-parse': { + opportunityId: string; + }; }; export async function triggerTypedEvent( diff --git a/src/entity/opportunities/Opportunity.ts b/src/entity/opportunities/Opportunity.ts index bae039ebfb..8080cf043f 100644 --- a/src/entity/opportunities/Opportunity.ts +++ b/src/entity/opportunities/Opportunity.ts @@ -34,6 +34,15 @@ export type OpportunityFlags = Partial<{ reminders: boolean | null; showSlack: boolean | null; showFeedback: boolean | null; + file: { + blobName: string; + bucketName: string; + mimeType: string; + extension: string; + userId?: string; + trackingId?: string; + } | null; + parseError: string | null; }>; export type OpportunityFlagsPublic = Pick< diff --git a/src/schema/opportunity.ts b/src/schema/opportunity.ts index 85dd05303f..c2b4beba8f 100644 --- a/src/schema/opportunity.ts +++ b/src/schema/opportunity.ts @@ -47,11 +47,15 @@ import { import { ConflictError, NotFoundError, PaymentRequiredError } from '../errors'; import { UserCandidateKeyword } from '../entity/user/UserCandidateKeyword'; import { User } from '../entity/user/User'; -import { EMPLOYMENT_AGREEMENT_BUCKET_NAME } from '../config'; +import { + EMPLOYMENT_AGREEMENT_BUCKET_NAME, + RESUME_BUCKET_NAME, +} from '../config'; import { deleteEmploymentAgreementByUserId, generateResumeSignedUrl, uploadEmploymentAgreementFromBuffer, + uploadResumeFromBuffer, } from '../common/googleCloud'; import { opportunityEditSchema, @@ -69,7 +73,7 @@ import { } from '../common/opportunity/accessControl'; import { sanitizeHtml } from '../common/markdown'; import { QuestionScreening } from '../entity/questions/QuestionScreening'; -import { In, Not, JsonContains, EntityManager } from 'typeorm'; +import { In, Not, JsonContains, EntityManager, DeepPartial } from 'typeorm'; import { Organization } from '../entity/Organization'; import { Source, SourceType } from '../entity/Source'; import { @@ -102,7 +106,6 @@ import { getOpportunityFileBuffer, validateOpportunityFileType, parseOpportunityWithBrokkr, - createOpportunityFromParsedData, updateOpportunityFromParsedData, handleOpportunityKeywordsUpdate, } from '../common/opportunity/parse'; @@ -112,6 +115,9 @@ import { mockPreviewSquadIds, } from '../mocks/opportunity/services'; import { notifyOpportunityFeedbackSubmitted } from '../common/opportunity/pubsub'; +import { triggerTypedEvent } from '../common/typedPubsub'; +import { randomUUID } from 'crypto'; +import { opportunityMatchBatchSize } from '../types'; export interface GQLOpportunity extends Pick< Opportunity, @@ -1374,6 +1380,13 @@ export const resolvers: IResolvers = traceResolvers< .parse(args); builder.queryBuilder.where({ state: validatedInput.state }); } + + builder.queryBuilder.andWhere({ + state: Not( + In([OpportunityState.ERROR, OpportunityState.PARSING]), + ), + }); + if (!ctx.isTeamMember) { builder.queryBuilder .innerJoin( @@ -1535,6 +1548,14 @@ export const resolvers: IResolvers = traceResolvers< }); } + if ( + [OpportunityState.PARSING, OpportunityState.ERROR].includes( + opportunity.state, + ) + ) { + throw new ConflictError('Opportunity is not ready for preview yet'); + } + const keywords = await opportunity.keywords; let opportunityPreview: OpportunityJob['flags']['preview'] = { @@ -2765,77 +2786,61 @@ export const resolvers: IResolvers = traceResolvers< throw new ValidationError('User identifier is required'); } - try { - const startTime = Date.now(); - let stepStart = startTime; + const parsedPayload = await 
parseOpportunitySchema.parseAsync(payload); + const { buffer, extension } = + await getOpportunityFileBuffer(parsedPayload); + const { mime } = await validateOpportunityFileType(buffer, extension); - const parsedPayload = await parseOpportunitySchema.parseAsync(payload); - ctx.log.info( - { durationMs: Date.now() - stepStart }, - 'parseOpportunity: payload schema validated', - ); + const fileName = `opportunity-${randomUUID()}.${extension}`; + await uploadResumeFromBuffer(fileName, buffer, { contentType: mime }); - stepStart = Date.now(); - const { buffer, extension } = - await getOpportunityFileBuffer(parsedPayload); - ctx.log.info( - { durationMs: Date.now() - stepStart, bufferSize: buffer.length }, - 'parseOpportunity: file buffer acquired', - ); + const flags: OpportunityJob['flags'] = { + batchSize: opportunityMatchBatchSize, + file: { + blobName: fileName, + bucketName: RESUME_BUCKET_NAME, + mimeType: mime, + extension, + userId: ctx.userId, + trackingId: ctx.trackingId, + }, + }; - stepStart = Date.now(); - const { mime } = await validateOpportunityFileType(buffer, extension); - ctx.log.info( - { durationMs: Date.now() - stepStart, mime }, - 'parseOpportunity: file type validated', - ); + if (!ctx.userId) { + flags.anonUserId = ctx.trackingId; + } - stepStart = Date.now(); - const parsedData = await parseOpportunityWithBrokkr({ - buffer, - mime, - extension, - }); - ctx.log.info( - { - durationMs: Date.now() - stepStart, - title: parsedData.opportunity.title, - }, - 'parseOpportunity: Brokkr parsing completed', + const opportunity = await ctx.con.transaction(async (entityManager) => { + const newOpportunity = await ctx.con.getRepository(OpportunityJob).save( + ctx.con.getRepository(OpportunityJob).create({ + state: OpportunityState.PARSING, + title: 'Processing...', + tldr: '', + content: new OpportunityContent({}).toJson(), + flags, + } as DeepPartial), ); - stepStart = Date.now(); - const opportunity = await createOpportunityFromParsedData( - { - con: ctx.con, - userId: ctx.userId, - trackingId: ctx.trackingId, - log: ctx.log, - }, - parsedData, - ); - ctx.log.info( - { durationMs: Date.now() - stepStart, opportunityId: opportunity.id }, - 'parseOpportunity: database records created', - ); + if (ctx.userId) { + await entityManager.getRepository(OpportunityUserRecruiter).insert( + entityManager.getRepository(OpportunityUserRecruiter).create({ + opportunityId: newOpportunity.id, + userId: ctx.userId, + }), + ); + } - const totalDurationMs = Date.now() - startTime; - ctx.log.info( - { totalDurationMs, opportunityId: opportunity.id }, - 'parseOpportunity: completed successfully', - ); + return newOpportunity; + }); - return graphorm.queryOneOrFail(ctx, info, (builder) => { - builder.queryBuilder.where({ id: opportunity.id }); - return builder; - }); - } catch (error) { - ctx.log.error( - { error }, - 'parseOpportunity: failed to parse opportunity', - ); - throw error; - } + await triggerTypedEvent(ctx.log, 'api.v1.opportunity-parse', { + opportunityId: opportunity.id, + }); + + return graphorm.queryOneOrFail(ctx, info, (builder) => { + builder.queryBuilder.where({ id: opportunity.id }); + return builder; + }); }, reimportOpportunity: async ( _, diff --git a/src/workers/index.ts b/src/workers/index.ts index 017c1bf5a5..f88fda545a 100644 --- a/src/workers/index.ts +++ b/src/workers/index.ts @@ -77,6 +77,7 @@ import recruiterRejectedCandidateMatchEmail from './recruiterRejectedCandidateMa import { opportunityPreviewResultWorker } from './opportunity/opportunityPreviewResult'; import 
opportunityInReviewSlack from './opportunityInReviewSlack'; import { parseOpportunityFeedbackWorker } from './opportunity/parseOpportunityFeedback'; +import { parseOpportunityWorker } from './opportunity/parseOpportunity'; export { Worker } from './worker'; @@ -157,6 +158,7 @@ export const typedWorkers: BaseTypedWorker[] = [ opportunityPreviewResultWorker, opportunityInReviewSlack, parseOpportunityFeedbackWorker, + parseOpportunityWorker, ]; export const personalizedDigestWorkers: Worker[] = [ diff --git a/src/workers/opportunity/parseOpportunity.ts b/src/workers/opportunity/parseOpportunity.ts new file mode 100644 index 0000000000..53d0faa5d8 --- /dev/null +++ b/src/workers/opportunity/parseOpportunity.ts @@ -0,0 +1,138 @@ +import { Storage } from '@google-cloud/storage'; +import { OpportunityState } from '@dailydotdev/schema'; +import type { TypedWorker } from '../worker'; +import { OpportunityJob } from '../../entity/opportunities/OpportunityJob'; +import { + parseOpportunityWithBrokkr, + createOpportunityFromParsedData, +} from '../../common/opportunity/parse'; +import { updateFlagsStatement } from '../../common'; +import { deleteBlobFromGCS } from '../../common/googleCloud'; +import z from 'zod'; +import { performance } from 'perf_hooks'; + +export const parseOpportunityWorker: TypedWorker<'api.v1.opportunity-parse'> = { + subscription: 'api.opportunity-parse', + handler: async ({ data }, con, logger) => { + const startMs = performance.now(); + const { opportunityId } = data; + + // Fetch opportunity early to extract file data for cleanup + const opportunity = await con.getRepository(OpportunityJob).findOne({ + where: { id: opportunityId }, + }); + + // Extract file data for cleanup in finally block + const fileData = opportunity?.flags?.file; + + try { + if (!opportunity) { + return; + } + + if (opportunity.state !== OpportunityState.PARSING) { + return; + } + + // Clear any previous parseError before processing + await con + .getRepository(OpportunityJob) + .update( + { id: opportunityId }, + { flags: updateFlagsStatement({ parseError: null }) }, + ); + + if (!fileData) { + await con.getRepository(OpportunityJob).update( + { id: opportunityId }, + { + state: OpportunityState.ERROR, + flags: updateFlagsStatement({ + parseError: 'Missing file data', + }), + }, + ); + + return; + } + + const { blobName, bucketName, mimeType, extension, userId, trackingId } = + fileData; + const storage = new Storage(); + const [buffer] = await storage + .bucket(bucketName) + .file(blobName) + .download(); + + logger.info( + { opportunityId, durationMs: performance.now() - startMs }, + 'parseOpportunity worker: GCS download completed', + ); + + const parsedData = await parseOpportunityWithBrokkr({ + buffer, + mime: mimeType, + extension, + opportunityId, + }); + + logger.info( + { opportunityId, durationMs: performance.now() - startMs }, + 'parseOpportunity worker: Brokkr parsing completed', + ); + + await createOpportunityFromParsedData( + { con, userId, trackingId, log: logger }, + parsedData, + opportunityId, + ); + + logger.info( + { opportunityId, durationMs: performance.now() - startMs }, + 'parseOpportunity worker: opportunity saved to DB', + ); + + await con + .getRepository(OpportunityJob) + .update( + { id: opportunityId }, + { flags: updateFlagsStatement({ file: null }) }, + ); + + logger.info( + { opportunityId, durationMs: performance.now() - startMs }, + 'parseOpportunity worker: completed', + ); + } catch (error) { + const errorMessage = + error instanceof Error ? 
error.message : 'Unknown error'; + + await con.getRepository(OpportunityJob).update( + { id: opportunityId }, + { + state: OpportunityState.ERROR, + flags: updateFlagsStatement({ + parseError: + error instanceof z.ZodError + ? z.prettifyError(error) + : errorMessage, + }), + }, + ); + + logger.error( + { opportunityId, error, durationMs: performance.now() - startMs }, + 'parseOpportunity worker: failed', + ); + } finally { + // Clean up GCS file if it exists (regardless of success/failure/early return) + if (fileData?.blobName && fileData?.bucketName) { + await deleteBlobFromGCS({ + blobName: fileData.blobName, + bucketName: fileData.bucketName, + logger, + }); + } + } + }, +};