Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion opencti-platform/opencti-graphql/config/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,9 @@
"hub_registration_manager": {
"enabled": true,
"lock_key": "hub_registration_manager_lock",
"interval": 3600000
"interval": 3600000,
"news_feed_cleanup_interval_value": 180,
"news_feed_cleanup_interval_unit": "days"
},
"indicator_decay_manager": {
"enabled": true,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,34 @@
import { type ManagerDefinition, registerManager } from './managerModule';
import conf, { booleanConf } from '../config/conf';
import conf, { booleanConf, logApp } from '../config/conf';
import { executionContext, HUB_REGISTRATION_MANAGER_USER } from '../utils/access';
import { checkXTMHubConnectivity, loadAndSaveLatestNewsFeed } from '../domain/xtm-hub';
import { XtmHubRegistrationStatus } from '../generated/graphql';
import { cleanOldNewsFeedItems } from '../modules/xtm/hub/news-feed/news-feed-domain';
import moment from 'moment';

const HUB_REGISTRATION_MANAGER_ENABLED = booleanConf('hub_registration_manager:enabled', true);
const HUB_REGISTRATION_MANAGER_KEY = conf.get('hub_registration_manager:lock_key') || 'hub_registration_manager_lock';
const SCHEDULE_TIME = conf.get('hub_registration_manager:interval') || 60 * 60 * 1000; // 1 hour
const NEWS_FEED_CLEANUP_INTERVAL_VALUE = conf.get('hub_registration_manager:news_feed_cleanup_interval_value') || 180;
const NEWS_FEED_CLEANUP_INTERVAL_UNIT = conf.get('hub_registration_manager:news_feed_cleanup_interval_unit') || 'days';

const VALID_CLEANUP_UNITS = ['seconds', 'minutes', 'hours', 'days', 'weeks', 'months', 'years'] as const;
type ValidCleanupUnit = typeof VALID_CLEANUP_UNITS[number];

if (
typeof NEWS_FEED_CLEANUP_INTERVAL_VALUE !== 'number'
|| !Number.isFinite(NEWS_FEED_CLEANUP_INTERVAL_VALUE)
|| NEWS_FEED_CLEANUP_INTERVAL_VALUE <= 0
) {
throw new Error(
`[XTMH] Invalid news_feed_cleanup_interval_value: expected a positive number, got "${NEWS_FEED_CLEANUP_INTERVAL_VALUE}"`,
);
}
if (!VALID_CLEANUP_UNITS.includes(NEWS_FEED_CLEANUP_INTERVAL_UNIT as ValidCleanupUnit)) {
throw new Error(
`[XTMH] Invalid news_feed_cleanup_interval_unit: "${NEWS_FEED_CLEANUP_INTERVAL_UNIT}". Expected one of: ${VALID_CLEANUP_UNITS.join(', ')}`,
);
}

/**
* If platform is registered, calls XTM Hub backend to check if the registration data is still valid
Expand All @@ -18,6 +40,26 @@ export const hubRegistrationManager = async () => {
if (status === XtmHubRegistrationStatus.Registered) {
await loadAndSaveLatestNewsFeed(context, HUB_REGISTRATION_MANAGER_USER);
}
try {
const cutoffDate = moment()
.subtract(NEWS_FEED_CLEANUP_INTERVAL_VALUE, NEWS_FEED_CLEANUP_INTERVAL_UNIT as ValidCleanupUnit)
.toDate();
const deletedCount = await cleanOldNewsFeedItems(
context,
HUB_REGISTRATION_MANAGER_USER,
cutoffDate,
);
if (deletedCount > 0) {
logApp.info('[XTMH] Cleaned expired news feed items', {
deletedCount,
cutoffDate: cutoffDate.toISOString(),
intervalValue: NEWS_FEED_CLEANUP_INTERVAL_VALUE,
intervalUnit: NEWS_FEED_CLEANUP_INTERVAL_UNIT,
});
}
} catch (err) {
logApp.error('[XTMH] Failed to clean expired news feed items', { cause: err });
}
};

const HUB_REGISTRATION_MANAGER_DEFINITION: ManagerDefinition = {
Expand All @@ -37,5 +79,4 @@ const HUB_REGISTRATION_MANAGER_DEFINITION: ManagerDefinition = {
return this.enabledByConfig;
},
};

registerManager(HUB_REGISTRATION_MANAGER_DEFINITION);
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,13 @@ import type { NewsFeedAddInput } from './news-feed-types';
import { createInternalObject } from '../../../../domain/internalObject';
import { addFilter } from '../../../../utils/filtering/filtering-utils';
import { pageEntitiesConnection } from '../../../../database/middleware-loader';
import { elCount } from '../../../../database/engine';
import { elCount, elPaginate } from '../../../../database/engine';
import { READ_INDEX_INTERNAL_OBJECTS } from '../../../../database/utils';
import type { QueryMyNewsFeedsArgs } from '../../../../generated/graphql';
import { FilterMode, FilterOperator, type QueryMyNewsFeedsArgs } from '../../../../generated/graphql';
import { deleteElementById } from '../../../../database/middleware';
import { ALREADY_DELETED_ERROR } from '../../../../config/errors';
import { logApp } from '../../../../config/conf';
import { promiseMap } from '../../../../utils/promiseUtils';

export const addNewsFeed = async (context: AuthContext, user: AuthUser, input: NewsFeedAddInput) => {
const newsFeedToCreate = {
Expand Down Expand Up @@ -35,3 +39,67 @@ export const myUnreadNewsFeedsCount = (context: AuthContext, user: AuthUser) =>
const queryArgs = { filters: queryFilters, types: [ENTITY_TYPE_NEWS_FEED_ITEM] };
return elCount(context, user, READ_INDEX_INTERNAL_OBJECTS, queryArgs);
};

const NEWS_FEED_CLEANUP_BATCH_SIZE = 1500;
const NEWS_FEED_CLEANUP_CONCURRENCY = 5;

export const cleanOldNewsFeedItems = async (
context: AuthContext,
user: AuthUser,
cutoffDate: Date,
): Promise<number> => {
const filters = {
mode: FilterMode.And,
filters: [
{
key: ['creation_date'],
values: [cutoffDate.toISOString()],
operator: FilterOperator.Lt,
},
],
filterGroups: [],
};

let totalDeleted = 0;

while (true) {
const result = await elPaginate(context, user, READ_INDEX_INTERNAL_OBJECTS, {
filters,
types: [ENTITY_TYPE_NEWS_FEED_ITEM],
first: NEWS_FEED_CLEANUP_BATCH_SIZE,
}) as any;

if (!result.edges || result.edges.length === 0) {
break;
}

await promiseMap(
result.edges,
async (edge: any) => {
try {
await deleteElementById(
context,
user,
edge.node.internal_id,
ENTITY_TYPE_NEWS_FEED_ITEM,
);
totalDeleted += 1;
} catch (err: any) {
if (err?.extensions?.code !== ALREADY_DELETED_ERROR) {
logApp.error('[XTMH] Failed to delete news feed item during cleanup', {
cause: err,
id: edge.node.internal_id,
});
}
}
},
NEWS_FEED_CLEANUP_CONCURRENCY,
);

if (result.edges.length < NEWS_FEED_CLEANUP_BATCH_SIZE) {
break;
}
}

return totalDeleted;
};
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,15 @@ vi.mock('../../../src/manager/managerModule', () => ({
registerManager: vi.fn(),
}));

vi.mock('../../../src/modules/xtm/hub/news-feed/news-feed-domain', () => ({
cleanOldNewsFeedItems: vi.fn(),
}));

import { hubRegistrationManager } from '../../../src/manager/hubRegistrationManager';
import { checkXTMHubConnectivity, loadAndSaveLatestNewsFeed } from '../../../src/domain/xtm-hub';
import { cleanOldNewsFeedItems } from '../../../src/modules/xtm/hub/news-feed/news-feed-domain';

const mockCleanOldNewsFeedItems = vi.mocked(cleanOldNewsFeedItems);
const mockCheckXTMHubConnectivity = vi.mocked(checkXTMHubConnectivity);
const mockLoadAndSaveLatestNewsFeed = vi.mocked(loadAndSaveLatestNewsFeed);

Expand Down Expand Up @@ -80,4 +86,13 @@ describe('hubRegistrationManager', () => {

await expect(hubRegistrationManager()).rejects.toThrow('Failed to load news feed');
});

it('should call cleanOldNewsFeedItems on every execution', async () => {
mockCheckXTMHubConnectivity.mockResolvedValue({ status: XtmHubRegistrationStatus.Unregistered });
mockCleanOldNewsFeedItems.mockResolvedValue(0);

await hubRegistrationManager();

expect(mockCleanOldNewsFeedItems).toHaveBeenCalledOnce();
});
});
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
import { describe, it, expect, vi, beforeEach } from 'vitest';
import { addNewsFeed, myNewsFeedsFind, myUnreadNewsFeedsCount } from '../../../src/modules/xtm/hub/news-feed/news-feed-domain';
import { addNewsFeed, cleanOldNewsFeedItems, myNewsFeedsFind, myUnreadNewsFeedsCount } from '../../../src/modules/xtm/hub/news-feed/news-feed-domain';
import { NewsFeedItemType } from '../../../src/modules/xtm/hub/news-feed/news-feed-types';
import type { NewsFeedAddInput } from '../../../src/modules/xtm/hub/news-feed/news-feed-types';
import { ALREADY_DELETED_ERROR } from '../../../src/config/errors';

const mockCreateInternalObject = vi.fn();
const mockPageEntitiesConnection = vi.fn();
const mockElCount = vi.fn();
const mockFullEntitiesList = vi.fn();
const mockPatchAttribute = vi.fn();
const mockElPaginate = vi.fn();
const mockDeleteElementById = vi.fn();

vi.mock('../../../src/domain/internalObject', () => ({
createInternalObject: (...args: unknown[]) => mockCreateInternalObject(...args),
Expand All @@ -20,10 +23,12 @@ vi.mock('../../../src/database/middleware-loader', () => ({

vi.mock('../../../src/database/engine', () => ({
elCount: (...args: unknown[]) => mockElCount(...args),
elPaginate: (...args: unknown[]) => mockElPaginate(...args),
}));

vi.mock('../../../src/database/middleware', () => ({
patchAttribute: (...args: unknown[]) => mockPatchAttribute(...args),
deleteElementById: (...args: unknown[]) => mockDeleteElementById(...args),
}));

vi.mock('../../../src/utils/filtering/filtering-utils', () => ({
Expand Down Expand Up @@ -196,4 +201,90 @@ describe('News feed', () => {
expect(result).toBe(3);
});
});

describe('cleanOldNewsFeedItems', () => {
const cutoffDate = new Date('2026-05-01T00:00:00.000Z');

beforeEach(() => {
mockElPaginate.mockReset();
mockDeleteElementById.mockReset();
});

it('should call elPaginate with creation_date < cutoff filter', async () => {
mockElPaginate.mockResolvedValue({ edges: [] });

await cleanOldNewsFeedItems(mockContext, mockUser, cutoffDate);

expect(mockElPaginate).toHaveBeenCalledWith(
mockContext,
mockUser,
expect.any(String),
expect.objectContaining({
filters: expect.objectContaining({
filters: expect.arrayContaining([
expect.objectContaining({
key: ['creation_date'],
values: [cutoffDate.toISOString()],
operator: 'lt',
}),
]),
}),
types: ['NewsFeedItem'],
}),
);
});

it('should delete each returned item and return the count', async () => {
mockElPaginate
.mockResolvedValueOnce({
edges: [
{ node: { internal_id: 'item-1' } },
{ node: { internal_id: 'item-2' } },
{ node: { internal_id: 'item-3' } },
],
});
mockDeleteElementById.mockResolvedValue(undefined);

const result = await cleanOldNewsFeedItems(mockContext, mockUser, cutoffDate);

expect(mockDeleteElementById).toHaveBeenCalledTimes(3);
expect(mockDeleteElementById).toHaveBeenCalledWith(
mockContext,
mockUser,
'item-1',
'NewsFeedItem',
);
expect(result).toBe(3);
});

it('should return 0 and not call delete when no items match', async () => {
mockElPaginate.mockResolvedValue({ edges: [] });

const result = await cleanOldNewsFeedItems(mockContext, mockUser, cutoffDate);

expect(result).toBe(0);
expect(mockDeleteElementById).not.toHaveBeenCalled();
});

it('should swallow ALREADY_DELETED_ERROR without breaking the loop', async () => {
mockElPaginate.mockResolvedValueOnce({
edges: [
{ node: { internal_id: 'item-1' } },
{ node: { internal_id: 'item-2' } },
],
});
const alreadyDeletedError = Object.assign(new Error('Already deleted'), {
extensions: { code: ALREADY_DELETED_ERROR },
});
mockDeleteElementById
.mockRejectedValueOnce(alreadyDeletedError)
.mockResolvedValueOnce(undefined);

const result = await cleanOldNewsFeedItems(mockContext, mockUser, cutoffDate);

// Le 1er fail silencieusement (n'incrémente pas), le 2e réussit
expect(result).toBe(1);
expect(mockDeleteElementById).toHaveBeenCalledTimes(2);
});
});
});
Loading