Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
7b9c301
Imp(javascript): babel source map parsing (#373)
e11sy Mar 1, 2025
b2c8132
chore(javascript): add logs
e11sy Mar 5, 2025
cd417c1
deps(hawk/types): update to new version
e11sy Mar 14, 2025
4e55825
feat(grouper): event grouping by pattern
e11sy Mar 14, 2025
e8664d7
test(grouper): cover pattern grouping with tests
e11sy Mar 14, 2025
a8e5dac
fix(grouper): remove cache key on event saving
e11sy Mar 14, 2025
7311787
fix(): tests mock fixed
e11sy Mar 14, 2025
86340c0
chore(grouper): remove redundant logs
e11sy Mar 14, 2025
1a2911b
chore(grouper): move patterns to project object
e11sy Mar 14, 2025
31adcba
update from master
e11sy Mar 16, 2025
7710fc8
imp(grouper): cache getOriginalEvent db query
e11sy Mar 16, 2025
00016eb
chore(grouper): lint fix
e11sy Mar 16, 2025
95fdca0
fix(tests): fix getPatterns method
e11sy Mar 16, 2025
c9f5f4a
fix(grouper): fix findMatchingPattern method
e11sy Mar 16, 2025
bd2fb5a
imp(grouper): new testcases added
e11sy Mar 16, 2025
039ce1f
imp(grouper): move pattern grouping to findSimilarEvent method
e11sy Mar 16, 2025
4955ea4
imp(grouper): cache getProjectPatterns response
e11sy Mar 16, 2025
8f83926
chore(): lint fixes
e11sy Mar 16, 2025
42ceac4
deps(): update @hawk.so/types package version
e11sy Mar 16, 2025
533587c
fix(tests): fix limiter tests
e11sy Mar 16, 2025
9f9015a
fix(archiver): build fix
e11sy Mar 16, 2025
9f9af6c
feat(migrations): create index on event.payload.title
e11sy Mar 16, 2025
f4eb16b
Merge branch 'stage' into manual-event-grouping
e11sy Mar 18, 2025
a4bd447
remove duplicated code
e11sy Mar 18, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions migrations/20241111000000-add-payload-title-index-for-events.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/**
* This migration creates indexes for all collections on payload.title field
*/

/**
* Index name for payload.title field
*/
const payloadTitleIndexName = 'payloadTitle';

module.exports = {
async up(db) {
const collections = await db.listCollections({}, {
authorizedCollections: true,
nameOnly: true,
}).toArray();

const targetCollections = [];

collections.forEach((collection) => {
if (/events/.test(collection.name)) {
targetCollections.push(collection.name);
}
});

for (const collectionName of targetCollections) {
const hasIndexAlready = await db.collection(collectionName).indexExists(payloadTitleIndexName);

if (!hasIndexAlready) {
await db.collection(collectionName).createIndex({
'payload.title': 1,
}, {
name: payloadTitleIndexName,
});
}
}
},
async down(db) {
const collections = await db.listCollections({}, {
authorizedCollections: true,
nameOnly: true,
}).toArray();

const targetCollections = [];

collections.forEach((collection) => {
if (/events/.test(collection.name)) {
targetCollections.push(collection.name);
}
});

for (const collectionName of targetCollections) {
await db.collection(collectionName).dropIndex(payloadTitleIndexName);
}
},
};
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
},
"dependencies": {
"@hawk.so/nodejs": "^3.1.1",
"@hawk.so/types": "^0.1.26",
"@hawk.so/types": "^0.1.28",
"@types/amqplib": "^0.8.2",
"@types/jest": "^29.2.3",
"@types/mongodb": "^3.5.15",
Expand Down
9 changes: 5 additions & 4 deletions workers/archiver/tests/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ process.env.MAX_DAYS_NUMBER = '30';

const mockedProject: ProjectDBScheme = {
notifications: [],
eventGroupingPatterns: [],
token: '5342',
integrationId: 'eyJpbnRlZ3JhdGlvbklkIjoiMzg3NGNkOWMtZjJiYS00ZDVkLTk5ZmQtM2UzZjYzMDcxYmJhIiwic2VjcmV0IjoiMGZhM2JkM2EtYmMyZC00YWRiLThlMWMtNjg2OGY0MzM1YjRiIn0=',
uidAdded: new ObjectId('5e4ff518628a6c714515f4db'),
Expand Down Expand Up @@ -53,7 +54,7 @@ describe('Archiver worker', () => {

beforeEach(async () => {
await db.collection('releases').deleteMany({});
})
});

test('Should correctly remove old events', async () => {
/**
Expand Down Expand Up @@ -129,7 +130,7 @@ describe('Archiver worker', () => {
/**
* Insert one release with object id based on current time, it should not be removed
*/
await db.collection('releases').insert(releasesToStay)
await db.collection('releases').insert(releasesToStay);

const worker = new ArchiverWorker();

Expand Down Expand Up @@ -173,9 +174,9 @@ describe('Archiver worker', () => {
expect(newReleasesCollection).toEqual([
mockedReleases[mockedReleasesLength - 2],
mockedReleases[mockedReleasesLength - 1],
])
]);
await worker.finish();
})
});

afterAll(async () => {
await db.dropCollection('releases');
Expand Down
123 changes: 105 additions & 18 deletions workers/grouper/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,14 @@ export default class GrouperWorker extends Worker {
public readonly type: string = pkg.workerType;

/**
* Database Controller
* Events database Controller
*/
private db: DatabaseController = new DatabaseController(process.env.MONGO_EVENTS_DATABASE_URI);
private eventsDb: DatabaseController = new DatabaseController(process.env.MONGO_EVENTS_DATABASE_URI);

/**
* Accounts database Controller
*/
private accountsDb: DatabaseController = new DatabaseController(process.env.MONGO_ACCOUNTS_DATABASE_URI);

/**
* This class will filter sensitive information
Expand All @@ -52,7 +57,8 @@ export default class GrouperWorker extends Worker {
public async start(): Promise<void> {
console.log('starting grouper worker');

await this.db.connect();
await this.eventsDb.connect();
await this.accountsDb.connect();
this.prepareCache();
console.log('redis initializing');

Expand All @@ -67,7 +73,8 @@ export default class GrouperWorker extends Worker {
public async finish(): Promise<void> {
await super.finish();
this.prepareCache();
await this.db.close();
await this.eventsDb.close();
await this.accountsDb.close();
await this.redis.close();
}

Expand All @@ -85,12 +92,13 @@ export default class GrouperWorker extends Worker {
let existedEvent = await this.getEvent(task.projectId, uniqueEventHash);

/**
* If we couldn't group by group hash (title), try grouping by Levenshtein distance with last N events
* If we couldn't group by group hash (title), try grouping by Levenshtein distance or patterns
*/
if (!existedEvent) {
const similarEvent = await this.findSimilarEvent(task.projectId, task.event);

if (similarEvent) {
this.logger.info(`similar event: ${JSON.stringify(similarEvent)}`);
/**
* Override group hash with found event's group hash
*/
Expand Down Expand Up @@ -226,7 +234,7 @@ export default class GrouperWorker extends Worker {
}

/**
* Tries to find events with a small Levenshtein distance of a title
* Tries to find events with a small Levenshtein distance of a title or by matching grouping patterns
*
* @param projectId - where to find
* @param event - event to compare
Expand All @@ -237,23 +245,101 @@ export default class GrouperWorker extends Worker {

const lastUniqueEvents = await this.findLastEvents(projectId, eventsCountToCompare);

return lastUniqueEvents.filter(prevEvent => {
/**
* First try to find by Levenshtein distance
*/
const similarByLevenshtein = lastUniqueEvents.filter(prevEvent => {
const distance = levenshtein(event.title, prevEvent.payload.title);
const threshold = event.title.length * diffTreshold;

return distance < threshold;
}).pop();

if (similarByLevenshtein) {
return similarByLevenshtein;
}

/**
* If no match by Levenshtein, try matching by patterns
*/
const patterns = await this.getProjectPatterns(projectId);

if (patterns && patterns.length > 0) {
const matchingPattern = await this.findMatchingPattern(patterns, event);

if (matchingPattern !== null) {
const originalEvent = await this.cache.get(`${projectId}:${matchingPattern}:originalEvent`, async () => {
return await this.eventsDb.getConnection()
.collection(`events:${projectId}`)
.findOne(
{ 'payload.title': { $regex: matchingPattern } },
{ sort: { _id: 1 } }
);
});

this.logger.info(`original event for pattern: ${JSON.stringify(originalEvent)}`);

if (originalEvent) {
return originalEvent;
}
}
}

return undefined;
}

/**
 * Method that returns matched pattern for event, if event do not match any of patterns return null
 *
 * Patterns are user-provided strings, so an invalid regular expression is
 * logged and skipped instead of throwing and crashing event grouping
 *
 * @param patterns - list of the patterns of the related project
 * @param event - event which title would be checked
 * @returns {string | null} matched pattern or null if no match
 */
private async findMatchingPattern(patterns: string[], event: EventDataAccepted<EventAddons>): Promise<string | null> {
  if (!patterns || patterns.length === 0) {
    return null;
  }

  /**
   * Iterate from the end to keep the original "last matching pattern wins"
   * semantics of filter().pop(), while short-circuiting on the first hit
   */
  for (let i = patterns.length - 1; i >= 0; i--) {
    try {
      if (new RegExp(patterns[i]).test(event.title)) {
        return patterns[i];
      }
    } catch (error) {
      /**
       * Invalid pattern — skip it and try the next one
       */
      this.logger.warn(`findMatchingPattern: invalid pattern "${patterns[i]}": ${error}`);
    }
  }

  return null;
}

/**
 * Loads the list of event grouping patterns configured for a project
 *
 * @param projectId - id of the project whose patterns should be fetched
 * @returns {string[]} list of grouping patterns (empty when the project has none)
 */
private async getProjectPatterns(projectId: string): Promise<string[]> {
  const cacheKey = `project:${projectId}:patterns`;

  /**
   * Cache project patterns for 5 minutes since they don't change frequently
   */
  /* eslint-disable-next-line @typescript-eslint/no-magic-numbers */
  const cacheTtl = 5 * TimeMs.MINUTE / MS_IN_SEC;

  const loadPatterns = async (): Promise<string[]> => {
    const project = await this.accountsDb.getConnection()
      .collection('projects')
      .findOne({
        _id: new mongodb.ObjectId(projectId),
      });

    return project?.eventGroupingPatterns || [];
  };

  return this.cache.get(cacheKey, loadPatterns, cacheTtl);
}

/**
* Returns last N unique events by a project id
*
* @param projectId - where to find
* @param count - how many events to return
* @returns {GroupedEventDBScheme[]} list of the last N unique events
*/
private findLastEvents(projectId: string, count): Promise<GroupedEventDBScheme[]> {
return this.cache.get(`last:${count}:eventsOf:${projectId}`, async () => {
return this.db.getConnection()
return this.eventsDb.getConnection()
.collection(`events:${projectId}`)
.find()
.sort({
Expand Down Expand Up @@ -308,7 +394,7 @@ export default class GrouperWorker extends Worker {
*/
const repetitionCacheKey = `repetitions:${task.projectId}:${existedEvent.groupHash}:${eventUser.id}`;
const repetition = await this.cache.get(repetitionCacheKey, async () => {
return this.db.getConnection().collection(`repetitions:${task.projectId}`)
return this.eventsDb.getConnection().collection(`repetitions:${task.projectId}`)
.findOne({
groupHash: existedEvent.groupHash,
'payload.user.id': eventUser.id,
Expand Down Expand Up @@ -342,7 +428,7 @@ export default class GrouperWorker extends Worker {
*/
const repetitionDailyCacheKey = `repetitions:${task.projectId}:${existedEvent.groupHash}:${eventUser.id}:${eventMidnight}`;
const repetitionDaily = await this.cache.get(repetitionDailyCacheKey, async () => {
return this.db.getConnection().collection(`repetitions:${task.projectId}`)
return this.eventsDb.getConnection().collection(`repetitions:${task.projectId}`)
.findOne({
groupHash: existedEvent.groupHash,
'payload.user.id': eventUser.id,
Expand Down Expand Up @@ -377,7 +463,7 @@ export default class GrouperWorker extends Worker {
* Returns finds event by query from project with passed ID
*
* @param projectId - project's identifier
* @param groupHash - group hash of the event
* @param groupHash - group hash of the event
*/
private async getEvent(projectId: string, groupHash: string): Promise<GroupedEventDBScheme> {
if (!mongodb.ObjectID.isValid(projectId)) {
Expand All @@ -387,7 +473,7 @@ export default class GrouperWorker extends Worker {
const eventCacheKey = await this.getEventCacheKey(projectId, groupHash);

return this.cache.get(eventCacheKey, async () => {
return this.db.getConnection()
return this.eventsDb.getConnection()
.collection(`events:${projectId}`)
.findOne({
groupHash,
Expand All @@ -400,12 +486,13 @@ export default class GrouperWorker extends Worker {

/**
 * Method that builds the cache key for an event lookup
 *
 * @param projectId - used for cache key creation
 * @param groupHash - used for cache key creation
 * @returns {string} cache key for event
 */
private async getEventCacheKey(projectId: string, groupHash: string): Promise<string> {
  const serializedHash = JSON.stringify({ groupHash });

  return `${projectId}:${serializedHash}`;
}

/**
Expand All @@ -421,7 +508,7 @@ export default class GrouperWorker extends Worker {
throw new ValidationError('Controller.saveEvent: Project ID is invalid or missed');
}

const collection = this.db.getConnection().collection(`events:${projectId}`);
const collection = this.eventsDb.getConnection().collection(`events:${projectId}`);

encodeUnsafeFields(groupedEventData);

Expand All @@ -441,7 +528,7 @@ export default class GrouperWorker extends Worker {
}

try {
const collection = this.db.getConnection().collection(`repetitions:${projectId}`);
const collection = this.eventsDb.getConnection().collection(`repetitions:${projectId}`);

encodeUnsafeFields(repetition);

Expand Down Expand Up @@ -480,7 +567,7 @@ export default class GrouperWorker extends Worker {
},
};

return (await this.db.getConnection()
return (await this.eventsDb.getConnection()
.collection(`events:${projectId}`)
.updateOne(query, updateQuery)).modifiedCount;
} catch (err) {
Expand Down Expand Up @@ -512,7 +599,7 @@ export default class GrouperWorker extends Worker {
try {
const midnight = this.getMidnightByEventTimestamp(eventTimestamp);

await this.db.getConnection()
await this.eventsDb.getConnection()
.collection(`dailyEvents:${projectId}`)
.updateOne(
{
Expand Down
Loading
Loading