Spring AI 教程

Apache Cassandra 数据库(CassandraChatMemoryRepository)

提示:如果不能访问 OpenAI,请点击 AiCode API 注册账号,通过代理访问。  

在 Spring AI 1.0.0 中,CassandraChatMemoryRepository 是用于将聊天记忆(对话历史)持久化到 Apache Cassandra 数据库的实现类,它实现了 ChatMemoryRepository 接口,属于 Spring AI 对话记忆模块的一部分,用于存储和管理与 AI 模型交互的对话上下文。

CassandraChatMemoryRepository 基于 Apache Cassandra 实现消息存储,适用于需要高可用、持久化、可扩展及利用 TTL 特性的聊天记忆持久化场景。

CassandraChatMemoryRepository 采用时间序列 Schema,完整记录历史聊天窗口,对合规审计极具价值。建议设置生存时间(如三年)。

  

Apache Cassandra

Apache Cassandra 是一款分布式、高可用、可线性扩展的 NoSQL 列存储数据库,专为海量结构化数据的分布式存储与高并发读写场景设计,核心特点是牺牲部分一致性以换取极致的可用性与扩展性。

Apache Cassandra 的设计围绕 “大规模分布式数据管理” 展开,核心特性可概括为以下几点:

  • 高可用(High Availability):无单点故障,数据自动在多个节点间复制(默认副本数为 3),单个或多个节点下线不影响服务,支持跨数据中心部署。

  • 线性扩展(Linear Scalability):通过添加节点即可线性提升存储容量与读写性能,集群规模可从几台扩展到数千台服务器,支持 PB 级数据存储。

  • 列存储结构(Column-Oriented):以 “行键 - 列族” 形式组织数据,列可动态添加(无需固定表结构),适合存储半结构化数据,且查询时仅读取所需列,减少无效数据加载。

  • 灵活的一致性模型(Tunable Consistency):支持 “最终一致性” 与 “强一致性” 按需配置,可通过设置 “读取 / 写入副本数” 平衡一致性与性能(例如金融场景用强一致性,日志场景用最终一致性)。

  • 去中心化架构(Decentralized):所有节点地位平等,无主从之分,客户端可连接任意节点发起请求,避免传统主从架构的性能瓶颈。

Apache Cassandra 数据模型与传统关系型数据库差异较大,核心概念需重点理解:

  • 键空间(Keyspace):类似关系型数据库的“数据库”,是表的集合。用于隔离不同业务的数据,配置副本策略、副本数等全局参数。

  • 表(Table):数据存储的基本单元,结构为 “行键 + 列族”。存储某一类业务数据,例如 “用户表”、“订单表”。

  • 行键(Row Key):表的唯一主键(类似关系型数据库的主键)。用于定位数据在集群中的存储节点(通过哈希算法分配),是数据查询的核心依据。

  • 列(Column):最小数据单元,由 “名称(Name)- 值(Value)- 时间戳(Timestamp)” 组成,支持动态添加,同一行可包含不同列,适合存储属性不固定的数据(如用户画像)。

  • 复合主键(Composite Primary Key):由 “行键(Partition Key)+ 聚类列(Clustering Column)” 组成,行键用于数据分区,聚类列用于对分区内的数据排序(如按时间戳排序对话记录)。

更多 Apache Cassandra 数据知识请访问官网 https://cassandra.apache.org/doc/latest/

 

简单示例

安装 Apache Cassandra

下面将演示在 Windows11 下面通过 WSL2 安装的 Ubuntu 22.04.5 LTS (GNU/Linux 6.6.87.2-microsoft-standard-WSL2 x86_64) 系统中,通过 Docker 安装 Apache Cassandra 数据库,详细步骤如下:

(1)查看 Docker 版本信息,如果没有 Docker 请先安装 Docker。如下:

hxstrive@localhost:~$ sudo docker --version
Docker version 28.3.2, build 578ccf6

安装 Docker 请参考 Docker 教程。

(2) 从 Docker Hub 拉取官方的 Cassandra 镜像(默认会拉取最新稳定版),如下:

hxstrive@localhost:~$ sudo docker pull cassandra
Using default tag: latest
latest: Pulling from library/cassandra
af6eca94c810: Pull complete
cb0efb96dabd: Pull complete
3e9d91201f40: Pull complete
66b76b382631: Pull complete
601f2c23751f: Pull complete
f5ecb65214de: Pull complete
9e8ae0f01b75: Pull complete
28e503795475: Pull complete
07ed2fbbbc3b: Pull complete
fd802b4ffa8c: Pull complete
Digest: sha256:8b55dd41d5d1220e11eb8cf80f26ab655c21f7cf271ca4a7577c1da7d9221624
Status: Downloaded newer image for cassandra:latest
docker.io/library/cassandra:latest

成功拉取了 Apache Cassandra 的镜像。

(3)使用以下命令启动一个单机版的 Cassandra 容器,如下:

docker run --name my-cassandra -p 9042:9042 -d /
    -e CASSANDRA_CLUSTER_NAME=my-cluster cassandra

命令说明:

  • --name my-cassandra:给容器命名,方便后续操作。

  • -p 9042:9042:将容器内的 9042 端口(Cassandra CQL 协议端口)映射到宿主机的 9042 端口,允许外部客户端连接。

  • -d:后台运行容器。

  • -e CASSANDRA_CLUSTER_NAME:可选,自定义集群名称(默认集群名为 my-cluster)。

  • cassandra:表示 Docker 镜像名称。

命令运行过程:

# 创建容器
hxstrive@localhost:~$ sudo docker run --name my-cassandra -p 9042:9042 -d -e CASSANDRA_CLUSTER_NAME=my-cluster cassandra
0e326c3e07469cd669c051416975f8c3689794d1ea243d6e72b0985a38152c03

# 查看容器
hxstrive@localhost:~$ sudo docker ps
CONTAINER ID   IMAGE       COMMAND                  CREATED          STATUS          PORTS                                                                            NAMES
0e326c3e0746   cassandra   "docker-entrypoint.s…"   27 seconds ago   Up 27 seconds   7000-7001/tcp, 7199/tcp, 9160/tcp, 0.0.0.0:9042->9042/tcp, [::]:9042->9042/tcp   my-cassandra
1569d69afd11   mysql:8.0   "docker-entrypoint.s…"   3 months ago     Up 2 hours      0.0.0.0:3306->3306/tcp, [::]:3306->3306/tcp, 33060/tcp                           mysql
fe09ace436a8   redis       "docker-entrypoint.s…"   3 months ago     Up 2 hours      0.0.0.0:6379->6379/tcp, [::]:6379->6379/tcp                                      redis
hx@CDLPF2YBXYD:~$

(4)验证 Cassandra 是否启动成功。进入 cassandra 容器,在容器内使用 cqlsh(Cassandra 自带的 CQL 客户端)连接本地数据库,执行 CQL 命令测试,如创建一个键空间,例如:

# 进入容器
hxstrive@localhost:~$ sudo docker exec -it my-cassandra bash
root@0e326c3e0746:/# cqlsh
Connected to my-cluster at 127.0.0.1:9042
[cqlsh 6.2.0 | Cassandra 5.0.5 | CQL spec 3.4.7 | Native protocol v5]
Use HELP for help.
cqlsh> CREATE KEYSPACE test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
cqlsh>

上面在 cqlsh 中执行如下命令创建了一个键空间 test,如下:

CREATE KEYSPACE test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};

注意,后续会使用该 test 键空间来存储聊天数据。

添加依赖

使用 CassandraChatMemoryRepository 需先在项目中添加以下依赖:

<dependency>
    <groupId>org.springframework.ai</groupId>
    <artifactId>spring-ai-model-chat-memory-repository-cassandra</artifactId>
</dependency>

编写代码

到这里,准备工作就做完了,下面开始编写代码。

首先,创建一个名为 AiConfig.java 的配置类,用来配置 ChatMemory 和 CassandraChatMemoryRepository,代码如下:

package com.hxstrive.springai.springai_openai.chatMemoryRepository.chatMemoryRepository3;

import com.datastax.oss.driver.api.core.CqlSession;
import org.springframework.ai.chat.memory.ChatMemory;
import org.springframework.ai.chat.memory.MessageWindowChatMemory;
import org.springframework.ai.chat.memory.repository.cassandra.CassandraChatMemoryRepository;
import org.springframework.ai.chat.memory.repository.cassandra.CassandraChatMemoryRepositoryConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring AI 聊天记忆配置类
 * 用于配置基于 Cassandra 数据库的聊天记忆存储组件
 */
@Configuration
public class AiConfig {

    /**
     * 定义聊天记忆(ChatMemory)Bean
     * 采用窗口模式(MessageWindowChatMemory),限制最多保存的消息数量
     *
     * @param chatMemoryRepository 聊天记忆存储仓库(Cassandra实现)
     * @return 配置好的聊天记忆对象
     */
    @Bean
    public ChatMemory chatMemory(CassandraChatMemoryRepository chatMemoryRepository) {
        return MessageWindowChatMemory.builder()
                .chatMemoryRepository(chatMemoryRepository) // 注入Cassandra存储仓库,指定消息持久化方式
                .maxMessages(20) // 设置窗口大小,最多保存20条消息(超过则自动丢弃最早的消息)
                .build();
    }

    /**
     * 定义 Cassandra 聊天记忆存储仓库 Bean
     * 负责将聊天消息实际存储到 Cassandra 数据库中
     *
     * @param config  Cassandra 存储配置(包含表名、键空间等信息)
     * @return 初始化好的 Cassandra 聊天记忆仓库
     */
    @Bean
    public CassandraChatMemoryRepository cassandraChatMemoryRepository(
            CassandraChatMemoryRepositoryConfig config) {
        // 根据配置创建 Cassandra 存储仓库实例
        return CassandraChatMemoryRepository.create(config);
    }

    /**
     * 配置 Cassandra 聊天记忆的存储参数
     * 包括表名、键空间、数据库连接会话等核心配置
     *
     * @param cqlSession Cassandra 数据库连接会话(已由Spring容器管理)
     * @return 配置对象,用于初始化存储仓库
     */
    @Bean
    public CassandraChatMemoryRepositoryConfig cassandraChatMemoryRepositoryConfig(CqlSession cqlSession) {
        return CassandraChatMemoryRepositoryConfig.builder()
                .withTableName("chat_memory") // 指定存储聊天记忆的表名
                .withKeyspaceName("test") // 指定使用的键空间(需提前在Cassandra中创建)
                .withCqlSession(cqlSession) // 注入数据库连接会话,用于执行CQL操作
                .build();
    }

    /**
     * 配置 Cassandra 数据库连接会话(CqlSession)
     * 这是与 Cassandra 数据库交互的核心对象,负责建立和管理连接
     *
     * 注意:如果项目中已引入 spring-boot-starter-data-cassandra 依赖,
     * Spring Boot 会自动配置 CqlSession(基于 application.yml 中的配置),
     * 此时需删除当前方法,避免 Bean 重复定义
     *
     * @return 配置好的 Cassandra 连接会话
     */
    @Bean
    public CqlSession cqlSession() {
        return CqlSession.builder()
                .withLocalDatacenter("datacenter1") // 指定数据中心(Docker默认Cassandra为datacenter1)
                .withKeyspace("test") // 连接时默认使用的键空间
                // 若需要指定地址和端口,可添加:.withContactPoint(new InetSocketAddress("localhost", 9042))
                .build();
    }

}

然后,将 ChatMemory 注入到客户端代码中,如下:

package com.hxstrive.springai.springai_openai.chatMemoryRepository.chatMemoryRepository3;

import org.springframework.ai.chat.client.ChatClient;
import org.springframework.ai.chat.client.advisor.SimpleLoggerAdvisor;
import org.springframework.ai.chat.memory.ChatMemory;
import org.springframework.ai.chat.messages.AssistantMessage;
import org.springframework.ai.chat.messages.UserMessage;
import org.springframework.ai.chat.model.ChatModel;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.RestController;

@RestController
@SpringBootApplication
public class SpringAiDemoApplication implements CommandLineRunner {

	@Autowired
	private ChatModel chatModel;

    // 自动注入
	@Autowired
	private ChatMemory chatMemory;

	public static void main(String[] args) {
		SpringApplication.run(SpringAiDemoApplication.class, args);
	}

	@Override
	public void run(String... args) throws Exception {
		StringBuilder builder = new StringBuilder();

		// 唯一会话ID,同一个会话上下文中保持一致
		String sessionId = "USER-251017100313";

		String question = "四川大学是985吗?";
		String ai = chat(sessionId, question);
		builder.append(String.format("用户问题:%s\n", question));
		builder.append(String.format("机器回复:%s\n", ai));

		question = "是211吗?";
		ai = chat(sessionId, question);
		builder.append(String.format("用户问题:%s\n", question));
		builder.append(String.format("机器回复:%s\n", ai));

		System.out.println(builder.toString());
	}

	private String chat(String sessionId, String question) {
		ChatClient chatClient = ChatClient.builder(chatModel)
				.defaultSystem("你是AI助手小粒,所有回复均使用中文。")
				.defaultAdvisors(
						new SimpleLoggerAdvisor() // 输出聊天日志
				).build();

		// 1. 将用户输入添加到记忆
		UserMessage userMessage = new UserMessage(question);
		chatMemory.add(sessionId, userMessage);

		// 2. 获取记忆中的所有消息(上下文),传递给 AI 模型
		ChatClient.CallResponseSpec responseSpec = chatClient.prompt().messages(chatMemory.get(sessionId)).call();

		// 3. 将 AI 回复添加到记忆,更新上下文
		AssistantMessage assistantMessage = responseSpec.chatResponse().getResult().getOutput();
		chatMemory.add(sessionId, assistantMessage);

		// 4. 返回 AI 回复内容
		return assistantMessage.getText();
	}

}

运行代码,结果如下图:

Apache Cassandra 数据库(CassandraChatMemoryRepository)

成功运行后,再次执行如下代码进入 cassandra 容器内部:

sudo docker exec -it my-cassandra bash

然后使用 cqlsh 命令连接到 Apache cassandra,连接成功后使用 use test 命令切换到 test 键空间,运行 desc tables 查看当前键空间下所有的表,如下:

# 切换到 test 键空间
cqlsh> use test;

# 查看 test 键空间所有的表格
cqlsh:test> desc tables;

chat_memory

# 使用 select 语句查看 chat_memory 表中的数据
cqlsh:test> select * from chat_memory;

 session_id        | message_timestamp               | messages
-------------------+---------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 USER-251017100313 | 2025-10-17 05:41:22.189000+0000 | [{msg_timestamp: '2025-10-17 05:41:15.184000+0000', msg_type: 'USER', msg_content: '四川大学是985吗?'}, {msg_timestamp: '2025-10-17 05:41:20.297000+0000', msg_type: 'ASSISTANT', msg_content: '是的,四川大学是“985工程”重点建设的大学之一。'}, {msg_timestamp: '2025-10-17 05:41:20.310000+0000', msg_type: 'USER', msg_content: '是211吗?'}, {msg_timestamp: '2025-10-17 05:41:22.189000+0000', msg_type: 'ASSISTANT', msg_content: '是的,四川大学也是“211工程”重点建设的大学之一。'}]
 USER-251017100313 | 2025-10-17 05:41:20.310000+0000 | [{msg_timestamp: '2025-10-17 05:41:15.184000+0000', msg_type: 'USER', msg_content: '四川大学是985吗?'}, {msg_timestamp: '2025-10-17 05:41:20.297000+0000', msg_type: 'ASSISTANT', msg_content: '是的,四川大学是“985工程”重点建设的大学之一。'}, {msg_timestamp: '2025-10-17 05:41:20.310000+0000', msg_type: 'USER', msg_content: '是211吗?'}]
 USER-251017100313 | 2025-10-17 05:41:20.297000+0000 | [{msg_timestamp: '2025-10-17 05:41:15.184000+0000', msg_type: 'USER', msg_content: '四川大学是985吗?'}, {msg_timestamp: '2025-10-17 05:41:20.297000+0000', msg_type: 'ASSISTANT', msg_content: '是的,四川大学是“985工程”重点建设的大学之一。'}]
 USER-251017100313 | 2025-10-17 05:41:15.184000+0000 | [{msg_timestamp: '2025-10-17 05:41:15.184000+0000', msg_type: 'USER', msg_content: '四川大学是985吗?'}]

(4 rows)
cqlsh:test>

成功查询出四条数据,就是刚才我们存入的数据。

配置属性

下表是 Spring AI 中对 cassandra 的配置:

属性

说明

默认值

spring.cassandra.contactPoints

用于集群发现的主机地址(可多个)

127.0.0.1

spring.cassandra.port

Cassandra 生协议连接端口

9042

spring.cassandra.localDatacenter

要连接的 Cassandra 数据中心名称

datacenter1

spring.ai.chat.memory.cassandra.time-to-live

Cassandra中消息的生存时间(TTL)设置


spring.ai.chat.memory.cassandra.keyspace

Cassandra Key 空间名称(keyspace)

springframework

spring.ai.chat.memory.cassandra.messages-column

Cassandra 消息列名

springframework

spring.ai.chat.memory.cassandra.table

Cassandra 表

ai_chat_memory

spring.ai.chat.memory.cassandra.initialize-schema

是否在启动时初始化 Schema 结构

true

更多信息请访问 Spring AI 官方文档 https://docs.spring.io/spring-ai/reference/api/vectordbs/apache-cassandra.html

  

提示:如果不能访问 OpenAI,请点击 AiCode API 注册账号,通过代理访问。  

说说我的看法
全部评论(
没有评论
关于
本网站专注于 Java、数据库(MySQL、Oracle)、Linux、软件架构及大数据等多领域技术知识分享。涵盖丰富的原创与精选技术文章,助力技术传播与交流。无论是技术新手渴望入门,还是资深开发者寻求进阶,这里都能为您提供深度见解与实用经验,让复杂编码变得轻松易懂,携手共赴技术提升新高度。如有侵权,请来信告知:hxstrive@outlook.com
其他应用
公众号