Spring Boot 中实现流式输出的方案总结
在现代 Web 应用程序中,实时数据流和长时间运行的操作变得越来越普遍。
Spring Boot 提供了多种方法来实现流式输出,使得服务器可以逐步向客户端发送数据,而不是等待整个响应准备就绪。
本文将介绍 5 种在 Spring Boot 中实现流式输出的方法,从传统的 Server-Sent Events (SSE) 到更现代的响应式编程方法。
流式输出方案对比
五种流式输出方法的主要优缺点:
| 方法 | 优点 | 缺点 |
|---|---|---|
| SseEmitter | ・支持 SSE,适合实时数据流・客户端易处理事件流・长连接,减少网络开销 | ・仅支持文本数据・部分旧浏览器可能不支持 SSE |
| ResponseBodyEmitter | ・支持二进制数据流・灵活性高,可发送不同类型数据 | ・客户端需自行处理数据流・可能需额外客户端逻辑解析数据 |
| DeferredResult | ・支持异步处理・可一次性返回所有数据 | ・不支持实时流式传输・可能占用较多服务器内存 |
| Flux | ・支持响应式编程・非阻塞式,效率高・支持背压・易于处理错误和异常 | ・需要响应式编程知识・可能增加系统复杂性 |
| AsyncContext | ・直接操作底层 Servlet API,控制力强・可处理大量并发请求 | ・代码复杂度较高・错误处理相对困难・不如高级抽象易用 |
方案实现方式
1. 使用 SseEmitter 实现 Server-Sent Events
Server-Sent Events (SSE) 是一种允许服务器向客户端推送数据的技术。Spring 框架提供了 SseEmitter 类来简化 SSE 的实现。
@PostMapping(value = "/chatStream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter chatStream(@RequestBody @Validated GetAiAuditRequest dto, HttpServletResponse response) {
SseEmitter emitter = new SseEmitter(-1L); // 设置无超时
response.setContentType("text/event-stream");
response.setCharacterEncoding("UTF-8");
CompletableFuture.runAsync(() -> {
try {
TestClass.invoke(dto.getPrompt(), chunk -> {
try {
emitter.send(chunk);
} catch (Exception e) {
log.error("Error sending SSE event", e);
}
});
} catch (Exception e) {
emitter.completeWithError(e);
} finally {
emitter.complete();
}
}, asyncExecutor);
return emitter;
}
这种方法使用 SseEmitter 来发送事件流,适合需要实时更新的场景。
2. 使用 ResponseBodyEmitter 模拟 SSE
有时候,我们可能需要使用不同的内容类型,但仍然希望保持类似 SSE 的行为。这时可以使用 ResponseBodyEmitter。
@PostMapping(value = "/chatStream2", produces = MediaType.APPLICATION_OCTET_STREAM_VALUE)
public ResponseBodyEmitter chatStream2(@RequestBody @Validated GetAiAuditRequest dto, HttpServletResponse response) {
ResponseBodyEmitter emitter = new ResponseBodyEmitter(-1L);
response.setContentType(MediaType.APPLICATION_OCTET_STREAM_VALUE);
response.setCharacterEncoding("UTF-8");
CompletableFuture.runAsync(() -> {
try {
TestClass.invoke(dto.getPrompt(), chunk -> {
try {
String sseFormattedData = "data:" + chunk + "\n\n";
emitter.send(sseFormattedData.getBytes(StandardCharsets.UTF_8), MediaType.APPLICATION_OCTET_STREAM);
} catch (Exception e) {
log.error("Error sending chunk", e);
}
});
} catch (Exception e) {
emitter.completeWithError(e);
} finally {
emitter.complete();
}
}, asyncExecutor);
return emitter;
}
这种方法允许我们使用自定义的内容类型,同时保持类似 SSE 的数据格式。
3. 使用 Reactive Streams 和 ServerSentEvent
Spring WebFlux 提供了响应式编程的支持,我们可以使用 Flux 和 ServerSentEvent 来实现流式输出。
@PostMapping(value = "/chatStream3", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> chatStream3(@RequestBody @Validated GetAiAuditRequest dto) {
return Flux.<ServerSentEvent<String>>create(sink -> {
TestClass.invoke(dto.getPrompt(), chunk -> {
sink.next(ServerSentEvent.<String>builder()
.data(chunk)
.build());
});
sink.complete();
}).subscribeOn(Schedulers.fromExecutor(asyncExecutor))
.onErrorResume(e -> {
log.error("Error in chatStream3", e);
return Flux.just(ServerSentEvent.<String>builder()
.data("Error: " + e.getMessage())
.build());
});
}
这种方法利用了响应式编程的特性,提供了更好的背压处理和资源管理。
4. 使用 Flux 直接输出数据
有时我们可能不需要完整的 SSE 格式,只需要直接输出数据流。这时可以使用 Flux 直接输出数据。
@PostMapping(value = "/chatStream4", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Object> chatStream4(@RequestBody @Validated GetAiAuditRequest dto) {
return Flux.create(sink -> {
TestClass.invoke(dto.getPrompt(), chunk -> {
sink.next(chunk);
sink.next("\n"); // 添加换行符以确保立即刷新
});
sink.complete();
}).subscribeOn(Schedulers.fromExecutor(asyncExecutor))
.onErrorResume(e -> {
log.error("Error in chatStream4", e);
return Flux.just("Error: " + e.getMessage(), "\n");
});
}
这种方法更加灵活,允许我们自定义输出格式。
5. 使用 AsyncContext 和 ServletOutputStream
最后,我们可以使用传统的 Servlet API 来实现流式输出,这种方法提供了最大的灵活性。
@PostMapping(value = "/chatStream5", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public void chatStream5(@RequestBody @Validated GetAiAuditRequest dto, HttpServletRequest request, HttpServletResponse response) throws IOException {
AsyncContext asyncContext = request.startAsync();
asyncContext.setTimeout(0); // 无超时
response.setContentType("text/event-stream");
response.setCharacterEncoding("UTF-8");
asyncExecutor.execute(() -> {
try {
PrintWriter writer = response.getWriter();
TestClass.invoke(dto.getPrompt(), chunk -> {
try {
writer.write("data:" + chunk + "\n\n");
writer.flush();
} catch (Exception e) {
log.error("Error writing chunk", e);
}
});
} catch (Exception e) {
log.error("Error in chat stream", e);
} finally {
asyncContext.complete();
}
});
}
这种方法使用 AsyncContext 和 PrintWriter 直接写入响应,提供了最大的控制权。
效果测试
curl 测试
curl -X POST \
-H "Accept: text/event-stream" \
-H "Cache-Control: no-cache" \
-H "Connection: keep-alive" \
-H "Content-Type: application/json" \
-d '{"prompt":"What is public in Java?"}' \
--no-buffer \
http://127.0.0.1/view/data/chatStream | \
sed 's/^data://; s/#¥#/\n/g' | tr -d '\n'
curl -X POST \
-H "Accept: application/octet-stream" \
-H "Content-Type: application/json" \
-d '{"prompt":"What is public in Java?"}' \
--no-buffer \
http://127.0.0.1/view/data/chatStream2 | \
sed 's/^data://; s/#¥#/\n/g' | tr -d '\n'
Java 测试:
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
public class SseClientControllerTest {
private static final String URL_STRING = "http://127.0.0.1:18083/view/data/chatStream2";
public static void main(String[] args) {
try {
long startTime = System.currentTimeMillis();
System.out.println((System.currentTimeMillis()-startTime)+"----"+"步骤1: 准备连接SSE端点");
String jsonBody = "{\"prompt\":\"What is public in Java?\"}";
byte[] postData = jsonBody.getBytes(StandardCharsets.UTF_8);
URL url = new URL(URL_STRING);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod("POST"); // 如果服务器支持 POST 方式的 SSE
if(URL_STRING.endsWith("chatStream2")){
conn.setRequestProperty("Accept", "application/octet-stream");
} else {
conn.setRequestProperty("Accept", "text/event-stream");
conn.setRequestProperty("Cache-Control", "no-cache");
conn.setRequestProperty("Connection", "keep-alive");
}
conn.setReadTimeout(0); // 无限读取超时
conn.setRequestMethod("POST");
conn.setRequestProperty("Content-Type", "application/json");
conn.setRequestProperty("Content-Length", String.valueOf(postData.length));
conn.setDoOutput(true);
System.out.println((System.currentTimeMillis()-startTime)+"----"+"步骤2: 发送POST请求到SSE端点");
try (OutputStream os = conn.getOutputStream()) {
os.write(postData);
}
System.out.println((System.currentTimeMillis()-startTime)+"----"+"步骤3: 收到响应,状态码: " + conn.getResponseCode());
if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
throw new RuntimeException("HTTP error! status: " + conn.getResponseCode());
}
System.out.println((System.currentTimeMillis()-startTime)+"----"+"步骤4: 开始读取SSE流");
try (BufferedReader reader = new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
String line;
while ((line = reader.readLine()) != null) {
if (!line.isEmpty()) {
System.out.print(line.replace("data:","").replace("#¥#","\n"));
}
}
}
System.out.println((System.currentTimeMillis()-startTime)+"----"+"步骤6: SSE流结束");
} catch (Exception e) {
System.err.println("发生错误: " + e.getMessage());
e.printStackTrace();
}
}
}
总结
这五种方法展示了 Spring Boot 中实现流式输出的不同方式,从传统的 SSE 到现代的响应式编程。选择哪种方法取决于你的具体需求,如性能要求、兼容性考虑、以及与现有代码的集成等因素。无论选择哪种方法,流式输出都能显著提升实时数据处理和长时间运行任务的用户体验。
在实际应用中,记得要考虑错误处理、资源管理和性能优化。同时,客户端的实现也需要相应地调整以正确处理这些流式数据。
希望这篇文章能帮助你在 Spring Boot 项目中实现高效的流式输出!