上一篇 下一篇 回到顶部 目录 返回首页
目录

Spring WebFlux 从入门到精通:响应式编程实战全指南

发表于
更新于
2 73.1~94.0 分钟 32906

前言

在传统的 Spring MVC 架构中,每个请求都会绑定一个线程。当请求量激增时,线程池耗尽,应用开始排队等待。这就是为什么在高并发、长耗时场景(如流式 API、SSE、WebSocket、数据库连接池打满)下,传统 Servlet 模型的扩展性会遇到瓶颈。

Spring WebFlux 是 Spring Framework 5.0 引入的全新响应式 Web 框架。它的核心思想是:用少量线程处理大量并发请求。不是通过多线程并行处理,而是通过非阻塞异步回调让线程在等待 I/O 时不被占用,从而可以处理下一个请求。

本文从响应式编程基础开始,逐步深入到 WebFlux 的两种编程模型(注解 vs 函数式)、WebClient、SSE、WebSocket、数据库响应式访问、测试和调优,帮助你从零到精通。


第一部分:响应式编程基础

1.1 什么是响应式编程

响应式编程(Reactive Programming)是一种基于数据流变化传播的编程范式。简单说就是:当数据源发生变化时,所有依赖它的地方会自动收到通知并作出反应。

对比命令式编程:

// 命令式:同步阻塞,等待结果
String result = service.getData();
System.out.println(result);  // 阻塞直到数据返回

// 响应式:订阅后异步处理
Mono<String> result = service.getData();
result.subscribe(
    data -> System.out.println(data),   // 数据到达时执行
    error -> System.err.println(error)  // 错误时执行
);
// 这里的代码立即执行,不等数据返回

1.2 Reactive Streams 规范

响应式编程在 Java 生态中有统一标准 —— Reactive Streams 规范(已被纳入 JDK 9 java.util.concurrent.Flow)。它定义了四个核心接口:

接口

说明

Publisher<T>

数据发布者

Subscriber<T>

数据订阅者

Subscription

订阅关系,控制请求速率(背压)

Processor<T, R>

既是发布者又是订阅者(转换用)

Spring WebFlux 基于 Project Reactor 实现,它完全遵守 Reactive Streams 规范。

1.3 Project Reactor:Mono 和 Flux

Project Reactor 是 Spring WebFlux 的底层响应式库,提供了两个核心类型:

类型

含义

类比

Mono<T>

0 或 1 个元素

Optional<T> / CompletableFuture<T>

Flux<T>

0 到 N 个元素

Stream<T> / List<T>

// Mono:单个值或空
Mono<String> hello = Mono.just("Hello");
Mono<String> empty = Mono.empty();
Mono<String> error = Mono.error(new RuntimeException("Oops"));

// Flux:多个值
Flux<String> colors = Flux.just("red", "green", "blue");
Flux<Integer> numbers = Flux.range(1, 10);
Flux<Long> ticks = Flux.interval(Duration.ofSeconds(1));  // 每秒发一个

1.4 背压(Backpressure)

背压是响应式编程最重要的概念之一。当数据生产速度大于消费速度时,订阅者可以告诉发布者"慢一点",防止内存溢出。

Flux.range(1, 1000)
    .log()
    .subscribe(
        data -> {
            // 处理数据
        },
        error -> {},
        () -> {},
        subscription -> subscription.request(10)  // 每次只请求10个
    );

实际开发中:大多数场景你不需要手动管理背压。WebFlux 的底层(Netty、R2DBC 等)会自动处理。但当你自定义 Publisher 时,需要考虑背压。

1.5 冷流 vs 热流

类型

特点

示例

冷流(Cold)

每个订阅者独立触发数据源,各自收到完整数据

数据库查询、HTTP 请求

热流(Hot)

数据源独立运行,所有订阅者共享同一个数据流,晚加入的订阅者错过之前的数据

SSE 推送、WebSocket、事件总线

// 冷流:每次订阅都重新执行
Flux<Integer> cold = Flux.range(1, 3).doOnNext(i -> System.out.println("Generated: " + i));
cold.subscribe();  // 打印 Generated: 1, 2, 3
cold.subscribe();  // 再次打印 Generated: 1, 2, 3

// 热流:数据源独立运行
Flux<Long> hot = Flux.interval(Duration.ofSeconds(1))
    .publish()
    .autoConnect(0);  // 自动开始,不依赖订阅者
Thread.sleep(3000);   // 等待3秒
hot.subscribe(i -> System.out.println("Subscriber A: " + i));  // 错过前3个

第二部分:WebFlux 入门

2.1 依赖安装

以 Spring Boot 3.x + Java 17 为例:

<dependencies>
    <!-- Spring Boot WebFlux Starter(内嵌 Netty) -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>

    <!-- 测试用 WebClient -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>
</dependencies>

关键区别spring-boot-starter-web(Spring MVC,基于 Servlet)vs spring-boot-starter-webflux(响应式,基于 Netty/Undertow/Tomcat 异步)。二者不能同时使用。

2.2 运行容器

WebFlux 默认使用 Netty 作为运行容器,但它也支持其他容器:

容器

配置方式

Netty(默认)

无需配置

Tomcat

排除 Netty,引入 spring-boot-starter-tomcat

Undertow

排除 Netty,引入 spring-boot-starter-undertow

Jetty

排除 Netty,引入 spring-boot-starter-jetty

<!-- 使用 Tomcat 运行 WebFlux -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
    <exclusions>
        <exclusion>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-reactor-netty</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-tomcat</artifactId>
</dependency>

2.3 第一个响应式 Controller

WebFlux 支持两种编程模型:

  1. 注解驱动(Annotation-based) —— 和 Spring MVC 一样的 @RestController

  2. 函数式(Functional) —— 使用 RouterFunctionHandlerFunction

先从注解方式开始:

@RestController
@RequestMapping("/api/users")
public class UserController {

    private final UserService userService;

    public UserController(UserService userService) {
        this.userService = userService;
    }

    @GetMapping
    public Flux<User> getAllUsers() {
        return userService.findAll();  // 返回 Flux<User>
    }

    @GetMapping("/{id}")
    public Mono<User> getUser(@PathVariable String id) {
        return userService.findById(id);  // 返回 Mono<User>
    }

    @PostMapping
    public Mono<User> createUser(@RequestBody Mono<User> user) {
        return user.flatMap(userService::save);  // 接收 Mono<User>,返回 Mono<User>
    }

    @PutMapping("/{id}")
    public Mono<User> updateUser(@PathVariable String id, @RequestBody Mono<User> user) {
        return user.flatMap(u -> userService.update(id, u));
    }

    @DeleteMapping("/{id}")
    public Mono<Void> deleteUser(@PathVariable String id) {
        return userService.delete(id);  // 返回 Mono<Void>,响应 200 或 204
    }
}

与普通 Spring MVC 的关键区别

  • 返回值不是 User / List<User>,而是 Mono<User> / Flux<User>

  • Spring WebFlux 不会阻塞等待响应体完成,而是等 Mono / Flux 发出数据时才写入响应

  • @RequestBody 可以接收 Mono<T>(单个对象异步反序列化)

  • Mono<Void> 用于不需要响应体的场景(如 DELETE)

2.4 响应式 Service 层

@Service
public class UserService {

    // 实际项目中这里会调用响应式数据库访问(R2DBC)或响应式缓存(Redis Lettuce)
    private final Map<String, User> store = new ConcurrentHashMap<>();

    public Flux<User> findAll() {
        return Flux.fromIterable(store.values());
    }

    public Mono<User> findById(String id) {
        return Mono.justOrEmpty(store.get(id));
    }

    public Mono<User> save(User user) {
        String id = UUID.randomUUID().toString();
        User saved = user.withId(id);
        store.put(id, saved);
        return Mono.just(saved);
    }

    public Mono<User> update(String id, User user) {
        if (!store.containsKey(id)) {
            return Mono.error(new ResponseStatusException(HttpStatus.NOT_FOUND));
        }
        User updated = user.withId(id);
        store.put(id, updated);
        return Mono.just(updated);
    }

    public Mono<Void> delete(String id) {
        store.remove(id);
        return Mono.empty();
    }
}

第三部分:函数式端点(RouterFunction + HandlerFunction)

3.1 为什么需要函数式端点

注解模型简单直观,但在某些场景下函数式端点更有优势:

  • 更细粒度的控制:请求路由、过滤、错误处理都是函数,可以灵活组合

  • 更轻量:不需要反射扫描注解

  • 更好的响应式组合:路由本身就是响应式的

  • 适合 API Gateway、代理等中间层场景

3.2 基本概念

函数式端点由两个核心接口组成:

接口

作用

类比

HandlerFunction<T>

处理请求,返回 Mono<ServerResponse>

@RestController 的方法

RouterFunction<T>

根据请求条件路由到 HandlerFunction

@RequestMapping

// HandlerFunction:处理请求
HandlerFunction<ServerResponse> handler = request -> {
    String name = request.pathVariable("name");
    return ServerResponse.ok()
            .contentType(MediaType.APPLICATION_JSON)
            .bodyValue(new User(name));
};

// RouterFunction:路由到 Handler
RouterFunction<ServerResponse> route = RouterFunctions.route()
        .GET("/hello/{name}", handler)
        .build();

3.3 完整的函数式路由

使用 RouterFunction.Builder 构建完整的路由表:

@Configuration
public class UserRouter {

    private final UserHandler userHandler;

    public UserRouter(UserHandler userHandler) {
        this.userHandler = userHandler;
    }

    @Bean
    public RouterFunction<ServerResponse> userRoutes() {
        return RouterFunctions.route()
                .path("/api/users", builder -> builder
                        .GET("", userHandler::getAllUsers)
                        .GET("/{id}", userHandler::getUserById)
                        .POST("", userHandler::createUser)
                        .PUT("/{id}", userHandler::updateUser)
                        .DELETE("/{id}", userHandler::deleteUser)
                        .GET("/stream", userHandler::streamUsers))
                .build();
    }
}

3.4 Handler 实现

@Component
public class UserHandler {

    private final UserService userService;

    public UserHandler(UserService userService) {
        this.userService = userService;
    }

    public Mono<ServerResponse> getAllUsers(ServerRequest request) {
        return ServerResponse.ok()
                .contentType(MediaType.APPLICATION_JSON)
                .body(userService.findAll(), User.class);
    }

    public Mono<ServerResponse> getUserById(ServerRequest request) {
        String id = request.pathVariable("id");
        return userService.findById(id)
                .flatMap(user -> ServerResponse.ok()
                        .contentType(MediaType.APPLICATION_JSON)
                        .bodyValue(user))
                .switchIfEmpty(ServerResponse.notFound().build());
    }

    public Mono<ServerResponse> createUser(ServerRequest request) {
        return request.bodyToMono(User.class)
                .flatMap(userService::save)
                .flatMap(saved -> ServerResponse.status(HttpStatus.CREATED)
                        .contentType(MediaType.APPLICATION_JSON)
                        .bodyValue(saved));
    }

    public Mono<ServerResponse> updateUser(ServerRequest request) {
        String id = request.pathVariable("id");
        return request.bodyToMono(User.class)
                .flatMap(user -> userService.update(id, user))
                .flatMap(updated -> ServerResponse.ok()
                        .contentType(MediaType.APPLICATION_JSON)
                        .bodyValue(updated))
                .switchIfEmpty(ServerResponse.notFound().build());
    }

    public Mono<ServerResponse> deleteUser(ServerRequest request) {
        String id = request.pathVariable("id");
        return userService.delete(id)
                .then(ServerResponse.ok().build());
    }

    // SSE 流式推送
    public Mono<ServerResponse> streamUsers(ServerRequest request) {
        return ServerResponse.ok()
                .contentType(MediaType.TEXT_EVENT_STREAM)
                .body(userService.findAll(), User.class);
    }
}

3.5 路由过滤器

@Bean
public RouterFunction<ServerResponse> filteredRoutes() {
    return RouterFunctions.route()
            .path("/api", builder -> builder
                    .before(request -> {
                        System.out.println("Before: " + request.path());
                        return request;
                    })
                    .after((request, response) -> {
                        System.out.println("After: " + response.statusCode());
                        return response;
                    })
                    .filter((request, next) -> {
                        String auth = request.headers().header("Authorization").orElse("");
                        if (!auth.startsWith("Bearer ")) {
                            return ServerResponse.status(HttpStatus.UNAUTHORIZED).build();
                        }
                        return next.handle(request);
                    })
                    .GET("/users", userHandler::getAllUsers)
                    .POST("/users", userHandler::createUser))
            .build();
}

3.6 多路由组合

RouterFunction 可以组合,适合多模块项目:

@Bean
public RouterFunction<ServerResponse> compositeRouter(
        RouterFunction<ServerResponse> userRouter,
        RouterFunction<ServerResponse> orderRouter,
        RouterFunction<ServerResponse> productRouter) {
    return userRouter
            .and(orderRouter)
            .and(productRouter);
}

第四部分:WebClient —— 响应式 HTTP 客户端

4.1 为什么需要 WebClient

传统 RestTemplate 是同步阻塞的,不适合响应式应用。WebClient 是 WebFlux 提供的非阻塞响应式 HTTP 客户端,完全基于 Mono / Flux

4.2 创建 WebClient

@Configuration
public class WebClientConfig {

    @Bean
    public WebClient webClient() {
        return WebClient.builder()
                .baseUrl("https://api.example.com")
                .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
                .defaultHeader(HttpHeaders.ACCEPT, MediaType.APPLICATION_JSON_VALUE)
                .codecs(configurer -> configurer
                        .defaultCodecs()
                        .maxInMemorySize(10 * 1024 * 1024))  // 10MB
                .build();
    }
}

4.3 GET 请求

@Service
public class ApiService {

    private final WebClient webClient;

    public ApiService(WebClient webClient) {
        this.webClient = webClient;
    }

    // 单个对象
    public Mono<User> getUser(String id) {
        return webClient.get()
                .uri("/users/{id}", id)
                .retrieve()
                .bodyToMono(User.class);
    }

    // 集合
    public Flux<User> getAllUsers() {
        return webClient.get()
                .uri("/users")
                .retrieve()
                .bodyToFlux(User.class);
    }

    // 带查询参数
    public Flux<User> searchUsers(String name, int page) {
        return webClient.get()
                .uri(uriBuilder -> uriBuilder
                        .path("/users")
                        .queryParam("name", name)
                        .queryParam("page", page)
                        .build())
                .retrieve()
                .bodyToFlux(User.class);
    }
}

4.4 POST / PUT / DELETE 请求

// POST
public Mono<User> createUser(User user) {
    return webClient.post()
            .uri("/users")
            .bodyValue(user)
            .retrieve()
            .bodyToMono(User.class);
}

// PUT
public Mono<User> updateUser(String id, User user) {
    return webClient.put()
            .uri("/users/{id}", id)
            .bodyValue(user)
            .retrieve()
            .bodyToMono(User.class);
}

// DELETE(不需要响应体)
public Mono<Void> deleteUser(String id) {
    return webClient.delete()
            .uri("/users/{id}", id)
            .retrieve()
            .bodyToMono(Void.class);
}

4.5 错误处理

WebClient 默认对 4xx / 5xx 状态码抛出异常,需要自定义处理:

public Mono<User> getUserWithErrorHandling(String id) {
    return webClient.get()
            .uri("/users/{id}", id)
            .retrieve()
            .onStatus(HttpStatusCode::is4xxClientError, response ->
                    response.bodyToMono(String.class)
                            .flatMap(body -> Mono.error(new ClientException(
                                    response.statusCode().value(), body))))
            .onStatus(HttpStatusCode::is5xxServerError, response ->
                    response.bodyToMono(String.class)
                            .flatMap(body -> Mono.error(new ServerException(
                                    response.statusCode().value(), body))))
            .bodyToMono(User.class)
            .onErrorResume(ClientException.class, e -> {
                // 4xx 处理
                return Mono.empty();
            })
            .onErrorResume(ServerException.class, e -> {
                // 5xx 处理
                return Mono.empty();
            });
}

4.6 请求拦截器(ExchangeFilterFunction)

统一处理请求头(如 Token 注入):

@Bean
public WebClient webClientWithAuth() {
    return WebClient.builder()
            .baseUrl("https://api.example.com")
            .filter(ExchangeFilterFunction.ofRequestProcessor(
                    clientRequest -> {
                        String token = getCurrentToken();
                        ClientRequest filtered = ClientRequest.from(clientRequest)
                                .header(HttpHeaders.AUTHORIZATION, "Bearer " + token)
                                .build();
                        return Mono.just(filtered);
                    }))
            .build();
}

// 日志过滤器
ExchangeFilterFunction logRequest = ExchangeFilterFunction.ofRequestProcessor(
        clientRequest -> {
            System.out.println("Request: " + clientRequest.method() + " " + clientRequest.url());
            return Mono.just(clientRequest);
        });

ExchangeFilterFunction logResponse = ExchangeFilterFunction.ofResponseProcessor(
        clientResponse -> {
            System.out.println("Response: " + clientResponse.statusCode());
            return Mono.just(clientResponse);
        });

4.7 流式响应(SSE / JSON Stream)

// 接收 SSE 流
public Flux<Event> getEvents() {
    return webClient.get()
            .uri("/events/stream")
            .accept(MediaType.TEXT_EVENT_STREAM)
            .retrieve()
            .bodyToFlux(Event.class);
}

// 接收大 JSON 数组(流式反序列化,不全部加载到内存)
public Flux<User> getAllUsersStreaming() {
    return webClient.get()
            .uri("/users")
            .retrieve()
            .bodyToFlux(User.class)  // 边接收边反序列化
            .doOnNext(user -> System.out.println("Received: " + user));
}

4.8 与注解 Controller 配合使用

WebClient 也可以在 Spring MVC 项目中使用(只需引入 spring-boot-starter-webflux):

@RestController
public class AggregationController {

    private final WebClient webClient;

    public AggregationController(WebClient.Builder webClientBuilder) {
        this.webClient = webClientBuilder.baseUrl("https://api.example.com").build();
    }

    @GetMapping("/dashboard")
    public Mono<DashboardData> getDashboard() {
        // 并行发起多个请求,全部完成后聚合
        Mono<User> user = webClient.get().uri("/users/1").retrieve().bodyToMono(User.class);
        Mono<List<Order>> orders = webClient.get().uri("/orders?userId=1").retrieve().bodyToFlux(Order.class).collectList();
        Mono<List<Notification>> notifications = webClient.get().uri("/notifications?userId=1").retrieve().bodyToFlux(Notification.class).collectList();

        return Mono.zip(user, orders, notifications)
                .map(tuple -> new DashboardData(tuple.getT1(), tuple.getT2(), tuple.getT3()));
    }
}

第五部分:Server-Sent Events(SSE)

5.1 什么是 SSE

SSE(Server-Sent Events)是一种服务端主动向客户端推送数据的技术。与 WebSocket 不同,SSE 是单向的(只有服务端推送),基于普通 HTTP 连接,使用 text/event-stream 媒体类型。

适用场景:

  • 实时通知推送

  • 股票/加密货币行情

  • 日志实时查看

  • AI 流式输出(ChatGPT 风格)

5.2 WebFlux 中的 SSE

@RestController
@RequestMapping("/api/events")
public class EventController {

    // 方式一:Flux 直接返回(最简单)
    @GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<Event> streamEvents() {
        return Flux.interval(Duration.ofSeconds(1))
                .map(tick -> new Event(System.currentTimeMillis(), "tick-" + tick));
    }

    // 方式二:使用 Sinks 手动推送(适合外部触发)
    private final Sinks.Many<Event> sink = Sinks.many().multicast().onBackpressureBuffer();

    @PostConstruct
    public void init() {
        // 模拟外部事件源
        Flux.interval(Duration.ofSeconds(2))
                .map(tick -> new Event(System.currentTimeMillis(), "event-" + tick))
                .subscribe(event -> sink.tryEmitNext(event));
    }

    @GetMapping(value = "/push", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<Event> pushEvents() {
        return sink.asFlux();
    }

    // 方式三:AI 风格流式输出
    @PostMapping(value = "/chat", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ChatResponse> chat(@RequestBody ChatRequest request) {
        return aiService.generateStream(request.getPrompt())
                .map(chunk -> new ChatResponse(chunk, false))
                .concatWith(Mono.just(new ChatResponse("[DONE]", true)));
    }
}

5.3 SSE 数据结构

SSE 协议定义的消息格式:

data: {"id":1,"message":"hello"}
id: 1
event: user-message
retry: 3000

WebFlux 的 ServerSentEvent 类可以构建完整的 SSE 消息:

@GetMapping(value = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<Event>> sseWithMetadata() {
    return Flux.interval(Duration.ofSeconds(1))
            .map(tick -> ServerSentEvent.<Event>builder()
                    .id(String.valueOf(tick))
                    .event("tick-event")
                    .data(new Event(tick, "data-" + tick))
                    .retry(Duration.ofSeconds(3))
                    .build());
}

第六部分:WebSocket

6.1 WebSocket 服务端

@Configuration
public class WebSocketConfig implements WebSocketConfigurer {

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(new ChatWebSocketHandler(), "/ws/chat");
    }
}

public class ChatWebSocketHandler implements WebSocketHandler {

    private final Sinks.Many<WebSocketMessage> sink =
            Sinks.many().multicast().onBackpressureBuffer();

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        // 接收客户端消息
        Mono<Void> input = session.receive()
                .map(WebSocketMessage::getPayloadAsText)
                .doOnNext(msg -> System.out.println("Received: " + msg))
                .then();

        // 发送消息给客户端
        Flux<WebSocketMessage> output = sink.asFlux()
                .map(msg -> session.textMessage(msg.getPayloadAsText()));

        return session.send(output).and(input);
    }
}

6.2 WebSocket 客户端(WebClient)

WebSocketClient client = new ReactorNettyWebSocketClient();

client.execute(URI.create("ws://localhost:8080/ws/chat"), session -> {
    // 发送
    Mono<Void> send = session.send(
            Flux.just(session.textMessage("Hello from client!")));

    // 接收
    Mono<Void> receive = session.receive()
            .map(WebSocketMessage::getPayloadAsText)
            .doOnNext(msg -> System.out.println("Server says: " + msg))
            .then();

    return Mono.zip(send, receive).then();
}).block();

第七部分:响应式数据库访问(R2DBC)

7.1 为什么不能用 JPA / JDBC

传统的 JPA(Hibernate)和 JDBC 都是同步阻塞的。在响应式应用中,使用阻塞调用会让 WebFlux 的非阻塞优势完全失效。

R2DBC(Reactive Relational Database Connectivity)是响应式的数据库驱动,支持 MySQL、PostgreSQL、H2、MariaDB 等。

7.2 Spring Data R2DBC

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-r2dbc</artifactId>
    </dependency>
    <dependency>
        <groupId>org.postgresql</groupId>
        <artifactId>r2dbc-postgresql</artifactId>
    </dependency>
    <!-- 或 MySQL -->
    <!-- <dependency>
        <groupId>com.github.jasync-sql</groupId>
        <artifactId>jasync-mysql</artifactId>
    </dependency> -->
</dependencies>
# application.yml
spring:
  r2dbc:
    url: r2dbc:postgresql://localhost:5432/mydb
    username: postgres
    password: secret

7.3 实体与 Repository

@Table("users")
public class User {
    @Id
    private String id;
    private String name;
    private String email;
    // getters, setters
}

// Spring Data R2DBC 的 Repository 返回类型是响应式的
public interface UserRepository extends ReactiveCrudRepository<User, String> {
    Flux<User> findByNameContaining(String name);
    Mono<User> findByEmail(String email);
    Mono<Boolean> existsByEmail(String email);
}

7.4 Service 层使用

@Service
public class UserService {

    private final UserRepository userRepository;

    public UserService(UserRepository userRepository) {
        this.userRepository = userRepository;
    }

    public Flux<User> findAll() {
        return userRepository.findAll();
    }

    public Mono<User> findById(String id) {
        return userRepository.findById(id);
    }

    public Mono<User> save(User user) {
        return userRepository.save(user);
    }

    public Mono<User> update(String id, User user) {
        return userRepository.findById(id)
                .flatMap(existing -> {
                    existing.setName(user.getName());
                    existing.setEmail(user.getEmail());
                    return userRepository.save(existing);
                });
    }

    public Mono<Void> delete(String id) {
        return userRepository.deleteById(id);
    }

    // 复杂查询:统计在线用户数
    public Mono<Long> countOnlineUsers() {
        // 需要使用 R2dbcEntityTemplate 执行原生 SQL
        return template.getDatabaseClient()
                .sql("SELECT COUNT(*) FROM users WHERE status = 'online'")
                .map((row, metadata) -> row.get(0, Long.class))
                .first();
    }
}

7.5 R2DBC vs JPA 的对比

特性

JPA / Hibernate

Spring Data R2DBC

阻塞性

阻塞

非阻塞

懒加载

支持

不支持

级联操作

支持

不支持(需要手动)

复杂联表查询

JPQL / Criteria

原生 SQL

二级缓存

支持

不支持

适用场景

CRUD 为主的传统应用

高并发、响应式应用

迁移建议:如果你的项目从 Spring MVC 迁移到 WebFlux,不要直接搬 JPA。对于简单的 CRUD,R2DBC 足够;对于复杂的联表查询和事务,考虑使用 DatabaseClient 执行原生 SQL 或迁移到专门的读写分离架构。


第八部分:响应式 Redis 与缓存

8.1 Lettuce(响应式 Redis 客户端)

Spring Data Redis 的 Lettuce 驱动是响应式的:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis-reactive</artifactId>
</dependency>
@Service
public class CacheService {

    private final ReactiveStringRedisTemplate redisTemplate;

    public CacheService(ReactiveStringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    public Mono<String> get(String key) {
        return redisTemplate.opsForValue().get(key);
    }

    public Mono<Void> set(String key, String value, Duration ttl) {
        return redisTemplate.opsForValue().set(key, value, ttl).then();
    }

    public Mono<Boolean> delete(String key) {
        return redisTemplate.delete(key);
    }
}

8.2 响应式缓存装饰器

@Service
public class CachedUserService {

    private final UserService userService;
    private final CacheService cacheService;

    public CachedUserService(UserService userService, CacheService cacheService) {
        this.userService = userService;
        this.cacheService = cacheService;
    }

    public Mono<User> getUser(String id) {
        String key = "user:" + id;
        return cacheService.get(key)
                .flatMap(json -> Mono.just(parseUser(json)))
                .switchIfEmpty(
                        userService.findById(id)
                                .flatMap(user ->
                                        cacheService.set(key, toJson(user), Duration.ofMinutes(30))
                                                .thenReturn(user))
                );
    }
}

第九部分:异常处理

9.1 注解模型的全局异常处理

@RestControllerAdvice
public class GlobalExceptionHandler {

    @ExceptionHandler(ResourceNotFoundException.class)
    @ResponseStatus(HttpStatus.NOT_FOUND)
    public Mono<ErrorResponse> handleNotFound(ResourceNotFoundException ex) {
        return Mono.just(new ErrorResponse(404, ex.getMessage()));
    }

    @ExceptionHandler(IllegalArgumentException.class)
    @ResponseStatus(HttpStatus.BAD_REQUEST)
    public Mono<ErrorResponse> handleBadRequest(IllegalArgumentException ex) {
        return Mono.just(new ErrorResponse(400, ex.getMessage()));
    }

    @ExceptionHandler(Exception.class)
    @ResponseStatus(HttpStatus.INTERNAL_SERVER_ERROR)
    public Mono<ErrorResponse> handleGeneric(Exception ex) {
        return Mono.just(new ErrorResponse(500, "Internal Server Error"));
    }
}

9.2 函数式端点的异常处理

@Bean
public RouterFunction<ServerResponse> errorHandlingRoutes() {
    return RouterFunctions.route()
            .GET("/api/users", userHandler::getAllUsers)
            .filter((request, next) -> {
                try {
                    return next.handle(request);
                } catch (Exception e) {
                    return ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR)
                            .bodyValue(new ErrorResponse(500, e.getMessage()));
                }
            })
            .build();
}

// 或者在 Handler 内部使用 onErrorResume
public Mono<ServerResponse> getUserById(ServerRequest request) {
    String id = request.pathVariable("id");
    return userService.findById(id)
            .flatMap(user -> ServerResponse.ok().bodyValue(user))
            .switchIfEmpty(ServerResponse.notFound().build())
            .onErrorResume(e -> ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR)
                    .bodyValue(new ErrorResponse(500, e.getMessage())));
}

9.3 WebExceptionHandler(最底层)

@Component
@Order(-2)  // 优先级要高
public class CustomWebExceptionHandler implements WebExceptionHandler {

    @Override
    public Mono<Void> handle(ServerWebExchange exchange, Throwable ex) {
        ServerHttpResponse response = exchange.getResponse();
        response.setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);
        response.getHeaders().setContentType(MediaType.APPLICATION_JSON);

        ErrorResponse error = new ErrorResponse(500, ex.getMessage());
        String body = toJson(error);
        DataBuffer buffer = response.bufferFactory().wrap(body.getBytes());
        return response.writeWith(Mono.just(buffer));
    }
}

第十部分:测试

10.1 注解 Controller 测试

使用 WebTestClient(WebFlux 内置的测试客户端):

@WebFluxTest(UserController.class)
class UserControllerTest {

    @Autowired
    private WebTestClient webTestClient;

    @MockBean
    private UserService userService;

    @Test
    void getAllUsers() {
        when(userService.findAll()).thenReturn(Flux.just(
                new User("1", "Alice"),
                new User("2", "Bob")
        ));

        webTestClient.get()
                .uri("/api/users")
                .exchange()
                .expectStatus().isOk()
                .expectBodyList(User.class)
                .hasSize(2)
                .contains(new User("1", "Alice"), new User("2", "Bob"));
    }

    @Test
    void getUserById_notFound() {
        when(userService.findById("99")).thenReturn(Mono.empty());

        webTestClient.get()
                .uri("/api/users/99")
                .exchange()
                .expectStatus().isNotFound();
    }

    @Test
    void createUser() {
        User input = new User(null, "Charlie");
        User saved = new User("3", "Charlie");
        when(userService.save(any())).thenReturn(Mono.just(saved));

        webTestClient.post()
                .uri("/api/users")
                .bodyValue(input)
                .exchange()
                .expectStatus().isCreated()
                .expectBody(User.class)
                .isEqualTo(saved);
    }
}

10.2 函数式端点测试

class UserRouterTest {

    private WebTestClient webTestClient;

    @BeforeEach
    void setUp() {
        UserService userService = new UserService();
        UserHandler handler = new UserHandler(userService);
        UserRouter router = new UserRouter(handler);

        RouterFunction<ServerResponse> route = router.userRoutes();
        webTestClient = WebTestClient.bindToRouterFunction(route).build();
    }

    @Test
    void getAllUsers() {
        webTestClient.get()
                .uri("/api/users")
                .exchange()
                .expectStatus().isOk()
                .expectBodyList(User.class);
    }
}

10.3 全应用集成测试

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
class FullIntegrationTest {

    @Autowired
    private WebTestClient webTestClient;

    @Test
    void fullCrudFlow() {
        // Create
        webTestClient.post()
                .uri("/api/users")
                .bodyValue(new User(null, "Test", "test@test.com"))
                .exchange()
                .expectStatus().isCreated()
                .expectBody(User.class)
                .consumeWith(result -> {
                    User created = result.getResponseBody();
                    assertNotNull(created.getId());

                    // Read
                    webTestClient.get()
                            .uri("/api/users/" + created.getId())
                            .exchange()
                            .expectStatus().isOk()
                            .expectBody(User.class)
                            .isEqualTo(created);

                    // Update
                    webTestClient.put()
                            .uri("/api/users/" + created.getId())
                            .bodyValue(new User(created.getId(), "Updated", "updated@test.com"))
                            .exchange()
                            .expectStatus().isOk();

                    // Delete
                    webTestClient.delete()
                            .uri("/api/users/" + created.getId())
                            .exchange()
                            .expectStatus().isOk();
                });
    }
}

第十一部分:性能调优

11.1 线程模型

WebFlux 默认使用 Event Loop 线程模型(Netty 的 EventLoopGroup):

NioEventLoopGroup (默认 CPU 核心数 * 2 个线程)
├── Thread-1: 处理请求 A → 异步 I/O → 返回 → 处理请求 B
├── Thread-2: 处理请求 C → 异步 I/O → 返回 → 处理请求 D
└── ...

关键原则:永远不要在 WebFlux 的 Event Loop 线程上执行阻塞操作。一旦阻塞,该线程上所有其他请求都会被卡住。

11.2 阻塞操作的正确处理

如果必须调用阻塞代码(如旧版 JDBC、同步文件 I/O),需要将其调度到专门的线程池:

@Service
public class MixedService {

    private final Scheduler blockingScheduler =
            Schedulers.newBoundedElastic(20, 1000, "blocking-pool");

    public Mono<String> callBlockingApi() {
        return Mono.fromCallable(() -> {
            // 这里的代码会在 blockingScheduler 的线程上执行
            return legacyJdbcDao.query();  // 阻塞调用
        }).subscribeOn(blockingScheduler);
    }
}

11.3 连接池配置

spring:
  r2dbc:
    url: r2dbc:postgresql://localhost:5432/mydb
    username: postgres
    password: secret
  data:
    r2dbc:
      pool:
        enabled: true
        initial-size: 10
        max-size: 50
        max-idle-time: 30m
        max-life-time: 60m
        validation-query: SELECT 1

11.4 大请求体限制

默认情况下,WebFlux 限制请求体大小为 256KB。超出后抛出 DataBufferLimitException

spring:
  codec:
    max-in-memory-size: 10MB  # 调整最大内存缓冲

或在代码中:

WebClient client = WebClient.builder()
        .codecs(configurer -> configurer
                .defaultCodecs()
                .maxInMemorySize(10 * 1024 * 1024))
        .build();

11.5 监控与诊断

// 开启 Reactor 的调试模式(在日志中打印完整的响应式链调用栈)
Hooks.onOperatorDebug();

// 或者针对特定的链开启
Flux.range(1, 10)
    .log()  // 打印每个操作符的事件
    .subscribe();

日志输出示例:

| onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
| request(unbounded)
| onNext(1)
| onNext(2)
| ...
| onComplete()

第十二部分:常见问题 FAQ

Q1: WebFlux 一定比 Spring MVC 快吗?

不一定。WebFlux 的优势在于高并发 + 长 I/O 等待场景。如果 QPS 不高、每个请求处理很快,Spring MVC 可能更快(因为少了响应式调度的开销)。WebFlux 真正的价值在于资源利用率:用更少的内存和线程支撑更多的并发连接。

Q2: WebFlux 能兼容所有 Spring 生态吗?

大部分可以,但要注意:

  • 不支持:JPA / Hibernate(阻塞)、Spring Security 的部分阻塞 API、传统 Filter

  • 支持:Spring Data R2DBC、Spring Security WebFlux、WebClient、SSE、WebSocket

  • 部分支持:部分第三方库没有响应式版本

Q3: 什么时候该用 WebFlux?

场景

推荐方案

CRUD 为主的传统应用

Spring MVC

高并发 API Gateway

WebFlux

流式推送(SSE / WebSocket)

WebFlux

微服务间大量并行调用

WebFlux + WebClient

需要 R2DBC 的响应式数据库

WebFlux

团队不熟悉响应式编程

Spring MVC

Q4: 注解模型和函数式端点该怎么选?

  • 注解模型:简单、熟悉、适合大多数场景

  • 函数式端点:需要细粒度控制、API Gateway、动态路由、无反射的场景

官方建议:注解模型是默认选择。函数式端点是补充,不是替代。

Q5: WebFlux 的异常为什么很难处理?

在响应式链中,异常通过 onError 信号传播。如果你在某个中间操作符中"吞"了异常(如 .onErrorResume(e -> Mono.empty())),下游就不会感知到。处理原则:

  • 在最接近异常的地方处理(如 onErrorResume

  • 使用全局 @RestControllerAdvice(注解模型)

  • 在链末端保留 onError 的兜底处理

Q6: 如何调试响应式代码?

  1. 使用 .log() 操作符打印事件流

  2. 开启 Hooks.onOperatorDebug() 获取完整调用栈

  3. 使用 BlockHound 检测是否有阻塞调用:

// 测试依赖
<dependency>
    <groupId>io.projectreactor.tools</groupId>
    <artifactId>blockhound</artifactId>
    <scope>test</scope>
</dependency>

// 测试类中
BlockHound.install();
// 如果有阻塞调用,会抛出 BlockHoundException

Q7: Mono.zip 和 Flux.merge 的区别?

方法

行为

适用场景

Mono.zip

所有 Mono 完成后合并结果

并行请求,全部完成才返回

Flux.merge

多个 Flux 合并,哪个先发就返回哪个

事件合并、实时聚合

Flux.concat

按顺序串联,第一个完成后再订阅第二个

有序执行


附录:完整项目模板

// === 1. Application ===
@SpringBootApplication
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

// === 2. Router ===
@Configuration
public class UserRouter {
    private final UserHandler userHandler;

    public UserRouter(UserHandler userHandler) {
        this.userHandler = userHandler;
    }

    @Bean
    public RouterFunction<ServerResponse> userRoutes() {
        return RouterFunctions.route()
                .path("/api/users", builder -> builder
                        .GET("", userHandler::getAllUsers)
                        .GET("/{id}", userHandler::getUserById)
                        .POST("", userHandler::createUser)
                        .PUT("/{id}", userHandler::updateUser)
                        .DELETE("/{id}", userHandler::deleteUser)
                        .GET("/stream", userHandler::streamUsers))
                .filter((request, next) -> {
                    String auth = request.headers().header("Authorization").orElse("");
                    if (!auth.startsWith("Bearer ")) {
                        return ServerResponse.status(HttpStatus.UNAUTHORIZED).build();
                    }
                    return next.handle(request);
                })
                .build();
    }
}

// === 3. Handler ===
@Component
public class UserHandler {
    private final UserService userService;

    public UserHandler(UserService userService) {
        this.userService = userService;
    }

    public Mono<ServerResponse> getAllUsers(ServerRequest request) {
        return ServerResponse.ok()
                .contentType(MediaType.APPLICATION_JSON)
                .body(userService.findAll(), User.class);
    }

    public Mono<ServerResponse> getUserById(ServerRequest request) {
        String id = request.pathVariable("id");
        return userService.findById(id)
                .flatMap(user -> ServerResponse.ok().bodyValue(user))
                .switchIfEmpty(ServerResponse.notFound().build());
    }

    public Mono<ServerResponse> createUser(ServerRequest request) {
        return request.bodyToMono(User.class)
                .flatMap(userService::save)
                .flatMap(user -> ServerResponse.status(HttpStatus.CREATED).bodyValue(user));
    }

    public Mono<ServerResponse> streamUsers(ServerRequest request) {
        return ServerResponse.ok()
                .contentType(MediaType.TEXT_EVENT_STREAM)
                .body(userService.findAll(), User.class);
    }

    // 省略 updateUser, deleteUser
}

// === 4. Repository ===
public interface UserRepository extends ReactiveCrudRepository<User, String> {
    Flux<User> findByNameContaining(String name);
}

// === 5. Service ===
@Service
public class UserService {
    private final UserRepository userRepository;

    public UserService(UserRepository userRepository) {
        this.userRepository = userRepository;
    }

    public Flux<User> findAll() { return userRepository.findAll(); }
    public Mono<User> findById(String id) { return userRepository.findById(id); }
    public Mono<User> save(User user) { return userRepository.save(user); }
    public Mono<Void> delete(String id) { return userRepository.deleteById(id); }
}

// === 6. WebClient Config ===
@Configuration
public class WebClientConfig {
    @Bean
    public WebClient webClient(WebClient.Builder builder) {
        return builder
                .baseUrl("https://api.example.com")
                .filter(ExchangeFilterFunction.ofRequestProcessor(
                        req -> Mono.just(ClientRequest.from(req)
                                .header("Authorization", "Bearer " + getToken())
                                .build())))
                .build();
    }
}

// === 7. Global Exception ===
@RestControllerAdvice
public class GlobalExceptionHandler {
    @ExceptionHandler(ResourceNotFoundException.class)
    @ResponseStatus(HttpStatus.NOT_FOUND)
    public Mono<ErrorResponse> handleNotFound(ResourceNotFoundException ex) {
        return Mono.just(new ErrorResponse(404, ex.getMessage()));
    }
}

// === 8. Test ===
@WebFluxTest(UserHandler.class)
class UserHandlerTest {
    @Autowired WebTestClient webTestClient;
    @MockBean UserService userService;

    @Test
    void getAllUsers() {
        when(userService.findAll()).thenReturn(Flux.just(
                new User("1", "Alice"), new User("2", "Bob")));

        webTestClient.get().uri("/api/users")
                .exchange()
                .expectStatus().isOk()
                .expectBodyList(User.class).hasSize(2);
    }
}

总结

Spring WebFlux 的核心可以用三句话概括:

  1. 非阻塞 I/O —— 用少量线程处理大量并发,线程在等待 I/O 时不被占用

  2. 响应式流(Mono / Flux) —— 数据以流的形式异步传播,支持背压

  3. 两种编程模型 —— 注解模型(简单熟悉)和函数式端点(灵活可控)

从入门的响应式基础到高级的 R2DBC、SSE 流式推送、性能调优和 BlockHound 检测,WebFlux 用一套完整的响应式生态覆盖了现代高并发 Web 应用的所有场景。希望这篇指南能帮助你从"会用"走向"精通"。