Skip to content

Commit 51747ce

Browse files
committed
refactor: implement session management endpoints and refactor session handling
- Added new session management endpoints: get-sessions, get-session-info, create-session, delete-session, and add-system-message-to-turns. - Introduced AgentSessionStore class to encapsulate session-related logic, including creating new turns and managing chat surface sessions. - Refactored index.ts to streamline endpoint setup and improve code organization. - Created error handling utilities in errors.ts for better error management.
1 parent 609f6ad commit 51747ce

10 files changed

Lines changed: 1261 additions & 1054 deletions

agentResponseEvents.ts

Lines changed: 0 additions & 1 deletion
This file was deleted.

agentTurnService.ts

Lines changed: 355 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,355 @@
1+
import type { AdminUser, IAdminForth } from "adminforth";
2+
import { logger } from "adminforth";
3+
import { randomUUID } from "crypto";
4+
import { HumanMessage, SystemMessage } from "langchain";
5+
import type { BaseCheckpointSaver } from "@langchain/langgraph";
6+
import { createAgentChatModel, callAgent } from "./agent/simpleAgent.js";
7+
import { createSequenceDebugCollector } from "./agent/middleware/sequenceDebug.js";
8+
import { detectUserLanguage, type PreviousUserMessage } from "./agent/languageDetect.js";
9+
import { prepareApiBasedTools as buildApiBasedTools } from "./apiBasedTools.js";
10+
import type { AgentEventEmitter } from "./agentEvents.js";
11+
import { buildAgentTurnSystemPrompt } from "./agent/systemPrompt.js";
12+
import type { CurrentPageContext } from "./agent/tools/getUserLocation.js";
13+
import { isAbortError, getErrorMessage } from "./errors.js";
14+
import type { AgentSessionStore } from "./sessionStore.js";
15+
import type { PluginOptions } from "./types.js";
16+
17+
type AgentTurnRunInput = {
18+
prompt: string;
19+
sessionId: string;
20+
turnId: string;
21+
previousUserMessages: PreviousUserMessage[];
22+
modeName?: string | null;
23+
userTimeZone: string;
24+
currentPage?: CurrentPageContext;
25+
abortSignal?: AbortSignal;
26+
adminUser: AdminUser;
27+
sequenceDebugCollector: ReturnType<typeof createSequenceDebugCollector>;
28+
emit?: AgentEventEmitter;
29+
};
30+
31+
export type RunAndPersistAgentResponseInput = {
32+
prompt: string;
33+
sessionId: string;
34+
modeName?: string | null;
35+
userTimeZone: string;
36+
currentPage?: CurrentPageContext;
37+
abortSignal?: AbortSignal;
38+
adminUser: AdminUser;
39+
emit?: AgentEventEmitter;
40+
failureLogMessage: string;
41+
abortLogMessage: string;
42+
};
43+
44+
export type RunAndPersistAgentResponseResult = {
45+
text: string;
46+
turnId: string;
47+
aborted: boolean;
48+
failed: boolean;
49+
};
50+
51+
export type HandleTurnInput = Omit<RunAndPersistAgentResponseInput, "failureLogMessage" | "abortLogMessage"> & {
52+
emit: AgentEventEmitter;
53+
failureLogMessage?: string;
54+
abortLogMessage?: string;
55+
};
56+
57+
type AgentTurnServiceOptions = {
58+
getAdminforth: () => IAdminForth;
59+
getPluginInstanceId: () => string;
60+
options: PluginOptions;
61+
sessionStore: AgentSessionStore;
62+
getCheckpointer: () => BaseCheckpointSaver;
63+
getInternalAgentResourceIds: () => string[];
64+
getAgentSystemPrompt: () => Promise<string>;
65+
};
66+
67+
const VEGA_LITE_FENCE_START = "```vega-lite";
68+
const COMPLETE_VEGA_LITE_BLOCK_RE = /```vega-lite[\s\S]*?```/;
69+
70+
export class AgentTurnService {
71+
constructor(private serviceOptions: AgentTurnServiceOptions) {}
72+
73+
private async runAgentTurn(input: AgentTurnRunInput) {
74+
const adminforth = this.serviceOptions.getAdminforth();
75+
const options = this.serviceOptions.options;
76+
let fullResponse = "";
77+
let bufferedTextDelta = "";
78+
let isRenderingVegaLite = false;
79+
const maxTokens = options.maxTokens ?? 1000;
80+
const selectedMode = options.modes.find((mode) => mode.name === input.modeName) ?? options.modes[0];
81+
const [primaryModelSpec, summaryModelSpec] = await Promise.all([
82+
createAgentChatModel({
83+
adapter: selectedMode.completionAdapter,
84+
maxTokens,
85+
purpose: "primary",
86+
}),
87+
createAgentChatModel({
88+
adapter: selectedMode.completionAdapter,
89+
maxTokens,
90+
purpose: "summary",
91+
}),
92+
]);
93+
const model = primaryModelSpec.model;
94+
const summaryModel = summaryModelSpec.model;
95+
const modelMiddleware = primaryModelSpec.middleware;
96+
97+
const userLanguage = await detectUserLanguage(selectedMode.completionAdapter, input.prompt, input.previousUserMessages)
98+
.catch((error) => {
99+
if (input.abortSignal?.aborted || isAbortError(error)) {
100+
throw error;
101+
}
102+
103+
logger.warn(`Failed to detect user language: ${getErrorMessage(error)}`);
104+
return null;
105+
});
106+
const systemPrompt = buildAgentTurnSystemPrompt({
107+
agentSystemPrompt: await this.serviceOptions.getAgentSystemPrompt(),
108+
adminUser: input.adminUser,
109+
usernameField: adminforth.config.auth!.usernameField,
110+
userLanguage,
111+
});
112+
const apiBasedTools = buildApiBasedTools(
113+
adminforth,
114+
this.serviceOptions.getInternalAgentResourceIds(),
115+
);
116+
117+
const stream = await callAgent({
118+
name: `adminforth-agent-${this.serviceOptions.getPluginInstanceId()}`,
119+
model,
120+
summaryModel,
121+
modelMiddleware,
122+
checkpointer: this.serviceOptions.getCheckpointer(),
123+
messages: [
124+
new SystemMessage(systemPrompt),
125+
new HumanMessage(input.prompt),
126+
],
127+
adminUser: input.adminUser,
128+
adminforth,
129+
apiBasedTools,
130+
customComponentsDir: adminforth.config.customization.customComponentsDir ?? "custom",
131+
sessionId: input.sessionId,
132+
turnId: input.turnId,
133+
currentPage: input.currentPage,
134+
userTimeZone: input.userTimeZone,
135+
abortSignal: input.abortSignal,
136+
emitToolCallEvent: (event) => {
137+
input.sequenceDebugCollector.handleToolCallEvent(event);
138+
void input.emit?.({
139+
type: "tool-call",
140+
data: event,
141+
});
142+
},
143+
sequenceDebugSink: input.sequenceDebugCollector,
144+
});
145+
146+
for await (const rawChunk of stream as AsyncIterable<[any, any]>) {
147+
if (input.abortSignal?.aborted) {
148+
throw new DOMException("This operation was aborted", "AbortError");
149+
}
150+
151+
const [token, metadata] = rawChunk;
152+
153+
const nodeName =
154+
typeof metadata?.langgraph_node === "string"
155+
? metadata.langgraph_node
156+
: "";
157+
158+
if (nodeName && !["model", "model_request"].includes(nodeName)) {
159+
continue;
160+
}
161+
162+
const blocks = Array.isArray(token?.contentBlocks)
163+
? token.contentBlocks
164+
: Array.isArray(token?.content)
165+
? token.content
166+
: [];
167+
const reasoningDelta = blocks
168+
.filter((b: any) => b?.type === "reasoning")
169+
.map((b: any) => String(b.reasoning ?? ""))
170+
.join("");
171+
172+
const textDelta = blocks
173+
.filter((b: any) => b?.type === "text")
174+
.map((b: any) => String(b.text ?? ""))
175+
.join("");
176+
177+
if (reasoningDelta) {
178+
await input.emit?.({
179+
type: "reasoning-delta",
180+
delta: reasoningDelta,
181+
});
182+
}
183+
184+
if (textDelta) {
185+
fullResponse += textDelta;
186+
bufferedTextDelta += textDelta;
187+
188+
if (
189+
bufferedTextDelta.includes(VEGA_LITE_FENCE_START) &&
190+
!COMPLETE_VEGA_LITE_BLOCK_RE.test(bufferedTextDelta)
191+
) {
192+
if (!isRenderingVegaLite) {
193+
isRenderingVegaLite = true;
194+
await input.emit?.({
195+
type: "rendering",
196+
phase: "start",
197+
label: "Rendering...",
198+
});
199+
}
200+
continue;
201+
}
202+
203+
if (isRenderingVegaLite) {
204+
isRenderingVegaLite = false;
205+
await input.emit?.({
206+
type: "rendering",
207+
phase: "end",
208+
label: "Rendering...",
209+
});
210+
}
211+
212+
const streamableLength = bufferedTextDelta.includes(VEGA_LITE_FENCE_START)
213+
? bufferedTextDelta.length
214+
: bufferedTextDelta.length - getPartialVegaLiteFenceStartLength(bufferedTextDelta);
215+
216+
if (!streamableLength) {
217+
continue;
218+
}
219+
220+
await input.emit?.({
221+
type: "text-delta",
222+
delta: bufferedTextDelta.slice(0, streamableLength),
223+
});
224+
bufferedTextDelta = bufferedTextDelta.slice(streamableLength);
225+
}
226+
}
227+
228+
if (isRenderingVegaLite) {
229+
await input.emit?.({
230+
type: "rendering",
231+
phase: "end",
232+
label: "Rendering...",
233+
});
234+
}
235+
236+
if (bufferedTextDelta) {
237+
await input.emit?.({
238+
type: "text-delta",
239+
delta: bufferedTextDelta,
240+
});
241+
}
242+
243+
return {
244+
text: fullResponse,
245+
};
246+
}
247+
248+
async runAndPersistAgentResponse(input: RunAndPersistAgentResponseInput) {
249+
const adminforth = this.serviceOptions.getAdminforth();
250+
const options = this.serviceOptions.options;
251+
const previousUserMessages = await this.serviceOptions.sessionStore.getPreviousUserMessages(input.sessionId);
252+
const turnId = await this.serviceOptions.sessionStore.createNewTurn(input.sessionId, input.prompt);
253+
await adminforth.resource(options.sessionResource.resourceId).update(input.sessionId, {
254+
[options.sessionResource.createdAtField]: new Date().toISOString(),
255+
});
256+
const sequenceDebugCollector = createSequenceDebugCollector();
257+
let fullResponse = "";
258+
let aborted = false;
259+
let failed = false;
260+
261+
try {
262+
const agentResponse = await this.runAgentTurn({
263+
prompt: input.prompt,
264+
sessionId: input.sessionId,
265+
turnId,
266+
previousUserMessages,
267+
modeName: input.modeName,
268+
userTimeZone: input.userTimeZone,
269+
currentPage: input.currentPage,
270+
abortSignal: input.abortSignal,
271+
adminUser: input.adminUser,
272+
sequenceDebugCollector,
273+
emit: input.emit,
274+
});
275+
fullResponse = agentResponse.text;
276+
} catch (error) {
277+
if (input.abortSignal?.aborted || isAbortError(error)) {
278+
aborted = true;
279+
logger.info(input.abortLogMessage);
280+
} else {
281+
failed = true;
282+
fullResponse = getErrorMessage(error);
283+
logger.error(`${input.failureLogMessage}:\n${fullResponse}`);
284+
}
285+
}
286+
287+
sequenceDebugCollector.flush();
288+
const turnUpdates: Record<string, unknown> = {
289+
[options.turnResource.responseField]: fullResponse,
290+
};
291+
292+
if (options.turnResource.debugField) {
293+
turnUpdates[options.turnResource.debugField] = sequenceDebugCollector.getHistory();
294+
}
295+
296+
await adminforth.resource(options.turnResource.resourceId).update(turnId, turnUpdates);
297+
298+
return {
299+
text: fullResponse,
300+
turnId,
301+
aborted,
302+
failed,
303+
};
304+
}
305+
306+
async handleTurn(input: HandleTurnInput) {
307+
await input.emit({
308+
type: "turn-started",
309+
messageId: randomUUID(),
310+
});
311+
312+
const agentResponse = await this.runAndPersistAgentResponse({
313+
prompt: input.prompt,
314+
sessionId: input.sessionId,
315+
modeName: input.modeName,
316+
userTimeZone: input.userTimeZone,
317+
currentPage: input.currentPage,
318+
abortSignal: input.abortSignal,
319+
adminUser: input.adminUser,
320+
emit: input.emit,
321+
failureLogMessage: input.failureLogMessage ?? "Agent response failed",
322+
abortLogMessage: input.abortLogMessage ?? "Agent response aborted",
323+
});
324+
325+
if (agentResponse.failed) {
326+
await input.emit({
327+
type: "error",
328+
error: agentResponse.text,
329+
});
330+
} else if (!agentResponse.aborted) {
331+
await input.emit({
332+
type: "response",
333+
text: agentResponse.text,
334+
sessionId: input.sessionId,
335+
turnId: agentResponse.turnId,
336+
});
337+
}
338+
339+
await input.emit({
340+
type: "finish",
341+
});
342+
343+
return agentResponse;
344+
}
345+
}
346+
347+
function getPartialVegaLiteFenceStartLength(text: string): number {
348+
for (let length = Math.min(text.length, VEGA_LITE_FENCE_START.length - 1); length > 0; length -= 1) {
349+
if (VEGA_LITE_FENCE_START.startsWith(text.slice(-length))) {
350+
return length;
351+
}
352+
}
353+
354+
return 0;
355+
}

0 commit comments

Comments
 (0)