在 Spring Batch4 中,JdbcBatchItemWriter 是一个常用的 ItemWriter 实现,用于将数据批量写入关系型数据库,它基于 JDBC 实现并利用批量操作来提高性能。
JdbcBatchItemWriter 通过 JDBC 的批量操作(addBatch() 和 executeBatch())减少数据库交互次数,提升写入效率。且提供多种方式将 Item 对象的属性映射到 SQL 语句的参数(支持 ? 占位符和 :paramName 参数名占位符),还与 Spring 的事务管理无缝集成,确保数据一致性。
查看 JdbcBatchItemWriter 的源码:
public class JdbcBatchItemWriter<T> implements ItemWriter<T>, InitializingBean {
// 日志记录器,用于记录操作过程中的日志信息
protected static final Log logger = LogFactory.getLog(JdbcBatchItemWriter.class);
// 命名参数JDBC操作模板,用于执行带命名参数的SQL语句
// 封装了JDBC的批量操作逻辑
protected NamedParameterJdbcOperations namedParameterJdbcTemplate;
// ItemPreparedStatementSetter接口实现,用于将Item对象设置到PreparedStatement中
// 适用于使用问号(?)占位符的SQL语句
protected ItemPreparedStatementSetter<T> itemPreparedStatementSetter;
// ItemSqlParameterSourceProvider接口实现,用于将Item对象转换为SqlParameterSource
// 适用于使用命名参数(:parameter)的SQL语句
protected ItemSqlParameterSourceProvider<T> itemSqlParameterSourceProvider;
// 要执行的SQL语句(通常是INSERT、UPDATE等)
// 可以包含命名参数或问号占位符
protected String sql;
// 是否验证批量操作后有记录被更新/插入
// 默认为true,如果为true但实际没有记录被修改,会抛出异常
protected boolean assertUpdates = true;
// SQL语句中的参数数量,用于内部验证
protected int parameterCount;
// 标记是否使用命名参数(true)还是问号占位符(false)
protected boolean usingNamedParameters;
//...省略...
}主要提供的可配置参数如下:
dataSource:数据库连接源(必需),上面成员变量中没有直接引用 DataSource,而是在 NamedParameterJdbcOperations 对象创建时提供,代码如下:
/**
* Public setter for the data source for injection purposes.
*
* @param dataSource {@link javax.sql.DataSource} to use for querying against
*/
public void setDataSource(DataSource dataSource) {
if (namedParameterJdbcTemplate == null) {
this.namedParameterJdbcTemplate = new NamedParameterJdbcTemplate(dataSource);
}
}sql:执行的 SQL 语句(如 INSERT 语句,必需),用来执行写入数据到数据库。例如:
JdbcBatchItemWriter<User> writer = new JdbcBatchItemWriter<>();
writer.setDataSource(dataSource);
writer.setSql("insert into users_db(id,username,password) values (:id,:username,:password)"); // 设置 insert SQL 语句
writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());itemPreparedStatementSetter:用于将 Item 对象设置到 PreparedStatement 中,适用于使用问号 (?) 占位符的 SQL 语句。接口定义如下:
public interface ItemPreparedStatementSetter<T> {
/**
* Set parameter values on the given PreparedStatement as determined from
* the provided item.
* @param item the item to obtain the values from
* @param ps the PreparedStatement to invoke setter methods on
* @throws SQLException if a SQLException is encountered (i.e. there is no
* need to catch SQLException)
*/
void setValues(T item, PreparedStatement ps) throws SQLException;
}itemSqlParameterSourceProvider:用于将 Item 对象转换为 SqlParameterSource,适用于使用命名参数 (:parameter) 的 SQL 语句,接口定义如下:
public interface ItemSqlParameterSourceProvider<T> {
/**
* Provide parameter values in an {@link SqlParameterSource} based on values from
* the provided item.
* @param item the item to use for parameter values
* @return parameters extracted from the item
*/
SqlParameterSource createSqlParameterSource(T item);
}使用的 SQL 语句如下:
JdbcBatchItemWriter<User> writer = new JdbcBatchItemWriter<>();
writer.setSql("insert into users_db(id,username,password) values (:id,:username,:password)"); // 设置 insert SQL 语句assertUpdates:是否验证批量操作后有记录被更新/插入,默认为 true。如果为 true,但实际没有记录被修改,会抛出异常。部分源码如下:
@Override
public void write(final List<? extends T> items) throws Exception {
if (!items.isEmpty()) {
//...
// 看这里
if (assertUpdates) {
for (int i = 0; i < updateCounts.length; i++) {
int value = updateCounts[i];
if (value == 0) {
throw new EmptyResultDataAccessException("Item " + i + " of " + updateCounts.length
+ " did not update any rows: [" + items.get(i) + "]", 1);
}
}
}
}
}
该实例将演示从 csv 文件读取数据,然后通过 JdbcBatchItemWriter 将数据写入到 users_db 数据库表中。详细步骤如下:
在项目的 resources 目录下面创建名为 users.csv 的文件,内容如下:
"id","username","password" 1,张三,"13BC03AC29FAC7B29736EC3BE5C2F55A" 2,李四,"5E5994FBCFA922D804DF45295AE98604" 3,王五,"6C14DA109E294D1E8155BE8AA4B1CE8E" 4,赵六,"03774AD7979A5909E78F9C9DB3A2F0B2"
连接到数据库,执行如下 SQL 语句,创建一张名为 users_db 的数据表。SQL 如下:
CREATE TABLE `users_db` ( `id` int(11) NOT NULL AUTO_INCREMENT, `username` varchar(100) DEFAULT NULL COMMENT '用户名', `password` varchar(100) DEFAULT NULL COMMENT '密码', PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
数据表创建完成,结构如下图:

创建一个简单的 POJO 用于映射 users_db 表,如下:
package com.hxstrive.spring_batch.dbItemWriterDemo.dto;
import lombok.Data;
import lombok.ToString;
/**
* 用户DTO
* @author hxstrive.com
*/
@Data
@ToString
public class User {
private int id;
private String username;
private String password;
}
使用 @Configuration 注解创建配置类,配置 Spring Batch 的 Job、Step 等对象,代码如下:
package com.hxstrive.spring_batch.dbItemWriterDemo.config;
import com.hxstrive.spring_batch.dbItemWriterDemo.dto.User;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.validation.BindException;
import javax.sql.DataSource;
/**
* Spring Batch 配置类
* @author hxstrive.com
*/
@Configuration
public class BatchConfig {
// 用于创建和配置 Job 对象的工厂类
@Autowired
private JobBuilderFactory jobBuilderFactory;
// 用于创建和配置 Step 对象的工厂类
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private DataSource dataSource;
// 创建Job对象
@Bean
public Job dbItemWriterDemoJob() {
return jobBuilderFactory.get("dbItemWriterDemoJob-" + System.currentTimeMillis())
.start(dbItemWriterDemoStep())
.build();
}
// 创建Step对象
@Bean
public Step dbItemWriterDemoStep() {
return stepBuilderFactory.get("dbItemWriterDemoStep")
.<User,User>chunk(2)
.reader(flatFileItemReader())
.writer(dbItemWriteDemo())
.build();
}
// 创建ItemReader对象
@Bean
@StepScope //将 Bean 的生命周期与 Step 执行上下文 绑定
public FlatFileItemReader<? extends User> flatFileItemReader() {
FlatFileItemReader<User> reader = new FlatFileItemReader<>();
reader.setResource(new ClassPathResource("users.csv"));
reader.setLinesToSkip(1); // 跳过文件第一行,因为第一行是字段名
// 解析数据
DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
tokenizer.setNames("id", "username", "password");
// 把解析出的数据映射到 User 对象中
DefaultLineMapper<User> lineMapper = new DefaultLineMapper<>();
lineMapper.setLineTokenizer(tokenizer);
lineMapper.setFieldSetMapper(new FieldSetMapper<User>(){
@Override
public User mapFieldSet(FieldSet fieldSet) throws BindException {
User user = new User();
user.setId(fieldSet.readInt("id"));
user.setUsername(fieldSet.readString("username"));
user.setPassword(fieldSet.readString("password"));
System.out.println("reading user: " + user);
return user;
}
});
lineMapper.afterPropertiesSet();
reader.setLineMapper(lineMapper);
return reader;
}
// 创建ItemWriter对象
@Bean
public JdbcBatchItemWriter<User> dbItemWriteDemo() {
JdbcBatchItemWriter<User> writer = new JdbcBatchItemWriter<>();
writer.setDataSource(dataSource);
writer.setSql("insert into users_db(id,username,password) values (:id,:username,:password)");
writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
return writer;
}
}上述代码,主要查看 dbItemWriteDemo() 方法,其中,手动创建了一个 JdbcBatchItemWriter 对象,通过 setDataSource() 方法设置数据源,通过 setSql() 方法设置插入数据的 SQL INSERT 语句,最后手动创建 BeanPropertyItemSqlParameterSourceProvider 对象,实现根据参数名占位符,而非使用 ? 占位符。
关于 flatFileItemReader() 方法,可以参考Spring Batch4 从普通文件读取数据。
创建一个 Spring Boot 启动类,记得添加 @EnableBatchProcessing 注解,开启批处理功能。代码如下:
package com.hxstrive.spring_batch.dbItemWriterDemo;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
@EnableBatchProcessing // 开启批处理
public class SpringBatchDemoApplication {
public static void main(String[] args) {
SpringApplication.run(SpringBatchDemoApplication.class, args);
}
}运行应用,输出日志如下:

通过观察日志,成功读取了 csv 中的数据,继续查看数据库表,如下图:

成功将数据写入到数据库表,更多关于 JdbcBatchItemWriter 的用法请参考官方文档。