AI Services:流式返回

流式返回指大模型不是等完整回答生成完毕后一次性返回所有结果,而是将生成的内容拆分成一个个小片段(通常是单词、句子或段落),生成一段就返回一段,持续地向调用方推送数据,直到回答全部生成完成。

你可以把它类比成:

  • 「非流式返回」:点一份外卖,商家做好所有菜品后打包,一次性送到你手上。

  • 「流式返回」:吃自助小火锅,食材一边煮,你一边夹着吃,不用等所有食材都煮熟再动筷。

最典型的场景就是 ChatGPT 网页端的回答 —— 你会看到文字逐行滚动出现,这就是流式返回的直观体现,LangChain4j 对这种交互模式提供了完善的支持。

TokenStream

在 LangChain4j 中,如果我们创建的 AI 服务使用 TokenStream 作为返回类型时,AI 服务则可以逐个 token 地流式返回响应。

TokenStream 表示来自模型的 Token 流,您可以订阅该 Token 流,并在有新的部分响应(通常是单个 Token)可用、模型完成流传输或流传输过程中发生错误时接收更新。

TokenStream 的基础方法如下:

  • start():完成当前的 TokenStream 构建,开始处理。

  • ignoreErrors():流传输过程中出现的所有错误都将被忽略(但会以WARN日志级别记录)。

  • onError(Consumer<Throwable> errorHandler):当流传输过程中发生错误时,errorHandler 将被调用。

  • onCompleteResponse(Consumer<ChatResponse> completeResponseHandler):当大语言模型完成最终聊天响应时,completeResponseHandler 将被调用。

中间完整响应监听方法(是整段完整响应的粗粒度监听,仅在复杂流程的中间节点触发):

  • onIntermediateResponse(Consumer<ChatResponse> intermediateResponseHandler):当大语言模型完成流式传输中间聊天响应(而非最终响应)时,所提供的消费者将被调用。

什么是中间响应?

当 AI 执行复杂流程时(比如工具调用、多轮推理),会在「最终响应生成前」产生「中间结果」,这个中间结果就是「中间响应」。

例如:用户提问 “今天北京天气怎么样?”,AI 流程是「分析问题 → 调用天气查询工具 → 根据工具返回结果生成最终回答」,其中「调用天气查询工具的请求」就是一个中间响应,onIntermediateResponse() 就会监听这个节点的完整结果。

流式文本片段监听方法(聚焦于单个/片段 token 的纯文本流式输出,是逐字/逐词的细粒度监听):

  • onPartialResponse(Consumer<String> partialResponseHandler):每当语言模型生成新的部分文本响应(通常是单个token)时,所提供的消费者就会被调用。

  • onPartialResponseWithContext(BiConsumer<PartialResponse, PartialResponseContext> handler):每当大语言模型有新的部分文本响应可用时,所提供的 handler 就会被调用。注意,该方法提供上下文信息。

思考/推理相关方法:

  • onPartialThinking(Consumer<PartialThinking> partialThinkingHandler):每当大语言模型有新的部分思考/推理文本可用时,所提供的 partialThinkingHandler 就会被调用。

  • onPartialThinkingWithContext(BiConsumer<PartialThinking, PartialThinkingContext> handler):每当语言模型有新的部分思考/推理文本可用时,所提供的 handler 就会被调用。注意,该方法提供上下文信息。

检索增强 RAG 相关方法:

  • onRetrieved(Consumer<List<Content>> contentHandler):如果使用 RetrievalAugmentor 检索到任何内容(Contents),则会调用所提供的 contentHandler。

工具调用相关方法:

  • beforeToolExecution(Consumer<BeforeToolExecution> beforeToolExecutionHandler):beforeToolExecutionHandler 将在工具执行前被调用。

  • onPartialToolCall(Consumer<PartialToolCall> partialToolCallHandler):每当大语言模型有新的部分工具调用可用时,所提供的 partialToolCallHandler 会被调用。

  • onPartialToolCallWithContext(BiConsumer<PartialToolCall, PartialToolCallContext> handler):每当大语言模型有新的部分工具调用可用时,所提供的 handler 会被调用。注意,该方法提供上下文信息。

  • onToolExecuted(Consumer<ToolExecution> toolExecuteHandler):在工具执行后立即调用 toolExecuteHandler。

简单示例:

package com.hxstrive.langchain4j.aiServices;

import dev.langchain4j.model.chat.StreamingChatModel;
import dev.langchain4j.model.chat.response.ChatResponse;
import dev.langchain4j.model.chat.response.PartialThinking;
import dev.langchain4j.model.openai.OpenAiStreamingChatModel;
import dev.langchain4j.rag.content.Content;
import dev.langchain4j.service.AiServices;
import dev.langchain4j.service.TokenStream;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class TokenStreamDemo {
    // 推荐:将OPEN_API_KEY设置成环境变量, 避免硬编码或随着代码泄露
    // 注意,设置完环境变量记得重启IDEA,不然可能环境变量不会生效
    private static final String API_KEY = System.getenv("OPEN_API_KEY");

    // 定义业务接口
    interface Assistant {
        TokenStream chat(String userMessage);
    }

    public static void main(String[] args) {
        // 创建 ChatModel 实现类(OpenAI 为例)
        StreamingChatModel chatModel = OpenAiStreamingChatModel.builder()
                .baseUrl("https://api.xty.app/v1")
                .apiKey(API_KEY)
                .modelName("gpt-4o-mini")
                .temperature(0.7)
                .returnThinking(true) // 开启模型思考
                .logRequests(true)
                .logResponses(false) // 禁止输出响应日志
                .build();

        // 使用 AiServices 创建服务
        Assistant assistant = AiServices.builder(Assistant.class)
                .streamingChatModel(chatModel)
                .build();

        // 发起对话
        CompletableFuture<ChatResponse> futureResponse = new CompletableFuture<>();
        TokenStream tokenStream = assistant.chat("你是谁?");

        // AI 模型并非一次性返回完整回答,而是将回答拆分为多个Token逐段推送,
        // 每推送一段部分文本内容时,被触发一次
        tokenStream.onPartialResponse((String partialResponse) -> {
                    System.out.println("onPartialResponse() " + partialResponse.replaceAll("\n", "\\\\n"));
                })
                // 模型产生了非最终响应的 “思考性中间数据”(例如部分推理逻辑),会触发该回调
                .onPartialThinking((PartialThinking partialThinking) -> {
                    System.out.println("onPartialThinking() " + partialThinking.text());
                })
                // 当AI模型从外部数据源(如知识库)成功获取到与当前对话相关的参考内容时,触发该方法
                .onRetrieved((List<Content> contents) -> {
                    System.out.println("onRetrieved() " + contents);
                })
                // 每一轮流式返回 “阶段性聊天数据” 时触发
                .onIntermediateResponse((ChatResponse intermediateResponse) -> {
                    System.out.println("onIntermediateResponse() " + intermediateResponse.aiMessage().text());
                })
                // 当模型完成所有内容生成,流式传输彻底结束时,该方法才会被调用
                .onCompleteResponse((ChatResponse response) -> {
                    System.out.println("onCompleteResponse() 响应完成");
                    futureResponse.complete(response);
                })
                // 当出现异常时调用
                .onError((Throwable error) -> {
                    System.out.println("onError()");
                    futureResponse.completeExceptionally(error);
                })
                .start();

        // 等待模型返回结果
        ChatResponse chatResponse = futureResponse.join();
        System.out.println("\n\n" + chatResponse.aiMessage().text());
    }
}

运行示例,输出日志如下:

onRetrieved() []
15:27:57.012 [main] INFO dev.langchain4j.http.client.log.LoggingHttpClient -- HTTP request:
- method: POST
- url: https://api.xty.app/v1/chat/completions
- headers: [Authorization: Beare...00], [User-Agent: langchain4j-openai], [Content-Type: application/json]
- body: {
  "model" : "gpt-4o-mini",
  "messages" : [ {
    "role" : "user",
    "content" : "你是谁?"
  } ],
  "temperature" : 0.7,
  "stream" : true,
  "stream_options" : {
    "include_usage" : true
  }
}

onPartialResponse() 我是 **Chat
onPartialResponse() GPT,基于 GPT
onPartialResponse() -5 mini 架
onPartialResponse() 构** 的智能聊天助手。🤖  \n\n我可以做很多事情,比如:  \n\n
onPartialResponse() - 回答问题和解释复杂概念  \n
onPartialResponse() - 帮你学习、写作、编程或做研究  \n- 提
onPartialResponse() 供创意
onPartialResponse() 点子或规划建议  \n- 分析数据、解决数学题、
onPartialResponse() 检查逻辑  \n- 甚至能和你聊日常、
onPartialResponse() 趣味或者哲学  \n\n简单来说,我就是一个能理解你问题、给
onPartialResponse() 出详细回答、偶尔还带点
onPartialResponse() 幽默的 AI 助手。  \n\n我好
onPartialResponse() 奇,你问这个是出于兴趣,还是想确认我能做什么?
onCompleteResponse() 响应完成


我是 **ChatGPT,基于 GPT-5 mini 架构** 的智能聊天助手。🤖  

我可以做很多事情,比如:  

- 回答问题和解释复杂概念  
- 帮你学习、写作、编程或做研究  
- 提供创意点子或规划建议  
- 分析数据、解决数学题、检查逻辑  
- 甚至能和你聊日常、趣味或者哲学  

简单来说,我就是一个能理解你问题、给出详细回答、偶尔还带点幽默的 AI 助手。  

我好奇,你问这个是出于兴趣,还是想确认我能做什么?

Process finished with exit code 0

流式传输取消

如果您希望取消流式传输,可以通过以下回调之一进行操作:

  • onPartialResponseWithContext(BiConsumer<PartialResponse, PartialResponseContext>)

  • onPartialThinkingWithContext(BiConsumer<PartialThinking, PartialThinkingContext>)

通过  PartialThinkingContext 的 streamingHandle() 方法获取 StreamingHandle 对象,然后调用 cancel() 方法取消流。

简单示例:

package com.hxstrive.langchain4j.aiServices;

import dev.langchain4j.model.chat.StreamingChatModel;
import dev.langchain4j.model.chat.response.ChatResponse;
import dev.langchain4j.model.chat.response.PartialResponse;
import dev.langchain4j.model.chat.response.PartialResponseContext;
import dev.langchain4j.model.openai.OpenAiStreamingChatModel;
import dev.langchain4j.service.AiServices;
import dev.langchain4j.service.TokenStream;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;

public class TokenStreamCancelDemo {
    // 推荐:将OPEN_API_KEY设置成环境变量, 避免硬编码或随着代码泄露
    // 注意,设置完环境变量记得重启IDEA,不然可能环境变量不会生效
    private static final String API_KEY = System.getenv("OPEN_API_KEY");

    // 定义业务接口
    interface Assistant {
        TokenStream chat(String userMessage);
    }

    public static void main(String[] args) {
        // 创建 ChatModel 实现类(OpenAI 为例)
        StreamingChatModel chatModel = OpenAiStreamingChatModel.builder()
                .baseUrl("https://api.xty.app/v1")
                .apiKey(API_KEY)
                .modelName("gpt-4o-mini")
                .temperature(0.7)
                .returnThinking(true) // 开启模型思考
                .logRequests(true)
                .logResponses(false) // 禁止输出响应日志
                .build();

        // 使用 AiServices 创建服务
        Assistant assistant = AiServices.builder(Assistant.class)
                .streamingChatModel(chatModel)
                .build();

        // 发起对话
        CompletableFuture<ChatResponse> futureResponse = new CompletableFuture<>();
        TokenStream tokenStream = assistant.chat("你是谁?");

        final AtomicLong count = new AtomicLong(0);
        // AI 模型并非一次性返回完整回答,而是将回答拆分为多个Token逐段推送,
        // 每推送一段部分文本内容时,被触发一次
        tokenStream.onPartialResponseWithContext((PartialResponse partialResponse,
                                                  PartialResponseContext partialResponseContext) -> {
                    String partialText = partialResponse.text();
                    System.out.println("onPartialResponseWithContext() " + partialText.replaceAll("\n", "\\\\n"));
                    // 如果已经接收到超过50个字符,则取消流
                    if (count.addAndGet(partialText.length()) > 50) {
                        System.out.println("当前已接收 " + count.get() + " 个字符,取消流...");
                        partialResponseContext.streamingHandle().cancel();

                        // 终止阻塞
                        futureResponse.completeExceptionally(new RuntimeException("流已经被取消"));
                    }
                })
                // 当模型完成所有内容生成,流式传输彻底结束时,该方法才会被调用
                .onCompleteResponse((ChatResponse response) -> {
                    System.out.println("onCompleteResponse() 响应完成");
                    futureResponse.complete(response);
                })
                // 当出现异常时调用
                .onError((Throwable error) -> {
                    System.out.println("onError()");
                    futureResponse.completeExceptionally(error);
                })
                .start();

        // 等待模型返回结果
        ChatResponse chatResponse = futureResponse.join();
        System.out.println("\n\n" + chatResponse.aiMessage().text());
    }
}

运行示例,输出日志:

15:38:05.006 [main] INFO dev.langchain4j.http.client.log.LoggingHttpClient -- HTTP request:
- method: POST
- url: https://api.xty.app/v1/chat/completions
- headers: [Authorization: Beare...00], [User-Agent: langchain4j-openai], [Content-Type: application/json]
- body: {
  "model" : "gpt-4o-mini",
  "messages" : [ {
    "role" : "user",
    "content" : "你是谁?"
  } ],
  "temperature" : 0.7,
  "stream" : true,
  "stream_options" : {
    "include_usage" : true
  }
}

onPartialResponseWithContext() 我是 **Chat
onPartialResponseWithContext() GPT,基于 GPT
onPartialResponseWithContext() -5 mini 模型** 的聊天助手。😄
onPartialResponseWithContext()   \n我可以回答问题、帮你学习、分析复杂信息、写作、编程,甚至跟
当前已接收 72 个字符,取消流...
Exception in thread "main" java.util.concurrent.CompletionException: java.lang.RuntimeException: 流已经被取消
	at java.base/java.util.concurrent.CompletableFuture.reportJoin(CompletableFuture.java:413)
	at java.base/java.util.concurrent.CompletableFuture.join(CompletableFuture.java:2118)
	at com.hxstrive.langchain4j.aiServices.TokenStreamCancelDemo.main(TokenStreamCancelDemo.java:73)
Caused by: java.lang.RuntimeException: 流已经被取消
	at com.hxstrive.langchain4j.aiServices.TokenStreamCancelDemo.lambda$main$0(TokenStreamCancelDemo.java:57)
...

注意:当调用 StreamingHandle.cancel() 时,LangChain4j 会关闭连接并停止流传输。一旦调用了  StreamingHandle.cancel(),TokenStream 将不会再收到任何回调。

Flux

你也可以使用 Flux<String> 来替代 TokenStream。为此,请导入 langchain4j-reactor 模块:

<dependency>
    <groupId>dev.langchain4j</groupId>
    <artifactId>langchain4j-reactor</artifactId>
    <version>1.10.0-beta18</version>
</dependency>

简单示例:

package com.hxstrive.langchain4j.aiServices;

import dev.langchain4j.data.message.AiMessage;
import dev.langchain4j.model.chat.StreamingChatModel;
import dev.langchain4j.model.chat.response.ChatResponse;
import dev.langchain4j.model.openai.OpenAiStreamingChatModel;
import dev.langchain4j.service.AiServices;
import reactor.core.publisher.Flux;
import java.util.concurrent.CompletableFuture;

public class TokenStreamFluxDemo {
    // 推荐:将OPEN_API_KEY设置成环境变量, 避免硬编码或随着代码泄露
    // 注意,设置完环境变量记得重启IDEA,不然可能环境变量不会生效
    private static final String API_KEY = System.getenv("OPEN_API_KEY");

    // 定义业务接口
    interface Assistant {
        Flux<String> chat(String userMessage);
    }

    public static void main(String[] args) {
        // 创建 ChatModel 实现类(OpenAI 为例)
        StreamingChatModel chatModel = OpenAiStreamingChatModel.builder()
                .baseUrl("https://api.xty.app/v1")
                .apiKey(API_KEY)
                .modelName("gpt-4o-mini")
                .temperature(0.7)
                .returnThinking(true) // 开启模型思考
                .logRequests(true)
                .logResponses(false) // 禁止输出响应日志
                .build();

        // 使用 AiServices 创建服务
        Assistant assistant = AiServices.builder(Assistant.class)
                .streamingChatModel(chatModel)
                .build();

        // 发起对话
        final StringBuilder builder = new StringBuilder();
        final CompletableFuture<ChatResponse> futureResponse = new CompletableFuture<>();
        Flux<String> flux = assistant.chat("你是谁?");
        flux.subscribe(
                // 成功接收单个文本片段的回调(处理每一个流式返回的 token/片段)
                textFragment -> {
                    builder.append(textFragment);
                    System.out.println("textFragment: " + textFragment);
                },
                // 异常回调
                error -> {
                    System.err.println("\n对话发生异常:" + error.getMessage());
                    futureResponse.completeExceptionally(error); // 异常传递给 CompletableFuture
                },
                // 流结束回调
                () -> {
                    System.out.println("\n\n对话流结束");
                    futureResponse.complete(ChatResponse.builder()
                            .aiMessage(AiMessage.aiMessage(builder.toString())).build());
                }
        );

        // 等待模型返回结果
        ChatResponse chatResponse = futureResponse.join();
        System.out.println("\n\n" + chatResponse.aiMessage().text());
    }
}

运行示例,输出日志:

15:51:43.953 [main] INFO dev.langchain4j.http.client.log.LoggingHttpClient -- HTTP request:
- method: POST
- url: https://api.xty.app/v1/chat/completions
- headers: [Authorization: Beare...00], [User-Agent: langchain4j-openai], [Content-Type: application/json]
- body: {
  "model" : "gpt-4o-mini",
  "messages" : [ {
    "role" : "user",
    "content" : "你是谁?"
  } ],
  "temperature" : 0.7,
  "stream" : true,
  "stream_options" : {
    "include_usage" : true
  }
}

textFragment: 我是
textFragment: 一个
textFragment: 人工
textFragment: 智能
textFragment: 助手
textFragment: ,
textFragment: 旨
textFragment: 在
textFragment: 提供
textFragment: 信息
textFragment: 和
textFragment: 帮助
textFragment: 解
textFragment: 答
textFragment: 问题
textFragment: 。
textFragment: 有什么
textFragment: 我
textFragment: 可以
textFragment: 帮助
textFragment: 你
textFragment: 的吗
textFragment: ?


对话流结束


我是一个人工智能助手,旨在提供信息和帮助解答问题。有什么我可以帮助你的吗?

更多 LangChain4j 知识请阅读后续教程……

  

说说我的看法
全部评论(
没有评论
关于
本网站专注于 Java、数据库(MySQL、Oracle)、Linux、软件架构及大数据等多领域技术知识分享。涵盖丰富的原创与精选技术文章,助力技术传播与交流。无论是技术新手渴望入门,还是资深开发者寻求进阶,这里都能为您提供深度见解与实用经验,让复杂编码变得轻松易懂,携手共赴技术提升新高度。如有侵权,请来信告知:hxstrive@outlook.com
其他应用
公众号