Skip to content

Commit da83142

Browse files
committed
feat(logging): enhance structured logging support across client and worker
- Added structured logging capabilities to ConsoleLogger and NoOpLogger. - Implemented logEvent method in ConsoleLogger and NoOpLogger to handle structured log events. - Introduced emitLog utility to centralize logging logic for structured and non-structured loggers. - Created centralized log definitions for Durable Task Client and Worker, aligning with .NET SDK event IDs and message templates. - Added various logging functions for orchestration and activity lifecycle events in the worker. - Implemented error handling utilities for safe error message extraction. - Updated tests to cover new structured logging functionality and ensure proper logging behavior.
1 parent 5a37595 commit da83142

21 files changed

Lines changed: 1358 additions & 112 deletions

examples/azure-managed/distributed-tracing.ts

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,8 @@ console.log(`OpenTelemetry SDK started – exporting traces to ${otlpEndpoint}`)
4545
import {
4646
DurableTaskAzureManagedClientBuilder,
4747
DurableTaskAzureManagedWorkerBuilder,
48-
createAzureLogger,
4948
} from "@microsoft/durabletask-js-azuremanaged";
49+
import { ConsoleLogger } from "@microsoft/durabletask-js";
5050
import { ActivityContext } from "@microsoft/durabletask-js/dist/task/context/activity-context";
5151
import { OrchestrationContext } from "@microsoft/durabletask-js/dist/task/context/orchestration-context";
5252
import { TOrchestrator } from "@microsoft/durabletask-js/dist/types/orchestrator.type";
@@ -56,10 +56,11 @@ import { whenAll } from "@microsoft/durabletask-js/dist/task";
5656
// 3. Application code
5757
// --------------------------------------------------------------------------
5858
(async () => {
59-
// Use the Azure logger adapter for the SDK internals – it respects AZURE_LOG_LEVEL
60-
// and suppresses debug/verbose messages by default, keeping the console output clean.
61-
// Set AZURE_LOG_LEVEL=verbose to see internal SDK logs.
62-
const sdkLogger = createAzureLogger();
59+
// Use ConsoleLogger so structured log events (with event IDs and categories) are
60+
// printed to the console by default, similar to .NET's default ILogger<T> output.
61+
// For production, consider using createAzureLogger() which integrates with @azure/logger
62+
// and respects the AZURE_LOG_LEVEL environment variable.
63+
const sdkLogger = new ConsoleLogger();
6364

6465
// --- Configuration ---
6566
const connectionString = process.env.DURABLE_TASK_SCHEDULER_CONNECTION_STRING;
@@ -204,6 +205,9 @@ import { whenAll } from "@microsoft/durabletask-js/dist/task";
204205
// --- Start worker ---
205206
console.log("Starting worker...");
206207
await worker.start();
208+
// Allow the worker time to establish the gRPC stream with the scheduler.
209+
// worker.start() returns before the connection is fully established.
210+
await new Promise((r) => setTimeout(r, 2000));
207211
console.log("Worker started.");
208212

209213
// --- Run orchestrations ---
@@ -212,14 +216,14 @@ import { whenAll } from "@microsoft/durabletask-js/dist/task";
212216
console.log("\n=== Sequence Orchestration ===");
213217
const seqId = await client.scheduleNewOrchestration(sequenceOrchestrator);
214218
console.log(`Scheduled: ${seqId}`);
215-
const seqState = await client.waitForOrchestrationCompletion(seqId, undefined, 60);
219+
const seqState = await client.waitForOrchestrationCompletion(seqId, true, 60);
216220
console.log(`Completed – result: ${seqState?.serializedOutput}`);
217221

218222
// 2) Data pipeline orchestration (fan-out/fan-in)
219223
console.log("\n=== Data Pipeline Orchestration ===");
220224
const pipelineId = await client.scheduleNewOrchestration(dataPipelineOrchestrator);
221225
console.log(`Scheduled: ${pipelineId}`);
222-
const pipelineState = await client.waitForOrchestrationCompletion(pipelineId, undefined, 60);
226+
const pipelineState = await client.waitForOrchestrationCompletion(pipelineId, true, 60);
223227
console.log(`Completed – result: ${pipelineState?.serializedOutput}`);
224228

225229
console.log("\n=== All orchestrations completed! ===");

examples/azure-managed/index.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,14 +128,17 @@ import { whenAll } from "@microsoft/durabletask-js/dist/task";
128128
// Start the worker
129129
logger.info("Starting worker...");
130130
await worker.start();
131+
// Allow the worker time to establish the gRPC stream with the scheduler.
132+
// worker.start() returns before the connection is fully established.
133+
await new Promise((resolve) => setTimeout(resolve, 2000));
131134
logger.info("Worker started successfully!");
132135

133136
// Run the sequence orchestrator
134137
logger.info("\n--- Running Sequence Orchestrator ---");
135138
const sequenceId = await client.scheduleNewOrchestration(sequenceOrchestrator);
136139
logger.info(`Orchestration scheduled with ID: ${sequenceId}`);
137140

138-
const sequenceState = await client.waitForOrchestrationCompletion(sequenceId, undefined, 60);
141+
const sequenceState = await client.waitForOrchestrationCompletion(sequenceId, true, 60);
139142
logger.info(`Sequence orchestration completed!`);
140143
logger.info(`Result: ${sequenceState?.serializedOutput}`);
141144

@@ -144,7 +147,7 @@ import { whenAll } from "@microsoft/durabletask-js/dist/task";
144147
const fanOutId = await client.scheduleNewOrchestration(fanOutFanInOrchestrator);
145148
logger.info(`Orchestration scheduled with ID: ${fanOutId}`);
146149

147-
const fanOutState = await client.waitForOrchestrationCompletion(fanOutId, undefined, 60);
150+
const fanOutState = await client.waitForOrchestrationCompletion(fanOutId, true, 60);
148151
logger.info(`Fan-out/fan-in orchestration completed!`);
149152
logger.info(`Result: ${fanOutState?.serializedOutput}`);
150153

packages/durabletask-js-azuremanaged/src/azure-logger-adapter.ts

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
// Licensed under the MIT License.
33

44
import { createClientLogger, AzureLogger } from "@azure/logger";
5-
import { Logger } from "@microsoft/durabletask-js";
5+
import { StructuredLogger, createLogEventHandler } from "@microsoft/durabletask-js";
66

77
/**
88
* Pre-configured logger adapter that uses the default "durabletask" namespace.
@@ -29,19 +29,22 @@ import { Logger } from "@microsoft/durabletask-js";
2929
* .build();
3030
* ```
3131
*/
32-
export const AzureLoggerAdapter: Logger = createAzureLogger();
32+
export const AzureLoggerAdapter: StructuredLogger = createAzureLogger();
3333

3434
/**
35-
* Creates a Logger instance that integrates with Azure SDK's logging infrastructure.
35+
* Creates a StructuredLogger instance that integrates with Azure SDK's logging infrastructure.
3636
*
3737
* The created logger uses `@azure/logger` under the hood, which means:
3838
* - Log output can be controlled via the `AZURE_LOG_LEVEL` environment variable
3939
* - Log output can be redirected using `setLogLevel()` from `@azure/logger`
4040
* - Logs are prefixed with the namespace for easy filtering
4141
*
42+
* When structured log events are emitted, the event ID and category are included in the
43+
* log message prefix for easy filtering and correlation.
44+
*
4245
* @param namespace - Optional sub-namespace to append to "durabletask".
4346
* For example, "client" results in "durabletask:client".
44-
* @returns A Logger instance configured for the specified namespace.
47+
* @returns A StructuredLogger instance configured for the specified namespace.
4548
*
4649
* @example
4750
* ```typescript
@@ -58,7 +61,7 @@ export const AzureLoggerAdapter: Logger = createAzureLogger();
5861
* // Logs will be prefixed with "durabletask"
5962
* ```
6063
*/
61-
export function createAzureLogger(namespace?: string): Logger {
64+
export function createAzureLogger(namespace?: string): StructuredLogger {
6265
const fullNamespace = namespace ? `durabletask:${namespace}` : "durabletask";
6366
const azureLogger: AzureLogger = createClientLogger(fullNamespace);
6467

@@ -75,5 +78,11 @@ export function createAzureLogger(namespace?: string): Logger {
7578
debug: (message: string, ...args: unknown[]): void => {
7679
azureLogger.verbose(message, ...args);
7780
},
81+
logEvent: createLogEventHandler({
82+
error: azureLogger.error.bind(azureLogger),
83+
warn: azureLogger.warning.bind(azureLogger),
84+
info: azureLogger.info.bind(azureLogger),
85+
debug: azureLogger.verbose.bind(azureLogger),
86+
}),
7887
};
7988
}

packages/durabletask-js/src/client/client.ts

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import { Logger, ConsoleLogger } from "../types/logger.type";
2929
import { StartOrchestrationOptions } from "../task/options";
3030
import { mapToRecord } from "../utils/tags.util";
3131
import { populateTagsMap } from "../utils/pb-helper.util";
32+
import * as ClientLogs from "./logs";
3233
import {
3334
startSpanForNewOrchestration,
3435
startSpanForEventRaisedFromClient,
@@ -228,7 +229,9 @@ export class TaskHubGrpcClient {
228229
// Create a tracing span for the new orchestration (if OTEL is available)
229230
const span = startSpanForNewOrchestration(req);
230231

231-
this._logger.info(`Starting new ${name} instance with ID = ${req.getInstanceid()}${effectiveVersion ? ` (version: ${effectiveVersion})` : ""}`);
232+
const serializedInput = i.getValue() ?? "";
233+
const inputSizeInBytes = Buffer.byteLength(serializedInput, "utf8");
234+
ClientLogs.schedulingOrchestration(this._logger, req.getInstanceid(), name, inputSizeInBytes);
232235

233236
try {
234237
const res = await callWithMetadata<pb.CreateInstanceRequest, pb.CreateInstanceResponse>(
@@ -340,7 +343,7 @@ export class TaskHubGrpcClient {
340343
req.setInstanceid(instanceId);
341344
req.setGetinputsandoutputs(fetchPayloads);
342345

343-
this._logger.info(`Waiting ${timeout} seconds for instance ${instanceId} to complete...`);
346+
ClientLogs.waitingForInstanceCompletion(this._logger, instanceId);
344347

345348
const callPromise = callWithMetadata<pb.GetInstanceRequest, pb.GetInstanceResponse>(
346349
this._stub.waitForInstanceCompletion.bind(this._stub),
@@ -364,11 +367,11 @@ export class TaskHubGrpcClient {
364367

365368
if (state.runtimeStatus === OrchestrationStatus.FAILED && state.failureDetails) {
366369
details = state.failureDetails;
367-
this._logger.info(`Instance ${instanceId} failed: [${details.errorType}] ${details.message}`);
370+
ClientLogs.instanceFailed(this._logger, instanceId, details.errorType, details.message);
368371
} else if (state.runtimeStatus === OrchestrationStatus.TERMINATED) {
369-
this._logger.info(`Instance ${instanceId} was terminated`);
372+
ClientLogs.instanceTerminated(this._logger, instanceId);
370373
} else if (state.runtimeStatus === OrchestrationStatus.COMPLETED) {
371-
this._logger.info(`Instance ${instanceId} completed`);
374+
ClientLogs.instanceCompleted(this._logger, instanceId);
372375
}
373376

374377
return state;
@@ -397,7 +400,7 @@ export class TaskHubGrpcClient {
397400
// Create a tracing span for the raised event (if OTEL is available)
398401
const span = startSpanForEventRaisedFromClient(eventName, instanceId);
399402

400-
this._logger.info(`Raising event '${eventName}' for instance '${instanceId}'`);
403+
ClientLogs.raisingEvent(this._logger, instanceId, eventName);
401404

402405
try {
403406
await callWithMetadata<pb.RaiseEventRequest, pb.RaiseEventResponse>(
@@ -461,7 +464,7 @@ export class TaskHubGrpcClient {
461464
req.setOutput(i);
462465
req.setRecursive(recursive);
463466

464-
this._logger.info(`Terminating '${instanceId}'${recursive ? ' (recursive)' : ''}`);
467+
ClientLogs.terminatingInstance(this._logger, instanceId);
465468

466469
await callWithMetadata<pb.TerminateRequest, pb.TerminateResponse>(
467470
this._stub.terminateInstance.bind(this._stub),
@@ -474,7 +477,7 @@ export class TaskHubGrpcClient {
474477
const req = new pb.SuspendRequest();
475478
req.setInstanceid(instanceId);
476479

477-
this._logger.info(`Suspending '${instanceId}'`);
480+
ClientLogs.suspendingInstance(this._logger, instanceId);
478481

479482
await callWithMetadata<pb.SuspendRequest, pb.SuspendResponse>(
480483
this._stub.suspendInstance.bind(this._stub),
@@ -487,7 +490,7 @@ export class TaskHubGrpcClient {
487490
const req = new pb.ResumeRequest();
488491
req.setInstanceid(instanceId);
489492

490-
this._logger.info(`Resuming '${instanceId}'`);
493+
ClientLogs.resumingInstance(this._logger, instanceId);
491494

492495
await callWithMetadata<pb.ResumeRequest, pb.ResumeResponse>(
493496
this._stub.resumeInstance.bind(this._stub),
@@ -525,7 +528,7 @@ export class TaskHubGrpcClient {
525528
req.setReason(reasonValue);
526529
}
527530

528-
this._logger.info(`Rewinding '${instanceId}' with reason: ${reason}`);
531+
ClientLogs.rewindingInstance(this._logger, instanceId, reason);
529532

530533
try {
531534
await callWithMetadata<pb.RewindInstanceRequest, pb.RewindInstanceResponse>(
@@ -582,7 +585,7 @@ export class TaskHubGrpcClient {
582585
req.setInstanceid(instanceId);
583586
req.setRestartwithnewinstanceid(restartWithNewInstanceId);
584587

585-
this._logger.info(`Restarting '${instanceId}' with restartWithNewInstanceId=${restartWithNewInstanceId}`);
588+
ClientLogs.restartingInstance(this._logger, instanceId, restartWithNewInstanceId);
586589

587590
try {
588591
const res = await callWithMetadata<pb.RestartInstanceRequest, pb.RestartInstanceResponse>(
@@ -637,7 +640,7 @@ export class TaskHubGrpcClient {
637640
req.setInstanceid(instanceId);
638641
req.setRecursive(options?.recursive ?? false);
639642

640-
this._logger.info(`Purging Instance '${instanceId}'${options?.recursive ? ' (recursive)' : ''}`);
643+
ClientLogs.purgingInstanceMetadata(this._logger, instanceId);
641644

642645
res = await callWithMetadata<pb.PurgeInstancesRequest, pb.PurgeInstancesResponse>(
643646
this._stub.purgeInstances.bind(this._stub),
@@ -668,7 +671,7 @@ export class TaskHubGrpcClient {
668671
req.setRecursive(options?.recursive ?? false);
669672
const timeout = purgeInstanceCriteria.getTimeout();
670673

671-
this._logger.info(`Purging Instances using purging criteria${options?.recursive ? " (recursive)" : ""}`);
674+
ClientLogs.purgingInstances(this._logger, createdTimeFrom, createdTimeTo, runtimeStatusList.map(String).join(", "));
672675

673676
const callPromise = callWithMetadata<pb.PurgeInstancesRequest, pb.PurgeInstancesResponse>(
674677
this._stub.purgeInstances.bind(this._stub),

0 commit comments

Comments
 (0)