Skip to content

Commit 20b85f0

Browse files
authored
SDK update (#104)
* feat: add decorator-based worker registration (@worker, TaskHandler) Introduce SDK-style worker API with auto-discovery, event listeners, and environment configuration. Reorganize docs to prioritize new API. Fully backward compatible with legacy TaskManager. * Fix types and lint * Fix mocks in tests * Remove .test files from code coverage * More exclusions for code coverage * Type fixes and rename type file * Remove exceptions file from code coverage * Add path aliasing and more tests * Update TaskHandler test * Add test * Ensure concurrency is a valid number * Fix tests * Fix tests * Update test to execute worker with retry
1 parent 5f27563 commit 20b85f0

31 files changed

Lines changed: 5423 additions & 66 deletions

README.md

Lines changed: 239 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
[![Build Status](https://github.com/conductor-oss/javascript-sdk/actions/workflows/pull_request.yml/badge.svg)](https://github.com/conductor-oss/javascript-sdk/actions/workflows/pull_request.yml)
44

5-
A comprehensive TypeScript/JavaScript client for [Conductor OSS](https://github.com/conductor-oss/conductor), enabling developers to build, orchestrate, and monitor distributed workflows with ease.
5+
A comprehensive TypeScript/JavaScript SDK for [Conductor OSS](https://github.com/conductor-oss/conductor), enabling developers to build, orchestrate, and monitor distributed workflows with ease.
66

77
[Conductor](https://www.conductor-oss.org/) is the leading open-source orchestration platform allowing developers to build highly scalable distributed applications.
88

@@ -40,11 +40,16 @@ Show support for the Conductor OSS. Please help spread the awareness by starrin
4040
- [Step 4: Manage and Monitor Execution](#step-4-manage-and-monitor-execution)
4141
- [Use TaskClient to Monitor and Debug Tasks](#use-taskclient-to-monitor-and-debug-tasks)
4242
- [Workers](#workers)
43-
- [The TaskManager](#the-taskmanager)
44-
- [Quick Start: Building a Worker](#quick-start-building-a-worker)
45-
- [Step 1: Define the Worker's Logic](#step-1-define-the-workers-logic)
46-
- [Step 2: Handle Task Outcomes and Errors](#step-2-handle-task-outcomes-and-errors)
47-
- [Step 3: Run the Worker with TaskManager](#step-3-run-the-worker-with-taskmanager)
43+
- [SDK-Style Worker Registration (Recommended)](#sdk-style-worker-registration-recommended)
44+
- [Using the @worker Decorator](#using-the-worker-decorator)
45+
- [Worker Configuration Options](#worker-configuration-options)
46+
- [Environment Variable Configuration](#environment-variable-configuration)
47+
- [Event Listeners for Observability](#event-listeners-for-observability)
48+
- [NonRetryableException for Terminal Failures](#nonretryableexception-for-terminal-failures)
49+
- [Module Imports for Side-Effect Registration](#module-imports-for-side-effect-registration)
50+
- [Legacy TaskManager API](#legacy-taskmanager-api)
51+
- [The TaskManager](#the-taskmanager)
52+
- [Quick Start: Building a Worker](#quick-start-building-a-worker)
4853
- [Worker Design Principles](#worker-design-principles)
4954
- [Scheduling](#scheduling)
5055
- [The SchedulerClient](#the-schedulerclient)
@@ -110,7 +115,6 @@ Here's a simple example to get you started:
110115
import {
111116
orkesConductorClient,
112117
WorkflowExecutor,
113-
TaskManager,
114118
simpleTask,
115119
workflow
116120
} from "@io-orkes/conductor-javascript";
@@ -447,17 +451,236 @@ Workers are background processes that execute tasks in your workflows. Think of
447451
Workflow → Creates Tasks → Workers Poll for Tasks → Execute Logic → Return Results → Workflow Continues
448452
```
449453

450-
The `TaskManager` class in this SDK simplifies the process of creating and managing workers.
454+
### SDK-Style Worker Registration (Recommended)
451455

452-
### The TaskManager
456+
The SDK supports decorator-based worker registration. This provides a modern, declarative approach to defining workers with auto-discovery, type safety, and less boilerplate.
453457

454-
The `TaskManager` is the primary tool for managing workers. It handles polling, task execution, and result reporting, allowing you to run multiple workers concurrently. For a complete method reference, see the [TaskManager API Reference](docs/api-reference/task-manager.md).
458+
**Key Benefits:**
459+
- **Auto-discovery**: No need to manually register workers
460+
- **Declarative**: Configuration is co-located with worker logic
461+
- **Type-safe**: Full TypeScript support
462+
- **Cleaner code**: Less boilerplate than the legacy API
455463

456-
### Quick Start: Building a Worker
464+
> 💡 **New to Conductor?** Start here! The decorator-based approach is simpler and more maintainable than the legacy `TaskManager` API.
457465
458-
Building a robust worker involves defining its logic, handling outcomes, and managing its execution.
466+
#### Using the @worker Decorator
459467

460-
#### Step 1: Define the Worker's Logic
468+
Define workers using the `@worker` decorator for cleaner, more maintainable code:
469+
470+
```typescript
471+
import { worker, TaskHandler, Task } from "@io-orkes/conductor-javascript";
472+
473+
// Define a worker with the @worker decorator
474+
@worker({ taskDefName: "send_email", concurrency: 10, pollInterval: 100 })
475+
async function sendEmail(task: Task) {
476+
const { to, subject, body } = task.inputData;
477+
await emailService.send(to, subject, body);
478+
479+
return {
480+
status: "COMPLETED",
481+
outputData: { sent: true, timestamp: new Date().toISOString() }
482+
};
483+
}
484+
485+
@worker({ taskDefName: "process_payment", domain: "payments", concurrency: 5 })
486+
async function processPayment(task: Task) {
487+
const { amount, customerId } = task.inputData;
488+
const result = await paymentGateway.charge(customerId, amount);
489+
490+
return {
491+
status: "COMPLETED",
492+
outputData: { transactionId: result.id }
493+
};
494+
}
495+
496+
// Auto-discover and start all decorated workers
497+
const handler = new TaskHandler({
498+
client,
499+
scanForDecorated: true, // Automatically finds @worker decorated functions
500+
});
501+
502+
handler.startWorkers();
503+
console.log(`Started ${handler.runningWorkerCount} workers`);
504+
505+
// Graceful shutdown
506+
process.on("SIGTERM", async () => {
507+
await handler.stopWorkers();
508+
process.exit(0);
509+
});
510+
```
511+
512+
#### Worker Configuration Options
513+
514+
The `@worker` decorator supports comprehensive configuration:
515+
516+
```typescript
517+
@worker({
518+
taskDefName: "my_task", // Required: task name
519+
concurrency: 5, // Max concurrent tasks (default: 1)
520+
pollInterval: 100, // Polling interval in ms (default: 100)
521+
domain: "production", // Task domain for multi-tenancy
522+
workerId: "worker-123", // Unique worker identifier
523+
pollTimeout: 100, // Server-side long poll timeout
524+
})
525+
async function myTask(task: Task) {
526+
// Your logic here
527+
}
528+
```
529+
530+
#### Environment Variable Configuration
531+
532+
Override worker configuration using environment variables without code changes:
533+
534+
```bash
535+
# Global configuration (applies to all workers)
536+
export CONDUCTOR_WORKER_ALL_POLL_INTERVAL=500
537+
export CONDUCTOR_WORKER_ALL_CONCURRENCY=10
538+
539+
# Worker-specific configuration (overrides global)
540+
export CONDUCTOR_WORKER_SEND_EMAIL_CONCURRENCY=20
541+
export CONDUCTOR_WORKER_PROCESS_PAYMENT_DOMAIN=payments
542+
```
543+
544+
**Configuration Hierarchy** (highest to lowest priority):
545+
1. Worker-specific environment variables
546+
2. Global environment variables
547+
3. Code-level decorator parameters
548+
4. System defaults
549+
550+
**Supported Environment Variable Formats:**
551+
- `CONDUCTOR_WORKER_<TASK_NAME>_<PROPERTY>` - Worker-specific (uppercase)
552+
- `conductor.worker.<task_name>.<property>` - Worker-specific (dotted)
553+
- `CONDUCTOR_WORKER_ALL_<PROPERTY>` - Global (uppercase)
554+
- `conductor.worker.all.<property>` - Global (dotted)
555+
556+
#### Event Listeners for Observability
557+
558+
Monitor worker lifecycle events for metrics, logging, and debugging:
559+
560+
```typescript
561+
import { TaskHandler, TaskRunnerEventsListener } from "@io-orkes/conductor-javascript";
562+
563+
const metricsListener: TaskRunnerEventsListener = {
564+
onTaskExecutionCompleted(event) {
565+
metrics.histogram("task_duration_ms", event.durationMs, {
566+
task_type: event.taskType,
567+
});
568+
},
569+
570+
onTaskExecutionFailure(event) {
571+
logger.error(`Task ${event.taskId} failed:`, event.cause);
572+
metrics.counter("task_failures", 1, {
573+
task_type: event.taskType,
574+
});
575+
},
576+
577+
onTaskUpdateFailure(event) {
578+
// CRITICAL: Task result was lost after all retries
579+
alertOps({
580+
severity: "CRITICAL",
581+
message: `Task update failed after ${event.retryCount} retries`,
582+
taskId: event.taskId,
583+
});
584+
},
585+
};
586+
587+
const handler = new TaskHandler({
588+
client,
589+
eventListeners: [metricsListener],
590+
});
591+
```
592+
593+
**Available Events:**
594+
- `onPollStarted` - Polling begins
595+
- `onPollCompleted` - Polling succeeds
596+
- `onPollFailure` - Polling fails
597+
- `onTaskExecutionStarted` - Task execution begins
598+
- `onTaskExecutionCompleted` - Task execution succeeds
599+
- `onTaskExecutionFailure` - Task execution fails
600+
- `onTaskUpdateFailure` - Task update fails after all retries (CRITICAL)
601+
602+
#### NonRetryableException for Terminal Failures
603+
604+
Mark failures as terminal to prevent unnecessary retries:
605+
606+
```typescript
607+
import { worker, NonRetryableException } from "@io-orkes/conductor-javascript";
608+
609+
@worker({ taskDefName: "validate_order" })
610+
async function validateOrder(task: Task) {
611+
const order = await getOrder(task.inputData.orderId);
612+
613+
if (!order) {
614+
// Order doesn't exist - retry won't help
615+
throw new NonRetryableException(`Order ${task.inputData.orderId} not found`);
616+
}
617+
618+
if (order.status === "CANCELLED") {
619+
// Business rule violation - retry won't help
620+
throw new NonRetryableException("Cannot process cancelled order");
621+
}
622+
623+
// Regular errors will be retried
624+
if (order.amount > MAX_AMOUNT) {
625+
throw new Error("Amount exceeds limit"); // Will retry
626+
}
627+
628+
return { status: "COMPLETED", outputData: { validated: true } };
629+
}
630+
```
631+
632+
**Error Handling:**
633+
- `throw new Error()` → Task status: `FAILED` (will retry with exponential backoff)
634+
- `throw new NonRetryableException()` → Task status: `FAILED_WITH_TERMINAL_ERROR` (no retry)
635+
636+
#### Module Imports for Side-Effect Registration
637+
638+
Organize workers across multiple files:
639+
640+
```typescript
641+
// workers/orderWorkers.ts
642+
import { worker } from "@io-orkes/conductor-javascript";
643+
644+
@worker({ taskDefName: "validate_order" })
645+
export async function validateOrder(task) { /* ... */ }
646+
647+
@worker({ taskDefName: "fulfill_order" })
648+
export async function fulfillOrder(task) { /* ... */ }
649+
650+
// workers/paymentWorkers.ts
651+
@worker({ taskDefName: "process_payment" })
652+
export async function processPayment(task) { /* ... */ }
653+
654+
// main.ts
655+
import { TaskHandler } from "@io-orkes/conductor-javascript";
656+
657+
// Use TaskHandler.create() for async module imports
658+
const handler = await TaskHandler.create({
659+
client,
660+
importModules: [
661+
"./workers/orderWorkers",
662+
"./workers/paymentWorkers",
663+
],
664+
});
665+
666+
handler.startWorkers(); // Auto-discovers all workers from imported modules
667+
```
668+
669+
### Legacy TaskManager API
670+
671+
The legacy `TaskManager` API continues to work with full backward compatibility. However, **new projects should use the decorator-based approach above** for better maintainability and cleaner code.
672+
673+
Both APIs can coexist in the same application, allowing gradual migration from legacy to decorator-based workers.
674+
675+
#### The TaskManager
676+
677+
The `TaskManager` is the legacy tool for managing workers. It handles polling, task execution, and result reporting, allowing you to run multiple workers concurrently. For a complete method reference, see the [TaskManager API Reference](docs/api-reference/task-manager.md).
678+
679+
#### Quick Start: Building a Worker
680+
681+
Building a worker with the legacy API involves defining its logic, handling outcomes, and managing its execution.
682+
683+
##### Step 1: Define the Worker's Logic
461684

462685
A worker is an object that defines a `taskDefName` (which must match the task name in your workflow) and an `execute` function containing your business logic.
463686

@@ -484,7 +707,7 @@ const emailWorker: ConductorWorker = {
484707
};
485708
```
486709

487-
#### Step 2: Handle Task Outcomes and Errors
710+
##### Step 2: Handle Task Outcomes and Errors
488711

489712
The `execute` function must return an object indicating the task's outcome.
490713

@@ -512,7 +735,7 @@ try {
512735
}
513736
```
514737

515-
#### Step 3: Run the Worker with TaskManager
738+
##### Step 3: Run the Worker with TaskManager
516739

517740
The `TaskManager` is responsible for polling Conductor, managing task execution, and reporting back results. You can run a single worker or multiple workers with one manager.
518741

@@ -539,7 +762,7 @@ For a complete method reference, see the [TaskManager API Reference](docs/api-re
539762

540763
### Worker Design Principles
541764

542-
When designing workers, it's best to follow these principles:
765+
When designing workers (with either API), it's best to follow these principles:
543766

544767
- **Stateless**: Workers should not rely on local state.
545768
- **Idempotent**: The same task input should always produce the same result.

jest.config.mjs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,19 @@ export default {
99
"!src/**/*.d.ts",
1010
"!src/**/generated/**",
1111
"!src/**/spec/**",
12+
"!src/**/*.test.{ts,tsx}",
13+
"!src/**/index.ts",
14+
"!src/**/types.ts",
15+
"!src/**/*.types.ts",
16+
"!src/**/exceptions/**",
1217
],
1318
coverageReporters: ["text", "lcov", "cobertura"],
1419
transformIgnorePatterns: ["/node_modules/", "\\.pnp\\.[^\\/]+$"],
20+
moduleNameMapper: {
21+
"^@/(.*)$": "<rootDir>/src/$1",
22+
"^@open-api/(.*)$": "<rootDir>/src/open-api/$1",
23+
"^@test-utils/(.*)$": "<rootDir>/src/integration-tests/utils/$1",
24+
},
1525
transform: {
1626
"^.+\\.tsx?$": [
1727
"ts-jest",

package-lock.json

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
22
"name": "@io-orkes/conductor-javascript",
3-
"description": "Typescript Client for Netflix Conductor",
4-
"version": "v0.0.0",
3+
"description": "Typescript SDK for Netflix Conductor",
4+
"version": "v3.0.0",
55
"private": false,
66
"homepage": "https://orkes.io",
77
"repository": {

0 commit comments

Comments
 (0)