Spring WebFlux 从入门到精通:响应式编程实战全指南
前言
在传统的 Spring MVC 架构中,每个请求都会绑定一个线程。当请求量激增时,线程池耗尽,应用开始排队等待。这就是为什么在高并发、长耗时场景(如流式 API、SSE、WebSocket、数据库连接池打满)下,传统 Servlet 模型的扩展性会遇到瓶颈。
Spring WebFlux 是 Spring Framework 5.0 引入的全新响应式 Web 框架。它的核心思想是:用少量线程处理大量并发请求。不是通过多线程并行处理,而是通过非阻塞异步回调让线程在等待 I/O 时不被占用,从而可以处理下一个请求。
本文从响应式编程基础开始,逐步深入到 WebFlux 的两种编程模型(注解 vs 函数式)、WebClient、SSE、WebSocket、数据库响应式访问、测试和调优,帮助你从零到精通。
第一部分:响应式编程基础
1.1 什么是响应式编程
响应式编程(Reactive Programming)是一种基于数据流和变化传播的编程范式。简单说就是:当数据源发生变化时,所有依赖它的地方会自动收到通知并作出反应。
对比命令式编程:
// 命令式:同步阻塞,等待结果
String result = service.getData();
System.out.println(result); // 阻塞直到数据返回
// 响应式:订阅后异步处理
Mono<String> result = service.getData();
result.subscribe(
data -> System.out.println(data), // 数据到达时执行
error -> System.err.println(error) // 错误时执行
);
// 这里的代码立即执行,不等数据返回
1.2 Reactive Streams 规范
响应式编程在 Java 生态中有统一标准 —— Reactive Streams 规范(已被纳入 JDK 9 java.util.concurrent.Flow)。它定义了四个核心接口:
Spring WebFlux 基于 Project Reactor 实现,它完全遵守 Reactive Streams 规范。
1.3 Project Reactor:Mono 和 Flux
Project Reactor 是 Spring WebFlux 的底层响应式库,提供了两个核心类型:
// Mono:单个值或空
Mono<String> hello = Mono.just("Hello");
Mono<String> empty = Mono.empty();
Mono<String> error = Mono.error(new RuntimeException("Oops"));
// Flux:多个值
Flux<String> colors = Flux.just("red", "green", "blue");
Flux<Integer> numbers = Flux.range(1, 10);
Flux<Long> ticks = Flux.interval(Duration.ofSeconds(1)); // 每秒发一个
1.4 背压(Backpressure)
背压是响应式编程最重要的概念之一。当数据生产速度大于消费速度时,订阅者可以告诉发布者"慢一点",防止内存溢出。
Flux.range(1, 1000)
.log()
.subscribe(
data -> {
// 处理数据
},
error -> {},
() -> {},
subscription -> subscription.request(10) // 每次只请求10个
);
实际开发中:大多数场景你不需要手动管理背压。WebFlux 的底层(Netty、R2DBC 等)会自动处理。但当你自定义 Publisher 时,需要考虑背压。
1.5 冷流 vs 热流
// 冷流:每次订阅都重新执行
Flux<Integer> cold = Flux.range(1, 3).doOnNext(i -> System.out.println("Generated: " + i));
cold.subscribe(); // 打印 Generated: 1, 2, 3
cold.subscribe(); // 再次打印 Generated: 1, 2, 3
// 热流:数据源独立运行
Flux<Long> hot = Flux.interval(Duration.ofSeconds(1))
.publish()
.autoConnect(0); // 自动开始,不依赖订阅者
Thread.sleep(3000); // 等待3秒
hot.subscribe(i -> System.out.println("Subscriber A: " + i)); // 错过前3个
第二部分:WebFlux 入门
2.1 依赖安装
以 Spring Boot 3.x + Java 17 为例:
<dependencies>
<!-- Spring Boot WebFlux Starter(内嵌 Netty) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<!-- 测试用 WebClient -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
</dependencies>
关键区别:
spring-boot-starter-web(Spring MVC,基于 Servlet)vsspring-boot-starter-webflux(响应式,基于 Netty/Undertow/Tomcat 异步)。二者不能同时使用。
2.2 运行容器
WebFlux 默认使用 Netty 作为运行容器,但它也支持其他容器:
<!-- 使用 Tomcat 运行 WebFlux -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-reactor-netty</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-tomcat</artifactId>
</dependency>
2.3 第一个响应式 Controller
WebFlux 支持两种编程模型:
注解驱动(Annotation-based) —— 和 Spring MVC 一样的
@RestController函数式(Functional) —— 使用
RouterFunction和HandlerFunction
先从注解方式开始:
@RestController
@RequestMapping("/api/users")
public class UserController {
private final UserService userService;
public UserController(UserService userService) {
this.userService = userService;
}
@GetMapping
public Flux<User> getAllUsers() {
return userService.findAll(); // 返回 Flux<User>
}
@GetMapping("/{id}")
public Mono<User> getUser(@PathVariable String id) {
return userService.findById(id); // 返回 Mono<User>
}
@PostMapping
public Mono<User> createUser(@RequestBody Mono<User> user) {
return user.flatMap(userService::save); // 接收 Mono<User>,返回 Mono<User>
}
@PutMapping("/{id}")
public Mono<User> updateUser(@PathVariable String id, @RequestBody Mono<User> user) {
return user.flatMap(u -> userService.update(id, u));
}
@DeleteMapping("/{id}")
public Mono<Void> deleteUser(@PathVariable String id) {
return userService.delete(id); // 返回 Mono<Void>,响应 200 或 204
}
}
与普通 Spring MVC 的关键区别:
返回值不是
User/List<User>,而是Mono<User>/Flux<User>Spring WebFlux 不会阻塞等待响应体完成,而是等
Mono/Flux发出数据时才写入响应
@RequestBody可以接收Mono<T>(单个对象异步反序列化)
Mono<Void>用于不需要响应体的场景(如 DELETE)
2.4 响应式 Service 层
@Service
public class UserService {
// 实际项目中这里会调用响应式数据库访问(R2DBC)或响应式缓存(Redis Lettuce)
private final Map<String, User> store = new ConcurrentHashMap<>();
public Flux<User> findAll() {
return Flux.fromIterable(store.values());
}
public Mono<User> findById(String id) {
return Mono.justOrEmpty(store.get(id));
}
public Mono<User> save(User user) {
String id = UUID.randomUUID().toString();
User saved = user.withId(id);
store.put(id, saved);
return Mono.just(saved);
}
public Mono<User> update(String id, User user) {
if (!store.containsKey(id)) {
return Mono.error(new ResponseStatusException(HttpStatus.NOT_FOUND));
}
User updated = user.withId(id);
store.put(id, updated);
return Mono.just(updated);
}
public Mono<Void> delete(String id) {
store.remove(id);
return Mono.empty();
}
}
第三部分:函数式端点(RouterFunction + HandlerFunction)
3.1 为什么需要函数式端点
注解模型简单直观,但在某些场景下函数式端点更有优势:
更细粒度的控制:请求路由、过滤、错误处理都是函数,可以灵活组合
更轻量:不需要反射扫描注解
更好的响应式组合:路由本身就是响应式的
适合 API Gateway、代理等中间层场景
3.2 基本概念
函数式端点由两个核心接口组成:
// HandlerFunction:处理请求
HandlerFunction<ServerResponse> handler = request -> {
String name = request.pathVariable("name");
return ServerResponse.ok()
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(new User(name));
};
// RouterFunction:路由到 Handler
RouterFunction<ServerResponse> route = RouterFunctions.route()
.GET("/hello/{name}", handler)
.build();
3.3 完整的函数式路由
使用 RouterFunction.Builder 构建完整的路由表:
@Configuration
public class UserRouter {
private final UserHandler userHandler;
public UserRouter(UserHandler userHandler) {
this.userHandler = userHandler;
}
@Bean
public RouterFunction<ServerResponse> userRoutes() {
return RouterFunctions.route()
.path("/api/users", builder -> builder
.GET("", userHandler::getAllUsers)
.GET("/{id}", userHandler::getUserById)
.POST("", userHandler::createUser)
.PUT("/{id}", userHandler::updateUser)
.DELETE("/{id}", userHandler::deleteUser)
.GET("/stream", userHandler::streamUsers))
.build();
}
}
3.4 Handler 实现
@Component
public class UserHandler {
private final UserService userService;
public UserHandler(UserService userService) {
this.userService = userService;
}
public Mono<ServerResponse> getAllUsers(ServerRequest request) {
return ServerResponse.ok()
.contentType(MediaType.APPLICATION_JSON)
.body(userService.findAll(), User.class);
}
public Mono<ServerResponse> getUserById(ServerRequest request) {
String id = request.pathVariable("id");
return userService.findById(id)
.flatMap(user -> ServerResponse.ok()
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(user))
.switchIfEmpty(ServerResponse.notFound().build());
}
public Mono<ServerResponse> createUser(ServerRequest request) {
return request.bodyToMono(User.class)
.flatMap(userService::save)
.flatMap(saved -> ServerResponse.status(HttpStatus.CREATED)
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(saved));
}
public Mono<ServerResponse> updateUser(ServerRequest request) {
String id = request.pathVariable("id");
return request.bodyToMono(User.class)
.flatMap(user -> userService.update(id, user))
.flatMap(updated -> ServerResponse.ok()
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(updated))
.switchIfEmpty(ServerResponse.notFound().build());
}
public Mono<ServerResponse> deleteUser(ServerRequest request) {
String id = request.pathVariable("id");
return userService.delete(id)
.then(ServerResponse.ok().build());
}
// SSE 流式推送
public Mono<ServerResponse> streamUsers(ServerRequest request) {
return ServerResponse.ok()
.contentType(MediaType.TEXT_EVENT_STREAM)
.body(userService.findAll(), User.class);
}
}
3.5 路由过滤器
@Bean
public RouterFunction<ServerResponse> filteredRoutes() {
return RouterFunctions.route()
.path("/api", builder -> builder
.before(request -> {
System.out.println("Before: " + request.path());
return request;
})
.after((request, response) -> {
System.out.println("After: " + response.statusCode());
return response;
})
.filter((request, next) -> {
String auth = request.headers().header("Authorization").orElse("");
if (!auth.startsWith("Bearer ")) {
return ServerResponse.status(HttpStatus.UNAUTHORIZED).build();
}
return next.handle(request);
})
.GET("/users", userHandler::getAllUsers)
.POST("/users", userHandler::createUser))
.build();
}
3.6 多路由组合
RouterFunction 可以组合,适合多模块项目:
@Bean
public RouterFunction<ServerResponse> compositeRouter(
RouterFunction<ServerResponse> userRouter,
RouterFunction<ServerResponse> orderRouter,
RouterFunction<ServerResponse> productRouter) {
return userRouter
.and(orderRouter)
.and(productRouter);
}
第四部分:WebClient —— 响应式 HTTP 客户端
4.1 为什么需要 WebClient
传统 RestTemplate 是同步阻塞的,不适合响应式应用。WebClient 是 WebFlux 提供的非阻塞响应式 HTTP 客户端,完全基于 Mono / Flux。
4.2 创建 WebClient
@Configuration
public class WebClientConfig {
@Bean
public WebClient webClient() {
return WebClient.builder()
.baseUrl("https://api.example.com")
.defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
.defaultHeader(HttpHeaders.ACCEPT, MediaType.APPLICATION_JSON_VALUE)
.codecs(configurer -> configurer
.defaultCodecs()
.maxInMemorySize(10 * 1024 * 1024)) // 10MB
.build();
}
}
4.3 GET 请求
@Service
public class ApiService {
private final WebClient webClient;
public ApiService(WebClient webClient) {
this.webClient = webClient;
}
// 单个对象
public Mono<User> getUser(String id) {
return webClient.get()
.uri("/users/{id}", id)
.retrieve()
.bodyToMono(User.class);
}
// 集合
public Flux<User> getAllUsers() {
return webClient.get()
.uri("/users")
.retrieve()
.bodyToFlux(User.class);
}
// 带查询参数
public Flux<User> searchUsers(String name, int page) {
return webClient.get()
.uri(uriBuilder -> uriBuilder
.path("/users")
.queryParam("name", name)
.queryParam("page", page)
.build())
.retrieve()
.bodyToFlux(User.class);
}
}
4.4 POST / PUT / DELETE 请求
// POST
public Mono<User> createUser(User user) {
return webClient.post()
.uri("/users")
.bodyValue(user)
.retrieve()
.bodyToMono(User.class);
}
// PUT
public Mono<User> updateUser(String id, User user) {
return webClient.put()
.uri("/users/{id}", id)
.bodyValue(user)
.retrieve()
.bodyToMono(User.class);
}
// DELETE(不需要响应体)
public Mono<Void> deleteUser(String id) {
return webClient.delete()
.uri("/users/{id}", id)
.retrieve()
.bodyToMono(Void.class);
}
4.5 错误处理
WebClient 默认对 4xx / 5xx 状态码抛出异常,需要自定义处理:
public Mono<User> getUserWithErrorHandling(String id) {
return webClient.get()
.uri("/users/{id}", id)
.retrieve()
.onStatus(HttpStatusCode::is4xxClientError, response ->
response.bodyToMono(String.class)
.flatMap(body -> Mono.error(new ClientException(
response.statusCode().value(), body))))
.onStatus(HttpStatusCode::is5xxServerError, response ->
response.bodyToMono(String.class)
.flatMap(body -> Mono.error(new ServerException(
response.statusCode().value(), body))))
.bodyToMono(User.class)
.onErrorResume(ClientException.class, e -> {
// 4xx 处理
return Mono.empty();
})
.onErrorResume(ServerException.class, e -> {
// 5xx 处理
return Mono.empty();
});
}
4.6 请求拦截器(ExchangeFilterFunction)
统一处理请求头(如 Token 注入):
@Bean
public WebClient webClientWithAuth() {
return WebClient.builder()
.baseUrl("https://api.example.com")
.filter(ExchangeFilterFunction.ofRequestProcessor(
clientRequest -> {
String token = getCurrentToken();
ClientRequest filtered = ClientRequest.from(clientRequest)
.header(HttpHeaders.AUTHORIZATION, "Bearer " + token)
.build();
return Mono.just(filtered);
}))
.build();
}
// 日志过滤器
ExchangeFilterFunction logRequest = ExchangeFilterFunction.ofRequestProcessor(
clientRequest -> {
System.out.println("Request: " + clientRequest.method() + " " + clientRequest.url());
return Mono.just(clientRequest);
});
ExchangeFilterFunction logResponse = ExchangeFilterFunction.ofResponseProcessor(
clientResponse -> {
System.out.println("Response: " + clientResponse.statusCode());
return Mono.just(clientResponse);
});
4.7 流式响应(SSE / JSON Stream)
// 接收 SSE 流
public Flux<Event> getEvents() {
return webClient.get()
.uri("/events/stream")
.accept(MediaType.TEXT_EVENT_STREAM)
.retrieve()
.bodyToFlux(Event.class);
}
// 接收大 JSON 数组(流式反序列化,不全部加载到内存)
public Flux<User> getAllUsersStreaming() {
return webClient.get()
.uri("/users")
.retrieve()
.bodyToFlux(User.class) // 边接收边反序列化
.doOnNext(user -> System.out.println("Received: " + user));
}
4.8 与注解 Controller 配合使用
WebClient 也可以在 Spring MVC 项目中使用(只需引入 spring-boot-starter-webflux):
@RestController
public class AggregationController {
private final WebClient webClient;
public AggregationController(WebClient.Builder webClientBuilder) {
this.webClient = webClientBuilder.baseUrl("https://api.example.com").build();
}
@GetMapping("/dashboard")
public Mono<DashboardData> getDashboard() {
// 并行发起多个请求,全部完成后聚合
Mono<User> user = webClient.get().uri("/users/1").retrieve().bodyToMono(User.class);
Mono<List<Order>> orders = webClient.get().uri("/orders?userId=1").retrieve().bodyToFlux(Order.class).collectList();
Mono<List<Notification>> notifications = webClient.get().uri("/notifications?userId=1").retrieve().bodyToFlux(Notification.class).collectList();
return Mono.zip(user, orders, notifications)
.map(tuple -> new DashboardData(tuple.getT1(), tuple.getT2(), tuple.getT3()));
}
}
第五部分:Server-Sent Events(SSE)
5.1 什么是 SSE
SSE(Server-Sent Events)是一种服务端主动向客户端推送数据的技术。与 WebSocket 不同,SSE 是单向的(只有服务端推送),基于普通 HTTP 连接,使用 text/event-stream 媒体类型。
适用场景:
实时通知推送
股票/加密货币行情
日志实时查看
AI 流式输出(ChatGPT 风格)
5.2 WebFlux 中的 SSE
@RestController
@RequestMapping("/api/events")
public class EventController {
// 方式一:Flux 直接返回(最简单)
@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Event> streamEvents() {
return Flux.interval(Duration.ofSeconds(1))
.map(tick -> new Event(System.currentTimeMillis(), "tick-" + tick));
}
// 方式二:使用 Sinks 手动推送(适合外部触发)
private final Sinks.Many<Event> sink = Sinks.many().multicast().onBackpressureBuffer();
@PostConstruct
public void init() {
// 模拟外部事件源
Flux.interval(Duration.ofSeconds(2))
.map(tick -> new Event(System.currentTimeMillis(), "event-" + tick))
.subscribe(event -> sink.tryEmitNext(event));
}
@GetMapping(value = "/push", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Event> pushEvents() {
return sink.asFlux();
}
// 方式三:AI 风格流式输出
@PostMapping(value = "/chat", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ChatResponse> chat(@RequestBody ChatRequest request) {
return aiService.generateStream(request.getPrompt())
.map(chunk -> new ChatResponse(chunk, false))
.concatWith(Mono.just(new ChatResponse("[DONE]", true)));
}
}
5.3 SSE 数据结构
SSE 协议定义的消息格式:
data: {"id":1,"message":"hello"}
id: 1
event: user-message
retry: 3000
WebFlux 的 ServerSentEvent 类可以构建完整的 SSE 消息:
@GetMapping(value = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<Event>> sseWithMetadata() {
return Flux.interval(Duration.ofSeconds(1))
.map(tick -> ServerSentEvent.<Event>builder()
.id(String.valueOf(tick))
.event("tick-event")
.data(new Event(tick, "data-" + tick))
.retry(Duration.ofSeconds(3))
.build());
}
第六部分:WebSocket
6.1 WebSocket 服务端
@Configuration
public class WebSocketConfig implements WebSocketConfigurer {
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(new ChatWebSocketHandler(), "/ws/chat");
}
}
public class ChatWebSocketHandler implements WebSocketHandler {
private final Sinks.Many<WebSocketMessage> sink =
Sinks.many().multicast().onBackpressureBuffer();
@Override
public Mono<Void> handle(WebSocketSession session) {
// 接收客户端消息
Mono<Void> input = session.receive()
.map(WebSocketMessage::getPayloadAsText)
.doOnNext(msg -> System.out.println("Received: " + msg))
.then();
// 发送消息给客户端
Flux<WebSocketMessage> output = sink.asFlux()
.map(msg -> session.textMessage(msg.getPayloadAsText()));
return session.send(output).and(input);
}
}
6.2 WebSocket 客户端(WebClient)
WebSocketClient client = new ReactorNettyWebSocketClient();
client.execute(URI.create("ws://localhost:8080/ws/chat"), session -> {
// 发送
Mono<Void> send = session.send(
Flux.just(session.textMessage("Hello from client!")));
// 接收
Mono<Void> receive = session.receive()
.map(WebSocketMessage::getPayloadAsText)
.doOnNext(msg -> System.out.println("Server says: " + msg))
.then();
return Mono.zip(send, receive).then();
}).block();
第七部分:响应式数据库访问(R2DBC)
7.1 为什么不能用 JPA / JDBC
传统的 JPA(Hibernate)和 JDBC 都是同步阻塞的。在响应式应用中,使用阻塞调用会让 WebFlux 的非阻塞优势完全失效。
R2DBC(Reactive Relational Database Connectivity)是响应式的数据库驱动,支持 MySQL、PostgreSQL、H2、MariaDB 等。
7.2 Spring Data R2DBC
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>r2dbc-postgresql</artifactId>
</dependency>
<!-- 或 MySQL -->
<!-- <dependency>
<groupId>com.github.jasync-sql</groupId>
<artifactId>jasync-mysql</artifactId>
</dependency> -->
</dependencies>
# application.yml
spring:
r2dbc:
url: r2dbc:postgresql://localhost:5432/mydb
username: postgres
password: secret
7.3 实体与 Repository
@Table("users")
public class User {
@Id
private String id;
private String name;
private String email;
// getters, setters
}
// Spring Data R2DBC 的 Repository 返回类型是响应式的
public interface UserRepository extends ReactiveCrudRepository<User, String> {
Flux<User> findByNameContaining(String name);
Mono<User> findByEmail(String email);
Mono<Boolean> existsByEmail(String email);
}
7.4 Service 层使用
@Service
public class UserService {
private final UserRepository userRepository;
public UserService(UserRepository userRepository) {
this.userRepository = userRepository;
}
public Flux<User> findAll() {
return userRepository.findAll();
}
public Mono<User> findById(String id) {
return userRepository.findById(id);
}
public Mono<User> save(User user) {
return userRepository.save(user);
}
public Mono<User> update(String id, User user) {
return userRepository.findById(id)
.flatMap(existing -> {
existing.setName(user.getName());
existing.setEmail(user.getEmail());
return userRepository.save(existing);
});
}
public Mono<Void> delete(String id) {
return userRepository.deleteById(id);
}
// 复杂查询:统计在线用户数
public Mono<Long> countOnlineUsers() {
// 需要使用 R2dbcEntityTemplate 执行原生 SQL
return template.getDatabaseClient()
.sql("SELECT COUNT(*) FROM users WHERE status = 'online'")
.map((row, metadata) -> row.get(0, Long.class))
.first();
}
}
7.5 R2DBC vs JPA 的对比
迁移建议:如果你的项目从 Spring MVC 迁移到 WebFlux,不要直接搬 JPA。对于简单的 CRUD,R2DBC 足够;对于复杂的联表查询和事务,考虑使用
DatabaseClient执行原生 SQL 或迁移到专门的读写分离架构。
第八部分:响应式 Redis 与缓存
8.1 Lettuce(响应式 Redis 客户端)
Spring Data Redis 的 Lettuce 驱动是响应式的:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis-reactive</artifactId>
</dependency>
@Service
public class CacheService {
private final ReactiveStringRedisTemplate redisTemplate;
public CacheService(ReactiveStringRedisTemplate redisTemplate) {
this.redisTemplate = redisTemplate;
}
public Mono<String> get(String key) {
return redisTemplate.opsForValue().get(key);
}
public Mono<Void> set(String key, String value, Duration ttl) {
return redisTemplate.opsForValue().set(key, value, ttl).then();
}
public Mono<Boolean> delete(String key) {
return redisTemplate.delete(key);
}
}
8.2 响应式缓存装饰器
@Service
public class CachedUserService {
private final UserService userService;
private final CacheService cacheService;
public CachedUserService(UserService userService, CacheService cacheService) {
this.userService = userService;
this.cacheService = cacheService;
}
public Mono<User> getUser(String id) {
String key = "user:" + id;
return cacheService.get(key)
.flatMap(json -> Mono.just(parseUser(json)))
.switchIfEmpty(
userService.findById(id)
.flatMap(user ->
cacheService.set(key, toJson(user), Duration.ofMinutes(30))
.thenReturn(user))
);
}
}
第九部分:异常处理
9.1 注解模型的全局异常处理
@RestControllerAdvice
public class GlobalExceptionHandler {
@ExceptionHandler(ResourceNotFoundException.class)
@ResponseStatus(HttpStatus.NOT_FOUND)
public Mono<ErrorResponse> handleNotFound(ResourceNotFoundException ex) {
return Mono.just(new ErrorResponse(404, ex.getMessage()));
}
@ExceptionHandler(IllegalArgumentException.class)
@ResponseStatus(HttpStatus.BAD_REQUEST)
public Mono<ErrorResponse> handleBadRequest(IllegalArgumentException ex) {
return Mono.just(new ErrorResponse(400, ex.getMessage()));
}
@ExceptionHandler(Exception.class)
@ResponseStatus(HttpStatus.INTERNAL_SERVER_ERROR)
public Mono<ErrorResponse> handleGeneric(Exception ex) {
return Mono.just(new ErrorResponse(500, "Internal Server Error"));
}
}
9.2 函数式端点的异常处理
@Bean
public RouterFunction<ServerResponse> errorHandlingRoutes() {
return RouterFunctions.route()
.GET("/api/users", userHandler::getAllUsers)
.filter((request, next) -> {
try {
return next.handle(request);
} catch (Exception e) {
return ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR)
.bodyValue(new ErrorResponse(500, e.getMessage()));
}
})
.build();
}
// 或者在 Handler 内部使用 onErrorResume
public Mono<ServerResponse> getUserById(ServerRequest request) {
String id = request.pathVariable("id");
return userService.findById(id)
.flatMap(user -> ServerResponse.ok().bodyValue(user))
.switchIfEmpty(ServerResponse.notFound().build())
.onErrorResume(e -> ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR)
.bodyValue(new ErrorResponse(500, e.getMessage())));
}
9.3 WebExceptionHandler(最底层)
@Component
@Order(-2) // 优先级要高
public class CustomWebExceptionHandler implements WebExceptionHandler {
@Override
public Mono<Void> handle(ServerWebExchange exchange, Throwable ex) {
ServerHttpResponse response = exchange.getResponse();
response.setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);
response.getHeaders().setContentType(MediaType.APPLICATION_JSON);
ErrorResponse error = new ErrorResponse(500, ex.getMessage());
String body = toJson(error);
DataBuffer buffer = response.bufferFactory().wrap(body.getBytes());
return response.writeWith(Mono.just(buffer));
}
}
第十部分:测试
10.1 注解 Controller 测试
使用 WebTestClient(WebFlux 内置的测试客户端):
@WebFluxTest(UserController.class)
class UserControllerTest {
@Autowired
private WebTestClient webTestClient;
@MockBean
private UserService userService;
@Test
void getAllUsers() {
when(userService.findAll()).thenReturn(Flux.just(
new User("1", "Alice"),
new User("2", "Bob")
));
webTestClient.get()
.uri("/api/users")
.exchange()
.expectStatus().isOk()
.expectBodyList(User.class)
.hasSize(2)
.contains(new User("1", "Alice"), new User("2", "Bob"));
}
@Test
void getUserById_notFound() {
when(userService.findById("99")).thenReturn(Mono.empty());
webTestClient.get()
.uri("/api/users/99")
.exchange()
.expectStatus().isNotFound();
}
@Test
void createUser() {
User input = new User(null, "Charlie");
User saved = new User("3", "Charlie");
when(userService.save(any())).thenReturn(Mono.just(saved));
webTestClient.post()
.uri("/api/users")
.bodyValue(input)
.exchange()
.expectStatus().isCreated()
.expectBody(User.class)
.isEqualTo(saved);
}
}
10.2 函数式端点测试
class UserRouterTest {
private WebTestClient webTestClient;
@BeforeEach
void setUp() {
UserService userService = new UserService();
UserHandler handler = new UserHandler(userService);
UserRouter router = new UserRouter(handler);
RouterFunction<ServerResponse> route = router.userRoutes();
webTestClient = WebTestClient.bindToRouterFunction(route).build();
}
@Test
void getAllUsers() {
webTestClient.get()
.uri("/api/users")
.exchange()
.expectStatus().isOk()
.expectBodyList(User.class);
}
}
10.3 全应用集成测试
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
class FullIntegrationTest {
@Autowired
private WebTestClient webTestClient;
@Test
void fullCrudFlow() {
// Create
webTestClient.post()
.uri("/api/users")
.bodyValue(new User(null, "Test", "test@test.com"))
.exchange()
.expectStatus().isCreated()
.expectBody(User.class)
.consumeWith(result -> {
User created = result.getResponseBody();
assertNotNull(created.getId());
// Read
webTestClient.get()
.uri("/api/users/" + created.getId())
.exchange()
.expectStatus().isOk()
.expectBody(User.class)
.isEqualTo(created);
// Update
webTestClient.put()
.uri("/api/users/" + created.getId())
.bodyValue(new User(created.getId(), "Updated", "updated@test.com"))
.exchange()
.expectStatus().isOk();
// Delete
webTestClient.delete()
.uri("/api/users/" + created.getId())
.exchange()
.expectStatus().isOk();
});
}
}
第十一部分:性能调优
11.1 线程模型
WebFlux 默认使用 Event Loop 线程模型(Netty 的 EventLoopGroup):
NioEventLoopGroup (默认 CPU 核心数 * 2 个线程)
├── Thread-1: 处理请求 A → 异步 I/O → 返回 → 处理请求 B
├── Thread-2: 处理请求 C → 异步 I/O → 返回 → 处理请求 D
└── ...
关键原则:永远不要在 WebFlux 的 Event Loop 线程上执行阻塞操作。一旦阻塞,该线程上所有其他请求都会被卡住。
11.2 阻塞操作的正确处理
如果必须调用阻塞代码(如旧版 JDBC、同步文件 I/O),需要将其调度到专门的线程池:
@Service
public class MixedService {
private final Scheduler blockingScheduler =
Schedulers.newBoundedElastic(20, 1000, "blocking-pool");
public Mono<String> callBlockingApi() {
return Mono.fromCallable(() -> {
// 这里的代码会在 blockingScheduler 的线程上执行
return legacyJdbcDao.query(); // 阻塞调用
}).subscribeOn(blockingScheduler);
}
}
11.3 连接池配置
spring:
r2dbc:
url: r2dbc:postgresql://localhost:5432/mydb
username: postgres
password: secret
data:
r2dbc:
pool:
enabled: true
initial-size: 10
max-size: 50
max-idle-time: 30m
max-life-time: 60m
validation-query: SELECT 1
11.4 大请求体限制
默认情况下,WebFlux 限制请求体大小为 256KB。超出后抛出 DataBufferLimitException:
spring:
codec:
max-in-memory-size: 10MB # 调整最大内存缓冲
或在代码中:
WebClient client = WebClient.builder()
.codecs(configurer -> configurer
.defaultCodecs()
.maxInMemorySize(10 * 1024 * 1024))
.build();
11.5 监控与诊断
// 开启 Reactor 的调试模式(在日志中打印完整的响应式链调用栈)
Hooks.onOperatorDebug();
// 或者针对特定的链开启
Flux.range(1, 10)
.log() // 打印每个操作符的事件
.subscribe();
日志输出示例:
| onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
| request(unbounded)
| onNext(1)
| onNext(2)
| ...
| onComplete()
第十二部分:常见问题 FAQ
Q1: WebFlux 一定比 Spring MVC 快吗?
不一定。WebFlux 的优势在于高并发 + 长 I/O 等待场景。如果 QPS 不高、每个请求处理很快,Spring MVC 可能更快(因为少了响应式调度的开销)。WebFlux 真正的价值在于资源利用率:用更少的内存和线程支撑更多的并发连接。
Q2: WebFlux 能兼容所有 Spring 生态吗?
大部分可以,但要注意:
不支持:JPA / Hibernate(阻塞)、Spring Security 的部分阻塞 API、传统 Filter
支持:Spring Data R2DBC、Spring Security WebFlux、WebClient、SSE、WebSocket
部分支持:部分第三方库没有响应式版本
Q3: 什么时候该用 WebFlux?
Q4: 注解模型和函数式端点该怎么选?
注解模型:简单、熟悉、适合大多数场景
函数式端点:需要细粒度控制、API Gateway、动态路由、无反射的场景
官方建议:注解模型是默认选择。函数式端点是补充,不是替代。
Q5: WebFlux 的异常为什么很难处理?
在响应式链中,异常通过 onError 信号传播。如果你在某个中间操作符中"吞"了异常(如 .onErrorResume(e -> Mono.empty())),下游就不会感知到。处理原则:
在最接近异常的地方处理(如
onErrorResume)使用全局
@RestControllerAdvice(注解模型)在链末端保留
onError的兜底处理
Q6: 如何调试响应式代码?
使用
.log()操作符打印事件流开启
Hooks.onOperatorDebug()获取完整调用栈使用 BlockHound 检测是否有阻塞调用:
// 测试依赖
<dependency>
<groupId>io.projectreactor.tools</groupId>
<artifactId>blockhound</artifactId>
<scope>test</scope>
</dependency>
// 测试类中
BlockHound.install();
// 如果有阻塞调用,会抛出 BlockHoundException
Q7: Mono.zip 和 Flux.merge 的区别?
附录:完整项目模板
// === 1. Application ===
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
// === 2. Router ===
@Configuration
public class UserRouter {
private final UserHandler userHandler;
public UserRouter(UserHandler userHandler) {
this.userHandler = userHandler;
}
@Bean
public RouterFunction<ServerResponse> userRoutes() {
return RouterFunctions.route()
.path("/api/users", builder -> builder
.GET("", userHandler::getAllUsers)
.GET("/{id}", userHandler::getUserById)
.POST("", userHandler::createUser)
.PUT("/{id}", userHandler::updateUser)
.DELETE("/{id}", userHandler::deleteUser)
.GET("/stream", userHandler::streamUsers))
.filter((request, next) -> {
String auth = request.headers().header("Authorization").orElse("");
if (!auth.startsWith("Bearer ")) {
return ServerResponse.status(HttpStatus.UNAUTHORIZED).build();
}
return next.handle(request);
})
.build();
}
}
// === 3. Handler ===
@Component
public class UserHandler {
private final UserService userService;
public UserHandler(UserService userService) {
this.userService = userService;
}
public Mono<ServerResponse> getAllUsers(ServerRequest request) {
return ServerResponse.ok()
.contentType(MediaType.APPLICATION_JSON)
.body(userService.findAll(), User.class);
}
public Mono<ServerResponse> getUserById(ServerRequest request) {
String id = request.pathVariable("id");
return userService.findById(id)
.flatMap(user -> ServerResponse.ok().bodyValue(user))
.switchIfEmpty(ServerResponse.notFound().build());
}
public Mono<ServerResponse> createUser(ServerRequest request) {
return request.bodyToMono(User.class)
.flatMap(userService::save)
.flatMap(user -> ServerResponse.status(HttpStatus.CREATED).bodyValue(user));
}
public Mono<ServerResponse> streamUsers(ServerRequest request) {
return ServerResponse.ok()
.contentType(MediaType.TEXT_EVENT_STREAM)
.body(userService.findAll(), User.class);
}
// 省略 updateUser, deleteUser
}
// === 4. Repository ===
public interface UserRepository extends ReactiveCrudRepository<User, String> {
Flux<User> findByNameContaining(String name);
}
// === 5. Service ===
@Service
public class UserService {
private final UserRepository userRepository;
public UserService(UserRepository userRepository) {
this.userRepository = userRepository;
}
public Flux<User> findAll() { return userRepository.findAll(); }
public Mono<User> findById(String id) { return userRepository.findById(id); }
public Mono<User> save(User user) { return userRepository.save(user); }
public Mono<Void> delete(String id) { return userRepository.deleteById(id); }
}
// === 6. WebClient Config ===
@Configuration
public class WebClientConfig {
@Bean
public WebClient webClient(WebClient.Builder builder) {
return builder
.baseUrl("https://api.example.com")
.filter(ExchangeFilterFunction.ofRequestProcessor(
req -> Mono.just(ClientRequest.from(req)
.header("Authorization", "Bearer " + getToken())
.build())))
.build();
}
}
// === 7. Global Exception ===
@RestControllerAdvice
public class GlobalExceptionHandler {
@ExceptionHandler(ResourceNotFoundException.class)
@ResponseStatus(HttpStatus.NOT_FOUND)
public Mono<ErrorResponse> handleNotFound(ResourceNotFoundException ex) {
return Mono.just(new ErrorResponse(404, ex.getMessage()));
}
}
// === 8. Test ===
@WebFluxTest(UserHandler.class)
class UserHandlerTest {
@Autowired WebTestClient webTestClient;
@MockBean UserService userService;
@Test
void getAllUsers() {
when(userService.findAll()).thenReturn(Flux.just(
new User("1", "Alice"), new User("2", "Bob")));
webTestClient.get().uri("/api/users")
.exchange()
.expectStatus().isOk()
.expectBodyList(User.class).hasSize(2);
}
}
总结
Spring WebFlux 的核心可以用三句话概括:
非阻塞 I/O —— 用少量线程处理大量并发,线程在等待 I/O 时不被占用
响应式流(Mono / Flux) —— 数据以流的形式异步传播,支持背压
两种编程模型 —— 注解模型(简单熟悉)和函数式端点(灵活可控)
从入门的响应式基础到高级的 R2DBC、SSE 流式推送、性能调优和 BlockHound 检测,WebFlux 用一套完整的响应式生态覆盖了现代高并发 Web 应用的所有场景。希望这篇指南能帮助你从"会用"走向"精通"。