返回博客列表

AgentScope 源码学习

2026-04-08
4 min read

模型层定义 Model 模型层统一集成 Model ChatModelBase 主要提供统一的入口、统一的埋点追踪、让子类只关心如何请求模型。 Msg Msg 是 AgentScope 里最核心的消息对象。是对 Agent、用户、系统、工具之间通用的“消息信封”。 Msg 主要有 6 个核心字段: id 每条消息的唯一标识。流式输出时,同一条消息的多个 chunk 往往共享同一个 i...

模型层定义

Model

模型层统一集成 Model

java
public interface Model {
    Flux<ChatResponse> stream(List<Msg> messages, List<ToolSchema> tools, GenerateOptions options);
    String getModelName();
}
mermaid
classDiagram
    direction TB

    class Model {
        <<interface>>
    }

    class ChatModelBase {
        <<abstract>>
        +send(prompt)
        +stream(prompt)
    }

    class AnthropicChatModel {
        +Claude 模型适配实现
    }

    class DashScopeChatModel {
        +阿里云百炼 / 通义千问适配
    }

    class GeminiChatModel {
        +Google Gemini 模型适配
    }

    class OpenAIChatModel {
        +OpenAI GPT 系列模型适配
    }

    Model <|.. ChatModelBase : implements
    ChatModelBase <|-- AnthropicChatModel : extends
    ChatModelBase <|-- DashScopeChatModel : extends
    ChatModelBase <|-- GeminiChatModel : extends
    ChatModelBase <|-- OpenAIChatModel : extends

ChatModelBase

主要提供统一的入口、统一的埋点追踪、让子类只关心如何请求模型。

java
public abstract class ChatModelBase implements Model {

    @Override
    public final Flux<ChatResponse> stream(
            List<Msg> messages, List<ToolSchema> tools, GenerateOptions options) {
        return TracerRegistry.get()
                .callModel(
                        this, messages, tools, options, () -> doStream(messages, tools, options));
    }

    protected abstract Flux<ChatResponse> doStream(
            List<Msg> messages, List<ToolSchema> tools, GenerateOptions options);
}

Msg

Msg 是 AgentScope 里最核心的消息对象。是对 Agent、用户、系统、工具之间通用的“消息信封”。

Msg 主要有 6 个核心字段:

  • id 每条消息的唯一标识。流式输出时,同一条消息的多个 chunk 往往共享同一个 id。
  • name 消息发送者名称,比如 "User"、"Assistant"、"system"。
  • role 发送者角色,通常是:
    • USER
    • ASSISTANT
    • SYSTEM
    • TOOL
  • content 真正的消息内容,是 List,不是单纯一个字符串。 这点很关键。它支持多种内容块,比如:
    • TextBlock
    • ThinkingBlock
    • ToolUseBlock
    • ToolResultBlock
  • metadata 附加元数据,用来放一些不适合直接展示但执行阶段很重要的信息。
  • timestamp 消息时间戳。

ChatResponse

ChatResponse 是 模型层返回的数据对象。是 AgentScope 定义的LLM 一次输出的原始响应包。

ChatResponse 主要有 5 个字段:

  1. id 响应唯一标识。 流式场景下,同一轮模型输出的多个 chunk 往往会共享这个 id。

  2. content List 类型,表示模型这次返回的内容块。 可能包含:

    • TextBlock
    • ThinkingBlock
    • ToolUseBlock

    也就是说,模型返回的不一定只是文本,也可能直接返回工具调用。

  3. usage ChatUsage 类型,记录这次模型调用的 token 使用情况、耗时等。

  4. metadata 模型提供方附带的额外信息。 这个字段更偏底层,通常用于保留供应商侧扩展数据。

  5. finishReason 模型为什么停止生成。 例如正常结束、长度限制、工具调用结束等。

实现方案

OpenAIChatModel

  1. 包装重试和失败逻辑
  2. 讲 AgentScope 定义的 Msg/ToolSchema/GenerateOptions 转换成 OpenAI 的请求。
  3. 把返回结果解析成 ChatResponse
java
 // 转换成 OpenAI 的请求和响应
 Formatter<OpenAIMessage, OpenAIResponse, OpenAIRequest> formatter;

 protected Flux<ChatResponse> doStream(
            List<Msg> messages, List<ToolSchema> tools, GenerateOptions options) {
        return ModelUtils.applyTimeoutAndRetry(
                doStream0(messages, tools, options),
                options,
                configuredOptions,
                configuredOptions.getModelName(),
                "openai");
    }

这里有一个小巧思。

  • 流的方式会直接发起请求,流式处理
  • 非流的方式会使用Flux.defer使用惰性请求

Formatter

负责将原始的 json -> OpenAIRequest -> ChatResponse

mermaid
flowchart TD
    %% 定义样式
    classDef raw fill:#f2f2f2,stroke:#666,stroke-dasharray: 5 5;
    classDef intermediate fill:#e1f5fe,stroke:#01579b,stroke-width:2px;
    classDef final fill:#e8f5e9,stroke:#2e7d32,stroke-width:2px;

    %% 输入部分
    subgraph Input ["1. 原始输入 (Raw Data)"]
        RawJSON[/"原始 JSON 数据<br/>(来自 OpenAI/DeepSeek API)"/]:::raw
        UserMsg["业务层 Msg 对象"]:::raw
    end

    %% 转换层
    subgraph Transformation ["2. 适配层 (Formatter Logic)"]
        direction TB
        
        %% 响应流
        OAI_Resp["<b>OpenAIResponse</b><br/>(标准 OpenAI 协议实体)"]:::intermediate
        
        %% 请求流
        OAI_Msg["<b>OpenAIMessage</b><br/>(含 Role, Content 等)"]:::intermediate
    end

    %% 输出部分
    subgraph Output ["3. 统一接口 (Standard API)"]
        ChatResp["<b>ChatResponse</b><br/>(系统通用响应)"]:::final
    end

    %% 数据流向
    RawJSON -->|反序列化| OAI_Resp
    OAI_Resp -->|字段映射| ChatResp

    UserMsg -->|协议包装| OAI_Msg
    OAI_Msg -->|构造 Body| RawJSON

    %% 标注
    note1[所有继承自 OpenAIChatFormatter 的模型<br/>如 GLM, DeepSeek 都复用此路径]
    note1 -.-> Transformation
mermaid
classDiagram
    %% 核心基类
    class Formatter
    class AbstractBaseFormatter
    
    AbstractBaseFormatter --|> Formatter

    %% 第一层:厂商基础
    OpenAIBaseFormatter --|> AbstractBaseFormatter
    GeminiChatFormatter --|> AbstractBaseFormatter
    GeminiMultiAgentFormatter --|> AbstractBaseFormatter
    DashScopeChatFormatter --|> AbstractBaseFormatter
    AnthropicBaseFormatter --|> AbstractBaseFormatter
    DashScopeMultiAgentFormatter --|> AbstractBaseFormatter

    %% 第二层:混合继承
    OpenAIChatFormatter --|> OpenAIBaseFormatter
    OpenAIChatFormatter --|> GeminiChatFormatter
    
    AnthropicChatFormatter --|> AnthropicBaseFormatter
    AnthropicChatFormatter --|> GeminiChatFormatter

    AnthropicMultiAgentFormatter --|> AnthropicBaseFormatter
    AnthropicMultiAgentFormatter --|> GeminiMultiAgentFormatter

    %% 第三层:具体实现
    OpenAIMultiAgentFormatter --|> OpenAIChatFormatter
    GLMFormatter --|> OpenAIChatFormatter
    DeepSeekFormatter --|> AnthropicChatFormatter

    DeepSeekMultiAgentFormatter --|> DeepSeekFormatter
    GLMMultiAgentFormatter --|> GLMFormatter
    GLMMultiAgentFormatter --|> OpenAIMultiAgentFormatter

Agent & AgentBase

mermaid
classDiagram
    class StateModule
    class Agent
    class AgentBase
    class ReActAgent
    class A2aAgent
    class StudioUserAgent
    class UserAgent

    AgentBase ..|> StateModule
    AgentBase ..|> Agent

    ReActAgent --|> AgentBase
    A2aAgent --|> AgentBase
    StudioUserAgent --|> AgentBase
    UserAgent --|> AgentBase

从继承关系上来看,AgentBase 作为模版类,封装了统一的操作,具体的实现交给了子类。我们先看,模版类中都做了哪些事情。

java

agent.stream(userMsg);

public abstract class AgentBase{
     @Override
    public final Flux<Event> stream(List<Msg> msgs, StreamOptions options) {
        // 本质上是 把一次普通的 call() 执行包装成一个可订阅的流式 Flux<Event>
        return createEventStream(options, () -> call(msgs));
    }
  
}

 stream(...)
    -> createEventStream(...)
         -> 创建 Flux<Event>
         -> 临时挂 StreamingHook
         -> 执行正常 call()
              -> 各阶段触发 HookEvent
              -> StreamingHook 把部分 HookEvent 转成 Event
         -> 可选补一个 AGENT_RESULT
         -> complete/error
         -> 移除 StreamingHook
  1. 创建一个 Flux,后面 agent 执行过程中产生的事件都会往这个 sink 里写。

  2. 临时创建并挂载 StreamingHook,后临时加到当前 agent 的 hooks 列表里。

  3. 执行真正的 agent 调用 () -> call(msgs) Model 的调用封装。

  4. 结束调用后将 StreamingHook 移除,避免污染后续非流式调用。

java
    @Override
    public final Mono<Msg> call(List<Msg> msgs) {
        if (!running.compareAndSet(false, true) && checkRunning) {
            return Mono.error(
                    new IllegalStateException(
                            "Agent is still running, please wait for it to finish"));
        }
        resetInterruptFlag();

        return TracerRegistry.get()
                .callAgent(
                        this,
                        msgs,
                        () ->
                                notifyPreCall(msgs)
                                        .flatMap(this::doCall)
                                        .flatMap(this::notifyPostCall)
                                        .onErrorResume(
                                                createErrorHandler(msgs.toArray(new Msg[0]))))
                .doFinally(signalType -> running.set(false));
    }
java
public interface StateModule {
   // 历史状态存储,上下文信息加载
}
public abstract class AgentBase implements StateModule, Agent {
  ...
  void interrupt();
  String getAgentId();
  public final Mono<Msg> call(List<Msg> msgs){};
}

ReActAgent

ReAct 是一种代理设计模式,它将推理(思考和计划)与行动(工具执行)结合在迭代循环中。代理在这两个阶段之间交替,直到完成任务或达到最大迭代限制。

  • 响应式流:使用 Project Reactor 进行非阻塞执行
  • 钩子系统:用于监视和拦截代理执行的可扩展钩子
  • HITL 支持:通过 PostReasoningEvent/PostActingEvent 中的 stopAgent() 进行人机交互
  • 结构化输出: StructuredOutputCapableAgent 提供类型安全的输出生成

:reasoning() 是 ReActAgent 的“思考编排器”,不是一个普通模型调用方法.

java
if (!ignoreMaxIters && iter >= maxIters) {
      return summarizing();
  }

  ReasoningContext context = new ReasoningContext(getName());

  return checkInterruptedAsync()
      .then(notifyPreReasoningEvent(prepareMessages()))
      .flatMapMany(event -> model.stream(...))
      .doOnNext(chunk -> context.processChunk(chunk))
      .then(Mono.justOrEmpty(context.buildFinalMessage()))
      .flatMap(this::notifyPostReasoning)
      .flatMap(event -> {
          Msg msg = event.getReasoningMessage();
          if (event.isStopRequested()) return Mono.just(msg);
          if (event.isGotoReasoningRequested()) return reasoning(iter + 1, true);
          if (isFinished(msg)) return Mono.just(msg);
          return acting(iter);
      });

A2aAgent

面向 Agent-to-Agent 协作的角色/封装

StudioUserAgent

搭配 AgentScope Studio 使用,接到 AgentScope Studio 的调试会话里。

UserAgent

Trace 链路跟踪

AgentScope 是基于 Reactor 反应式编程,非阻塞的编程模型。多线程处理,原来的 ThreadLocal 的模型在这种情况下是不能用的,那么如何在反应式编程中实现链路追踪呢?是我们接下来要研究的点。

在 AgentScope 场景下建立一套可落地的链路追踪体系,主要解决以下问题:

  • 能还原一次 Agent 回答的完整执行过程
  • 能区分 reasoning、acting、tool execution、RAG injection、hook mutation
  • 能支持流式输出、挂起恢复、人工确认、多轮会话
  • 能兼顾排障、审计、性能分析与数据安全

Trace 链路跟踪技术方案

返回博客列表
最后更新于 2026-04-08
想法或问题?在 GitHub Issue 下方参与讨论
去评论