From 40fa6491fb46fd7a61caae0b2915e66ca6815665 Mon Sep 17 00:00:00 2001 From: KoshaevEugeny <103786108+akulistus@users.noreply.github.com> Date: Wed, 11 Mar 2026 21:34:49 +0300 Subject: [PATCH 1/6] feat(workers): catch-unhandled-exceptions (#532) * feat(workers): add decorator to catch unhandled exceptions and report them to Hawk with worker type context * fix lint * Revert "fix lint" This reverts commit 7a7c0a94b3cc95b1e97af1b33b9d899935631cf3. * Revert "feat(workers): add decorator to catch unhandled exceptions and report them to Hawk with worker type context" This reverts commit ebb1fd74a0ee627ba903fc763760ca0361739283. * refactor(runner): add active worker names to HawkCatcher context --- runner.ts | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/runner.ts b/runner.ts index 443beea8..5dd05b1b 100644 --- a/runner.ts +++ b/runner.ts @@ -12,10 +12,6 @@ import * as dotenv from 'dotenv'; dotenv.config(); -if (process.env.HAWK_CATCHER_TOKEN) { - HawkCatcher.init(process.env.HAWK_CATCHER_TOKEN); -} - type WorkerConstructor = new () => Worker; const BEGINNING_OF_ARGS = 2; @@ -27,6 +23,18 @@ const BEGINNING_OF_ARGS = 2; */ const workerNames = process.argv.slice(BEGINNING_OF_ARGS); +/** + * Initialize HawkCatcher +*/ +if (process.env.HAWK_CATCHER_TOKEN) { + HawkCatcher.init({ + token: process.env.HAWK_CATCHER_TOKEN, + context: { + workerTypes: workerNames.join(","), + } + }); +} + /** * Workers dispatcher. * Load, run and finish workers. From d1c7af4d724dbe3ae62906282c877f88b4ec0cf2 Mon Sep 17 00:00:00 2001 From: e11sy <130844513+e11sy@users.noreply.github.com> Date: Tue, 7 Apr 2026 16:27:33 +0300 Subject: [PATCH 2/6] fix(): redis never throws key does not exist (#539) --- workers/grouper/src/redisHelper.ts | 32 ++++++++++-------------------- 1 file changed, 10 insertions(+), 22 deletions(-) diff --git a/workers/grouper/src/redisHelper.ts b/workers/grouper/src/redisHelper.ts index ec486ddb..57a35543 100644 --- a/workers/grouper/src/redisHelper.ts +++ b/workers/grouper/src/redisHelper.ts @@ -193,17 +193,11 @@ export default class RedisHelper { ): Promise { const timestamp = Date.now(); - try { - await this.tsIncrBy(key, value, timestamp, labels); - } catch (error) { - if (error instanceof Error && error.message.includes('TSDB: key does not exist')) { - this.logger.warn(`TS key ${key} does not exist, creating it...`); - await this.tsCreateIfNotExists(key, labels, retentionMs); - await this.tsIncrBy(key, value, timestamp, labels); - } else { - throw error; - } - } + /** + * Create key if not exists — then call increment + */ + await this.tsCreateIfNotExists(key, labels, retentionMs); + await this.tsIncrBy(key, value, timestamp, labels); } /** @@ -252,17 +246,11 @@ export default class RedisHelper { ): Promise { const timestamp = Date.now(); - try { - await this.tsAdd(key, value, timestamp, labels); - } catch (error) { - if (error instanceof Error && error.message.includes('TSDB: key does not exist')) { - this.logger.warn(`TS key ${key} does not exist, creating it...`); - await this.tsCreateIfNotExists(key, labels, retentionMs); - await this.tsAdd(key, value, timestamp, labels); - } else { - throw error; - } - } + /** + * Create key if not exists — then call increment + */ + await this.tsCreateIfNotExists(key, labels, retentionMs); + await this.tsAdd(key, value, timestamp, labels); } /** From 152362b09cb3091899486d92e9587d6b072326d8 Mon Sep 17 00:00:00 2001 From: Pavel Zotikov Date: Tue, 14 Apr 2026 21:18:35 +0300 Subject: [PATCH 3/6] fix(metrics): round Redis TimeSeries timestamps to bucket boundaries (#542) * fix(metrics): round Redis TimeSeries timestamps to bucket boundaries * refactor(metrics): extract bucketTimestampMs to utils and add unit tests * fix(metrics): replace magic number 90 with named constant --- workers/grouper/src/index.ts | 43 +++++------- workers/grouper/src/redisHelper.ts | 6 +- workers/grouper/src/utils/bucketTimestamp.ts | 17 +++++ workers/grouper/tests/bucketTimestamp.test.ts | 65 +++++++++++++++++++ workers/grouper/tests/index.test.ts | 9 ++- 5 files changed, 107 insertions(+), 33 deletions(-) create mode 100644 workers/grouper/src/utils/bucketTimestamp.ts create mode 100644 workers/grouper/tests/bucketTimestamp.test.ts diff --git a/workers/grouper/src/index.ts b/workers/grouper/src/index.ts index f016c850..ea74084b 100644 --- a/workers/grouper/src/index.ts +++ b/workers/grouper/src/index.ts @@ -23,6 +23,7 @@ import TimeMs from '../../../lib/utils/time'; import DataFilter from './data-filter'; import RedisHelper from './redisHelper'; import { computeDelta } from './utils/repetitionDiff'; +import { bucketTimestampMs } from './utils/bucketTimestamp'; import { rightTrim } from '../../../lib/utils/string'; import { hasValue } from '../../../lib/utils/hasValue'; @@ -48,6 +49,11 @@ const CACHE_CLEANUP_INTERVAL_SECONDS = 30; */ const DB_DUPLICATE_KEY_ERROR = '11000'; +/** + * Retention period for daily Redis TimeSeries metrics in days + */ +const DAILY_METRICS_RETENTION_DAYS = 90; + /** * Maximum length for backtrace code line or title */ @@ -343,37 +349,18 @@ export default class GrouperWorker extends Worker { }; const series = [ - { - key: minutelyKey, - label: 'minutely', - retentionMs: TimeMs.DAY, - }, - { - key: hourlyKey, - label: 'hourly', - retentionMs: TimeMs.WEEK, - }, - { - key: dailyKey, - label: 'daily', - // eslint-disable-next-line @typescript-eslint/no-magic-numbers - retentionMs: 90 * TimeMs.DAY, - }, + { key: minutelyKey, label: 'minutely', retentionMs: TimeMs.DAY, timestampMs: bucketTimestampMs('minutely') }, + { key: hourlyKey, label: 'hourly', retentionMs: TimeMs.WEEK, timestampMs: bucketTimestampMs('hourly') }, + { key: dailyKey, label: 'daily', retentionMs: DAILY_METRICS_RETENTION_DAYS * TimeMs.DAY, timestampMs: bucketTimestampMs('daily') }, ]; - const operations = series.map(({ key, label, retentionMs }) => ({ - label, - promise: this.redis.safeTsAdd(key, 1, labels, retentionMs), - })); - - const results = await Promise.allSettled(operations.map((op) => op.promise)); - - results.forEach((result, index) => { - if (result.status === 'rejected') { - const { label } = operations[index]; - this.logger.error(`Failed to add ${label} TS for ${metricType}`, result.reason); + for (const { key, label, retentionMs, timestampMs } of series) { + try { + await this.redis.safeTsAdd(key, 1, labels, retentionMs, timestampMs); + } catch (error) { + this.logger.error(`Failed to add ${label} TS for ${metricType}`, error); } - }); + } } /** diff --git a/workers/grouper/src/redisHelper.ts b/workers/grouper/src/redisHelper.ts index 57a35543..242b74f8 100644 --- a/workers/grouper/src/redisHelper.ts +++ b/workers/grouper/src/redisHelper.ts @@ -237,14 +237,16 @@ export default class RedisHelper { * @param value - value to add * @param labels - labels to attach to the time series * @param retentionMs - optional retention in milliseconds + * @param timestampMs - timestamp in milliseconds; defaults to current time */ public async safeTsAdd( key: string, value: number, labels: Record, - retentionMs = 0 + retentionMs = 0, + timestampMs = 0 ): Promise { - const timestamp = Date.now(); + const timestamp = timestampMs === 0 ? Date.now() : timestampMs; /** * Create key if not exists — then call increment diff --git a/workers/grouper/src/utils/bucketTimestamp.ts b/workers/grouper/src/utils/bucketTimestamp.ts new file mode 100644 index 00000000..29dbd8aa --- /dev/null +++ b/workers/grouper/src/utils/bucketTimestamp.ts @@ -0,0 +1,17 @@ +import TimeMs from '../../../../lib/utils/time'; + +/** + * Returns the current time truncated to the start of the given granularity + * bucket in milliseconds (UTC). All events within the same bucket share one + * timestamp so ON_DUPLICATE SUM accumulates them into a single sample. + * + * @param granularity - time granularity level + * @param now - current timestamp in ms, defaults to Date.now() + */ +export function bucketTimestampMs(granularity: 'minutely' | 'hourly' | 'daily', now = Date.now()): number { + switch (granularity) { + case 'hourly': return now - (now % TimeMs.HOUR); + case 'daily': return now - (now % TimeMs.DAY); + default: return now - (now % TimeMs.MINUTE); // minutely + } +} diff --git a/workers/grouper/tests/bucketTimestamp.test.ts b/workers/grouper/tests/bucketTimestamp.test.ts new file mode 100644 index 00000000..53ebac45 --- /dev/null +++ b/workers/grouper/tests/bucketTimestamp.test.ts @@ -0,0 +1,65 @@ +import '../../../env-test'; +import { bucketTimestampMs } from '../src/utils/bucketTimestamp'; + +describe('bucketTimestampMs', () => { + /** + * 2026-04-14T15:37:42.500Z + * minute start: 2026-04-14T15:37:00.000Z + * hour start: 2026-04-14T15:00:00.000Z + * day start: 2026-04-14T00:00:00.000Z + */ + const now = new Date('2026-04-14T15:37:42.500Z').getTime(); + + it('truncates to the start of the current minute', () => { + const expected = new Date('2026-04-14T15:37:00.000Z').getTime(); + + expect(bucketTimestampMs('minutely', now)).toBe(expected); + }); + + it('truncates to the start of the current hour', () => { + const expected = new Date('2026-04-14T15:00:00.000Z').getTime(); + + expect(bucketTimestampMs('hourly', now)).toBe(expected); + }); + + it('truncates to the start of the current day (UTC midnight)', () => { + const expected = new Date('2026-04-14T00:00:00.000Z').getTime(); + + expect(bucketTimestampMs('daily', now)).toBe(expected); + }); + + it('returns the same value for two calls within the same minute', () => { + const t1 = new Date('2026-04-14T15:37:00.000Z').getTime(); + const t2 = new Date('2026-04-14T15:37:59.999Z').getTime(); + + expect(bucketTimestampMs('minutely', t1)).toBe(bucketTimestampMs('minutely', t2)); + }); + + it('returns different values for two calls in different minutes', () => { + const t1 = new Date('2026-04-14T15:37:59.999Z').getTime(); + const t2 = new Date('2026-04-14T15:38:00.000Z').getTime(); + + expect(bucketTimestampMs('minutely', t1)).not.toBe(bucketTimestampMs('minutely', t2)); + }); + + it('returns the same value for two calls within the same hour', () => { + const t1 = new Date('2026-04-14T15:00:00.000Z').getTime(); + const t2 = new Date('2026-04-14T15:59:59.999Z').getTime(); + + expect(bucketTimestampMs('hourly', t1)).toBe(bucketTimestampMs('hourly', t2)); + }); + + it('returns the same value for two calls within the same day', () => { + const t1 = new Date('2026-04-14T00:00:00.000Z').getTime(); + const t2 = new Date('2026-04-14T23:59:59.999Z').getTime(); + + expect(bucketTimestampMs('daily', t1)).toBe(bucketTimestampMs('daily', t2)); + }); + + it('returns different values for two calls on different days', () => { + const t1 = new Date('2026-04-14T23:59:59.999Z').getTime(); + const t2 = new Date('2026-04-15T00:00:00.000Z').getTime(); + + expect(bucketTimestampMs('daily', t1)).not.toBe(bucketTimestampMs('daily', t2)); + }); +}); diff --git a/workers/grouper/tests/index.test.ts b/workers/grouper/tests/index.test.ts index 6ad01812..c75351a9 100644 --- a/workers/grouper/tests/index.test.ts +++ b/workers/grouper/tests/index.test.ts @@ -763,21 +763,24 @@ describe('GrouperWorker', () => { `ts:project-events-accepted:${projectIdMock}:minutely`, 1, expectedLabels, - TimeMs.DAY + TimeMs.DAY, + expect.any(Number), ); expect(safeTsAddSpy).toHaveBeenNthCalledWith( 2, `ts:project-events-accepted:${projectIdMock}:hourly`, 1, expectedLabels, - TimeMs.WEEK + TimeMs.WEEK, + expect.any(Number), ); expect(safeTsAddSpy).toHaveBeenNthCalledWith( 3, `ts:project-events-accepted:${projectIdMock}:daily`, 1, expectedLabels, - 90 * TimeMs.DAY + 90 * TimeMs.DAY, + expect.any(Number), ); } finally { safeTsAddSpy.mockRestore(); From 3092d391931654e57d0fe8767b7b2fe6d29a8572 Mon Sep 17 00:00:00 2001 From: KoshaevEugeny <103786108+akulistus@users.noreply.github.com> Date: Wed, 15 Apr 2026 18:37:19 +0300 Subject: [PATCH 4/6] feat(sentry): include message into title (#541) * feat(sentry): include message into title * fix(sentry): message is included in title if exception is missing --- workers/sentry/src/utils/converter.ts | 8 +++++++- workers/sentry/tests/converter.test.ts | 22 ++++++++++++++++++++++ workers/sentry/tests/index.test.ts | 2 +- 3 files changed, 30 insertions(+), 2 deletions(-) diff --git a/workers/sentry/src/utils/converter.ts b/workers/sentry/src/utils/converter.ts index 916f158c..ba7f6eb6 100644 --- a/workers/sentry/src/utils/converter.ts +++ b/workers/sentry/src/utils/converter.ts @@ -54,7 +54,13 @@ function flattenObject(obj: unknown, prefix = ''): string[] { * @param eventPayload - Sentry event payload */ export function composeTitle(eventPayload: SentryEvent): string { - return `${eventPayload.exception?.values?.[0]?.type || 'Unknown'}: ${eventPayload.exception?.values?.[0]?.value || ''}`; + const exception = eventPayload.exception?.values?.[0]; + + if (exception) { + return `${exception.type || 'Unknown'}: ${exception.value || ''}`; + } + + return eventPayload.message || 'Unknown: '; } /** diff --git a/workers/sentry/tests/converter.test.ts b/workers/sentry/tests/converter.test.ts index 7c630c69..17517150 100644 --- a/workers/sentry/tests/converter.test.ts +++ b/workers/sentry/tests/converter.test.ts @@ -21,6 +21,28 @@ describe('converter utils', () => { expect(composeTitle(event)).toBe('Unknown: '); }); + + it('should compose title from message if exception is missing', () => { + const event: SentryEvent = { + message: 'message' + }; + + expect(composeTitle(event)).toBe('message'); + }); + + it('should compose title from exception type and value even if message is present', () => { + const event: SentryEvent = { + exception: { + values: [ { + type: 'Error', + value: 'Something went wrong', + } ], + }, + message: 'message' + }; + + expect(composeTitle(event)).toBe('Error: Something went wrong'); + }); }); describe('composeBacktrace()', () => { diff --git a/workers/sentry/tests/index.test.ts b/workers/sentry/tests/index.test.ts index 7d964c1b..6e8f34e2 100644 --- a/workers/sentry/tests/index.test.ts +++ b/workers/sentry/tests/index.test.ts @@ -304,7 +304,7 @@ describe('SentryEventWorker', () => { }, }, catcherVersion: '1.0.1', - title: 'Unknown: ', + title: 'Test timestamp', type: 'error', }, })); From ab8db60b21b970cef1f571870e905a92ba0c838f Mon Sep 17 00:00:00 2001 From: Dobrunia Kostrigin <48620984+Dobrunia@users.noreply.github.com> Date: Thu, 16 Apr 2026 23:40:58 +0300 Subject: [PATCH 5/6] fix: include repetitionId in notification event URLs for Telegram, Slack, and Loop (#544) --- package.json | 2 +- workers/loop/src/templates/event.ts | 3 +- workers/loop/tests/provider.test.ts | 48 ++++++++++++++++++++++++ workers/slack/src/templates/event.ts | 2 +- workers/slack/src/templates/utils.ts | 7 +++- workers/slack/tests/utils.test.ts | 40 ++++++++++++++++++++ workers/telegram/src/templates/event.ts | 3 +- workers/telegram/tests/provider.test.ts | 49 +++++++++++++++++++++++++ 8 files changed, 148 insertions(+), 6 deletions(-) create mode 100644 workers/slack/tests/utils.test.ts diff --git a/package.json b/package.json index 175b7bc5..f2e1e154 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "hawk.workers", "private": true, - "version": "0.1.3", + "version": "0.1.4", "description": "Hawk workers", "repository": "git@github.com:codex-team/hawk.workers.git", "license": "BUSL-1.1", diff --git a/workers/loop/src/templates/event.ts b/workers/loop/src/templates/event.ts index bd1f9ced..afc6622e 100644 --- a/workers/loop/src/templates/event.ts +++ b/workers/loop/src/templates/event.ts @@ -39,7 +39,8 @@ function renderBacktrace(event: GroupedEventDBScheme): string { export default function render(tplData: EventsTemplateVariables): string { const eventInfo = tplData.events[0] as TemplateEventData; const event = eventInfo.event; - const eventURL = tplData.host + '/project/' + tplData.project._id + '/event/' + event._id + '/'; + const repetitionId = eventInfo.repetitionId; + const eventURL = tplData.host + '/project/' + tplData.project._id + '/event/' + event._id + '/' + (repetitionId ? repetitionId + '/overview' : ''); let location = 'Неизвестное место'; if (event.payload.backtrace && event.payload.backtrace.length > 0) { diff --git a/workers/loop/tests/provider.test.ts b/workers/loop/tests/provider.test.ts index b8185928..955ad491 100644 --- a/workers/loop/tests/provider.test.ts +++ b/workers/loop/tests/provider.test.ts @@ -137,6 +137,54 @@ describe('LoopProvider', () => { expect(message).toBeDefined(); }); + /** + * Event URL should include repetitionId when provided + */ + describe('event URL contains correct repetitionId', () => { + const eventId = new ObjectId('5d206f7f9aaf7c0071d64597'); + const projectId = new ObjectId('5d206f7f9aaf7c0071d64596'); + const host = 'https://garage.hawk.so'; + + const basePayload = { + events: [ { + event: { + _id: eventId, + totalCount: 1, + timestamp: Date.now(), + payload: { title: 'Err', backtrace: [] }, + } as DecodedGroupedEvent, + daysRepeated: 1, + newCount: 1, + } ], + period: 60, + host, + hostOfStatic: '', + project: { + _id: projectId, + token: 'tok', + name: 'P', + workspaceId: projectId, + uidAdded: projectId, + notifications: [], + } as ProjectDBScheme, + }; + + it('should include repetitionId and /overview in URL when repetitionId is set', () => { + const repetitionId = '5d206f7f9aaf7c0071d64599'; + const payload = { ...basePayload, events: [ { ...basePayload.events[0], repetitionId } ] }; + const message = templates.EventTpl(payload); + + expect(message).toContain(`/event/${eventId}/${repetitionId}/overview`); + }); + + it('should omit repetitionId from URL when repetitionId is not set', () => { + const message = templates.EventTpl(basePayload); + + expect(message).toContain(`/event/${eventId}/`); + expect(message).not.toContain('/overview'); + }); + }); + /** * Check that rendering of a several events message works without errors */ diff --git a/workers/slack/src/templates/event.ts b/workers/slack/src/templates/event.ts index d11fa66f..00833bce 100644 --- a/workers/slack/src/templates/event.ts +++ b/workers/slack/src/templates/event.ts @@ -45,7 +45,7 @@ function renderBacktrace(event: GroupedEventDBScheme): string { export default function render(tplData: EventsTemplateVariables): IncomingWebhookSendArguments { const eventInfo = tplData.events[0] as TemplateEventData; const event = eventInfo.event; - const eventURL = getEventUrl(tplData.host, tplData.project, event); + const eventURL = getEventUrl(tplData.host, tplData.project, event, eventInfo.repetitionId); const location = getEventLocation(event); const blocks = [ diff --git a/workers/slack/src/templates/utils.ts b/workers/slack/src/templates/utils.ts index cc0af00d..9b14e586 100644 --- a/workers/slack/src/templates/utils.ts +++ b/workers/slack/src/templates/utils.ts @@ -32,9 +32,12 @@ export function getEventLocation(event: DecodedGroupedEvent): string { * @param host - garage host. Also, can be accessed from process.env.GARAGE_URL * @param project - parent project * @param event - event to compose its URL + * @param repetitionId - id of the specific repetition that triggered the notification */ -export function getEventUrl(host: string, project: ProjectDBScheme, event: GroupedEventDBScheme): string { - return host + '/project/' + project._id + '/event/' + event._id + '/'; +export function getEventUrl(host: string, project: ProjectDBScheme, event: GroupedEventDBScheme, repetitionId?: string | null): string { + const base = host + '/project/' + project._id + '/event/' + event._id + '/'; + + return repetitionId ? base + repetitionId + '/overview' : base; } /** diff --git a/workers/slack/tests/utils.test.ts b/workers/slack/tests/utils.test.ts new file mode 100644 index 00000000..7c67f75d --- /dev/null +++ b/workers/slack/tests/utils.test.ts @@ -0,0 +1,40 @@ +import { ObjectId } from 'mongodb'; +import { ProjectDBScheme, GroupedEventDBScheme } from '@hawk.so/types'; +import { getEventUrl } from '../src/templates/utils'; + +const project = { + _id: new ObjectId('5d206f7f9aaf7c0071d64596'), + token: 'project-token', + name: 'Project', + workspaceId: new ObjectId('5d206f7f9aaf7c0071d64596'), + uidAdded: new ObjectId('5d206f7f9aaf7c0071d64596'), + notifications: [], +} as ProjectDBScheme; + +const event = { + _id: new ObjectId('5d206f7f9aaf7c0071d64597'), + payload: { title: 'Error' }, +} as unknown as GroupedEventDBScheme; + +const host = 'https://garage.hawk.so'; + +describe('getEventUrl', () => { + it('should return base URL with trailing slash when no repetitionId', () => { + const url = getEventUrl(host, project, event); + + expect(url).toBe(`${host}/project/${project._id}/event/${event._id}/`); + }); + + it('should return base URL with trailing slash when repetitionId is null', () => { + const url = getEventUrl(host, project, event, null); + + expect(url).toBe(`${host}/project/${project._id}/event/${event._id}/`); + }); + + it('should append repetitionId and /overview when repetitionId is provided', () => { + const repetitionId = '5d206f7f9aaf7c0071d64599'; + const url = getEventUrl(host, project, event, repetitionId); + + expect(url).toBe(`${host}/project/${project._id}/event/${event._id}/${repetitionId}/overview`); + }); +}); diff --git a/workers/telegram/src/templates/event.ts b/workers/telegram/src/templates/event.ts index c8676c00..46da0cba 100644 --- a/workers/telegram/src/templates/event.ts +++ b/workers/telegram/src/templates/event.ts @@ -8,7 +8,8 @@ import type { EventsTemplateVariables, TemplateEventData } from 'hawk-worker-sen export default function render(tplData: EventsTemplateVariables): string { const eventInfo = tplData.events[0] as TemplateEventData; const event = eventInfo.event; - const eventURL = tplData.host + '/project/' + tplData.project._id + '/event/' + event._id + '/'; + const repetitionId = eventInfo.repetitionId; + const eventURL = tplData.host + '/project/' + tplData.project._id + '/event/' + event._id + '/' + (repetitionId ? repetitionId + '/overview' : ''); let location = ''; if (event.payload.backtrace && event.payload.backtrace.length > 0) { diff --git a/workers/telegram/tests/provider.test.ts b/workers/telegram/tests/provider.test.ts index 7cb4ca4b..51ba45c4 100644 --- a/workers/telegram/tests/provider.test.ts +++ b/workers/telegram/tests/provider.test.ts @@ -2,6 +2,7 @@ import { EventNotification, SeveralEventsNotification } from 'hawk-worker-sender import { DecodedGroupedEvent, ProjectDBScheme } from '@hawk.so/types'; import TelegramProvider from 'hawk-worker-telegram/src/provider'; import templates from '../src/templates'; +import EventTpl from '../src/templates/event'; import { ObjectId } from 'mongodb'; /** @@ -64,6 +65,54 @@ describe('TelegramProvider', () => { expect(message).toBeDefined(); }); + /** + * Event URL should include repetitionId when provided + */ + describe('event URL contains correct repetitionId', () => { + const eventId = new ObjectId('5d206f7f9aaf7c0071d64597'); + const projectId = new ObjectId('5d206f7f9aaf7c0071d64596'); + const host = 'https://garage.hawk.so'; + + const basePayload = { + events: [ { + event: { + _id: eventId, + totalCount: 1, + timestamp: Date.now(), + payload: { title: 'Err', backtrace: [] }, + } as DecodedGroupedEvent, + daysRepeated: 1, + newCount: 1, + } ], + period: 60, + host, + hostOfStatic: '', + project: { + _id: projectId, + token: 'tok', + name: 'P', + workspaceId: projectId, + uidAdded: projectId, + notifications: [], + } as ProjectDBScheme, + }; + + it('should include repetitionId and /overview in URL when repetitionId is set', () => { + const repetitionId = '5d206f7f9aaf7c0071d64599'; + const payload = { ...basePayload, events: [ { ...basePayload.events[0], repetitionId } ] }; + const message = EventTpl(payload); + + expect(message).toContain(`/event/${eventId}/${repetitionId}/overview`); + }); + + it('should omit repetitionId from URL when repetitionId is not set', () => { + const message = EventTpl(basePayload); + + expect(message).toContain(`/event/${eventId}/`); + expect(message).not.toContain('/overview'); + }); + }); + /** * Check that rendering of a several events message works without errors */ From bea2469a4e58dda970976c2d495323fa5e91435b Mon Sep 17 00:00:00 2001 From: Kuchizu <70284260+Kuchizu@users.noreply.github.com> Date: Tue, 28 Apr 2026 17:41:28 +0300 Subject: [PATCH 6/6] feat(grouper): add Prometheus metrics (#520) * feat(grouper): add Prometheus metrics * feat(grouper): OOM-debug logging Add MongoDB, payload and delta size metrics with OOM-debug logging * fix(grouper): handle undefined delta * feat(grouper): add memory leak diagnostics logs * fix(metrics): validate push interval, add push cleanup, and prevent retry double-counting in grouper * fix(grouper-metrics): (docs, log context, pushgateway options, duplicate-retry test * fix(grouper): resolve PR conflict & dupkey retry & getEvent fix * chore(grouper): remove duplicate-key retry loop and clarify memory config units * test(grouper): remove duplicate-key race regression case --------- Co-authored-by: Peter --- .env.sample | 12 +- lib/metrics.ts | 97 +++++++ runner.ts | 108 +++---- workers/grouper/src/index.ts | 263 ++++++++++++------ workers/grouper/src/metrics/config.ts | 72 +++++ workers/grouper/src/metrics/grouperMetrics.ts | 165 +++++++++++ workers/grouper/src/metrics/memoryMonitor.ts | 188 +++++++++++++ workers/grouper/src/redisHelper.ts | 6 +- workers/grouper/src/utils/bucketTimestamp.ts | 4 +- 9 files changed, 747 insertions(+), 168 deletions(-) create mode 100644 lib/metrics.ts create mode 100644 workers/grouper/src/metrics/config.ts create mode 100644 workers/grouper/src/metrics/grouperMetrics.ts create mode 100644 workers/grouper/src/metrics/memoryMonitor.ts diff --git a/.env.sample b/.env.sample index f8e8b6ef..f5c19c7a 100644 --- a/.env.sample +++ b/.env.sample @@ -31,6 +31,16 @@ PROMETHEUS_PUSHGATEWAY_URL= # pushgateway push interval in ms PROMETHEUS_PUSHGATEWAY_INTERVAL=10000 +# Grouper memory log controls +# Number of handled tasks between memory checkpoint logs +GROUPER_MEMORY_LOG_EVERY_TASKS=50 +# Number of handled tasks in one sustained-growth evaluation window +GROUPER_MEMORY_GROWTH_WINDOW_TASKS=200 +# Sustained-growth warning threshold in megabytes (MB) +GROUPER_MEMORY_GROWTH_WARN_MB=64 +# Single-handle growth warning threshold in megabytes (MB) +GROUPER_MEMORY_HANDLE_GROWTH_WARN_MB=16 + # project token for error catching HAWK_CATCHER_TOKEN= @@ -40,4 +50,4 @@ HAWK_CATCHER_TOKEN= IS_NOTIFIER_WORKER_ENABLED=false ## Url for telegram notifications about workspace blocks and unblocks -TELEGRAM_LIMITER_CHAT_URL= \ No newline at end of file +TELEGRAM_LIMITER_CHAT_URL= diff --git a/lib/metrics.ts b/lib/metrics.ts new file mode 100644 index 00000000..e26f208f --- /dev/null +++ b/lib/metrics.ts @@ -0,0 +1,97 @@ +import * as client from 'prom-client'; +import os from 'os'; +import { nanoid } from 'nanoid'; +import createLogger from './logger'; + +const register = new client.Registry(); +const logger = createLogger(); + +const DEFAULT_PUSH_INTERVAL_MS = 10_000; +const ID_SIZE = 5; +const METRICS_JOB_NAME = 'workers'; + +let pushInterval: NodeJS.Timeout | null = null; +let currentWorkerName = ''; + +client.collectDefaultMetrics({ register }); + +export { register, client }; + +/** + * Parse push interval from environment. + */ +function getPushIntervalMs(): number { + const rawInterval = process.env.PROMETHEUS_PUSHGATEWAY_INTERVAL; + const parsedInterval = rawInterval === undefined + ? DEFAULT_PUSH_INTERVAL_MS + : Number(rawInterval); + + const interval = Number.isFinite(parsedInterval) && parsedInterval > 0 + ? parsedInterval + : DEFAULT_PUSH_INTERVAL_MS; + + if (rawInterval !== undefined && interval !== parsedInterval) { + logger.warn(`[metrics] invalid PROMETHEUS_PUSHGATEWAY_INTERVAL="${rawInterval}", fallback to ${DEFAULT_PUSH_INTERVAL_MS}ms`); + } + + return interval; +} + +/** + * Stop periodic push to pushgateway. + */ +export function stopMetricsPushing(): void { + if (!pushInterval) { + return; + } + + clearInterval(pushInterval); + pushInterval = null; + logger.info(`[metrics] stopped pushing metrics for worker=${currentWorkerName}`); + currentWorkerName = ''; +} + +/** + * Start periodic push to pushgateway. + * + * @param workerName - name of the worker for grouping. + */ +export function startMetricsPushing(workerName: string): () => void { + const url = process.env.PROMETHEUS_PUSHGATEWAY_URL; + + if (!url) { + return stopMetricsPushing; + } + + if (pushInterval) { + logger.warn(`[metrics] pushing is already started for worker=${currentWorkerName}, skip duplicate start for worker=${workerName}`); + + return stopMetricsPushing; + } + + const interval = getPushIntervalMs(); + const hostname = os.hostname(); + const id = nanoid(ID_SIZE); + const gateway = new client.Pushgateway(url, undefined, register); + + currentWorkerName = workerName; + + logger.info(`Start pushing metrics to ${url} every ${interval}ms (host: ${hostname}, id: ${id}, worker: ${workerName})`); + + pushInterval = setInterval(() => { + gateway.pushAdd({ + jobName: METRICS_JOB_NAME, + groupings: { + worker: workerName, + host: hostname, + id, + }, + }, (err) => { + if (err) { + logger.error(`Metrics push error: ${err.message || err}`); + } + }); + }, interval); + + return stopMetricsPushing; +} diff --git a/runner.ts b/runner.ts index 5dd05b1b..e68fd090 100644 --- a/runner.ts +++ b/runner.ts @@ -9,6 +9,7 @@ import * as utils from './lib/utils'; import { Worker } from './lib/worker'; import HawkCatcher from '@hawk.so/nodejs'; import * as dotenv from 'dotenv'; +import { startMetricsPushing } from './lib/metrics'; dotenv.config(); @@ -48,9 +49,9 @@ class WorkerRunner { // private gateway?: promClient.Pushgateway; /** - * number returned by setInterval() of metrics push function + * Metrics push cleanup callback. */ - private pushIntervalNumber?: ReturnType; + private stopMetricsPushing?: () => void; /** * Create runner instance @@ -65,19 +66,17 @@ class WorkerRunner { .then((workerConstructors) => { this.constructWorkers(workerConstructors); }) - // .then(() => { - // try { - // this.startMetrics(); - // } catch (e) { - // HawkCatcher.send(e); - // console.error(`Metrics not started: ${e}`); - // } - // - // return Promise.resolve(); - // }) .then(() => { return this.startWorkers(); }) + .then(() => { + try { + this.startMetrics(); + } catch (e) { + HawkCatcher.send(e); + console.error(`Metrics not started: ${e}`); + } + }) .then(() => { this.observeProcess(); }) @@ -90,67 +89,27 @@ class WorkerRunner { /** * Run metrics exporter */ - // private startMetrics(): void { - // if (!process.env.PROMETHEUS_PUSHGATEWAY_URL) { - // return; - // } - // - // const PUSH_INTERVAL = parseInt(process.env.PROMETHEUS_PUSHGATEWAY_INTERVAL); - // - // if (isNaN(PUSH_INTERVAL)) { - // throw new Error('PROMETHEUS_PUSHGATEWAY_INTERVAL is invalid or not set'); - // } - // - // const collectDefaultMetrics = promClient.collectDefaultMetrics; - // const Registry = promClient.Registry; - // - // const register = new Registry(); - // const startGcStats = gcStats(register); - // - // const hostname = os.hostname(); - // - // const ID_SIZE = 5; - // const id = nanoid(ID_SIZE); - // - // // eslint-disable-next-line node/no-deprecated-api - // const instance = url.parse(process.env.PROMETHEUS_PUSHGATEWAY_URL).host; - // - // // Initialize metrics for workers - // this.workers.forEach((worker) => { - // // worker.initMetrics(); - // worker.getMetrics().forEach((metric: promClient.Counter) => register.registerMetric(metric)); - // }); - // - // collectDefaultMetrics({ register }); - // startGcStats(); - // - // this.gateway = new promClient.Pushgateway(process.env.PROMETHEUS_PUSHGATEWAY_URL, null, register); - // - // console.log(`Start pushing metrics to ${process.env.PROMETHEUS_PUSHGATEWAY_URL}`); - // - // // Pushing metrics to the pushgateway every PUSH_INTERVAL - // this.pushIntervalNumber = setInterval(() => { - // this.workers.forEach((worker) => { - // if (!this.gateway || !instance) { - // return; - // } - // // Use pushAdd not to overwrite previous metrics - // this.gateway.pushAdd({ - // jobName: 'workers', - // groupings: { - // worker: worker.type.replace('/', '_'), - // host: hostname, - // id, - // }, - // }, (err?: Error) => { - // if (err) { - // HawkCatcher.send(err); - // console.log(`Error of pushing metrics to gateway: ${err}`); - // } - // }); - // }); - // }, PUSH_INTERVAL); - // } + private startMetrics(): void { + if (!process.env.PROMETHEUS_PUSHGATEWAY_URL) { + return; + } + + if (this.workers.length === 0) { + return; + } + + const workerTypes = Array.from(new Set(this.workers.map((worker) => { + return worker.type.replace('/', '_'); + }))); + + const workerTypeForMetrics = workerTypes.length === 1 ? workerTypes[0] : 'multi_worker_process'; + + if (workerTypes.length > 1) { + console.warn(`[metrics] ${workerTypes.length} workers are running in one process; pushing metrics as "${workerTypeForMetrics}" to avoid duplicated attribution`); + } + + this.stopMetricsPushing = startMetricsPushing(workerTypeForMetrics); + } /** * Dynamically loads workers through the yarn workspaces @@ -285,7 +244,8 @@ class WorkerRunner { private async stopWorker(worker: Worker): Promise { try { // stop pushing metrics - clearInterval(this.pushIntervalNumber); + this.stopMetricsPushing?.(); + this.stopMetricsPushing = undefined; await worker.finish(); console.log( diff --git a/workers/grouper/src/index.ts b/workers/grouper/src/index.ts index ea74084b..44460c8d 100644 --- a/workers/grouper/src/index.ts +++ b/workers/grouper/src/index.ts @@ -26,17 +26,20 @@ import { computeDelta } from './utils/repetitionDiff'; import { bucketTimestampMs } from './utils/bucketTimestamp'; import { rightTrim } from '../../../lib/utils/string'; import { hasValue } from '../../../lib/utils/hasValue'; +import GrouperMetrics from './metrics/grouperMetrics'; +import GrouperMemoryMonitor from './metrics/memoryMonitor'; +import { grouperMemoryConfig } from './metrics/config'; /** * eslint does not count decorators as a variable usage */ -/* eslint-disable-next-line no-unused-vars */ +/* eslint-disable-next-line no-unused-vars, @typescript-eslint/no-unused-vars */ import { memoize } from '../../../lib/memoize'; /** * eslint does not count decorators as a variable usage */ -/* eslint-disable-next-line no-unused-vars */ +/* eslint-disable-next-line no-unused-vars, @typescript-eslint/no-unused-vars */ const MEMOIZATION_TTL = 60_000; /** @@ -88,11 +91,26 @@ export default class GrouperWorker extends Worker { */ private redis = new RedisHelper(); + /** + * Prometheus metrics facade. + */ + private grouperMetrics = new GrouperMetrics(); + + /** + * Memory leak monitoring helper. + */ + private memoryMonitor = new GrouperMemoryMonitor(this.logger, grouperMemoryConfig); + /** * Interval for periodic cache cleanup to prevent memory leaks from unbounded cache growth */ private cacheCleanupInterval: NodeJS.Timeout | null = null; + /** + * Number of handled tasks in current worker process. + */ + private handledTasksCount = 0; + /** * Start consuming messages */ @@ -114,6 +132,7 @@ export default class GrouperWorker extends Worker { this.cacheCleanupInterval = setInterval(() => { this.clearCache(); }, CACHE_CLEANUP_INTERVAL_SECONDS * TimeMs.SECOND); + this.memoryMonitor.initialize(this.handledTasksCount); await super.start(); } @@ -130,6 +149,7 @@ export default class GrouperWorker extends Worker { this.cacheCleanupInterval = null; } + this.memoryMonitor.logShutdown(this.handledTasksCount); await super.finish(); this.prepareCache(); await this.eventsDb.close(); @@ -143,7 +163,31 @@ export default class GrouperWorker extends Worker { * @param task - event to handle */ public async handle(task: GroupWorkerTask): Promise { - let uniqueEventHash = await this.getUniqueEventHash(task); + try { + await this.grouperMetrics.observeHandleDuration(async () => { + await this.handleInternal(task); + }); + } catch (error) { + this.grouperMetrics.incrementErrorsTotal(); + this.memoryMonitor.logHandleError(this.handledTasksCount, task.payload?.title, task.projectId); + throw error; + } + } + + /** + * Internal task handling function + * + * @param task - event to handle + */ + private async handleInternal(task: GroupWorkerTask): Promise { + const taskPayloadSize = Buffer.byteLength(JSON.stringify(task.payload)); + const handledTasksCount = ++this.handledTasksCount; + const memoryBeforeHandle = process.memoryUsage(); + + this.grouperMetrics.observePayloadSize(taskPayloadSize); + this.memoryMonitor.logBeforeHandle(memoryBeforeHandle, handledTasksCount, taskPayloadSize, task.projectId); + + this.logger.info(`[handle] project=${task.projectId} catcher=${task.catcherType} title="${task.payload.title}" payloadSize=${taskPayloadSize}b backtraceFrames=${task.payload.backtrace?.length ?? 0}`); // FIX RELEASE TYPE // TODO: REMOVE AFTER 01.01.2026, after the most of the users update to new js catcher @@ -154,7 +198,20 @@ export default class GrouperWorker extends Worker { }; } + let uniqueEventHash = await this.getUniqueEventHash(task); let existedEvent: GroupedEventDBScheme; + let repetitionId = null; + let incrementDailyAffectedUsers = false; + + /** + * Trim source code lines to prevent memory leaks + */ + this.trimSourceCodeLines(task.payload); + + /** + * Filter sensitive information + */ + this.dataFilter.processEvent(task.payload); /** * Find similar events by grouping pattern @@ -162,7 +219,7 @@ export default class GrouperWorker extends Worker { const similarEvent = await this.findSimilarEvent(task.projectId, task.payload.title); if (similarEvent) { - this.logger.info(`[handle] similar event found, groupHash=${similarEvent.groupHash} totalCount=${similarEvent.totalCount}`); + this.logger.info(`[handle] project=${task.projectId} title="${task.payload.title}" similar event found, groupHash=${similarEvent.groupHash} totalCount=${similarEvent.totalCount}`); /** * Override group hash with found event's group hash @@ -172,7 +229,7 @@ export default class GrouperWorker extends Worker { existedEvent = similarEvent; } else { /** - * If we couldn't group by grouping pattern — try grouping by hash (title) + * If we couldn't group by grouping pattern - try grouping by hash (title). */ /** * Find event by group hash. @@ -185,24 +242,12 @@ export default class GrouperWorker extends Worker { */ const isFirstOccurrence = !existedEvent && !similarEvent; - let repetitionId = null; - - let incrementDailyAffectedUsers = false; - - /** - * Trim source code lines to prevent memory leaks - */ - this.trimSourceCodeLines(task.payload); - - /** - * Filter sensitive information - */ - this.dataFilter.processEvent(task.payload); - if (isFirstOccurrence) { try { const incrementAffectedUsers = !!task.payload.user; + this.logger.info(`[saveEvent] project=${task.projectId} title="${task.payload.title}" new event, payloadSize=${taskPayloadSize}b`); + /** * Insert new event */ @@ -232,12 +277,15 @@ export default class GrouperWorker extends Worker { * and we need to process this event as repetition */ if (e.code?.toString() === DB_DUPLICATE_KEY_ERROR) { + this.grouperMetrics.incrementDuplicateRetriesTotal(); + this.logger.info(`[saveEvent] project=${task.projectId} title="${task.payload.title}" duplicate key, retrying as repetition`); + await this.handle(task); return; - } else { - throw e; } + + throw e; } } else { const [incrementAffectedUsers, shouldIncrementDailyAffectedUsers] = await this.shouldIncrementAffectedUsers(task, existedEvent); @@ -258,6 +306,10 @@ export default class GrouperWorker extends Worker { let delta: RepetitionDelta; + const existedPayloadSize = Buffer.byteLength(JSON.stringify(existedEvent.payload)); + + this.logger.info(`[computeDelta] project=${task.projectId} title="${task.payload.title}" existedPayloadSize=${existedPayloadSize}b taskPayloadSize=${taskPayloadSize}b`); + try { /** * Calculate delta between original event and repetition @@ -268,9 +320,16 @@ export default class GrouperWorker extends Worker { throw new DiffCalculationError(e, existedEvent.payload, task.payload); } + const deltaStr = JSON.stringify(delta); + const deltaSize = deltaStr != null ? Buffer.byteLength(deltaStr) : 0; + + this.grouperMetrics.observeDeltaSize(deltaSize); + + this.logger.info(`[computeDelta] project=${task.projectId} title="${task.payload.title}" deltaSize=${deltaSize}b`); + const newRepetition = { groupHash: uniqueEventHash, - delta: JSON.stringify(delta), + delta: deltaStr, timestamp: task.timestamp, } as RepetitionDBScheme; @@ -283,6 +342,11 @@ export default class GrouperWorker extends Worker { delta = undefined; } + /** + * Increment metrics counter once per handled task + */ + this.grouperMetrics.incrementEventsTotal(isFirstOccurrence ? 'new' : 'repeated'); + /** * Store events counter by days */ @@ -294,6 +358,14 @@ export default class GrouperWorker extends Worker { incrementDailyAffectedUsers ); + this.memoryMonitor.logHandleCompletion( + memoryBeforeHandle, + handledTasksCount, + taskPayloadSize, + task.payload.title, + task.projectId + ); + /** * Add task for NotifierWorker only if event is not ignored */ @@ -349,9 +421,14 @@ export default class GrouperWorker extends Worker { }; const series = [ - { key: minutelyKey, label: 'minutely', retentionMs: TimeMs.DAY, timestampMs: bucketTimestampMs('minutely') }, - { key: hourlyKey, label: 'hourly', retentionMs: TimeMs.WEEK, timestampMs: bucketTimestampMs('hourly') }, - { key: dailyKey, label: 'daily', retentionMs: DAILY_METRICS_RETENTION_DAYS * TimeMs.DAY, timestampMs: bucketTimestampMs('daily') }, + { key: minutelyKey, label: 'minutely', retentionMs: TimeMs.DAY, timestampMs: bucketTimestampMs('minutely') }, + { key: hourlyKey, label: 'hourly', retentionMs: TimeMs.WEEK, timestampMs: bucketTimestampMs('hourly') }, + { + key: dailyKey, + label: 'daily', + retentionMs: DAILY_METRICS_RETENTION_DAYS * TimeMs.DAY, + timestampMs: bucketTimestampMs('daily'), + }, ]; for (const { key, label, retentionMs, timestampMs } of series) { @@ -611,14 +688,16 @@ export default class GrouperWorker extends Worker { const eventCacheKey = await this.getEventCacheKey(projectId, groupHash); return this.cache.get(eventCacheKey, async () => { - return this.eventsDb.getConnection() - .collection(`events:${projectId}`) - .findOne({ - groupHash, - }) - .catch((err) => { - throw new DatabaseReadWriteError(err); - }); + return this.grouperMetrics.observeMongoDuration('getEvent', async () => { + return this.eventsDb.getConnection() + .collection(`events:${projectId}`) + .findOne({ + groupHash, + }) + .catch((err) => { + throw new DatabaseReadWriteError(err); + }); + }); }); } @@ -646,12 +725,14 @@ export default class GrouperWorker extends Worker { throw new ValidationError('Controller.saveEvent: Project ID is invalid or missed'); } - const collection = this.eventsDb.getConnection().collection(`events:${projectId}`); + return this.grouperMetrics.observeMongoDuration('saveEvent', async () => { + const collection = this.eventsDb.getConnection().collection(`events:${projectId}`); - encodeUnsafeFields(groupedEventData); + encodeUnsafeFields(groupedEventData); - return (await collection - .insertOne(groupedEventData)).insertedId as mongodb.ObjectID; + return (await collection + .insertOne(groupedEventData)).insertedId as mongodb.ObjectID; + }); } /** @@ -665,18 +746,20 @@ export default class GrouperWorker extends Worker { throw new ValidationError('Controller.saveRepetition: Project ID is invalid or missing'); } - try { - const collection = this.eventsDb.getConnection().collection(`repetitions:${projectId}`); + return this.grouperMetrics.observeMongoDuration('saveRepetition', async () => { + try { + const collection = this.eventsDb.getConnection().collection(`repetitions:${projectId}`); - encodeUnsafeFields(repetition); + encodeUnsafeFields(repetition); - return (await collection.insertOne(repetition)).insertedId as mongodb.ObjectID; - } catch (err) { - throw new DatabaseReadWriteError(err, { - repetition: repetition as unknown as Record, - projectId, - }); - } + return (await collection.insertOne(repetition)).insertedId as mongodb.ObjectID; + } catch (err) { + throw new DatabaseReadWriteError(err, { + repetition: repetition as unknown as Record, + projectId, + }); + } + }); } /** @@ -691,26 +774,28 @@ export default class GrouperWorker extends Worker { throw new ValidationError('Controller.saveEvent: Project ID is invalid or missed'); } - try { - const updateQuery = incrementAffected - ? { - $inc: { - totalCount: 1, - usersAffected: 1, - }, - } - : { - $inc: { - totalCount: 1, - }, - }; + return this.grouperMetrics.observeMongoDuration('incrementCounter', async () => { + try { + const updateQuery = incrementAffected + ? { + $inc: { + totalCount: 1, + usersAffected: 1, + }, + } + : { + $inc: { + totalCount: 1, + }, + }; - return (await this.eventsDb.getConnection() - .collection(`events:${projectId}`) - .updateOne(query, updateQuery)).modifiedCount; - } catch (err) { - throw new DatabaseReadWriteError(err); - } + return (await this.eventsDb.getConnection() + .collection(`events:${projectId}`) + .updateOne(query, updateQuery)).modifiedCount; + } catch (err) { + throw new DatabaseReadWriteError(err); + } + }); } /** @@ -734,32 +819,34 @@ export default class GrouperWorker extends Worker { throw new ValidationError('GrouperWorker.saveDailyEvents: Project ID is invalid or missed'); } - try { - const midnight = this.getMidnightByEventTimestamp(eventTimestamp); - - await this.eventsDb.getConnection() - .collection(`dailyEvents:${projectId}`) - .updateOne( - { - groupHash: eventHash, - groupingTimestamp: midnight, - }, - { - $set: { + await this.grouperMetrics.observeMongoDuration('saveDailyEvents', async () => { + try { + const midnight = this.getMidnightByEventTimestamp(eventTimestamp); + + await this.eventsDb.getConnection() + .collection(`dailyEvents:${projectId}`) + .updateOne( + { groupHash: eventHash, groupingTimestamp: midnight, - lastRepetitionTime: eventTimestamp, - lastRepetitionId: repetitionId, }, - $inc: { - count: 1, - affectedUsers: shouldIncrementAffectedUsers ? 1 : 0, + { + $set: { + groupHash: eventHash, + groupingTimestamp: midnight, + lastRepetitionTime: eventTimestamp, + lastRepetitionId: repetitionId, + }, + $inc: { + count: 1, + affectedUsers: shouldIncrementAffectedUsers ? 1 : 0, + }, }, - }, - { upsert: true }); - } catch (err) { - throw new DatabaseReadWriteError(err); - } + { upsert: true }); + } catch (err) { + throw new DatabaseReadWriteError(err); + } + }); } /** diff --git a/workers/grouper/src/metrics/config.ts b/workers/grouper/src/metrics/config.ts new file mode 100644 index 00000000..7fe5abb2 --- /dev/null +++ b/workers/grouper/src/metrics/config.ts @@ -0,0 +1,72 @@ +/** + * Parsed config for grouper memory monitoring. + */ +export interface GrouperMemoryConfig { + /** + * Write periodic memory checkpoint every N handled tasks. + */ + logEveryTasks: number; + + /** + * Number of handled tasks in one sustained-growth evaluation window. + */ + growthWindowTasks: number; + + /** + * Warn when heap growth in the evaluation window is greater than this amount in MB. + */ + growthWarnMb: number; + + /** + * Warn when a single handle() call grows heap by more than this amount in MB. + */ + handleGrowthWarnMb: number; +} + +/** + * Default memory checkpoint interval in tasks. + */ +const DEFAULT_MEMORY_LOG_INTERVAL_TASKS = 50; + +/** + * Default sustained-growth window size in handled tasks. + */ +const DEFAULT_MEMORY_GROWTH_WINDOW_TASKS = 200; + +/** + * Default sustained-growth warning threshold in MB. + */ +const DEFAULT_MEMORY_GROWTH_WARN_MB = 64; + +/** + * Default single-handle growth warning threshold in MB. + */ +const DEFAULT_MEMORY_HANDLE_GROWTH_WARN_MB = 16; + +/** + * Histogram buckets for payload and delta sizes (bytes). + */ +// eslint-disable-next-line @typescript-eslint/no-magic-numbers +export const GROUPER_METRICS_SIZE_BUCKETS = [100, 500, 1000, 5000, 10000, 50000, 100000, 500000]; + +/** + * Parse positive numeric env value. + * + * @param value - env string value. + * @param fallback - default numeric fallback. + */ +function asPositiveNumber(value: string | undefined, fallback: number): number { + const parsed = Number(value); + + return Number.isFinite(parsed) && parsed > 0 ? parsed : fallback; +} + +/** + * Memory monitoring config from environment. + */ +export const grouperMemoryConfig: GrouperMemoryConfig = { + logEveryTasks: asPositiveNumber(process.env.GROUPER_MEMORY_LOG_EVERY_TASKS, DEFAULT_MEMORY_LOG_INTERVAL_TASKS), + growthWindowTasks: asPositiveNumber(process.env.GROUPER_MEMORY_GROWTH_WINDOW_TASKS, DEFAULT_MEMORY_GROWTH_WINDOW_TASKS), + growthWarnMb: asPositiveNumber(process.env.GROUPER_MEMORY_GROWTH_WARN_MB, DEFAULT_MEMORY_GROWTH_WARN_MB), + handleGrowthWarnMb: asPositiveNumber(process.env.GROUPER_MEMORY_HANDLE_GROWTH_WARN_MB, DEFAULT_MEMORY_HANDLE_GROWTH_WARN_MB), +}; diff --git a/workers/grouper/src/metrics/grouperMetrics.ts b/workers/grouper/src/metrics/grouperMetrics.ts new file mode 100644 index 00000000..4be19e87 --- /dev/null +++ b/workers/grouper/src/metrics/grouperMetrics.ts @@ -0,0 +1,165 @@ +import { client, register } from '../../../../lib/metrics'; +import { GROUPER_METRICS_SIZE_BUCKETS } from './config'; + +type EventType = 'new' | 'repeated'; +type MongoOperation = 'getEvent' | 'saveEvent' | 'saveRepetition' | 'incrementCounter' | 'saveDailyEvents'; + +/** + * Reuse already registered metric by name, or create one. + * + * @param name - metric name. + * @param createMetric - metric factory. + */ +function getOrCreateMetric(name: string, createMetric: () => MetricType): MetricType { + const existing = register.getSingleMetric(name); + + if (existing) { + return existing as unknown as MetricType; + } + + return createMetric(); +} + +/** + * Grouper-specific Prometheus metrics facade. + */ +export default class GrouperMetrics { + private readonly eventsTotal = getOrCreateMetric( + 'hawk_grouper_events_total', + () => new client.Counter({ + name: 'hawk_grouper_events_total', + help: 'Total number of events processed by grouper', + labelNames: [ 'type' ], + registers: [ register ], + }) + ); + + private readonly handleDuration = getOrCreateMetric( + 'hawk_grouper_handle_duration_seconds', + () => new client.Histogram({ + name: 'hawk_grouper_handle_duration_seconds', + help: 'Duration of handle() call in seconds', + registers: [ register ], + }) + ); + + private readonly errorsTotal = getOrCreateMetric( + 'hawk_grouper_errors_total', + () => new client.Counter({ + name: 'hawk_grouper_errors_total', + help: 'Total number of errors during event processing', + registers: [ register ], + }) + ); + + private readonly mongoDuration = getOrCreateMetric( + 'hawk_grouper_mongo_duration_seconds', + () => new client.Histogram({ + name: 'hawk_grouper_mongo_duration_seconds', + help: 'Duration of MongoDB operations in seconds', + labelNames: [ 'operation' ], + registers: [ register ], + }) + ); + + private readonly deltaSize = getOrCreateMetric( + 'hawk_grouper_delta_size_bytes', + () => new client.Histogram({ + name: 'hawk_grouper_delta_size_bytes', + help: 'Size of computed repetition delta in bytes', + buckets: GROUPER_METRICS_SIZE_BUCKETS, + registers: [ register ], + }) + ); + + private readonly payloadSize = getOrCreateMetric( + 'hawk_grouper_payload_size_bytes', + () => new client.Histogram({ + name: 'hawk_grouper_payload_size_bytes', + help: 'Size of incoming event payload in bytes', + buckets: GROUPER_METRICS_SIZE_BUCKETS, + registers: [ register ], + }) + ); + + private readonly duplicateRetriesTotal = getOrCreateMetric( + 'hawk_grouper_duplicate_retries_total', + () => new client.Counter({ + name: 'hawk_grouper_duplicate_retries_total', + help: 'Number of retries due to duplicate key errors', + registers: [ register ], + }) + ); + + /** + * Measure top-level handle() duration. + * + * @param callback - callback to execute under timer. + */ + public async observeHandleDuration(callback: () => Promise): Promise { + const endTimer = this.handleDuration.startTimer(); + + try { + return await callback(); + } finally { + endTimer(); + } + } + + /** + * Increment events counter by event type. + * + * @param type - event type label. + */ + public incrementEventsTotal(type: EventType): void { + this.eventsTotal.inc({ type }); + } + + /** + * Increment total processing errors counter. + */ + public incrementErrorsTotal(): void { + this.errorsTotal.inc(); + } + + /** + * Observe incoming payload size. + * + * @param sizeBytes - payload size in bytes. + */ + public observePayloadSize(sizeBytes: number): void { + this.payloadSize.observe(sizeBytes); + } + + /** + * Observe computed delta size. + * + * @param sizeBytes - delta size in bytes. + */ + public observeDeltaSize(sizeBytes: number): void { + this.deltaSize.observe(sizeBytes); + } + + /** + * Increment retries caused by duplicate key races. + */ + public incrementDuplicateRetriesTotal(): void { + this.duplicateRetriesTotal.inc(); + } + + /** + * Measure Mongo operation duration. + * + * @param operation - mongodb operation label. + * @param callback - callback to execute under timer. + */ + public async observeMongoDuration(operation: MongoOperation, callback: () => Promise): Promise { + const endTimer = this.mongoDuration.startTimer({ operation }); + + try { + return await callback(); + } finally { + endTimer(); + } + } +} diff --git a/workers/grouper/src/metrics/memoryMonitor.ts b/workers/grouper/src/metrics/memoryMonitor.ts new file mode 100644 index 00000000..ac003ced --- /dev/null +++ b/workers/grouper/src/metrics/memoryMonitor.ts @@ -0,0 +1,188 @@ +import type { GrouperMemoryConfig } from './config'; + +interface LoggerLike { + info(message: string): void; + warn(message: string): void; +} + +const ROUND_PRECISION = 100; + +/** + * Number of bytes in one mebibyte. + */ +// eslint-disable-next-line @typescript-eslint/no-magic-numbers +const BYTES_IN_MEBIBYTE = 1024 * 1024; + +/** + * Handles memory checkpoints and leak-oriented logging for Grouper worker. + */ +export default class GrouperMemoryMonitor { + private readonly logger: LoggerLike; + private readonly config: GrouperMemoryConfig; + + /** + * Task number of the last sustained-growth checkpoint. + */ + private memoryCheckpointTask = 0; + + /** + * Heap usage (bytes) saved at the last sustained-growth checkpoint. + */ + private memoryCheckpointHeapUsed = 0; + + /** + * @param logger - logger instance. + * @param config - memory monitor thresholds. + */ + constructor(logger: LoggerLike, config: GrouperMemoryConfig) { + this.logger = logger; + this.config = config; + } + + /** + * Initialize baseline memory state on worker startup. + * + * @param handledTasksCount - currently handled tasks count. + */ + public initialize(handledTasksCount: number): void { + const startupMemory = process.memoryUsage(); + + this.memoryCheckpointTask = 0; + this.memoryCheckpointHeapUsed = startupMemory.heapUsed; + this.logCheckpoint('startup', startupMemory, handledTasksCount); + } + + /** + * Log shutdown memory checkpoint. + * + * @param handledTasksCount - handled tasks count on shutdown. + */ + public logShutdown(handledTasksCount: number): void { + this.logCheckpoint('shutdown', process.memoryUsage(), handledTasksCount); + } + + /** + * Log memory checkpoint on handle() error. + * + * @param handledTasksCount - currently handled tasks count. + * @param title - event title if available. + * @param projectId - project identifier. + */ + public logHandleError(handledTasksCount: number, title: string | undefined, projectId: string): void { + const suffix = title ? `title="${title}"` : ''; + + this.logCheckpoint('handle-error', process.memoryUsage(), handledTasksCount, projectId, suffix); + } + + /** + * Periodic memory checkpoint before handling task payload. + * + * @param memoryUsage - process memory usage. + * @param handledTasksCount - currently handled tasks count. + * @param payloadSizeBytes - task payload size. + * @param projectId - project identifier. + */ + public logBeforeHandle(memoryUsage: NodeJS.MemoryUsage, handledTasksCount: number, payloadSizeBytes: number, projectId: string): void { + if (handledTasksCount !== 1 && handledTasksCount % this.config.logEveryTasks !== 0) { + return; + } + + this.logCheckpoint('before-handle', memoryUsage, handledTasksCount, projectId, `payloadSize=${payloadSizeBytes}b`); + } + + /** + * Log post-handle memory stats and emit warnings on suspicious growth. + * + * @param memoryBeforeHandle - memory usage before handling. + * @param handledTasksCount - currently handled tasks count. + * @param payloadSizeBytes - task payload size. + * @param title - event title. + * @param projectId - project id. + */ + public logHandleCompletion( + memoryBeforeHandle: NodeJS.MemoryUsage, + handledTasksCount: number, + payloadSizeBytes: number, + title: string, + projectId: string + ): void { + const memoryAfterHandle = process.memoryUsage(); + const heapDeltaBytes = memoryAfterHandle.heapUsed - memoryBeforeHandle.heapUsed; + const heapDeltaMb = Math.round((heapDeltaBytes / BYTES_IN_MEBIBYTE) * ROUND_PRECISION) / ROUND_PRECISION; + + this.logger.info( + `[memory][project=${projectId}] [handle] done, ${this.formatMemoryUsage(memoryAfterHandle)} heapDelta=${heapDeltaMb}MB handled=${handledTasksCount}` + ); + + if (heapDeltaBytes > this.config.handleGrowthWarnMb * BYTES_IN_MEBIBYTE) { + this.logger.warn( + `[memory][project=${projectId}] high heap growth in single handle: heapDelta=${heapDeltaMb}MB payloadSize=${payloadSizeBytes}b title="${title}"` + ); + } + + this.checkMemoryGrowthWindow(memoryAfterHandle, handledTasksCount, projectId); + } + + /** + * Logs sustained heap growth over a configurable number of handled tasks. + * + * @param memoryUsage - current process memory usage. + * @param handledTasksCount - currently handled tasks count. + * @param projectId - project identifier. + */ + private checkMemoryGrowthWindow(memoryUsage: NodeJS.MemoryUsage, handledTasksCount: number, projectId: string): void { + const tasksInWindow = handledTasksCount - this.memoryCheckpointTask; + + if (tasksInWindow < this.config.growthWindowTasks) { + return; + } + + const heapGrowthBytes = memoryUsage.heapUsed - this.memoryCheckpointHeapUsed; + const heapGrowthMb = Math.round((heapGrowthBytes / BYTES_IN_MEBIBYTE) * ROUND_PRECISION) / ROUND_PRECISION; + const heapUsedNowMb = Math.round((memoryUsage.heapUsed / BYTES_IN_MEBIBYTE) * ROUND_PRECISION) / ROUND_PRECISION; + + this.logger.info( + `[memory][project=${projectId}] growth window tasks=${tasksInWindow} handled=${this.memoryCheckpointTask + 1}-${handledTasksCount} heapGrowth=${heapGrowthMb}MB heapUsedNow=${heapUsedNowMb}MB` + ); + + if (heapGrowthBytes > this.config.growthWarnMb * BYTES_IN_MEBIBYTE) { + this.logger.warn( + `[memory][project=${projectId}] possible leak detected: heap grew by ${heapGrowthMb}MB in ${tasksInWindow} handled tasks` + ); + } + + this.memoryCheckpointTask = handledTasksCount; + this.memoryCheckpointHeapUsed = memoryUsage.heapUsed; + } + + /** + * Format memory usage for consistent logs. + * + * @param memoryUsage - current process memory usage. + */ + private formatMemoryUsage(memoryUsage: NodeJS.MemoryUsage): string { + const heapUsedMb = Math.round((memoryUsage.heapUsed / BYTES_IN_MEBIBYTE) * ROUND_PRECISION) / ROUND_PRECISION; + const heapTotalMb = Math.round((memoryUsage.heapTotal / BYTES_IN_MEBIBYTE) * ROUND_PRECISION) / ROUND_PRECISION; + const rssMb = Math.round((memoryUsage.rss / BYTES_IN_MEBIBYTE) * ROUND_PRECISION) / ROUND_PRECISION; + const externalMb = Math.round((memoryUsage.external / BYTES_IN_MEBIBYTE) * ROUND_PRECISION) / ROUND_PRECISION; + const arrayBuffersMb = Math.round((memoryUsage.arrayBuffers / BYTES_IN_MEBIBYTE) * ROUND_PRECISION) / ROUND_PRECISION; + + return `heapUsed=${heapUsedMb}MB heapTotal=${heapTotalMb}MB rss=${rssMb}MB external=${externalMb}MB arrayBuffers=${arrayBuffersMb}MB`; + } + + /** + * Writes one memory checkpoint record. + * + * @param stage - lifecycle stage. + * @param memoryUsage - current process memory usage. + * @param handledTasksCount - currently handled tasks count. + * @param projectId - optional project identifier. + * @param suffix - optional extra suffix. + */ + private logCheckpoint(stage: string, memoryUsage: NodeJS.MemoryUsage, handledTasksCount: number, projectId?: string, suffix = ''): void { + const extra = suffix ? ` ${suffix}` : ''; + const prefix = projectId ? `[memory][project=${projectId}]` : '[memory]'; + + this.logger.info(`${prefix} stage=${stage} handled=${handledTasksCount} ${this.formatMemoryUsage(memoryUsage)}${extra}`); + } +} diff --git a/workers/grouper/src/redisHelper.ts b/workers/grouper/src/redisHelper.ts index 242b74f8..75c92032 100644 --- a/workers/grouper/src/redisHelper.ts +++ b/workers/grouper/src/redisHelper.ts @@ -194,7 +194,7 @@ export default class RedisHelper { const timestamp = Date.now(); /** - * Create key if not exists — then call increment + * Create key if not exists - then call increment */ await this.tsCreateIfNotExists(key, labels, retentionMs); await this.tsIncrBy(key, value, timestamp, labels); @@ -249,7 +249,7 @@ export default class RedisHelper { const timestamp = timestampMs === 0 ? Date.now() : timestampMs; /** - * Create key if not exists — then call increment + * Create key if not exists - then call increment */ await this.tsCreateIfNotExists(key, labels, retentionMs); await this.tsAdd(key, value, timestamp, labels); @@ -290,4 +290,4 @@ export default class RedisHelper { resolve(resp !== 'OK'); }; } -} \ No newline at end of file +} diff --git a/workers/grouper/src/utils/bucketTimestamp.ts b/workers/grouper/src/utils/bucketTimestamp.ts index 29dbd8aa..32fc6391 100644 --- a/workers/grouper/src/utils/bucketTimestamp.ts +++ b/workers/grouper/src/utils/bucketTimestamp.ts @@ -11,7 +11,7 @@ import TimeMs from '../../../../lib/utils/time'; export function bucketTimestampMs(granularity: 'minutely' | 'hourly' | 'daily', now = Date.now()): number { switch (granularity) { case 'hourly': return now - (now % TimeMs.HOUR); - case 'daily': return now - (now % TimeMs.DAY); - default: return now - (now % TimeMs.MINUTE); // minutely + case 'daily': return now - (now % TimeMs.DAY); + default: return now - (now % TimeMs.MINUTE); // minutely } }