Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions packages/k8s/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,11 @@ rules:
- Docker [create options](https://docs.github.com/en/actions/using-workflows/workflow-syntax-for-github-actions#jobsjob_idcontaineroptions) are not supported
- Container actions will have to specify the entrypoint, since the default entrypoint will be overridden to run the commands from the workflow.
- Container actions need to have the following binaries in their container image: `sh`, `env`, `tail`.

## Configuration

These environment variables are read in the runner container of the runner pod. All knobs are optional; the defaults preserve the historical behavior of the hooks.

| Variable | Default | Purpose |
|---|---|---|
| `ACTIONS_RUNNER_TAR_DRAIN_TIMEOUT_MS` | `60000` | Upper bound (milliseconds) on how long `execCpFromPod` waits for the tar extraction stream to drain after the underlying `kubectl exec` channel reports success. The wait is necessary because the Kubernetes exec status callback fires when the remote tar process exits, but the WebSocket may still have tar bytes in flight that the local `tar-fs` extractor has not yet written to disk; returning early truncates the extracted workspace. Tune up (e.g. `90000`) for very large workspaces over slow links; if the timeout fires, a warning is logged and the (possibly incomplete) extract is finalized. Invalid or non-positive values fall back to the default. |
71 changes: 49 additions & 22 deletions packages/k8s/src/k8s/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import {
fixArgs,
listDirAllCommand,
sleep,
tarDrainTimeoutMs,
EXTERNALS_VOLUME_NAME,
GITHUB_VOLUME_NAME,
WORK_VOLUME
Expand Down Expand Up @@ -585,30 +586,56 @@ export async function execCpFromPod(
const writerStream = tar.extract(parentRunnerPath)
const errStream = new WritableStreamBuffer()

await new Promise((resolve, reject) => {
exec
.exec(
namespace(),
podName,
JOB_CONTAINER_NAME,
command,
writerStream,
errStream,
null,
false,
async status => {
if (errStream.size()) {
reject(
new Error(
`Error from cpFromPod - details: \n ${errStream.getContentsAsString()}`
try {
await new Promise((resolve, reject) => {
exec
.exec(
namespace(),
podName,
JOB_CONTAINER_NAME,
command,
writerStream,
errStream,
null,
false,
async status => {
if (errStream.size()) {
reject(
new Error(
`Error from cpFromPod - details: \n ${errStream.getContentsAsString()}`
)
)
)
}
resolve(status)
}
Comment on lines +601 to 610
Comment on lines +601 to 610
resolve(status)
}
)
.catch(e => reject(e))
})
)
.catch(e => reject(e))
})
} catch (error) {
// Failure path: destroy immediately. Awaiting drain on a stream that
// kc reported Failure on could hang against a half-open WebSocket.
writerStream.destroy()
throw error
}

// Success path: kc reported the exec channel closed cleanly, but the
// tar bytes from the WebSocket may still be in flight to tar-fs's
// extractor. Returning before the extractor finishes truncates the
// workspace silently. Bound the wait so a malformed stream cannot
// hang the hook indefinitely.
const timeoutMs = tarDrainTimeoutMs()
try {
await stream.promises.finished(writerStream, {
signal: AbortSignal.timeout(timeoutMs)
})
} catch (err) {
core.warning(
`[execCpFromPod] tar drain did not complete within ${timeoutMs}ms; ` +
`the extracted workspace may be incomplete: ${formatError(err)}`
)
} finally {
writerStream.destroy()
}
break
} catch (error) {
core.debug(`Attempt ${attempt + 1} failed: ${error}`)
Expand Down
15 changes: 15 additions & 0 deletions packages/k8s/src/k8s/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ export const DEFAULT_CONTAINER_ENTRY_POINT = 'tail'

export const ENV_HOOK_TEMPLATE_PATH = 'ACTIONS_RUNNER_CONTAINER_HOOK_TEMPLATE'
export const ENV_USE_KUBE_SCHEDULER = 'ACTIONS_RUNNER_USE_KUBE_SCHEDULER'
export const ENV_TAR_DRAIN_TIMEOUT_MS = 'ACTIONS_RUNNER_TAR_DRAIN_TIMEOUT_MS'

export const DEFAULT_TAR_DRAIN_TIMEOUT_MS = 60000

export const EXTERNALS_VOLUME_NAME = 'externals'
export const GITHUB_VOLUME_NAME = 'github'
Expand Down Expand Up @@ -269,6 +272,18 @@ export function useKubeScheduler(): boolean {
return process.env[ENV_USE_KUBE_SCHEDULER] === 'true'
}

/**
 * Resolves the tar drain timeout (in milliseconds) from the
 * `ACTIONS_RUNNER_TAR_DRAIN_TIMEOUT_MS` environment variable.
 *
 * The value must be a plain positive decimal integer; surrounding whitespace
 * and a leading `+` are tolerated. Anything else (unset, empty, non-numeric,
 * a number followed by trailing characters such as `60s`, fractional or
 * exponent notation, zero, or negative) falls back to
 * `DEFAULT_TAR_DRAIN_TIMEOUT_MS`.
 *
 * @returns a positive integer number of milliseconds, never larger than
 *   2147483647.
 */
export function tarDrainTimeoutMs(): number {
  // Largest delay Node timers honor; larger delays are documented to be
  // coerced to 1 ms, which would collapse the drain window instead of
  // extending it.
  const maxTimerDelayMs = 2 ** 31 - 1

  const raw = process.env[ENV_TAR_DRAIN_TIMEOUT_MS]
  if (raw === undefined) {
    return DEFAULT_TAR_DRAIN_TIMEOUT_MS
  }

  const candidate = raw.trim()
  // parseInt on its own accepts any numeric prefix ("60s" -> 60,
  // "1.5e3" -> 1), silently turning a typo into a near-zero timeout.
  // Require the whole value to be decimal digits before parsing.
  if (!/^\+?\d+$/.test(candidate)) {
    return DEFAULT_TAR_DRAIN_TIMEOUT_MS
  }

  const parsed = Number.parseInt(candidate, 10)
  if (Number.isNaN(parsed) || parsed <= 0) {
    return DEFAULT_TAR_DRAIN_TIMEOUT_MS
  }

  return Math.min(parsed, maxTimerDelayMs)
}
Comment on lines +275 to +285

export enum PodPhase {
PENDING = 'Pending',
RUNNING = 'Running',
Expand Down
3 changes: 2 additions & 1 deletion packages/k8s/tests/error-serialization-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ jest.mock('tar-fs', () => ({
pack: jest.fn().mockReturnValue({ pipe: jest.fn() }),
extract: jest.fn().mockReturnValue({
on: jest.fn(),
pipe: jest.fn()
pipe: jest.fn(),
destroy: jest.fn()
})
},
__esModule: true
Expand Down
260 changes: 260 additions & 0 deletions packages/k8s/tests/exec-cp-from-pod-drain-test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,260 @@
import { Writable } from 'stream'

// Module-level jest mocks shared by every test in this file. They are
// referenced from inside the jest.mock factories below, so each test can
// queue behavior on them (mockExec) or inspect what was logged
// (mockWarning / mockDebug).
const mockExec = jest.fn()
const mockWarning = jest.fn()
const mockDebug = jest.fn()

// Replace the Kubernetes client: every constructed `Exec` exposes the shared
// mockExec as its `exec` method, KubeConfig is a stub that reports a single
// context in 'test-namespace', and the API classes are empty placeholders.
jest.mock('@kubernetes/client-node', () => {
return {
KubeConfig: jest.fn().mockImplementation(() => ({
loadFromDefault: jest.fn(),
makeApiClient: jest.fn().mockReturnValue({}),
getContexts: jest.fn().mockReturnValue([{ namespace: 'test-namespace' }])
})),
Exec: jest.fn().mockImplementation(() => ({ exec: mockExec })),
// eslint-disable-next-line @typescript-eslint/no-extraneous-class
CoreV1Api: class CoreV1Api {},
// eslint-disable-next-line @typescript-eslint/no-extraneous-class
BatchV1Api: class BatchV1Api {},
// eslint-disable-next-line @typescript-eslint/no-extraneous-class
AuthorizationV1Api: class AuthorizationV1Api {},
Log: jest.fn()
}
})

// Single shared "current writer" the prod code receives from tar.extract.
// Each test replaces it before invoking execCpFromPod.
let currentWriter: Writable | null = null

// Replace tar-fs: `extract` hands back whatever writer the running test
// assigned to currentWriter, and throws a descriptive error if the test
// forgot to assign one.
jest.mock('tar-fs', () => ({
default: {
pack: jest.fn().mockReturnValue({ pipe: jest.fn() }),
extract: jest.fn().mockImplementation(() => {
if (!currentWriter) {
throw new Error('test forgot to assign currentWriter before extract()')
}
return currentWriter
})
},
__esModule: true
}))

// Keep the real utils module but make `sleep` resolve immediately, so any
// code path that sleeps does not slow the suite down.
jest.mock('../src/k8s/utils', () => {
const actual = jest.requireActual('../src/k8s/utils')
return {
...actual,
sleep: jest.fn().mockResolvedValue(undefined)
}
})

// Replace @actions/core: warning/debug forward to the module-level mocks so
// tests can assert on logged messages; info/error are inert stubs.
jest.mock('@actions/core', () => ({
info: jest.fn(),
warning: (...args: unknown[]) => mockWarning(...args),
debug: (...args: unknown[]) => mockDebug(...args),
error: jest.fn()
}))

import { execCpFromPod } from '../src/k8s'
import { ENV_TAR_DRAIN_TIMEOUT_MS } from '../src/k8s/utils'

// Queues exactly one mocked `exec` invocation on mockExec. When invoked, the
// mock optionally writes bytes to the stderr stream it was given, then fires
// the status callback with `statusValue`, and finally (if requested) awaits
// a test-supplied hook before returning a stub connection object.
function mockExecOnceWithStatus(
  statusValue: { status: string | undefined },
  opts: {
    // Bytes to push into the stderr stream before the status callback fires.
    stderrWrite?: Buffer
    // Runs after the status callback so a test can simulate data still
    // arriving on the writer after Success was reported. It is awaited, so
    // the writer can finish (or hang) before the mocked exec call returns.
    afterStatus?: () => Promise<void>
  } = {}
): void {
  type StatusCallback = (s: { status: string | undefined }) => void
  type ChunkSink = { write: (b: Buffer) => void }

  const respondOnce = async (
    _ns: string,
    _pod: string,
    _container: string,
    _command: string[],
    _stdout: NodeJS.WritableStream | null,
    stderr: NodeJS.WritableStream,
    _stdin: NodeJS.ReadableStream | null,
    _tty: boolean,
    statusCb: StatusCallback
  ) => {
    const pendingStderr = opts.stderrWrite
    if (pendingStderr) {
      const errSink = stderr as unknown as ChunkSink
      errSink.write(pendingStderr)
    }

    statusCb(statusValue)

    const lateHook = opts.afterStatus
    if (lateHook) {
      await lateHook()
    }

    return { on: jest.fn() }
  }

  mockExec.mockImplementationOnce(respondOnce)
}

// Queues 32 identical one-shot responses on mockExec: each writes an empty
// buffer to stdout (when a stdout stream is supplied), reports
// `{ status: 'Success' }` through the status callback, and returns a stub
// connection object. These are meant to absorb the exec calls made after the
// tar copy (the hash-verification attempts, which land on the same mockExec),
// so a test only has to script the tar exec itself and stays deterministic.
function stubVerificationLoop(): void {
  const queuedResponses = 32

  const succeedWithEmptyStdout = async (
    _ns: string,
    _pod: string,
    _container: string,
    _command: string[],
    stdout: NodeJS.WritableStream | null,
    _stderr: NodeJS.WritableStream,
    _stdin: NodeJS.ReadableStream | null,
    _tty: boolean,
    statusCb: (s: { status: string }) => void
  ) => {
    if (stdout) {
      const outSink = stdout as unknown as { write: (b: Buffer) => void }
      outSink.write(Buffer.from(''))
    }
    statusCb({ status: 'Success' })
    return { on: jest.fn() }
  }

  Array.from({ length: queuedResponses }).forEach(() => {
    mockExec.mockImplementationOnce(succeedWithEmptyStdout)
  })
}

// Covers what execCpFromPod does with the tar writer once the mocked exec
// call reports a status: a drain that completes, a drain that times out, and
// the rejection path where the writer is destroyed without waiting.
describe('execCpFromPod tar drain', () => {
// Captured once so afterEach can restore whatever the environment held
// before this suite started overriding the drain timeout.
const originalDrainEnv = process.env[ENV_TAR_DRAIN_TIMEOUT_MS]

beforeEach(() => {
// Reset call history AND queued one-shot implementations, so stubs left
// over from one test cannot be consumed by the next.
jest.clearAllMocks()
mockExec.mockReset()
mockWarning.mockReset()
mockDebug.mockReset()
currentWriter = null
process.env['ACTIONS_RUNNER_KUBERNETES_NAMESPACE'] = 'test-namespace'
})

afterEach(() => {
delete process.env['ACTIONS_RUNNER_KUBERNETES_NAMESPACE']
// Put the drain-timeout variable back exactly as it was (unset vs. set).
if (originalDrainEnv === undefined) {
delete process.env[ENV_TAR_DRAIN_TIMEOUT_MS]
} else {
process.env[ENV_TAR_DRAIN_TIMEOUT_MS] = originalDrainEnv
}
})

it('awaits writer drain on Success: late writer end is honored, no warning', async () => {
// Use a plain Writable so stream.finished resolves on `end()` without
// needing a downstream reader. A PassThrough would buffer forever
// because nothing pipes its readable side.
const writer = new Writable({
write(_chunk, _enc, cb) {
cb()
}
})
currentWriter = writer

// Generous bound relative to the 30ms delay below, so only a missing drain
// await (not a slow machine) can make this test observe the warning.
process.env[ENV_TAR_DRAIN_TIMEOUT_MS] = '5000'

mockExecOnceWithStatus(
{ status: 'Success' },
{
// Simulate tar bytes still landing AFTER kc reports Success, then a
// graceful end. The drain await must observe this end.
afterStatus: async () => {
await new Promise(r => setTimeout(r, 30))
writer.write(Buffer.from('late tar bytes'))
writer.end()
}
}
)
// Verification-loop stubs MUST be queued AFTER the tar exec stub —
// mockImplementationOnce is FIFO.
stubVerificationLoop()

await expect(
execCpFromPod('my-pod', '/workspace/output', '/tmp/dst')
).resolves.toBeUndefined()

// Drain completed cleanly — the timeout warning MUST NOT fire.
const warnMessages = mockWarning.mock.calls.map(c => String(c[0]))
expect(
warnMessages.some(m => m.includes('tar drain did not complete'))
).toBe(false)
})

it('Success + writer that never ends: timeout fires, warning emitted, writer destroyed, promise resolves', async () => {
const writer = new Writable({
write(_chunk, _enc, cb) {
cb()
}
})
currentWriter = writer

// Small timeout so the test does not slow the suite. The drain bound
// must be measured in real time (AbortSignal.timeout), so a tiny value
// is the only way to keep this test fast.
process.env[ENV_TAR_DRAIN_TIMEOUT_MS] = '50'

// Spy before the call so the destroy issued by the code under test is
// recorded.
const destroySpy = jest.spyOn(writer, 'destroy')

mockExecOnceWithStatus({ status: 'Success' })
// Deliberately do NOT call writer.end(). The drain await will trip the
// 50ms AbortSignal timeout.
// Verification stubs come AFTER the tar exec stub (FIFO).
stubVerificationLoop()

await expect(
execCpFromPod('my-pod', '/workspace/output', '/tmp/dst')
).resolves.toBeUndefined()

// The warning text embeds the configured bound, which also proves the
// environment override was actually read.
const warnMessages = mockWarning.mock.calls.map(c => String(c[0]))
expect(
warnMessages.some(m =>
m.includes('tar drain did not complete within 50ms')
)
).toBe(true)
expect(destroySpy).toHaveBeenCalled()
})

it('Failure status: writer is destroyed immediately, no drain await', async () => {
const writer = new Writable({
write(_chunk, _enc, cb) {
cb()
}
})
currentWriter = writer

// Large bound on purpose: this test expects the drain wait to be skipped,
// so the configured value should never come into play.
process.env[ENV_TAR_DRAIN_TIMEOUT_MS] = '60000'
const destroySpy = jest.spyOn(writer, 'destroy')

// Upstream's settle logic only rejects when errStream is non-empty; a
// bare Failure status with no stderr resolves. Force the reject path
// by writing stderr on every attempt. execCpFromPod retries 30 times,
// each attempt creating a fresh writer via tar.extract — currentWriter
// is reused across attempts which is fine for the destroy assertion.
for (let i = 0; i < 30; i++) {
mockExecOnceWithStatus(
{ status: 'Failure' },
{ stderrWrite: Buffer.from('tar: cannot open: Permission denied') }
)
}

await expect(
execCpFromPod('my-pod', '/workspace/output', '/tmp/dst')
).rejects.toThrow()

// Writer was destroyed on the failure path — at least once per attempt.
expect(destroySpy).toHaveBeenCalled()
// Critically: the drain timeout warning must NOT fire on the failure
// path. Failure path skips the drain entirely.
const warnMessages = mockWarning.mock.calls.map(c => String(c[0]))
expect(
warnMessages.some(m => m.includes('tar drain did not complete'))
).toBe(false)
})
})
Loading