From d2cf8263b301f41dfe0e5f10c7827c01331fc0bf Mon Sep 17 00:00:00 2001 From: Ido Shamun Date: Sun, 7 Dec 2025 14:01:01 +0200 Subject: [PATCH 1/3] docs: update AGENTS.md --- AGENTS.md | 30 ++++++++--- src/cron/AGENTS.md | 55 +++++++++++-------- src/graphorm/AGENTS.md | 69 +++++++++++++++++++++--- src/workers/AGENTS.md | 120 +++++++++++++++++++++++++++++++---------- 4 files changed, 208 insertions(+), 66 deletions(-) diff --git a/AGENTS.md b/AGENTS.md index 12027be121..569f654ca9 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -2,10 +2,15 @@ This file provides guidance to coding agents when working with code in this repository. +## Prerequisites + +- **Node.js**: 22.16.0 (managed via Volta) +- **Package Manager**: pnpm 9.14.4 + ## Essential Commands **Development:** -- `pnpm run dev` - Start API server with hot reload on port 5000 +- `pnpm run dev` - Start API server with hot reload on port 3000 - `pnpm run dev:background` - Start background processor - `pnpm run dev:temporal-worker` - Start Temporal worker - `pnpm run dev:temporal-server` - Start Temporal server for local development @@ -35,15 +40,17 @@ This file provides guidance to coding agents when working with code in this repo - **Mercurius** - GraphQL server with caching, upload support, and subscriptions - **TypeORM** - Database ORM with entity-based modeling and migrations - **PostgreSQL** - Primary database with master/slave replication setup. Favor read replica when you're ok with eventually consistent data. 
-- **Redis** - Caching and pub/sub via ioRedisPool +- **Redis** - Caching and pub/sub via `@dailydotdev/ts-ioredis-pool` - **Temporal** - Workflow orchestration for background jobs - **ClickHouse** - Analytics and metrics storage **Application Entry Points:** - `src/index.ts` - Main Fastify server setup with GraphQL, auth, and middleware -- `bin/cli.ts` - CLI dispatcher supporting api, background, temporal, cron modes +- `bin/cli.ts` - CLI dispatcher supporting api, background, temporal, cron, personalized-digest modes - `src/background.ts` - Pub/Sub message handlers and background processing - `src/cron.ts` - Scheduled task execution +- `src/temporal/` - Temporal workflow definitions and workers +- `src/commands/` - Standalone command implementations (e.g., personalized digest) **GraphQL Schema Organization:** - `src/graphql.ts` - Combines all schema modules with transformers and directives @@ -71,11 +78,12 @@ This file provides guidance to coding agents when working with code in this repo **Business Domains:** - **Content**: Posts, comments, bookmarks, feeds, sources -- **Users**: Authentication, preferences, profiles, user experience -- **Organizations**: Squad management, member roles, campaigns +- **Users**: Authentication, preferences, profiles, user experience, streaks +- **Squads**: Squad management, member roles, public requests +- **Organizations**: Organization management, campaigns - **Notifications**: Push notifications, email digests, alerts -- **Monetization**: Paddle subscription management, premium features -- **Squads**: Squad management, member roles, campaigns +- **Monetization**: Paddle subscription management, premium features, cores/transactions +- **Opportunities**: Job matching, recruiter features, candidate profiles **Testing Strategy:** - Jest with supertest for integration testing @@ -88,4 +96,10 @@ This file provides guidance to coding agents when working with code in this repo - GrowthBook for feature flags and A/B testing - OneSignal 
for push notifications - Temporal workflows for async job processing -- Rate limiting and caching at multiple layers \ No newline at end of file +- Rate limiting and caching at multiple layers + +**Infrastructure as Code:** +- `.infra/` - Pulumi configuration for deployment +- `.infra/crons.ts` - Cron job schedules and resource limits +- `.infra/common.ts` - Worker subscription definitions +- `.infra/index.ts` - Main Pulumi deployment configuration \ No newline at end of file diff --git a/src/cron/AGENTS.md b/src/cron/AGENTS.md index f084b462cd..129672f6dd 100644 --- a/src/cron/AGENTS.md +++ b/src/cron/AGENTS.md @@ -56,7 +56,7 @@ Create a new file in `src/cron/` directory following this pattern: import { Cron } from './cron'; import { YourEntity } from '../entity'; -const cron: Cron = { +export const yourCronName: Cron = { name: 'your-cron-name', // Must match name in .infra/crons.ts handler: async (con, logger, pubsub) => { // Your cron logic here @@ -75,8 +75,6 @@ const cron: Cron = { logger.info({ count: results.length }, 'Cron job completed'); }, }; - -export default cron; ``` ### Step 2: Register in Index @@ -84,11 +82,11 @@ export default cron; Add your cron to `src/cron/index.ts`: ```typescript -import yourCron from './yourCron'; +import { yourCronName } from './yourCronName'; export const crons: Cron[] = [ // ... existing crons - yourCron, + yourCronName, ]; ``` @@ -202,21 +200,32 @@ handler: async (con, logger, pubsub) => { ### 6. Testing -Create tests in `__tests__/cron/` directory: +Create tests in `__tests__/cron/` directory. 
Tests use a real database connection (reset before each test run): ```typescript -import cron from '../../src/cron/yourCron'; +import { yourCronName } from '../../src/cron/yourCronName'; +import { expectSuccessfulCron, saveFixtures } from '../helpers'; +import { YourEntity } from '../../src/entity'; +import { DataSource } from 'typeorm'; +import createOrGetConnection from '../../src/db'; -describe('yourCron', () => { - it('should execute successfully', async () => { - const mockCon = createMockConnection(); - const mockLogger = createMockLogger(); - const mockPubsub = createMockPubsub(); - - await cron.handler(mockCon, mockLogger, mockPubsub); - - // Assertions - }); +let con: DataSource; + +beforeAll(async () => { + con = await createOrGetConnection(); +}); + +beforeEach(async () => { + // Set up test fixtures + await saveFixtures(con, YourEntity, yourFixtures); +}); + +it('should execute successfully', async () => { + await expectSuccessfulCron(yourCronName); + + // Verify database state after cron execution + const results = await con.getRepository(YourEntity).find(); + expect(results).toHaveLength(expectedLength); }); ``` @@ -225,7 +234,7 @@ describe('yourCron', () => { ### Incremental Processing with Checkpoints ```typescript -const cron: Cron = { +export const incrementalUpdate: Cron = { name: 'incremental-update', handler: async (con) => { const checkpointKey = 'last_incremental_update'; @@ -290,13 +299,13 @@ handler: async (con, logger) => { ### Kubernetes CronJobs -Crons are deployed as Kubernetes CronJobs via Pulumi: +Crons are deployed as Kubernetes CronJobs via Pulumi (see `.infra/index.ts` lines 577-591): - **Command**: `['dumb-init', 'node', 'bin/cli', 'cron', '']` -- **Spot Instances**: Enabled by default (70% weight) for cost optimization -- **Default Limits**: Memory and CPU limits can be overridden per cron -- **Default Deadline**: 300 seconds (5 minutes), configurable per cron -- **Adhoc Environments**: Crons are disabled in adhoc environments 
(see `.infra/index.ts`) +- **Spot Instances**: Enabled by default for all crons +- **Default Limits**: Uses background worker limits (`512Mi` memory) unless overridden per cron +- **Default Deadline**: 300 seconds (5 minutes), configurable per cron via `activeDeadlineSeconds` +- **Adhoc Environments**: Crons are disabled in adhoc environments ### Resource Configuration diff --git a/src/graphorm/AGENTS.md b/src/graphorm/AGENTS.md index 9fedcd760a..20728df0c8 100644 --- a/src/graphorm/AGENTS.md +++ b/src/graphorm/AGENTS.md @@ -224,7 +224,9 @@ export const resolvers = { }; ``` -### Query with Custom Filtering +### Query with Custom Filtering and Read Replica + +The optional fourth parameter enables read replica routing for eventually consistent reads: ```typescript export const resolvers = { @@ -239,7 +241,7 @@ export const resolvers = { .limit(1); return builder; }, - true // Use read replica + true // readReplica: Use read replica for better performance ); } } @@ -265,6 +267,42 @@ export const resolvers = { }; ``` +### Query by Hierarchy + +Use `queryByHierarchy` when you need to query a nested field from the resolve tree: + +```typescript +export const resolvers = { + Query: { + searchPosts: async (_, args, ctx: Context, info) => { + return graphorm.queryByHierarchy( + ctx, + info, + ['posts', 'edges', 'node'], // Path to the nested field + (builder) => { + builder.queryBuilder.where(`${builder.alias}.visible = true`); + return builder; + }, + true // readReplica + ); + } + } +}; +``` + +### Query Paginated Integration + +Use `queryPaginatedIntegration` for non-database data (e.g., external APIs) while still returning Relay-style pagination: + +```typescript +const results = await graphorm.queryPaginatedIntegration( + (nodeSize) => false, // hasPreviousPage + (nodeSize) => nodeSize >= limit, // hasNextPage + (node, index) => base64(`cursor:${index}`), // nodeToCursor + async () => fetchFromExternalAPI(args), // fetchData callback +); +``` + ## Configuration 
Patterns ### Required Columns @@ -354,11 +392,12 @@ fields: { ## Best Practices -### 1. Always Use `beforeQuery` for Filtering +### 1. Always Use the Query Builder Callback for Filtering -Don't fetch all data and filter in JavaScript. Filter at the database level: +Don't fetch all data and filter in JavaScript. Filter at the database level using the builder callback parameter: ```typescript +// The callback receives { queryBuilder, alias } and must return the modified builder (builder) => { builder.queryBuilder .andWhere(`${builder.alias}."userId" = :userId`, { userId: ctx.userId }) @@ -395,10 +434,13 @@ Always paginate lists to avoid fetching too much data: graphorm.queryPaginated( ctx, info, - hasPreviousPage, - hasNextPage, - nodeToCursor, - beforeQuery + (nodeSize) => hasPreviousPage(nodeSize), + (nodeSize) => hasNextPage(nodeSize), + (node, index) => nodeToCursor(node, index), + (builder) => { + builder.queryBuilder.limit(limit).offset(offset); + return builder; + } ); ``` @@ -450,6 +492,17 @@ requiredColumns: [ 4. **Required Columns**: Only add truly required columns to avoid unnecessary data fetching 5. 
**Transform Functions**: Keep transforms lightweight - avoid database calls in transforms
+## Available Methods
+
+| Method | Description | Returns |
+|--------|-------------|---------|
+| `query()` | Query multiple results | `Promise<T[]>` |
+| `queryOne()` | Query single result or null | `Promise<T \| null>` |
+| `queryOneOrFail()` | Query single result or throw | `Promise<T>` |
+| `queryPaginated()` | Query with Relay pagination | `Promise<Connection<T>>` |
+| `queryByHierarchy()` | Query nested field from resolve tree | `Promise<T[]>` |
+| `queryPaginatedIntegration()` | Paginate non-DB data | `Promise<Connection<T>>` |
+
 ## Related Files
 
 - **Core Implementation**: `src/graphorm/graphorm.ts`
diff --git a/src/workers/AGENTS.md b/src/workers/AGENTS.md
index 0f1326e1a0..bd2e77f6f5 100644
--- a/src/workers/AGENTS.md
+++ b/src/workers/AGENTS.md
@@ -51,10 +51,12 @@ import { TypedWorker } from './worker';
 
 const worker: TypedWorker<'post-upvoted'> = {
   subscription: 'post-upvoted-rep',
-  handler: async ({ data }, con, logger): Promise<void> => {
-    // data is automatically typed as PubSubSchema['post-upvoted']
+  handler: async (message, con, logger): Promise<void> => {
+    // message.data is automatically typed as PubSubSchema['post-upvoted']
+    const { data, messageId } = message;
     const { postId, userId } = data;
     // Process with full type safety
+    logger.info({ postId, userId, messageId }, 'Processing post upvote');
   },
 };
 ```
@@ -69,8 +71,9 @@ import { MatchedCandidate } from '@dailydotdev/schema';
 
 const worker: TypedWorker<'gondul.v1.candidate-opportunity-match'> = {
   subscription: 'api.store-candidate-opportunity-match',
-  handler: async ({ data }, con): Promise<void> => {
-    // data is typed as MatchedCandidate
+  handler: async (message, con): Promise<void> => {
+    // message.data is typed as MatchedCandidate
+    const { data } = message;
   },
   parseMessage: (message) => {
     return MatchedCandidate.fromBinary(message.data);
@@ -78,6 +81,40 @@ const worker: TypedWorker<'gondul.v1.candidate-opportunity-match'> = {
 };
 
+### Typed Notification Workers 
(`TypedNotificationWorker`) + +For notification-specific workers that return `NotificationHandlerReturn`: + +```typescript +import { TypedNotificationWorker } from './worker'; + +const worker: TypedNotificationWorker<'post-upvoted'> = { + subscription: 'api.article-upvote-milestone-notification', + handler: async (data, con, logger) => { + // Returns NotificationHandlerReturn for notification processing + return { type: NotificationType.ArticleUpvoteMilestone, ... }; + }, +}; +``` + +### Experiment Workers (`ExperimentWorker`) + +For workers that need access to the GrowthBook experiment allocation client: + +```typescript +import { ExperimentWorker, workerToExperimentWorker } from './worker'; + +const worker: ExperimentWorker = { + subscription: 'api.experiment-allocated', + handler: async (message, con, logger, pubsub, experimentAllocationClient) => { + // experimentAllocationClient available for A/B test tracking + }, +}; + +// Wrap with workerToExperimentWorker to inject the client +export default workerToExperimentWorker(worker); +``` + ## Creating a New Worker ### Step 1: Define the Message Schema @@ -103,14 +140,15 @@ import { TypedWorker } from './worker'; const worker: TypedWorker<'my-new-topic'> = { subscription: 'my-new-subscription', - handler: async ({ data }, con, logger): Promise => { + handler: async (message, con, logger): Promise => { + const { data, messageId } = message; const { userId, action } = data; try { // Your worker logic here - logger.info({ userId, action }, 'Processing worker'); + logger.info({ userId, action, messageId }, 'Processing worker'); } catch (err) { - logger.error({ err, data }, 'Worker failed'); + logger.error({ err, data, messageId }, 'Worker failed'); throw err; // Re-throw to trigger nack } }, @@ -121,15 +159,25 @@ export default worker; ### Step 3: Register the Worker -Add your worker to `src/workers/index.ts`: +Add your worker to `src/workers/index.ts`. There are three worker arrays: + +1. 
**`typedWorkers`** - TypedWorker instances (preferred for new workers) +2. **`workers`** - Legacy Worker instances (still used for CDC and some older workers) +3. **`personalizedDigestWorkers`** - Separate array for digest workers (run in dedicated process) ```typescript import myNewWorker from './myNewWorker'; +// For TypedWorker instances (recommended) export const typedWorkers: BaseTypedWorker[] = [ // ... existing workers myNewWorker, ]; + +// For legacy Worker instances +export const workers: Worker[] = [ + // ... existing workers +]; ``` ### Step 4: Add Infrastructure Configuration @@ -270,7 +318,7 @@ handler: async (message, con, logger): Promise => { ### Pulumi Setup -Workers are configured in `.infra/common.ts` and `.infra/workers.ts`. The Pulumi infrastructure: +Workers are configured in `.infra/common.ts`. The Pulumi infrastructure: 1. Creates subscriptions (topics are managed in a separate infrastructure repository) 2. Configures subscription settings (ack deadline, dead letter, etc.) @@ -313,36 +361,54 @@ For critical workers, configure a dead letter queue: ## Testing Workers -### Unit Testing +### Integration Testing -Test workers in isolation: +Tests use a real database connection (reset before each test run). 
Create tests in `__tests__/workers/`: ```typescript -import worker from './myWorker'; -import { createMockDataSource } from '../testHelpers'; +import { expectSuccessfulTypedBackground, saveFixtures } from '../helpers'; +import worker from '../../src/workers/myWorker'; +import { typedWorkers } from '../../src/workers'; +import { YourEntity } from '../../src/entity'; +import { DataSource } from 'typeorm'; +import createOrGetConnection from '../../src/db'; + +let con: DataSource; + +beforeAll(async () => { + con = await createOrGetConnection(); +}); describe('myWorker', () => { - it('processes messages correctly', async () => { - const con = createMockDataSource(); - const logger = createMockLogger(); - const pubsub = createMockPubSub(); - - const message = { - messageId: '123', - data: Buffer.from(JSON.stringify({ userId: '456', action: 'test' })), - }; - - await worker.handler(message, con, logger, pubsub); + beforeEach(async () => { + jest.resetAllMocks(); + await saveFixtures(con, YourEntity, yourFixtures); + }); + + it('should be registered', () => { + const registeredWorker = typedWorkers.find( + (item) => item.subscription === worker.subscription, + ); + expect(registeredWorker).toBeDefined(); + }); + + it('should process messages correctly', async () => { + await expectSuccessfulTypedBackground(worker, { + userId: '123', + action: 'test', + }); - // Assertions + // Verify database state + const result = await con.getRepository(YourEntity).findOneBy({ ... }); + expect(result).toBeDefined(); }); }); ``` ## Best Practices -1. **Always Use Typed Workers**: Use `TypedWorker` for all new workers - regular `Worker` interface is deprecated -2. **Use `triggerTypedEvent`**: Always use `triggerTypedEvent` to publish messages (not the deprecated `publishEvent`) +1. **Use Typed Workers for New Workers**: Use `TypedWorker` for all new workers. The `Worker` interface is still used for CDC workers and some legacy workers. +2. 
**Use `triggerTypedEvent`**: Always use `triggerTypedEvent` to publish typed messages. Use the helper functions in `src/common/pubsub.ts` for specific event types.
 3. **Idempotency**: Design workers to handle duplicate messages gracefully
 4. **Logging**: Log with context (messageId, relevant data)
 5. **Error Handling**: Re-throw errors to trigger retries, but log appropriately

From fe655e06745c35533ba66f7ae6f5a939bfa2fb36 Mon Sep 17 00:00:00 2001
From: Ido Shamun
Date: Tue, 9 Dec 2025 09:57:29 +0200
Subject: [PATCH 2/3] Update src/graphorm/AGENTS.md
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Co-authored-by: Ante Barić
---
 src/graphorm/AGENTS.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/graphorm/AGENTS.md b/src/graphorm/AGENTS.md
index 20728df0c8..d2554e1f4e 100644
--- a/src/graphorm/AGENTS.md
+++ b/src/graphorm/AGENTS.md
@@ -241,7 +241,7 @@ export const resolvers = {
           .limit(1);
         return builder;
       },
-      true // readReplica: Use read replica for better performance
+      true // better performance when only read queries are required; take into account potential replication lag if doing reads right after writes
     );
   }
 }

From 347aede810cb5dddf302aef1f68ca90f3f21c5bd Mon Sep 17 00:00:00 2001
From: Ido Shamun
Date: Tue, 9 Dec 2025 11:59:52 +0200
Subject: [PATCH 3/3] docs: adjust agents.md

---
 src/workers/AGENTS.md | 34 ++++++++++++++++------------------
 1 file changed, 16 insertions(+), 18 deletions(-)

diff --git a/src/workers/AGENTS.md b/src/workers/AGENTS.md
index bd2e77f6f5..0451f825e2 100644
--- a/src/workers/AGENTS.md
+++ b/src/workers/AGENTS.md
@@ -34,6 +34,8 @@ Use background workers when:
 
 ## Worker Types
 
+> **Note**: Only `TypedWorker` and its variants are supported. The legacy `Worker` interface is deprecated and should not be used for new workers.
+
 ### Typed Workers (`TypedWorker`)
 
 **Standard approach** for all new workers. 
Uses the `PubSubSchema` type system to ensure message structure matches expectations and provides full type safety. @@ -63,7 +65,7 @@ const worker: TypedWorker<'post-upvoted'> = { ### Typed Workers with Protobuf -For Protobuf messages (from external services), you need to provide a `parseMessage` function: +For Protobuf messages (from `@dailydotdev/schema` or external services), you need to provide a `parseMessage` function: ```typescript import { TypedWorker } from './worker'; @@ -108,6 +110,10 @@ const worker: ExperimentWorker = { subscription: 'api.experiment-allocated', handler: async (message, con, logger, pubsub, experimentAllocationClient) => { // experimentAllocationClient available for A/B test tracking + + // IMPORTANT: Always call waitForSend() before exiting to ensure + // allocations are sent to GrowthBook + await experimentAllocationClient.waitForSend(); }, }; @@ -159,25 +165,18 @@ export default worker; ### Step 3: Register the Worker -Add your worker to `src/workers/index.ts`. There are three worker arrays: +Add your worker to `src/workers/index.ts`. There are two worker arrays: -1. **`typedWorkers`** - TypedWorker instances (preferred for new workers) -2. **`workers`** - Legacy Worker instances (still used for CDC and some older workers) -3. **`personalizedDigestWorkers`** - Separate array for digest workers (run in dedicated process) +1. **`typedWorkers`** - TypedWorker instances (standard for all workers) +2. **`personalizedDigestWorkers`** - Separate array for digest workers (run in dedicated process) ```typescript import myNewWorker from './myNewWorker'; -// For TypedWorker instances (recommended) export const typedWorkers: BaseTypedWorker[] = [ // ... existing workers myNewWorker, ]; - -// For legacy Worker instances -export const workers: Worker[] = [ - // ... existing workers -]; ``` ### Step 4: Add Infrastructure Configuration @@ -228,14 +227,13 @@ CDC allows us to react to database changes without distributed transactions. 
Deb ### CDC Worker Example ```typescript -import { Worker, messageToJson } from './worker'; -import { ChangeMessage } from '../types'; +import { TypedWorker } from './worker'; -const worker: Worker = { +const worker: TypedWorker<'api.changes'> = { subscription: 'api-cdc', maxMessages: 20, // Process multiple messages at once handler: async (message, con, logger): Promise => { - const data: ChangeMessage = messageToJson(message); + const { data } = message; // Skip heartbeat and read operations if ( @@ -363,7 +361,7 @@ For critical workers, configure a dead letter queue: ### Integration Testing -Tests use a real database connection (reset before each test run). Create tests in `__tests__/workers/`: +Tests use a real database connection (data is cleared after each test run). Create tests in `__tests__/workers/`: ```typescript import { expectSuccessfulTypedBackground, saveFixtures } from '../helpers'; @@ -407,7 +405,7 @@ describe('myWorker', () => { ## Best Practices -1. **Use Typed Workers for New Workers**: Use `TypedWorker` for all new workers. The `Worker` interface is still used for CDC workers and some legacy workers. +1. **Use Typed Workers**: Use `TypedWorker` for all workers. This ensures type safety and consistency across the codebase. 2. **Use `triggerTypedEvent`**: Always use `triggerTypedEvent` to publish typed messages. Use the helper functions in `src/common/pubsub.ts` for specific event types. 3. **Idempotency**: Design workers to handle duplicate messages gracefully 4. **Logging**: Log with context (messageId, relevant data) @@ -436,7 +434,7 @@ describe('myWorker', () => { 1. Ensure message type is in `PubSubSchema` 2. Verify `TypedWorker` generic matches topic name -3. Check `parseMessage` for Protobuf messages +3. For Protobuf messages (`@dailydotdev/schema`), ensure `parseMessage` is defined - it's required and the linter will warn if missing ## Related Documentation