LangChain4j 响应流式传输

注意:本页面介绍了使用低级 LLM API 进行响应流传输的相关内容。有关高级 LLM API,请阅读 AI 服务

大型语言模型(LLMs)一次生成一个标记(Token 是文本被模型处理的最小语义单元,大模型按照 Token 收费)的文本,因此许多大型语言模型提供商都提供了一种逐标记流式传输响应的方式,而不是等待整个文本生成完成。这显著改善了用户体验,因为用户无需等待不确定的时间,几乎可以立即开始阅读响应内容。效果如下图:

screenshots.gif

上面效果也称为打字机效果,它在交互和内容呈现中,有提升用户注意力、增强场景氛围感、优化信息传递节奏等好处。

  

StreamingChatResponseHandler 接口

StreamingChatResponseHandler 接口是 AI 流式聊天交互场景下的响应处理器接口,主要用于接收和处理 AI 聊天会话过程中产生的 “流式输出”(即非一次性返回完整结果,而是分批次返回片段数据),是实现 “打字机效果”、“实时思考过程展示”、“动态工具调用” 等交互体验的核心接口。

StreamingChatResponseHandler 接口定义:

/**
 * 流式聊天响应处理器接口
 * 用于处理AI聊天交互过程中的各类流式响应事件(部分响应、思考过程、工具调用、完整响应、异常)
 * 所有带default的方法为可选实现,核心必实现方法为 onCompleteResponse 和 onError
 */
public interface StreamingChatResponseHandler {

    /**
     * 接收文本类型的部分响应
     * @param partialResponse 单次返回的文本片段
     */
    default void onPartialResponse(String partialResponse) {}

    /**
     * 接收结构化的部分响应(带上下文)
     * @param partialResponse 结构化的部分响应对象(包含文本、类型等信息)
     * @param context 部分响应的上下文信息(如会话ID、请求ID等)
     */
    default void onPartialResponse(PartialResponse partialResponse, PartialResponseContext context) {}

    /**
     * 接收AI思考过程的部分输出
     * @param partialThinking 单次返回的思考过程片段
     */
    default void onPartialThinking(PartialThinking partialThinking) {}

    /**
     * 接收结构化的AI思考过程(带上下文)
     * @param partialThinking 结构化的思考过程对象
     * @param context 思考过程的上下文信息
     */
    default void onPartialThinking(PartialThinking partialThinking, PartialThinkingContext context) {}

    /**
     * 接收工具调用的部分输出
     * @param partialToolCall 单次返回的工具调用片段
     */
    default void onPartialToolCall(PartialToolCall partialToolCall) {}

    /**
     * 接收结构化的工具调用输出(带上下文)
     * @param partialToolCall 结构化的工具调用对象(包含工具名称、参数等)
     * @param context 工具调用的上下文信息
     */
    default void onPartialToolCall(PartialToolCall partialToolCall, PartialToolCallContext context) {}

    /**
     * 接收工具调用完成的完整回调
     * @param completeToolCall 完整的工具调用结果对象
     */
    default void onCompleteToolCall(CompleteToolCall completeToolCall) {}

    /**
     * 接收聊天响应完成的最终回调(必实现)
     * 当整个流式响应结束,返回完整的聊天结果时触发
     * @param completeResponse 完整的聊天响应对象(包含最终回复、会话信息等)
     */
    void onCompleteResponse(ChatResponse completeResponse);

    /**
     * 接收响应过程中的异常回调(必实现)
     * 当流式响应过程中出现网络错误、AI服务异常等情况时触发
     * @param error 异常对象(包含错误类型、信息、堆栈等)
     */
    void onError(Throwable error);
}

通过实现 StreamingChatResponseHandler 接口,你可以为以下事件自定义操作:

  • 当下一个部分文本响应生成时:会调用 onPartialResponse(String) 或者 onPartialResponse(PartialResponse, PartialResponseContext)(你可以实现这两个方法中的任意一个)。根据大模型提供商的不同,部分响应文本可能包含一个或多个标记(Token)。例如,你可以在标记一可用时就直接将其发送到用户界面。

  • 当下一个部分思考 / 推理文本生成时:会调用 onPartialThinking(PartialThinking) 或 onPartialThinking(PartialThinking, PartialThinkingContext)(你可以实现这两个方法中的任意一个)。根据大语言模型提供商的不同,部分思考文本可能包含一个或多个标记。

  • 当下一个部分工具调用生成时:会调用 onPartialToolCall(PartialToolCall) 或者 onPartialToolCall(PartialToolCall, PartialToolCallContext)(你可以实现这两个方法中的任意一个)。

  • 当大语言模型完成单个工具调用的流式传输时:会调用 onCompleteToolCall(CompleteToolCall)。

  • 当大语言模型完成生成时:会调用 onCompleteResponse(ChatResponse)。ChatResponse 对象包含完整的响应(AiMessage)以及ChatResponseMetadata。

  • 发生错误时:onError(Throwable error) 会被调用。

  

StreamingResponseHandler 接口

StreamingResponseHandler 接口是 LangChain4j 框架中用于处理流式模型响应的回调接口,由模型在流式生成响应的过程中调用,核心作用是接收流式返回的 “标记(token)级片段”、完整响应结果及异常。

接口定义如下:

package dev.langchain4j.model;

import dev.langchain4j.model.output.Response;

/**
 * 用于处理模型流式响应的回调接口
 * @param <T> 完整响应内容的类型
 */
public interface StreamingResponseHandler<T> {

    /**
     * 当模型生成新的标记(token)时调用
     * @param token 模型生成的单个标记(最小粒度文本片段)
     */
    void onNext(String token);

    /**
     * 当流式响应完成时调用
     * @param response 包含完整结果的响应对象
     */
    default void onComplete(Response<T> response) {
    }

    /**
     * 当流式响应过程中发生错误时调用
     * @param error 发生的异常对象
     */
    void onError(Throwable error);
}

注意,当流式模型返回部分标记(Token)时将触发 onNext() 方法,当回复完成时调用 onComplete() 接口,Response 包含了本次调用响应的所有内容。如果模型出现了错误,则触发 onError() 方法。

  

StreamingChatModel 和 StreamingLanguageModel

对于 ChatModel 和 LanguageModel 接口,存在对应的 StreamingChatModel 和 StreamingLanguageModel 接口。这些接口具有相似的 API,但能够流式传输响应。它们接受 StreamingChatResponseHandler 接口 或 StreamingResponseHandler 接口的一个实现作为参数,在适当的时候进行回调,我们在回调方法中进行业务处理。

StreamingChatModel 接口

StreamingChatModel 是AI 流式聊天模型的核心抽象接口,定义了 “调用 AI 聊天模型并以流式方式获取响应” 的核心能力,是上层业务代码与底层 AI 模型(如 OpenAI、通义千问、文心一言等)之间的标准化交互层。

接口定义如下:

package dev.langchain4j.model.chat;

import dev.langchain4j.data.message.ChatMessage;
import dev.langchain4j.data.message.UserMessage;
import dev.langchain4j.model.ModelProvider;
import dev.langchain4j.model.chat.listener.ChatModelListener;
import dev.langchain4j.model.chat.request.ChatRequest;
import dev.langchain4j.model.chat.request.ChatRequestParameters;
import dev.langchain4j.model.chat.request.DefaultChatRequestParameters;
import dev.langchain4j.model.chat.response.ChatResponse;
import dev.langchain4j.model.chat.response.CompleteToolCall;
import dev.langchain4j.model.chat.response.PartialResponse;
import dev.langchain4j.model.chat.response.PartialResponseContext;
import dev.langchain4j.model.chat.response.PartialThinking;
import dev.langchain4j.model.chat.response.PartialThinkingContext;
import dev.langchain4j.model.chat.response.PartialToolCall;
import dev.langchain4j.model.chat.response.PartialToolCallContext;
import dev.langchain4j.model.chat.response.StreamingChatResponseHandler;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public interface StreamingChatModel {
    default void chat(ChatRequest chatRequest, final StreamingChatResponseHandler handler) {
        //...
        this.doChat(finalChatRequest, observingHandler);
    }

    default void doChat(ChatRequest chatRequest, StreamingChatResponseHandler handler) {
        throw new RuntimeException("Not implemented");
    }

    default ChatRequestParameters defaultRequestParameters() {
        return DefaultChatRequestParameters.EMPTY;
    }

    default List<ChatModelListener> listeners() {
        return List.of();
    }

    default ModelProvider provider() {
        return ModelProvider.OTHER;
    }

    default void chat(String userMessage, StreamingChatResponseHandler handler) {
        ChatRequest chatRequest = ChatRequest.builder().messages(new ChatMessage[]{UserMessage.from(userMessage)}).build();
        this.chat(chatRequest, handler);
    }

    default void chat(List<ChatMessage> messages, StreamingChatResponseHandler handler) {
        ChatRequest chatRequest = ChatRequest.builder().messages(messages).build();
        this.chat(chatRequest, handler);
    }

    default Set<Capability> supportedCapabilities() {
        return Set.of();
    }
}

接口定义与 ChatModel 相比,方法签名多了一个 StreamingChatResponseHandler 参数,其他都是一致的。

简单示例:

package com.hxstrive.langchain4j.streamingChatModel;

import dev.langchain4j.model.chat.StreamingChatModel;
import dev.langchain4j.model.chat.response.ChatResponse;
import dev.langchain4j.model.chat.response.StreamingChatResponseHandler;
import dev.langchain4j.model.openai.OpenAiStreamingChatModel;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class StreamingChatModelDemo {
    // 推荐:将OPEN_API_KEY设置成环境变量, 避免硬编码或随着代码泄露
    // 注意,设置完环境变量记得重启IDEA,不然可能环境变量不会生效
    private static final String API_KEY = System.getenv("OPEN_API_KEY");

    public static void main(String[] args) throws InterruptedException {
        // 创建 StreamingChatModel 实现类(OpenAI 为例)
        StreamingChatModel chatModel = OpenAiStreamingChatModel.builder()
                .baseUrl("https://api.xty.app/v1")
                .apiKey(API_KEY)
                .modelName("gpt-3.5-turbo")
                .temperature(0.7)
                .logRequests(true)
                .logResponses(false)
                .build();

        // 调用chat方法
        final CountDownLatch countDownLatch = new CountDownLatch(1); // 创建一个计数器
        chatModel.chat("你是谁?", new StreamingChatResponseHandler() {
            @Override
            public void onPartialResponse(String partialResponse) {
                System.out.println("部分回复内容:" + partialResponse);
            }

            @Override
            public void onCompleteResponse(ChatResponse chatResponse) {
                System.out.println("完整回复内容:" + chatResponse.aiMessage().text());
                countDownLatch.countDown();
            }

            @Override
            public void onError(Throwable throwable) {
                System.err.println("错误信息:" + throwable.getMessage());
                countDownLatch.countDown();
            }
        });

        // 等待回复完成
        if(!countDownLatch.await(2, TimeUnit.MINUTES)) {
            System.err.println("等待超时...");
        }
    }

}

运行示例,输出如下:

[main] INFO dev.langchain4j.http.client.log.LoggingHttpClient -- HTTP request:
- method: POST
- url: https://api.xty.app/v1/chat/completions
- headers: [Authorization: Beare...00], [User-Agent: langchain4j-openai], [Content-Type: application/json]
- body: {
  "model" : "gpt-3.5-turbo",
  "messages" : [ {
    "role" : "user",
    "content" : "你是谁?"
  } ],
  "temperature" : 0.7,
  "stream" : true, // 注意这里,开启了流式传输
  "stream_options" : {
    "include_usage" : true
  }
}

部分回复内容:我是
部分回复内容: **ChatGPT**,
部分回复内容:一个由 **Open
部分回复内容:AI** 训练的人工智能助手。  
我可以用中文或英文
部分回复内容:和你交流,帮助你解答问题、
部分回复内容:学习新知识、写作、翻译、编
部分回复内容:程、头脑风暴,或者只是聊天 😊
部分回复内容:

如果你愿意,也可以告诉我你
部分回复内容:现在最想做什么或遇到了什么问题
部分回复内容:。
完整回复内容:我是 **ChatGPT**,一个由 **OpenAI** 训练的人工智能助手。  
我可以用中文或英文和你交流,帮助你解答问题、学习新知识、写作、翻译、编程、头脑风暴,或者只是聊天 😊

如果你愿意,也可以告诉我你现在最想做什么或遇到了什么问题。

Process finished with exit code 0

一种更简洁的响应流处理方式是使用 LambdaStreamingResponseHandler 类。这个工具类提供了静态方法,可通过 lambda 表达式创建StreamingChatResponseHandler。使用 lambda 来处理响应流的方法非常简单,只需调用 onPartialResponse() 静态方法,并传入一个 lambda 表达式,该表达式定义了如何处理部分响应:

import static dev.langchain4j.model.LambdaStreamingResponseHandler.onPartialResponse;

model.chat("Tell me a joke", onPartialResponse(System.out::print));

onPartialResponseAndError() 方法允许你为 onPartialResponse() 和 onError() 这两个事件定义操作:

import static dev.langchain4j.model.LambdaStreamingResponseHandler.onPartialResponseAndError;

model.chat("Tell me a joke", onPartialResponseAndError(System.out::print, Throwable::printStackTrace));

注意,如果你使用 main() 方式运行,上面两个方法执行后程序将立即退出,需要自己处理同步问题,等待所有回复完成。

庆幸的是,LambdaStreamingResponseHandler 为我们提供了一个  onPartialResponseBlocking() 方法,内部使用 CountDownLatch 实现等待操作,如下:

// 创建 StreamingChatModel 实现类(OpenAI 为例)
StreamingChatModel chatModel = OpenAiStreamingChatModel.builder()
        .baseUrl("https://api.xty.app/v1")
        .apiKey(API_KEY)
        .modelName("gpt-3.5-turbo")
        .temperature(0.7)
        .logRequests(true)
        .logResponses(false)
        .build();

LambdaStreamingResponseHandler.onPartialResponseBlocking(chatModel, "你是谁?", System.out::println);

或者

// 创建 StreamingChatModel 实现类(OpenAI 为例)
StreamingChatModel chatModel = OpenAiStreamingChatModel.builder()
        .baseUrl("https://api.xty.app/v1")
        .apiKey(API_KEY)
        .modelName("gpt-3.5-turbo")
        .temperature(0.7)
        .logRequests(true)
        .logResponses(false)
        .build();

// 采用同步模式处理
LambdaStreamingResponseHandler.onPartialResponseAndErrorBlocking(chatModel, "你是谁?",
        System.out::println, (throwable) -> {
            System.err.println("错误消息:" + throwable.getMessage());
        });

注意,在 onPartialResponseBlocking() 和 onPartialResponseAndErrorBlocking() 方法内部会调用 chat() 方法。

  

StreamingLanguageModel 接口

StreamingLanguageModel 是通用流式语言模型的顶层抽象接口,定义了 “调用任意类型的流式语言模型(文本生成、补全、总结、翻译等)并以流式方式获取输出” 的核心能力。

它是比 StreamingChatModel 更通用的接口 ——StreamingChatModel 专用于 “对话式交互”(有问有答、带上下文),而 StreamingLanguageModel 覆盖所有 “文本流式生成” 场景(如纯文本补全、翻译、摘要、单轮文本生成);

接口定义如下:

public interface StreamingLanguageModel {
    void generate(String var1, StreamingResponseHandler<String> var2);

    default void generate(Prompt prompt, StreamingResponseHandler<String> handler) {
        this.generate(prompt.text(), handler);
    }
}

两个 generate() 方法均接收一个 StreamingResponseHandler 类型参数。

简单示例:

package com.hxstrive.langchain4j.streamingLanguageModel;

import dev.langchain4j.model.StreamingResponseHandler;
import dev.langchain4j.model.language.StreamingLanguageModel;
import dev.langchain4j.model.openai.OpenAiStreamingLanguageModel;
import dev.langchain4j.model.output.Response;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class StreamingLanguageModelDemo {
    // 推荐:将OPEN_API_KEY设置成环境变量, 避免硬编码或随着代码泄露
    // 注意,设置完环境变量记得重启IDEA,不然可能环境变量不会生效
    private static final String API_KEY = System.getenv("OPEN_API_KEY");

    public static void main(String[] args) throws InterruptedException {
        // 创建 StreamingLanguageModel 实现类(OpenAI 为例)
        StreamingLanguageModel languageModel = OpenAiStreamingLanguageModel.builder()
                .baseUrl("https://api.xty.app/v1")
                .apiKey(API_KEY)
                .modelName("text-davinci-003")   // 模型名称,注意不要选择会话模型,选择文本补全模型
                .temperature(0.7)
                .logRequests(true)
                .logResponses(false)
                .build();

        // 调用generate方法
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        languageModel.generate("你是谁?", new StreamingResponseHandler<String>() {
            @Override
            public void onNext(String s) {
                System.out.println("部分回复内容:" + s);
            }

            @Override
            public void onComplete(Response<String> response) {
                System.out.println("完整回复内容:" + response.content());
                countDownLatch.countDown();
            }

            @Override
            public void onError(Throwable throwable) {
                System.err.println("错误信息:" + throwable.getMessage());
                countDownLatch.countDown();
            }
        });

        if(!countDownLatch.await(2, TimeUnit.MINUTES)) {
            System.err.println("等待超时...");
        }
    }

}

运行示例输出结果:

[main] INFO dev.langchain4j.http.client.log.LoggingHttpClient -- HTTP request:
- method: POST
- url: https://api.xty.app/v1/completions
- headers: [Authorization: Beare...00], [User-Agent: langchain4j-openai], [Content-Type: application/json]
- body: {
  "model" : "text-davinci-003",
  "prompt" : "你是谁?",
  "temperature" : 0.7,
  "stream" : true,
  "stream_options" : {
    "include_usage" : true
  }
}

部分回复内容:我是 **
部分回复内容:ChatGPT
部分回复内容:**,一个由
部分回复内容: **OpenAI** 训练的人工智能助手。  
我
部分回复内容:可以用中文或其他语言,帮你
部分回复内容:解答问题、写文章、学习知识、编程
部分回复内容:、翻译、做计划,或者 просто
部分回复内容: 聊聊天 😊  

你
部分回复内容:可以把我当成一个随时在线的助手。  

部分回复内容:你现在最想让我帮
部分回复内容:你做什么?
完整回复内容:我是 **ChatGPT**,一个由 **OpenAI** 训练的人工智能助手。  
我可以用中文或其他语言,帮你解答问题、写文章、学习知识、编程、翻译、做计划,或者 просто 聊聊天 😊  

你可以把我当成一个随时在线的助手。  
你现在最想让我帮你做什么?

  

取消流传输

如果您希望取消流传输,可以通过以下方法之一进行操作:

  • onPartialResponse(PartialResponse, PartialResponseContext)

  • onPartialThinking(PartialThinking, PartialThinkingContext)

  • onPartialToolCall(PartialToolCall, PartialToolCallContext)

上下文对象包含 StreamingHandle,可用于取消流传输:

model.chat(userMessage, new StreamingChatResponseHandler() {

    @Override
    public void onPartialResponse(PartialResponse partialResponse, PartialResponseContext context) {
        process(partialResponse);
        if (shouldCancel()) {
            context.streamingHandle().cancel(); // 取消流
        }
    }

    @Override
    public void onCompleteResponse(ChatResponse completeResponse) {
        System.out.println("onCompleteResponse: " + completeResponse);
    }

    @Override
    public void onError(Throwable error) {
        error.printStackTrace();
    }
});

当调用 StreamingHandle.cancel() 时,LangChain4j 会关闭连接并停止流传输。一旦调用了 StreamingHandle.cancel(),StreamingChatResponseHandler 将不会再收到任何回调。

简单示例:

package com.hxstrive.langchain4j.streamingChatModel;

import dev.langchain4j.model.chat.StreamingChatModel;
import dev.langchain4j.model.chat.response.ChatResponse;
import dev.langchain4j.model.chat.response.PartialResponse;
import dev.langchain4j.model.chat.response.PartialResponseContext;
import dev.langchain4j.model.chat.response.StreamingChatResponseHandler;
import dev.langchain4j.model.openai.OpenAiStreamingChatModel;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class StreamingChatModelDemo3 {
    // 推荐:将OPEN_API_KEY设置成环境变量, 避免硬编码或随着代码泄露
    // 注意,设置完环境变量记得重启IDEA,不然可能环境变量不会生效
    private static final String API_KEY = System.getenv("OPEN_API_KEY");

    public static void main(String[] args) throws InterruptedException {
        // 创建 StreamingChatModel 实现类(OpenAI 为例)
        StreamingChatModel chatModel = OpenAiStreamingChatModel.builder()
                .baseUrl("https://api.xty.app/v1")
                .apiKey(API_KEY)
                .modelName("gpt-3.5-turbo")
                .temperature(0.7)
                .logRequests(true)
                .logResponses(false)
                .build();

        // 临时存放收到的所有内容
        final StringBuilder buffer = new StringBuilder();

        // 调用chat方法
        final CountDownLatch countDownLatch = new CountDownLatch(1); // 创建一个计数器
        chatModel.chat("你是谁?", new StreamingChatResponseHandler() {
            @Override
            public void onPartialResponse(PartialResponse partialResponse, PartialResponseContext context) {
                buffer.append(partialResponse.text());
                System.out.println("部分回复内容:" + partialResponse.text());

                // 如果收到大于20个字符,则取消流
                if(buffer.length() > 20) {
                    context.streamingHandle().cancel(); // 取消流
                    countDownLatch.countDown(); // 退出程序
                }
            }

            @Override
            public void onCompleteResponse(ChatResponse chatResponse) {
                System.out.println("完整回复内容:" + chatResponse.aiMessage().text());
                countDownLatch.countDown();
            }

            @Override
            public void onError(Throwable throwable) {
                System.err.println("错误信息:" + throwable.getMessage());
                countDownLatch.countDown();
            }
        });

        // 等待回复完成
        if(!countDownLatch.await(2, TimeUnit.MINUTES)) {
            System.err.println("等待超时...");
        }
    }

}

运行示例,输出如下:

[main] INFO dev.langchain4j.http.client.log.LoggingHttpClient -- HTTP request:
- method: POST
- url: https://api.xty.app/v1/chat/completions
- headers: [Authorization: Beare...00], [User-Agent: langchain4j-openai], [Content-Type: application/json]
- body: {
  "model" : "gpt-3.5-turbo",
  "messages" : [ {
    "role" : "user",
    "content" : "你是谁?"
  } ],
  "temperature" : 0.7,
  "stream" : true,
  "stream_options" : {
    "include_usage" : true
  }
}

部分回复内容:我是 **
部分回复内容:ChatGPT
部分回复内容:**,一个
部分回复内容:由 **OpenAI** 训练的人工智能

Process finished with exit code 0

👉更多 LangChain4j 知识请继续阅读后续章节……

 

说说我的看法
全部评论(
没有评论
关于
本网站专注于 Java、数据库(MySQL、Oracle)、Linux、软件架构及大数据等多领域技术知识分享。涵盖丰富的原创与精选技术文章,助力技术传播与交流。无论是技术新手渴望入门,还是资深开发者寻求进阶,这里都能为您提供深度见解与实用经验,让复杂编码变得轻松易懂,携手共赴技术提升新高度。如有侵权,请来信告知:hxstrive@outlook.com
其他应用
公众号