Skip to content

Commit 7159a13

Browse files
authored
feat: Modularize SystemStatus to allow custom backpressure mechanisms (apify#3529)
1 parent be6dbc4 commit 7159a13

11 files changed

Lines changed: 811 additions & 355 deletions

packages/core/src/autoscaling/autoscaled_pool.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import { betterClearInterval, betterSetInterval } from '@apify/utilities';
88
import { Configuration } from '../configuration';
99
import { CriticalError } from '../errors';
1010
import { log as defaultLog } from '../log';
11+
import type { LoadSignal } from './load_signal';
1112
import type { SnapshotterOptions } from './snapshotter';
1213
import { Snapshotter } from './snapshotter';
1314
import type { SystemInfo, SystemStatusOptions } from './system_status';
@@ -203,6 +204,10 @@ export class AutoscaledPool {
203204
private resolve: ((val?: unknown) => void) | null = null;
204205
private reject: ((reason?: unknown) => void) | null = null;
205206
private snapshotter: Snapshotter;
207+
208+
/** Additional SystemStatus loadSignals - tracked here for initialization and cleanup */
209+
private loadSignals: LoadSignal[];
210+
206211
private systemStatus: SystemStatus;
207212
private autoscaleInterval!: BetterIntervalID;
208213
private maybeRunInterval!: BetterIntervalID;
@@ -295,6 +300,7 @@ export class AutoscaledPool {
295300
});
296301
ssoCopy.config ??= this.config;
297302
this.snapshotter = ssoCopy.snapshotter;
303+
this.loadSignals = ssoCopy.loadSignals ?? [];
298304
this.systemStatus = new SystemStatus(ssoCopy);
299305
}
300306

@@ -366,6 +372,7 @@ export class AutoscaledPool {
366372
});
367373

368374
await this.snapshotter.start();
375+
await Promise.all(this.loadSignals.map((s) => s.start()));
369376

370377
// This interval checks the system status and updates the desired concurrency accordingly.
371378
this.autoscaleInterval = betterSetInterval(this._autoscale, this.autoscaleIntervalMillis);
@@ -699,6 +706,7 @@ export class AutoscaledPool {
699706
betterClearInterval(this.maybeRunInterval);
700707
if (this.tasksDonePerSecondInterval) betterClearInterval(this.tasksDonePerSecondInterval);
701708
if (this.snapshotter) await this.snapshotter.stop();
709+
await Promise.all(this.loadSignals.map((s) => s.stop()));
702710
}
703711

704712
protected _incrementTasksDonePerSecond(intervalCallback: () => void) {
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
import type { StorageClient } from '@crawlee/types';
2+
3+
import type { LoadSnapshot } from './load_signal';
4+
import { SnapshotStore } from './load_signal';
5+
6+
const CLIENT_RATE_LIMIT_ERROR_RETRY_COUNT = 2;
7+
8+
export interface ClientSnapshot extends LoadSnapshot {
9+
rateLimitErrorCount: number;
10+
}
11+
12+
export interface ClientLoadSignalOptions {
13+
client: StorageClient;
14+
clientSnapshotIntervalSecs?: number;
15+
maxClientErrors?: number;
16+
overloadedRatio?: number;
17+
snapshotHistoryMillis?: number;
18+
}
19+
20+
/**
21+
* Periodically checks the storage client for rate-limit errors (HTTP 429)
22+
* and reports overload when the error delta exceeds a threshold.
23+
*/
24+
export function createClientLoadSignal(options: ClientLoadSignalOptions) {
25+
const maxClientErrors = options.maxClientErrors ?? 3;
26+
27+
const signal = SnapshotStore.fromInterval<ClientSnapshot>({
28+
name: 'clientInfo',
29+
overloadedRatio: options.overloadedRatio ?? 0.3,
30+
intervalMillis: (options.clientSnapshotIntervalSecs ?? 1) * 1000,
31+
snapshotHistoryMillis: options.snapshotHistoryMillis,
32+
handler(store, intervalCallback) {
33+
const now = new Date();
34+
35+
const allErrorCounts = options.client.stats?.rateLimitErrors ?? [];
36+
const currentErrCount = allErrorCounts[CLIENT_RATE_LIMIT_ERROR_RETRY_COUNT] || 0;
37+
38+
const snapshot: ClientSnapshot = {
39+
createdAt: now,
40+
isOverloaded: false,
41+
rateLimitErrorCount: currentErrCount,
42+
};
43+
const all = store.getAll();
44+
const previousSnapshot = all[all.length - 1];
45+
if (previousSnapshot) {
46+
const { rateLimitErrorCount } = previousSnapshot;
47+
const delta = currentErrCount - rateLimitErrorCount;
48+
if (delta > maxClientErrors) snapshot.isOverloaded = true;
49+
}
50+
51+
store.push(snapshot, now);
52+
intervalCallback();
53+
},
54+
});
55+
56+
return signal;
57+
}
58+
59+
/** @internal Return type for backward compat in Snapshotter facade */
60+
export type ClientLoadSignal = ReturnType<typeof createClientLoadSignal>;
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
import type { Configuration } from '../configuration';
2+
import { EventType } from '../events/event_manager';
3+
import type { LoadSnapshot } from './load_signal';
4+
import { SnapshotStore } from './load_signal';
5+
import type { SystemInfo } from './system_status';
6+
7+
export interface CpuSnapshot extends LoadSnapshot {
8+
usedRatio: number;
9+
ticks?: { idle: number; total: number };
10+
}
11+
12+
export interface CpuLoadSignalOptions {
13+
overloadedRatio?: number;
14+
snapshotHistoryMillis?: number;
15+
config: Configuration;
16+
}
17+
18+
/**
19+
* Tracks CPU usage via `SYSTEM_INFO` events and reports overload when
20+
* the platform or local OS metrics indicate the CPU is overloaded.
21+
*/
22+
export function createCpuLoadSignal(options: CpuLoadSignalOptions) {
23+
return SnapshotStore.fromEvent<CpuSnapshot, SystemInfo>({
24+
name: 'cpuInfo',
25+
overloadedRatio: options.overloadedRatio ?? 0.4,
26+
events: options.config.getEventManager(),
27+
event: EventType.SYSTEM_INFO,
28+
snapshotHistoryMillis: options.snapshotHistoryMillis,
29+
handler(store, systemInfo) {
30+
const { cpuCurrentUsage, isCpuOverloaded } = systemInfo;
31+
const createdAt = systemInfo.createdAt ? new Date(systemInfo.createdAt) : new Date();
32+
store.push(
33+
{
34+
createdAt,
35+
isOverloaded: isCpuOverloaded!,
36+
usedRatio: Math.ceil(cpuCurrentUsage! / 100),
37+
},
38+
createdAt,
39+
);
40+
},
41+
});
42+
}
43+
44+
/** @internal Return type for backward compat in Snapshotter facade */
45+
export type CpuLoadSignal = ReturnType<typeof createCpuLoadSignal>;
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
import type { LoadSnapshot } from './load_signal';
2+
import { SnapshotStore } from './load_signal';
3+
4+
export interface EventLoopSnapshot extends LoadSnapshot {
5+
exceededMillis: number;
6+
}
7+
8+
export interface EventLoopLoadSignalOptions {
9+
eventLoopSnapshotIntervalSecs?: number;
10+
maxBlockedMillis?: number;
11+
overloadedRatio?: number;
12+
snapshotHistoryMillis?: number;
13+
}
14+
15+
/**
16+
* Periodically measures event loop delay and reports overload when the
17+
* delay exceeds a configured threshold.
18+
*/
19+
export function createEventLoopLoadSignal(options: EventLoopLoadSignalOptions = {}) {
20+
const intervalMillis = (options.eventLoopSnapshotIntervalSecs ?? 0.5) * 1000;
21+
const maxBlockedMillis = options.maxBlockedMillis ?? 50;
22+
23+
const signal = SnapshotStore.fromInterval<EventLoopSnapshot>({
24+
name: 'eventLoopInfo',
25+
overloadedRatio: options.overloadedRatio ?? 0.6,
26+
intervalMillis,
27+
snapshotHistoryMillis: options.snapshotHistoryMillis,
28+
handler(store, intervalCallback) {
29+
const now = new Date();
30+
31+
const snapshot: EventLoopSnapshot = {
32+
createdAt: now,
33+
isOverloaded: false,
34+
exceededMillis: 0,
35+
};
36+
37+
const all = store.getAll();
38+
const previousSnapshot = all[all.length - 1];
39+
if (previousSnapshot) {
40+
const { createdAt } = previousSnapshot;
41+
const delta = now.getTime() - +createdAt - intervalMillis;
42+
43+
if (delta > maxBlockedMillis) snapshot.isOverloaded = true;
44+
snapshot.exceededMillis = Math.max(delta - maxBlockedMillis, 0);
45+
}
46+
47+
store.push(snapshot, now);
48+
intervalCallback();
49+
},
50+
});
51+
52+
return signal;
53+
}
54+
55+
/** @internal Return type for backward compat in Snapshotter facade */
56+
export type EventLoopLoadSignal = ReturnType<typeof createEventLoopLoadSignal>;
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
11
export * from './autoscaled_pool';
2+
export * from './client_load_signal';
3+
export * from './cpu_load_signal';
4+
export * from './event_loop_load_signal';
5+
export * from './load_signal';
6+
export * from './memory_load_signal';
27
export * from './snapshotter';
38
export * from './system_status';

0 commit comments

Comments
 (0)