Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/app-factory.serve-http.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@ describe('AppFactory.shouldServeHttp', () => {
RunMode.CAMPAIGN_SENDER,
RunMode.CAMPAIGN_TRACKER,
RunMode.EVENT_PROCESS,
// EVO-1764: the dedicated journey worker opens the probe listener so its
// /ready (queue-health) + /metrics (poller gauges) are scrapeable.
RunMode.TEMPORAL_WORKER,
];
const noServe = [
RunMode.EVENT_WORKER,
RunMode.SEGMENT_WORKER,
RunMode.TEMPORAL_WORKER,
RunMode.CAMPAIGN_WORKER,
];

Expand Down
5 changes: 5 additions & 0 deletions src/app-factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ export class AppFactory {
RunMode.CAMPAIGN_SENDER,
RunMode.CAMPAIGN_TRACKER,
RunMode.EVENT_PROCESS,
// EVO-1764: the dedicated journey worker must expose the probe listener so
// its `/ready` (journey-execution queue-health indicator) and `/metrics`
// (poller gauges) are scrapeable — otherwise the no-executor signal is
// unreachable exactly where the executor runs.
RunMode.TEMPORAL_WORKER,
].includes(config.runMode);
}

Expand Down
19 changes: 17 additions & 2 deletions src/health/active-indicators.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ const all: AllIndicators = {
redis: stub('redis'),
broker: stub('broker'),
clickhouse: stub('clickhouse'),
temporal: stub('temporal-journey-queue'),
};

const names = (mode: RunMode) =>
Expand All @@ -32,9 +33,23 @@ describe('selectActiveIndicators', () => {
RunMode.CAMPAIGN_SENDER,
RunMode.CAMPAIGN_TRACKER,
RunMode.EVENT_RECEIVER,
RunMode.SINGLE,
RunMode.API,
])('%s excludes ClickHouse, keeps the base trio (AC4)', (mode) => {
// SINGLE deliberately excludes the temporal probe so a journey-queue dip
// does not 503 the co-located API (EVO-1764).
RunMode.SINGLE,
RunMode.CAMPAIGN_WORKER,
])('%s excludes ClickHouse + Temporal, keeps the base trio', (mode) => {
expect(names(mode)).toEqual(['postgres', 'redis', 'broker']);
});

// EVO-1764 (AC9): on top of the base trio, the dedicated temporal-worker also
// evaluates the journey-execution queue-health probe (single mode does NOT —
// see selectActiveIndicators).
it('temporal-worker includes the temporal-journey-queue probe (EVO-1764)', () => {
  const expectedProbes = [
    'postgres',
    'redis',
    'broker',
    'temporal-journey-queue',
  ];

  const activeProbes = names(RunMode.TEMPORAL_WORKER);

  expect(activeProbes).toEqual(expectedProbes);
});
});
12 changes: 10 additions & 2 deletions src/health/active-indicators.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,18 @@ export interface AllIndicators {
redis: HealthIndicator;
broker: HealthIndicator;
clickhouse: HealthIndicator;
temporal: HealthIndicator;
}

/**
* Indicators evaluated by `/ready` for a given RUN_MODE (EVO-1226):
* Postgres + Redis + Broker for every mode, plus ClickHouse only for
* `event-process` (AC4). Pure function so the gating is unit-testable without
* booting the module graph.
* `event-process` (AC4), plus the journey-execution queue-health probe only for
* the dedicated `temporal-worker` (EVO-1764). It is deliberately NOT added in
* `single` mode: there the worker shares the process with the API, so a
* journey-queue dip must not 503 the whole co-located surface and risk LB
* eviction. SINGLE still gets the signal via the `/metrics` poller gauges.
* Pure function so the gating is unit-testable without booting the module graph.
*/
export function selectActiveIndicators(
mode: RunMode,
Expand All @@ -22,5 +27,8 @@ export function selectActiveIndicators(
if (mode === RunMode.EVENT_PROCESS) {
active.push(all.clickhouse);
}
if (mode === RunMode.TEMPORAL_WORKER) {
active.push(all.temporal);
}
return active;
}
11 changes: 10 additions & 1 deletion src/health/health.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import { PostgresHealthIndicator } from './indicators/postgres.health-indicator'
import { RedisHealthIndicator } from './indicators/redis.health-indicator';
import { BrokerHealthIndicator } from './indicators/broker.health-indicator';
import { ClickHouseHealthIndicator } from './indicators/clickhouse.health-indicator';
import { TemporalTaskQueueIndicator } from './indicators/temporal-task-queue.health-indicator';
import { TemporalQueueHealthModule } from '../modules/temporal/temporal-queue-health.module';

/**
* Reusable health module imported by every RUN_MODE (EVO-1226 [5.1]). It owns
Expand All @@ -20,7 +22,11 @@ import { ClickHouseHealthIndicator } from './indicators/clickhouse.health-indica
* from the global graph.
*/
@Module({
imports: [ProcessingModule],
// TemporalQueueHealthModule exports the journey-execution queue-health probe
// (EVO-1764). It is dependency-light, so importing it into this always-on
// module is safe in every RUN_MODE; the indicator is only *evaluated* in the
// dedicated `temporal-worker` mode (see selectActiveIndicators).
imports: [ProcessingModule, TemporalQueueHealthModule],
controllers: [HealthController],
providers: [
PostgresHealthIndicator,
Expand All @@ -34,18 +40,21 @@ import { ClickHouseHealthIndicator } from './indicators/clickhouse.health-indica
RedisHealthIndicator,
BrokerHealthIndicator,
ClickHouseHealthIndicator,
TemporalTaskQueueIndicator,
],
useFactory: (
postgres: PostgresHealthIndicator,
redis: RedisHealthIndicator,
broker: BrokerHealthIndicator,
clickhouse: ClickHouseHealthIndicator,
temporal: TemporalTaskQueueIndicator,
) =>
selectActiveIndicators(getProcessingConfig().runMode, {
postgres,
redis,
broker,
clickhouse,
temporal,
}),
},
],
Expand Down
62 changes: 62 additions & 0 deletions src/health/indicators/temporal-task-queue.health-indicator.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import { TemporalTaskQueueIndicator } from './temporal-task-queue.health-indicator';

/**
 * Creates the indicator under test around a stub poller whose only member is
 * the supplied `getStatus` mock.
 */
const build = (getStatus: jest.Mock) => {
  const pollerStub = { getStatus } as any;
  return new TemporalTaskQueueIndicator(pollerStub);
};

describe('TemporalTaskQueueIndicator (EVO-1764)', () => {
  // Name every result from this indicator is expected to carry.
  const probeName = 'temporal-journey-queue';

  it('up when the poller reports healthy', async () => {
    // Snapshot the stub poller hands back: pollers present, nothing stale.
    const healthySnapshot = {
      healthy: true,
      workflowPollers: 2,
      activityPollers: 2,
      zeroSince: null,
      sustainedZeroMs: 0,
      stale: false,
    };
    const indicator = build(jest.fn().mockReturnValue(healthySnapshot));

    const outcome = await indicator.check();

    expect(outcome).toEqual({ name: probeName, status: 'up' });
  });

  it('down with structured detail on a confirmed sustained-zero', async () => {
    const zeroSince = new Date('2026-06-23T00:00:00.000Z');
    // Snapshot the stub poller hands back: unhealthy, zero pollers for 90s.
    const unhealthySnapshot = {
      healthy: false,
      workflowPollers: 0,
      activityPollers: 0,
      zeroSince,
      sustainedZeroMs: 90_000,
      stale: false,
    };
    const indicator = build(jest.fn().mockReturnValue(unhealthySnapshot));

    const outcome = await indicator.check();

    expect(outcome).toMatchObject({
      name: probeName,
      status: 'down',
      error: expect.stringContaining('no WORKFLOW pollers'),
    });
    // The snapshot's figures are surfaced under `detail` for operators.
    expect(outcome.detail).toMatchObject({
      workflowPollers: 0,
      zeroSince,
      sustainedZeroMs: 90_000,
    });
  });

  it('never throws — a poller read failure resolves as down', async () => {
    const failingRead = jest.fn().mockImplementation(() => {
      throw new Error('boom');
    });
    const indicator = build(failingRead);

    const outcome = await indicator.check();

    expect(outcome).toMatchObject({ name: probeName, status: 'down' });
    expect(outcome.error).toContain('boom');
  });
});
47 changes: 47 additions & 0 deletions src/health/indicators/temporal-task-queue.health-indicator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import { Injectable } from '@nestjs/common';
import { HealthIndicator, IndicatorResult } from './health-indicator.interface';
import { JourneyExecutionPollerService } from '../../modules/temporal/services/journey-execution-poller.service';

/**
 * Readiness probe for the `journey-execution` Temporal task queue (EVO-1764).
 *
 * The probe does no polling of its own: it synchronously reads the snapshot
 * returned by `JourneyExecutionPollerService.getStatus()` and maps it to an
 * indicator result. Deciding *when* the queue counts as unhealthy (confirmed
 * zero WORKFLOW pollers sustained past the configured threshold, and the
 * "stale ≠ down" rule so a Temporal outage is not mistaken for "no worker")
 * is the poller's responsibility, which keeps those rules in one place.
 *
 * Result mapping:
 * - snapshot `healthy`      → `up`
 * - snapshot not `healthy`  → `down`, with the poller's figures under `detail`
 * - `getStatus()` throws    → `down`, with a description of the thrown value
 */
@Injectable()
export class TemporalTaskQueueIndicator implements HealthIndicator {
  readonly name = 'temporal-journey-queue';

  constructor(private readonly poller: JourneyExecutionPollerService) {}

  /**
   * Evaluates the probe against the poller's current snapshot.
   *
   * Not `async`: the only work is a synchronous read, so a resolved Promise is
   * returned to satisfy the HealthIndicator contract without an idle await.
   *
   * @returns a Promise that always resolves (never rejects — the contract's
   *   hard rule) with the `up`/`down` result described on the class.
   */
  check(): Promise<IndicatorResult> {
    try {
      const status = this.poller.getStatus();
      if (status.healthy) {
        return Promise.resolve({ name: this.name, status: 'up' });
      }
      return Promise.resolve({
        name: this.name,
        status: 'down',
        error: 'no WORKFLOW pollers on journey-execution',
        detail: {
          workflowPollers: status.workflowPollers,
          zeroSince: status.zeroSince,
          sustainedZeroMs: status.sustainedZeroMs,
        },
      });
    } catch (err) {
      // Anything can be thrown in JS, not just Error instances. Narrow before
      // reading `.message` so a thrown string/object still yields a useful
      // `error` instead of `undefined`.
      const reason = err instanceof Error ? err.message : String(err);
      return Promise.resolve({
        name: this.name,
        status: 'down',
        error: reason,
      });
    }
  }
}
43 changes: 43 additions & 0 deletions src/modules/cache/services/journey-session-cache.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,49 @@ export class JourneySessionCacheService extends BaseCacheService<
);
}

/**
 * Persists a session directly in the terminal `failed` state for a journey
 * that could not be dispatched (EVO-1764), then announces the status change.
 *
 * Why not `updateSessionStatus`: on the normal failure path a worker activity
 * has already created the row, so an update is enough. The dispatch guard
 * fires when there is *no* worker, so no row exists yet and an update-only
 * write would silently no-op. Writing through `set()` (Redis + Postgres
 * write-through) creates or overwrites the row, so the failed-to-dispatch
 * journey stays durable and visible instead of vanishing.
 *
 * @param params identifiers of the session/journey/contact, the optional
 *   Temporal workflow ids, and the error message to record on the row.
 */
async createFailedDispatchSession(params: {
  sessionId: string;
  journeyId: string;
  contactId: string;
  workflowId?: string;
  workflowRunId?: string;
  errorMessage: string;
}): Promise<void> {
  // One timestamp is shared by every time field of the new row.
  const timestamp = new Date();
  const {
    sessionId,
    journeyId,
    contactId,
    workflowId,
    workflowRunId,
    errorMessage,
  } = params;

  // Fresh row with empty variables/logs and default retry counters.
  const failedSession = {
    id: sessionId,
    journeyId,
    contactId,
    status: 'failed',
    workflowId,
    workflowRunId,
    failedAt: timestamp,
    errorMessage,
    variables: {},
    retryCount: 0,
    maxRetries: 3,
    executionLogs: [],
    createdAt: timestamp,
    updatedAt: timestamp,
    lastCached: timestamp,
  } as unknown as JourneySession;
  await this.set(failedSession);

  // Emitted only after the write has completed.
  const statusEvent = {
    id: sessionId,
    status: 'failed',
    errorMessage,
  };
  this.eventEmitter.emit('journey-session.status-updated', statusEvent);
}

async updateSessionStatus(
sessionId: string,
status: string,
Expand Down
2 changes: 2 additions & 0 deletions src/modules/journeys/journeys.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { WaitRegistryService } from './services/wait-registry.service';
// WaitCheckerJob removed - using Temporal timers instead of Bull queues
import { ProcessingModule } from '../processing/processing.module';
import { CacheModule } from '../cache/cache.module';
import { TemporalQueueHealthModule } from '../temporal/temporal-queue-health.module';
import { AppFactory } from '../../app-factory';

// Only include JourneyTriggerProcessor if we should start temporal worker
Expand All @@ -33,6 +34,7 @@ if (AppFactory.shouldStartTemporalWorker()) {
// BullModule removed - using Temporal timers instead of Bull queues for wait processing
ProcessingModule,
CacheModule, // 🚀 PERFORMANCE: Import cache services for journey performance
TemporalQueueHealthModule, // EVO-1764: queue-health poller for the dispatch guard
],
controllers: [JourneysController, JourneySessionsController, ScheduledActionsController],
providers: moduleProviders,
Expand Down
46 changes: 44 additions & 2 deletions src/modules/journeys/services/journey-sessions.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ describe('JourneySessionsService.startJourney', () => {
updateSessionStatus: jest.Mock;
invalidate: jest.Mock;
};
let poller: { isQueueUnexecutable: jest.Mock };
let workflowStart: jest.Mock;

const journey = { id: 'journey-1', name: 'J1' };
Expand All @@ -32,7 +33,13 @@ describe('JourneySessionsService.startJourney', () => {
updateSessionStatus: jest.fn().mockResolvedValue(undefined),
invalidate: jest.fn().mockResolvedValue(undefined),
};
service = new JourneySessionsService(cache as any);
// Default: a healthy queue so the existing happy-path assertions hold.
poller = {
isQueueUnexecutable: jest
.fn()
.mockResolvedValue({ unexecutable: false, status: {} }),
};
service = new JourneySessionsService(cache as any, poller as any);
jest
.spyOn((service as any).logger, 'log')
.mockImplementation(() => undefined);
Expand All @@ -42,7 +49,7 @@ describe('JourneySessionsService.startJourney', () => {

workflowStart = jest
.fn()
.mockResolvedValue({ firstExecutionRunId: 'run-1' });
.mockResolvedValue({ firstExecutionRunId: 'run-1', terminate: jest.fn() });
jest
.spyOn(service as any, 'getTemporalClient')
.mockResolvedValue({ workflow: { start: workflowStart } });
Expand Down Expand Up @@ -128,6 +135,41 @@ describe('JourneySessionsService.startJourney', () => {
expect(workflowStart).toHaveBeenCalledTimes(1);
});

it('EVO-1764: fails fast when journey-execution has no worker (forceLive)', async () => {
  // Arrange: the workflow starts fine, but the queue check reports no executor.
  const terminate = jest.fn().mockResolvedValue(undefined);
  workflowStart.mockResolvedValue({
    firstExecutionRunId: 'run-1',
    terminate,
  });
  poller.isQueueUnexecutable.mockResolvedValue({
    unexecutable: true,
    status: { sustainedZeroMs: 0 },
  });

  // Act
  const outcome = await service.startJourney(journey, contactId, triggerEvent);

  // Manual path forces a live check (its process may not run the poller).
  expect(poller.isQueueUnexecutable).toHaveBeenCalledWith({ forceLive: true });
  expect(outcome.started).toBe(false);
  expect(outcome.reason).toBe('no_worker_available');

  // The just-started workflow is terminated ...
  expect(terminate).toHaveBeenCalledTimes(1);

  // ... and the pre-created ACTIVE row is flipped to FAILED so it no longer
  // blocks future triggers.
  const failureDetails = expect.objectContaining({
    errorMessage: expect.stringContaining('no journey-execution worker'),
  });
  expect(cache.updateSessionStatus).toHaveBeenCalledWith(
    outcome.sessionId,
    JourneySessionStatus.FAILED,
    failureDetails,
  );

  // Not marked ACTIVE.
  expect(cache.updateSessionStatus).not.toHaveBeenCalledWith(
    outcome.sessionId,
    JourneySessionStatus.ACTIVE,
    expect.anything(),
  );
});

it('rolls back the created session when the workflow fails to start', async () => {
workflowStart.mockRejectedValue(new Error('temporal down'));

Expand Down
Loading
Loading