Skip to content

Commit 26f3dcc

Browse files
authored
LinuxProcess: Start stdin relay after process start (#478)
Fixes #477 Because we start piping stdin before process launch we can fill up the guest pipe buffer before the process even starts. We'd need some backpressure mechanism to handle this and slow consumers (register the other end with epoll and buffer some data etc.), but we should probably just start piping after the process is up and running. This change does exactly that, as well as stops holding the process mutex while draining stdin in the guest for `CloseStdin()`. This fixes issues where we try and write > pipe_buf bytes through stdin. Today this hangs.
1 parent 0e91de6 commit 26f3dcc

4 files changed

Lines changed: 323 additions & 41 deletions

File tree

Sources/Containerization/LinuxProcess.swift

Lines changed: 48 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -146,44 +146,9 @@ extension LinuxProcess {
146146
}
147147
}
148148

149-
if let stdin = self.ioSetup.stdin {
150-
if let handle = handles[0] {
151-
self.state.withLock {
152-
$0.stdinRelay = Task {
153-
for await data in stdin.reader.stream() {
154-
do {
155-
try handle.write(contentsOf: data)
156-
} catch {
157-
self.logger?.error("failed to write to stdin: \(error)")
158-
break
159-
}
160-
}
161-
162-
do {
163-
self.logger?.debug("stdin relay finished, closing")
164-
165-
// There's two ways we can wind up here:
166-
//
167-
// 1. The stream finished on its own (e.g. we wrote all the
168-
// data) and we will close the underlying stdin in the guest below.
169-
//
170-
// 2. The client explicitly called closeStdin() themselves
171-
// which will cancel this relay task AFTER actually closing
172-
// the fds. If the client did that, then this task will be
173-
// cancelled, and the fds are already gone so there's nothing
174-
// for us to do.
175-
if Task.isCancelled {
176-
return
177-
}
178-
179-
try await self._closeStdin()
180-
} catch {
181-
self.logger?.error("failed to close stdin: \(error)")
182-
}
183-
}
184-
}
185-
}
186-
}
149+
// Note: stdin relay is started separately via startStdinRelay() after
150+
// the process has started, to avoid a deadlock where closeStdin is
151+
// called before the process is consuming from the pipe.
187152

188153
var configuredStreams = 0
189154
let (stream, cc) = AsyncStream<Void>.makeStream()
@@ -231,6 +196,45 @@ extension LinuxProcess {
231196
return handles
232197
}
233198

199+
func startStdinRelay(handle: FileHandle) {
200+
guard let stdin = self.ioSetup.stdin else { return }
201+
202+
self.state.withLock {
203+
$0.stdinRelay = Task {
204+
for await data in stdin.reader.stream() {
205+
do {
206+
try handle.write(contentsOf: data)
207+
} catch {
208+
self.logger?.error("failed to write to stdin: \(error)")
209+
break
210+
}
211+
}
212+
213+
do {
214+
self.logger?.debug("stdin relay finished, closing")
215+
216+
// There's two ways we can wind up here:
217+
//
218+
// 1. The stream finished on its own (e.g. we wrote all the
219+
// data) and we will close the underlying stdin in the guest below.
220+
//
221+
// 2. The client explicitly called closeStdin() themselves
222+
// which will cancel this relay task AFTER actually closing
223+
// the fds. If the client did that, then this task will be
224+
// cancelled, and the fds are already gone so there's nothing
225+
// for us to do.
226+
if Task.isCancelled {
227+
return
228+
}
229+
230+
try await self._closeStdin()
231+
} catch {
232+
self.logger?.error("failed to close stdin: \(error)")
233+
}
234+
}
235+
}
236+
}
237+
234238
/// Start the process.
235239
public func start() async throws {
236240
do {
@@ -273,6 +277,12 @@ extension LinuxProcess {
273277
containerID: self.owningContainer
274278
)
275279

280+
// Start stdin relay after process launch to avoid filling the pipe
281+
// buffer before the process is even running.
282+
if let stdinHandle = result[0] {
283+
self.startStdinRelay(handle: stdinHandle)
284+
}
285+
276286
self.state.withLock {
277287
$0.stdio = StdioHandles(
278288
stdin: result[0],

Sources/Integration/ContainerTests.swift

Lines changed: 267 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,32 @@ extension IntegrationSuite {
107107
}
108108
}
109109

110+
final class ChunkedStdinBuffer: ReaderStream {
111+
let chunks: [Data]
112+
let delayMs: Int
113+
114+
init(chunks: [Data], delayMs: Int = 0) {
115+
self.chunks = chunks
116+
self.delayMs = delayMs
117+
}
118+
119+
func stream() -> AsyncStream<Data> {
120+
let chunks = self.chunks
121+
let delayMs = self.delayMs
122+
return AsyncStream { cont in
123+
Task {
124+
for chunk in chunks {
125+
if delayMs > 0 {
126+
try? await Task.sleep(for: .milliseconds(delayMs))
127+
}
128+
cont.yield(chunk)
129+
}
130+
cont.finish()
131+
}
132+
}
133+
}
134+
}
135+
110136
func testProcessEchoHi() async throws {
111137
let id = "test-process-echo-hi"
112138
let bs = try await bootstrap(id)
@@ -1761,4 +1787,245 @@ extension IntegrationSuite {
17611787
throw IntegrationError.assert(msg: "expected /etc/resolv.conf to contain DNS servers, got: \(output)")
17621788
}
17631789
}
1790+
1791+
func testLargeStdinInput() async throws {
1792+
let id = "test-large-stdin-input"
1793+
1794+
let bs = try await bootstrap(id)
1795+
1796+
let inputSize = 128 * 1024
1797+
let inputData = Data(repeating: 0x41, count: inputSize) // 'A' repeated
1798+
1799+
let buffer = BufferWriter()
1800+
let container = try LinuxContainer(id, rootfs: bs.rootfs, vmm: bs.vmm) { config in
1801+
config.process.arguments = ["cat"]
1802+
config.process.stdin = StdinBuffer(data: inputData)
1803+
config.process.stdout = buffer
1804+
config.bootLog = bs.bootLog
1805+
}
1806+
1807+
do {
1808+
try await container.create()
1809+
try await container.start()
1810+
1811+
let status = try await container.wait()
1812+
try await container.stop()
1813+
1814+
guard status.exitCode == 0 else {
1815+
throw IntegrationError.assert(msg: "process status \(status) != 0")
1816+
}
1817+
1818+
guard buffer.data.count == inputSize else {
1819+
throw IntegrationError.assert(
1820+
msg: "output size \(buffer.data.count) != input size \(inputSize)")
1821+
}
1822+
1823+
guard buffer.data == inputData else {
1824+
throw IntegrationError.assert(msg: "output data does not match input data")
1825+
}
1826+
} catch {
1827+
try? await container.stop()
1828+
throw error
1829+
}
1830+
}
1831+
1832+
func testExecLargeStdinInput() async throws {
1833+
let id = "test-exec-large-stdin-input"
1834+
let bs = try await bootstrap(id)
1835+
1836+
let inputSize = 128 * 1024
1837+
let inputData = Data(repeating: 0x42, count: inputSize)
1838+
1839+
let container = try LinuxContainer(id, rootfs: bs.rootfs, vmm: bs.vmm) { config in
1840+
config.process.arguments = ["sleep", "100"]
1841+
config.bootLog = bs.bootLog
1842+
}
1843+
1844+
do {
1845+
try await container.create()
1846+
try await container.start()
1847+
1848+
let buffer = BufferWriter()
1849+
let exec = try await container.exec("large-stdin-exec") { config in
1850+
config.arguments = ["cat"]
1851+
config.stdin = StdinBuffer(data: inputData)
1852+
config.stdout = buffer
1853+
}
1854+
1855+
try await exec.start()
1856+
let status = try await exec.wait()
1857+
try await exec.delete()
1858+
1859+
guard status.exitCode == 0 else {
1860+
throw IntegrationError.assert(msg: "exec status \(status) != 0")
1861+
}
1862+
1863+
guard buffer.data.count == inputSize else {
1864+
throw IntegrationError.assert(msg: "output size \(buffer.data.count) != \(inputSize)")
1865+
}
1866+
1867+
guard buffer.data == inputData else {
1868+
throw IntegrationError.assert(msg: "output data mismatch")
1869+
}
1870+
1871+
try await container.kill(SIGKILL)
1872+
try await container.wait()
1873+
try await container.stop()
1874+
} catch {
1875+
try? await container.stop()
1876+
throw error
1877+
}
1878+
}
1879+
1880+
func testStdinExplicitClose() async throws {
1881+
let id = "test-stdin-explicit-close"
1882+
let bs = try await bootstrap(id)
1883+
1884+
let inputData = "explicit close test\n".data(using: .utf8)!
1885+
let buffer = BufferWriter()
1886+
1887+
let container = try LinuxContainer(id, rootfs: bs.rootfs, vmm: bs.vmm) { config in
1888+
config.process.arguments = ["sleep", "100"]
1889+
config.bootLog = bs.bootLog
1890+
}
1891+
1892+
do {
1893+
try await container.create()
1894+
try await container.start()
1895+
1896+
let exec = try await container.exec("stdin-close-exec") { config in
1897+
config.arguments = ["head", "-n", "1"]
1898+
config.stdin = StdinBuffer(data: inputData)
1899+
config.stdout = buffer
1900+
}
1901+
1902+
try await exec.start()
1903+
let status = try await exec.wait()
1904+
try await exec.delete()
1905+
1906+
guard status.exitCode == 0 else {
1907+
throw IntegrationError.assert(msg: "exec status \(status) != 0")
1908+
}
1909+
1910+
guard buffer.data == inputData else {
1911+
throw IntegrationError.assert(msg: "output mismatch")
1912+
}
1913+
1914+
try await container.kill(SIGKILL)
1915+
try await container.wait()
1916+
try await container.stop()
1917+
} catch {
1918+
try? await container.stop()
1919+
throw error
1920+
}
1921+
}
1922+
1923+
func testStdinBinaryData() async throws {
1924+
let id = "test-stdin-binary-data"
1925+
let bs = try await bootstrap(id)
1926+
1927+
var inputData = Data()
1928+
for i: UInt8 in 0...255 {
1929+
inputData.append(contentsOf: [UInt8](repeating: i, count: 256))
1930+
}
1931+
1932+
let buffer = BufferWriter()
1933+
let container = try LinuxContainer(id, rootfs: bs.rootfs, vmm: bs.vmm) { config in
1934+
config.process.arguments = ["cat"]
1935+
config.process.stdin = StdinBuffer(data: inputData)
1936+
config.process.stdout = buffer
1937+
config.bootLog = bs.bootLog
1938+
}
1939+
1940+
do {
1941+
try await container.create()
1942+
try await container.start()
1943+
1944+
let status = try await container.wait()
1945+
try await container.stop()
1946+
1947+
guard status.exitCode == 0 else {
1948+
throw IntegrationError.assert(msg: "process status \(status) != 0")
1949+
}
1950+
1951+
guard buffer.data == inputData else {
1952+
throw IntegrationError.assert(msg: "binary data mismatch")
1953+
}
1954+
} catch {
1955+
try? await container.stop()
1956+
throw error
1957+
}
1958+
}
1959+
1960+
func testStdinMultipleChunks() async throws {
1961+
let id = "test-stdin-multiple-chunks"
1962+
let bs = try await bootstrap(id)
1963+
1964+
let chunks = (0..<10).map { i in
1965+
Data(repeating: UInt8(0x30 + i), count: 10 * 1024)
1966+
}
1967+
let expectedData = chunks.reduce(Data()) { $0 + $1 }
1968+
1969+
let buffer = BufferWriter()
1970+
let container = try LinuxContainer(id, rootfs: bs.rootfs, vmm: bs.vmm) { config in
1971+
config.process.arguments = ["cat"]
1972+
config.process.stdin = ChunkedStdinBuffer(chunks: chunks, delayMs: 10)
1973+
config.process.stdout = buffer
1974+
config.bootLog = bs.bootLog
1975+
}
1976+
1977+
do {
1978+
try await container.create()
1979+
try await container.start()
1980+
1981+
let status = try await container.wait()
1982+
try await container.stop()
1983+
1984+
guard status.exitCode == 0 else {
1985+
throw IntegrationError.assert(msg: "process status \(status) != 0")
1986+
}
1987+
1988+
guard buffer.data == expectedData else {
1989+
throw IntegrationError.assert(msg: "chunked data mismatch")
1990+
}
1991+
} catch {
1992+
try? await container.stop()
1993+
throw error
1994+
}
1995+
}
1996+
1997+
func testStdinVeryLarge() async throws {
1998+
let id = "test-stdin-very-large"
1999+
let bs = try await bootstrap(id)
2000+
2001+
let inputSize = 10 * 1024 * 1024
2002+
let inputData = Data(repeating: 0x58, count: inputSize)
2003+
2004+
let stdout = DiscardingWriter()
2005+
let container = try LinuxContainer(id, rootfs: bs.rootfs, vmm: bs.vmm) { config in
2006+
config.process.arguments = ["wc", "-c"]
2007+
config.process.stdin = StdinBuffer(data: inputData)
2008+
config.process.stdout = stdout
2009+
config.bootLog = bs.bootLog
2010+
}
2011+
2012+
do {
2013+
try await container.create()
2014+
try await container.start()
2015+
2016+
let status = try await container.wait()
2017+
try await container.stop()
2018+
2019+
guard status.exitCode == 0 else {
2020+
throw IntegrationError.assert(msg: "process status \(status) != 0")
2021+
}
2022+
2023+
guard stdout.count > 0 else {
2024+
throw IntegrationError.assert(msg: "no output from wc")
2025+
}
2026+
} catch {
2027+
try? await container.stop()
2028+
throw error
2029+
}
2030+
}
17642031
}

Sources/Integration/Suite.swift

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -312,6 +312,12 @@ struct IntegrationSuite: AsyncParsableCommand {
312312
Test("container read-only rootfs", testReadOnlyRootfs),
313313
Test("container read-only rootfs hosts file", testReadOnlyRootfsHostsFileWritten),
314314
Test("container read-only rootfs DNS", testReadOnlyRootfsDNSConfigured),
315+
Test("large stdin input", testLargeStdinInput),
316+
Test("exec large stdin input", testExecLargeStdinInput),
317+
Test("stdin explicit close", testStdinExplicitClose),
318+
Test("stdin binary data", testStdinBinaryData),
319+
Test("stdin multiple chunks", testStdinMultipleChunks),
320+
Test("stdin very large", testStdinVeryLarge),
315321

316322
// Pods
317323
Test("pod single container", testPodSingleContainer),

0 commit comments

Comments
 (0)