最近在研究
AgentScope的代码,从中发现 Reactor 编程中,链路追踪的用法,如果对这块只是看不懂,那么多半看不懂 AgentScope 的用法。这块的只是在这里补充下
在 Spring MVC 时代,ThreadLocal 是我们的“随身口袋”,随手存个 UserId 或 LogId 简直不要太爽。
但当你踏入 WebFlux 或 Project Reactor 的地盘,你会发现这里的线程就像渣男,说换就换。一个请求可能在 A 线程接客,B 线程查库,C 线程给响应。这时候,基于线程绑定的 ThreadLocal 瞬间就变成了“薛定谔的变量”——你永远不知道这一行代码跑完,下一个操作符里数据还在不在。
为了救火,Reactor 给我们准备了 Context。
一、 核心痛点:为什么 ThreadLocal 必死?
我们先看一眼惨案现场:
// 这是一个必挂的写法
ThreadLocal<String> traceId = new ThreadLocal<>();
traceId.set("UUID-999");
Mono.just("Hello")
.subscribeOn(Schedulers.boundedElastic()) // 切换线程了
.map(data -> {
// 这里的 traceId.get() 必拿 null
log.info("Trace: {}", traceId.get());
return data + " World";
})
.subscribe();
真相: 响应式编程的本质是异步流水线。操作符(Operator)不关心线程,它们只关心数据。线程只是被招来干活的“临时工”,干完这步就撤了,指望 ThreadLocal 跨步传递数据,无异于刻舟求剑。
二、 Reactor Context 的“逆流”设计
Reactor Context 不是存放在线程里的,它是存在 Subscriber(订阅者) 里的。
这里的逻辑非常硬核:
- 装配期: 你写
.contextWrite()时,其实什么都没发生,只是在链条上挂了个钩子。 - 订阅期: 只有当你执行
.subscribe()时,一个“订阅信号”会从下往上传递。 - 注入: 当信号路过
.contextWrite()时,Context 数据就像被吸附在信号上一样,一路带到了流的最顶端。
重点: 这种“自底向上”的设计决定了,上游的操作符只能看到它下游定义的 Context。
三、 代码实力:读写姿势与实战
1. 基础读写:如何优雅地“带货”
public Mono<String> processData() {
// 使用 deferContextual 延迟获取上下文
return Mono.deferContextual(ctx -> {
String correlationId = ctx.get("X-Correlation-ID");
return Mono.just("Processed with ID: " + correlationId);
});
}
@Test
public void testContextPropagation() {
processData()
.map(String::toUpperCase)
// 写入 Context,注意它在下方,但能作用于上方
.contextWrite(Context.of("X-Correlation-ID", "REQ-12345"))
// ⚠️⚠️ 注意 下面是无法获取 “X-Correlation-ID” 的,因为Context,只能作用于上方。
.subscribe(System.out::println);
}
// 如果想要下面也能拿到就必须这样写
@Test
public void testContext2() {
getSecureData()
.contextWrite(Context.of("user", "Admin")) // 写入 Context
.flatMap(data -> Mono.deferContextual(ctx -> {
// 在这里又可以掏口袋了,注意拿到的是下面定义的 Admin X,而不是上面的Admin
String user = ctx.getOrDefault("user", "unknown");
// Secure data for: Admin processed by Admin X
return Mono.just(data + " processed by " + user);
}))
.contextWrite(Context.of("user", "Admin X"))
.subscribe(System.out::println);
}
2. 结合 Spring Security(最强实战)
在 WebFlux 中,获取当前用户不需要传参,直接从 Context 里掏:
public Mono<Void> updateBusiness() {
return ReactiveSecurityContextHolder.getContext()
.map(SecurityContext::getAuthentication)
.flatMap(auth -> {
log.info("当前操作人: {}", auth.getName());
return doBusiness(auth.getPrincipal());
});
}
原理: Spring Security 的拦截器会在流的最下游悄悄执行 .contextWrite(),把认证信息塞进去,供你全局调用。
四、 高级进阶:如何跟 Log4j2 (MDC) 这种“老顽固”对接?
很多老旧库依然死磕 ThreadLocal(比如日志 MDC)。在 Reactor 里,我们可以利用 handle 或自定义 Hook 来做一个“时空转换”。
推荐方案:使用 Micrometer Context Propagation 库 这是目前工业级的标准做法:
// 1. 初始化(全局执行一次)
Hooks.enableAutomaticContextPropagation();
ContextRegistry.getInstance()
.registerThreadLocalAccessor("traceId",
MDC::get, MDC::put, MDC::remove);
// 2. 在 Reactor 链条中开启自动同步
Mono.just("Task")
.handle((val, sink) -> {
// 这里可以直接打日志,MDC 里的 traceId 已经被自动还原了!
log.info("日志里现在有 traceId 了");
sink.next(val);
})
.contextWrite(Context.of("traceId", "T-800"))
.contextCapture(); // 关键:捕捉当前环境到 Context
五、生产环境下的应用
5.1 注册“搬运规则” (Registry)
@Configuration
public class ContextPropagationConfig {
@PostConstruct
public void init() {
// 注册 MDC 的搬运规则
ContextRegistry.getInstance()
.registerThreadLocalAccessor(
"TRACE_ID", // Context 中的 Key
MDC::get, // 如何从 ThreadLocal 读
val -> MDC.put("traceId", val), // 如何写回 ThreadLocal
() -> MDC.remove("traceId") // 如何清理
);
}
}
5.2 开启“全自动外挂” (Global Hook)
在启动类里一行代码搞定,让 Reactor 在切换线程时自动同步。
public static void main(String[] args) {
Hooks.enableAutomaticContextPropagation();
SpringApplication.run(DemoApplication.class, args);
}
5.3 自动搬运
@Component
@Order(Ordered.HIGHEST_PRECEDENCE) // 优先级最高,越早抓取越好
public class ContextSnapshotFilter implements WebFilter {
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
// 1. 此时是在 HTTP 线程,ThreadLocal 还在
// 2. 这里的 contextCapture() 会把 ThreadLocal 存入 Reactor Context
return chain.filter(exchange)
.contextCapture();
}
}
5.4 注意点
很多人觉得开了 enableAutomaticContextPropagation() 就万事大吉了,其实不然: Automatic Propagation 负责的是:数据已经在 Context 里了,如何在 flatMap 切换线程时帮你自动搬运。 contextCapture() 负责的是:数据还在 ThreadLocal 里,如何把它“第一手”抓进响应式世界。
六、 使用指南
- 不可变性警告:
Context是Immutable的。ctx.put(k, v)不会改变原对象,它会返回一个全新的Context。记得像用String一样用它。 - 位置决定生死: 记住口诀——下层写,上层读。如果你把
contextWrite放在了链条的最顶端,那么它下面的flatMap永远拿不到值。 - 别塞大对象: Context 会随着整个订阅生命周期常驻内存。别把整个数据库查询结果存进去,那不是上下文,那是 OOM 的温床。
final class Context0 implements CoreContext {
static final Context0 INSTANCE = new Context0();
@Override
public Context put(Object key, Object value) {
Objects.requireNonNull(key, "key");
Objects.requireNonNull(value, "value");
return new Context1(key, value);
}
}
七、简单的Demo案例
7.1 线程切换导致无法获取 ThreadLocal 中的数据
- RequestContextWebFilter 将上下文通过contextWrite写入到 Reactor Context 中
- ContextPropagationService中只能获取 Reactor Context 信息,无法从 ThreadLocal 中获取,因为线程变了
@Component
public class RequestContextWebFilter implements WebFilter {
private static final String TRACE_ID_HEADER = "X-Trace-Id";
private static final String USER_ID_HEADER = "X-User-Id";
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
ServerHttpRequest request = exchange.getRequest();
String traceId = headerOrDefault(request, TRACE_ID_HEADER, "trace-" + UUID.randomUUID());
String userId = headerOrDefault(request, USER_ID_HEADER, "anonymous");
return chain.filter(exchange)
.contextWrite(context -> context
.put(ReactorContextKeys.TRACE_ID, traceId)
.put(ReactorContextKeys.USER_ID, userId));
}
private String headerOrDefault(ServerHttpRequest request, String headerName, String defaultValue) {
String value = request.getHeaders().getFirst(headerName);
return value == null || value.isBlank() ? defaultValue : value;
}
}
@Service
public class ContextPropagationService {
private static final ThreadLocal<String> TRACE_ID_HOLDER = new ThreadLocal<>();
private static final ThreadLocal<String> USER_ID_HOLDER = new ThreadLocal<>();
public Mono<ContextDemoResponse> inspectContext() {
return Mono.deferContextual(contextView -> {
ContextHopSnapshot beforeSwitch = snapshot("before-publishOn", contextView);
return Mono.just(beforeSwitch)
.publishOn(Schedulers.boundedElastic())
.flatMap(ignored -> readAfterThreadSwitch(beforeSwitch));
});
}
private Mono<ContextDemoResponse> readAfterThreadSwitch(ContextHopSnapshot beforeSwitch) {
return Mono.deferContextual(contextView -> Mono.just(new ContextDemoResponse(
"Automatic context propagation is active. Reactor Context values are restored into ThreadLocal even after thread switching.",
beforeSwitch,
snapshot("after-publishOn", contextView)
)));
}
private ContextHopSnapshot snapshot(String stage, ContextView contextView) {
// 是无法拿到,TRACE_ID_HOLDER和USER_ID_HOLDER的。因为切换了线程
return new ContextHopSnapshot(
stage,
Thread.currentThread().getName(),
contextView.get(ReactorContextKeys.TRACE_ID),
contextView.get(ReactorContextKeys.USER_ID),
TRACE_ID_HOLDER.get(),
USER_ID_HOLDER.get()
);
}
}
7.2 添加上自动映射
- 添加上自动转换了逻辑,自动将 Reactor Context 上下文转换到 ThreadLocal 上下文中。
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>context-propagation</artifactId>
<version>1.1.0</version>
</dependency>
@Service
public class ContextPropagationService {
private static final ThreadLocal<String> TRACE_ID_HOLDER = new ThreadLocal<>();
private static final ThreadLocal<String> USER_ID_HOLDER = new ThreadLocal<>();
@PostConstruct
public void init() {
Hooks.enableAutomaticContextPropagation();
ContextRegistry contextRegistry = ContextRegistry.getInstance();
contextRegistry.removeThreadLocalAccessor(ReactorContextKeys.TRACE_ID);
contextRegistry.removeThreadLocalAccessor(ReactorContextKeys.USER_ID);
contextRegistry.registerThreadLocalAccessor(
ReactorContextKeys.USER_ID,
USER_ID_HOLDER::get,
USER_ID_HOLDER::set,
USER_ID_HOLDER::remove
);
contextRegistry.registerThreadLocalAccessor(
ReactorContextKeys.TRACE_ID,
TRACE_ID_HOLDER::get,
TRACE_ID_HOLDER::set,
TRACE_ID_HOLDER::remove
);
}
}
八、核心原理
其实归根到底,我们是在研究,Reactor 中异步编程 Context 上下文,如何跟 ThreadLocal 进行数据交换的逻辑。 要么是 ThreadLocal 转移到 Context , 要么是 Context 转换到 ThreadLocal。
如果要我们自己来设计如何才能真正的做到无缝迁移,其实如果我们能在Reactor 的操作符中每次进行转移,其实就能无缝实现自动数据交换了。其实 Reactor 也提供了这样的能力,下面我们就研究这种方式。
8.1 思路分析
- web 过滤器中,将用户信息,保存到 Context 中。
- 在每个 Reactor 每个操作符中,从 Context中恢复到 ThreadLocal 中。
- 当每个操作符执行完成后,在从 ThreadLocal 中删除。
8.2 过滤器中维护 Reactor Context
请看代码注释,写的很清楚了。RequestContextRegistry 看不懂没关系,马上就说。
@Component
public class RequestContextWebFilter implements WebFilter {
private static final String TRACE_ID_HEADER = "X-Trace-Id";
private static final String USER_ID_HEADER = "X-User-Id";
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
ServerHttpRequest request = exchange.getRequest();
// 从请求头中拿到用户信息
String traceId = headerOrDefault(request, TRACE_ID_HEADER, "trace-" + UUID.randomUUID());
String userId = headerOrDefault(request, USER_ID_HEADER, "anonymous");
// 生成模拟的上下文需要的用户对象
RequestContextSnapshot requestContext = new RequestContextSnapshot(traceId, userId);
// 通过contextWrite,添加到 Reactor Context 中。RequestContextRegistry 封装了统一的操作。属于无状态类,所以是线程安全的。
return chain.filter(exchange)
.contextWrite(context -> RequestContextRegistry.get().storeRequestContext(context, requestContext));
}
private String headerOrDefault(ServerHttpRequest request, String headerName, String defaultValue) {
String value = request.getHeaders().getFirst(headerName);
return value == null || value.isBlank() ? defaultValue : value;
}
}
8.3 注册统一的Reactor操作符处理
代码比较长,主要看 RequestContextRegistry#enablePropagationHook 方法,信号处理的方法: onNext/onError/onComplete中封装转换的逻辑,其他方法都透传即可,这是 api,没必要死记硬背。会用即可。
public final class RequestContextRegistry {
private static final String HOOK_KEY = "react-ctx-request-context";
private static volatile boolean hookEnabled = false;
private static final RequestContextTracer TRACER = new RequestContextTracer();
/**
* 启用 Reactor 全局 hook。
*
* <p>实现机制:
* <p>对于每一个 Reactor operator,Reactor 都会把原始 subscriber 提升为我们自己的包装 subscriber。
* 这个包装层不会改变业务逻辑,只负责在 {@code onNext}、{@code onError}、{@code onComplete}
* 被调用之前,先把当前请求上下文恢复到 {@link ThreadLocal} 中。
*
* <p>为什么这样可以工作:
* <p>下游 subscriber 会一直携带 Reactor 的 {@link Context},可以通过
* {@link CoreSubscriber#currentContext()} 取到。这个上下文能够跨异步边界传播。
* 因此我们只需要在 signal 回调前,从当前 Reactor Context 中取出请求数据,恢复到
* {@code ThreadLocal},执行原始回调,然后再恢复旧的 ThreadLocal 状态即可。
*/
public static synchronized void enablePropagationHook() {
if (!hookEnabled) {
Hooks.onEachOperator(
HOOK_KEY,
Operators.lift(
(scannable, subscriber) ->
new CoreSubscriber<Object>() {
/**
* 订阅阶段。
*
* <p>当前 demo 在这个阶段不需要恢复上下文,
* 直接把订阅事件透传给原始 subscriber 即可。
*/
@Override
public void onSubscribe(Subscription subscription) {
subscriber.onSubscribe(subscription);
}
/**
* 数据信号。
*
* <p>在把数据传递给下游逻辑之前,先把 Reactor {@link Context}
* 里的请求上下文恢复到 {@link ThreadLocal}。
* 这是整个实现的关键步骤,它保证旧的命令式代码依然可以通过
* {@link RequestContextThreadLocalHolder} 读取上下文。
*/
@Override
public void onNext(Object value) {
TRACER.runWithContext(
subscriber.currentContext(),
() -> {
subscriber.onNext(value);
return null;
});
}
/**
* 异常信号。
*
* <p>异常处理逻辑通常也会读取 traceId、userId 这类请求上下文,
* 所以这里和正常数据信号一样,也需要先恢复 ThreadLocal。
*/
@Override
public void onError(Throwable throwable) {
TRACER.runWithContext(
subscriber.currentContext(),
() -> {
subscriber.onError(throwable);
return null;
});
}
/**
* 完成信号。
*
* <p>完成回调里也可能访问请求级 ThreadLocal,
* 因此这里同样保持一致,先恢复上下文再调用下游。
*/
@Override
public void onComplete() {
TRACER.runWithContext(
subscriber.currentContext(),
() -> {
subscriber.onComplete();
return null;
});
}
/**
* 原样暴露下游 subscriber 的 Reactor Context。
*
* <p>这一点很重要:包装 subscriber 自己不维护独立的 Context,
* 而是始终委托给原始 subscriber。
* 这样上游放进去的请求上下文才能在整条链路里持续可见。
*/
@Override
public Context currentContext() {
return subscriber.currentContext();
}
}));
hookEnabled = true;
}
}
}
8.4 真正的转换逻辑-RequestContextTracer
主要负责两件事:
- 从 Reactor {@link ContextView} 中读取当前请求上下文;
- 在执行指定逻辑前,把请求上下文恢复到 {@link ThreadLocal},执行完成后再恢复旧值。
这里逻辑很简单,其实就是 ThreadLocal 的多线程上下文切换的原理。
public class RequestContextTracer {
private static final String CONTEXT_KEY = "request-context";
/**
* 从 Reactor Context 中读取请求上下文。
*
* <p>如果当前链路里没有放入请求上下文,则返回一个空对象,
* 而不是返回 {@code null}。这样调用方在读取时可以少做一层判空处理。
*/
public RequestContextSnapshot getRequestContextFromContextView(ContextView reactorCtx) {
return reactorCtx.getOrDefault(CONTEXT_KEY, RequestContextSnapshot.empty());
}
// 从Reactor Context读取上下文,
public <TResp> TResp runWithContext(ContextView reactorCtx, Supplier<TResp> inner) {
RequestContextSnapshot requestContext = getRequestContextFromContextView(reactorCtx);
// 先记录现在的上下文。
RequestContextSnapshot previous = RequestContextThreadLocalHolder.get();
// 将 Reactor Context 放到当前的ThreadLocal 中
RequestContextThreadLocalHolder.set(requestContext);
try {
// 然后执行操作
return inner.get();
} finally {
// 执行完后将原来的上下文,重新恢复进去
RequestContextThreadLocalHolder.restore(previous);
}
}
}
package cn.duoduo.reactctx.context;
public final class RequestContextThreadLocalHolder {
private static final ThreadLocal<RequestContextSnapshot> REQUEST_CONTEXT_HOLDER = new ThreadLocal<>();
private RequestContextThreadLocalHolder() {
}
public static RequestContextSnapshot get() {
return REQUEST_CONTEXT_HOLDER.get();
}
public static void set(RequestContextSnapshot requestContext) {
if (requestContext == null) {
REQUEST_CONTEXT_HOLDER.remove();
return;
}
REQUEST_CONTEXT_HOLDER.set(requestContext);
}
public static void restore(RequestContextSnapshot requestContext) {
set(requestContext);
}
public static String getTraceId() {
RequestContextSnapshot requestContext = get();
return requestContext == null ? null : requestContext.traceId();
}
public static String getUserId() {
RequestContextSnapshot requestContext = get();
return requestContext == null ? null : requestContext.userId();
}
public static void clearAll() {
REQUEST_CONTEXT_HOLDER.remove();
}
}
总结
Reactor Context 是响应式架构里的“隐形传送门”。它虽然看起来有点反直觉(自底向上),但却是解决异步链路数据丢失的唯一正解。
作为开发者,我们要做的就是:放下对 ThreadLocal 的执念,拥抱响应式流的生命周期。 觉得有用?点个赞,咱们评论区讨论一下你被响应式坑过的瞬间。