Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 50 additions & 11 deletions lambdas/functions/control-plane/src/aws/runners.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import 'aws-sdk-client-mock-jest/vitest';

import { beforeEach, describe, expect, it, vi } from 'vitest';
import ScaleError from './../scale-runners/ScaleError';
import { createRunner, listEC2Runners, tag, terminateRunner, untag } from './runners';
import { createRunner, listEC2Runners, tag, terminateRunners, untag } from './runners';
import type { RunnerInfo, RunnerInputParameters, RunnerType } from './runners.d';

process.env.AWS_REGION = 'eu-east-1';
Expand Down Expand Up @@ -250,23 +250,62 @@ describe('list instances', () => {
});
});

describe('terminate runner', () => {
describe('terminateRunners', () => {
beforeEach(() => {
vi.clearAllMocks();
mockEC2Client.reset();
});
it('calls terminate instances with the right instance ids', async () => {
mockEC2Client.on(TerminateInstancesCommand).resolves({});
const runner: RunnerInfo = {
instanceId: 'instance-2',
owner: 'owner-2',
type: 'Repo',
};
await terminateRunner(runner.instanceId);

it('does nothing for empty list', async () => {
await terminateRunners([]);
expect(mockEC2Client).not.toHaveReceivedCommand(TerminateInstancesCommand);
});

it('issues one EC2 call for a small batch', async () => {
mockEC2Client.on(TerminateInstancesCommand).resolves({
TerminatingInstances: [
{ InstanceId: 'i-1', CurrentState: { Name: 'shutting-down' } },
{ InstanceId: 'i-2', CurrentState: { Name: 'shutting-down' } },
],
});
await terminateRunners(['i-1', 'i-2']);
expect(mockEC2Client).toHaveReceivedCommandTimes(TerminateInstancesCommand, 1);
expect(mockEC2Client).toHaveReceivedCommandWith(TerminateInstancesCommand, {
InstanceIds: [runner.instanceId],
InstanceIds: ['i-1', 'i-2'],
});
});

it('chunks into batches of 100 for large lists', async () => {
mockEC2Client.on(TerminateInstancesCommand).resolves({});
const ids = Array.from({ length: 150 }, (_, i) => `i-${i}`);
await terminateRunners(ids);
expect(mockEC2Client).toHaveReceivedCommandTimes(TerminateInstancesCommand, 2);
expect(mockEC2Client).toHaveReceivedNthCommandWith(1, TerminateInstancesCommand, {
InstanceIds: ids.slice(0, 100),
});
expect(mockEC2Client).toHaveReceivedNthCommandWith(2, TerminateInstancesCommand, {
InstanceIds: ids.slice(100),
});
});

it('retries individually on batch error and terminates good instances', async () => {
  // The combined request and the lone 'i-bad' request are rejected; the two good ids
  // succeed once they are sent on their own.
  mockEC2Client
    .on(TerminateInstancesCommand, { InstanceIds: ['i-good', 'i-bad', 'i-also-good'] })
    .rejects(new Error('InvalidInstanceID.NotFound'))
    .on(TerminateInstancesCommand, { InstanceIds: ['i-bad'] })
    .rejects(new Error('InvalidInstanceID.NotFound'))
    .on(TerminateInstancesCommand, { InstanceIds: ['i-good'] })
    .resolves({})
    .on(TerminateInstancesCommand, { InstanceIds: ['i-also-good'] })
    .resolves({});

  // A failing id must be logged and swallowed, never surfaced to the caller.
  await expect(terminateRunners(['i-good', 'i-bad', 'i-also-good'])).resolves.toBeUndefined();

  // Exactly one failed batch call followed by one retry per id.
  expect(mockEC2Client).toHaveReceivedCommandTimes(TerminateInstancesCommand, 4);
  expect(mockEC2Client).toHaveReceivedNthCommandWith(1, TerminateInstancesCommand, {
    InstanceIds: ['i-good', 'i-bad', 'i-also-good'],
  });

  // The bad id is retried on its own so its failure is isolated from the others.
  expect(mockEC2Client).toHaveReceivedCommandWith(TerminateInstancesCommand, { InstanceIds: ['i-bad'] });

  // Good instances must have been terminated despite the bad id.
  expect(mockEC2Client).toHaveReceivedCommandWith(TerminateInstancesCommand, { InstanceIds: ['i-good'] });
  expect(mockEC2Client).toHaveReceivedCommandWith(TerminateInstancesCommand, { InstanceIds: ['i-also-good'] });
});
});

describe('tag runner', () => {
Expand Down
30 changes: 26 additions & 4 deletions lambdas/functions/control-plane/src/aws/runners.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,33 @@ function getRunnerInfo(runningInstances: DescribeInstancesResult) {
return runners;
}

export async function terminateRunner(instanceId: string): Promise<void> {
logger.debug(`Runner '${instanceId}' will be terminated.`);
// AWS TerminateInstances accepts up to 1000 InstanceIds per call; we keep batches
// at 100 to bound payload + blast radius (a failed batch retries per id, see terminateBatch).
const TERMINATE_BATCH_SIZE = 100;

export async function terminateRunners(instanceIds: string[]): Promise<void> {
if (instanceIds.length === 0) return; // empty InstanceIds errors at the API
const ec2 = getTracedAWSV3Client(new EC2Client({ region: process.env.AWS_REGION }));
await ec2.send(new TerminateInstancesCommand({ InstanceIds: [instanceId] }));
logger.debug(`Runner ${instanceId} has been terminated.`);
for (let i = 0; i < instanceIds.length; i += TERMINATE_BATCH_SIZE) {
await terminateBatch(ec2, instanceIds.slice(i, i + TERMINATE_BATCH_SIZE));
}
}

/**
 * Terminates one batch of instances, falling back to per-instance requests when the
 * combined request is rejected.
 *
 * The whole batch is first submitted in a single TerminateInstances request. If that
 * request fails, each id is re-submitted on its own, one after another, so that one
 * failing id (the author's note cites InvalidInstanceID.NotFound failing the whole
 * request) does not keep the remaining instances alive. A failure for a lone id is
 * logged and swallowed; errors from the send call are not rethrown.
 *
 * @param ec2 - EC2 client used to send the terminate commands.
 * @param batch - Instance ids to terminate in one request.
 */
async function terminateBatch(ec2: EC2Client, batch: string[]): Promise<void> {
  try {
    const command = new TerminateInstancesCommand({ InstanceIds: batch });
    await ec2.send(command);
    logger.debug(`Runners terminated: ${batch.join(', ')}`);
  } catch (cause) {
    const error = cause as Error;
    const isSingleId = batch.length === 1;

    if (!isSingleId) {
      logger.warn(`Batch terminate failed (${batch.length} ids), retrying individually.`, { error });
      // Sequential on purpose: each id gets its own request, in the original order.
      for (const single of batch.map((id) => [id])) {
        await terminateBatch(ec2, single);
      }
      return;
    }

    // Size-1 request failed: nothing left to split, so record it and move on.
    logger.error(`Failed to terminate runner '${batch[0]}'`, { error });
  }
}

export async function tag(instanceId: string, tags: Tag[]): Promise<void> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import nock from 'nock';

import { RunnerInfo, RunnerList } from '../aws/runners.d';
import * as ghAuth from '../github/auth';
import { listEC2Runners, terminateRunner, tag, untag } from './../aws/runners';
import { listEC2Runners, terminateRunners, tag, untag } from './../aws/runners';
import { githubCache } from './cache';
import { newestFirstStrategy, oldestFirstStrategy, scaleDown } from './scale-down';
import { describe, it, expect, beforeEach, vi } from 'vitest';
Expand Down Expand Up @@ -37,7 +37,7 @@ vi.mock('./../aws/runners', async (importOriginal) => {
...actual,
tag: vi.fn(),
untag: vi.fn(),
terminateRunner: vi.fn(),
terminateRunners: vi.fn(),
listEC2Runners: vi.fn(),
};
});
Expand Down Expand Up @@ -68,7 +68,7 @@ const mockCreateClient = vi.mocked(ghAuth.createOctokitClient);
const mockListRunners = vi.mocked(listEC2Runners);
const mockTagRunners = vi.mocked(tag);
const mockUntagRunners = vi.mocked(untag);
const mockTerminateRunners = vi.mocked(terminateRunner);
const mockTerminateRunners = vi.mocked(terminateRunners);

export interface TestData {
repositoryName: string;
Expand Down Expand Up @@ -208,7 +208,7 @@ describe('Scale down runners', () => {
expect(listEC2Runners).toHaveBeenCalledWith({
environment: ENVIRONMENT,
});
expect(terminateRunner).not.toHaveBeenCalled();
expect(terminateRunners).not.toHaveBeenCalled();
expect(mockOctokit.apps.getRepoInstallation).not.toHaveBeenCalled();
expect(mockOctokit.apps.getRepoInstallation).not.toHaveBeenCalled();
});
Expand Down Expand Up @@ -303,7 +303,7 @@ describe('Scale down runners', () => {
await scaleDown();

// assert
expect(terminateRunner).not.toHaveBeenCalled();
expect(terminateRunners).not.toHaveBeenCalled();
checkNonTerminated(runners);
});

Expand Down Expand Up @@ -407,7 +407,7 @@ describe('Scale down runners', () => {

// assert
expect(mockUntagRunners).toHaveBeenCalledWith(orphanRunner.instanceId, [{ Key: 'ghr:orphan', Value: 'true' }]);
expect(mockTerminateRunners).not.toHaveBeenCalledWith(orphanRunner.instanceId);
expect(mockTerminateRunners.mock.calls.flatMap((c) => c[0] as string[])).not.toContain(orphanRunner.instanceId);

// arrange
if (type === 'Repo') {
Expand All @@ -424,7 +424,7 @@ describe('Scale down runners', () => {
await scaleDown();

// assert
expect(mockTerminateRunners).toHaveBeenCalledWith(orphanRunner.instanceId);
expect(mockTerminateRunners.mock.calls.flatMap((c) => c[0] as string[])).toContain(orphanRunner.instanceId);
});

it('Should handle 404 error when checking orphaned runner (JIT) - treat as orphaned', async () => {
Expand Down Expand Up @@ -463,7 +463,7 @@ describe('Scale down runners', () => {
await scaleDown();

// assert - should terminate since 404 means runner doesn't exist on GitHub
expect(mockTerminateRunners).toHaveBeenCalledWith(orphanRunner.instanceId);
expect(mockTerminateRunners.mock.calls.flatMap((c) => c[0] as string[])).toContain(orphanRunner.instanceId);
});

it('Should handle 404 error when checking runner busy state - treat as not busy', async () => {
Expand Down Expand Up @@ -539,7 +539,8 @@ describe('Scale down runners', () => {
await expect(scaleDown()).resolves.not.toThrow();

// Should not terminate since the error was not a 404
expect(terminateRunner).not.toHaveBeenCalledWith(orphanRunner.instanceId);
const calledIdsAfterError = mockTerminateRunners.mock.calls.flatMap((call) => call[0] as string[]);
expect(calledIdsAfterError).not.toContain(orphanRunner.instanceId);
});

it(`Should ignore errors when termination orphan fails.`, async () => {
Expand Down Expand Up @@ -664,14 +665,15 @@ describe('Scale down runners', () => {
await scaleDown();

// assert
const allCalledIds = mockTerminateRunners.mock.calls.flatMap((call) => call[0] as string[]);
const runnersToTerminate = runners.filter((r) => r.shouldBeTerminated);
for (const toTerminate of runnersToTerminate) {
expect(terminateRunner).toHaveBeenCalledWith(toTerminate.instanceId);
expect(allCalledIds).toContain(toTerminate.instanceId);
}

const runnersNotToTerminate = runners.filter((r) => !r.shouldBeTerminated);
for (const notTerminated of runnersNotToTerminate) {
expect(terminateRunner).not.toHaveBeenCalledWith(notTerminated.instanceId);
expect(allCalledIds).not.toContain(notTerminated.instanceId);
}
});
});
Expand Down Expand Up @@ -809,17 +811,16 @@ function mockAwsRunners(runners: RunnerTestItem[]) {

function checkNonTerminated(runners: RunnerTestItem[]) {
const notTerminated = runners.filter((r) => !r.shouldBeTerminated);
for (const toTerminate of notTerminated) {
expect(terminateRunner).not.toHaveBeenCalledWith(toTerminate.instanceId);
const allCalledIds = mockTerminateRunners.mock.calls.flatMap((call) => call[0] as string[]);
for (const r of notTerminated) {
expect(allCalledIds).not.toContain(r.instanceId);
}
}

function checkTerminated(runners: RunnerTestItem[]) {
const runnersToTerminate = runners.filter((r) => r.shouldBeTerminated);
expect(terminateRunner).toHaveBeenCalledTimes(runnersToTerminate.length);
for (const toTerminate of runnersToTerminate) {
expect(terminateRunner).toHaveBeenCalledWith(toTerminate.instanceId);
}
const idsToTerminate = runners.filter((r) => r.shouldBeTerminated).map((r) => r.instanceId);
const allCalledIds = mockTerminateRunners.mock.calls.flatMap((call) => call[0] as string[]);
expect([...allCalledIds].sort()).toEqual([...idsToTerminate].sort());
Comment thread
karouf marked this conversation as resolved.
}

function mockGitHubRunners(runners: RunnerTestItem[]) {
Expand Down
61 changes: 43 additions & 18 deletions lambdas/functions/control-plane/src/scale-runners/scale-down.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import {
createOctokitClient,
getStoredInstallationId,
} from '../github/auth';
import { bootTimeExceeded, listEC2Runners, tag, untag, terminateRunner } from './../aws/runners';
import { bootTimeExceeded, listEC2Runners, tag, untag, terminateRunners } from './../aws/runners';
import { RunnerInfo, RunnerList } from './../aws/runners.d';
import { GhRunners, githubCache } from './cache';
import { ScalingDownConfig, getEvictionStrategy, getIdleRunnerCount } from './scale-down-config';
Expand Down Expand Up @@ -137,15 +137,17 @@ function runnerMinimumTimeExceeded(runner: RunnerInfo): boolean {
return launchTimePlusMinimum < now;
}

async function removeRunner(ec2runner: RunnerInfo, ghRunnerIds: number[]): Promise<void> {
const githubAppClient = await getOrCreateOctokit(ec2runner);
// Returns the instanceId when the runner was successfully de-registered from GitHub,
// so the caller can batch-terminate in one EC2 call. Returns undefined otherwise.
async function deregisterRunner(ec2runner: RunnerInfo, ghRunnerIds: number[]): Promise<string | undefined> {
try {
const githubAppClient = await getOrCreateOctokit(ec2runner);
const runnerList = ec2runner as unknown as RunnerList;
if (runnerList.bypassRemoval) {
logger.info(
`Runner '${ec2runner.instanceId}' has bypass-removal tag set, skipping removal. Remove the tag to allow scale-down.`,
);
return;
return undefined;
}

const states = await Promise.all(
Expand Down Expand Up @@ -174,18 +176,21 @@ async function removeRunner(ec2runner: RunnerInfo, ghRunnerIds: number[]): Promi
);

if (statuses.every((status) => status == 204)) {
await terminateRunner(ec2runner.instanceId);
logger.info(`AWS runner instance '${ec2runner.instanceId}' is terminated and GitHub runner is de-registered.`);
logger.info(`GitHub runner de-registered for '${ec2runner.instanceId}'.`);
return ec2runner.instanceId;
} else {
logger.error(`Failed to de-register GitHub runner: ${statuses}`);
return undefined;
}
} else {
logger.info(`Runner '${ec2runner.instanceId}' cannot be de-registered, because it is still busy.`);
return undefined;
}
} catch (e) {
logger.error(`Runner '${ec2runner.instanceId}' cannot be de-registered. Error: ${e}`, {
error: e as Error,
});
return undefined;
}
}

Expand All @@ -196,6 +201,7 @@ async function evaluateAndRemoveRunners(
let idleCounter = getIdleRunnerCount(scaleDownConfigs);
const evictionStrategy = getEvictionStrategy(scaleDownConfigs);
const ownerTags = new Set(ec2Runners.map((runner) => runner.owner));
const toTerminate: string[] = [];

for (const ownerTag of ownerTags) {
const ec2RunnersFiltered = ec2Runners
Expand All @@ -220,11 +226,15 @@ async function evaluateAndRemoveRunners(
idleCounter--;
logger.info(`Runner '${ec2Runner.instanceId}' will be kept idle.`);
} else {
logger.info(`Terminating all non busy runners.`);
await removeRunner(
logger.info(`Runner '${ec2Runner.instanceId}' exceeds idle limit, de-registering.`);
const id = await deregisterRunner(
ec2Runner,
ghRunnersFiltered.map((runner: { id: number }) => runner.id),
);
if (id) {
toTerminate.push(id);
logger.info(`Runner '${id}' queued for termination.`);
}
}
}
} else if (bootTimeExceeded(ec2Runner)) {
Expand All @@ -234,6 +244,11 @@ async function evaluateAndRemoveRunners(
}
}
}

if (toTerminate.length > 0) {
logger.info(`Terminating ${toTerminate.length} runner(s): ${toTerminate.join(', ')}`);
await terminateRunners(toTerminate);
}
}

async function markOrphan(instanceId: string): Promise<void> {
Expand Down Expand Up @@ -280,22 +295,32 @@ async function lastChanceCheckOrphanRunner(runner: RunnerList): Promise<boolean>
async function terminateOrphan(environment: string): Promise<void> {
try {
const orphanRunners = await listEC2Runners({ environment, orphan: true });
const toTerminate: string[] = [];

for (const runner of orphanRunners) {
if (runner.runnerId) {
const isOrphan = await lastChanceCheckOrphanRunner(runner);
if (isOrphan) {
await terminateRunner(runner.instanceId);
try {
if (runner.runnerId) {
const isOrphan = await lastChanceCheckOrphanRunner(runner);
if (isOrphan) {
toTerminate.push(runner.instanceId);
} else {
await unMarkOrphan(runner.instanceId);
}
} else {
await unMarkOrphan(runner.instanceId);
logger.info(`Queuing orphan runner '${runner.instanceId}' for termination.`);
toTerminate.push(runner.instanceId);
}
} else {
logger.info(`Terminating orphan runner '${runner.instanceId}'`);
await terminateRunner(runner.instanceId).catch((e) => {
logger.error(`Failed to terminate orphan runner '${runner.instanceId}'`, { error: e });
});
} catch (e) {
logger.error(`Failed to evaluate orphan runner '${runner.instanceId}', skipping.`, { error: e as Error });
}
}

if (toTerminate.length > 0) {
logger.info(`Terminating ${toTerminate.length} orphan runner(s): ${toTerminate.join(', ')}`);
await terminateRunners(toTerminate).catch((e) => {
logger.error(`Failed to terminate orphan runners: ${toTerminate.join(', ')}`, { error: e });
});
}
} catch (e) {
logger.warn(`Failure during orphan termination processing.`, { error: e });
}
Expand Down
4 changes: 2 additions & 2 deletions lambdas/functions/control-plane/src/scale-runners/scale-up.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import {
getAppCount,
getStoredInstallationId,
} from '../github/auth';
import { createRunner, listEC2Runners, tag, terminateRunner } from './../aws/runners';
import { createRunner, listEC2Runners, tag, terminateRunners } from './../aws/runners';
import { RunnerInputParameters } from './../aws/runners.d';
import { metricGitHubAppRateLimit } from '../github/rate-limit';
import { publishRetryMessage } from './job-retry';
Expand Down Expand Up @@ -281,7 +281,7 @@ export async function createRunners(

for (const instanceId of failedInstances) {
try {
await terminateRunner(instanceId);
await terminateRunners([instanceId]);
} catch (error) {
logger.error('Failed to terminate instance', {
instanceId,
Expand Down
Loading