diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index f06b9e75..e5d6e3e2 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -466,6 +466,35 @@ jobs: ${{ github.workspace }}/target/surefire-reports/TEST-*.xml if-no-files-found: warn + # --------------------------------------------------------------------------- + # vmlens interleaving analysis — pure-Java, needs no native library or models. + # Staged to a single smoke test for now (see the `vmlens` profile in pom.xml). + # --------------------------------------------------------------------------- + vmlens: + name: Test (vmlens interleavings) + needs: startgate + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v6 + - uses: actions/setup-java@v5 + with: + distribution: 'temurin' + java-version: ${{ env.JAVA_VERSION }} + cache: maven + - name: Test under vmlens (interleaving analysis) + # Add each new test in the `vmlens` package to this -Dtest list (surefire + # -Dtest matches simple class names, not package globs; the default suite is + # excluded from the vmlens package via pom.xml managed surefire ). + run: >- + mvn --batch-mode --no-transfer-progress -Pvmlens test + -Dtest=VmlensInterleavingSmokeTest,SessionStateInterleavingTest -DfailIfNoTests=false + - uses: actions/upload-artifact@v7 + if: always() + with: + name: vmlens-report + path: target/vmlens-report/ + if-no-files-found: ignore + test-java-macos-arm64-metal: name: Java Tests macOS 14 arm64 (Metal) needs: build-macos-arm64-metal diff --git a/pom.xml b/pom.xml index 8385d506..afb61c02 100644 --- a/pom.xml +++ b/pom.xml @@ -219,6 +219,21 @@ SPDX-License-Identifier: MIT ${reactor.version} test + + + com.vmlens + api + ${vmlens.version} + test + @@ -278,6 +293,19 @@ SPDX-License-Identifier: MIT org.apache.maven.plugins maven-surefire-plugin 3.5.6 + + + + **/vmlens/*.java + + org.apache.maven.plugins @@ -881,14 +909,6 @@ SPDX-License-Identifier: MIT vmlens - - - com.vmlens - api - ${vmlens.version} - test - - @@ -896,16 +916,15 @@ SPDX-License-Identifier: MIT vmlens-maven-plugin - - **/*$* - **/CancellationTokenLincheckTest.java - + + **/vmlens/*.java + diff --git a/src/main/java/net/ladenthin/llama/Session.java b/src/main/java/net/ladenthin/llama/Session.java index fe8654ac..9efe15e7 100644 --- a/src/main/java/net/ladenthin/llama/Session.java +++ b/src/main/java/net/ladenthin/llama/Session.java @@ -9,7 +9,7 @@ import lombok.ToString; import net.ladenthin.llama.parameters.InferenceParameters; import net.ladenthin.llama.value.ChatMessage; -import net.ladenthin.llama.value.ChatTranscript; +import net.ladenthin.llama.value.Pair; import org.jspecify.annotations.Nullable; /** @@ -30,11 +30,11 @@ * {@link #commitStreamedReply(String)}. *

* - *

{@code toString} is generated by Lombok over the slot id, system message, and - * accumulated turns. The owning {@link LlamaModel} is excluded because its - * {@code toString} would render native state. The {@code paramsCustomizer} - * {@link UnaryOperator} is excluded because lambda {@code toString} is the implementation - * hash, not useful in logs. The intrinsic {@code lock} is excluded as a noise field. + *

{@code toString} is generated by Lombok over the slot id and the + * {@link SessionState} (which renders the transcript and streaming flag). The owning + * {@link LlamaModel} is excluded because its {@code toString} would render native + * state. The {@code paramsCustomizer} {@link UnaryOperator} is excluded because lambda + * {@code toString} is the implementation hash, not useful in logs. * {@code equals}/{@code hashCode} are intentionally NOT generated: a session is a * mutable lifecycle handle managed by identity.

*/ @@ -48,23 +48,18 @@ public final class Session implements AutoCloseable { private final int slotId; /** - * Append-only transcript with two-phase commit semantics. See the - * {@link net.ladenthin.llama.value.ChatTranscript} class Javadoc for the full invariant statement - * and the {@code ChatTranscriptTest} class for the running-documentation - * tests that pin the contract. + * The lock-guarded streaming-flag + transcript state machine. Extracted to + * {@link SessionState} so its concurrency contract (the two-phase commit and the + * streaming guard) is testable without the native model; see that class and the + * {@code SessionStateInterleavingTest}. This {@code Session} only adds the model + * calls, injected as callbacks that {@link SessionState} runs under its lock. */ - private final ChatTranscript transcript; + private final SessionState state; // Lambda UnaryOperator — toString is the implementation hash, not useful in logs. @ToString.Exclude private final @Nullable UnaryOperator paramsCustomizer; - // Intrinsic lock used only for synchronisation; rendering its identity adds noise. - @ToString.Exclude - private final Object lock = new Object(); - - private boolean streamingActive; - /** * Create a session bound to a specific slot id, with an optional system prompt * applied to every {@link #send(String)} call. @@ -95,7 +90,7 @@ public Session( @Nullable UnaryOperator paramsCustomizer) { this.model = model; this.slotId = slotId; - this.transcript = new ChatTranscript(systemMessage); + this.state = new SessionState(slotId, systemMessage); this.paramsCustomizer = paramsCustomizer; } @@ -106,22 +101,13 @@ public Session( * @return the assistant's reply text */ public String send(String userMessage) { - synchronized (lock) { - if (streamingActive) { - throw new IllegalStateException("stream in progress on slot " + slotId - + " (transcript=" + transcript.size() + " turns)" - + "; call commitStreamedReply(...) before send(...)"); - } - // Two-phase commit: build the wire-format with the pending user turn - // outside the transcript via messagesWithPendingUserTurn(...). On - // model success, commit BOTH turns atomically through appendRound(...). - // On model failure, nothing was committed — no rollback logic needed. - // Invariant pinned by ChatTranscriptTest. - InferenceParameters params = buildParamsWithPendingUserTurn(userMessage); - String reply = model.chatCompleteText(params); - transcript.appendRound(userMessage, reply); - return reply; - } + // Two-phase commit lives in SessionState.send(...): it guards against an + // in-progress stream, builds the wire-format with the pending user turn, runs + // the model call below under the lock, and on success commits BOTH turns + // atomically. On model failure nothing is committed — no rollback needed. + return state.send( + userMessage, + (systemMessage, wireMessages) -> model.chatCompleteText(buildParams(systemMessage, wireMessages))); } /** @@ -134,20 +120,13 @@ public String send(String userMessage) { * @return a {@link LlamaIterable} that yields assistant reply chunks */ public LlamaIterable stream(String userMessage) { - synchronized (lock) { - if (streamingActive) { - throw new IllegalStateException("stream in progress on slot " + slotId - + " (transcript=" + transcript.size() + " turns)" - + "; call commitStreamedReply(...) before stream(...)"); - } - // Two-phase commit: see send(). The user turn is committed only after - // generateChat successfully returns the iterable; the assistant turn is - // committed separately by commitStreamedReply(...). - LlamaIterable iterable = model.generateChat(buildParamsWithPendingUserTurn(userMessage)); - transcript.appendUserTurn(userMessage); - streamingActive = true; - return iterable; - } + // SessionState.beginStream(...) guards against an in-progress stream, runs the + // model call below under the lock, and on success commits the user turn and + // marks streaming active; the assistant turn is committed separately by + // commitStreamedReply(...). + return state.beginStream( + userMessage, + (systemMessage, wireMessages) -> model.generateChat(buildParams(systemMessage, wireMessages))); } /** @@ -157,15 +136,7 @@ public LlamaIterable stream(String userMessage) { * @param assistantText the assistant text accumulated from a prior {@link #stream(String)} call */ public void commitStreamedReply(String assistantText) { - synchronized (lock) { - if (!streamingActive) { - throw new IllegalStateException("no stream in progress on slot " + slotId - + " (transcript=" + transcript.size() + " turns)" - + "; call stream(...) first"); - } - transcript.appendAssistantTurn(assistantText); - streamingActive = false; - } + state.commitStreamedReply(assistantText); } /** @@ -175,14 +146,7 @@ public void commitStreamedReply(String assistantText) { * @return the JSON response from the native save action */ public String save(String filepath) { - synchronized (lock) { - if (streamingActive) { - throw new IllegalStateException("stream in progress on slot " + slotId - + " (transcript=" + transcript.size() + " turns)" - + "; call commitStreamedReply(...) before save(...)"); - } - return model.saveSlot(slotId, filepath); - } + return state.runWhenNotStreaming("save", () -> model.saveSlot(slotId, filepath)); } /** @@ -192,14 +156,7 @@ public String save(String filepath) { * @return the JSON response from the native restore action */ public String restore(String filepath) { - synchronized (lock) { - if (streamingActive) { - throw new IllegalStateException("stream in progress on slot " + slotId - + " (transcript=" + transcript.size() + " turns)" - + "; call commitStreamedReply(...) before restore(...)"); - } - return model.restoreSlot(slotId, filepath); - } + return state.runWhenNotStreaming("restore", () -> model.restoreSlot(slotId, filepath)); } /** @@ -207,33 +164,28 @@ public String restore(String filepath) { * @return the accumulated transcript so far, in order, including the system message if any */ public List getMessages() { - synchronized (lock) { - return transcript.snapshot(); - } + return state.snapshot(); } /** Erase the bound slot's KV cache. Does not modify the in-memory transcript. */ @Override public void close() { - synchronized (lock) { - model.eraseSlot(slotId); - } + state.runUnderLock(() -> model.eraseSlot(slotId)); } /** - * Build inference parameters with a pending user turn appended to the existing - * transcript — without mutating the underlying {@link net.ladenthin.llama.value.ChatTranscript}. The - * actual transcript mutation happens AFTER the model call returns successfully, - * either via {@link net.ladenthin.llama.value.ChatTranscript#appendRound(String, String)} (send path) - * or {@link net.ladenthin.llama.value.ChatTranscript#appendUserTurn(String)} (stream path). + * Build inference parameters from the system message and the wire-format messages + * supplied by {@link SessionState} (the committed turns plus the pending user + * turn), applying the optional {@code paramsCustomizer}. The transcript itself is + * never mutated here; {@link SessionState} commits turns only after the model call + * returns successfully. * - * @param pendingUserMessage the user turn to include in the wire format - * @return inference parameters carrying transcript + pending user turn + * @param systemMessage the system prompt, or {@code null} when none was configured + * @param wireMessages the committed turns plus the pending user turn + * @return inference parameters carrying the system message + wire messages */ - private InferenceParameters buildParamsWithPendingUserTurn(String pendingUserMessage) { - InferenceParameters params = InferenceParameters.empty() - .withMessages( - transcript.getSystemMessage(), transcript.messagesWithPendingUserTurn(pendingUserMessage)); + private InferenceParameters buildParams(@Nullable String systemMessage, List> wireMessages) { + InferenceParameters params = InferenceParameters.empty().withMessages(systemMessage, wireMessages); return paramsCustomizer == null ? params : paramsCustomizer.apply(params); } } diff --git a/src/main/java/net/ladenthin/llama/SessionState.java b/src/main/java/net/ladenthin/llama/SessionState.java new file mode 100644 index 00000000..11713db0 --- /dev/null +++ b/src/main/java/net/ladenthin/llama/SessionState.java @@ -0,0 +1,184 @@ +// SPDX-FileCopyrightText: 2026 Bernard Ladenthin +// +// SPDX-License-Identifier: MIT + +package net.ladenthin.llama; + +import java.util.List; +import java.util.function.BiFunction; +import java.util.function.Supplier; +import lombok.ToString; +import net.ladenthin.llama.value.ChatMessage; +import net.ladenthin.llama.value.ChatTranscript; +import net.ladenthin.llama.value.Pair; +import org.jspecify.annotations.Nullable; + +/** + * The lock-guarded conversation state machine behind {@link Session}: a + * {@code streamingActive} flag plus a {@link ChatTranscript}, all serialised on a + * single intrinsic lock. Extracted from {@link Session} so the concurrency contract + * — the compound atomicity of the streaming guard and the two-phase transcript + * commit — is testable independently of {@link LlamaModel} and its native library + * (the same testability rationale that produced {@link ChatTranscript}). + * + *

The native model call is injected as a callback that runs under the + * lock, between the guard check and the transcript commit, so {@link Session} + * keeps exactly its previous serialisation semantics while this class owns no model + * state and can be exercised with plain lambdas.

+ * + *

Compound-atomicity invariant

+ * + *

Under every interleaving of concurrent callers, the {@code streamingActive} + * flag and the transcript move together: {@link #send(String, BiFunction)} and + * {@link #beginStream(String, BiFunction)} fail-fast with + * {@link IllegalStateException} while a stream is in progress, and the transcript + * therefore always stays a strictly alternating {@code user, assistant, …} + * sequence. This is the property the {@code SessionStateInterleavingTest} pins under + * vmlens.

+ * + *

{@code toString} contract

+ * + *

Lombok-generated over the slot id, transcript, and streaming flag. The + * intrinsic {@code lock} is excluded as a noise field. {@code equals}/{@code + * hashCode} are intentionally NOT generated: like {@link Session} this is a mutable + * lifecycle handle identified by identity.

+ */ +@ToString +public final class SessionState { + + // Intrinsic lock used only for synchronisation; rendering its identity adds noise. + @ToString.Exclude + private final Object lock = new Object(); + + private final int slotId; + + private final ChatTranscript transcript; + + private boolean streamingActive; + + /** + * Create a new state machine for the given slot, with an optional system prompt. + * + * @param slotId the slot id, used only to render diagnostic messages + * @param systemMessage optional system prompt (may be {@code null} or empty) + */ + public SessionState(int slotId, @Nullable String systemMessage) { + this.slotId = slotId; + this.transcript = new ChatTranscript(systemMessage); + } + + /** + * Run a synchronous send round atomically: fail-fast if a stream is in + * progress, invoke {@code modelCall} (under the lock) with the system message + * and the wire-format messages carrying a pending user turn, then commit the + * user turn and the returned reply as one round. + * + * @param userMessage the user turn to commit on success + * @param modelCall the model invocation; receives {@code (systemMessage, + * wireMessages)} and returns the assistant reply text + * @return the assistant reply produced by {@code modelCall} + * @throws IllegalStateException if a stream is currently in progress + */ + public String send(String userMessage, BiFunction<@Nullable String, List>, String> modelCall) { + synchronized (lock) { + requireNotStreaming("send"); + String reply = + modelCall.apply(transcript.getSystemMessage(), transcript.messagesWithPendingUserTurn(userMessage)); + transcript.appendRound(userMessage, reply); + return reply; + } + } + + /** + * Begin a streaming round atomically: fail-fast if a stream is in progress, + * invoke {@code modelCall} (under the lock) to obtain the stream handle, then + * commit the user turn and mark streaming active. The matching assistant turn + * is committed later via {@link #commitStreamedReply(String)}. + * + * @param the stream-handle type returned by {@code modelCall} + * @param userMessage the user turn to commit on success + * @param modelCall the model invocation; receives {@code (systemMessage, + * wireMessages)} and returns the stream handle + * @return the stream handle produced by {@code modelCall} + * @throws IllegalStateException if a stream is currently in progress + */ + public R beginStream( + String userMessage, BiFunction<@Nullable String, List>, R> modelCall) { + synchronized (lock) { + requireNotStreaming("stream"); + R streamHandle = + modelCall.apply(transcript.getSystemMessage(), transcript.messagesWithPendingUserTurn(userMessage)); + transcript.appendUserTurn(userMessage); + streamingActive = true; + return streamHandle; + } + } + + /** + * Commit the assistant reply accumulated from a prior + * {@link #beginStream(String, BiFunction)} and clear the streaming flag. + * + * @param assistantText the assistant text to append + * @throws IllegalStateException if no stream is currently in progress + */ + public void commitStreamedReply(String assistantText) { + synchronized (lock) { + if (!streamingActive) { + throw new IllegalStateException("no stream in progress on slot " + slotId + + " (transcript=" + transcript.size() + " turns)" + + "; call stream(...) first"); + } + transcript.appendAssistantTurn(assistantText); + streamingActive = false; + } + } + + /** + * Run an action under the lock, but only when no stream is in progress + * (used for slot save/restore, which must not race a streaming round). + * + * @param the action result type + * @param operation the operation name used in the diagnostic message + * @param action the action to run while holding the lock + * @return the action result + * @throws IllegalStateException if a stream is currently in progress + */ + public R runWhenNotStreaming(String operation, Supplier action) { + synchronized (lock) { + requireNotStreaming(operation); + return action.get(); + } + } + + /** + * Run an action under the lock with no streaming guard (used for slot erase on + * {@code close()}, which is valid regardless of streaming state). + * + * @param action the action to run while holding the lock + */ + public void runUnderLock(Runnable action) { + synchronized (lock) { + action.run(); + } + } + + /** + * Return an unmodifiable snapshot of the transcript, including the system + * message if one was configured. + * + * @return the unmodifiable transcript snapshot + */ + public List snapshot() { + synchronized (lock) { + return transcript.snapshot(); + } + } + + private void requireNotStreaming(String operation) { + if (streamingActive) { + throw new IllegalStateException("stream in progress on slot " + slotId + + " (transcript=" + transcript.size() + " turns)" + + "; call commitStreamedReply(...) before " + operation + "(...)"); + } + } +} diff --git a/src/test/java/net/ladenthin/llama/SessionStateTest.java b/src/test/java/net/ladenthin/llama/SessionStateTest.java new file mode 100644 index 00000000..65dd6147 --- /dev/null +++ b/src/test/java/net/ladenthin/llama/SessionStateTest.java @@ -0,0 +1,111 @@ +// SPDX-FileCopyrightText: 2026 Bernard Ladenthin +// +// SPDX-License-Identifier: MIT +package net.ladenthin.llama; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.is; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.util.List; +import java.util.stream.Collectors; +import net.ladenthin.llama.value.ChatMessage; +import org.junit.jupiter.api.Test; + +/** + * Model-free unit tests pinning the {@link SessionState} contract extracted from + * {@link Session}: the two-phase transcript commit and the streaming guard. These + * run in the ordinary suite (no native library), complementing the model-gated + * {@code SessionConcurrencyTest} and the agent-driven + * {@code SessionStateInterleavingTest}. + */ +public class SessionStateTest { + + private static List roles(List messages) { + return messages.stream().map(ChatMessage::getRole).collect(Collectors.toList()); + } + + @Test + public void send_commitsRound_andReturnsReply() { + SessionState state = new SessionState(0, null); + + String reply = state.send("hello", (systemMessage, wireMessages) -> "hi there"); + + assertThat(reply, is("hi there")); + assertThat(roles(state.snapshot()), contains("user", "assistant")); + } + + @Test + public void send_passesSystemMessageAndPendingUserTurn_toModelCall() { + SessionState state = new SessionState(0, "be terse"); + + state.send("first", (systemMessage, wireMessages) -> { + assertThat(systemMessage, is("be terse")); + // wire format carries the pending user turn before it is committed + assertThat(wireMessages.get(wireMessages.size() - 1).getValue(), is("first")); + return "ok"; + }); + + assertThat(roles(state.snapshot()), contains("system", "user", "assistant")); + } + + @Test + public void beginStream_thenCommit_completesRound_andClearsGuard() { + SessionState state = new SessionState(0, null); + + String handle = state.beginStream("q", (systemMessage, wireMessages) -> "HANDLE"); + assertThat(handle, is("HANDLE")); + // only the user turn is committed until the reply is committed + assertThat(roles(state.snapshot()), contains("user")); + + state.commitStreamedReply("a"); + assertThat(roles(state.snapshot()), contains("user", "assistant")); + + // guard cleared: a follow-up send succeeds + state.send("again", (systemMessage, wireMessages) -> "b"); + assertThat(roles(state.snapshot()), contains("user", "assistant", "user", "assistant")); + } + + @Test + public void send_whileStreaming_throwsWithDiagnosticMessage() { + SessionState state = new SessionState(7, null); + state.beginStream("q", (systemMessage, wireMessages) -> "HANDLE"); + + IllegalStateException ex = assertThrows(IllegalStateException.class, () -> state.send("x", (s, w) -> "y")); + assertThat(ex.getMessage(), containsString("stream in progress on slot 7")); + assertThat(ex.getMessage(), containsString("before send(...)")); + } + + @Test + public void runWhenNotStreaming_throwsWhileStreaming_andRunsOtherwise() { + SessionState state = new SessionState(3, null); + + assertThat(state.runWhenNotStreaming("save", () -> "saved"), is("saved")); + + state.beginStream("q", (systemMessage, wireMessages) -> "HANDLE"); + IllegalStateException ex = + assertThrows(IllegalStateException.class, () -> state.runWhenNotStreaming("restore", () -> "restored")); + assertThat(ex.getMessage(), containsString("before restore(...)")); + } + + @Test + public void commitStreamedReply_withoutStream_throws() { + SessionState state = new SessionState(0, null); + + IllegalStateException ex = assertThrows(IllegalStateException.class, () -> state.commitStreamedReply("a")); + assertThat(ex.getMessage(), containsString("no stream in progress")); + } + + @Test + public void runUnderLock_runsActionRegardlessOfStreamingState() { + SessionState state = new SessionState(0, null); + state.beginStream("q", (systemMessage, wireMessages) -> "HANDLE"); + + boolean[] ran = {false}; + state.runUnderLock(() -> ran[0] = true); + + assertThat(ran[0], is(true)); + } +} diff --git a/src/test/java/net/ladenthin/llama/vmlens/SessionStateInterleavingTest.java b/src/test/java/net/ladenthin/llama/vmlens/SessionStateInterleavingTest.java new file mode 100644 index 00000000..08a4df45 --- /dev/null +++ b/src/test/java/net/ladenthin/llama/vmlens/SessionStateInterleavingTest.java @@ -0,0 +1,97 @@ +// SPDX-FileCopyrightText: 2026 Bernard Ladenthin +// +// SPDX-License-Identifier: MIT +package net.ladenthin.llama.vmlens; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; + +import com.vmlens.api.AllInterleavings; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; +import net.ladenthin.llama.SessionState; +import net.ladenthin.llama.value.ChatMessage; +import org.junit.jupiter.api.Test; + +/** + * vmlens interleaving analysis of the {@link SessionState} streaming-guard and + * two-phase transcript commit. + * + *

One thread runs a synchronous {@code send} round while another runs a + * {@code beginStream} followed by {@code commitStreamedReply}. Both mutate the + * shared {@code streamingActive} flag and the transcript under {@code SessionState}'s + * single lock. The compound-atomicity invariant that must hold under every + * interleaving: the resulting transcript is always a strictly alternating + * {@code user, assistant, …} sequence (no torn round, no two same-role turns in a + * row), the streaming flag is never left stuck (a follow-up {@code send} succeeds), + * and the only acceptable failure is the {@link IllegalStateException} a {@code send} + * raises when it loses the race to an in-progress stream.

+ * + *

This is the multi-variable check-then-act ordering class that the existing + * single-{@code volatile} {@code CancellationToken} Lincheck/jcstress tests do not + * cover and that the model-gated {@code SessionConcurrencyTest} cannot explore + * exhaustively. The model call is injected as a pure-Java lambda, so no native + * library is needed. Like the rest of the package it runs only under the vmlens + * agent (see the {@code vmlens} profile and the {@code maven-surefire-plugin} + * {@code } in {@code pom.xml}).

+ * + *

Raw {@link Thread} usage is intentional: vmlens explores the interleavings of + * the threads it directly manages.

+ */ +public class SessionStateInterleavingTest { + + /** + * Drives a synchronous send against a stream round through every interleaving and + * asserts strict user/assistant alternation plus a non-stuck streaming flag. + * + * @throws InterruptedException if joining a worker thread is interrupted + */ + @Test + public void sendRacingStreamKeepsStrictAlternation() throws InterruptedException { + try (AllInterleavings allInterleavings = new AllInterleavings("SessionState.streamGuard")) { + while (allInterleavings.hasNext()) { + final SessionState state = new SessionState(0, null); + state.send("u0", (systemMessage, wireMessages) -> "a0"); // seed one aligned round + + final AtomicReference failure = new AtomicReference<>(); + + final Thread sender = new Thread(() -> { + try { + state.send("u1", (systemMessage, wireMessages) -> "a1"); + } catch (IllegalStateException lostRaceToStream) { + // Acceptable: a stream was in progress when send checked the guard. + } catch (Throwable t) { + failure.compareAndSet(null, t); + } + }); + final Thread streamer = new Thread(() -> { + try { + state.beginStream("u2", (systemMessage, wireMessages) -> "streamHandle"); + state.commitStreamedReply("a2"); + } catch (Throwable t) { + failure.compareAndSet(null, t); + } + }); + + sender.start(); + streamer.start(); + sender.join(); + streamer.join(); + + assertThat(failure.get(), is(nullValue())); + assertStrictAlternation(state.snapshot()); + // The streaming flag must be cleared: a follow-up send must not throw. + state.send("u3", (systemMessage, wireMessages) -> "a3"); + assertStrictAlternation(state.snapshot()); + } + } + } + + private static void assertStrictAlternation(List messages) { + for (int i = 0; i < messages.size(); i++) { + final String expectedRole = (i % 2 == 0) ? "user" : "assistant"; + assertThat("role at index " + i, messages.get(i).getRole(), is(expectedRole)); + } + } +} diff --git a/src/test/java/net/ladenthin/llama/vmlens/VmlensInterleavingSmokeTest.java b/src/test/java/net/ladenthin/llama/vmlens/VmlensInterleavingSmokeTest.java new file mode 100644 index 00000000..45f09dd2 --- /dev/null +++ b/src/test/java/net/ladenthin/llama/vmlens/VmlensInterleavingSmokeTest.java @@ -0,0 +1,57 @@ +// SPDX-FileCopyrightText: 2026 Bernard Ladenthin +// +// SPDX-License-Identifier: MIT +package net.ladenthin.llama.vmlens; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; + +import com.vmlens.api.AllInterleavings; +import java.util.concurrent.atomic.AtomicLong; +import org.junit.jupiter.api.Test; + +/** + * Minimal vmlens interleaving-analysis smoke test demonstrating the setup. + * + *

Two threads each increment a shared {@link AtomicLong}; because + * {@link AtomicLong#incrementAndGet()} is atomic, the final value must always be + * {@code 2} regardless of how the two threads interleave. vmlens drives the body + * of the {@link AllInterleavings} loop through every possible interleaving and + * fails if any ordering violates the invariant.

+ * + *

This test is meaningful only when executed under the vmlens java agent, + * which is wired up by the {@code vmlens} Maven profile and the dedicated vmlens + * CI job ({@code mvn -Pvmlens test}). Without the agent + * {@link AllInterleavings#hasNext()} returns {@code false} and the loop body is + * skipped, so the ordinary surefire run excludes this class (see the + * {@code maven-surefire-plugin} {@code } in {@code pom.xml}). It is the + * staged "one class for now" baseline; widen the profile's {@code } as + * more concurrency tests are added.

+ */ +public class VmlensInterleavingSmokeTest { + + /** + * Verifies that two concurrent atomic increments always sum to two under + * every thread interleaving explored by vmlens. + * + * @throws InterruptedException if joining a worker thread is interrupted + */ + @Test + public void atomicIncrementsAreLinearizable() throws InterruptedException { + try (AllInterleavings allInterleavings = new AllInterleavings("VmlensInterleavingSmokeTest.atomicIncrements")) { + while (allInterleavings.hasNext()) { + final AtomicLong counter = new AtomicLong(); + + final Thread first = new Thread(counter::incrementAndGet); + final Thread second = new Thread(counter::incrementAndGet); + + first.start(); + second.start(); + first.join(); + second.join(); + + assertThat(counter.get(), is(2L)); + } + } + } +}