diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 0000000000..12027be121 --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,91 @@ +# Agents.md + +This file provides guidance to coding agents when working with code in this repository. + +## Essential Commands + +**Development:** +- `pnpm run dev` - Start API server with hot reload on port 5000 +- `pnpm run dev:background` - Start background processor +- `pnpm run dev:temporal-worker` - Start Temporal worker +- `pnpm run dev:temporal-server` - Start Temporal server for local development + +**Database:** +- `pnpm run db:migrate:latest` - Apply latest migrations +- `pnpm run db:migrate:reset` - Drop schema and rerun migrations +- `pnpm run db:seed:import` - Import seed data for local development +- `pnpm run db:migrate:make` - Generate new migration based on entity changes +- `pnpm run db:migrate:create` - Create empty migration file + +**Building & Testing:** +- `pnpm run build` - Compile TypeScript to build directory +- `pnpm run lint` - Run ESLint (max 0 warnings) +- `pnpm run test` - Run full test suite with database reset +- `pnpm run cli` - Run CLI commands (e.g., `pnpm run cli api`) + +**Individual Test Execution:** +- Remember to run only individual tests when possible for faster feedback +- `NODE_ENV=test npx jest __tests__/specific-test.ts --testEnvironment=node --runInBand` +- Use `--testEnvironment=node --runInBand` flags for database-dependent tests + +## High-Level Architecture + +**Core Framework Stack:** +- **Fastify** - Web framework with plugins for CORS, helmet, cookies, rate limiting +- **Mercurius** - GraphQL server with caching, upload support, and subscriptions +- **TypeORM** - Database ORM with entity-based modeling and migrations +- **PostgreSQL** - Primary database with master/slave replication setup. Favor read replica when you're ok with eventually consistent data. 
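The read-replica guidance above can be sketched against TypeORM's replication API. This is a minimal, self-contained sketch: `DataSourceLike` and `QueryRunnerLike` are stubs standing in for the real connection configured in `src/data-source.ts`, and the `post` table name is illustrative. In TypeORM, `createQueryRunner('slave')` pins a runner to a read replica, while `'master'` forces the primary.

```typescript
// Sketch: opting into the read replica for an eventually consistent read.
// The interfaces below are stubs so the example is self-contained; in the
// real codebase you would call createQueryRunner on the TypeORM DataSource.
type ReplicationMode = 'master' | 'slave';

interface QueryRunnerLike {
  query(sql: string): Promise<Array<{ c: number }>>;
  release(): Promise<void>;
}

interface DataSourceLike {
  createQueryRunner(mode?: ReplicationMode): QueryRunnerLike;
}

async function countPosts(con: DataSourceLike): Promise<number> {
  // 'slave' routes the read to a replica, which may lag the primary
  const runner = con.createQueryRunner('slave');
  try {
    const rows = await runner.query('SELECT count(*)::int AS c FROM post');
    return rows[0].c;
  } finally {
    await runner.release(); // always return pooled runners
  }
}
```

Callers that must read their own writes (for example, immediately after a mutation) should use `'master'` instead.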
+- **Redis** - Caching and pub/sub via ioRedisPool +- **Temporal** - Workflow orchestration for background jobs +- **ClickHouse** - Analytics and metrics storage + +**Application Entry Points:** +- `src/index.ts` - Main Fastify server setup with GraphQL, auth, and middleware +- `bin/cli.ts` - CLI dispatcher supporting api, background, temporal, cron modes +- `src/background.ts` - Pub/Sub message handlers and background processing +- `src/cron.ts` - Scheduled task execution + +**GraphQL Schema Organization:** +- `src/graphql.ts` - Combines all schema modules with transformers and directives +- `src/schema/` - GraphQL resolvers organized by domain (posts, users, feeds, etc.) +- `src/directive/` - Custom GraphQL directives for auth, rate limiting, URL processing +- **Docs**: See `src/graphorm/AGENTS.md` for a comprehensive guide on using GraphORM to solve N+1 queries. GraphORM is the default and preferred method for all GraphQL query responses; use it instead of TypeORM repositories in resolvers to prevent N+1 queries and enforce best practices. + +**Data Layer:** +- `src/entity/` - TypeORM entities defining database schema +- `src/migration/` - Database migrations for schema evolution +- `src/data-source.ts` - Database connection with replication configuration + +**Core Services:** +- `src/Context.ts` - Request context with user, permissions, and data loaders +- `src/auth.ts` - Authentication middleware and user context resolution +- `src/dataLoaderService.ts` - Efficient batch loading for related entities +- `src/workers/` - Use workers for async, non-critical operations (notifications, reputation, external syncs). Prefer `TypedWorker` for type safety. The architecture uses Google Pub/Sub + CDC (Debezium) for reactive processing. See `src/workers/AGENTS.md` for more. +- `src/integrations/` - External service integrations (Slack, SendGrid, etc.) +- `src/cron/` - Scheduled cron jobs for maintenance and periodic tasks. 
One file per cron, registered in `index.ts`, deployed via `.infra/crons.ts` Pulumi config. Each cron exports a `Cron` object with `name` and `handler(DataSource, logger, pubsub)`. Run locally with `pnpm run cli cron <name>`. See `src/cron/AGENTS.md` for more. + +**Type Safety & Validation:** +- We favor type safety throughout the codebase. Use TypeScript interfaces and types for compile-time type checking. +- **Zod schemas** are preferred for runtime validation, especially for input validation, API boundaries, and data parsing. Zod provides both type inference and runtime validation, making it ideal for verifying user input, API payloads, and external data sources. +- When possible, prefer Zod schemas over manual validation: they provide type safety, better error messages, and their TypeScript types can be inferred. + +**Business Domains:** +- **Content**: Posts, comments, bookmarks, feeds, sources +- **Users**: Authentication, preferences, profiles, user experience +- **Organizations**: Squad management, member roles, campaigns +- **Notifications**: Push notifications, email digests, alerts +- **Monetization**: Paddle subscription management, premium features +- **Squads**: Squad management, member roles, campaigns + +**Testing Strategy:** +- Jest with supertest for integration testing +- Database reset before each test run via pretest hook +- Fixtures in `__tests__/fixture/` for test data +- Mercurius integration testing for GraphQL endpoints + +**Infrastructure Concerns:** +- OpenTelemetry for distributed tracing and metrics +- GrowthBook for feature flags and A/B testing +- OneSignal for push notifications +- Temporal workflows for async job processing +- Rate limiting and caching at multiple layers \ No newline at end of file diff --git a/CLAUDE.md b/CLAUDE.md deleted file mode 100644 index 7fe01db783..0000000000 --- a/CLAUDE.md +++ /dev/null @@ -1,85 +0,0 @@ -# CLAUDE.md - -This file provides guidance to Claude Code (claude.ai/code) when working with code in this 
repository. - -## Essential Commands - -**Development:** -- `pnpm run dev` - Start API server with hot reload on port 5000 -- `pnpm run dev:background` - Start background processor -- `pnpm run dev:temporal-worker` - Start Temporal worker -- `pnpm run dev:temporal-server` - Start Temporal server for local development - -**Database:** -- `pnpm run db:migrate:latest` - Apply latest migrations -- `pnpm run db:migrate:reset` - Drop schema and rerun migrations -- `pnpm run db:seed:import` - Import seed data for local development -- `pnpm run db:migrate:make` - Generate new migration based on entity changes -- `pnpm run db:migrate:create` - Create empty migration file - -**Building & Testing:** -- `pnpm run build` - Compile TypeScript to build directory -- `pnpm run lint` - Run ESLint (max 0 warnings) -- `pnpm run test` - Run full test suite with database reset -- `pnpm run cli` - Run CLI commands (e.g., `pnpm run cli api`) - -**Individual Test Execution:** -- Remember to run only individual tests when possible for faster feedback -- `NODE_ENV=test npx jest __tests__/specific-test.ts --testEnvironment=node --runInBand` -- Use `--testEnvironment=node --runInBand` flags for database-dependent tests - -## High-Level Architecture - -**Core Framework Stack:** -- **Fastify** - Web framework with plugins for CORS, helmet, cookies, rate limiting -- **Mercurius** - GraphQL server with caching, upload support, and subscriptions -- **TypeORM** - Database ORM with entity-based modeling and migrations -- **PostgreSQL** - Primary database with master/slave replication setup -- **Redis** - Caching and pub/sub via ioRedisPool -- **Temporal** - Workflow orchestration for background jobs -- **ClickHouse** - Analytics and metrics storage - -**Application Entry Points:** -- `src/index.ts` - Main Fastify server setup with GraphQL, auth, and middleware -- `bin/cli.ts` - CLI dispatcher supporting api, background, temporal, cron modes -- `src/background.ts` - Pub/Sub message handlers and 
background processing -- `src/cron.ts` - Scheduled task execution - -**GraphQL Schema Organization:** -- `src/graphql.ts` - Combines all schema modules with transformers and directives -- `src/schema/` - GraphQL resolvers organized by domain (posts, users, feeds, etc.) -- `src/directive/` - Custom GraphQL directives for auth, rate limiting, URL processing -- Schema uses custom GraphORM patterns for efficient database loading - -**Data Layer:** -- `src/entity/` - TypeORM entities defining database schema -- `src/migration/` - Database migrations for schema evolution -- `src/data-source.ts` - Database connection with replication configuration -- Uses repository pattern with DataLoader for N+1 query optimization - -**Core Services:** -- `src/Context.ts` - Request context with user, permissions, and data loaders -- `src/auth.ts` - Authentication middleware and user context resolution -- `src/dataLoaderService.ts` - Efficient batch loading for related entities -- `src/workers/` - Pub/Sub message handlers organized by domain -- `src/integrations/` - External service integrations (Slack, SendGrid, etc.) 
- -**Business Domains:** -- **Content**: Posts, comments, bookmarks, feeds, sources -- **Users**: Authentication, preferences, profiles, user experience -- **Organizations**: Squad management, member roles, campaigns -- **Notifications**: Push notifications, email digests, alerts -- **Monetization**: Paddle subscription management, premium features - -**Testing Strategy:** -- Jest with supertest for integration testing -- Database reset before each test run via pretest hook -- Fixtures in `__tests__/fixture/` for test data -- Mercurius integration testing for GraphQL endpoints - -**Infrastructure Concerns:** -- OpenTelemetry for distributed tracing and metrics -- GrowthBook for feature flags and A/B testing -- OneSignal for push notifications -- Temporal workflows for async job processing -- Rate limiting and caching at multiple layers \ No newline at end of file diff --git a/CLAUDE.md b/CLAUDE.md new file mode 120000 index 0000000000..47dc3e3d86 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1 @@ +AGENTS.md \ No newline at end of file diff --git a/src/cron/AGENTS.md b/src/cron/AGENTS.md new file mode 100644 index 0000000000..f084b462cd --- /dev/null +++ b/src/cron/AGENTS.md @@ -0,0 +1,368 @@ +# Cron Jobs Guide + +This document describes the cron job architecture and how to work with scheduled tasks in this codebase. + +## Overview + +Cron jobs are scheduled tasks that run at specific intervals to perform maintenance, data processing, and automated workflows. The architecture follows a simple pattern: **one file per cron job**, with all crons registered in a central index file and deployed via Pulumi infrastructure configuration. + +## Architecture + +### File Structure + +``` +src/cron/ +├── index.ts # Central registry of all cron jobs +├── cron.ts # Cron interface definition +├── updateViews.ts # Example: individual cron file +├── personalizedDigest.ts # Example: individual cron file +└── ... # Other cron files +``` + +### Key Components + +1. 
**Individual Cron Files** (`src/cron/*.ts`) + - Each cron job is a separate TypeScript file + - Exports a `Cron` object with `name` and `handler` function + - Handler receives: `DataSource` (TypeORM), `FastifyLoggerInstance`, and `PubSub` + +2. **Cron Registry** (`src/cron/index.ts`) + - Imports all individual cron files + - Exports a `crons` array containing all `Cron` objects + - Used by the execution entry point to find and run crons + +3. **Execution Entry Point** (`src/cron.ts`) + - Accepts a cron name as parameter + - Finds the matching cron from the registry + - Executes the handler with database connection, logger, and PubSub + +4. **Infrastructure Configuration** (`.infra/crons.ts`) + - Defines deployment configuration for each cron + - Specifies schedule (cron expression), resource limits, and requests + - Used by Pulumi to create Kubernetes CronJobs + +5. **Pulumi Deployment** (`.infra/index.ts`) + - Reads the crons array from `.infra/crons.ts` + - Creates Kubernetes CronJob resources for each cron + - Maps cron names to execution commands: `['dumb-init', 'node', 'bin/cli', 'cron', cron.name]` + +## Creating a New Cron Job + +### Step 1: Create the Cron File + +Create a new file in `src/cron/` directory following this pattern: + +```typescript +import { Cron } from './cron'; +import { YourEntity } from '../entity'; + +const cron: Cron = { + name: 'your-cron-name', // Must match name in .infra/crons.ts + handler: async (con, logger, pubsub) => { + // Your cron logic here + logger.info('Starting cron job'); + + // Example: Query database + const results = await con + .getRepository(YourEntity) + .find(); + + // Example: Publish Pub/Sub messages + await pubsub.topic('your-topic').publishMessage({ + json: { data: 'value' }, + }); + + logger.info({ count: results.length }, 'Cron job completed'); + }, +}; + +export default cron; +``` + +### Step 2: Register in Index + +Add your cron to `src/cron/index.ts`: + +```typescript +import yourCron from './yourCron'; + 
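+// Note: registration order in this array is not significant; the execution
+// entry point (src/cron.ts) finds crons by their `name` property, so a cron
+// is active as soon as it appears here and in `.infra/crons.ts`.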
+export const crons: Cron[] = [ + // ... existing crons + yourCron, +]; +``` + +### Step 3: Configure Infrastructure + +Add your cron to `.infra/crons.ts`: + +```typescript +export const crons: Cron[] = [ + // ... existing crons + { + name: 'your-cron-name', // Must match name in cron file + schedule: '0 */1 * * *', // Cron expression (every hour) + // Optional: Resource limits for memory-intensive jobs + limits: { + memory: '1Gi', + }, + requests: { + cpu: '250m', + memory: '1Gi', + }, + // Optional: Maximum execution time in seconds + activeDeadlineSeconds: 300, + }, +]; +``` + +### Step 4: Test Locally + +Run the cron locally using the CLI: + +```bash +pnpm run cli cron your-cron-name +``` + +Or directly: + +```bash +node bin/cli cron your-cron-name +``` + +## Cron Interface + +The `Cron` interface is defined in `src/cron/cron.ts`: + +```typescript +export interface Cron { + name: string; + handler: ( + con: DataSource, + logger: FastifyLoggerInstance, + pubsub: PubSub, + ) => Promise<void>; +} +``` + +### Handler Parameters + +- **`con: DataSource`** - TypeORM database connection. Use for queries, transactions, and entity operations. +- **`logger: FastifyLoggerInstance`** - Structured logger for observability. Always log start, progress, and completion. +- **`pubsub: PubSub`** - Google Cloud Pub/Sub client. Use to publish messages for async processing. + +## Best Practices + +### 1. Naming Convention + +- Use kebab-case for cron names (e.g., `update-views`, `clean-zombie-users`) +- Names must match exactly between: + - The cron file (`name` property) + - The index registration (import name) + - The infrastructure config (`.infra/crons.ts`) + +### 2. 
Logging + +Always log important events: + +```typescript +handler: async (con, logger, pubsub) => { + logger.info({ cron: 'your-cron-name' }, 'Starting cron job'); + + try { + // Your logic + logger.info({ processed: count }, 'Cron job completed successfully'); + } catch (error) { + logger.error({ error }, 'Cron job failed'); + throw error; // Re-throw to mark job as failed + } +} +``` + +### 3. Error Handling + +- Let errors propagate to mark the cron job as failed in Kubernetes +- Use transactions for data consistency +- Consider idempotency for retry safety + +### 4. Resource Management + +- Use checkpoints for incremental processing (see `updateViews.ts` example) +- Process data in batches to avoid memory issues +- Set appropriate resource limits in `.infra/crons.ts` for memory-intensive jobs + +### 5. Scheduling + +- Use standard cron expressions: `minute hour day month day-of-week` +- Examples: + - `*/10 * * * *` - Every 10 minutes + - `0 */1 * * *` - Every hour at minute 0 + - `15 4 * * *` - Daily at 4:15 AM + - `0 2 1 * *` - First day of month at 2:00 AM + +### 6. 
Testing + +Create tests in `__tests__/cron/` directory: + +```typescript +import cron from '../../src/cron/yourCron'; + +describe('yourCron', () => { + it('should execute successfully', async () => { + const mockCon = createMockConnection(); + const mockLogger = createMockLogger(); + const mockPubsub = createMockPubsub(); + + await cron.handler(mockCon, mockLogger, mockPubsub); + + // Assertions + }); +}); +``` + +## Common Patterns + +### Incremental Processing with Checkpoints + +```typescript +const cron: Cron = { + name: 'incremental-update', + handler: async (con) => { + const checkpointKey = 'last_incremental_update'; + const before = new Date(); + let checkpoint = await con + .getRepository(Checkpoint) + .findOneBy({ key: checkpointKey }); + const after = checkpoint?.timestamp || new Date(0); + + // First run: create the checkpoint row so the save below cannot fail + if (!checkpoint) { + checkpoint = con.getRepository(Checkpoint).create({ key: checkpointKey }); + } + + await con.transaction(async (entityManager) => { + // Process records between 'after' and 'before' + // Update checkpoint + checkpoint.timestamp = before; + await entityManager.getRepository(Checkpoint).save(checkpoint); + }); + }, +}; +``` + +### Publishing Pub/Sub Messages + +```typescript +handler: async (con, logger, pubsub) => { + const items = await con.getRepository(Entity).find(); + + for (const item of items) { + await pubsub.topic('your-topic').publishMessage({ + json: { id: item.id, data: item.data }, + }); + } +} +``` + +### Batch Processing + +```typescript +handler: async (con, logger) => { + const batchSize = 100; + let offset = 0; + let hasMore = true; + + while (hasMore) { + const batch = await con + .getRepository(Entity) + .find({ take: batchSize, skip: offset }); + + if (batch.length === 0) { + hasMore = false; + break; + } + + // Process batch + await processBatch(batch); + + offset += batchSize; + logger.info({ processed: offset }, 'Progress update'); + } +} +``` + +## Infrastructure Details + +### Kubernetes CronJobs + +Crons are deployed as Kubernetes CronJobs via Pulumi: + +- **Command**: `['dumb-init', 'node', 'bin/cli', 'cron', '<name>']` + - 
**Spot Instances**: Enabled by default (70% weight) for cost optimization + - **Default Limits**: Memory and CPU limits can be overridden per cron + - **Default Deadline**: 300 seconds (5 minutes), configurable per cron + - **Adhoc Environments**: Crons are disabled in adhoc environments (see `.infra/index.ts`) + +### Resource Configuration + +Configure resources in `.infra/crons.ts`: + +```typescript +{ + name: 'memory-intensive-cron', + schedule: '0 2 * * *', + limits: { + memory: '2Gi', // Maximum memory + }, + requests: { + cpu: '500m', // CPU request + memory: '1Gi', // Memory request + }, + activeDeadlineSeconds: 600, // 10 minutes max execution +} +``` + +## Troubleshooting + +### Cron Not Running + +1. Check that the cron name matches in all three places: + - `src/cron/yourCron.ts` (name property) + - `src/cron/index.ts` (imported and added to array) + - `.infra/crons.ts` (name in config) + +2. Verify the cron is deployed: + ```bash + kubectl get cronjobs -n <namespace> + ``` + +3. Check cron job logs: + ```bash + kubectl logs -n <namespace> job/<cron-name>-<job-id> + ``` + +### Cron Failing + +1. Check Kubernetes events: + ```bash + kubectl describe cronjob <cron-name> -n <namespace> + ``` + +2. Review logs for errors +3. Verify database connectivity and permissions +4. Check resource limits (may be OOMKilled) + +### Testing Locally + +Always test crons locally before deploying: + +```bash +# Set up environment variables +export DATABASE_URL=... +export REDIS_URL=... 
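+# (Assumption) crons that publish Pub/Sub messages also need a client target;
+# for local runs you can point at the Pub/Sub emulator instead of real GCP:
+# export PUBSUB_EMULATOR_HOST=localhost:8085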
+ +# Run the cron +pnpm run cli cron your-cron-name +``` + +## Related Documentation + +- **Workers**: See `src/workers/AGENTS.md` for background workers (Pub/Sub message handlers) +- **CLI**: See `bin/cli.ts` for CLI command structure +- **Infrastructure**: See `.infra/index.ts` for deployment configuration + diff --git a/src/graphorm/AGENTS.md b/src/graphorm/AGENTS.md new file mode 100644 index 0000000000..9fedcd760a --- /dev/null +++ b/src/graphorm/AGENTS.md @@ -0,0 +1,468 @@ +# GraphORM Documentation + +## Overview + +GraphORM is a custom-built tool that solves the classic N+1 query problem in GraphQL by transforming GraphQL queries into optimized single PostgreSQL queries. Instead of executing multiple database queries (one per field resolution), GraphORM analyzes the GraphQL query structure, TypeORM entity metadata, and a configuration object to generate a single SQL query using PostgreSQL's JSON aggregation functions. + +**GraphORM is the default and preferred method for GraphQL query responses in this codebase.** It should be used instead of direct TypeORM repository queries whenever possible, as it enforces best practices and prevents common performance issues. + +## The Problem: N+1 Queries + +In traditional GraphQL resolvers, each field might trigger a separate database query: + +```graphql +query { + posts(first: 10) { + id + title + author { + id + name + profile { + avatar + } + } + comments { + id + content + user { + name + } + } + } +} +``` + +Without GraphORM, this could result in: +- 1 query for posts +- 10 queries for authors (one per post) +- 10 queries for profiles (one per author) +- 10 queries for comments (one per post) +- N queries for comment users (one per comment) + +**Total: 1 + 10 + 10 + 10 + N = potentially 30+ queries** + +With GraphORM, this becomes **1 optimized SQL query** that uses JSON aggregation to return all nested data in a single result set. + +## How GraphORM Works + +### Architecture Overview + +1. 
**Query Parsing**: GraphORM receives the GraphQL `resolveInfo` object and parses it into a resolve tree using `graphql-parse-resolve-info`. + +2. **Metadata Scanning**: It scans TypeORM entity metadata in real-time to understand: + - Table names and column mappings + - Entity relationships (one-to-many, many-to-one) + - Foreign key relationships + +3. **Configuration Lookup**: It checks the GraphORM configuration object (`src/graphorm/index.ts`) for: + - Custom field mappings + - Transform functions + - Custom relations + - Required columns + - Field aliases + +4. **SQL Generation**: It builds a TypeORM `QueryBuilder` that: + - Uses subqueries with `jsonb_agg()` for one-to-many relations + - Uses `to_jsonb()` for one-to-one relations + - Aggregates nested data into JSON structures + - Applies filters, sorting, and pagination + +5. **Post-Processing**: After fetching raw results, it applies JavaScript transform functions to: + - Reshape nested JSON data + - Apply business logic + - Handle permissions and visibility + - Format dates and other types + +### Key Concepts + +#### 1. GraphORM Mappings + +The configuration object (`src/graphorm/index.ts`) defines how GraphQL types map to database entities and how fields should be resolved: + +```typescript +const obj = new GraphORM({ + Post: { + requiredColumns: ['id', 'title', 'createdAt'], + fields: { + // Field configurations here + } + } +}); +``` + +#### 2. Field Configuration + +Each field can have: +- **`select`**: Custom SQL selection (string or function) +- **`transform`**: Post-processing function +- **`relation`**: Custom relation definition +- **`alias`**: Map to another field +- **`jsonType`**: Mark as JSON column +- **`pagination`**: Relay-style pagination config + +#### 3. 
Relations + +GraphORM automatically detects TypeORM relations, but you can override them: + +```typescript +fields: { + comments: { + relation: { + isMany: true, + customRelation: (ctx, parentAlias, childAlias, qb) => { + return qb + .where(`${childAlias}."postId" = ${parentAlias}.id`) + .andWhere(`${childAlias}."deleted" = false`); + } + } + } +} +``` + +#### 4. Transforms + +Transform functions run after data is fetched, allowing you to: +- Apply business logic +- Check permissions +- Format data +- Compute derived values + +```typescript +fields: { + views: { + transform: (value: number, ctx, parent): number | null => { + const post = parent as Post; + const isAuthor = post?.authorId && ctx.userId === post.authorId; + return isAuthor ? value : null; // Only show views to author + } + } +} +``` + +## When to Use GraphORM + +**GraphORM should be the default choice for all GraphQL query responses over direct TypeORM queries when possible.** This architectural decision enforces best practices including N+1 query prevention, consistent data fetching patterns, and optimized database access. + +### ✅ Use GraphORM (Default Choice): + +1. **All GraphQL queries fetching entities** - Posts, users, sources, comments, etc. +2. **Fetching entities with nested relations** - Posts with authors, comments, etc. +3. **Need to avoid N+1 queries** - Multiple related entities in one query +4. **Complex filtering/sorting** - When you need to filter at the database level +5. **Pagination** - Relay-style pagination with cursor-based navigation +6. **Read-heavy operations** - Queries that don't modify data + +**Prefer GraphORM over TypeORM repositories for GraphQL resolvers** because it: +- Automatically prevents N+1 queries +- Enforces consistent query patterns +- Leverages PostgreSQL JSON aggregation for efficiency +- Provides built-in support for transforms and business logic +- Supports read replica routing + +### ❌ Don't Use GraphORM When: + +1. 
**Mutations** - GraphORM is read-only (use TypeORM repositories for writes) +2. **External API calls** - Data not in PostgreSQL +3. **Complex aggregations requiring raw SQL** - Better handled with raw SQL queries +4. **Real-time subscriptions** - Use standard resolvers +5. **Non-GraphQL endpoints** - REST endpoints or internal services can use TypeORM directly + +**Note**: Even for simple queries without relations, prefer GraphORM for consistency and to benefit from future optimizations. The overhead is minimal and the consistency benefits are significant. + +## Usage Examples + +### Basic Query + +```typescript +import { graphorm } from '../graphorm'; + +export const resolvers = { + Query: { + post: async (_, { id }: { id: string }, ctx: Context, info) => { + return graphorm.queryOne( + ctx, + info, + (builder) => { + builder.queryBuilder.where(`${builder.alias}.id = :id`, { id }); + return builder; + } + ); + } + } +}; +``` + +### Paginated Query + +```typescript +import { graphorm } from '../graphorm'; +import { offsetPageGenerator } from './common'; + +export const resolvers = { + Query: { + posts: async (_, args: ConnectionArguments, ctx: Context, info) => { + const pageGenerator = offsetPageGenerator(10, 100); + const page = pageGenerator.connArgsToPage(args); + + return graphorm.queryPaginated( + ctx, + info, + (nodeSize) => pageGenerator.hasPreviousPage(page, nodeSize), + (nodeSize) => pageGenerator.hasNextPage(page, nodeSize), + (node, index) => pageGenerator.nodeToCursor(page, args, node, index), + (builder) => { + builder.queryBuilder + .limit(page.limit) + .offset(page.offset) + .orderBy(`${builder.alias}."createdAt"`, 'DESC'); + return builder; + } + ); + } + } +}; +``` + +### Query with Custom Filtering + +```typescript +export const resolvers = { + Query: { + keyword: async (_, { value }: { value: string }, ctx: Context, info) => { + return graphorm.queryOne( + ctx, + info, + (builder) => { + builder.queryBuilder + .andWhere(`${builder.alias}.value = 
:value`, { value }) + .limit(1); + return builder; + }, + true // Use read replica + ); + } + } +}; +``` + +### Query One or Fail + +```typescript +export const resolvers = { + Query: { + campaignById: async (_, { id }: { id: string }, ctx: Context, info) => { + return graphorm.queryOneOrFail( + ctx, + info, + (builder) => { + builder.queryBuilder.where({ id }); + return builder; + } + ); + } + } +}; +``` + +## Configuration Patterns + +### Required Columns + +Always select certain columns, even if not requested: + +```typescript +Post: { + requiredColumns: [ + 'id', + 'createdAt', + { + column: `"contentMeta"->'alt_title'->'translations'`, + columnAs: 'smartTitle', + isJson: true + } + ] +} +``` + +### Anonymous Restrictions + +Hide sensitive fields from unauthenticated users: + +```typescript +UserExperience: { + anonymousRestrictedColumns: [ + 'user', + 'subtitle', + 'description', + 'startedAt', + 'endedAt' + ] +} +``` + +### Custom Field Selection + +Select from related tables or computed values: + +```typescript +fields: { + numUpvotes: { + select: 'upvotes' // Maps to 'upvotes' column + }, + isPlus: { + alias: { field: 'subscriptionFlags', type: 'jsonb' }, + transform: (subscriptionFlags: UserSubscriptionFlags) => + isPlusMember(subscriptionFlags?.cycle) + } +} +``` + +### Complex Relations + +Define custom joins and filters: + +```typescript +fields: { + bookmark: { + relation: { + isMany: false, + customRelation: ({ userId }, parentAlias, childAlias, qb) => + qb + .where(`${parentAlias}.id = ${childAlias}."postId"`) + .andWhere(`${childAlias}."userId" = :userId`, { userId }) + } + } +} +``` + +### JSON Field Handling + +Handle JSONB columns: + +```typescript +fields: { + flags: { + jsonType: true, + transform: (value: PostFlagsPublic): PostFlagsPublic => ({ + ...value, + generatedAt: transformDate(value.generatedAt) + }) + } +} +``` + +## Best Practices + +### 1. Always Use `beforeQuery` for Filtering + +Don't fetch all data and filter in JavaScript. 
Filter at the database level: + +```typescript +(builder) => { + builder.queryBuilder + .andWhere(`${builder.alias}."userId" = :userId`, { userId: ctx.userId }) + .andWhere(`${builder.alias}."deleted" = false`); + return builder; +} +``` + +### 2. Use Transforms for Business Logic + +Keep SQL queries focused on data fetching. Use transforms for: +- Permission checks +- Data formatting +- Computed values +- Conditional field visibility + +### 3. Leverage Required Columns + +If a transform function needs data that might not be requested, add it to `requiredColumns`: + +```typescript +requiredColumns: [ + 'id', + 'authorId', // Needed for permission checks in transforms + 'scoutId' +] +``` + +### 4. Use Pagination for Large Datasets + +Always paginate lists to avoid fetching too much data: + +```typescript +graphorm.queryPaginated( + ctx, + info, + hasPreviousPage, + hasNextPage, + nodeToCursor, + beforeQuery +); +``` + +## Common Patterns + +### Conditional Field Visibility + +Show fields only to specific users: + +```typescript +fields: { + email: { + transform: nullIfNotSameUser // Only show to the user themselves + } +} +``` + +### Date Transformations + +Consistently transform dates: + +```typescript +fields: { + createdAt: { + transform: transformDate + } +} +``` + +### Nested JSON Queries + +Query nested JSON structures: + +```typescript +requiredColumns: [ + { + column: `"contentMeta"->'alt_title'->'translations'`, + columnAs: 'smartTitle', + isJson: true + } +] +``` + +## Performance Considerations + +1. **Indexes**: Ensure foreign keys and frequently filtered columns are indexed +2. **JSON Aggregation**: Large nested arrays can be expensive - consider pagination +3. **Read Replicas**: Use read replicas for all read queries when available +4. **Required Columns**: Only add truly required columns to avoid unnecessary data fetching +5. 
**Transform Functions**: Keep transforms lightweight - avoid database calls in transforms + +## Related Files + +- **Core Implementation**: `src/graphorm/graphorm.ts` +- **Configuration**: `src/graphorm/index.ts` +- **Usage Examples**: + - `src/schema/posts.ts` + - `src/schema/users.ts` + - `src/schema/sources.ts` + - `src/common/feedGenerator.ts` + +## Further Reading + +- [TypeORM Query Builder Documentation](https://typeorm.io/select-query-builder) +- [PostgreSQL JSON Functions](https://www.postgresql.org/docs/current/functions-json.html) +- [GraphQL Resolve Info](https://graphql.org/graphql-js/type/#graphqlobjecttype) + diff --git a/src/workers/AGENTS.md b/src/workers/AGENTS.md new file mode 100644 index 0000000000..0f1326e1a0 --- /dev/null +++ b/src/workers/AGENTS.md @@ -0,0 +1,380 @@ +# Background Workers Documentation + +## Overview + +Our architecture follows a **reactive pattern** where we offload non-critical, asynchronous work to background workers. This approach improves API response times, increases system resilience, and enables better scalability. + +### Architecture Components + +- **Google Pub/Sub**: Message queue for asynchronous event processing +- **Pulumi**: Infrastructure as Code (IaC) for managing subscriptions (located in `.infra/`) +- **Topics**: Managed in a separate infrastructure repository +- **Debezium + CDC**: Change Data Capture for reacting to database changes +- **TypeScript Workers**: Type-safe worker implementations + +## When to Use Workers + +Use background workers when: + +1. **Non-mission-critical processing**: Work that doesn't need to complete before returning a response to the user +2. **Async operations**: Tasks that can happen independently of the main request flow +3. **Third-party integrations**: External API calls (Slack, SendGrid, etc.) that shouldn't block user requests +4. **Heavy computations**: Image processing, data transformations, analytics calculations +5. 
**Distributed transactions**: When you need to coordinate changes across multiple systems without blocking the primary transaction
+
+### Examples of Worker Use Cases
+
+- Sending notifications (email, push, real-time)
+- Updating reputation scores
+- Processing images and media
+- Syncing data to external services (CIO, Slack)
+- Analytics and metrics collection
+- Cleanup operations
+- Content processing (markdown, translations)
+
+## Worker Types
+
+### Typed Workers (`TypedWorker`)
+
+**Standard approach** for all new workers. Uses the `PubSubSchema` type system to ensure message structure matches expectations and provides full type safety.
+
+```typescript
+type TypedWorker<T extends keyof PubSubSchema> = ConditionalTypedWorker<
+  T,
+  PubSubSchema[T]
+>;
+```
+
+**Example:**
+```typescript
+import { TypedWorker } from './worker';
+
+const worker: TypedWorker<'post-upvoted'> = {
+  subscription: 'post-upvoted-rep',
+  handler: async ({ data }, con, logger): Promise<void> => {
+    // data is automatically typed as PubSubSchema['post-upvoted']
+    const { postId, userId } = data;
+    // Process with full type safety
+  },
+};
+```
+
+### Typed Workers with Protobuf
+
+For Protobuf messages (from external services), you need to provide a `parseMessage` function:
+
+```typescript
+import { TypedWorker } from './worker';
+import { MatchedCandidate } from '@dailydotdev/schema';
+
+const worker: TypedWorker<'gondul.v1.candidate-opportunity-match'> = {
+  subscription: 'api.store-candidate-opportunity-match',
+  handler: async ({ data }, con): Promise<void> => {
+    // data is typed as MatchedCandidate
+  },
+  parseMessage: (message) => {
+    return MatchedCandidate.fromBinary(message.data);
+  },
+};
+```
+
+## Creating a New Worker
+
+### Step 1: Define the Message Schema
+
+If creating a typed worker, add your message type to `src/common/typedPubsub.ts`:
+
+```typescript
+export type PubSubSchema = {
+  // ... 
existing schemas
+  'my-new-topic': {
+    userId: string;
+    action: string;
+  };
+};
+```
+
+### Step 2: Create the Worker File
+
+Create a new file in `src/workers/` (or appropriate subdirectory):
+
+```typescript
+import { TypedWorker } from './worker';
+
+const worker: TypedWorker<'my-new-topic'> = {
+  subscription: 'my-new-subscription',
+  handler: async ({ data }, con, logger): Promise<void> => {
+    const { userId, action } = data;
+
+    try {
+      // Your worker logic here
+      logger.info({ userId, action }, 'Processing worker');
+    } catch (err) {
+      logger.error({ err, data }, 'Worker failed');
+      throw err; // Re-throw to trigger nack
+    }
+  },
+};
+
+export default worker;
+```
+
+### Step 3: Register the Worker
+
+Add your worker to `src/workers/index.ts`:
+
+```typescript
+import myNewWorker from './myNewWorker';
+
+export const typedWorkers: BaseTypedWorker[] = [
+  // ... existing workers
+  myNewWorker,
+];
+```
+
+### Step 4: Add Infrastructure Configuration
+
+**Note**: Topics are managed in the "streams" repository. You only need to add the subscription configuration here.
+
+Add the subscription to `.infra/common.ts`:
+
+```typescript
+export const workers: Worker[] = [
+  // ... existing workers
+  {
+    topic: 'my-new-topic', // Topic must exist in the "streams" repository
+    subscription: 'my-new-subscription',
+    args: {
+      // Optional: configure ack deadline, dead letter policy, etc.
+      ackDeadlineSeconds: 60,
+    },
+  },
+];
+```
+
+### Step 5: Publish Messages
+
+Use `triggerTypedEvent` to publish messages (this is the standard approach):
+
+```typescript
+import { triggerTypedEvent } from '../common/typedPubsub';
+
+// In your resolver or service:
+await triggerTypedEvent(logger, 'my-new-topic', {
+  userId: '123',
+  action: 'created',
+});
+```
+
+## Change Data Capture (CDC)
+
+CDC allows us to react to database changes without distributed transactions. Debezium captures PostgreSQL changes and publishes them to Pub/Sub.
+
+### How CDC Works
+
+1. 
**Debezium** monitors PostgreSQL WAL (Write-Ahead Log)
+2. Changes are published to Pub/Sub topics
+3. Workers subscribe to CDC topics and react to changes
+4. This enables eventual consistency across systems
+
+### CDC Worker Example
+
+```typescript
+import { Worker, messageToJson } from './worker';
+import { ChangeMessage } from '../types';
+
+const worker: Worker = {
+  subscription: 'api-cdc',
+  maxMessages: 20, // Process multiple messages at once
+  handler: async (message, con, logger): Promise<void> => {
+    const data: ChangeMessage = messageToJson(message);
+
+    // Skip heartbeat and read operations
+    if (
+      data.schema.name === 'io.debezium.connector.common.Heartbeat' ||
+      data.payload.op === 'r'
+    ) {
+      return;
+    }
+
+    // React to specific table changes
+    switch (data.payload.source.table) {
+      case 'user':
+        await handleUserChange(con, logger, data);
+        break;
+      case 'post':
+        await handlePostChange(con, logger, data);
+        break;
+    }
+  },
+};
+```
+
+### CDC Tables
+
+CDC is configured for specific tables in `.infra/application.properties`. Common patterns:
+
+- React to user changes for external sync (CIO, etc.)
+- React to post changes for indexing, notifications
+- React to vote changes for reputation calculations
+- React to comment changes for notifications
+
+## Worker Execution
+
+### Background Processor
+
+Workers run in the background processor:
+
+```bash
+pnpm run dev:background
+```
+
+This starts `src/background.ts`, which:
+1. Connects to the database
+2. Initializes the Pub/Sub client
+3. Subscribes all registered workers to their subscriptions
+4. Processes messages as they arrive
+
+### Message Flow
+
+1. **Publish**: Application code publishes a message to a Pub/Sub topic
+2. **Subscribe**: Worker subscribes to the subscription
+3. **Receive**: Pub/Sub delivers the message to the worker
+4. **Process**: Worker handler executes
+5. 
**Ack/Nack**: Worker acknowledges (ack) on success or negative-acknowledges (nack) on failure
+   - **Ack**: Message is removed from the subscription
+   - **Nack**: Message is redelivered (with exponential backoff)
+
+### Error Handling
+
+Workers should handle errors appropriately:
+
+```typescript
+handler: async (message, con, logger): Promise<void> => {
+  try {
+    // Your logic
+  } catch (err) {
+    logger.error({ err, messageId: message.messageId }, 'Worker failed');
+    throw err; // Re-throw to trigger nack and retry
+  }
+}
+```
+
+**Best Practices:**
+- Log errors with context (messageId, data)
+- Re-throw errors to trigger retries for transient failures
+- Use transactions for database operations
+- Consider idempotency (handle duplicate messages gracefully)
+
+## Infrastructure Configuration
+
+### Pulumi Setup
+
+Workers are configured in `.infra/common.ts` and `.infra/workers.ts`. The Pulumi infrastructure:
+
+1. Creates subscriptions (topics are managed in a separate infrastructure repository)
+2. Configures subscription settings (ack deadline, dead letter, etc.)
+3. 
Sets up IAM permissions
+
+### Subscription Options
+
+```typescript
+interface WorkerArgs {
+  enableMessageOrdering?: boolean; // Maintain message order
+  ackDeadlineSeconds?: number; // Time before redelivery
+  expirationPolicy?: {
+    ttl: string; // Subscription TTL
+  };
+  deadLetterPolicy?: {
+    deadLetterTopic: string; // Topic for failed messages
+    maxDeliveryAttempts: number; // Max retries before dead letter
+  };
+}
+```
+
+### Example: Dead Letter Queue
+
+For critical workers, configure a dead letter queue:
+
+```typescript
+// In .infra/common.ts
+{
+  topic: 'api.v1.generate-personalized-digest',
+  subscription: 'api.personalized-digest-email',
+  args: {
+    ackDeadlineSeconds: 120,
+    deadLetterPolicy: {
+      deadLetterTopic: `projects/${project}/topics/api.v1.personalized-digest-email-dead-letter`,
+      maxDeliveryAttempts: 5,
+    },
+  },
+}
+```
+
+## Testing Workers
+
+### Unit Testing
+
+Test workers in isolation:
+
+```typescript
+import worker from './myWorker';
+import {
+  createMockDataSource,
+  createMockLogger,
+  createMockPubSub,
+} from '../testHelpers';
+
+describe('myWorker', () => {
+  it('processes messages correctly', async () => {
+    const con = createMockDataSource();
+    const logger = createMockLogger();
+    const pubsub = createMockPubSub();
+
+    const message = {
+      messageId: '123',
+      data: Buffer.from(JSON.stringify({ userId: '456', action: 'test' })),
+    };
+
+    await worker.handler(message, con, logger, pubsub);
+
+    // Assertions
+  });
+});
+```
+
+## Best Practices
+
+1. **Always Use Typed Workers**: Use `TypedWorker` for all new workers - the regular `Worker` interface is deprecated
+2. **Use `triggerTypedEvent`**: Always use `triggerTypedEvent` to publish messages (not the deprecated `publishEvent`)
+3. **Idempotency**: Design workers to handle duplicate messages gracefully
+4. **Logging**: Log with context (messageId, relevant data)
+5. **Error Handling**: Re-throw errors to trigger retries, but log appropriately
+6. 
**Monitoring**: Use OpenTelemetry spans (automatically added) for observability +7. **Resource Limits**: Set `maxMessages` appropriately to control concurrency +8. **Dead Letter Queues**: Configure for critical workers to catch persistent failures + +## Troubleshooting + +### Worker Not Processing Messages + +1. Check worker is registered in `src/workers/index.ts` +2. Verify subscription exists in `.infra/common.ts` +3. Ensure background processor is running +4. Check Pub/Sub permissions in GCP + +### Messages Stuck in Subscription + +1. Check worker logs for errors +2. Verify ack deadline is sufficient +3. Check for dead letter queue messages +4. Review worker error handling + +### Type Errors + +1. Ensure message type is in `PubSubSchema` +2. Verify `TypedWorker` generic matches topic name +3. Check `parseMessage` for Protobuf messages + +## Related Documentation + +- **GraphORM**: See `src/graphorm/AGENTS.md` for GraphQL query optimization +- **Main Architecture**: See `AGENTS.md` for high-level architecture overview +- **Pulumi**: See `.infra/` directory for infrastructure configuration +
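
## Appendix: Idempotency Sketch

Because Pub/Sub delivers at-least-once, the idempotency best practice above deserves a concrete illustration. The following is a minimal, self-contained sketch; the names (`handleUpvote`, `processedIds`, `reputationByUser`) are illustrative stand-ins, not part of this codebase, and the in-memory set stands in for durable dedup state:

```typescript
// Sketch: an idempotent handler that tolerates Pub/Sub redelivery.
// All names below are hypothetical, chosen only for this example.
type UpvoteEvent = { postId: string; userId: string };

const processedIds = new Set<string>(); // stand-in for a durable dedup table
const reputationByUser = new Map<string, number>();

function handleUpvote(messageId: string, data: UpvoteEvent): void {
  // Pub/Sub may redeliver a message after a nack or an expired ack
  // deadline, so the handler must treat duplicates as no-ops.
  if (processedIds.has(messageId)) return; // already processed: skip
  processedIds.add(messageId);
  reputationByUser.set(
    data.userId,
    (reputationByUser.get(data.userId) ?? 0) + 1,
  );
}

const event: UpvoteEvent = { postId: 'p1', userId: 'u1' };
handleUpvote('msg-1', event);
handleUpvote('msg-1', event); // redelivery of the same message
console.log(reputationByUser.get('u1')); // 1
```

In a real worker the dedup state would live in PostgreSQL (for example, a unique constraint with `ON CONFLICT DO NOTHING`) rather than process memory, since workers restart and scale horizontally.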