Skip to content

Commit c8a47fa

Browse files
committed
fix: harden LoggableResponseBody capture against zero-read spin and double-close
Two defects in the bounded response-body capture path: - The bounded drain loop subtracted the read count from its remaining budget and handled only EOF (-1) and positive reads. A delegate Source that returns 0 for a positive byteCount — a Source.read contract violation — made the subtraction a no-op, so the loop spun forever and hung whatever thread triggered the capture. The loop now fails fast with an IOException on a 0-read, matching TeeSink.writeAll; the existing failure path caches it and source() re-throws it deterministically while snapshot() still returns the partial bytes. A 0-read is not treated as EOF, which would silently truncate the body. - On the over-cap path the wrapper retains the delegate's source as the live tail and hands it out inside a one-shot prefix+tail stream. That stream's close() closed the tail directly while the wrapper's own close() independently closed the delegate, so for a delegate whose source() returns the same instance and whose close() closes it, the underlying source was closed twice. Both paths now funnel through a single closeDelegateOnce() guard under the existing lock, so whichever closes first wins and the other is a no-op. The peek-based prefix view is still closed directly. This was masked by the self-guarded close in the shipped Okio adapter but was latent for a bring-your-own IoProvider. Also tightens the over-cap source() to assert the live-tail invariant with checkNotNull rather than silently falling back to a peek view, since the tail is always present once control reaches that point.
1 parent ea0cc81 commit c8a47fa

2 files changed

Lines changed: 157 additions & 8 deletions

File tree

sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/response/LoggableResponseBody.kt

Lines changed: 39 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,10 @@ public class LoggableResponseBody
143143
"LoggableResponseBody capture exceeded maxCaptureBytes; source() over the live tail " +
144144
"is single-use and was already consumed."
145145
}
146-
val tail = liveTail ?: return buf.peek()
146+
// On the over-cap path the live tail is always set by the time control reaches here:
147+
// drainError throws above and fullyCaptured returns above, so the only remaining regime
148+
// is "cap hit with bytes pending", which always assigns liveTail.
149+
val tail = checkNotNull(liveTail) { "over-cap source() reached with no live tail to replay" }
147150
return provider.bufferedSource(PrefixThenTailSource(buf.peek(), tail))
148151
}
149152

@@ -190,15 +193,29 @@ public class LoggableResponseBody
190193
// On an over-cap capture the delegate is still open and must be closed here; the
191194
// delegate owns the live-tail source, so closing the delegate releases it too (we
192195
// must NOT close the live tail separately, or the source would be closed twice).
193-
if (!delegateClosed) {
194-
delegate.close()
195-
delegateClosed = true
196-
}
196+
// Both this path and the over-cap one-shot source funnel through the same guard so
197+
// whichever fires first wins and the second is a no-op.
198+
closeDelegateOnce()
197199
// The captured buffer intentionally survives close — it holds in-memory bytes
198200
// and no network resources, and is needed for post-mortem snapshot logging.
199201
}
200202
}
201203

204+
/**
205+
* Closes the delegate (and, by ownership, its source) at most once. Both [close] and the
206+
* over-cap one-shot source close the same underlying source on the over-cap path; routing
207+
* both through this single guard prevents a double-close on a delegate whose [source]
208+
* returns the same instance and whose [close] closes it (some sockets / streams throw on
209+
* double-close). Callers must hold [lock].
210+
*/
211+
@Throws(IOException::class)
212+
private fun closeDelegateOnce() {
213+
if (!delegateClosed) {
214+
delegate.close()
215+
delegateClosed = true
216+
}
217+
}
218+
202219
private fun ensureCaptured(): Buffer {
203220
captured?.let { return it }
204221
return lock.withLock {
@@ -244,6 +261,14 @@ public class LoggableResponseBody
244261
fullyCaptured = true
245262
break
246263
}
264+
// A 0-read for a positive byteCount violates the Source.read contract; treating
265+
// it as a no-op would spin this loop forever. Fail fast (the catch below caches
266+
// it and source() re-throws it). Do NOT treat 0 as EOF — that would truncate.
267+
if (n == 0L) {
268+
throw IOException(
269+
"Source returned 0 for byteCount=$chunk which violates the Source.read contract",
270+
)
271+
}
247272
remaining -= n
248273
}
249274
if (fullyCaptured) {
@@ -285,9 +310,12 @@ public class LoggableResponseBody
285310

286311
/**
287312
* A one-shot [Source] that yields the captured [prefix] bytes first, then continues from
288-
* the live [tail]. Used only on the over-cap path; closing it closes the tail.
313+
* the live [tail]. Used only on the over-cap path. Closing it closes the independent peek
314+
* [prefix] view directly, but routes the live-tail close through [closeDelegateOnce] — the
315+
* same single-close guard the wrapper's own [close] uses — so the underlying delegate
316+
* source is never closed twice when both this source and the wrapper are closed.
289317
*/
290-
private class PrefixThenTailSource(
318+
private inner class PrefixThenTailSource(
291319
private val prefix: BufferedSource,
292320
private val tail: BufferedSource,
293321
) : Source {
@@ -303,9 +331,12 @@ public class LoggableResponseBody
303331
@Throws(IOException::class)
304332
override fun close() {
305333
try {
334+
// The peek view is independent of the delegate source and cheap to close.
306335
prefix.close()
307336
} finally {
308-
tail.close()
337+
// The tail IS the delegate's source; close it through the shared guard so the
338+
// wrapper's close() and this close() cannot both close the underlying source.
339+
lock.withLock { closeDelegateOnce() }
309340
}
310341
}
311342
}

sdk-core/src/test/kotlin/org/dexpace/sdk/core/http/response/LoggableResponseBodyTest.kt

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@ import org.dexpace.sdk.core.io.Buffer
1212
import org.dexpace.sdk.core.io.BufferedSource
1313
import org.dexpace.sdk.core.io.Io
1414
import org.dexpace.sdk.io.OkioIoProvider
15+
import org.junit.jupiter.api.assertTimeoutPreemptively
1516
import java.io.IOException
17+
import java.time.Duration
1618
import java.util.concurrent.CountDownLatch
1719
import java.util.concurrent.Executors
1820
import java.util.concurrent.atomic.AtomicInteger
@@ -374,4 +376,120 @@ class LoggableResponseBodyTest {
374376
assertEquals(payload, results[i], "Thread $i got unexpected data")
375377
}
376378
}
379+
380+
// ----- bounded drain must not spin on a zero-read source -----
381+
382+
@Test
383+
fun `drain on a zero-read source fails fast instead of spinning forever`() {
384+
// A delegate Source that returns 0 for a positive byteCount violates the Source.read
385+
// contract. The bounded drain subtracts the read count from `remaining`, so a 0-read
386+
// is a no-op that loops forever. The drain must instead fail fast with an IOException,
387+
// and source() must re-throw it (rather than hang the caller / worker / completion thread).
388+
val seed = Io.provider.buffer().also { it.writeUtf8("ignored") }
389+
val zeroReadSource =
390+
object : BufferedSource by seed.peek() {
391+
override fun read(
392+
sink: Buffer,
393+
byteCount: Long,
394+
): Long = 0L
395+
396+
override fun close() {}
397+
}
398+
val delegate =
399+
object : ResponseBody() {
400+
override fun mediaType(): MediaType? = null
401+
402+
override fun contentLength(): Long = -1
403+
404+
override fun source(): BufferedSource = zeroReadSource
405+
406+
override fun close() {}
407+
}
408+
val wrapper = LoggableResponseBody(delegate)
409+
410+
assertTimeoutPreemptively(Duration.ofSeconds(5)) {
411+
// The drain must terminate; it must not spin on the zero-read.
412+
val ex = assertFailsWith<IOException> { wrapper.source() }
413+
assertNotNull(ex)
414+
}
415+
416+
// The drain failure is cached as an IOException and surfaced via captureException.
417+
val captured = wrapper.captureException
418+
assertNotNull(captured, "a zero-read drain must cache a failure, not truncate silently")
419+
assertTrue(
420+
captured is IOException,
421+
"the cached drain failure must be an IOException, got: ${captured::class}",
422+
)
423+
// The partial snapshot is still available (documented failure semantics).
424+
assertEquals(0, wrapper.snapshot().size, "no bytes were read before the contract violation")
425+
}
426+
427+
// ----- over-cap live tail must be closed exactly once -----
428+
429+
/**
430+
* A delegate that returns the SAME [BufferedSource] instance on every [source] call and whose
431+
* [close] actually closes that instance, counting underlying closes. Models a BYO transport
432+
* whose `source()` returns one source and whose `close()` closes it — the case where routing
433+
* the live-tail close separately from the delegate close double-closes the socket.
434+
*/
435+
private class SharedSourceBody(
436+
private val text: String,
437+
sourceCloseCount: AtomicInteger,
438+
) : ResponseBody() {
439+
private val backing: BufferedSource = Io.provider.buffer().also { it.writeUtf8(text) }
440+
private val shared: BufferedSource =
441+
object : BufferedSource by backing {
442+
override fun close() {
443+
sourceCloseCount.incrementAndGet()
444+
}
445+
}
446+
447+
override fun mediaType(): MediaType? = MediaType.parse("text/plain")
448+
449+
override fun contentLength(): Long = text.toByteArray(Charsets.UTF_8).size.toLong()
450+
451+
override fun source(): BufferedSource = shared
452+
453+
override fun close() {
454+
shared.close()
455+
}
456+
}
457+
458+
@Test
459+
fun `over-cap source then close closes the underlying source exactly once`() {
460+
// Over-cap: the wrapper retains delegate.source() as the live tail and hands it out inside
461+
// a one-shot stream. Closing that stream and then closing the wrapper must not close the
462+
// same underlying source twice (some sockets / streams throw on double-close).
463+
val sourceCloseCount = AtomicInteger(0)
464+
val payload = "abcdefghijklmnopqrstuvwxyz" // 26 bytes > cap
465+
val wrapper = LoggableResponseBody.bounded(SharedSourceBody(payload, sourceCloseCount), Io.provider, 5L)
466+
467+
wrapper.source().use { it.readUtf8() }
468+
wrapper.close()
469+
470+
assertEquals(
471+
1,
472+
sourceCloseCount.get(),
473+
"the underlying source must be closed exactly once across the one-shot stream and wrapper close",
474+
)
475+
}
476+
477+
@Test
478+
fun `over-cap close then source close closes the underlying source exactly once`() {
479+
// The reverse order: wrapper.close() first, then closing the already-handed-out one-shot
480+
// stream. Whichever path closes the delegate first wins; the second must be a no-op.
481+
val sourceCloseCount = AtomicInteger(0)
482+
val payload = "abcdefghijklmnopqrstuvwxyz" // 26 bytes > cap
483+
val wrapper = LoggableResponseBody.bounded(SharedSourceBody(payload, sourceCloseCount), Io.provider, 5L)
484+
485+
val tail = wrapper.source()
486+
wrapper.close()
487+
tail.close()
488+
489+
assertEquals(
490+
1,
491+
sourceCloseCount.get(),
492+
"the underlying source must be closed exactly once regardless of close ordering",
493+
)
494+
}
377495
}

0 commit comments

Comments
 (0)