模型层定义
Model
模型层统一集成 Model
public interface Model {
Flux<ChatResponse> stream(List<Msg> messages, List<ToolSchema> tools, GenerateOptions options);
String getModelName();
}
classDiagram
direction TB
class Model {
<<interface>>
}
class ChatModelBase {
<<abstract>>
+send(prompt)
+stream(prompt)
}
class AnthropicChatModel {
+Claude 模型适配实现
}
class DashScopeChatModel {
+阿里云百炼 / 通义千问适配
}
class GeminiChatModel {
+Google Gemini 模型适配
}
class OpenAIChatModel {
+OpenAI GPT 系列模型适配
}
Model <|.. ChatModelBase : implements
ChatModelBase <|-- AnthropicChatModel : extends
ChatModelBase <|-- DashScopeChatModel : extends
ChatModelBase <|-- GeminiChatModel : extends
ChatModelBase <|-- OpenAIChatModel : extends
ChatModelBase
主要提供统一的入口、统一的埋点追踪、让子类只关心如何请求模型。
public abstract class ChatModelBase implements Model {
@Override
public final Flux<ChatResponse> stream(
List<Msg> messages, List<ToolSchema> tools, GenerateOptions options) {
return TracerRegistry.get()
.callModel(
this, messages, tools, options, () -> doStream(messages, tools, options));
}
protected abstract Flux<ChatResponse> doStream(
List<Msg> messages, List<ToolSchema> tools, GenerateOptions options);
}
Msg
Msg 是 AgentScope 里最核心的消息对象。是对 Agent、用户、系统、工具之间通用的“消息信封”。
Msg 主要有 6 个核心字段:
- id 每条消息的唯一标识。流式输出时,同一条消息的多个 chunk 往往共享同一个 id。
- name 消息发送者名称,比如 "User"、"Assistant"、"system"。
- role
发送者角色,通常是:
- USER
- ASSISTANT
- SYSTEM
- TOOL
- content
真正的消息内容,是 List
,不是单纯一个字符串。 这点很关键。它支持多种内容块,比如: - TextBlock
- ThinkingBlock
- ToolUseBlock
- ToolResultBlock
- metadata 附加元数据,用来放一些不适合直接展示但执行阶段很重要的信息。
- timestamp 消息时间戳。
ChatResponse
ChatResponse 是 模型层返回的数据对象。是 AgentScope 定义的LLM 一次输出的原始响应包。
ChatResponse 主要有 5 个字段:
-
id 响应唯一标识。 流式场景下,同一轮模型输出的多个 chunk 往往会共享这个 id。
-
content List
类型,表示模型这次返回的内容块。 可能包含: - TextBlock
- ThinkingBlock
- ToolUseBlock
也就是说,模型返回的不一定只是文本,也可能直接返回工具调用。
-
usage ChatUsage 类型,记录这次模型调用的 token 使用情况、耗时等。
-
metadata 模型提供方附带的额外信息。 这个字段更偏底层,通常用于保留供应商侧扩展数据。
-
finishReason 模型为什么停止生成。 例如正常结束、长度限制、工具调用结束等。
实现方案
OpenAIChatModel
- 包装重试和失败逻辑
- 讲 AgentScope 定义的 Msg/ToolSchema/GenerateOptions 转换成 OpenAI 的请求。
- 把返回结果解析成 ChatResponse
// 转换成 OpenAI 的请求和响应
Formatter<OpenAIMessage, OpenAIResponse, OpenAIRequest> formatter;
protected Flux<ChatResponse> doStream(
List<Msg> messages, List<ToolSchema> tools, GenerateOptions options) {
return ModelUtils.applyTimeoutAndRetry(
doStream0(messages, tools, options),
options,
configuredOptions,
configuredOptions.getModelName(),
"openai");
}
这里有一个小巧思。
- 流的方式会直接发起请求,流式处理
- 非流的方式会使用
Flux.defer使用惰性请求
Formatter
负责将原始的 json -> OpenAIRequest -> ChatResponse
flowchart TD
%% 定义样式
classDef raw fill:#f2f2f2,stroke:#666,stroke-dasharray: 5 5;
classDef intermediate fill:#e1f5fe,stroke:#01579b,stroke-width:2px;
classDef final fill:#e8f5e9,stroke:#2e7d32,stroke-width:2px;
%% 输入部分
subgraph Input ["1. 原始输入 (Raw Data)"]
RawJSON[/"原始 JSON 数据<br/>(来自 OpenAI/DeepSeek API)"/]:::raw
UserMsg["业务层 Msg 对象"]:::raw
end
%% 转换层
subgraph Transformation ["2. 适配层 (Formatter Logic)"]
direction TB
%% 响应流
OAI_Resp["<b>OpenAIResponse</b><br/>(标准 OpenAI 协议实体)"]:::intermediate
%% 请求流
OAI_Msg["<b>OpenAIMessage</b><br/>(含 Role, Content 等)"]:::intermediate
end
%% 输出部分
subgraph Output ["3. 统一接口 (Standard API)"]
ChatResp["<b>ChatResponse</b><br/>(系统通用响应)"]:::final
end
%% 数据流向
RawJSON -->|反序列化| OAI_Resp
OAI_Resp -->|字段映射| ChatResp
UserMsg -->|协议包装| OAI_Msg
OAI_Msg -->|构造 Body| RawJSON
%% 标注
note1[所有继承自 OpenAIChatFormatter 的模型<br/>如 GLM, DeepSeek 都复用此路径]
note1 -.-> Transformation
classDiagram
%% 核心基类
class Formatter
class AbstractBaseFormatter
AbstractBaseFormatter --|> Formatter
%% 第一层:厂商基础
OpenAIBaseFormatter --|> AbstractBaseFormatter
GeminiChatFormatter --|> AbstractBaseFormatter
GeminiMultiAgentFormatter --|> AbstractBaseFormatter
DashScopeChatFormatter --|> AbstractBaseFormatter
AnthropicBaseFormatter --|> AbstractBaseFormatter
DashScopeMultiAgentFormatter --|> AbstractBaseFormatter
%% 第二层:混合继承
OpenAIChatFormatter --|> OpenAIBaseFormatter
OpenAIChatFormatter --|> GeminiChatFormatter
AnthropicChatFormatter --|> AnthropicBaseFormatter
AnthropicChatFormatter --|> GeminiChatFormatter
AnthropicMultiAgentFormatter --|> AnthropicBaseFormatter
AnthropicMultiAgentFormatter --|> GeminiMultiAgentFormatter
%% 第三层:具体实现
OpenAIMultiAgentFormatter --|> OpenAIChatFormatter
GLMFormatter --|> OpenAIChatFormatter
DeepSeekFormatter --|> AnthropicChatFormatter
DeepSeekMultiAgentFormatter --|> DeepSeekFormatter
GLMMultiAgentFormatter --|> GLMFormatter
GLMMultiAgentFormatter --|> OpenAIMultiAgentFormatter
Agent & AgentBase
classDiagram
class StateModule
class Agent
class AgentBase
class ReActAgent
class A2aAgent
class StudioUserAgent
class UserAgent
AgentBase ..|> StateModule
AgentBase ..|> Agent
ReActAgent --|> AgentBase
A2aAgent --|> AgentBase
StudioUserAgent --|> AgentBase
UserAgent --|> AgentBase
从继承关系上来看,AgentBase 作为模版类,封装了统一的操作,具体的实现交给了子类。我们先看,模版类中都做了哪些事情。
agent.stream(userMsg);
public abstract class AgentBase{
@Override
public final Flux<Event> stream(List<Msg> msgs, StreamOptions options) {
// 本质上是 把一次普通的 call() 执行包装成一个可订阅的流式 Flux<Event>
return createEventStream(options, () -> call(msgs));
}
}
stream(...)
-> createEventStream(...)
-> 创建 Flux<Event>
-> 临时挂 StreamingHook
-> 执行正常 call()
-> 各阶段触发 HookEvent
-> StreamingHook 把部分 HookEvent 转成 Event
-> 可选补一个 AGENT_RESULT
-> complete/error
-> 移除 StreamingHook
-
创建一个 Flux
,后面 agent 执行过程中产生的事件都会往这个 sink 里写。 -
临时创建并挂载 StreamingHook,后临时加到当前 agent 的 hooks 列表里。
-
执行真正的 agent 调用
() -> call(msgs)Model 的调用封装。 -
结束调用后将
StreamingHook移除,避免污染后续非流式调用。
@Override
public final Mono<Msg> call(List<Msg> msgs) {
if (!running.compareAndSet(false, true) && checkRunning) {
return Mono.error(
new IllegalStateException(
"Agent is still running, please wait for it to finish"));
}
resetInterruptFlag();
return TracerRegistry.get()
.callAgent(
this,
msgs,
() ->
notifyPreCall(msgs)
.flatMap(this::doCall)
.flatMap(this::notifyPostCall)
.onErrorResume(
createErrorHandler(msgs.toArray(new Msg[0]))))
.doFinally(signalType -> running.set(false));
}
public interface StateModule {
// 历史状态存储,上下文信息加载
}
public abstract class AgentBase implements StateModule, Agent {
...
void interrupt();
String getAgentId();
public final Mono<Msg> call(List<Msg> msgs){};
}
ReActAgent
ReAct 是一种代理设计模式,它将推理(思考和计划)与行动(工具执行)结合在迭代循环中。代理在这两个阶段之间交替,直到完成任务或达到最大迭代限制。
- 响应式流:使用 Project Reactor 进行非阻塞执行
- 钩子系统:用于监视和拦截代理执行的可扩展钩子
- HITL 支持:通过 PostReasoningEvent/PostActingEvent 中的 stopAgent() 进行人机交互
- 结构化输出: StructuredOutputCapableAgent 提供类型安全的输出生成
:reasoning() 是 ReActAgent 的“思考编排器”,不是一个普通模型调用方法.
if (!ignoreMaxIters && iter >= maxIters) {
return summarizing();
}
ReasoningContext context = new ReasoningContext(getName());
return checkInterruptedAsync()
.then(notifyPreReasoningEvent(prepareMessages()))
.flatMapMany(event -> model.stream(...))
.doOnNext(chunk -> context.processChunk(chunk))
.then(Mono.justOrEmpty(context.buildFinalMessage()))
.flatMap(this::notifyPostReasoning)
.flatMap(event -> {
Msg msg = event.getReasoningMessage();
if (event.isStopRequested()) return Mono.just(msg);
if (event.isGotoReasoningRequested()) return reasoning(iter + 1, true);
if (isFinished(msg)) return Mono.just(msg);
return acting(iter);
});
A2aAgent
面向 Agent-to-Agent 协作的角色/封装
StudioUserAgent
搭配 AgentScope Studio 使用,接到 AgentScope Studio 的调试会话里。
UserAgent
Trace 链路跟踪
AgentScope 是基于 Reactor 反应式编程,非阻塞的编程模型。多线程处理,原来的 ThreadLocal 的模型在这种情况下是不能用的,那么如何在反应式编程中实现链路追踪呢?是我们接下来要研究的点。
在 AgentScope 场景下建立一套可落地的链路追踪体系,主要解决以下问题:
- 能还原一次 Agent 回答的完整执行过程
- 能区分 reasoning、acting、tool execution、RAG injection、hook mutation
- 能支持流式输出、挂起恢复、人工确认、多轮会话
- 能兼顾排障、审计、性能分析与数据安全