Skip to content

Commit f392bee

Browse files
authored
Merge pull request bluewave-labs#3533 from bluewave-labs/fix/monitor-status-atomicity
fix: monitor status atomicity
2 parents d139db7 + d40b766 commit f392bee

6 files changed

Lines changed: 231 additions & 56 deletions

File tree

server/src/repositories/monitors/IMonitorsRepository.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { type MonitorType, type Monitor, type MonitorsSummary } from "@/types/index.js";
1+
import { type MonitorType, type Monitor, type MonitorsSummary, CheckSnapshot } from "@/types/index.js";
22

33
export interface TeamQueryConfig {
44
limit?: number;
@@ -29,6 +29,15 @@ export interface IMonitorsRepository {
2929

3030
// update
3131
updateById(monitorId: string, teamId: string, updates: Partial<Monitor>): Promise<Monitor>;
32+
updateStatusWindowAndChecks(
33+
monitorId: string,
34+
teamId: string,
35+
status: boolean,
36+
checkSnapshot: CheckSnapshot,
37+
windowSize: number,
38+
maxRecentChecks: number,
39+
statusPatch?: Partial<Monitor>
40+
): Promise<Monitor>;
3241
togglePauseById(monitorId: string, teamId: string): Promise<Monitor>;
3342
// delete
3443
deleteById(monitorId: string, teamId: string): Promise<Monitor>;

server/src/repositories/monitors/MongoMonitorsRepository.ts

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,33 @@ class MongoMonitorsRepository implements IMonitorsRepository {
182182
return this.toEntity(updatedMonitor);
183183
};
184184

185+
updateStatusWindowAndChecks = async (
186+
monitorId: string,
187+
teamId: string,
188+
status: boolean,
189+
checkSnapshot: CheckSnapshot,
190+
windowSize: number,
191+
maxRecentChecks: number,
192+
statusPatch?: Partial<Monitor>
193+
): Promise<Monitor> => {
194+
const updatedMonitor = await MonitorModel.findOneAndUpdate(
195+
{ _id: monitorId, teamId },
196+
{
197+
$push: {
198+
statusWindow: { $each: [status], $slice: -windowSize },
199+
recentChecks: { $each: [checkSnapshot], $slice: -maxRecentChecks },
200+
},
201+
...(statusPatch && { $set: statusPatch }),
202+
},
203+
{ returnDocument: "after" }
204+
);
205+
206+
if (!updatedMonitor) {
207+
throw new AppError({ message: `Failed to update status and checks for monitor with id ${monitorId}`, status: 500 });
208+
}
209+
return this.toEntity(updatedMonitor);
210+
};
211+
185212
togglePauseById = async (monitorId: string, teamId: string) => {
186213
const monitor = await MonitorModel.findOneAndUpdate(
187214
{ _id: monitorId, teamId },

server/src/repositories/monitors/TimescaleMonitorsRepository.ts

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import type { Pool } from "pg";
2-
import type { Monitor, MonitorsSummary, MonitorStatus, MonitorType, MonitorMatchMethod, GeoContinent } from "@/types/monitor.js";
2+
import type { Monitor, MonitorsSummary, MonitorStatus, MonitorType, MonitorMatchMethod, GeoContinent, CheckSnapshot } from "@/types/monitor.js";
33
import type { IMonitorsRepository, TeamQueryConfig, SummaryConfig } from "./IMonitorsRepository.js";
44
import { AppError } from "@/utils/AppError.js";
55

@@ -641,6 +641,55 @@ export class TimescaleMonitorsRepository implements IMonitorsRepository {
641641
return entity;
642642
};
643643

644+
updateStatusWindowAndChecks = async (
645+
monitorId: string,
646+
teamId: string,
647+
status: boolean,
648+
_checkSnapshot: CheckSnapshot,
649+
windowSize: number,
650+
_maxRecentChecks: number,
651+
statusPatch?: Partial<Monitor>
652+
): Promise<Monitor> => {
653+
// In TimescaleDB, recentChecks live in the checks table (inserted by the buffer service),
654+
// so we only need to atomically update status_window and any statusPatch fields.
655+
const sets: string[] = [
656+
`status_window = (array_append(COALESCE(status_window, ARRAY[]::boolean[]), $1))[array_length(array_append(COALESCE(status_window, ARRAY[]::boolean[]), $1), 1) - $2 + 1:]`,
657+
`updated_at = NOW()`,
658+
];
659+
const values: unknown[] = [status, windowSize];
660+
let paramIndex = 3;
661+
662+
if (statusPatch) {
663+
const patchFieldMap: [keyof Monitor, string][] = [
664+
["status", "status"],
665+
["cpuAlertCounter", "cpu_alert_counter"],
666+
["memoryAlertCounter", "memory_alert_counter"],
667+
["diskAlertCounter", "disk_alert_counter"],
668+
["tempAlertCounter", "temp_alert_counter"],
669+
];
670+
for (const [key, column] of patchFieldMap) {
671+
if (statusPatch[key] !== undefined) {
672+
sets.push(`${column} = $${paramIndex++}`);
673+
values.push(statusPatch[key]);
674+
}
675+
}
676+
}
677+
678+
values.push(monitorId, teamId);
679+
const result = await this.pool.query<MonitorRow>(
680+
`UPDATE monitors SET ${sets.join(", ")} WHERE id = $${paramIndex++} AND team_id = $${paramIndex}
681+
RETURNING ${MONITOR_COLUMNS}`,
682+
values
683+
);
684+
const row = result.rows[0];
685+
if (!row) {
686+
throw new AppError({ message: `Failed to update status and checks for monitor with id ${monitorId}`, status: 500 });
687+
}
688+
const entity = this.toEntity(row);
689+
entity.notifications = await this.fetchNotificationIds([monitorId]).then((m) => m.get(monitorId) ?? []);
690+
return entity;
691+
};
692+
644693
togglePauseById = async (monitorId: string, teamId: string): Promise<Monitor> => {
645694
const result = await this.pool.query<MonitorRow>(
646695
`UPDATE monitors SET

server/src/service/infrastructure/statusService.ts

Lines changed: 44 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -102,14 +102,6 @@ export class StatusService implements IStatusService {
102102
}
103103
};
104104

105-
private appendToWindow = (monitor: Monitor, status: boolean) => {
106-
monitor.statusWindow = monitor.statusWindow || [];
107-
monitor.statusWindow.push(status);
108-
while (monitor.statusWindow.length > monitor.statusWindowSize) {
109-
monitor.statusWindow.shift();
110-
}
111-
};
112-
113105
private toCheckSnapshot = (check: Check): CheckSnapshot => {
114106
return {
115107
id: check.id,
@@ -134,15 +126,6 @@ export class StatusService implements IStatusService {
134126
};
135127
};
136128

137-
private appendToRecentChecks = (monitor: Monitor, check: Check) => {
138-
monitor.recentChecks = monitor.recentChecks || [];
139-
const checkSnapshot = this.toCheckSnapshot(check);
140-
monitor.recentChecks.push(checkSnapshot);
141-
while (monitor.recentChecks.length > MAX_RECENT_CHECKS) {
142-
monitor.recentChecks.shift();
143-
}
144-
};
145-
146129
private computeReachability = (
147130
currentStatus: MonitorStatus,
148131
window: Array<boolean>,
@@ -238,16 +221,31 @@ export class StatusService implements IStatusService {
238221
// Update running stats
239222
await this.tryUpdateRunningStats(monitor, statusResponse);
240223

241-
// Update the sliding window and recent checks
242-
this.appendToWindow(monitor, status);
243-
this.appendToRecentChecks(monitor, check);
244-
245224
const prevStatus = monitor.status;
225+
const checkSnapshot = this.toCheckSnapshot(check);
226+
227+
// Project the window as it will look after updating DB
228+
// This is done because we need the updated status window to compute new status, but we don't
229+
// want an an extra DB write just to get the window.
230+
const projectedWindow = [...(monitor.statusWindow || []), check.status].slice(-monitor.statusWindowSize);
231+
232+
// Build the status patch — computed against the projected window
233+
const patch: Partial<Monitor> = {};
234+
235+
// Not enough data points yet
236+
if (projectedWindow.length < monitor.statusWindowSize) {
237+
patch.status = status === true ? "up" : "down";
238+
239+
const updated = await this.monitorsRepository.updateStatusWindowAndChecks(
240+
monitor.id,
241+
monitor.teamId,
242+
check.status,
243+
checkSnapshot,
244+
monitor.statusWindowSize,
245+
MAX_RECENT_CHECKS,
246+
patch
247+
);
246248

247-
// Return early if not enough data points
248-
if (monitor.statusWindow.length < monitor.statusWindowSize) {
249-
monitor.status = status === true ? "up" : "down";
250-
const updated = await this.monitorsRepository.updateById(monitor.id, monitor.teamId, monitor);
251249
return {
252250
monitor: updated,
253251
statusChanged: false,
@@ -257,7 +255,8 @@ export class StatusService implements IStatusService {
257255
};
258256
}
259257

260-
// With a full window, a single raw check must not change UNLESS we are initializing. Otherwise, only the sliding-window threshold can trigger a transition.
258+
// With a full window, a single raw check must not change UNLESS we are initializing.
259+
// Otherwise, only the sliding-window threshold can trigger a transition.
261260
let newStatus: MonitorStatus;
262261
if (monitor.status === "initializing") {
263262
newStatus = status === true ? "up" : "down";
@@ -267,8 +266,9 @@ export class StatusService implements IStatusService {
267266

268267
let statusChanged = false;
269268

270-
// First evaluate reachability-based status changes, which apply to all monitor types and take precedence over hardware breaches.
271-
const reachabilityResult = this.computeReachability(newStatus, monitor.statusWindow, monitor.statusWindowThreshold);
269+
// First evaluate reachability-based status changes, which apply to all monitor types
270+
// and take precedence over hardware breaches.
271+
const reachabilityResult = this.computeReachability(newStatus, projectedWindow, monitor.statusWindowThreshold);
272272
if (reachabilityResult.transitioned) {
273273
newStatus = reachabilityResult.nextStatus;
274274
statusChanged = true;
@@ -296,21 +296,29 @@ export class StatusService implements IStatusService {
296296
},
297297
});
298298

299-
monitor.cpuAlertCounter = hardware.nextCounters.cpu;
300-
monitor.memoryAlertCounter = hardware.nextCounters.memory;
301-
monitor.diskAlertCounter = hardware.nextCounters.disk;
302-
monitor.tempAlertCounter = hardware.nextCounters.temp;
299+
patch.cpuAlertCounter = hardware.nextCounters.cpu;
300+
patch.memoryAlertCounter = hardware.nextCounters.memory;
301+
patch.diskAlertCounter = hardware.nextCounters.disk;
302+
patch.tempAlertCounter = hardware.nextCounters.temp;
303303
thresholdBreaches = hardware.breaches;
304304
if (hardware.transitioned) {
305305
newStatus = hardware.nextStatus;
306306
statusChanged = true;
307307
}
308308
}
309309

310-
// Apply the final status
311-
monitor.status = newStatus;
312-
313-
const updated = await this.monitorsRepository.updateById(monitor.id, monitor.teamId, monitor);
310+
patch.status = newStatus;
311+
312+
// Single atomic write: push arrays + set status/counters
313+
const updated = await this.monitorsRepository.updateStatusWindowAndChecks(
314+
monitor.id,
315+
monitor.teamId,
316+
check.status,
317+
checkSnapshot,
318+
monitor.statusWindowSize,
319+
MAX_RECENT_CHECKS,
320+
patch
321+
);
314322

315323
return {
316324
monitor: updated,

server/test/helpers/InMemoryMonitorsRepository.ts

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import type { IMonitorsRepository, TeamQueryConfig, SummaryConfig } from "../../src/repositories/monitors/IMonitorsRepository.ts";
2-
import type { Monitor, MonitorsSummary } from "../../src/types/index.ts";
2+
import type { Monitor, MonitorsSummary, CheckSnapshot } from "../../src/types/index.ts";
33

44
export class InMemoryMonitorsRepository implements IMonitorsRepository {
55
private monitors: Monitor[] = [];
@@ -49,6 +49,37 @@ export class InMemoryMonitorsRepository implements IMonitorsRepository {
4949
return { ...updated };
5050
}
5151

52+
async updateStatusWindowAndChecks(
53+
monitorId: string,
54+
teamId: string,
55+
status: boolean,
56+
checkSnapshot: CheckSnapshot,
57+
windowSize: number,
58+
maxRecentChecks: number,
59+
statusPatch?: Partial<Monitor>
60+
): Promise<Monitor> {
61+
const index = this.monitors.findIndex((m) => m.id === monitorId && m.teamId === teamId);
62+
if (index === -1) {
63+
throw new Error(`Monitor ${monitorId} not found`);
64+
}
65+
const monitor = this.monitors[index];
66+
monitor.statusWindow = monitor.statusWindow || [];
67+
monitor.statusWindow.push(status);
68+
while (monitor.statusWindow.length > windowSize) {
69+
monitor.statusWindow.shift();
70+
}
71+
monitor.recentChecks = monitor.recentChecks || [];
72+
monitor.recentChecks.push(checkSnapshot);
73+
while (monitor.recentChecks.length > maxRecentChecks) {
74+
monitor.recentChecks.shift();
75+
}
76+
if (statusPatch) {
77+
Object.assign(monitor, statusPatch);
78+
}
79+
this.monitors[index] = monitor;
80+
return { ...monitor };
81+
}
82+
5283
async togglePauseById(monitorId: string, teamId: string): Promise<Monitor> {
5384
const monitor = await this.findById(monitorId, teamId);
5485
const newStatus = monitor.status === "paused" ? "up" : "paused";

0 commit comments

Comments
 (0)