Skip to content

Commit ca7fd0b

Browse files
author
李泓宇
committed
Merge branch 'github_main' into 'develop'
feat(backend): 修复chunk无法跨进程广播的问题 (#620) See merge request weibo_rd/common/wecode/wegent!877
2 parents 1bd4842 + 3cf8f28 commit ca7fd0b

8 files changed

Lines changed: 141 additions & 115 deletions

File tree

backend/app/services/chat/ai_trigger.py

Lines changed: 35 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -124,15 +124,12 @@ async def _trigger_direct_chat(
124124
"""
125125
from app.api.ws.events import ServerEvents
126126

127-
# Emit chat:start event
127+
# Emit chat:start event using global emitter for cross-worker broadcasting
128128
logger.info(f"[ai_trigger] Emitting chat:start event")
129-
await namespace.emit(
130-
ServerEvents.CHAT_START,
131-
{
132-
"task_id": task.id,
133-
"subtask_id": assistant_subtask.id,
134-
},
135-
room=task_room,
129+
emitter = get_ws_emitter()
130+
await emitter.emit_chat_start(
131+
task_id=task.id,
132+
subtask_id=assistant_subtask.id,
136133
)
137134
logger.info(f"[ai_trigger] chat:start emitted")
138135

@@ -259,10 +256,11 @@ async def _stream_chat_response(
259256
# Record error in span
260257
span_manager.record_error(TelemetryEventNames.BOT_NOT_FOUND, error_msg)
261258

262-
await namespace.emit(
263-
ServerEvents.CHAT_ERROR,
264-
{"subtask_id": subtask_id, "error": error_msg},
265-
room=task_room,
259+
emitter = get_ws_emitter()
260+
await emitter.emit_chat_error(
261+
task_id=task_id,
262+
subtask_id=subtask_id,
263+
error=error_msg,
266264
)
267265
return
268266

@@ -367,6 +365,9 @@ async def _stream_chat_response(
367365
from app.services.chat.providers import get_provider
368366
from app.services.chat.providers.base import ChunkType
369367

368+
# Get global emitter
369+
emitter = get_ws_emitter()
370+
370371
# Check if this is a group chat - get history from database with user names
371372
is_group_chat = payload.is_group_chat
372373
if is_group_chat:
@@ -405,10 +406,10 @@ async def _stream_chat_response(
405406
TelemetryEventNames.PROVIDER_CREATION_FAILED, error_msg
406407
)
407408

408-
await namespace.emit(
409-
ServerEvents.CHAT_ERROR,
410-
{"subtask_id": subtask_id, "error": error_msg},
411-
room=task_room,
409+
await emitter.emit_chat_error(
410+
task_id=task_id,
411+
subtask_id=subtask_id,
412+
error=error_msg,
412413
)
413414
return
414415

@@ -429,25 +430,21 @@ async def _stream_chat_response(
429430
async for chunk in stream_gen:
430431
if cancel_event.is_set() or await session_manager.is_cancelled(subtask_id):
431432
# Cancelled
432-
await namespace.emit(
433-
ServerEvents.CHAT_CANCELLED,
434-
{"subtask_id": subtask_id},
435-
room=task_room,
433+
await emitter.emit_chat_cancelled(
434+
task_id=task_id,
435+
subtask_id=subtask_id,
436436
)
437437
break
438438

439439
if chunk.type == ChunkType.CONTENT and chunk.content:
440440
full_response += chunk.content
441441

442442
# Emit chunk
443-
await namespace.emit(
444-
ServerEvents.CHAT_CHUNK,
445-
{
446-
"subtask_id": subtask_id,
447-
"content": chunk.content,
448-
"offset": offset,
449-
},
450-
room=task_room,
443+
await emitter.emit_chat_chunk(
444+
task_id=task_id,
445+
subtask_id=subtask_id,
446+
content=chunk.content,
447+
offset=offset,
451448
)
452449
offset += len(chunk.content)
453450

@@ -480,13 +477,10 @@ async def _stream_chat_response(
480477
TelemetryEventNames.STREAM_CHUNK_ERROR, error_msg, model_config
481478
)
482479

483-
await namespace.emit(
484-
ServerEvents.CHAT_ERROR,
485-
{
486-
"subtask_id": subtask_id,
487-
"error": error_msg,
488-
},
489-
room=task_room,
480+
await emitter.emit_chat_error(
481+
task_id=task_id,
482+
subtask_id=subtask_id,
483+
error=error_msg,
490484
)
491485
await db_handler.update_subtask_status(
492486
subtask_id, "FAILED", error=chunk.error
@@ -551,10 +545,12 @@ async def _stream_chat_response(
551545
# Record error in span
552546
span_manager.record_exception(e)
553547

554-
await namespace.emit(
555-
ServerEvents.CHAT_ERROR,
556-
{"subtask_id": subtask_id, "error": str(e)},
557-
room=task_room,
548+
# Use global emitter for cross-worker broadcasting
549+
error_emitter = get_ws_emitter()
550+
await error_emitter.emit_chat_error(
551+
task_id=task_id,
552+
subtask_id=subtask_id,
553+
error=str(e),
558554
)
559555
finally:
560556
# Detach OTEL context first (before exiting span)

backend/app/services/chat/ws_emitter.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ async def emit_chat_start(
5252
task_id: int,
5353
subtask_id: int,
5454
bot_name: Optional[str] = None,
55+
shell_type: str = "Chat",
56+
message_id: Optional[int] = None,
5557
) -> None:
5658
"""
5759
Emit chat:start event to task room.
@@ -60,18 +62,24 @@ async def emit_chat_start(
6062
task_id: Task ID
6163
subtask_id: Subtask ID
6264
bot_name: Optional bot name
65+
shell_type: Shell type for frontend display logic (default: "Chat")
66+
message_id: Optional message ID for ordering
6367
"""
6468
await self.sio.emit(
6569
ServerEvents.CHAT_START,
6670
{
6771
"task_id": task_id,
6872
"subtask_id": subtask_id,
6973
"bot_name": bot_name,
74+
"shell_type": shell_type,
75+
"message_id": message_id,
7076
},
7177
room=f"task:{task_id}",
7278
namespace=self.namespace,
7379
)
74-
logger.debug(f"[WS] emit chat:start task={task_id} subtask={subtask_id}")
80+
logger.debug(
81+
f"[WS] emit chat:start task={task_id} subtask={subtask_id} shell_type={shell_type}"
82+
)
7583

7684
async def emit_chat_chunk(
7785
self,

backend/app/services/chat_v2/ai_trigger.py

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ async def _trigger_direct_chat(
118118
Emits chat:start event and starts streaming in background task.
119119
"""
120120
from app.api.ws.events import ServerEvents
121+
from app.services.chat.ws_emitter import get_ws_emitter
121122

122123
# Extract data from ORM objects before starting background task
123124
# This prevents DetachedInstanceError
@@ -241,10 +242,13 @@ async def _stream_chat_response(
241242
if not team:
242243
error_msg = "Team not found"
243244
span_manager.record_error(TelemetryEventNames.TEAM_NOT_FOUND, error_msg)
244-
await namespace.emit(
245-
ServerEvents.CHAT_ERROR,
246-
{"subtask_id": subtask_id, "error": error_msg},
247-
room=task_room,
245+
from app.services.chat.ws_emitter import get_ws_emitter
246+
247+
error_emitter = get_ws_emitter()
248+
await error_emitter.emit_chat_error(
249+
task_id=task_data["id"],
250+
subtask_id=subtask_id,
251+
error=error_msg,
248252
)
249253
return
250254

@@ -269,10 +273,13 @@ async def _stream_chat_response(
269273
span_manager.record_error(
270274
TelemetryEventNames.CONFIG_BUILD_FAILED, error_msg
271275
)
272-
await namespace.emit(
273-
ServerEvents.CHAT_ERROR,
274-
{"subtask_id": subtask_id, "error": error_msg},
275-
room=task_room,
276+
from app.services.chat.ws_emitter import get_ws_emitter
277+
278+
error_emitter = get_ws_emitter()
279+
await error_emitter.emit_chat_error(
280+
task_id=task_data["id"],
281+
subtask_id=subtask_id,
282+
error=error_msg,
276283
)
277284
return
278285

@@ -337,10 +344,14 @@ async def _stream_chat_response(
337344
logger.exception("[ai_trigger] Stream error subtask=%d: %s", subtask_id, e)
338345
# Record error in span
339346
span_manager.record_exception(e)
340-
await namespace.emit(
341-
ServerEvents.CHAT_ERROR,
342-
{"subtask_id": subtask_id, "error": str(e)},
343-
room=task_room,
347+
# Use global emitter for cross-worker broadcasting
348+
from app.services.chat.ws_emitter import get_ws_emitter
349+
350+
error_emitter = get_ws_emitter()
351+
await error_emitter.emit_chat_error(
352+
task_id=task_data["id"],
353+
subtask_id=subtask_id,
354+
error=str(e),
344355
)
345356
finally:
346357
# Detach OTEL context first (before exiting span)

backend/app/services/chat_v2/service.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -555,7 +555,7 @@ async def stream_to_websocket(
555555
task_room = config.task_room
556556

557557
# Create WebSocket emitter
558-
emitter = WebSocketEmitter(namespace, task_room)
558+
emitter = WebSocketEmitter(namespace, task_room, task_id)
559559

560560
# Create streaming state
561561
state = StreamingState(

backend/app/services/chat_v2/streaming/emitters.py

Lines changed: 48 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -153,35 +153,35 @@ def has_events(self) -> bool:
153153

154154

155155
class WebSocketEmitter(StreamEmitter):
156-
"""WebSocket emitter using Socket.IO namespace.
156+
"""WebSocket emitter using global ws_emitter for cross-worker broadcasting.
157157
158-
Emits events through a Socket.IO namespace to a specific room.
158+
Uses the global ws_emitter (with Redis adapter) to ensure events are
159+
broadcast to all backend replicas in multi-instance deployments.
159160
"""
160161

161-
def __init__(self, namespace: Any, task_room: str):
162+
def __init__(self, namespace: Any, task_room: str, task_id: int):
162163
"""Initialize WebSocket emitter.
163164
164165
Args:
165-
namespace: Socket.IO namespace instance
166+
namespace: Socket.IO namespace instance (kept for compatibility)
166167
task_room: Room name for broadcasting events
168+
task_id: Task ID for emitting events
167169
"""
168170
self.namespace = namespace
169171
self.task_room = task_room
172+
self.task_id = task_id
170173

171174
async def emit_start(
172175
self, task_id: int, subtask_id: int, shell_type: str = "Chat"
173176
) -> None:
174-
"""Emit chat:start event with shell_type."""
175-
from app.api.ws.events import ServerEvents
176-
177-
await self.namespace.emit(
178-
ServerEvents.CHAT_START,
179-
{
180-
"task_id": task_id,
181-
"subtask_id": subtask_id,
182-
"shell_type": shell_type, # Include shell_type for frontend display logic
183-
},
184-
room=self.task_room,
177+
"""Emit chat:start event using global emitter with shell_type."""
178+
from app.services.chat.ws_emitter import get_ws_emitter
179+
180+
emitter = get_ws_emitter()
181+
await emitter.emit_chat_start(
182+
task_id=task_id,
183+
subtask_id=subtask_id,
184+
shell_type=shell_type,
185185
)
186186
logger.info("[WS_EMITTER] chat:start emitted with shell_type=%s", shell_type)
187187

@@ -192,11 +192,11 @@ async def emit_chunk(
192192
subtask_id: int,
193193
result: dict[str, Any] | None = None,
194194
) -> None:
195-
"""Emit chat:chunk event with optional result data (thinking, workbench).
195+
"""Emit chat:chunk event using global emitter with optional result data.
196196
197197
This follows the same pattern as executor tasks to enable thinking/workbench display.
198198
"""
199-
from app.api.ws.events import ServerEvents
199+
from app.services.chat.ws_emitter import get_ws_emitter
200200

201201
logger.debug(
202202
"[WS_EMITTER] emit_chunk: subtask_id=%d, offset=%d, content_len=%d, has_result=%s",
@@ -205,19 +205,13 @@ async def emit_chunk(
205205
len(content),
206206
result is not None,
207207
)
208-
payload: dict[str, Any] = {
209-
"subtask_id": subtask_id,
210-
"content": content,
211-
"offset": offset,
212-
}
213-
# Include full result if provided (for chat_v2 thinking/workbench display)
214-
if result is not None:
215-
payload["result"] = result
216-
217-
await self.namespace.emit(
218-
ServerEvents.CHAT_CHUNK,
219-
payload,
220-
room=self.task_room,
208+
emitter = get_ws_emitter()
209+
await emitter.emit_chat_chunk(
210+
task_id=self.task_id,
211+
subtask_id=subtask_id,
212+
content=content,
213+
offset=offset,
214+
result=result,
221215
)
222216

223217
async def emit_done(
@@ -228,40 +222,38 @@ async def emit_done(
228222
result: dict[str, Any],
229223
message_id: int | None = None,
230224
) -> None:
231-
"""Emit chat:done event."""
232-
from app.api.ws.events import ServerEvents
233-
234-
await self.namespace.emit(
235-
ServerEvents.CHAT_DONE,
236-
{
237-
"task_id": task_id,
238-
"subtask_id": subtask_id,
239-
"offset": offset,
240-
"result": result,
241-
"message_id": message_id,
242-
},
243-
room=self.task_room,
225+
"""Emit chat:done event using global emitter."""
226+
from app.services.chat.ws_emitter import get_ws_emitter
227+
228+
emitter = get_ws_emitter()
229+
await emitter.emit_chat_done(
230+
task_id=task_id,
231+
subtask_id=subtask_id,
232+
offset=offset,
233+
result=result,
234+
message_id=message_id,
244235
)
245236
logger.info("[WS_EMITTER] chat:done emitted message_id=%s", message_id)
246237

247238
async def emit_error(self, subtask_id: int, error: str) -> None:
248-
"""Emit chat:error event."""
249-
from app.api.ws.events import ServerEvents
250-
251-
await self.namespace.emit(
252-
ServerEvents.CHAT_ERROR,
253-
{"subtask_id": subtask_id, "error": error},
254-
room=self.task_room,
239+
"""Emit chat:error event using global emitter."""
240+
from app.services.chat.ws_emitter import get_ws_emitter
241+
242+
emitter = get_ws_emitter()
243+
await emitter.emit_chat_error(
244+
task_id=self.task_id,
245+
subtask_id=subtask_id,
246+
error=error,
255247
)
256248
logger.warning("[WS_EMITTER] chat:error emitted: %s", error)
257249

258250
async def emit_cancelled(self, subtask_id: int) -> None:
259-
"""Emit chat:cancelled event."""
260-
from app.api.ws.events import ServerEvents
251+
"""Emit chat:cancelled event using global emitter."""
252+
from app.services.chat.ws_emitter import get_ws_emitter
261253

262-
await self.namespace.emit(
263-
ServerEvents.CHAT_CANCELLED,
264-
{"subtask_id": subtask_id},
265-
room=self.task_room,
254+
emitter = get_ws_emitter()
255+
await emitter.emit_chat_cancelled(
256+
task_id=self.task_id,
257+
subtask_id=subtask_id,
266258
)
267259
logger.info("[WS_EMITTER] chat:cancelled emitted")

0 commit comments

Comments
 (0)