返回博客列表

解决异步挑战:Reactor Context 实现响应式上下文传递

2026-04-07
7 min read

> 最近在研究 的代码,从中发现 Reactor 编程中,链路追踪的用法,如果对这块只是看不懂,那么多半看不懂 AgentScope 的用法。这块的只是在这里补充下 在 Spring MVC 时代, 是我们的“随身口袋”,随手存个 或 简直不要太爽。 但当你踏入 WebFlux 或 Project Reactor 的地盘,你会发现这里的线程就像渣男,说换就换。一个请求可能在 A 线程接客,B...

最近在研究 AgentScope 的代码,从中发现 Reactor 编程中,链路追踪的用法,如果对这块只是看不懂,那么多半看不懂 AgentScope 的用法。这块的只是在这里补充下

在 Spring MVC 时代,ThreadLocal 是我们的“随身口袋”,随手存个 UserIdLogId 简直不要太爽。

但当你踏入 WebFlux 或 Project Reactor 的地盘,你会发现这里的线程就像渣男,说换就换。一个请求可能在 A 线程接客,B 线程查库,C 线程给响应。这时候,基于线程绑定的 ThreadLocal 瞬间就变成了“薛定谔的变量”——你永远不知道这一行代码跑完,下一个操作符里数据还在不在。

为了救火,Reactor 给我们准备了 Context


一、 核心痛点:为什么 ThreadLocal 必死?

我们先看一眼惨案现场:

java
// 这是一个必挂的写法
ThreadLocal<String> traceId = new ThreadLocal<>();
traceId.set("UUID-999");

Mono.just("Hello")
    .subscribeOn(Schedulers.boundedElastic()) // 切换线程了
    .map(data -> {
        // 这里的 traceId.get() 必拿 null
        log.info("Trace: {}", traceId.get()); 
        return data + " World";
    })
    .subscribe();

真相: 响应式编程的本质是异步流水线。操作符(Operator)不关心线程,它们只关心数据。线程只是被招来干活的“临时工”,干完这步就撤了,指望 ThreadLocal 跨步传递数据,无异于刻舟求剑。


二、 Reactor Context 的“逆流”设计

Reactor Context 不是存放在线程里的,它是存在 Subscriber(订阅者) 里的。

这里的逻辑非常硬核:

  1. 装配期: 你写 .contextWrite() 时,其实什么都没发生,只是在链条上挂了个钩子。
  2. 订阅期: 只有当你执行 .subscribe() 时,一个“订阅信号”会从下往上传递。
  3. 注入: 当信号路过 .contextWrite() 时,Context 数据就像被吸附在信号上一样,一路带到了流的最顶端。

重点: 这种“自底向上”的设计决定了,上游的操作符只能看到它下游定义的 Context。


三、 代码实力:读写姿势与实战

1. 基础读写:如何优雅地“带货”

java
public Mono<String> processData() {
    // 使用 deferContextual 延迟获取上下文
    return Mono.deferContextual(ctx -> {
        String correlationId = ctx.get("X-Correlation-ID");
        return Mono.just("Processed with ID: " + correlationId);
    });
}

@Test
public void testContextPropagation() {
    processData()
        .map(String::toUpperCase)
        // 写入 Context,注意它在下方,但能作用于上方
        .contextWrite(Context.of("X-Correlation-ID", "REQ-12345"))
        // ⚠️⚠️ 注意 下面是无法获取 “X-Correlation-ID” 的,因为Context,只能作用于上方。
        .subscribe(System.out::println);
}

   // 如果想要下面也能拿到就必须这样写
    @Test
    public void testContext2() {
        getSecureData()
                .contextWrite(Context.of("user", "Admin")) // 写入 Context
                .flatMap(data -> Mono.deferContextual(ctx -> {
                    // 在这里又可以掏口袋了,注意拿到的是下面定义的 Admin X,而不是上面的Admin
                    String user = ctx.getOrDefault("user", "unknown");
                    // Secure data for: Admin processed by Admin X
                    return Mono.just(data + " processed by " + user);
                }))
                
                .contextWrite(Context.of("user", "Admin X"))
                .subscribe(System.out::println);
    }

2. 结合 Spring Security(最强实战)

在 WebFlux 中,获取当前用户不需要传参,直接从 Context 里掏:

java
public Mono<Void> updateBusiness() {
    return ReactiveSecurityContextHolder.getContext()
        .map(SecurityContext::getAuthentication)
        .flatMap(auth -> {
            log.info("当前操作人: {}", auth.getName());
            return doBusiness(auth.getPrincipal());
        });
}

原理: Spring Security 的拦截器会在流的最下游悄悄执行 .contextWrite(),把认证信息塞进去,供你全局调用。


四、 高级进阶:如何跟 Log4j2 (MDC) 这种“老顽固”对接?

很多老旧库依然死磕 ThreadLocal(比如日志 MDC)。在 Reactor 里,我们可以利用 handle 或自定义 Hook 来做一个“时空转换”。

推荐方案:使用 Micrometer Context Propagation 库 这是目前工业级的标准做法:

java

// 1. 初始化(全局执行一次)
Hooks.enableAutomaticContextPropagation();
ContextRegistry.getInstance()
    .registerThreadLocalAccessor("traceId", 
        MDC::get, MDC::put, MDC::remove);

// 2. 在 Reactor 链条中开启自动同步
Mono.just("Task")
    .handle((val, sink) -> {
        // 这里可以直接打日志,MDC 里的 traceId 已经被自动还原了!
        log.info("日志里现在有 traceId 了");
        sink.next(val);
    })
    .contextWrite(Context.of("traceId", "T-800"))
    .contextCapture(); // 关键:捕捉当前环境到 Context

五、生产环境下的应用

5.1 注册“搬运规则” (Registry)

java
@Configuration
public class ContextPropagationConfig {
    @PostConstruct
    public void init() {
        // 注册 MDC 的搬运规则
        ContextRegistry.getInstance()
            .registerThreadLocalAccessor(
                "TRACE_ID",                // Context 中的 Key
                MDC::get,                  // 如何从 ThreadLocal 读
                val -> MDC.put("traceId", val), // 如何写回 ThreadLocal
                () -> MDC.remove("traceId")     // 如何清理
            );
    }
}

5.2 开启“全自动外挂” (Global Hook)

在启动类里一行代码搞定,让 Reactor 在切换线程时自动同步。

java
public static void main(String[] args) {
    Hooks.enableAutomaticContextPropagation(); 
    SpringApplication.run(DemoApplication.class, args);
}

5.3 自动搬运

java
@Component
@Order(Ordered.HIGHEST_PRECEDENCE) // 优先级最高,越早抓取越好
public class ContextSnapshotFilter implements WebFilter {
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
        // 1. 此时是在 HTTP 线程,ThreadLocal 还在
        // 2. 这里的 contextCapture() 会把 ThreadLocal 存入 Reactor Context
        return chain.filter(exchange)
                    .contextCapture(); 
    }
}

5.4 注意点

很多人觉得开了 enableAutomaticContextPropagation() 就万事大吉了,其实不然: Automatic Propagation 负责的是:数据已经在 Context 里了,如何在 flatMap 切换线程时帮你自动搬运。 contextCapture() 负责的是:数据还在 ThreadLocal 里,如何把它“第一手”抓进响应式世界。


六、 使用指南

  1. 不可变性警告: ContextImmutable 的。ctx.put(k, v) 不会改变原对象,它会返回一个全新的 Context。记得像用 String 一样用它。
  2. 位置决定生死: 记住口诀——下层写,上层读。如果你把 contextWrite 放在了链条的最顶端,那么它下面的 flatMap 永远拿不到值。
  3. 别塞大对象: Context 会随着整个订阅生命周期常驻内存。别把整个数据库查询结果存进去,那不是上下文,那是 OOM 的温床。
java
final class Context0 implements CoreContext {

	static final Context0 INSTANCE = new Context0();

	@Override
	public Context put(Object key, Object value) {
		Objects.requireNonNull(key, "key");
		Objects.requireNonNull(value, "value");
		return new Context1(key, value);
	}
}

七、简单的Demo案例

7.1 线程切换导致无法获取 ThreadLocal 中的数据

  • RequestContextWebFilter 将上下文通过contextWrite写入到 Reactor Context 中
  • ContextPropagationService中只能获取 Reactor Context 信息,无法从 ThreadLocal 中获取,因为线程变了
java

@Component
public class RequestContextWebFilter implements WebFilter {

    private static final String TRACE_ID_HEADER = "X-Trace-Id";
    private static final String USER_ID_HEADER = "X-User-Id";

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
        ServerHttpRequest request = exchange.getRequest();
        String traceId = headerOrDefault(request, TRACE_ID_HEADER, "trace-" + UUID.randomUUID());
        String userId = headerOrDefault(request, USER_ID_HEADER, "anonymous");

        return chain.filter(exchange)
                .contextWrite(context -> context
                        .put(ReactorContextKeys.TRACE_ID, traceId)
                        .put(ReactorContextKeys.USER_ID, userId));
    }

    private String headerOrDefault(ServerHttpRequest request, String headerName, String defaultValue) {
        String value = request.getHeaders().getFirst(headerName);
        return value == null || value.isBlank() ? defaultValue : value;
    }
}

@Service
public class ContextPropagationService {

    private static final ThreadLocal<String> TRACE_ID_HOLDER = new ThreadLocal<>();
    private static final ThreadLocal<String> USER_ID_HOLDER = new ThreadLocal<>();

    public Mono<ContextDemoResponse> inspectContext() {
        return Mono.deferContextual(contextView -> {
            ContextHopSnapshot beforeSwitch = snapshot("before-publishOn", contextView);

            return Mono.just(beforeSwitch)
                    .publishOn(Schedulers.boundedElastic())
                    .flatMap(ignored -> readAfterThreadSwitch(beforeSwitch));
        });
    }

    private Mono<ContextDemoResponse> readAfterThreadSwitch(ContextHopSnapshot beforeSwitch) {
        return Mono.deferContextual(contextView -> Mono.just(new ContextDemoResponse(
                "Automatic context propagation is active. Reactor Context values are restored into ThreadLocal even after thread switching.",
                beforeSwitch,
                snapshot("after-publishOn", contextView)
        )));
    }

    private ContextHopSnapshot snapshot(String stage, ContextView contextView) {
        // 是无法拿到,TRACE_ID_HOLDER和USER_ID_HOLDER的。因为切换了线程
        return new ContextHopSnapshot(
                stage,
                Thread.currentThread().getName(),
                contextView.get(ReactorContextKeys.TRACE_ID),
                contextView.get(ReactorContextKeys.USER_ID),
                TRACE_ID_HOLDER.get(),
                USER_ID_HOLDER.get()
        );
    }
}

7.2 添加上自动映射

  • 添加上自动转换了逻辑,自动将 Reactor Context 上下文转换到 ThreadLocal 上下文中。
xml
   <dependency>
            <groupId>io.micrometer</groupId>
            <artifactId>context-propagation</artifactId>
            <version>1.1.0</version>
        </dependency>
java
@Service
public class ContextPropagationService {

    private static final ThreadLocal<String> TRACE_ID_HOLDER = new ThreadLocal<>();
    private static final ThreadLocal<String> USER_ID_HOLDER = new ThreadLocal<>();

    @PostConstruct
    public void init() {
        Hooks.enableAutomaticContextPropagation();
        ContextRegistry contextRegistry = ContextRegistry.getInstance();
        contextRegistry.removeThreadLocalAccessor(ReactorContextKeys.TRACE_ID);
        contextRegistry.removeThreadLocalAccessor(ReactorContextKeys.USER_ID);
        contextRegistry.registerThreadLocalAccessor(
                ReactorContextKeys.USER_ID,
                USER_ID_HOLDER::get,
                USER_ID_HOLDER::set,
                USER_ID_HOLDER::remove
        );
        contextRegistry.registerThreadLocalAccessor(
                ReactorContextKeys.TRACE_ID,
                TRACE_ID_HOLDER::get,
                TRACE_ID_HOLDER::set,
                TRACE_ID_HOLDER::remove
        );

    }
}

八、核心原理

其实归根到底,我们是在研究,Reactor 中异步编程 Context 上下文,如何跟 ThreadLocal 进行数据交换的逻辑。 要么是 ThreadLocal 转移到 Context , 要么是 Context 转换到 ThreadLocal。

如果要我们自己来设计如何才能真正的做到无缝迁移,其实如果我们能在Reactor 的操作符中每次进行转移,其实就能无缝实现自动数据交换了。其实 Reactor 也提供了这样的能力,下面我们就研究这种方式。

8.1 思路分析

  • web 过滤器中,将用户信息,保存到 Context 中。
  • 在每个 Reactor 每个操作符中,从 Context中恢复到 ThreadLocal 中。
  • 当每个操作符执行完成后,在从 ThreadLocal 中删除。

8.2 过滤器中维护 Reactor Context

请看代码注释,写的很清楚了。RequestContextRegistry 看不懂没关系,马上就说。

java
@Component
public class RequestContextWebFilter implements WebFilter {

    private static final String TRACE_ID_HEADER = "X-Trace-Id";
    private static final String USER_ID_HEADER = "X-User-Id";

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
        ServerHttpRequest request = exchange.getRequest();
        // 从请求头中拿到用户信息
        String traceId = headerOrDefault(request, TRACE_ID_HEADER, "trace-" + UUID.randomUUID());
        String userId = headerOrDefault(request, USER_ID_HEADER, "anonymous");
        // 生成模拟的上下文需要的用户对象
        RequestContextSnapshot requestContext = new RequestContextSnapshot(traceId, userId);
        // 通过contextWrite,添加到 Reactor Context 中。RequestContextRegistry 封装了统一的操作。属于无状态类,所以是线程安全的。
        return chain.filter(exchange)
                .contextWrite(context -> RequestContextRegistry.get().storeRequestContext(context, requestContext));
    }

    private String headerOrDefault(ServerHttpRequest request, String headerName, String defaultValue) {
        String value = request.getHeaders().getFirst(headerName);
        return value == null || value.isBlank() ? defaultValue : value;
    }
}

8.3 注册统一的Reactor操作符处理

代码比较长,主要看 RequestContextRegistry#enablePropagationHook 方法,信号处理的方法: onNext/onError/onComplete中封装转换的逻辑,其他方法都透传即可,这是 api,没必要死记硬背。会用即可。

java
public final class RequestContextRegistry {
 private static final String HOOK_KEY = "react-ctx-request-context";

 private static volatile boolean hookEnabled = false;

 private static final RequestContextTracer TRACER = new RequestContextTracer();

/**
     * 启用 Reactor 全局 hook。
     *
     * <p>实现机制:
     * <p>对于每一个 Reactor operator,Reactor 都会把原始 subscriber 提升为我们自己的包装 subscriber。
     * 这个包装层不会改变业务逻辑,只负责在 {@code onNext}、{@code onError}、{@code onComplete}
     * 被调用之前,先把当前请求上下文恢复到 {@link ThreadLocal} 中。
     *
     * <p>为什么这样可以工作:
     * <p>下游 subscriber 会一直携带 Reactor 的 {@link Context},可以通过
     * {@link CoreSubscriber#currentContext()} 取到。这个上下文能够跨异步边界传播。
     * 因此我们只需要在 signal 回调前,从当前 Reactor Context 中取出请求数据,恢复到
     * {@code ThreadLocal},执行原始回调,然后再恢复旧的 ThreadLocal 状态即可。
     */
    public static synchronized void enablePropagationHook() {
        if (!hookEnabled) {
            Hooks.onEachOperator(
                    HOOK_KEY,
                    Operators.lift(
                            (scannable, subscriber) ->
                                    new CoreSubscriber<Object>() {
                                        /**
                                         * 订阅阶段。
                                         *
                                         * <p>当前 demo 在这个阶段不需要恢复上下文,
                                         * 直接把订阅事件透传给原始 subscriber 即可。
                                         */
                                        @Override
                                        public void onSubscribe(Subscription subscription) {
                                            subscriber.onSubscribe(subscription);
                                        }

                                        /**
                                         * 数据信号。
                                         *
                                         * <p>在把数据传递给下游逻辑之前,先把 Reactor {@link Context}
                                         * 里的请求上下文恢复到 {@link ThreadLocal}。
                                         * 这是整个实现的关键步骤,它保证旧的命令式代码依然可以通过
                                         * {@link RequestContextThreadLocalHolder} 读取上下文。
                                         */
                                        @Override
                                        public void onNext(Object value) {
                                            TRACER.runWithContext(
                                                    subscriber.currentContext(),
                                                    () -> {
                                                        subscriber.onNext(value);
                                                        return null;
                                                    });
                                        }

                                        /**
                                         * 异常信号。
                                         *
                                         * <p>异常处理逻辑通常也会读取 traceId、userId 这类请求上下文,
                                         * 所以这里和正常数据信号一样,也需要先恢复 ThreadLocal。
                                         */
                                        @Override
                                        public void onError(Throwable throwable) {
                                            TRACER.runWithContext(
                                                    subscriber.currentContext(),
                                                    () -> {
                                                        subscriber.onError(throwable);
                                                        return null;
                                                    });
                                        }

                                        /**
                                         * 完成信号。
                                         *
                                         * <p>完成回调里也可能访问请求级 ThreadLocal,
                                         * 因此这里同样保持一致,先恢复上下文再调用下游。
                                         */
                                        @Override
                                        public void onComplete() {
                                            TRACER.runWithContext(
                                                    subscriber.currentContext(),
                                                    () -> {
                                                        subscriber.onComplete();
                                                        return null;
                                                    });
                                        }

                                        /**
                                         * 原样暴露下游 subscriber 的 Reactor Context。
                                         *
                                         * <p>这一点很重要:包装 subscriber 自己不维护独立的 Context,
                                         * 而是始终委托给原始 subscriber。
                                         * 这样上游放进去的请求上下文才能在整条链路里持续可见。
                                         */
                                        @Override
                                        public Context currentContext() {
                                            return subscriber.currentContext();
                                        }
                                    }));
            hookEnabled = true;
        }
    }
}

8.4 真正的转换逻辑-RequestContextTracer

主要负责两件事:

  1. 从 Reactor {@link ContextView} 中读取当前请求上下文;
  2. 在执行指定逻辑前,把请求上下文恢复到 {@link ThreadLocal},执行完成后再恢复旧值。

这里逻辑很简单,其实就是 ThreadLocal 的多线程上下文切换的原理。

java
public class RequestContextTracer {

    private static final String CONTEXT_KEY = "request-context";

    /**
     * 从 Reactor Context 中读取请求上下文。
     *
     * <p>如果当前链路里没有放入请求上下文,则返回一个空对象,
     * 而不是返回 {@code null}。这样调用方在读取时可以少做一层判空处理。
     */
   public RequestContextSnapshot getRequestContextFromContextView(ContextView reactorCtx) {
        return reactorCtx.getOrDefault(CONTEXT_KEY, RequestContextSnapshot.empty());
    }

    // 从Reactor Context读取上下文,
    public <TResp> TResp runWithContext(ContextView reactorCtx, Supplier<TResp> inner) {
        RequestContextSnapshot requestContext = getRequestContextFromContextView(reactorCtx);
        // 先记录现在的上下文。
        RequestContextSnapshot previous = RequestContextThreadLocalHolder.get();
        // 将 Reactor Context 放到当前的ThreadLocal 中
        RequestContextThreadLocalHolder.set(requestContext);
        try {
            // 然后执行操作
            return inner.get();
        } finally {
            // 执行完后将原来的上下文,重新恢复进去
            RequestContextThreadLocalHolder.restore(previous);
        }
    }
}

java
package cn.duoduo.reactctx.context;
public final class RequestContextThreadLocalHolder {
    private static final ThreadLocal<RequestContextSnapshot> REQUEST_CONTEXT_HOLDER = new ThreadLocal<>();
    private RequestContextThreadLocalHolder() {
    }
    public static RequestContextSnapshot get() {
        return REQUEST_CONTEXT_HOLDER.get();
    }
    public static void set(RequestContextSnapshot requestContext) {
        if (requestContext == null) {
            REQUEST_CONTEXT_HOLDER.remove();
            return;
        }
        REQUEST_CONTEXT_HOLDER.set(requestContext);
    }
    public static void restore(RequestContextSnapshot requestContext) {
        set(requestContext);
    }
    public static String getTraceId() {
        RequestContextSnapshot requestContext = get();
        return requestContext == null ? null : requestContext.traceId();
    }
    public static String getUserId() {
        RequestContextSnapshot requestContext = get();
        return requestContext == null ? null : requestContext.userId();
    }
    public static void clearAll() {
        REQUEST_CONTEXT_HOLDER.remove();
    }
}

总结

Reactor Context 是响应式架构里的“隐形传送门”。它虽然看起来有点反直觉(自底向上),但却是解决异步链路数据丢失的唯一正解。

作为开发者,我们要做的就是:放下对 ThreadLocal 的执念,拥抱响应式流的生命周期。 觉得有用?点个赞,咱们评论区讨论一下你被响应式坑过的瞬间。

返回博客列表
最后更新于 2026-04-07
想法或问题?在 GitHub Issue 下方参与讨论
去评论