从版本 4 开始,MongoDB 支持事务。事务是建立在会话之上的,因此需要一个活动的 ClientSession。
注意,除非您在应用程序上下文中指定 MongoTransactionManager,否则事务支持将被禁用。您可以使用 setSessionSynchronization(ALWAYS) 来参与正在进行的非本机MongoDB 事务。
为了获得对事务的完全程序化控制,你可能想在 MongoOperations 上使用会话回调。下面的示例演示 SessionCallback 中的编程事务控制:
// 获取新的 ClientSession
ClientSession session = client.startSession(options);
template.withSession(session)
.execute(action -> {
// 开始事务
session.startTransaction();
try {
Step step = // ...;
action.insert(step);
process(step);
action.update(Step.class).apply(Update.set("state", // ...
// 如果一切按预期进行,就提交修改,提交事务
session.commitTransaction();
} catch (RuntimeException e) {
// 如果抛出异常,事务回滚一切
session.abortTransaction();
}
}, ClientSession::close) // 完成后不要忘记关闭会话上面的示例让你完全控制了事务行为,同时在回调中使用会话范围的 MongoOperations 实例,以确保会话被传递给每个服务器调用。为了避免这种方法带来的一些开销,你可以使用 TransactionTemplate 来消除手动事务的一些噪音。
Spring Data MongoDB 事务支持 TransactionTemplate。下面的示例演示如何创建和使用 TransactionTemplate:
// 在模板API配置期间启用事务同步
template.setSessionSynchronization(ALWAYS);
// ...
// 使用提供的 PlatformTransactionManager 创建 TransactionTemplate
TransactionTemplate txTemplate = new TransactionTemplate(anyTxManager);
txTemplate.execute(new TransactionCallbackWithoutResult() {
// 在回调中,已经注册了 ClientSession 和事务
@Override
protected void doInTransactionWithoutResult(TransactionStatus status) {
Step step = // ...;
template.insert(step);
process(step);
template.update(Step.class).apply(Update.set("state", // ...
};
});MongoTransactionManager 是通往众所周知的 Spring 事务支持的网关,它可以让应用程序使用 Spring 的托管事务功能。
MongoTransactionManager 将一个客户端会话绑定到线程上。
MongoTemplate 会检测会话,并相应地对这些与事务相关的资源进行操作。MongoTemplate 也可以参与其他正在进行的事务。
下面的例子展示了如何用 MongoTransactionManager 创建和使用事务:
@Configuration
static class Config extends AbstractMongoClientConfiguration {
// 在应用程序上下文中注册 MongoTransactionManager
@Bean
MongoTransactionManager transactionManager(MongoDatabaseFactory dbFactory) {
return new MongoTransactionManager(dbFactory);
}
// ...
}
@Component
public class StateService {
// 将方法标记为事务性的
@Transactional
void someBusinessFunction(Step step) {
template.insert(step);
process(step);
template.update(Step.class).apply(Update.set("state", // ...
};
});注意,@Transactional(readOnly = true) 建议 MongoTransactionManager 也启动一个事务,将 ClientSession 添加到发出的请求中。
与支持响应式 ClientSession 一样,响应式 MongoTemplate 提供了专门的方法,用于在事务中进行操作,而不必担心根据操作结果提交或停止操作。
使用普通的 MongoDB 响应式驱动程序 API,事务流中的删除可能看起来像这样。例如:
Mono<DeleteResult> result = Mono
// 首先,我们显然需要启动会话
.from(client.startSession())
.flatMap(session -> {
// 一旦我们手头有了 ClientSession,就开始进行事务
session.startTransaction();
// 通过向操作传递 ClientSession,在事务中进行操作
return Mono.from(collection.deleteMany(session, ...))
// 如果操作异常完成,我们需要停止交易,并保留错误信息
.onErrorResume(e -> Mono.from(session.abortTransaction()).then(Mono.error(e)))
// 当然,也可以在成功的情况下提交更改,仍然保留操作结果
.flatMap(val -> Mono.from(session.commitTransaction()).then(Mono.just(val)))
// 最后,我们需要确保关闭会话
.doFinally(signal -> session.close());
});上述操作的罪魁祸首是在保留主流 DeleteResult,而不是通过 commitTransaction() 或 abortTransaction() 发布的事务结果,这导致了相当复杂的设置。
Spring Data MongoDB 事务支持 TransactionalOperator。下面的例子展示了如何创建和使用 TransactionalOperator:
// 启用事务同步以进行事务
template.setSessionSynchronization(ALWAYS);
// ...
// 使用提供的 ReactiveTransactionManager创建 TransactionalOperator
TransactionalOperator rxtx = TransactionalOperator.create(anyTxManager,
new DefaultTransactionDefinition());
Step step = // ...;
template.insert(step);
Mono<Void> process(step)
.then(template.update(Step.class).apply(Update.set("state", …))
// TransactionalOperator.transactional(...) 为所有上游操作提供事务管理
.as(rxtx::transactional) (3)
.then();ReactiveMongoTransactionManager 是通往众所周知的 Spring 事务支持的网关。它允许应用程序利用 Spring 的管理事务功能。ReactiveMongoTransactionManager 将客户会话绑定到用户上下文。ReactiveMongoTemplate 会检测会话,并对这些与事务相关的资源进行相应操作。ReactiveMongoTemplate 也可以参与到其他正在进行的事务中。
下面的例子展示了如何用 ReactiveMongoTransactionManager 创建和使用事务:
@Configuration
public class Config extends AbstractReactiveMongoConfiguration {
// 在应用程序上下文中注册 ReactiveMongoTransactionManager
@Bean
ReactiveMongoTransactionManager transactionManager(ReactiveMongoDatabaseFactory factory) {
return new ReactiveMongoTransactionManager(factory);
}
// ...
}
@Service
public class StateService {
// 将方法标记为事务性
@Transactional
Mono<UpdateResult> someBusinessFunction(Step step) {
return template.insert(step)
.then(process(step))
.then(template.update(Step.class).apply(Update.set("state", …));
};
});注意,@Transactional(readOnly = true) 建议 ReactiveMongoTransactionManager 也启动一个事务,将 ClientSession 添加到发出的请求中。
在事务内部,MongoDB 服务器有一个稍微不同的行为。
MongoDB驱动程序提供了一个专用的副本集名称配置选项,使驱动程序进入自动检测模式。此选项有助于在事务期间识别主复制集节点和命令路由。
MongoDB 不支持集合操作,例如在一个事务中创建集合。这也影响了第一次使用时发生的即时集合创建。因此,请确保所有需要的结构都已到位。
MongoDB 可以为事务操作期间引发的错误添加特殊标签。这些可能表示短暂的失败,这些失败可能仅仅通过重试操作就会消失。出于这些目的,我们强烈建议使用 Spring Retry。 尽管如此,您可以重写 MongoTransactionManager#doCommit(MongoTransactionObject) 来实现 MongoDB 参考手册中概述的重试提交操作行为。
MongoDB 的计数操作是基于集合统计,可能无法反映事务中的实际情况。当在一个多文档事务中发出计数命令时,服务器会响应错误 50851。一旦 MongoTemplate 检测到一个活动的事务,所有暴露的 count() 方法都会被转换,并使用 $match 和 $count 操作符委托给聚合框架,保留查询设置,如 collation。
在聚集计数助手中使用地理命令时,有一些限制。以下运算符不能使用,必须用不同的运算符代替:
$where → $expr
$near → $geoWithin with $center
$nearSphere → $geoWithin with $centerSphere
使用 Criteria.near(...) 和 Criteria.nearSphere(...) 的查询必须改写为 Criteria.within(...) 各自 Criteria.withinSphere(...)。同样适用于资源库查询方法中的 near 查询关键字,必须改为 within。也请参见 MongoDB JIRA ticket DRIVERS-518 以获得更多参考。
下面的片段显示了会话绑定闭包内的计数用法:
session.startTransaction();
template.withSession(session)
.execute(action -> {
action.count(query(where("state").is("active")), Step.class)
...上面的片段具体化为以下命令:
db.collection.aggregate(
[
{ $match: { state: "active" } },
{ $count: "totalEntityCount" }
]
)而不是:
db.collection.find( { state: "active" } ).count()