Skip to content

Commit 7fe925d

Browse files
authored
fix(deepchat): sync queue and stop states (#1376)
* fix(deepchat): sync queue and stop states * fix(deepchat): harden pending message states
1 parent 07344a1 commit 7fe925d

File tree

15 files changed

+1150
-111
lines changed

15 files changed

+1150
-111
lines changed

src/main/presenter/deepchatAgentPresenter/dispatch.ts

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import type {
1717
import type { ChatMessage } from '@shared/types/core/chat-message'
1818
import { nanoid } from 'nanoid'
1919
import type { ToolOutputGuard } from './toolOutputGuard'
20+
import { buildTerminalErrorBlocks } from './messageStore'
2021

2122
type PermissionType = 'read' | 'write' | 'all' | 'command'
2223

@@ -734,17 +735,7 @@ export function finalize(state: StreamState, io: IoParams): void {
734735

735736
export function finalizeError(state: StreamState, io: IoParams, error: unknown): void {
736737
const errorMessage = error instanceof Error ? error.message : String(error)
737-
const errorBlock: AssistantMessageBlock = {
738-
type: 'error',
739-
content: errorMessage,
740-
status: 'error',
741-
timestamp: Date.now()
742-
}
743-
state.blocks.push(errorBlock)
744-
745-
for (const block of state.blocks) {
746-
if (block.status === 'pending') block.status = 'error'
747-
}
738+
state.blocks = buildTerminalErrorBlocks(state.blocks, errorMessage)
748739

749740
const endTime = Date.now()
750741
state.metadata.generationTime = endTime - state.startTime

src/main/presenter/deepchatAgentPresenter/index.ts

Lines changed: 142 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ import {
4242
} from './contextBuilder'
4343
import { appendSummarySection, CompactionService, type CompactionIntent } from './compactionService'
4444
import { buildPersistableMessageTracePayload } from './messageTracePayload'
45-
import { DeepChatMessageStore } from './messageStore'
45+
import { buildTerminalErrorBlocks, DeepChatMessageStore } from './messageStore'
4646
import { PendingInputCoordinator } from './pendingInputCoordinator'
4747
import { DeepChatPendingInputStore } from './pendingInputStore'
4848
import { processStream } from './process'
@@ -95,6 +95,12 @@ type SystemPromptCacheEntry = {
9595
fingerprint: string
9696
}
9797

98+
type ActiveGeneration = {
99+
runId: string
100+
messageId: string
101+
abortController: AbortController
102+
}
103+
98104
const isReasoningEffort = (value: unknown): value is 'minimal' | 'low' | 'medium' | 'high' =>
99105
value === 'minimal' || value === 'low' || value === 'medium' || value === 'high'
100106

@@ -113,6 +119,7 @@ export class DeepChatAgentPresenter implements IAgentImplementation {
113119
private readonly runtimeState: Map<string, DeepChatSessionState> = new Map()
114120
private readonly sessionGenerationSettings: Map<string, SessionGenerationSettings> = new Map()
115121
private readonly abortControllers: Map<string, AbortController> = new Map()
122+
private readonly activeGenerations: Map<string, ActiveGeneration> = new Map()
116123
private readonly sessionAgentIds: Map<string, string> = new Map()
117124
private readonly sessionProjectDirs: Map<string, string | null> = new Map()
118125
private readonly systemPromptCache: Map<string, SystemPromptCacheEntry> = new Map()
@@ -123,6 +130,7 @@ export class DeepChatAgentPresenter implements IAgentImplementation {
123130
private readonly compactionService: CompactionService
124131
private readonly toolOutputGuard: ToolOutputGuard
125132
private readonly hooksBridge?: NewSessionHooksBridge
133+
private nextRunSequence = 0
126134

127135
constructor(
128136
llmProviderPresenter: ILlmProviderPresenter,
@@ -207,11 +215,13 @@ export class DeepChatAgentPresenter implements IAgentImplementation {
207215
}
208216

209217
async destroySession(sessionId: string): Promise<void> {
210-
const controller = this.abortControllers.get(sessionId)
218+
const controller =
219+
this.activeGenerations.get(sessionId)?.abortController ?? this.abortControllers.get(sessionId)
211220
if (controller) {
212221
controller.abort()
213222
this.abortControllers.delete(sessionId)
214223
}
224+
this.activeGenerations.delete(sessionId)
215225

216226
this.pendingInputCoordinator.deleteBySession(sessionId)
217227
this.messageStore.deleteBySession(sessionId)
@@ -264,14 +274,19 @@ export class DeepChatAgentPresenter implements IAgentImplementation {
264274
throw new Error(`Session ${sessionId} not found`)
265275
}
266276

267-
const record = this.pendingInputCoordinator.queuePendingInput(sessionId, content)
268-
if (this.isAwaitingToolQuestionFollowUp(sessionId)) {
269-
const claimedFollowUp = this.pendingInputCoordinator.claimQueuedInput(sessionId, record.id)
270-
void this.processMessage(sessionId, claimedFollowUp.payload, {
277+
const shouldClaimImmediately =
278+
this.isAwaitingToolQuestionFollowUp(sessionId) ||
279+
this.shouldStartQueuedInputImmediately(sessionId, state.status)
280+
const record = this.pendingInputCoordinator.queuePendingInput(sessionId, content, {
281+
state: shouldClaimImmediately ? 'claimed' : 'pending'
282+
})
283+
284+
if (record.state === 'claimed') {
285+
void this.processMessage(sessionId, record.payload, {
271286
projectDir: this.resolveProjectDir(sessionId),
272-
pendingQueueItemId: claimedFollowUp.id
287+
pendingQueueItemId: record.id
273288
})
274-
return claimedFollowUp
289+
return record
275290
}
276291

277292
void this.drainPendingQueueIfPossible(sessionId, 'enqueue')
@@ -345,6 +360,8 @@ export class DeepChatAgentPresenter implements IAgentImplementation {
345360

346361
this.setSessionStatus(sessionId, 'generating')
347362
let consumedPendingQueueItem = false
363+
let userMessageId: string | null = null
364+
let assistantMessageId: string | null = null
348365

349366
try {
350367
const generationSettings = await this.getEffectiveSessionGenerationSettings(sessionId)
@@ -382,7 +399,6 @@ export class DeepChatAgentPresenter implements IAgentImplementation {
382399
preserveInterleavedReasoning: interleavedReasoning.preserveReasoningContent,
383400
newUserContent: normalizedInput
384401
})
385-
let userMessageId: string
386402
let summaryState: SessionSummaryState
387403

388404
if (compactionIntent) {
@@ -397,7 +413,6 @@ export class DeepChatAgentPresenter implements IAgentImplementation {
397413
this.messageStore.getNextOrderSeq(sessionId),
398414
userContent
399415
)
400-
this.emitMessageRefresh(sessionId, userMessageId)
401416
this.emitCompactionState(sessionId, {
402417
status: 'compacting',
403418
cursorOrderSeq: compactionIntent.targetCursorOrderSeq,
@@ -415,6 +430,10 @@ export class DeepChatAgentPresenter implements IAgentImplementation {
415430
userContent
416431
)
417432
}
433+
if (!userMessageId) {
434+
throw new Error('Failed to create user message.')
435+
}
436+
this.emitMessageRefresh(sessionId, userMessageId)
418437

419438
this.dispatchHook('UserPromptSubmit', {
420439
sessionId,
@@ -442,21 +461,18 @@ export class DeepChatAgentPresenter implements IAgentImplementation {
442461
)
443462

444463
const assistantOrderSeq = this.messageStore.getNextOrderSeq(sessionId)
445-
const assistantMessageId = this.messageStore.createAssistantMessage(
446-
sessionId,
447-
assistantOrderSeq
448-
)
464+
assistantMessageId = this.messageStore.createAssistantMessage(sessionId, assistantOrderSeq)
449465

450466
if (context?.pendingQueueItemId) {
451467
this.pendingInputCoordinator.consumeQueuedInput(sessionId, context.pendingQueueItemId)
452468
consumedPendingQueueItem = true
453469
}
454470

455471
if (context?.emitRefreshBeforeStream) {
456-
this.emitMessageRefresh(sessionId, assistantMessageId || userMessageId)
472+
this.emitMessageRefresh(sessionId, assistantMessageId)
457473
}
458474

459-
const result = await this.runStreamForMessage({
475+
const { runId, result } = await this.runStreamForMessage({
460476
sessionId,
461477
messageId: assistantMessageId,
462478
messages,
@@ -465,7 +481,11 @@ export class DeepChatAgentPresenter implements IAgentImplementation {
465481
tools,
466482
interleavedReasoning
467483
})
468-
this.applyProcessResultStatus(sessionId, result)
484+
try {
485+
this.applyProcessResultStatus(sessionId, result, runId)
486+
} finally {
487+
this.clearActiveGeneration(sessionId, runId)
488+
}
469489
if (result?.status === 'completed') {
470490
void this.drainPendingQueueIfPossible(sessionId, 'completed')
471491
}
@@ -482,6 +502,15 @@ export class DeepChatAgentPresenter implements IAgentImplementation {
482502
}
483503
}
484504
const errorMessage = err instanceof Error ? err.message : String(err)
505+
if (assistantMessageId) {
506+
const existingAssistant = this.messageStore.getMessage(assistantMessageId)
507+
const blocks = buildTerminalErrorBlocks(
508+
existingAssistant ? this.parseAssistantBlocks(existingAssistant.content) : [],
509+
errorMessage
510+
)
511+
this.messageStore.setMessageError(assistantMessageId, blocks)
512+
this.emitMessageRefresh(sessionId, assistantMessageId)
513+
}
485514
this.dispatchHook('Stop', {
486515
sessionId,
487516
providerId: state.providerId,
@@ -825,10 +854,32 @@ export class DeepChatAgentPresenter implements IAgentImplementation {
825854
}
826855

827856
async cancelGeneration(sessionId: string): Promise<void> {
828-
const controller = this.abortControllers.get(sessionId)
829-
if (controller) {
830-
controller.abort()
831-
this.abortControllers.delete(sessionId)
857+
const activeGeneration = this.activeGenerations.get(sessionId)
858+
if (activeGeneration) {
859+
activeGeneration.abortController.abort()
860+
this.clearActiveGeneration(sessionId, activeGeneration.runId)
861+
862+
const assistantMessage = this.messageStore.getMessage(activeGeneration.messageId)
863+
if (assistantMessage?.role === 'assistant') {
864+
const blocks = buildTerminalErrorBlocks(
865+
this.parseAssistantBlocks(assistantMessage.content),
866+
'common.error.userCanceledGeneration'
867+
)
868+
this.messageStore.setMessageError(activeGeneration.messageId, blocks)
869+
this.emitMessageRefresh(sessionId, activeGeneration.messageId)
870+
}
871+
872+
this.dispatchTerminalHooks(sessionId, this.runtimeState.get(sessionId), {
873+
status: 'aborted',
874+
stopReason: 'user_stop',
875+
errorMessage: 'common.error.userCanceledGeneration'
876+
})
877+
} else {
878+
const controller = this.abortControllers.get(sessionId)
879+
if (controller) {
880+
controller.abort()
881+
this.abortControllers.delete(sessionId)
882+
}
832883
}
833884
this.setSessionStatus(sessionId, 'idle')
834885
}
@@ -1139,7 +1190,7 @@ export class DeepChatAgentPresenter implements IAgentImplementation {
11391190
initialBlocks?: AssistantMessageBlock[]
11401191
promptPreview?: string
11411192
interleavedReasoning?: InterleavedReasoningConfig
1142-
}): Promise<ProcessResult> {
1193+
}): Promise<{ runId: string; result: ProcessResult }> {
11431194
const {
11441195
sessionId,
11451196
messageId,
@@ -1217,7 +1268,7 @@ export class DeepChatAgentPresenter implements IAgentImplementation {
12171268
const supportsVision = this.supportsVision(state.providerId, state.modelId)
12181269

12191270
const abortController = new AbortController()
1220-
this.abortControllers.set(sessionId, abortController)
1271+
const activeGeneration = this.registerActiveGeneration(sessionId, messageId, abortController)
12211272

12221273
try {
12231274
this.dispatchHook('SessionStart', {
@@ -1229,7 +1280,7 @@ export class DeepChatAgentPresenter implements IAgentImplementation {
12291280
projectDir
12301281
})
12311282

1232-
return await processStream({
1283+
const result = await processStream({
12331284
messages,
12341285
tools,
12351286
toolPresenter: this.toolPresenter,
@@ -1356,11 +1407,13 @@ export class DeepChatAgentPresenter implements IAgentImplementation {
13561407
abortSignal: abortController.signal
13571408
}
13581409
})
1359-
} finally {
1360-
const active = this.abortControllers.get(sessionId)
1361-
if (active === abortController) {
1362-
this.abortControllers.delete(sessionId)
1410+
return {
1411+
runId: activeGeneration.runId,
1412+
result
13631413
}
1414+
} catch (error) {
1415+
this.clearActiveGeneration(sessionId, activeGeneration.runId)
1416+
throw error
13641417
}
13651418
}
13661419

@@ -1444,6 +1497,22 @@ export class DeepChatAgentPresenter implements IAgentImplementation {
14441497
}
14451498
}
14461499

1500+
private shouldStartQueuedInputImmediately(
1501+
sessionId: string,
1502+
status: DeepChatSessionState['status']
1503+
): boolean {
1504+
if (!this.canDrainPendingQueueFromStatus(status, 'enqueue')) {
1505+
return false
1506+
}
1507+
if (this.hasPendingInteractions(sessionId)) {
1508+
return false
1509+
}
1510+
if (this.drainingPendingQueues.has(sessionId)) {
1511+
return false
1512+
}
1513+
return this.pendingInputCoordinator.getNextQueuedInput(sessionId) === null
1514+
}
1515+
14471516
private canDrainPendingQueueFromStatus(
14481517
status: DeepChatSessionState['status'],
14491518
reason: 'enqueue' | 'resume' | 'completed'
@@ -1455,10 +1524,44 @@ export class DeepChatAgentPresenter implements IAgentImplementation {
14551524
return (reason === 'enqueue' || reason === 'resume') && status === 'error'
14561525
}
14571526

1527+
private registerActiveGeneration(
1528+
sessionId: string,
1529+
messageId: string,
1530+
abortController: AbortController
1531+
): ActiveGeneration {
1532+
const generation: ActiveGeneration = {
1533+
runId: `${sessionId}:${++this.nextRunSequence}`,
1534+
messageId,
1535+
abortController
1536+
}
1537+
this.activeGenerations.set(sessionId, generation)
1538+
this.abortControllers.set(sessionId, abortController)
1539+
return generation
1540+
}
1541+
1542+
private clearActiveGeneration(sessionId: string, runId: string): void {
1543+
const activeGeneration = this.activeGenerations.get(sessionId)
1544+
if (!activeGeneration || activeGeneration.runId !== runId) {
1545+
return
1546+
}
1547+
this.activeGenerations.delete(sessionId)
1548+
if (this.abortControllers.get(sessionId) === activeGeneration.abortController) {
1549+
this.abortControllers.delete(sessionId)
1550+
}
1551+
}
1552+
1553+
private isActiveRun(sessionId: string, runId: string): boolean {
1554+
return this.activeGenerations.get(sessionId)?.runId === runId
1555+
}
1556+
14581557
private applyProcessResultStatus(
14591558
sessionId: string,
1460-
result: ProcessResult | null | undefined
1559+
result: ProcessResult | null | undefined,
1560+
runId?: string
14611561
): void {
1562+
if (runId && !this.isActiveRun(sessionId, runId)) {
1563+
return
1564+
}
14621565
const state = this.runtimeState.get(sessionId)
14631566
if (!result || !result.status) {
14641567
this.setSessionStatus(sessionId, 'idle')
@@ -1576,7 +1679,7 @@ export class DeepChatAgentPresenter implements IAgentImplementation {
15761679
}
15771680
}
15781681

1579-
const result = await this.runStreamForMessage({
1682+
const { runId, result } = await this.runStreamForMessage({
15801683
sessionId,
15811684
messageId,
15821685
messages: resumeContext,
@@ -1585,13 +1688,21 @@ export class DeepChatAgentPresenter implements IAgentImplementation {
15851688
initialBlocks,
15861689
interleavedReasoning
15871690
})
1588-
this.applyProcessResultStatus(sessionId, result)
1691+
try {
1692+
this.applyProcessResultStatus(sessionId, result, runId)
1693+
} finally {
1694+
this.clearActiveGeneration(sessionId, runId)
1695+
}
15891696
if (result?.status === 'completed') {
15901697
void this.drainPendingQueueIfPossible(sessionId, 'completed')
15911698
}
15921699
return true
15931700
} catch (error) {
15941701
console.error('[DeepChatAgent] resumeAssistantMessage error:', error)
1702+
const errorMessage = error instanceof Error ? error.message : String(error)
1703+
const blocks = buildTerminalErrorBlocks(initialBlocks, errorMessage)
1704+
this.messageStore.setMessageError(messageId, blocks)
1705+
this.emitMessageRefresh(sessionId, messageId)
15951706
this.setSessionStatus(sessionId, 'error')
15961707
throw error
15971708
} finally {

0 commit comments

Comments
 (0)