99import lombok .ToString ;
1010import net .ladenthin .llama .parameters .InferenceParameters ;
1111import net .ladenthin .llama .value .ChatMessage ;
12- import net .ladenthin .llama .value .ChatTranscript ;
12+ import net .ladenthin .llama .value .Pair ;
1313import org .jspecify .annotations .Nullable ;
1414
1515/**
3030 * {@link #commitStreamedReply(String)}.
3131 * </p>
3232 *
33- * <p>{@code toString} is generated by Lombok over the slot id, system message, and
34- * accumulated turns. The owning {@link LlamaModel} is excluded because its
35- * {@code toString} would render native state. The {@code paramsCustomizer}
36- * {@ link UnaryOperator} is excluded because lambda {@code toString} is the implementation
37- * hash, not useful in logs. The intrinsic {@code lock } is excluded as a noise field .
33+ * <p>{@code toString} is generated by Lombok over the slot id and the
34+ * {@link SessionState} (which renders the transcript and streaming flag). The owning
35+ * {@link LlamaModel} is excluded because its {@code toString} would render native
36+ * state. The {@code paramsCustomizer} {@link UnaryOperator} is excluded because lambda
37+ * {@code toString} is the implementation hash, not useful in logs.
3838 * {@code equals}/{@code hashCode} are intentionally NOT generated: a session is a
3939 * mutable lifecycle handle managed by identity.</p>
4040 */
@@ -48,23 +48,18 @@ public final class Session implements AutoCloseable {
4848 private final int slotId ;
4949
5050 /**
51- * Append-only transcript with two-phase commit semantics. See the
52- * {@link net.ladenthin.llama.value.ChatTranscript} class Javadoc for the full invariant statement
53- * and the {@code ChatTranscriptTest} class for the running-documentation
54- * tests that pin the contract.
51+ * The lock-guarded streaming-flag + transcript state machine. Extracted to
52+ * {@link SessionState} so its concurrency contract (the two-phase commit and the
53+ * streaming guard) is testable without the native model; see that class and the
54+ * {@code SessionStateInterleavingTest}. This {@code Session} only adds the model
55+ * calls, injected as callbacks that {@link SessionState} runs under its lock.
5556 */
56- private final ChatTranscript transcript ;
57+ private final SessionState state ;
5758
5859 // Lambda UnaryOperator — toString is the implementation hash, not useful in logs.
5960 @ ToString .Exclude
6061 private final @ Nullable UnaryOperator <InferenceParameters > paramsCustomizer ;
6162
62- // Intrinsic lock used only for synchronisation; rendering its identity adds noise.
63- @ ToString .Exclude
64- private final Object lock = new Object ();
65-
66- private boolean streamingActive ;
67-
6863 /**
6964 * Create a session bound to a specific slot id, with an optional system prompt
7065 * applied to every {@link #send(String)} call.
@@ -95,7 +90,7 @@ public Session(
9590 @ Nullable UnaryOperator <InferenceParameters > paramsCustomizer ) {
9691 this .model = model ;
9792 this .slotId = slotId ;
98- this .transcript = new ChatTranscript ( systemMessage );
93+ this .state = new SessionState ( slotId , systemMessage );
9994 this .paramsCustomizer = paramsCustomizer ;
10095 }
10196
@@ -106,22 +101,13 @@ public Session(
106101 * @return the assistant's reply text
107102 */
108103 public String send (String userMessage ) {
109- synchronized (lock ) {
110- if (streamingActive ) {
111- throw new IllegalStateException ("stream in progress on slot " + slotId
112- + " (transcript=" + transcript .size () + " turns)"
113- + "; call commitStreamedReply(...) before send(...)" );
114- }
115- // Two-phase commit: build the wire-format with the pending user turn
116- // outside the transcript via messagesWithPendingUserTurn(...). On
117- // model success, commit BOTH turns atomically through appendRound(...).
118- // On model failure, nothing was committed — no rollback logic needed.
119- // Invariant pinned by ChatTranscriptTest.
120- InferenceParameters params = buildParamsWithPendingUserTurn (userMessage );
121- String reply = model .chatCompleteText (params );
122- transcript .appendRound (userMessage , reply );
123- return reply ;
124- }
104+ // Two-phase commit lives in SessionState.send(...): it guards against an
105+ // in-progress stream, builds the wire-format with the pending user turn, runs
106+ // the model call below under the lock, and on success commits BOTH turns
107+ // atomically. On model failure nothing is committed — no rollback needed.
108+ return state .send (
109+ userMessage ,
110+ (systemMessage , wireMessages ) -> model .chatCompleteText (buildParams (systemMessage , wireMessages )));
125111 }
126112
127113 /**
@@ -134,20 +120,13 @@ public String send(String userMessage) {
134120 * @return a {@link LlamaIterable} that yields assistant reply chunks
135121 */
136122 public LlamaIterable stream (String userMessage ) {
137- synchronized (lock ) {
138- if (streamingActive ) {
139- throw new IllegalStateException ("stream in progress on slot " + slotId
140- + " (transcript=" + transcript .size () + " turns)"
141- + "; call commitStreamedReply(...) before stream(...)" );
142- }
143- // Two-phase commit: see send(). The user turn is committed only after
144- // generateChat successfully returns the iterable; the assistant turn is
145- // committed separately by commitStreamedReply(...).
146- LlamaIterable iterable = model .generateChat (buildParamsWithPendingUserTurn (userMessage ));
147- transcript .appendUserTurn (userMessage );
148- streamingActive = true ;
149- return iterable ;
150- }
123+ // SessionState.beginStream(...) guards against an in-progress stream, runs the
124+ // model call below under the lock, and on success commits the user turn and
125+ // marks streaming active; the assistant turn is committed separately by
126+ // commitStreamedReply(...).
127+ return state .beginStream (
128+ userMessage ,
129+ (systemMessage , wireMessages ) -> model .generateChat (buildParams (systemMessage , wireMessages )));
151130 }
152131
153132 /**
@@ -157,15 +136,7 @@ public LlamaIterable stream(String userMessage) {
157136 * @param assistantText the assistant text accumulated from a prior {@link #stream(String)} call
158137 */
159138 public void commitStreamedReply (String assistantText ) {
160- synchronized (lock ) {
161- if (!streamingActive ) {
162- throw new IllegalStateException ("no stream in progress on slot " + slotId
163- + " (transcript=" + transcript .size () + " turns)"
164- + "; call stream(...) first" );
165- }
166- transcript .appendAssistantTurn (assistantText );
167- streamingActive = false ;
168- }
139+ state .commitStreamedReply (assistantText );
169140 }
170141
171142 /**
@@ -175,14 +146,7 @@ public void commitStreamedReply(String assistantText) {
175146 * @return the JSON response from the native save action
176147 */
177148 public String save (String filepath ) {
178- synchronized (lock ) {
179- if (streamingActive ) {
180- throw new IllegalStateException ("stream in progress on slot " + slotId
181- + " (transcript=" + transcript .size () + " turns)"
182- + "; call commitStreamedReply(...) before save(...)" );
183- }
184- return model .saveSlot (slotId , filepath );
185- }
149+ return state .runWhenNotStreaming ("save" , () -> model .saveSlot (slotId , filepath ));
186150 }
187151
188152 /**
@@ -192,48 +156,36 @@ public String save(String filepath) {
192156 * @return the JSON response from the native restore action
193157 */
194158 public String restore (String filepath ) {
195- synchronized (lock ) {
196- if (streamingActive ) {
197- throw new IllegalStateException ("stream in progress on slot " + slotId
198- + " (transcript=" + transcript .size () + " turns)"
199- + "; call commitStreamedReply(...) before restore(...)" );
200- }
201- return model .restoreSlot (slotId , filepath );
202- }
159+ return state .runWhenNotStreaming ("restore" , () -> model .restoreSlot (slotId , filepath ));
203160 }
204161
205162 /**
206163 * Transcript accessor.
207164 * @return the accumulated transcript so far, in order, including the system message if any
208165 */
209166 public List <ChatMessage > getMessages () {
210- synchronized (lock ) {
211- return transcript .snapshot ();
212- }
167+ return state .snapshot ();
213168 }
214169
215170 /** Erase the bound slot's KV cache. Does not modify the in-memory transcript. */
216171 @ Override
217172 public void close () {
218- synchronized (lock ) {
219- model .eraseSlot (slotId );
220- }
173+ state .runUnderLock (() -> model .eraseSlot (slotId ));
221174 }
222175
223176 /**
224- * Build inference parameters with a pending user turn appended to the existing
225- * transcript — without mutating the underlying {@link net.ladenthin.llama.value.ChatTranscript}. The
226- * actual transcript mutation happens AFTER the model call returns successfully,
227- * either via {@link net.ladenthin.llama.value.ChatTranscript#appendRound(String, String)} (send path)
228- * or {@link net.ladenthin.llama.value.ChatTranscript#appendUserTurn(String)} (stream path) .
177+ * Build inference parameters from the system message and the wire-format messages
178+ * supplied by {@link SessionState} (the committed turns plus the pending user
179+ * turn), applying the optional {@code paramsCustomizer}. The transcript itself is
180+ * never mutated here; {@link SessionState} commits turns only after the model call
181+ * returns successfully.
229182 *
230- * @param pendingUserMessage the user turn to include in the wire format
231- * @return inference parameters carrying transcript + pending user turn
183+ * @param systemMessage the system prompt, or {@code null} when none was configured
184+ * @param wireMessages the committed turns plus the pending user turn
185+ * @return inference parameters carrying the system message + wire messages
232186 */
233- private InferenceParameters buildParamsWithPendingUserTurn (String pendingUserMessage ) {
234- InferenceParameters params = InferenceParameters .empty ()
235- .withMessages (
236- transcript .getSystemMessage (), transcript .messagesWithPendingUserTurn (pendingUserMessage ));
187+ private InferenceParameters buildParams (@ Nullable String systemMessage , List <Pair <String , String >> wireMessages ) {
188+ InferenceParameters params = InferenceParameters .empty ().withMessages (systemMessage , wireMessages );
237189 return paramsCustomizer == null ? params : paramsCustomizer .apply (params );
238190 }
239191}
0 commit comments