Complete API documentation for the World interface and all its components.
Source: All TypeScript interfaces are exported from the
@workflow/worldnpm package. Install withnpm install @workflow/worldto get type definitions.Workflow 4.1 note: Runtime writes are event-sourced through
storage.events.create(...). For migration guidance and legacy-run behavior, see 07 - Workflow 4.1 Migration Guide.
// From @workflow/world
export interface World extends Queue, Storage, Streamer {
/**
* Optional method called to start background tasks.
* Use this to start queue workers, cleanup jobs, etc.
* Not needed for serverless environments.
*/
start?(): Promise<void>;
}// From @workflow/world
export interface Queue {
getDeploymentId(): Promise<string>;
queue(
queueName: ValidQueueName,
message: QueuePayload,
opts?: {
deploymentId?: string;
idempotencyKey?: string;
}
): Promise<{ messageId: MessageId }>;
createQueueHandler(
queueNamePrefix: QueuePrefix,
handler: (
message: unknown,
meta: { attempt: number; queueName: ValidQueueName; messageId: MessageId }
) => Promise<void | { timeoutSeconds: number }>
): (req: Request) => Promise<Response>;
}Returns a unique identifier for the current deployment.
Returns: Promise<string> - Deployment ID (e.g., "dpl_abc123" or "dpl_embedded")
Usage: Used to track which deployment started a workflow run.
Enqueues a message for processing by a workflow or step handler.
Parameters:
| Parameter | Type | Description |
|---|---|---|
queueName |
ValidQueueName |
Queue name with prefix (__wkf_workflow_* or __wkf_step_*) |
message |
QueuePayload |
The message payload (workflow or step invocation) |
opts.deploymentId |
string? |
Optional target deployment |
opts.idempotencyKey |
string? |
Key to prevent duplicate processing |
Returns: Promise<{ messageId: MessageId }> - Unique message identifier
Notes:
- Idempotency keys should prevent duplicate messages with the same key
- Messages should be processed asynchronously (don't block on execution)
Creates an HTTP request handler for processing queued messages.
Parameters:
| Parameter | Type | Description |
|---|---|---|
queueNamePrefix |
QueuePrefix |
Either '__wkf_step_' or '__wkf_workflow_' |
handler |
Function |
The message handler function |
The handler function receives:
message: The deserialized message payloadmeta.attempt: Current attempt number (1-indexed)meta.queueName: Full queue namemeta.messageId: Unique message ID
The handler returns:
void- Success, message is acknowledged{ timeoutSeconds: number }- Retry after specified delay
Returns: (req: Request) => Promise<Response> - HTTP handler function
// Queue name prefixes
export type QueuePrefix = '__wkf_step_' | '__wkf_workflow_';
// Valid queue name (prefix + workflow/step name)
export type ValidQueueName = `${QueuePrefix}${string}`;
// Message ID (branded string)
export type MessageId = string & { __brand: 'MessageId' };
// Workflow invocation payload
export interface WorkflowInvokePayload {
runId: string;
traceCarrier?: Record<string, string>; // OpenTelemetry context
}
// Step invocation payload
export interface StepInvokePayload {
workflowName: string;
workflowRunId: string;
workflowStartedAt: number; // Timestamp
stepId: string;
traceCarrier?: Record<string, string>;
}
// Union of both payload types
export type QueuePayload = WorkflowInvokePayload | StepInvokePayload;// From @workflow/world
export interface Storage {
runs: { /* ... */ };
steps: { /* ... */ };
events: { /* ... */ };
hooks: { /* ... */ };
}Read-only run materialized view.
runs: {
get(id: string, params?: GetWorkflowRunParams): Promise<WorkflowRun>;
list(params?: ListWorkflowRunsParams): Promise<PaginatedResponse<WorkflowRun>>;
}Retrieves a workflow run by ID.
Parameters:
| Parameter | Type | Description |
|---|---|---|
id |
string |
The run ID |
params.resolveData |
'all' | 'none' |
Whether to include input/output data |
Returns: The WorkflowRun or throws WorkflowRunNotFoundError
Lists workflow runs with filtering and pagination.
Parameters:
interface ListWorkflowRunsParams {
workflowName?: string; // Filter by workflow name
status?: WorkflowRunStatus; // Filter by status
pagination?: PaginationOptions;
resolveData?: ResolveData;
}Returns: PaginatedResponse<WorkflowRun>
runs does not expose mutators in Workflow 4.1. State changes are applied through events.create(...).
interface WorkflowRun {
runId: string; // e.g., "wrun_01HX..."
status: WorkflowRunStatus; // 'pending' | 'running' | 'completed' | 'failed' | 'cancelled'
deploymentId: string;
workflowName: string;
input: any[]; // Function arguments
output?: any; // Return value (if completed)
error?: StructuredError; // Error details (if failed)
executionContext?: Record<string, any>;
startedAt?: Date; // First transition to 'running'
completedAt?: Date; // Transition to terminal status
createdAt: Date;
updatedAt: Date;
}
type WorkflowRunStatus = 'pending' | 'running' | 'completed' | 'failed' | 'cancelled';Read-only step materialized view.
steps: {
get(runId: string | undefined, stepId: string, params?: GetStepParams): Promise<Step>;
list(params: ListWorkflowRunStepsParams): Promise<PaginatedResponse<Step>>;
}Retrieves a step by ID. Note that runId can be undefined - in this case, search across all runs to find the step.
steps does not expose mutators in Workflow 4.1. Step lifecycle updates are driven by events.create(...).
interface Step {
runId: string;
stepId: string;
stepName: string;
status: StepStatus; // 'pending' | 'running' | 'completed' | 'failed' | 'cancelled'
input: any[];
output?: any;
error?: StructuredError;
attempt: number; // Current attempt (0 = first try)
startedAt?: Date;
completedAt?: Date;
createdAt: Date;
updatedAt: Date;
retryAfter?: Date; // When to retry (for retryable errors)
}
type StepStatus = 'pending' | 'running' | 'completed' | 'failed' | 'cancelled';Manages the append-only event log and all state mutations.
events: {
create(runId: null, data: RunCreatedEventRequest, params?: CreateEventParams): Promise<EventResult>;
create(runId: string, data: CreateEventRequest, params?: CreateEventParams): Promise<EventResult>;
list(params: ListEventsParams): Promise<PaginatedResponse<Event>>;
listByCorrelationId(params: ListEventsByCorrelationIdParams): Promise<PaginatedResponse<Event>>;
}Creates an event and atomically applies the corresponding run/step/hook state transition.
type CreateEventRequest =
| { eventType: 'run_started' }
| { eventType: 'run_completed'; eventData: { output?: any } }
| { eventType: 'run_failed'; eventData: { error: any; errorCode?: string } }
| { eventType: 'run_cancelled' }
| { eventType: 'step_created'; correlationId: string; eventData: { stepName: string; input: any } }
| { eventType: 'step_completed'; correlationId: string; eventData: { result: any } }
| { eventType: 'step_failed'; correlationId: string; eventData: { error: any; stack?: string } }
| { eventType: 'step_started'; correlationId: string; eventData?: { attempt?: number } }
| { eventType: 'step_retrying'; correlationId: string; eventData: { error: any; stack?: string; retryAfter?: Date } }
| { eventType: 'hook_created'; correlationId: string; eventData: { token: string; metadata?: any } }
| { eventType: 'hook_received'; correlationId: string; eventData: { payload: any } }
| { eventType: 'hook_disposed'; correlationId: string }
| { eventType: 'wait_created'; correlationId: string; eventData: { resumeAt: Date } }
| { eventType: 'wait_completed'; correlationId: string };
type RunCreatedEventRequest = {
eventType: 'run_created';
eventData: {
deploymentId: string;
workflowName: string;
input: any;
executionContext?: Record<string, any>;
};
};Implementation notes:
run_createdmust be called withrunId = null; world generates runId- World may return
hook_conflictevent for duplicate hook tokens - Legacy runs may return
event: undefinedforrun_cancelled
Lists events for a specific run.
interface ListEventsParams {
runId: string;
pagination?: PaginationOptions;
resolveData?: ResolveData;
}Note: Default sort order should be 'asc' (oldest first) for proper replay.
Lists events with a specific correlation ID (e.g., all events for a step).
interface Event {
runId: string;
eventId: string; // e.g., "wevt_01HX..."
eventType: EventType;
correlationId?: string; // Links related events
eventData?: object; // Event-specific data
createdAt: Date;
}
type EventType =
| 'run_created' | 'run_started' | 'run_completed' | 'run_failed' | 'run_cancelled'
| 'step_created' | 'step_completed' | 'step_failed' | 'step_retrying' | 'step_started'
| 'hook_created' | 'hook_received' | 'hook_disposed'
| 'wait_created' | 'wait_completed'
| 'hook_conflict';Read-only hook materialized view.
hooks: {
get(hookId: string, params?: GetHookParams): Promise<Hook>;
getByToken(token: string, params?: GetHookParams): Promise<Hook>;
list(params: ListHooksParams): Promise<PaginatedResponse<Hook>>;
}Retrieves a hook by its security token. Used when an external caller resumes a hook.
hooks creation/disposal is performed via events.create(hook_created|hook_disposed|hook_received).
interface Hook {
runId: string;
hookId: string;
token: string; // Unique security token
ownerId: string; // User/owner identifier
projectId: string; // Project identifier
environment: string; // Environment (e.g., "production")
metadata?: any;
createdAt: Date;
}// From @workflow/world
export interface Streamer {
writeToStream(
name: string,
runId: string | Promise<string>,
chunk: string | Uint8Array
): Promise<void>;
closeStream(
name: string,
runId: string | Promise<string>
): Promise<void>;
readFromStream(
name: string,
startIndex?: number
): Promise<ReadableStream<Uint8Array>>;
listStreamsByRunId(runId: string): Promise<string[]>;
}Writes a chunk of data to a named stream.
Parameters:
| Parameter | Type | Description |
|---|---|---|
name |
string |
Stream identifier |
runId |
string | Promise<string> |
Associated run ID (may be a promise) |
chunk |
string | Uint8Array |
Data to write |
Implementation notes:
- Await
runIdif it's a promise before writing - Generate ULID for chunk ordering
- Emit real-time notifications if supported
Signals the end of a stream.
Implementation notes:
- Write an EOF marker
- Notify readers that stream is complete
Returns a ReadableStream for consuming stream data.
Parameters:
| Parameter | Type | Description |
|---|---|---|
name |
string |
Stream identifier |
startIndex |
number? |
Resume from this chunk index |
Returns: Promise<ReadableStream<Uint8Array>>
Implementation notes:
- Load existing chunks from storage
- Subscribe to real-time notifications for new chunks
- Deduplicate chunks that arrive via both mechanisms
- Close stream when EOF marker is received
interface PaginationOptions {
limit?: number; // Max items (default varies, max 1000)
cursor?: string; // Cursor from previous response
sortOrder?: 'asc' | 'desc'; // Sort direction
}interface PaginatedResponse<T> {
data: T[]; // Items for this page
cursor: string | null; // Cursor for next page (null if no more)
hasMore: boolean; // Whether more pages exist
}type ResolveData = 'none' | 'all';'all'- Include full input/output data'none'- Omit input/output (return[]andundefined)
interface StructuredError {
message: string;
stack?: string;
code?: string;
}Continue to 03 - Implementation Guide for a step-by-step tutorial on building each component.