Skip to content

Commit ff614d2

Browse files
committed
fix(server): synchronize streamed writes on an owned final lock (SonarCloud S2445)
The SonarCloud quality gate kept failing with "E Reliability Rating on New Code" even after the previous commit reworked the streaming close, and the gate was unchanged across that push — the signal that the Blocker reliability bug was the synchronization TARGET, not the close. The streaming write helpers synchronized on a writeLock passed as a method parameter (and, after the last commit, on a method-local Object), which SonarCloud flags as java:S2445 "Blocks should be synchronized on read-only fields" — a Blocker reliability bug. (fb-contrib's NOS flagged the same code; that suppression masked it from spotbugs but Sonar evaluates it independently.) Fix: introduce a small per-request AutoCloseable ResponseStream that owns the response OutputStream and a private final lock, and serializes writeStrict / writeQuietly / close on that owned lock. The four streaming handlers drive it via try-with-resources, so the stream is closed (under the lock, after the heartbeat is cancelled) on every path — which also satisfies S2095 and lets the now-stale NOS suppression be removed. Per-request locking is preserved (each request has its own lock), so independent concurrent streams never serialize against each other. Verified locally: spotbugs:check -> 0 bugs; 48 server unit tests pass incl. the 32 OpenAiCompatServerHttpTest streaming-path tests over real sockets; spotless clean. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01JdLpWD8nedY7LwNnHefZLF
1 parent 490b549 commit ff614d2

2 files changed

Lines changed: 139 additions & 170 deletions

File tree

spotbugs-exclude.xml

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -456,23 +456,6 @@ SPDX-License-Identifier: MIT
456456
<Method name="main"/>
457457
</Match>
458458

459-
<!--
460-
The streaming write helpers writeStrict / writeQuietly receive a per-request lock Object (created
461-
by the streaming handler) as a parameter and synchronize on it, so the generation thread and the
462-
heartbeat-timer task never interleave writes on the same response stream. fb-contrib NOS flags
463-
synchronizing on a method parameter ("non-owned"), but the lock is owned by the in-flight request,
464-
not shared across requests; promoting it to an instance field would incorrectly serialize
465-
independent concurrent streams. The dedicated per-request lock is the intended design.
466-
-->
467-
<Match>
468-
<Class name="net.ladenthin.llama.server.OpenAiCompatServer"/>
469-
<Bug pattern="NOS_NON_OWNED_SYNCHRONIZATION"/>
470-
<Or>
471-
<Method name="writeStrict"/>
472-
<Method name="writeQuietly"/>
473-
</Or>
474-
</Match>
475-
476459
<!--
477460
OpenAiSseFormatter.sseDone() and heartbeat() are self-documenting accessors for the two fixed SSE
478461
protocol tokens (the stream terminator and the comment-line keep-alive). They sit alongside

src/main/java/net/ladenthin/llama/server/OpenAiCompatServer.java

Lines changed: 139 additions & 153 deletions
Original file line numberDiff line numberDiff line change
@@ -348,45 +348,33 @@ private void streamChat(HttpExchange exchange, JsonNode request) throws IOExcept
348348
exchange.getResponseHeaders().set("Content-Type", CONTENT_TYPE_SSE);
349349
exchange.getResponseHeaders().set("Cache-Control", "no-cache");
350350
exchange.sendResponseHeaders(HTTP_OK, 0);
351-
final OutputStream os = exchange.getResponseBody();
352-
final Object writeLock = new Object();
353-
ScheduledFuture<?> heartbeat = null;
354-
try {
355-
heartbeat = heartbeatExecutor.scheduleAtFixedRate(
356-
() -> writeQuietly(os, writeLock, OpenAiSseFormatter.heartbeat()),
357-
config.getHeartbeatMillis(),
358-
config.getHeartbeatMillis(),
359-
TimeUnit.MILLISECONDS);
360-
backend.stream(
361-
request,
362-
chunkJson -> writeStrict(
363-
os,
364-
writeLock,
365-
OpenAiSseFormatter.sseData(OpenAiSseFormatter.ensureUsageCachedTokens(chunkJson))));
366-
writeStrict(os, writeLock, OpenAiSseFormatter.sseDone());
367-
} catch (IllegalArgumentException e) {
368-
writeQuietly(
369-
os,
370-
writeLock,
371-
OpenAiSseFormatter.sseData(OpenAiSseFormatter.errorJson(message(e), ERROR_TYPE_REQUEST, null)));
372-
} catch (IOException e) {
373-
LOG.debug("client disconnected during stream", e);
374-
} catch (RuntimeException e) {
375-
LOG.warn("streaming chat completion failed", e);
376-
writeQuietly(
377-
os,
378-
writeLock,
379-
OpenAiSseFormatter.sseData(OpenAiSseFormatter.errorJson(message(e), ERROR_TYPE_SERVER, null)));
380-
} finally {
381-
if (heartbeat != null) {
382-
heartbeat.cancel(false);
383-
}
384-
// Close under the write lock so the close never races a still-in-flight heartbeat write.
385-
synchronized (writeLock) {
386-
try {
387-
os.close();
388-
} catch (IOException e) {
389-
LOG.trace("stream close failed", e);
351+
try (ResponseStream out = new ResponseStream(exchange.getResponseBody())) {
352+
ScheduledFuture<?> heartbeat = null;
353+
try {
354+
heartbeat = heartbeatExecutor.scheduleAtFixedRate(
355+
() -> out.writeQuietly(OpenAiSseFormatter.heartbeat()),
356+
config.getHeartbeatMillis(),
357+
config.getHeartbeatMillis(),
358+
TimeUnit.MILLISECONDS);
359+
backend.stream(
360+
request,
361+
chunkJson -> out.writeStrict(
362+
OpenAiSseFormatter.sseData(OpenAiSseFormatter.ensureUsageCachedTokens(chunkJson))));
363+
out.writeStrict(OpenAiSseFormatter.sseDone());
364+
} catch (IllegalArgumentException e) {
365+
out.writeQuietly(
366+
OpenAiSseFormatter.sseData(OpenAiSseFormatter.errorJson(message(e), ERROR_TYPE_REQUEST, null)));
367+
} catch (IOException e) {
368+
LOG.debug("client disconnected during stream", e);
369+
} catch (RuntimeException e) {
370+
LOG.warn("streaming chat completion failed", e);
371+
out.writeQuietly(
372+
OpenAiSseFormatter.sseData(OpenAiSseFormatter.errorJson(message(e), ERROR_TYPE_SERVER, null)));
373+
} finally {
374+
// try-with-resources closes the stream (under its lock) after the heartbeat is cancelled,
375+
// so the close never races a still-in-flight heartbeat write.
376+
if (heartbeat != null) {
377+
heartbeat.cancel(false);
390378
}
391379
}
392380
}
@@ -469,33 +457,24 @@ private void streamOllamaChat(HttpExchange exchange, JsonNode openAiRequest, Str
469457
exchange.getResponseHeaders().set("Content-Type", CONTENT_TYPE_NDJSON);
470458
exchange.getResponseHeaders().set("Cache-Control", "no-cache");
471459
exchange.sendResponseHeaders(HTTP_OK, 0);
472-
final OutputStream os = exchange.getResponseBody();
473-
final Object writeLock = new Object();
474460
final ToolCallDeltaAccumulator accumulator = new ToolCallDeltaAccumulator();
475-
try {
476-
backend.stream(openAiRequest, chunkJson -> {
477-
accumulator.accept(chunkJson);
478-
String line = OllamaApiSupport.toOllamaContentLine(chunkJson, model);
479-
if (line != null) {
480-
writeStrict(os, writeLock, line);
481-
}
482-
});
483-
writeStrict(os, writeLock, OllamaApiSupport.toOllamaDoneLine(model, accumulator));
484-
} catch (IllegalArgumentException e) {
485-
writeQuietly(os, writeLock, ollamaError(message(e)) + "\n");
486-
} catch (IOException e) {
487-
LOG.debug("ollama client disconnected during stream", e);
488-
} catch (RuntimeException e) {
489-
LOG.warn("ollama streaming chat failed", e);
490-
writeQuietly(os, writeLock, ollamaError(message(e)) + "\n");
491-
} finally {
492-
// Close under the write lock so the close never races a concurrent best-effort write.
493-
synchronized (writeLock) {
494-
try {
495-
os.close();
496-
} catch (IOException e) {
497-
LOG.trace("stream close failed", e);
498-
}
461+
try (ResponseStream out = new ResponseStream(exchange.getResponseBody())) {
462+
try {
463+
backend.stream(openAiRequest, chunkJson -> {
464+
accumulator.accept(chunkJson);
465+
String line = OllamaApiSupport.toOllamaContentLine(chunkJson, model);
466+
if (line != null) {
467+
out.writeStrict(line);
468+
}
469+
});
470+
out.writeStrict(OllamaApiSupport.toOllamaDoneLine(model, accumulator));
471+
} catch (IllegalArgumentException e) {
472+
out.writeQuietly(ollamaError(message(e)) + "\n");
473+
} catch (IOException e) {
474+
LOG.debug("ollama client disconnected during stream", e);
475+
} catch (RuntimeException e) {
476+
LOG.warn("ollama streaming chat failed", e);
477+
out.writeQuietly(ollamaError(message(e)) + "\n");
499478
}
500479
}
501480
}
@@ -582,42 +561,34 @@ private void streamAnthropic(HttpExchange exchange, JsonNode openAiRequest, Stri
582561
exchange.getResponseHeaders().set("Content-Type", CONTENT_TYPE_SSE);
583562
exchange.getResponseHeaders().set("Cache-Control", "no-cache");
584563
exchange.sendResponseHeaders(HTTP_OK, 0);
585-
final OutputStream os = exchange.getResponseBody();
586-
final Object writeLock = new Object();
587564
final AnthropicStreamTranslator translator =
588565
new AnthropicStreamTranslator("msg_" + Long.toHexString(System.nanoTime()), model);
589-
ScheduledFuture<?> heartbeat = null;
590-
try {
591-
heartbeat = heartbeatExecutor.scheduleAtFixedRate(
592-
() -> writeQuietly(os, writeLock, OpenAiSseFormatter.heartbeat()),
593-
config.getHeartbeatMillis(),
594-
config.getHeartbeatMillis(),
595-
TimeUnit.MILLISECONDS);
596-
writeStrict(os, writeLock, translator.begin());
597-
backend.stream(openAiRequest, chunkJson -> {
598-
String events = translator.onChunk(chunkJson);
599-
if (!events.isEmpty()) {
600-
writeStrict(os, writeLock, events);
601-
}
602-
});
603-
writeStrict(os, writeLock, translator.end());
604-
} catch (IllegalArgumentException e) {
605-
writeQuietly(os, writeLock, AnthropicApiSupport.sseEvent("error", anthropicError(message(e))));
606-
} catch (IOException e) {
607-
LOG.debug("anthropic client disconnected during stream", e);
608-
} catch (RuntimeException e) {
609-
LOG.warn("anthropic streaming failed", e);
610-
writeQuietly(os, writeLock, AnthropicApiSupport.sseEvent("error", anthropicError(message(e))));
611-
} finally {
612-
if (heartbeat != null) {
613-
heartbeat.cancel(false);
614-
}
615-
// Close under the write lock so the close never races a still-in-flight heartbeat write.
616-
synchronized (writeLock) {
617-
try {
618-
os.close();
619-
} catch (IOException e) {
620-
LOG.trace("stream close failed", e);
566+
try (ResponseStream out = new ResponseStream(exchange.getResponseBody())) {
567+
ScheduledFuture<?> heartbeat = null;
568+
try {
569+
heartbeat = heartbeatExecutor.scheduleAtFixedRate(
570+
() -> out.writeQuietly(OpenAiSseFormatter.heartbeat()),
571+
config.getHeartbeatMillis(),
572+
config.getHeartbeatMillis(),
573+
TimeUnit.MILLISECONDS);
574+
out.writeStrict(translator.begin());
575+
backend.stream(openAiRequest, chunkJson -> {
576+
String events = translator.onChunk(chunkJson);
577+
if (!events.isEmpty()) {
578+
out.writeStrict(events);
579+
}
580+
});
581+
out.writeStrict(translator.end());
582+
} catch (IllegalArgumentException e) {
583+
out.writeQuietly(AnthropicApiSupport.sseEvent("error", anthropicError(message(e))));
584+
} catch (IOException e) {
585+
LOG.debug("anthropic client disconnected during stream", e);
586+
} catch (RuntimeException e) {
587+
LOG.warn("anthropic streaming failed", e);
588+
out.writeQuietly(AnthropicApiSupport.sseEvent("error", anthropicError(message(e))));
589+
} finally {
590+
if (heartbeat != null) {
591+
heartbeat.cancel(false);
621592
}
622593
}
623594
}
@@ -670,49 +641,35 @@ private void streamResponses(HttpExchange exchange, JsonNode openAiRequest, Stri
670641
exchange.getResponseHeaders().set("Content-Type", CONTENT_TYPE_SSE);
671642
exchange.getResponseHeaders().set("Cache-Control", "no-cache");
672643
exchange.sendResponseHeaders(HTTP_OK, 0);
673-
final OutputStream os = exchange.getResponseBody();
674-
final Object writeLock = new Object();
675644
final ResponsesStreamTranslator translator = new ResponsesStreamTranslator(model, responseId);
676-
ScheduledFuture<?> heartbeat = null;
677-
try {
678-
heartbeat = heartbeatExecutor.scheduleAtFixedRate(
679-
() -> writeQuietly(os, writeLock, OpenAiSseFormatter.heartbeat()),
680-
config.getHeartbeatMillis(),
681-
config.getHeartbeatMillis(),
682-
TimeUnit.MILLISECONDS);
683-
writeStrict(os, writeLock, translator.begin());
684-
backend.stream(openAiRequest, chunkJson -> {
685-
String events = translator.onChunk(chunkJson);
686-
if (!events.isEmpty()) {
687-
writeStrict(os, writeLock, events);
688-
}
689-
});
690-
writeStrict(os, writeLock, translator.end());
691-
} catch (IllegalArgumentException e) {
692-
writeQuietly(
693-
os,
694-
writeLock,
695-
"event: error\ndata: " + OpenAiSseFormatter.errorJson(message(e), ERROR_TYPE_REQUEST, null)
696-
+ "\n\n");
697-
} catch (IOException e) {
698-
LOG.debug("responses client disconnected during stream", e);
699-
} catch (RuntimeException e) {
700-
LOG.warn("responses streaming failed", e);
701-
writeQuietly(
702-
os,
703-
writeLock,
704-
"event: error\ndata: " + OpenAiSseFormatter.errorJson(message(e), ERROR_TYPE_SERVER, null)
705-
+ "\n\n");
706-
} finally {
707-
if (heartbeat != null) {
708-
heartbeat.cancel(false);
709-
}
710-
// Close under the write lock so the close never races a still-in-flight heartbeat write.
711-
synchronized (writeLock) {
712-
try {
713-
os.close();
714-
} catch (IOException e) {
715-
LOG.trace("stream close failed", e);
645+
try (ResponseStream out = new ResponseStream(exchange.getResponseBody())) {
646+
ScheduledFuture<?> heartbeat = null;
647+
try {
648+
heartbeat = heartbeatExecutor.scheduleAtFixedRate(
649+
() -> out.writeQuietly(OpenAiSseFormatter.heartbeat()),
650+
config.getHeartbeatMillis(),
651+
config.getHeartbeatMillis(),
652+
TimeUnit.MILLISECONDS);
653+
out.writeStrict(translator.begin());
654+
backend.stream(openAiRequest, chunkJson -> {
655+
String events = translator.onChunk(chunkJson);
656+
if (!events.isEmpty()) {
657+
out.writeStrict(events);
658+
}
659+
});
660+
out.writeStrict(translator.end());
661+
} catch (IllegalArgumentException e) {
662+
out.writeQuietly("event: error\ndata: "
663+
+ OpenAiSseFormatter.errorJson(message(e), ERROR_TYPE_REQUEST, null) + "\n\n");
664+
} catch (IOException e) {
665+
LOG.debug("responses client disconnected during stream", e);
666+
} catch (RuntimeException e) {
667+
LOG.warn("responses streaming failed", e);
668+
out.writeQuietly("event: error\ndata: "
669+
+ OpenAiSseFormatter.errorJson(message(e), ERROR_TYPE_SERVER, null) + "\n\n");
670+
} finally {
671+
if (heartbeat != null) {
672+
heartbeat.cancel(false);
716673
}
717674
}
718675
}
@@ -831,22 +788,51 @@ private void sendError(HttpExchange exchange, int status, String type, String me
831788
sendJson(exchange, status, OpenAiSseFormatter.errorJson(message, type, null));
832789
}
833790

834-
/** Write under the response lock, propagating failures so a streaming generation can be cancelled. */
835-
private void writeStrict(OutputStream os, Object writeLock, String text) throws IOException {
836-
synchronized (writeLock) {
837-
os.write(text.getBytes(StandardCharsets.UTF_8));
838-
os.flush();
791+
/**
792+
* Per-request, thread-safe wrapper over a streaming HTTP response body. Every write and the close are
793+
* serialized on a {@code private final} lock, so the generation thread and the heartbeat-timer task
794+
* never write to (or close) the same stream concurrently. The lock is owned by this per-request
795+
* instance rather than shared, so independent concurrent streams never serialize against each other.
796+
* It is {@link AutoCloseable} so callers drive it with try-with-resources, which closes the stream
797+
* (under the lock) on every exit path.
798+
*/
799+
private static final class ResponseStream implements AutoCloseable {
800+
801+
private final OutputStream os;
802+
private final Object lock = new Object();
803+
804+
ResponseStream(OutputStream os) {
805+
this.os = os;
839806
}
840-
}
841807

842-
/** Write under the response lock, swallowing failures (used for heartbeats and best-effort events). */
843-
private void writeQuietly(OutputStream os, Object writeLock, String text) {
844-
synchronized (writeLock) {
845-
try {
808+
/** Write under the lock, propagating failures so a streaming generation can be cancelled. */
809+
void writeStrict(String text) throws IOException {
810+
synchronized (lock) {
846811
os.write(text.getBytes(StandardCharsets.UTF_8));
847812
os.flush();
848-
} catch (IOException e) {
849-
LOG.trace("stream write failed (client likely disconnected)", e);
813+
}
814+
}
815+
816+
/** Write under the lock, swallowing failures (used for heartbeats and best-effort events). */
817+
void writeQuietly(String text) {
818+
synchronized (lock) {
819+
try {
820+
os.write(text.getBytes(StandardCharsets.UTF_8));
821+
os.flush();
822+
} catch (IOException e) {
823+
LOG.trace("stream write failed (client likely disconnected)", e);
824+
}
825+
}
826+
}
827+
828+
@Override
829+
public void close() {
830+
synchronized (lock) {
831+
try {
832+
os.close();
833+
} catch (IOException e) {
834+
LOG.trace("stream close failed", e);
835+
}
850836
}
851837
}
852838
}

0 commit comments

Comments
 (0)