Skip to content

[Bug]: A2aAgent 中使用ttshook 无效 #1120

@Taogang00

Description

@Taogang00

以下代码在使用ReActAgent 时正常使用,但是在使用A2aAgent时无效

/*
 * Copyright 2024-2026 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.guanwei.agent.client;

import io.agentscope.core.a2a.agent.A2aAgent;
import io.agentscope.core.a2a.agent.card.AgentCardResolver;
import io.agentscope.core.a2a.agent.card.WellKnownAgentCardResolver;
import io.agentscope.core.agent.EventType;
import io.agentscope.core.agent.StreamOptions;
import io.agentscope.core.hook.TTSHook;
import io.agentscope.core.message.Base64Source;
import io.agentscope.core.message.Msg;
import io.agentscope.core.message.MsgRole;
import io.agentscope.core.message.TextBlock;
import io.agentscope.core.model.DashScopeChatModel;
import io.agentscope.core.model.tts.DashScopeRealtimeTTSModel;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;

import java.util.Map;

/**
 * Chat controller with real-time TTS streaming.
 *
 * <p>Provides SSE endpoint that streams both text and audio to frontend:
 * <ul>
 *   <li>event: "text" - LLM generated text chunks</li>
 *   <li>event: "audio" - Base64 encoded audio chunks</li>
 *   <li>event: "done" - Stream completed</li>
 * </ul>
 */
@RestController
@RequestMapping("/")
@CrossOrigin(origins = "*")
public class AgentClientTtsChatController {

    private final DashScopeChatModel chatModel;

    private final String apiKey;

    /**
     * Creates a new ChatController.
     *
     * <p>Requires DASHSCOPE_API_KEY environment variable to be set.
     *
     * @throws IllegalStateException if DASHSCOPE_API_KEY is not set
     */
    public AgentClientTtsChatController() {
        this.apiKey = "sk-xxxxxxxxxxxxxxxxxxx";
        this.chatModel = DashScopeChatModel.builder().apiKey(apiKey).modelName("qwen-plus").build();
    }

    /**
     * Chat endpoint with real-time TTS.
     *
     * <p>Returns SSE stream with text and audio events:
     * <pre>
     * event: text
     * data: {"text": "你好"}
     *
     * event: audio
     * data: {"audio": "base64..."}
     *
     * event: done
     * data: {"status": "completed"}
     * </pre>
     *
     * @param request containing "message" field
     * @return SSE stream of text and audio
     */
    @PostMapping(value = "/chat", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<Map<String, Object>>> chat(@RequestBody Map<String, String> request) {
        String message = request.get("message");
        if (message == null || message.isEmpty()) {
            return Flux.just(ServerSentEvent.<Map<String, Object>>builder()
                    .event("error")
                    .data(Map.of("error", "Message is required"))
                    .build());
        }

        Sinks.Many<ServerSentEvent<Map<String, Object>>> sink = Sinks.many().multicast().onBackpressureBuffer();

        // Create a new TTS model instance for this request
        // Each request needs its own WebSocket session to avoid audio mixing
        DashScopeRealtimeTTSModel requestTtsModel = DashScopeRealtimeTTSModel.builder()
                .apiKey(apiKey)
                .modelName("qwen-tts-realtime-latest") // WebSocket realtime model
                .voice("Serena")
                .sampleRate(24000)
                .format("pcm")
                .build();

        // Create TTSHook that sends audio to frontend via SSE
        TTSHook ttsHook = TTSHook.builder()
                .ttsModel(requestTtsModel)
                .audioCallback(audio -> {
                    if (audio.getSource() instanceof Base64Source src) {
                        sink.tryEmitNext(ServerSentEvent.<Map<String, Object>>builder()
                                .event("audio")
                                .data(Map.of("audio", src.getData()))
                                .build());
                    }
                })
                .build();

        // Create agent with TTS hook
//        ReActAgent agent = ReActAgent.builder()
//                .name("Assistant")
//                .sysPrompt("你是一个友好的中文助手。请用简洁的中文回答问题。")
//                .model(chatModel)
//                .hook(ttsHook)
//                .maxIters(3)
//                .build();

        // Create agent card resolver by well-known uri.
        AgentCardResolver agentCardResolver = WellKnownAgentCardResolver.builder()
                .baseUrl("http://192.168.2.44:8010")
                .relativeCardPath("/.well-known/agent-card.json")
                .build();

        // Create A2aAgent
        A2aAgent agent = A2aAgent.builder()
                .name("Example-Agent")
                .hook(ttsHook)
                .agentCardResolver(agentCardResolver)
                .build();

        // Create user message
        Msg userMsg = Msg.builder()
                .name("user")
                .role(MsgRole.USER)
                .content(TextBlock.builder().text(message).build())
                .build();
        // Stream agent response
        agent.stream(userMsg, StreamOptions.builder()
                        .eventTypes(EventType.REASONING)
                        .incremental(true)
                        .build())
                .subscribeOn(Schedulers.boundedElastic())
                .doOnNext(event -> {
                    String text = event.getMessage().getTextContent();
                    if (text != null && !text.isEmpty()) {
                        sink.tryEmitNext(ServerSentEvent.<Map<String, Object>>builder()
                                .event("text")
                                .data(Map.of("text", text, "isLast", event.isLast()))
                                .build());
                    }
                })
                .doOnComplete(() -> {
                    sink.tryEmitNext(ServerSentEvent.<Map<String, Object>>builder()
                            .event("done")
                            .data(Map.of("status", "completed"))
                            .build());
                    sink.tryEmitComplete();
                    // Stop TTS hook and close WebSocket session for this request
                    ttsHook.stop();
                    requestTtsModel.close();
                })
                .doOnError(e -> {
                    // Stop TTS hook and close WebSocket session on error
                    ttsHook.stop();
                    requestTtsModel.close();

                    sink.tryEmitNext(ServerSentEvent.<Map<String, Object>>builder()
                            .event("error")
                            .data(Map.of("error", e.getMessage()))
                            .build());
                    sink.tryEmitComplete();
                })
                .subscribe();

        // Return flux with cleanup on cancel (when client disconnects)
        return sink.asFlux().doOnCancel(() -> {
            // Clean up when client disconnects or request is cancelled
            ttsHook.stop();
            requestTtsModel.close();
            sink.tryEmitComplete();
        });
    }
}

`

Metadata

Metadata

Assignees

Labels

bugSomething isn't working

Type

Projects

Status

In progress

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions