Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 18 additions & 5 deletions packages/k8s/src/k8s/heartbeat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,27 @@ export interface HeartbeatWebSocket {
readyState: number
ping(): void
close(): void
terminate(): void
on(event: string, listener: (...args: any[]) => void): this
once(event: string, listener: (...args: any[]) => void): this
}

// Force-close a kc WebSocket so the peer's `kubectl exec` subprocess exits.
// `ws.close()` performs a graceful close handshake that the kubelet streaming
// server does not forward to the child process — see kubernetes-client/javascript#2532.
export function safeTerminateWs(
ws: HeartbeatWebSocket | null | undefined
): void {
if (!ws) {
return
}
try {
ws.terminate()
} catch (err) {
core.debug(`[safeTerminateWs] terminate() threw, ignoring: ${err}`)
}
Comment on lines +23 to +25
}
Comment on lines +15 to +26

export function parsePositiveMsEnv(
value: string | undefined,
fallback: number
Expand Down Expand Up @@ -66,11 +83,7 @@ export class WebSocketHeartbeat {
`[Heartbeat] No pong received in ${this.pongDeadlineMs}ms, closing stale connection`
)
this.stop()
try {
ws.close()
} catch {
// ignore errors closing an already-closing socket
}
safeTerminateWs(ws)
reject(
new Error(
`WebSocket heartbeat timeout: no pong within ${this.pongDeadlineMs}ms`
Expand Down
6 changes: 3 additions & 3 deletions packages/k8s/src/k8s/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import {
WORK_VOLUME
} from './utils'
import * as shlex from 'shlex'
import { parsePositiveMsEnv, WebSocketHeartbeat } from './heartbeat'
import { parsePositiveMsEnv, safeTerminateWs, WebSocketHeartbeat } from './heartbeat'
import type { HeartbeatWebSocket } from './heartbeat'

const kc = new k8s.KubeConfig()
Expand Down Expand Up @@ -315,7 +315,7 @@ export async function execPodStep(
core.debug('[execPodStep] WebSocket closed cleanly')
closeResolve()
})
socket.close()
safeTerminateWs(socket)
})
}
}
Expand Down Expand Up @@ -361,7 +361,7 @@ export async function execPodStep(
clearTimeout(closeTimeout)
closeResolve()
})
socket.close()
safeTerminateWs(socket)
})
}

Expand Down
9 changes: 7 additions & 2 deletions packages/k8s/tests/heartbeat-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class MockWebSocket extends EventEmitter implements HeartbeatWebSocket {
readyState: number
ping = jest.fn()
close = jest.fn()
terminate = jest.fn()

constructor(readyState = 1) {
super()
Expand Down Expand Up @@ -100,7 +101,7 @@ describe('WebSocketHeartbeat', () => {
})

describe('pong timeout', () => {
it('closes the socket and rejects the promise when no pong is received after first ping', () => {
it('terminates the socket and rejects the promise when no pong is received after first ping', () => {
const ws = new MockWebSocket(1)
const reject = jest.fn()
// pingPeriodMs=100, pongDeadlineMs=200
Expand All @@ -111,7 +112,11 @@ describe('WebSocketHeartbeat', () => {

jest.advanceTimersByTime(350) // past first ping + deadline

expect(ws.close).toHaveBeenCalledTimes(1)
// Pong timeout means the connection is already declared stale —
// force-terminate, do not perform a graceful close that the kubelet
// streaming server would not forward to the child process.
expect(ws.terminate).toHaveBeenCalledTimes(1)
expect(ws.close).not.toHaveBeenCalled()
expect(reject).toHaveBeenCalledTimes(1)
expect(reject.mock.calls[0][0]).toBeInstanceOf(Error)
expect(reject.mock.calls[0][0].message).toMatch(/heartbeat timeout/)
Expand Down
180 changes: 180 additions & 0 deletions packages/k8s/tests/safe-terminate-ws-test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
const mockExec = jest.fn()

jest.mock('@kubernetes/client-node', () => {
return {
KubeConfig: jest.fn().mockImplementation(() => ({
loadFromDefault: jest.fn(),
makeApiClient: jest.fn().mockImplementation(() => ({})),
getContexts: jest.fn().mockReturnValue([{ namespace: 'test-namespace' }])
})),
Exec: jest.fn().mockImplementation(() => ({ exec: mockExec })),
// eslint-disable-next-line @typescript-eslint/no-extraneous-class
CoreV1Api: class CoreV1Api {},
// eslint-disable-next-line @typescript-eslint/no-extraneous-class
BatchV1Api: class BatchV1Api {},
// eslint-disable-next-line @typescript-eslint/no-extraneous-class
AuthorizationV1Api: class AuthorizationV1Api {},
Log: jest.fn()
}
})

import { EventEmitter } from 'events'
import { execPodStep } from '../src/k8s'

interface FakeWebSocket extends EventEmitter {
readyState: number
terminate: jest.Mock
close: jest.Mock
ping: jest.Mock
}

function makeFakeWebSocket(readyState = 1): FakeWebSocket {
const ws = new EventEmitter() as FakeWebSocket
ws.readyState = readyState
ws.terminate = jest.fn(() => {
process.nextTick(() => ws.emit('close'))
})
ws.close = jest.fn(() => {
process.nextTick(() => ws.emit('close'))
})
ws.ping = jest.fn()
return ws
}

describe('safeTerminateWs (via execPodStep cleanup paths)', () => {
beforeEach(() => {
jest.clearAllMocks()
process.env['ACTIONS_RUNNER_KUBERNETES_NAMESPACE'] = 'test-namespace'
// Keep the heartbeat from interfering with the test timing.
process.env['ACTIONS_RUNNER_HEARTBEAT_PERIOD_MS'] = '600000'
process.env['ACTIONS_RUNNER_HEARTBEAT_DEADLINE_MS'] = '600000'
})

afterEach(() => {
delete process.env['ACTIONS_RUNNER_KUBERNETES_NAMESPACE']
delete process.env['ACTIONS_RUNNER_HEARTBEAT_PERIOD_MS']
delete process.env['ACTIONS_RUNNER_HEARTBEAT_DEADLINE_MS']
})

it('calls terminate() (not close()) on the success-status cleanup path', async () => {
const ws = makeFakeWebSocket()
mockExec.mockImplementation(
async (
_ns,
_pod,
_container,
_cmd,
_stdout,
_stderr,
_stdin,
_tty,
statusCb
) => {
// Fire the status callback asynchronously so callers can attach
// .then/.catch first (mimics kc's Exec behavior).
process.nextTick(() => statusCb({ status: 'Success', code: 0 }))
return ws
}
)

await expect(
execPodStep(['echo', 'hi'], 'pod', 'container')
).resolves.toBe(0)

expect(ws.terminate).toHaveBeenCalledTimes(1)
expect(ws.close).not.toHaveBeenCalled()
})

it('calls terminate() (not close()) on the failure-status cleanup path', async () => {
const ws = makeFakeWebSocket()
mockExec.mockImplementation(
async (
_ns,
_pod,
_container,
_cmd,
_stdout,
_stderr,
_stdin,
_tty,
statusCb
) => {
process.nextTick(() =>
statusCb({ status: 'Failure', message: 'boom' })
)
return ws
}
)

await expect(
execPodStep(['echo', 'hi'], 'pod', 'container')
).rejects.toThrow('boom')

expect(ws.terminate).toHaveBeenCalledTimes(1)
expect(ws.close).not.toHaveBeenCalled()
})

it('calls terminate() (not close()) on the exec.exec error-handler path', async () => {
const ws = makeFakeWebSocket()
// exec.exec resolves with ws (so the .then assigns ws to the outer
// closure), then the chain rejects so the .catch fires. A custom thenable
// is the simplest way to interleave a resolve and a reject without racing
// through the heartbeat machinery.
let thenCb: ((v: unknown) => void) | undefined
const thenable = {
then(onFulfilled: (v: unknown) => void) {
thenCb = onFulfilled
return {
catch: (onRejected: (e: Error) => void) => {
process.nextTick(() => {
thenCb?.(ws)
process.nextTick(() => onRejected(new Error('exec failed')))
})
return Promise.resolve()
}
}
}
}
mockExec.mockReturnValue(thenable)

await expect(
execPodStep(['echo', 'hi'], 'pod', 'container')
).rejects.toThrow('exec failed')

expect(ws.terminate).toHaveBeenCalledTimes(1)
expect(ws.close).not.toHaveBeenCalled()
})

it('swallows errors thrown by terminate() so cleanup never crashes', async () => {
const ws = makeFakeWebSocket()
ws.terminate = jest.fn(() => {
// Schedule the close emit so the awaiting Promise resolves...
process.nextTick(() => ws.emit('close'))
// ...but also throw to exercise the defensive try/catch.
throw new Error('terminate boom')
})

mockExec.mockImplementation(
async (
_ns,
_pod,
_container,
_cmd,
_stdout,
_stderr,
_stdin,
_tty,
statusCb
) => {
process.nextTick(() => statusCb({ status: 'Success', code: 0 }))
return ws
}
)

await expect(
execPodStep(['echo', 'hi'], 'pod', 'container')
).resolves.toBe(0)

expect(ws.terminate).toHaveBeenCalledTimes(1)
})
})
Loading