Skip to content

Commit a327fb6

Browse files
authored
Improved message logging (#385)
* improve message logging * linting * test fix * removed accidental linting fixes * removed accidental linting fixes * removed accidental linting fixes * tests fix * tests fix * linting * linting * minor types adjustment * fixed typo in field name * fixed return type * adjusted coverage threshold * updated kafka consumer event name * README.md update
1 parent b2850df commit a327fb6

16 files changed

Lines changed: 182 additions & 107 deletions

README.md

Lines changed: 58 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@ They implement the following public methods:
4040
* `policyConfig` - SQS only - configuration for queue access policies (see [SQS Policy Configuration](#sqs-policy-configuration) for more information);
4141
* `deletionConfig` - automatic cleanup of resources;
4242
* `handlerSpy` - allow awaiting certain messages to be published (see [Handler Spies](#handler-spies) for more information);
43-
* `logMessages` - add logs for processed messages.
43+
* `logMessages` - add debug logs for processed messages. When enabled, logs structured metadata including message id, type, timestamps, and queue name. For privacy reasons, the full message payload is not logged by default. See [Message Logging](#message-logging) for more details.
44+
* `messageMetadataField` - which field in the message contains metadata for logging purposes (by default it is `metadata`).
4445
* `payloadStoreConfig` - configuration for payload offloading. This option enables the external storage of large message payloads to comply with message size limitations of the queue system. For more details on setting this up, see [Payload Offloading](#payload-offloading).
4546
* `messageDeduplicationConfig` - configuration for store-based message deduplication on publisher level. For more details on setting this up, see [Publisher-level store-based-message deduplication](#publisher-level-store-based-message-deduplication).
4647
* `enablePublisherDeduplication` - enable store-based publisher-level deduplication. For more details on setting this up, see [Publisher-level store-based-message deduplication](#publisher-level-store-based-message-deduplication).
@@ -110,7 +111,8 @@ Multi-schema consumers support multiple message types via handler configs. They
110111
* `consumerOverrides` – available only for SQS consumers;
111112
* `deadLetterQueue` - available only for SQS and SNS consumers (see [Dead Letter Queue](#dead-letter-queue) for more information);
112113
* `handlerSpy` - allow awaiting certain messages to be published (see [Handler Spies](#handler-spies) for more information);
113-
* `logMessages` - add logs for processed messages.
114+
* `logMessages` - add debug logs for processed messages. When enabled, logs structured metadata including message id, type, timestamps, and queue name. For privacy reasons, the full message payload is not logged by default. To include custom message data in logs, configure `messageLogFormatter` on your handlers. See [Message Logging](#message-logging) for more details.
115+
* `messageMetadataField` - which field in the message contains metadata for logging purposes (by default it is `metadata`).
114116
* `payloadStoreConfig` - configuration for payload offloading. This option enables the external storage of large message payloads to comply with message size limitations of the queue system. For more details on setting this up, see [Payload Offloading](#payload-offloading).
115117
* `concurrentConsumersAmount` - configuration for specifying the number of concurrent consumers to create. Available only for SQS and SNS consumers
116118
* `messageDeduplicationConfig` - configuration for store-based message deduplication on consumer level. For more details on setting this up, see [Consumer-level store-based-message deduplication](#consumer-level-store-based-message-deduplication).
@@ -173,7 +175,9 @@ export class SqsPermissionConsumer extends AbstractSqsConsumer<
173175
preHandlerBarrier: async (message) => {
174176
// do barrier check here
175177
return true
176-
}
178+
},
179+
// Optional: customize what message data is logged (see Message Logging section)
180+
messageLogFormatter: (message) => ({ id: message.id, type: message.type }),
177181
},
178182
)
179183
.addConfig(PERMISSIONS_REMOVE_MESSAGE_SCHEMA,
@@ -642,6 +646,56 @@ const result = await myConsumer.handlerSpy.waitForMessageWithId('1')
642646
expect(result.processingResult).toEqual({ status: 'consumed' })
643647
```
644648

649+
## Message Logging
650+
651+
When `logMessages` is enabled, processed messages are logged at the `debug` level with structured metadata. For privacy reasons, the full message payload is **not logged by default** to avoid exposing sensitive data.
652+
653+
### What is logged by default
654+
655+
Each log entry includes processed message metadata:
656+
- `messageId` - unique identifier of the message
657+
- `messageType` - type of the message
658+
- `queueName` - name of the queue or topic
659+
- `messageTimestamp` - when the message was originally sent
660+
- `messageProcessingStartTimestamp` - when processing started
661+
- `messageProcessingEndTimestamp` - when processing completed
662+
- `messageDeduplicationId` - deduplication id (if deduplication is enabled)
663+
- `messageMetadata` - contents of the metadata field (configurable via `messageMetadataField`)
664+
- `processingResult` - outcome of processing (e.g., `{ status: 'consumed' }` or `{ status: 'published' }`)
665+
666+
### Custom message logging with messageLogFormatter
667+
668+
If you need to include additional message data in logs, you can configure a `messageLogFormatter` on your handler. This formatter receives the message and returns the data to be logged:
669+
670+
```typescript
671+
new MessageHandlerConfigBuilder<SupportedMessages, ExecutionContext>()
672+
.addConfig(
673+
MY_MESSAGE_SCHEMA,
674+
async (message, context) => {
675+
// handler logic
676+
return { result: 'success' }
677+
},
678+
{
679+
// Only log specific fields, excluding sensitive data
680+
messageLogFormatter: (message) => ({
681+
id: message.id,
682+
type: message.type,
683+
// Exclude sensitive fields like email, password, etc.
684+
}),
685+
},
686+
)
687+
.build()
688+
```
689+
690+
When a `messageLogFormatter` is provided, its output is included in the log under the `message` key alongside the processed message metadata.
691+
692+
### Configuration options
693+
694+
| Option | Default | Description |
695+
|--------|---------|-------------|
696+
| `logMessages` | `false` | Enable debug logging for processed messages |
697+
| `messageMetadataField` | `'metadata'` | Field in the message containing metadata to include in logs |
698+
645699
## Payload Offloading
646700
Payload offloading allows you to manage large message payloads by storing them in external storage, bypassing any message size restrictions imposed by queue systems.
647701

@@ -773,6 +827,7 @@ It needs to implement the following methods:
773827
- `messageProcessingStartTimestamp` - the timestamp when the processing of the message started
774828
- `messageProcessingEndTimestamp` - the timestamp when the processing of the message finished
775829
- `messageDeduplicationId` - the deduplication id of the message, in case deduplication is enabled
830+
- `messageMetadata` - contents of the message metadata field (configurable via `messageMetadataField`)
776831

777832
See [@message-queue-toolkit/metrics](packages/metrics/README.md) for concrete implementations
778833

packages/amqp/lib/AbstractAmqpConsumer.ts

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,13 @@ import type {
66
ParseMessageResult,
77
PreHandlingOutputs,
88
Prehandler,
9+
ProcessedMessageMetadata,
910
QueueConsumer,
1011
QueueConsumerOptions,
1112
TransactionObservabilityManager,
1213
} from '@message-queue-toolkit/core'
1314
import { HandlerContainer, isMessageError, parseMessage } from '@message-queue-toolkit/core'
1415
import type { ChannelModel, Message } from 'amqplib'
15-
1616
import type {
1717
AMQPConsumerDependencies,
1818
AMQPQueueCreationConfig,
@@ -157,10 +157,6 @@ export abstract class AbstractAmqpConsumer<
157157
// @ts-expect-error
158158
const uniqueTransactionKey = parsedMessage[this.messageIdField]
159159
this.transactionObservabilityManager?.start(transactionSpanId, uniqueTransactionKey)
160-
if (this.logMessages) {
161-
const resolvedLogMessage = this.resolveMessageLog(parsedMessage, messageType)
162-
this.logMessage(resolvedLogMessage)
163-
}
164160
this.internalProcessMessage(parsedMessage, messageType)
165161
.then((result) => {
166162
if (result.result === 'success') {
@@ -267,9 +263,17 @@ export abstract class AbstractAmqpConsumer<
267263
return this._messageSchemaContainer.resolveSchema(message)
268264
}
269265

270-
protected override resolveMessageLog(message: MessagePayloadType, messageType: string): unknown {
271-
const handler = this.handlerContainer.resolveHandler(messageType)
272-
return handler.messageLogFormatter(message)
266+
protected override resolveMessageLog(
267+
processedMessageMetadata: ProcessedMessageMetadata<MessagePayloadType>,
268+
): unknown | null {
269+
if (!processedMessageMetadata.message || !processedMessageMetadata.messageType) {
270+
return null
271+
}
272+
const handler = this.handlerContainer.resolveHandler(processedMessageMetadata.messageType)
273+
if (!handler.messageLogFormatter) {
274+
return null
275+
}
276+
return handler.messageLogFormatter(processedMessageMetadata.message)
273277
}
274278

275279
// eslint-disable-next-line max-params

packages/amqp/lib/AbstractAmqpPublisher.ts

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -88,12 +88,6 @@ export abstract class AbstractAmqpPublisher<
8888
return
8989
}
9090

91-
if (this.logMessages) {
92-
const messageType = this.resolveMessageTypeFromMessage(message) ?? 'unknown'
93-
const resolvedLogMessage = this.resolveMessageLog(message, messageType)
94-
this.logMessage(resolvedLogMessage)
95-
}
96-
9791
message = this.updateInternalProperties(message)
9892

9993
try {

packages/amqp/test/consumers/AmqpPermissionConsumer.spec.ts

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -95,22 +95,21 @@ describe('AmqpPermissionConsumer', () => {
9595

9696
await newConsumer.close()
9797

98-
expect(logger.loggedMessages.length).toBe(6)
98+
expect(logger.loggedMessages.length).toBe(4)
9999
expect(logger.loggedMessages).toMatchObject([
100100
'Propagating new connection across 0 receivers',
101-
{
102-
id: '1',
103-
messageType: 'add',
104-
},
105101
'timestamp not defined, adding it automatically',
106-
expect.any(Object),
107102
{
108-
id: '1',
109-
messageType: 'add',
110-
timestamp: expect.any(String),
103+
processedMessageMetadata: expect.objectContaining({
104+
processingResult: { status: 'published' },
105+
}),
111106
},
112107
{
113-
processedMessageMetadata: expect.any(String),
108+
processedMessageMetadata: expect.objectContaining({
109+
messageId: '1',
110+
messageType: 'add',
111+
processingResult: { status: 'consumed' },
112+
}),
114113
},
115114
])
116115
})
@@ -164,6 +163,7 @@ describe('AmqpPermissionConsumer', () => {
164163
id: '1',
165164
messageType: 'add',
166165
}),
166+
messageMetadata: undefined,
167167
},
168168
])
169169
})

packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,13 +47,20 @@ describe('PermissionPublisher', () => {
4747
publisher.publish(message)
4848

4949
await waitAndRetry(() => {
50-
return logger.loggedMessages.length === 2
50+
return logger.loggedMessages.length === 3
5151
})
5252

53-
expect(logger.loggedMessages[1]).toEqual({
54-
id: '1',
55-
messageType: 'add',
56-
})
53+
expect(logger.loggedMessages).toMatchObject([
54+
'Propagating new connection across 0 receivers',
55+
'timestamp not defined, adding it automatically',
56+
{
57+
processedMessageMetadata: expect.objectContaining({
58+
messageId: '1',
59+
messageType: 'add',
60+
processingResult: { status: 'published' },
61+
}),
62+
},
63+
])
5764
})
5865
})
5966

packages/core/lib/queues/AbstractQueueService.ts

Lines changed: 45 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,10 @@ export abstract class AbstractQueueService<
114114
* Used to know the store-based message deduplication options
115115
*/
116116
protected readonly messageDeduplicationOptionsField: string
117+
/**
118+
* Used to know where metadata is stored - for debug logging purposes only
119+
*/
120+
protected readonly messageMetadataField: string
117121
protected readonly errorReporter: ErrorReporter
118122
public readonly logger: CommonLogger
119123
protected readonly messageIdField: string
@@ -157,6 +161,7 @@ export abstract class AbstractQueueService<
157161
this.messageDeduplicationIdField = options.messageDeduplicationIdField ?? 'deduplicationId'
158162
this.messageDeduplicationOptionsField =
159163
options.messageDeduplicationOptionsField ?? 'deduplicationOptions'
164+
this.messageMetadataField = options.messageMetadataField ?? 'metadata'
160165
this.creationConfig = options.creationConfig
161166
this.locatorConfig = options.locatorConfig
162167
this.deletionConfig = options.deletionConfig
@@ -239,15 +244,36 @@ export abstract class AbstractQueueService<
239244
/**
240245
* Format message for logging
241246
*/
242-
protected resolveMessageLog(message: MessagePayloadSchemas, _messageType: string): unknown {
243-
return message
247+
protected resolveMessageLog(
248+
_processedMessageMetadata: ProcessedMessageMetadata<MessagePayloadSchemas>,
249+
): unknown | null {
250+
return null
244251
}
245252

246-
/**
247-
* Log preformatted and potentially presanitized message payload
248-
*/
249-
protected logMessage(messageLogEntry: unknown) {
250-
this.logger.debug(messageLogEntry)
253+
protected logMessageProcessed(
254+
processedMessageMetadata: ProcessedMessageMetadata<MessagePayloadSchemas>,
255+
) {
256+
const processedMessageMetadataLog = {
257+
processingResult: processedMessageMetadata.processingResult,
258+
messageId: processedMessageMetadata.messageId,
259+
messageType: processedMessageMetadata.messageType,
260+
queueName: processedMessageMetadata.queueName,
261+
messageTimestamp: processedMessageMetadata.messageTimestamp,
262+
messageDeduplicationId: processedMessageMetadata.messageDeduplicationId,
263+
messageProcessingStartTimestamp: processedMessageMetadata.messageProcessingStartTimestamp,
264+
messageProcessingEndTimestamp: processedMessageMetadata.messageProcessingEndTimestamp,
265+
messageMetadata: stringValueSerializer(processedMessageMetadata.messageMetadata),
266+
}
267+
268+
const resolvedMessageLog = this.resolveMessageLog(processedMessageMetadata)
269+
270+
this.logger.debug(
271+
{
272+
processedMessageMetadata: processedMessageMetadataLog,
273+
...(resolvedMessageLog ? { message: resolvedMessageLog } : {}),
274+
},
275+
`Finished processing message ${processedMessageMetadata.messageId}`,
276+
)
251277
}
252278

253279
protected handleError(err: unknown, context?: Record<string, unknown>) {
@@ -284,8 +310,8 @@ export abstract class AbstractQueueService<
284310
messageType,
285311
)
286312

287-
const debugLoggingEnabled = this.logMessages && this.logger.isLevelEnabled('debug')
288-
if (!debugLoggingEnabled && !this.messageMetricsManager) return
313+
const debugMessageLoggingEnabled = this.logMessages && this.logger.isLevelEnabled('debug')
314+
if (!debugMessageLoggingEnabled && !this.messageMetricsManager) return
289315

290316
const processedMessageMetadata = this.resolveProcessedMessageMetadata(
291317
message,
@@ -295,11 +321,8 @@ export abstract class AbstractQueueService<
295321
params.queueName,
296322
messageId,
297323
)
298-
if (debugLoggingEnabled) {
299-
this.logger.debug(
300-
{ processedMessageMetadata: stringValueSerializer(processedMessageMetadata) },
301-
`Finished processing message ${processedMessageMetadata.messageId}`,
302-
)
324+
if (debugMessageLoggingEnabled) {
325+
this.logMessageProcessed(processedMessageMetadata)
303326
}
304327
if (this.messageMetricsManager) {
305328
this.messageMetricsManager.registerProcessedMessage(processedMessageMetadata)
@@ -321,8 +344,13 @@ export abstract class AbstractQueueService<
321344
const messageType = message ? this.resolveMessageTypeFromMessage(message) : undefined
322345
const messageDeduplicationId =
323346
message && this.messageDeduplicationIdField in message
324-
? // @ts-ignore
325-
message[this.messageDeduplicationId]
347+
? // @ts-expect-error
348+
message[this.messageDeduplicationIdField]
349+
: undefined
350+
const messageMetadata =
351+
message && this.messageMetadataField in message
352+
? // @ts-expect-error
353+
message[this.messageMetadataField]
326354
: undefined
327355

328356
return {
@@ -335,6 +363,7 @@ export abstract class AbstractQueueService<
335363
messageDeduplicationId,
336364
messageProcessingStartTimestamp,
337365
messageProcessingEndTimestamp,
366+
messageMetadata,
338367
}
339368
}
340369

packages/core/lib/queues/HandlerContainer.ts

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,6 @@ export type Prehandler<MessagePayloadSchema extends object, ExecutionContext, Pr
5555
next: (result: PrehandlerResult) => void,
5656
) => void
5757

58-
export const defaultLogFormatter = <MessagePayloadSchema>(message: MessagePayloadSchema) => message
59-
6058
export type HandlerConfigOptions<
6159
MessagePayloadSchema extends object,
6260
ExecutionContext,
@@ -98,7 +96,7 @@ export class MessageHandlerConfig<
9896
PrehandlerOutput,
9997
BarrierOutput
10098
>
101-
public readonly messageLogFormatter: LogFormatter<MessagePayloadSchema>
99+
public readonly messageLogFormatter?: LogFormatter<MessagePayloadSchema>
102100
public readonly preHandlerBarrier?: BarrierCallback<
103101
MessagePayloadSchema,
104102
ExecutionContext,
@@ -126,7 +124,7 @@ export class MessageHandlerConfig<
126124
this.definition = eventDefinition
127125
this.messageType = options?.messageType
128126
this.handler = handler
129-
this.messageLogFormatter = options?.messageLogFormatter ?? defaultLogFormatter
127+
this.messageLogFormatter = options?.messageLogFormatter
130128
this.preHandlerBarrier = options?.preHandlerBarrier
131129
this.preHandlers = options?.preHandlers ?? []
132130
}

packages/core/lib/types/queueOptionsTypes.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,11 @@ export type ProcessedMessageMetadata<MessagePayloadSchemas extends object = obje
6262
* ID used for the message deduplication, in case it's enabled
6363
*/
6464
messageDeduplicationId?: string
65+
66+
/**
67+
* Message metadata (see ConsumerMessageMetadataType)
68+
*/
69+
messageMetadata?: Record<string, unknown>
6570
}
6671

6772
export interface MessageMetricsManager<MessagePayloadSchemas extends object = object> {
@@ -113,6 +118,7 @@ export type CommonQueueOptions = {
113118
messageTimestampField?: string
114119
messageDeduplicationIdField?: string
115120
messageDeduplicationOptionsField?: string
121+
messageMetadataField?: string
116122
handlerSpy?: HandlerSpy<object> | HandlerSpyParams | boolean
117123
logMessages?: boolean
118124
deletionConfig?: DeletionConfig

0 commit comments

Comments
 (0)