Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 44 additions & 3 deletions autobots/orchestrator.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ export class GrowthOrchestrator {
#startedAt = 0;
#bots = [];
#stopped = false;
#tickPromise = null;
#poolExhaustedLogged = false;

constructor({
serverUrl,
Expand Down Expand Up @@ -125,12 +127,43 @@ export class GrowthOrchestrator {
}
}

await this.#tickLoop();
this.#tickPromise = this.#tickLoop();
await this.#tickPromise;
}

stop() {
/**
* Signal all bots + the tick loop to exit and await their completion,
* bounded by `timeoutMs` so a wedged loop can't block shutdown forever.
*/
async stop(timeoutMs = 25_000) {
this.#stopped = true;
for (const bot of this.#bots) bot.stop();
// Snapshot the bot list before signalling so any concurrent mutation
// (e.g. release callbacks removing bots from #bots) doesn't alter the
// set we wait on.
const loops = this.#bots.map((b) => {
b.stop();
return b.loopPromise;
});
Comment on lines +143 to +146
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The comment mentions taking a "snapshot" of the bot list, but the code iterates over this.#bots directly. While safe in this specific implementation because b.stop() is synchronous and doesn't trigger immediate removal, it is better practice to actually snapshot the array (e.g., using the spread operator) to ensure the set of bots being signaled is isolated from any concurrent mutations to the collection during iteration.

Suggested change
const loops = this.#bots.map((b) => {
b.stop();
return b.loopPromise;
});
const loops = [...this.#bots].map((b) => {
b.stop();
return b.loopPromise;
});

if (this.#tickPromise) loops.push(this.#tickPromise);
const pending = loops.filter(Boolean);
if (pending.length === 0) return;
let timerHandle;
const timer = new Promise((resolve) => {
timerHandle = setTimeout(() => resolve("timeout"), timeoutMs);
});
const settled = Promise.allSettled(pending).then((results) => {
for (const r of results) {
if (r.status === "rejected") {
logCritical(`Loop rejected during shutdown: ${r.reason?.message || r.reason}`);
}
}
return "clean";
});
const outcome = await Promise.race([settled, timer]);
clearTimeout(timerHandle);
if (outcome === "timeout") {
logWarn(`Orchestrator.stop(): ${pending.length} loop(s) still running after ${timeoutMs}ms; exiting anyway`);
}
}

async #spawnBot(keyPath, reused = false) {
Expand Down Expand Up @@ -184,8 +217,16 @@ export class GrowthOrchestrator {
logWarn(
`Party pool exhausted (${this.#pool.inUseCount}/${this.#pool.maxSize}); will retry next tick`
);
if (!this.#poolExhaustedLogged) {
// Emit once per stretch; reset when a checkout next succeeds.
console.log(
`[EVENT] ${iso()} pool_exhausted inUse=${this.#pool.inUseCount} max=${this.#pool.maxSize}`
);
this.#poolExhaustedLogged = true;
}
break;
}
this.#poolExhaustedLogged = false;
try {
await this.#spawnBot(reservation.keyPath, reservation.reused);
} catch {
Expand Down
48 changes: 41 additions & 7 deletions autobots/runner.js
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,21 @@ function buildPersonas(weightsSpec) {
}

async function runGrowthMode(opts, tasks) {
// Sanity-check demo-time-scale: must be positive & finite. Warn if it's
// absurdly high — a misconfigured value like 100000 would collapse all
// per-bot intervals toward the 100ms floor.
if (!Number.isFinite(opts.demoTimeScale) || opts.demoTimeScale <= 0) {
logCritical(
`Invalid --demo-time-scale ${opts.demoTimeScale}; must be a positive number. Aborting.`
);
process.exit(1);
}
if (opts.demoTimeScale > 1000) {
logWarn(
`--demo-time-scale=${opts.demoTimeScale} is very high; per-bot intervals may collapse to the 100ms floor. Continuing anyway.`
);
}

const personas = buildPersonas(opts.personaWeights);
const personaSummary = Object.values(personas)
.map((p) => `${p.name}=${p.weight}`)
Expand Down Expand Up @@ -343,16 +358,26 @@ async function runGrowthMode(opts, tasks) {
poolSize: opts.poolSize,
});

const shutdown = () => {
logInfo("Shutdown requested — stopping orchestrator and printing final status...");
orchestrator.stop();
if (statusTimer) clearInterval(statusTimer);
let shuttingDown = false;
const shutdown = async (signal) => {
if (shuttingDown) return;
shuttingDown = true;
logInfo(`Shutdown requested (${signal}) — stopping orchestrator and printing final status...`);
if (statusTimer) {
clearInterval(statusTimer);
statusTimer = null;
}
try {
await orchestrator.stop();
} catch (err) {
logCritical(`Error during orchestrator.stop: ${err.message}`);
}
console.log(stats.formatStatus());
logInfo(`Final: ${JSON.stringify(stats.snapshot().cumulative)} bots=${stats.botCount}`);
process.exit(0);
};
process.on("SIGINT", shutdown);
process.on("SIGTERM", shutdown);
process.on("SIGINT", () => void shutdown("SIGINT"));
process.on("SIGTERM", () => void shutdown("SIGTERM"));

process.on("uncaughtException", (err) => {
logCritical(`Uncaught exception: ${err.message}`);
Expand All @@ -364,7 +389,16 @@ async function runGrowthMode(opts, tasks) {
if (reason?.stack) console.error(reason.stack);
});

await orchestrator.start();
try {
await orchestrator.start();
} finally {
// Belt-and-braces: if start() ever returns (or throws) before shutdown
// fires, still release the status timer so node can exit cleanly.
if (statusTimer) {
clearInterval(statusTimer);
statusTimer = null;
}
}
}

async function main() {
Expand Down
39 changes: 37 additions & 2 deletions autobots/virtual-bot.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,17 @@ function sleep(ms) {
return new Promise((r) => setTimeout(r, ms));
}

// Dormant after this many consecutive task errors (thrown or agent-reported)
// with no successful task between. Prevents an orphaned bot from spinning
// against a dead token / stale key / misconfigured server forever.
//
// Only *task* errors count — top-up failures log but don't retire a bot,
// because a billing-portal outage simultaneously hitting all 20 bots would
// otherwise cause mass dormancy. If top-ups fail long enough that the bot
// runs out of credit, subsequent task failures (billing denials) will
// correctly increment this counter.
const MAX_CONSECUTIVE_TASK_ERRORS = 5;

export class VirtualBot {
#agent;
#keyPath;
Expand All @@ -41,8 +52,10 @@ export class VirtualBot {
#taskIndex = 0;
#tasksRun = 0;
#topUpsDone = 0;
#consecutiveTaskErrors = 0;
#nextTaskAt = 0;
#nextTopUpAt = 0;
#loopPromise = null;

constructor({
keyPath,
Expand Down Expand Up @@ -107,7 +120,13 @@ export class VirtualBot {
const result = await this.#agent.executeTask(task);
this.#stats.addTaskResults([result], { persona: this.#persona.name });
this.#tasksRun++;
if (result.error) {
this.#consecutiveTaskErrors++;
} else {
this.#consecutiveTaskErrors = 0;
}
} catch (err) {
this.#consecutiveTaskErrors++;
console.log(
`[EVENT] ${iso()} bot_task_crash partyId=${this.partyId?.slice(0, 40) || "unknown"} persona=${this.#persona.name} error=${JSON.stringify(err.message)}`
);
Expand All @@ -130,6 +149,7 @@ export class VirtualBot {
amountCC: this.#persona.topUpAmountCC,
});
} catch (err) {
// Intentionally does NOT bump the task-error counter; see constant comment.
console.log(
`[WARN] ${iso()} bot_topup_failed partyId=${this.partyId?.slice(0, 40) || "unknown"} persona=${this.#persona.name} error=${JSON.stringify(err.message)}`
);
Expand Down Expand Up @@ -160,10 +180,10 @@ export class VirtualBot {
}

startLoop() {
if (this.#running) return;
if (this.#running) return this.#loopPromise;
this.#running = true;

(async () => {
this.#loopPromise = (async () => {
const now = Date.now();
this.#scheduleNextTask(now);
this.#scheduleNextTopUp(now);
Expand All @@ -185,13 +205,28 @@ export class VirtualBot {
this.#goDormant("lifetime_cap");
break;
}
if (this.#consecutiveTaskErrors >= MAX_CONSECUTIVE_TASK_ERRORS) {
this.#goDormant("task_error_threshold");
break;
}
this.#scheduleNextTask(Date.now());
} else if (fired >= this.#nextTopUpAt) {
await this.#runTopUp();
if (this.#consecutiveTaskErrors >= MAX_CONSECUTIVE_TASK_ERRORS) {
this.#goDormant("task_error_threshold");
break;
}
this.#scheduleNextTopUp(Date.now());
Comment on lines +215 to 219
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This error threshold check is redundant in the top-up block. The MAX_CONSECUTIVE_TASK_ERRORS counter is only incremented within #runTask(), and it is already checked immediately after that call (lines 208-211). Since #runTopUp() does not modify the error counter, the condition cannot become true during this block if it wasn't already true (and caught) previously.

          this.#scheduleNextTopUp(Date.now());

}
}
this.#running = false;
})();

return this.#loopPromise;
}

/** Promise resolving when the loop has exited. Used by orchestrator.stop(). */
get loopPromise() {
return this.#loopPromise;
}
}
Loading