Spring Boot 中实现流式输出的方案总结

在现代 Web 应用程序中,实时数据流和长时间运行的操作变得越来越普遍。

Spring Boot 提供了多种方法来实现流式输出,使得服务器可以逐步向客户端发送数据,而不是等待整个响应准备就绪。

本文将介绍 5 种在 Spring Boot 中实现流式输出的方法,从传统的 Server-Sent Events (SSE) 到更现代的响应式编程方法。

流式输出方案对比

五种流式输出方法的主要优缺点:

方法 优点 缺点
SseEmitter ・支持 SSE,适合实时数据流・客户端易处理事件流・长连接,减少网络开销 ・仅支持文本数据・部分旧浏览器可能不支持 SSE
ResponseBodyEmitter ・支持二进制数据流・灵活性高,可发送不同类型数据 ・客户端需自行处理数据流・可能需额外客户端逻辑解析数据
DeferredResult ・支持异步处理・可一次性返回所有数据 ・不支持实时流式传输・可能占用较多服务器内存
Flux ・支持响应式编程・非阻塞式,效率高・支持背压・易于处理错误和异常 ・需要响应式编程知识・可能增加系统复杂性
AsyncContext ・直接操作底层 Servlet API,控制力强・可处理大量并发请求 ・代码复杂度较高・错误处理相对困难・不如高级抽象易用

方案实现方式

1. 使用 SseEmitter 实现 Server-Sent Events

Server-Sent Events (SSE) 是一种允许服务器向客户端推送数据的技术。Spring 框架提供了 SseEmitter 类来简化 SSE 的实现。

@PostMapping(value = "/chatStream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter chatStream(@RequestBody @Validated GetAiAuditRequest dto, HttpServletResponse response) {
    SseEmitter emitter = new SseEmitter(-1L); // 设置无超时
    response.setContentType("text/event-stream");
    response.setCharacterEncoding("UTF-8");

    CompletableFuture.runAsync(() -> {
        try {
            TestClass.invoke(dto.getPrompt(), chunk -> {
                try {
                    emitter.send(chunk);
                } catch (Exception e) {
                    log.error("Error sending SSE event", e);
                }
            });
        } catch (Exception e) {
            emitter.completeWithError(e);
        } finally {
            emitter.complete();
        }
    }, asyncExecutor);

    return emitter;
}

这种方法使用 SseEmitter 来发送事件流,适合需要实时更新的场景。

2. 使用 ResponseBodyEmitter 模拟 SSE

有时候,我们可能需要使用不同的内容类型,但仍然希望保持类似 SSE 的行为。这时可以使用 ResponseBodyEmitter

@PostMapping(value = "/chatStream2", produces = MediaType.APPLICATION_OCTET_STREAM_VALUE)
public ResponseBodyEmitter chatStream2(@RequestBody @Validated GetAiAuditRequest dto, HttpServletResponse response) {
    ResponseBodyEmitter emitter = new ResponseBodyEmitter(-1L);
    response.setContentType(MediaType.APPLICATION_OCTET_STREAM_VALUE);
    response.setCharacterEncoding("UTF-8");

    CompletableFuture.runAsync(() -> {
        try {
            TestClass.invoke(dto.getPrompt(), chunk -> {
                try {
                    String sseFormattedData = "data:" + chunk + "\n\n";
                    emitter.send(sseFormattedData.getBytes(StandardCharsets.UTF_8), MediaType.APPLICATION_OCTET_STREAM);
                } catch (Exception e) {
                    log.error("Error sending chunk", e);
                }
            });
        } catch (Exception e) {
            emitter.completeWithError(e);
        } finally {
            emitter.complete();
        }
    }, asyncExecutor);

    return emitter;
}

这种方法允许我们使用自定义的内容类型,同时保持类似 SSE 的数据格式。

3. 使用 Reactive Streams 和 ServerSentEvent

Spring WebFlux 提供了响应式编程的支持,我们可以使用 FluxServerSentEvent 来实现流式输出。

@PostMapping(value = "/chatStream3", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> chatStream3(@RequestBody @Validated GetAiAuditRequest dto) {
    return Flux.<ServerSentEvent<String>>create(sink -> {
                TestClass.invoke(dto.getPrompt(), chunk -> {
                    sink.next(ServerSentEvent.<String>builder()
                            .data(chunk)
                            .build());
                });
                sink.complete();
            }).subscribeOn(Schedulers.fromExecutor(asyncExecutor))
            .onErrorResume(e -> {
                log.error("Error in chatStream3", e);
                return Flux.just(ServerSentEvent.<String>builder()
                        .data("Error: " + e.getMessage())
                        .build());
            });
}

这种方法利用了响应式编程的特性,提供了更好的背压处理和资源管理。

4. 使用 Flux 直接输出数据

有时我们可能不需要完整的 SSE 格式,只需要直接输出数据流。这时可以使用 Flux 直接输出数据。

@PostMapping(value = "/chatStream4", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Object> chatStream4(@RequestBody @Validated GetAiAuditRequest dto) {
    return Flux.create(sink -> {
                TestClass.invoke(dto.getPrompt(), chunk -> {
                    sink.next(chunk);
                    sink.next("\n");  // 添加换行符以确保立即刷新
                });
                sink.complete();
            }).subscribeOn(Schedulers.fromExecutor(asyncExecutor))
            .onErrorResume(e -> {
                log.error("Error in chatStream4", e);
                return Flux.just("Error: " + e.getMessage(), "\n");
            });
}

这种方法更加灵活,允许我们自定义输出格式。

5. 使用 AsyncContext 和 ServletOutputStream

最后,我们可以使用传统的 Servlet API 来实现流式输出,这种方法提供了最大的灵活性。

@PostMapping(value = "/chatStream5", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public void chatStream5(@RequestBody @Validated GetAiAuditRequest dto, HttpServletRequest request, HttpServletResponse response) throws IOException {
    AsyncContext asyncContext = request.startAsync();
    asyncContext.setTimeout(0); // 无超时

    response.setContentType("text/event-stream");
    response.setCharacterEncoding("UTF-8");

    asyncExecutor.execute(() -> {
        try {
            PrintWriter writer = response.getWriter();
            TestClass.invoke(dto.getPrompt(), chunk -> {
                try {
                    writer.write("data:" + chunk + "\n\n");
                    writer.flush();
                } catch (Exception e) {
                    log.error("Error writing chunk", e);
                }
            });
        } catch (Exception e) {
            log.error("Error in chat stream", e);
        } finally {
            asyncContext.complete();
        }
    });
}

这种方法使用 AsyncContextPrintWriter 直接写入响应,提供了最大的控制权。

效果测试

curl 测试

curl -X POST \
     -H "Accept: text/event-stream" \
     -H "Cache-Control: no-cache" \
     -H "Connection: keep-alive" \
     -H "Content-Type: application/json" \
     -d '{"prompt":"What is public in Java?"}' \
     --no-buffer \
     http://127.0.0.1/view/data/chatStream | \
sed 's/^data://; s/#¥#/\n/g' | tr -d '\n'



curl -X POST \
     -H "Accept: application/octet-stream" \
     -H "Content-Type: application/json" \
     -d '{"prompt":"What is public in Java?"}' \
     --no-buffer \
     http://127.0.0.1/view/data/chatStream2 | \
sed 's/^data://; s/#¥#/\n/g' | tr -d '\n'

Java 测试:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class SseClientControllerTest {

    private static final String URL_STRING = "http://127.0.0.1:18083/view/data/chatStream2";
    
    public static void main(String[] args) {
        try {
            long startTime = System.currentTimeMillis();
            System.out.println((System.currentTimeMillis()-startTime)+"----"+"步骤1: 准备连接SSE端点");

            String jsonBody = "{\"prompt\":\"What is public in Java?\"}";
            byte[] postData = jsonBody.getBytes(StandardCharsets.UTF_8);

            URL url = new URL(URL_STRING);
            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
            conn.setRequestMethod("POST");  // 如果服务器支持 POST 方式的 SSE
            if(URL_STRING.endsWith("chatStream2")){
                conn.setRequestProperty("Accept", "application/octet-stream");
            } else {
                conn.setRequestProperty("Accept", "text/event-stream");
                conn.setRequestProperty("Cache-Control", "no-cache");
                conn.setRequestProperty("Connection", "keep-alive");
            }
            conn.setReadTimeout(0);  // 无限读取超时
            conn.setRequestMethod("POST");
            conn.setRequestProperty("Content-Type", "application/json");
            conn.setRequestProperty("Content-Length", String.valueOf(postData.length));
            conn.setDoOutput(true);

            System.out.println((System.currentTimeMillis()-startTime)+"----"+"步骤2: 发送POST请求到SSE端点");

            try (OutputStream os = conn.getOutputStream()) {
                os.write(postData);
            }

            System.out.println((System.currentTimeMillis()-startTime)+"----"+"步骤3: 收到响应,状态码: " + conn.getResponseCode());

            if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
                throw new RuntimeException("HTTP error! status: " + conn.getResponseCode());
            }

            System.out.println((System.currentTimeMillis()-startTime)+"----"+"步骤4: 开始读取SSE流");
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    if (!line.isEmpty()) {
                        System.out.print(line.replace("data:","").replace("#¥#","\n"));
                    }
                }
            }

            System.out.println((System.currentTimeMillis()-startTime)+"----"+"步骤6: SSE流结束");

        } catch (Exception e) {
            System.err.println("发生错误: " + e.getMessage());
            e.printStackTrace();
        }
    }
}

总结

这五种方法展示了 Spring Boot 中实现流式输出的不同方式,从传统的 SSE 到现代的响应式编程。选择哪种方法取决于你的具体需求,如性能要求、兼容性考虑、以及与现有代码的集成等因素。无论选择哪种方法,流式输出都能显著提升实时数据处理和长时间运行任务的用户体验。

在实际应用中,记得要考虑错误处理、资源管理和性能优化。同时,客户端的实现也需要相应地调整以正确处理这些流式数据。

希望这篇文章能帮助你在 Spring Boot 项目中实现高效的流式输出!