diff --git a/lambdas/functions/control-plane/src/aws/runners.test.ts b/lambdas/functions/control-plane/src/aws/runners.test.ts index 63f1412dd0..d09e6dc9ec 100644 --- a/lambdas/functions/control-plane/src/aws/runners.test.ts +++ b/lambdas/functions/control-plane/src/aws/runners.test.ts @@ -19,7 +19,7 @@ import 'aws-sdk-client-mock-jest/vitest'; import { beforeEach, describe, expect, it, vi } from 'vitest'; import ScaleError from './../scale-runners/ScaleError'; -import { createRunner, listEC2Runners, tag, terminateRunner, untag } from './runners'; +import { createRunner, listEC2Runners, tag, terminateRunners, untag } from './runners'; import type { RunnerInfo, RunnerInputParameters, RunnerType } from './runners.d'; process.env.AWS_REGION = 'eu-east-1'; @@ -250,23 +250,62 @@ describe('list instances', () => { }); }); -describe('terminate runner', () => { +describe('terminateRunners', () => { beforeEach(() => { vi.clearAllMocks(); + mockEC2Client.reset(); }); - it('calls terminate instances with the right instance ids', async () => { - mockEC2Client.on(TerminateInstancesCommand).resolves({}); - const runner: RunnerInfo = { - instanceId: 'instance-2', - owner: 'owner-2', - type: 'Repo', - }; - await terminateRunner(runner.instanceId); + it('does nothing for empty list', async () => { + await terminateRunners([]); + expect(mockEC2Client).not.toHaveReceivedCommand(TerminateInstancesCommand); + }); + + it('issues one EC2 call for a small batch', async () => { + mockEC2Client.on(TerminateInstancesCommand).resolves({ + TerminatingInstances: [ + { InstanceId: 'i-1', CurrentState: { Name: 'shutting-down' } }, + { InstanceId: 'i-2', CurrentState: { Name: 'shutting-down' } }, + ], + }); + await terminateRunners(['i-1', 'i-2']); + expect(mockEC2Client).toHaveReceivedCommandTimes(TerminateInstancesCommand, 1); expect(mockEC2Client).toHaveReceivedCommandWith(TerminateInstancesCommand, { - InstanceIds: [runner.instanceId], + InstanceIds: ['i-1', 'i-2'], }); }); + + it('chunks into batches of 100 for large lists', async () => { + mockEC2Client.on(TerminateInstancesCommand).resolves({}); + const ids = Array.from({ length: 150 }, (_, i) => `i-${i}`); + await terminateRunners(ids); + expect(mockEC2Client).toHaveReceivedCommandTimes(TerminateInstancesCommand, 2); + expect(mockEC2Client).toHaveReceivedNthCommandWith(1, TerminateInstancesCommand, { + InstanceIds: ids.slice(0, 100), + }); + expect(mockEC2Client).toHaveReceivedNthCommandWith(2, TerminateInstancesCommand, { + InstanceIds: ids.slice(100), + }); + }); + + it('retries individually on batch error and terminates good instances', async () => { + // i-bad always throws; i-good and i-also-good succeed + mockEC2Client + .on(TerminateInstancesCommand, { InstanceIds: ['i-good', 'i-bad', 'i-also-good'] }) + .rejects(new Error('InvalidInstanceID.NotFound')) + .on(TerminateInstancesCommand, { InstanceIds: ['i-bad'] }) + .rejects(new Error('InvalidInstanceID.NotFound')) + .on(TerminateInstancesCommand, { InstanceIds: ['i-good'] }) + .resolves({}) + .on(TerminateInstancesCommand, { InstanceIds: ['i-also-good'] }) + .resolves({}); + + await terminateRunners(['i-good', 'i-bad', 'i-also-good']); + + // good instances must have been terminated despite the bad id + expect(mockEC2Client).toHaveReceivedCommandWith(TerminateInstancesCommand, { InstanceIds: ['i-good'] }); + expect(mockEC2Client).toHaveReceivedCommandWith(TerminateInstancesCommand, { InstanceIds: ['i-also-good'] }); + }); }); describe('tag runner', () => { diff --git a/lambdas/functions/control-plane/src/aws/runners.ts b/lambdas/functions/control-plane/src/aws/runners.ts index 7f7f5750bf..d5a34ce083 100644 --- a/lambdas/functions/control-plane/src/aws/runners.ts +++ b/lambdas/functions/control-plane/src/aws/runners.ts @@ -102,11 +102,33 @@ function getRunnerInfo(runningInstances: DescribeInstancesResult) { return runners; } -export async function terminateRunner(instanceId: string): Promise { - logger.debug(`Runner '${instanceId}' will be terminated.`); +// AWS TerminateInstances accepts up to 1000 InstanceIds per call; we keep batches +// at 100 to bound payload + blast radius (a failed batch retries per id, see terminateBatch). +const TERMINATE_BATCH_SIZE = 100; + +export async function terminateRunners(instanceIds: string[]): Promise { + if (instanceIds.length === 0) return; // empty InstanceIds errors at the API const ec2 = getTracedAWSV3Client(new EC2Client({ region: process.env.AWS_REGION })); - await ec2.send(new TerminateInstancesCommand({ InstanceIds: [instanceId] })); - logger.debug(`Runner ${instanceId} has been terminated.`); + for (let i = 0; i < instanceIds.length; i += TERMINATE_BATCH_SIZE) { + await terminateBatch(ec2, instanceIds.slice(i, i + TERMINATE_BATCH_SIZE)); + } +} + +// Common path = 1 EC2 call for the whole batch. TerminateInstances is all-or-nothing +// (one bad id throws InvalidInstanceID.NotFound and nothing is terminated), so on +// error we retry each id individually — the bad one errors at size 1 and is logged. +async function terminateBatch(ec2: EC2Client, batch: string[]): Promise { + try { + await ec2.send(new TerminateInstancesCommand({ InstanceIds: batch })); + logger.debug(`Runners terminated: ${batch.join(', ')}`); + } catch (e) { + if (batch.length === 1) { + logger.error(`Failed to terminate runner '${batch[0]}'`, { error: e as Error }); + return; + } + logger.warn(`Batch terminate failed (${batch.length} ids), retrying individually.`, { error: e as Error }); + for (const id of batch) await terminateBatch(ec2, [id]); + } } export async function tag(instanceId: string, tags: Tag[]): Promise { diff --git a/lambdas/functions/control-plane/src/scale-runners/scale-down.test.ts b/lambdas/functions/control-plane/src/scale-runners/scale-down.test.ts index 42fd442a3f..0aa577889f 100644 --- a/lambdas/functions/control-plane/src/scale-runners/scale-down.test.ts +++ b/lambdas/functions/control-plane/src/scale-runners/scale-down.test.ts @@ -5,7 +5,7 @@ import nock from 'nock'; import { RunnerInfo, RunnerList } from '../aws/runners.d'; import * as ghAuth from '../github/auth'; -import { listEC2Runners, terminateRunner, tag, untag } from './../aws/runners'; +import { listEC2Runners, terminateRunners, tag, untag } from './../aws/runners'; import { githubCache } from './cache'; import { newestFirstStrategy, oldestFirstStrategy, scaleDown } from './scale-down'; import { describe, it, expect, beforeEach, vi } from 'vitest'; @@ -37,7 +37,7 @@ vi.mock('./../aws/runners', async (importOriginal) => { ...actual, tag: vi.fn(), untag: vi.fn(), - terminateRunner: vi.fn(), + terminateRunners: vi.fn(), listEC2Runners: vi.fn(), }; }); @@ -68,7 +68,7 @@ const mockCreateClient = vi.mocked(ghAuth.createOctokitClient); const mockListRunners = vi.mocked(listEC2Runners); const mockTagRunners = vi.mocked(tag); const mockUntagRunners = vi.mocked(untag); -const mockTerminateRunners = vi.mocked(terminateRunner); +const mockTerminateRunners = vi.mocked(terminateRunners); export interface TestData { repositoryName: string; @@ -208,7 +208,7 @@ describe('Scale down runners', () => { expect(listEC2Runners).toHaveBeenCalledWith({ environment: ENVIRONMENT, }); - expect(terminateRunner).not.toHaveBeenCalled(); + expect(terminateRunners).not.toHaveBeenCalled(); expect(mockOctokit.apps.getRepoInstallation).not.toHaveBeenCalled(); expect(mockOctokit.apps.getRepoInstallation).not.toHaveBeenCalled(); }); @@ -303,7 +303,7 @@ describe('Scale down runners', () => { await scaleDown(); // assert - expect(terminateRunner).not.toHaveBeenCalled(); + expect(terminateRunners).not.toHaveBeenCalled(); checkNonTerminated(runners); }); @@ -407,7 +407,7 @@ describe('Scale down runners', () => { // assert expect(mockUntagRunners).toHaveBeenCalledWith(orphanRunner.instanceId, [{ Key: 'ghr:orphan', Value: 'true' }]); - expect(mockTerminateRunners).not.toHaveBeenCalledWith(orphanRunner.instanceId); + expect(mockTerminateRunners.mock.calls.flatMap((c) => c[0] as string[])).not.toContain(orphanRunner.instanceId); // arrange if (type === 'Repo') { @@ -424,7 +424,7 @@ describe('Scale down runners', () => { await scaleDown(); // assert - expect(mockTerminateRunners).toHaveBeenCalledWith(orphanRunner.instanceId); + expect(mockTerminateRunners.mock.calls.flatMap((c) => c[0] as string[])).toContain(orphanRunner.instanceId); }); it('Should handle 404 error when checking orphaned runner (JIT) - treat as orphaned', async () => { @@ -463,7 +463,7 @@ describe('Scale down runners', () => { await scaleDown(); // assert - should terminate since 404 means runner doesn't exist on GitHub - expect(mockTerminateRunners).toHaveBeenCalledWith(orphanRunner.instanceId); + expect(mockTerminateRunners.mock.calls.flatMap((c) => c[0] as string[])).toContain(orphanRunner.instanceId); }); it('Should handle 404 error when checking runner busy state - treat as not busy', async () => { @@ -539,7 +539,8 @@ describe('Scale down runners', () => { await expect(scaleDown()).resolves.not.toThrow(); // Should not terminate since the error was not a 404 - expect(terminateRunner).not.toHaveBeenCalledWith(orphanRunner.instanceId); + const calledIdsAfterError = mockTerminateRunners.mock.calls.flatMap((call) => call[0] as string[]); + expect(calledIdsAfterError).not.toContain(orphanRunner.instanceId); }); it(`Should ignore errors when termination orphan fails.`, async () => { @@ -664,14 +665,15 @@ describe('Scale down runners', () => { await scaleDown(); // assert + const allCalledIds = mockTerminateRunners.mock.calls.flatMap((call) => call[0] as string[]); const runnersToTerminate = runners.filter((r) => r.shouldBeTerminated); for (const toTerminate of runnersToTerminate) { - expect(terminateRunner).toHaveBeenCalledWith(toTerminate.instanceId); + expect(allCalledIds).toContain(toTerminate.instanceId); } const runnersNotToTerminate = runners.filter((r) => !r.shouldBeTerminated); for (const notTerminated of runnersNotToTerminate) { - expect(terminateRunner).not.toHaveBeenCalledWith(notTerminated.instanceId); + expect(allCalledIds).not.toContain(notTerminated.instanceId); } }); }); @@ -809,17 +811,16 @@ function mockAwsRunners(runners: RunnerTestItem[]) { function checkNonTerminated(runners: RunnerTestItem[]) { const notTerminated = runners.filter((r) => !r.shouldBeTerminated); - for (const toTerminate of notTerminated) { - expect(terminateRunner).not.toHaveBeenCalledWith(toTerminate.instanceId); + const allCalledIds = mockTerminateRunners.mock.calls.flatMap((call) => call[0] as string[]); + for (const r of notTerminated) { + expect(allCalledIds).not.toContain(r.instanceId); } } function checkTerminated(runners: RunnerTestItem[]) { - const runnersToTerminate = runners.filter((r) => r.shouldBeTerminated); - expect(terminateRunner).toHaveBeenCalledTimes(runnersToTerminate.length); - for (const toTerminate of runnersToTerminate) { - expect(terminateRunner).toHaveBeenCalledWith(toTerminate.instanceId); - } + const idsToTerminate = runners.filter((r) => r.shouldBeTerminated).map((r) => r.instanceId); + const allCalledIds = mockTerminateRunners.mock.calls.flatMap((call) => call[0] as string[]); + expect([...allCalledIds].sort()).toEqual([...idsToTerminate].sort()); } function mockGitHubRunners(runners: RunnerTestItem[]) { diff --git a/lambdas/functions/control-plane/src/scale-runners/scale-down.ts b/lambdas/functions/control-plane/src/scale-runners/scale-down.ts index c92dddfca9..4f0386180e 100644 --- a/lambdas/functions/control-plane/src/scale-runners/scale-down.ts +++ b/lambdas/functions/control-plane/src/scale-runners/scale-down.ts @@ -10,7 +10,7 @@ import { createOctokitClient, getStoredInstallationId, } from '../github/auth'; -import { bootTimeExceeded, listEC2Runners, tag, untag, terminateRunner } from './../aws/runners'; +import { bootTimeExceeded, listEC2Runners, tag, untag, terminateRunners } from './../aws/runners'; import { RunnerInfo, RunnerList } from './../aws/runners.d'; import { GhRunners, githubCache } from './cache'; import { ScalingDownConfig, getEvictionStrategy, getIdleRunnerCount } from './scale-down-config'; @@ -137,15 +137,17 @@ function runnerMinimumTimeExceeded(runner: RunnerInfo): boolean { return launchTimePlusMinimum < now; } -async function removeRunner(ec2runner: RunnerInfo, ghRunnerIds: number[]): Promise { - const githubAppClient = await getOrCreateOctokit(ec2runner); +// Returns the instanceId when the runner was successfully de-registered from GitHub, +// so the caller can batch-terminate in one EC2 call. Returns undefined otherwise. +async function deregisterRunner(ec2runner: RunnerInfo, ghRunnerIds: number[]): Promise { try { + const githubAppClient = await getOrCreateOctokit(ec2runner); const runnerList = ec2runner as unknown as RunnerList; if (runnerList.bypassRemoval) { logger.info( `Runner '${ec2runner.instanceId}' has bypass-removal tag set, skipping removal. Remove the tag to allow scale-down.`, ); - return; + return undefined; } const states = await Promise.all( @@ -174,18 +176,21 @@ async function removeRunner(ec2runner: RunnerInfo, ghRunnerIds: number[]): Promi ); if (statuses.every((status) => status == 204)) { - await terminateRunner(ec2runner.instanceId); - logger.info(`AWS runner instance '${ec2runner.instanceId}' is terminated and GitHub runner is de-registered.`); + logger.info(`GitHub runner de-registered for '${ec2runner.instanceId}'.`); + return ec2runner.instanceId; } else { logger.error(`Failed to de-register GitHub runner: ${statuses}`); + return undefined; } } else { logger.info(`Runner '${ec2runner.instanceId}' cannot be de-registered, because it is still busy.`); + return undefined; } } catch (e) { logger.error(`Runner '${ec2runner.instanceId}' cannot be de-registered. Error: ${e}`, { error: e as Error, }); + return undefined; } } @@ -196,6 +201,7 @@ async function evaluateAndRemoveRunners( let idleCounter = getIdleRunnerCount(scaleDownConfigs); const evictionStrategy = getEvictionStrategy(scaleDownConfigs); const ownerTags = new Set(ec2Runners.map((runner) => runner.owner)); + const toTerminate: string[] = []; for (const ownerTag of ownerTags) { const ec2RunnersFiltered = ec2Runners @@ -220,11 +226,15 @@ async function evaluateAndRemoveRunners( idleCounter--; logger.info(`Runner '${ec2Runner.instanceId}' will be kept idle.`); } else { - logger.info(`Terminating all non busy runners.`); - await removeRunner( + logger.info(`Runner '${ec2Runner.instanceId}' exceeds idle limit, de-registering.`); + const id = await deregisterRunner( ec2Runner, ghRunnersFiltered.map((runner: { id: number }) => runner.id), ); + if (id) { + toTerminate.push(id); + logger.info(`Runner '${id}' queued for termination.`); + } } } } else if (bootTimeExceeded(ec2Runner)) { @@ -234,6 +244,11 @@ async function evaluateAndRemoveRunners( } } } + + if (toTerminate.length > 0) { + logger.info(`Terminating ${toTerminate.length} runner(s): ${toTerminate.join(', ')}`); + await terminateRunners(toTerminate); + } } async function markOrphan(instanceId: string): Promise { @@ -280,22 +295,32 @@ async function lastChanceCheckOrphanRunner(runner: RunnerList): Promise async function terminateOrphan(environment: string): Promise { try { const orphanRunners = await listEC2Runners({ environment, orphan: true }); + const toTerminate: string[] = []; for (const runner of orphanRunners) { - if (runner.runnerId) { - const isOrphan = await lastChanceCheckOrphanRunner(runner); - if (isOrphan) { - await terminateRunner(runner.instanceId); + try { + if (runner.runnerId) { + const isOrphan = await lastChanceCheckOrphanRunner(runner); + if (isOrphan) { + toTerminate.push(runner.instanceId); + } else { + await unMarkOrphan(runner.instanceId); + } } else { - await unMarkOrphan(runner.instanceId); + logger.info(`Queuing orphan runner '${runner.instanceId}' for termination.`); + toTerminate.push(runner.instanceId); } - } else { - logger.info(`Terminating orphan runner '${runner.instanceId}'`); - await terminateRunner(runner.instanceId).catch((e) => { - logger.error(`Failed to terminate orphan runner '${runner.instanceId}'`, { error: e }); - }); + } catch (e) { + logger.error(`Failed to evaluate orphan runner '${runner.instanceId}', skipping.`, { error: e as Error }); } } + + if (toTerminate.length > 0) { + logger.info(`Terminating ${toTerminate.length} orphan runner(s): ${toTerminate.join(', ')}`); + await terminateRunners(toTerminate).catch((e) => { + logger.error(`Failed to terminate orphan runners: ${toTerminate.join(', ')}`, { error: e }); + }); + } } catch (e) { logger.warn(`Failure during orphan termination processing.`, { error: e }); } diff --git a/lambdas/functions/control-plane/src/scale-runners/scale-up.ts b/lambdas/functions/control-plane/src/scale-runners/scale-up.ts index 8ebd8810d8..a970b25dd8 100644 --- a/lambdas/functions/control-plane/src/scale-runners/scale-up.ts +++ b/lambdas/functions/control-plane/src/scale-runners/scale-up.ts @@ -10,7 +10,7 @@ import { getAppCount, getStoredInstallationId, } from '../github/auth'; -import { createRunner, listEC2Runners, tag, terminateRunner } from './../aws/runners'; +import { createRunner, listEC2Runners, tag, terminateRunners } from './../aws/runners'; import { RunnerInputParameters } from './../aws/runners.d'; import { metricGitHubAppRateLimit } from '../github/rate-limit'; import { publishRetryMessage } from './job-retry'; @@ -281,7 +281,7 @@ export async function createRunners( for (const instanceId of failedInstances) { try { - await terminateRunner(instanceId); + await terminateRunners([instanceId]); } catch (error) { logger.error('Failed to terminate instance', { instanceId, diff --git a/scripts/check-terminate-quota.sh b/scripts/check-terminate-quota.sh new file mode 100755 index 0000000000..64353d9ccb --- /dev/null +++ b/scripts/check-terminate-quota.sh @@ -0,0 +1,38 @@ +#!/usr/bin/env bash +# Snapshot EC2 bucket-1 (Fleet mutations) quota usage for cicd-prod (346156333547). +# Run before and after deploying the batched-terminate change to measure impact. +# +# Usage: check-terminate-quota.sh [--from=2d] +# --from accepts pup relative ranges: 1h, 2d, 7d, etc. (default: 2d) +# +# Requires: pup CLI authenticated (pup auth login) +set -euo pipefail + +FROM="${1:---from=2d}" + +q() { + local query="$1" + pup metrics query --query="$query" "$FROM" --output=json \ + | python3 -c ' +import sys, json +d = json.load(sys.stdin) +pts = [p[1] for p in d["data"]["series"][0]["pointlist"] if p[1] is not None] +if not pts: + print("no data") + sys.exit() +s = sorted(pts) +n = len(s) +print(f"p50={s[n//2]:.0f} p95={s[int(n*0.95)]:.0f} p99={s[int(n*0.99)]:.0f} max={max(pts):.0f} calls/min") +' +} + +TERMINATE='sum:aws.usage.call_count.sum{service:ec2,resource:terminateinstances}.rollup(max,60)' +BUCKET1="${TERMINATE}\ ++sum:aws.usage.call_count.sum{service:ec2,resource:createfleet}.rollup(max,60)\ ++sum:aws.usage.call_count.sum{service:ec2,resource:runinstances}.rollup(max,60)\ ++sum:aws.usage.call_count.sum{service:ec2,resource:deletenetworkinterfaces}.rollup(max,60)" + +echo "Window: ${FROM#--from=}" +echo "" +printf "%-30s %s\n" "TerminateInstances alone:" "$(q "$TERMINATE")" +printf "%-30s %s (sustained limit: 540)\n" "Bucket-1 total:" "$(q "$BUCKET1")"