diff --git a/packages/k8s/src/k8s/heartbeat.ts b/packages/k8s/src/k8s/heartbeat.ts index b83f1654..ea517b31 100644 --- a/packages/k8s/src/k8s/heartbeat.ts +++ b/packages/k8s/src/k8s/heartbeat.ts @@ -4,10 +4,27 @@ export interface HeartbeatWebSocket { readyState: number ping(): void close(): void + terminate(): void on(event: string, listener: (...args: any[]) => void): this once(event: string, listener: (...args: any[]) => void): this } +// Force-close a kc WebSocket so the peer's `kubectl exec` subprocess exits. +// `ws.close()` performs a graceful close handshake that the kubelet streaming +// server does not forward to the child process — see kubernetes-client/javascript#2532. +export function safeTerminateWs( + ws: HeartbeatWebSocket | null | undefined +): void { + if (!ws) { + return + } + try { + ws.terminate() + } catch (err) { + core.debug(`[safeTerminateWs] terminate() threw, ignoring: ${err}`) + } +} + export function parsePositiveMsEnv( value: string | undefined, fallback: number @@ -66,11 +83,7 @@ export class WebSocketHeartbeat { `[Heartbeat] No pong received in ${this.pongDeadlineMs}ms, closing stale connection` ) this.stop() - try { - ws.close() - } catch { - // ignore errors closing an already-closing socket - } + safeTerminateWs(ws) reject( new Error( `WebSocket heartbeat timeout: no pong within ${this.pongDeadlineMs}ms` diff --git a/packages/k8s/src/k8s/index.ts b/packages/k8s/src/k8s/index.ts index beaee808..47f00cdb 100644 --- a/packages/k8s/src/k8s/index.ts +++ b/packages/k8s/src/k8s/index.ts @@ -25,7 +25,7 @@ import { WORK_VOLUME } from './utils' import * as shlex from 'shlex' -import { parsePositiveMsEnv, WebSocketHeartbeat } from './heartbeat' +import { parsePositiveMsEnv, safeTerminateWs, WebSocketHeartbeat } from './heartbeat' import type { HeartbeatWebSocket } from './heartbeat' const kc = new k8s.KubeConfig() @@ -315,7 +315,7 @@ export async function execPodStep( core.debug('[execPodStep] WebSocket closed cleanly') closeResolve() }) - socket.close() + safeTerminateWs(socket) }) } } @@ -361,7 +361,7 @@ export async function execPodStep( clearTimeout(closeTimeout) closeResolve() }) - socket.close() + safeTerminateWs(socket) }) } diff --git a/packages/k8s/tests/heartbeat-test.ts b/packages/k8s/tests/heartbeat-test.ts index 56ea083a..05d494ae 100644 --- a/packages/k8s/tests/heartbeat-test.ts +++ b/packages/k8s/tests/heartbeat-test.ts @@ -14,6 +14,7 @@ class MockWebSocket extends EventEmitter implements HeartbeatWebSocket { readyState: number ping = jest.fn() close = jest.fn() + terminate = jest.fn() constructor(readyState = 1) { super() @@ -100,7 +101,7 @@ describe('WebSocketHeartbeat', () => { }) describe('pong timeout', () => { - it('closes the socket and rejects the promise when no pong is received after first ping', () => { + it('terminates the socket and rejects the promise when no pong is received after first ping', () => { const ws = new MockWebSocket(1) const reject = jest.fn() // pingPeriodMs=100, pongDeadlineMs=200 @@ -111,7 +112,11 @@ describe('WebSocketHeartbeat', () => { jest.advanceTimersByTime(350) // past first ping + deadline - expect(ws.close).toHaveBeenCalledTimes(1) + // Pong timeout means the connection is already declared stale — + // force-terminate, do not perform a graceful close that the kubelet + // streaming server would not forward to the child process. + expect(ws.terminate).toHaveBeenCalledTimes(1) + expect(ws.close).not.toHaveBeenCalled() expect(reject).toHaveBeenCalledTimes(1) expect(reject.mock.calls[0][0]).toBeInstanceOf(Error) expect(reject.mock.calls[0][0].message).toMatch(/heartbeat timeout/) diff --git a/packages/k8s/tests/safe-terminate-ws-test.ts b/packages/k8s/tests/safe-terminate-ws-test.ts new file mode 100644 index 00000000..2f5dc3bc --- /dev/null +++ b/packages/k8s/tests/safe-terminate-ws-test.ts @@ -0,0 +1,180 @@ +const mockExec = jest.fn() + +jest.mock('@kubernetes/client-node', () => { + return { + KubeConfig: jest.fn().mockImplementation(() => ({ + loadFromDefault: jest.fn(), + makeApiClient: jest.fn().mockImplementation(() => ({})), + getContexts: jest.fn().mockReturnValue([{ namespace: 'test-namespace' }]) + })), + Exec: jest.fn().mockImplementation(() => ({ exec: mockExec })), + // eslint-disable-next-line @typescript-eslint/no-extraneous-class + CoreV1Api: class CoreV1Api {}, + // eslint-disable-next-line @typescript-eslint/no-extraneous-class + BatchV1Api: class BatchV1Api {}, + // eslint-disable-next-line @typescript-eslint/no-extraneous-class + AuthorizationV1Api: class AuthorizationV1Api {}, + Log: jest.fn() + } +}) + +import { EventEmitter } from 'events' +import { execPodStep } from '../src/k8s' + +interface FakeWebSocket extends EventEmitter { + readyState: number + terminate: jest.Mock + close: jest.Mock + ping: jest.Mock +} + +function makeFakeWebSocket(readyState = 1): FakeWebSocket { + const ws = new EventEmitter() as FakeWebSocket + ws.readyState = readyState + ws.terminate = jest.fn(() => { + process.nextTick(() => ws.emit('close')) + }) + ws.close = jest.fn(() => { + process.nextTick(() => ws.emit('close')) + }) + ws.ping = jest.fn() + return ws +} + +describe('safeTerminateWs (via execPodStep cleanup paths)', () => { + beforeEach(() => { + jest.clearAllMocks() + process.env['ACTIONS_RUNNER_KUBERNETES_NAMESPACE'] = 'test-namespace' + // Keep the heartbeat from interfering with the test timing. + process.env['ACTIONS_RUNNER_HEARTBEAT_PERIOD_MS'] = '600000' + process.env['ACTIONS_RUNNER_HEARTBEAT_DEADLINE_MS'] = '600000' + }) + + afterEach(() => { + delete process.env['ACTIONS_RUNNER_KUBERNETES_NAMESPACE'] + delete process.env['ACTIONS_RUNNER_HEARTBEAT_PERIOD_MS'] + delete process.env['ACTIONS_RUNNER_HEARTBEAT_DEADLINE_MS'] + }) + + it('calls terminate() (not close()) on the success-status cleanup path', async () => { + const ws = makeFakeWebSocket() + mockExec.mockImplementation( + async ( + _ns, + _pod, + _container, + _cmd, + _stdout, + _stderr, + _stdin, + _tty, + statusCb + ) => { + // Fire the status callback asynchronously so callers can attach + // .then/.catch first (mimics kc's Exec behavior). + process.nextTick(() => statusCb({ status: 'Success', code: 0 })) + return ws + } + ) + + await expect( + execPodStep(['echo', 'hi'], 'pod', 'container') + ).resolves.toBe(0) + + expect(ws.terminate).toHaveBeenCalledTimes(1) + expect(ws.close).not.toHaveBeenCalled() + }) + + it('calls terminate() (not close()) on the failure-status cleanup path', async () => { + const ws = makeFakeWebSocket() + mockExec.mockImplementation( + async ( + _ns, + _pod, + _container, + _cmd, + _stdout, + _stderr, + _stdin, + _tty, + statusCb + ) => { + process.nextTick(() => + statusCb({ status: 'Failure', message: 'boom' }) + ) + return ws + } + ) + + await expect( + execPodStep(['echo', 'hi'], 'pod', 'container') + ).rejects.toThrow('boom') + + expect(ws.terminate).toHaveBeenCalledTimes(1) + expect(ws.close).not.toHaveBeenCalled() + }) + + it('calls terminate() (not close()) on the exec.exec error-handler path', async () => { + const ws = makeFakeWebSocket() + // exec.exec resolves with ws (so the .then assigns ws to the outer + // closure), then the chain rejects so the .catch fires. A custom thenable + // is the simplest way to interleave a resolve and a reject without racing + // through the heartbeat machinery. + let thenCb: ((v: unknown) => void) | undefined + const thenable = { + then(onFulfilled: (v: unknown) => void) { + thenCb = onFulfilled + return { + catch: (onRejected: (e: Error) => void) => { + process.nextTick(() => { + thenCb?.(ws) + process.nextTick(() => onRejected(new Error('exec failed'))) + }) + return Promise.resolve() + } + } + } + } + mockExec.mockReturnValue(thenable) + + await expect( + execPodStep(['echo', 'hi'], 'pod', 'container') + ).rejects.toThrow('exec failed') + + expect(ws.terminate).toHaveBeenCalledTimes(1) + expect(ws.close).not.toHaveBeenCalled() + }) + + it('swallows errors thrown by terminate() so cleanup never crashes', async () => { + const ws = makeFakeWebSocket() + ws.terminate = jest.fn(() => { + // Schedule the close emit so the awaiting Promise resolves... + process.nextTick(() => ws.emit('close')) + // ...but also throw to exercise the defensive try/catch. + throw new Error('terminate boom') + }) + + mockExec.mockImplementation( + async ( + _ns, + _pod, + _container, + _cmd, + _stdout, + _stderr, + _stdin, + _tty, + statusCb + ) => { + process.nextTick(() => statusCb({ status: 'Success', code: 0 })) + return ws + } + ) + + await expect( + execPodStep(['echo', 'hi'], 'pod', 'container') + ).resolves.toBe(0) + + expect(ws.terminate).toHaveBeenCalledTimes(1) + }) +})