Skip to content

How can I get HarnessAgent's subAgent stream event content? #1453

@six-teen

Description

@six-teen

`
package com.polymas.ai.platform.service.impl;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import io.agentscope.core.agent.Event;
import io.agentscope.core.agent.EventSource;
import io.agentscope.core.agent.EventType;
import io.agentscope.core.agent.StreamOptions;
import io.agentscope.core.message.*;
import io.agentscope.core.model.DashScopeChatModel;
import io.agentscope.core.tool.Toolkit;
import io.agentscope.core.tool.subagent.SubAgentConfig;
import io.agentscope.core.tool.subagent.SubAgentTool;
import io.agentscope.harness.agent.HarnessAgent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Demo: a supervising {@link HarnessAgent} routes a request to one of two sub-agents that are
 * registered as tools with {@code forwardEvents(true)}, and every event of the resulting stream
 * is printed together with its origin.
 *
 * <p>The DashScope API key is read from the {@code DASHSCOPE_API_KEY} environment variable so
 * that no credential has to be written into the source file.
 */
public class HarnessAgentSubAgentStreamDemo2 {

    private static final Logger log = LoggerFactory.getLogger(HarnessAgentSubAgentStreamDemo2.class);
    private static final ObjectMapper MAPPER = new ObjectMapper()
            .setSerializationInclusion(JsonInclude.Include.NON_NULL);

    /** Name of the environment variable the DashScope API key is read from. */
    private static final String API_KEY_ENV = "DASHSCOPE_API_KEY";

    /**
     * Builds the agents, sends one translation request and prints the full event stream.
     *
     * @param args ignored
     * @throws IllegalStateException if the API key environment variable is missing or blank
     */
    public static void main(String[] args) {
        String apiKey = resolveApiKey();

        // 1. Register the sub-agents as tools of the supervisor.
        Toolkit toolkit = buildToolkit(apiKey);

        // 2. Build the supervising (main) agent.
        System.out.println("正在初始化 HarnessAgent (v1.1.0) 生产级运行时...\n");
        HarnessAgent mainAgent = buildSupervisor(apiKey, toolkit);

        // 3. Send the streaming request: an instruction followed by the text to translate.
        Msg userMsg = Msg.builder().textContent("请把下面的英文作文翻译成中文\n").build();
        Msg content = Msg.builder().textContent("This weekend, I am going to have a busy but happy time. On Saturday morning, I will finish my homework first. In the afternoon, I plan to go to the park with my friends. We are going to play football and fly kites there. On Sunday, I will help my mom do some housework, such as cleaning my room and washing the dishes. In the evening, I am going to read an interesting storybook before going to bed. I think I will have a wonderful weekend!").build();
        List<Msg> msgList = Lists.newArrayList(userMsg, content);

        // Author's note: every event category is enabled here because, with some of them
        // disabled, the Flux was seen to complete early so that blockLast() returned before
        // the sub-agent events arrived.
        StreamOptions streamOptions = StreamOptions.builder()
                .includeReasoningChunk(true)   // chunks of the parent's reasoning / routing decision
                .includeReasoningResult(true)  // reasoning result (carries the ToolUseBlock naming the sub-agent)
                .includeActingChunk(true)      // streamed content forwarded while a sub-agent runs
                .includeSummaryChunk(true)     // chunks of the final summary
                .includeSummaryResult(true)    // final summary result
                .build();

        Flux<Event> eventStream = mainAgent.stream(msgList, streamOptions);

        // Author's observation with 1.1.0-RC2: event.getSource() was null for every event,
        // including the TOOL_RESULT events that carry a sub-agent's answer, so the source
        // could not be used to tell parent and sub-agent events apart. The alternative is to
        // inspect the message blocks and check whether the named tool is a SubAgentTool;
        // extractSubAgentToolUseName and emitEvent are kept in this class for that purpose
        // but are not wired into the stream below.
        eventStream.doOnNext(HarnessAgentSubAgentStreamDemo2::printEvent).blockLast();

        log.info("HarnessAgentSubAgentStreamDemo#main conversation finished");

        System.out.println("\n\n对话结束。");
    }

    /**
     * Reads the DashScope API key from the environment.
     *
     * @return the trimmed, non-blank API key
     * @throws IllegalStateException if the variable is unset or blank
     */
    private static String resolveApiKey() {
        String key = System.getenv(API_KEY_ENV);
        if (key == null || key.isBlank()) {
            throw new IllegalStateException(
                    "Environment variable " + API_KEY_ENV + " must contain a DashScope API key");
        }
        return key.strip();
    }

    /**
     * Creates the translator and Java-coding sub-agents and registers both as tools.
     *
     * @param apiKey DashScope API key handed to each sub-agent's model
     * @return a toolkit exposing {@code translate_expert} and {@code java_coding_expert}
     */
    private static Toolkit buildToolkit(String apiKey) {
        Toolkit toolkit = new Toolkit();

        HarnessAgent translatorAgent = createSubAgent(apiKey, "Translator-SubAgent",
                "你是一个资深的中英翻译专家。请直接输出翻译结果,不要包含任何额外的解释。");
        HarnessAgent codeAgent = createSubAgent(apiKey, "Code-SubAgent",
                "你是一个Java编程专家。请用简洁的代码片段回答用户的问题。");

        SubAgentConfig translatorConfig = forwardingConfig("translate_expert", "专业的中英互译助手");
        SubAgentConfig codeConfig = forwardingConfig("java_coding_expert", "Java代码编写与审查专家");

        toolkit.registration().subAgent(() -> translatorAgent, translatorConfig).apply();
        toolkit.registration().subAgent(() -> codeAgent, codeConfig).apply();
        return toolkit;
    }

    /**
     * Builds a sub-agent tool configuration with event forwarding switched on, which is the
     * setting intended to let the sub-agent's streamed chunks reach the parent's stream.
     *
     * @param toolName    name under which the sub-agent is exposed as a tool
     * @param description tool description shown to the routing model
     * @return the configuration, using the default stream options
     */
    private static SubAgentConfig forwardingConfig(String toolName, String description) {
        return SubAgentConfig.builder()
                .toolName(toolName)
                .description(description)
                .forwardEvents(true)
                .streamOptions(StreamOptions.defaults())
                .build();
    }

    /**
     * Builds the supervising agent whose only job is to route the request to a sub-agent.
     *
     * @param apiKey  DashScope API key for the supervisor's model
     * @param toolkit toolkit containing the registered sub-agent tools
     * @return the configured supervisor
     */
    private static HarnessAgent buildSupervisor(String apiKey, Toolkit toolkit) {
        return HarnessAgent.builder()
                .name("Main-Harness-Supervisor")
                .model(DashScopeChatModel.builder()
                        .modelName("qwen-plus")
                        .apiKey(apiKey)
                        .stream(true)          // streaming is enabled so that chunk events are produced
                        .enableThinking(false)
                        .build())
                .sysPrompt("""
                        你是一个纯粹的任务路由助手,职责只有一个:根据用户的请求选择合适的子Agent并调用它。

                        严格规则(必须遵守):
                        1. 用户需要翻译 → 调用 translate_expert
                        2. 用户需要写代码 → 调用 java_coding_expert
                        3. 子Agent回答完毕后,你必须立即结束本轮对话,不得输出任何额外内容。
                        4. 禁止对子Agent的回答进行总结、评价、补充或任何形式的二次加工。
                        5. 禁止直接回答用户的问题,所有实质性内容均由子Agent负责。
                        """)
                .toolkit(toolkit)
                .build();
    }

    /**
     * Prints one stream event. Events without a source are labelled as parent events; events
     * with a source are labelled with the source's agent id, depth and path.
     *
     * @param event the event to print
     */
    private static void printEvent(Event event) {
        // extractText tolerates a null message and also looks inside ToolResultBlock outputs,
        // which getTextContent() on its own may not surface.
        String extracted = extractText(event.getMessage());
        String text = extracted == null ? "" : extracted;

        EventSource src = event.getSource();
        if (src == null) {
            System.out.printf("[parent][%s] %s%n", event.getType(), text);
        } else {
            System.out.printf("[%s|depth=%d|path=%s][%s] %s%n",
                    src.getAgentId(), src.getDepth(), src.getPath(),
                    event.getType(), text);
        }
    }

    /**
     * Returns the name of the first tool-use block in the message whose tool, as registered in
     * the toolkit, is a {@link SubAgentTool}. This separates sub-agent calls from ordinary
     * tool calls without keeping a hand-written set of names.
     *
     * @param message message to scan; may be {@code null}
     * @param toolkit toolkit used to look the tool up by name; may be {@code null}
     * @return the sub-agent tool name, or {@code null} if the message contains none
     */
    private static String extractSubAgentToolUseName(Msg message, Toolkit toolkit) {
        if (message == null || toolkit == null || message.getContent() == null) {
            return null;
        }
        for (ContentBlock block : message.getContent()) {
            if (!(block instanceof ToolUseBlock)) {
                continue;
            }
            String name = ((ToolUseBlock) block).getName();
            if (name != null && toolkit.getTool(name) instanceof SubAgentTool) {
                return name;
            }
        }
        return null;
    }

    /**
     * Logs a structured lifecycle event as JSON. The envelope always carries a {@code ts}
     * field (epoch millis) so that gaps between events are visible in the log; the payload is
     * attached only when it is non-empty.
     *
     * @param eventName name stored in the envelope's {@code event} field
     * @param payload   optional extra fields; may be {@code null} or empty
     */
    private static void emitEvent(String eventName, Map<String, Object> payload) {
        Map<String, Object> envelope = new LinkedHashMap<>();
        envelope.put("event", eventName);
        envelope.put("ts", System.currentTimeMillis());
        if (payload != null && !payload.isEmpty()) {
            envelope.put("payload", payload);
        }
        try {
            log.info("HarnessAgentSubAgentStreamDemo#emitEvent event={}", MAPPER.writeValueAsString(envelope));
        } catch (JsonProcessingException e) {
            log.error("HarnessAgentSubAgentStreamDemo#emitEvent serialize failed, event={}", eventName, e);
        }
    }

    /**
     * Builds an insertion-ordered map from interleaved key/value arguments. Keys are converted
     * with {@link String#valueOf(Object)}; a trailing key without a value is ignored.
     *
     * @param kvPairs alternating keys and values; may be {@code null}
     * @return a new mutable map, empty if no complete pair was supplied
     */
    private static Map<String, Object> mapOf(Object... kvPairs) {
        Map<String, Object> map = new LinkedHashMap<>();
        if (kvPairs == null) {
            return map;
        }
        for (int i = 0; i + 1 < kvPairs.length; i += 2) {
            map.put(String.valueOf(kvPairs[i]), kvPairs[i + 1]);
        }
        return map;
    }

    /**
     * Extracts the text of a message. {@code getTextContent()} is preferred; when it yields
     * nothing, the text of every content block is concatenated instead.
     *
     * @param message message to read; may be {@code null}
     * @return the text (possibly empty), or {@code null} if {@code message} is {@code null}
     */
    private static String extractText(Msg message) {
        if (message == null) {
            return null;
        }
        String textContent = message.getTextContent();
        if (textContent != null && !textContent.isEmpty()) {
            return textContent;
        }
        StringBuilder sb = new StringBuilder();
        if (message.getContent() != null) {
            for (ContentBlock block : message.getContent()) {
                sb.append(extractTextFromBlock(block));
            }
        }
        return sb.toString();
    }

    /**
     * Extracts the text of a single content block. Text blocks yield their text; tool-result
     * blocks yield the concatenated text of their output blocks, recursively; every other
     * block type yields the empty string.
     *
     * @param block block to read; may be {@code null}
     * @return the extracted text, never {@code null}
     */
    private static String extractTextFromBlock(ContentBlock block) {
        if (block instanceof TextBlock) {
            String text = ((TextBlock) block).getText();
            // Guard so that a missing text is not appended as the literal "null".
            return text == null ? "" : text;
        }
        if (block instanceof ToolResultBlock) {
            StringBuilder sb = new StringBuilder();
            ToolResultBlock result = (ToolResultBlock) block;
            if (result.getOutput() != null) {
                for (ContentBlock inner : result.getOutput()) {
                    sb.append(extractTextFromBlock(inner));
                }
            }
            return sb.toString();
        }
        return "";
    }

    /**
     * Creates a sub-agent backed by a streaming {@code qwen-plus} model.
     *
     * @param apiKey    DashScope API key for the sub-agent's model
     * @param name      agent name
     * @param sysPrompt system prompt defining the sub-agent's role
     * @return the configured sub-agent
     */
    private static HarnessAgent createSubAgent(String apiKey, String name, String sysPrompt) {
        return HarnessAgent.builder()
                .name(name)
                .sysPrompt(sysPrompt)
                .model(DashScopeChatModel.builder()
                        .apiKey(apiKey)
                        .modelName("qwen-plus")
                        .stream(true)  // streaming is enabled so that the sub-agent produces chunks
                        .build())
                .build();
    }

}
`

`

    <dependency>
        <groupId>io.agentscope</groupId>
        <artifactId>agentscope</artifactId>
        <version>1.1.0-RC2</version>
    </dependency>

    <dependency>
        <groupId>io.agentscope</groupId>
        <artifactId>agentscope-harness</artifactId>
        <version>1.1.0-RC2</version>
    </dependency>

`

This is my Java code. Stepping through it in the debugger, I never hit the case where `EventSource src = event.getSource();` returns a non-null `src`.

Is there a problem with my code, or is this an issue with the version I am using? Can anybody help me? Thanks.

Metadata

Metadata

Assignees

No one assigned

    Labels

    area/harness — agentscope-harness (test/runtime support); bug — Something isn't working

    Type

    No type
    No fields configured for issues without a type.

    Projects

    Status
    Backlog

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions