在 Spring Batch4 中,JdbcBatchItemWriter 是一个常用的 ItemWriter 实现,用于将数据批量写入关系型数据库,它基于 JDBC 实现并利用批量操作来提高性能。
JdbcBatchItemWriter 通过 JDBC 的批量操作(addBatch() 和 executeBatch())减少数据库交互次数,提升写入效率。且提供多种方式将 Item 对象的属性映射到 SQL 语句的参数(支持 ? 占位符和 :paramName 参数名占位符),还与 Spring 的事务管理无缝集成,确保数据一致性。
查看 JdbcBatchItemWriter 的源码:
public class JdbcBatchItemWriter<T> implements ItemWriter<T>, InitializingBean { // 日志记录器,用于记录操作过程中的日志信息 protected static final Log logger = LogFactory.getLog(JdbcBatchItemWriter.class); // 命名参数JDBC操作模板,用于执行带命名参数的SQL语句 // 封装了JDBC的批量操作逻辑 protected NamedParameterJdbcOperations namedParameterJdbcTemplate; // ItemPreparedStatementSetter接口实现,用于将Item对象设置到PreparedStatement中 // 适用于使用问号(?)占位符的SQL语句 protected ItemPreparedStatementSetter<T> itemPreparedStatementSetter; // ItemSqlParameterSourceProvider接口实现,用于将Item对象转换为SqlParameterSource // 适用于使用命名参数(:parameter)的SQL语句 protected ItemSqlParameterSourceProvider<T> itemSqlParameterSourceProvider; // 要执行的SQL语句(通常是INSERT、UPDATE等) // 可以包含命名参数或问号占位符 protected String sql; // 是否验证批量操作后有记录被更新/插入 // 默认为true,如果为true但实际没有记录被修改,会抛出异常 protected boolean assertUpdates = true; // SQL语句中的参数数量,用于内部验证 protected int parameterCount; // 标记是否使用命名参数(true)还是问号占位符(false) protected boolean usingNamedParameters; //...省略... }
主要提供的可配置参数如下:
dataSource:数据库连接源(必需),上面成员变量中没有直接引用 DataSource,而是在 NamedParameterJdbcOperations 对象创建时提供,代码如下:
/** * Public setter for the data source for injection purposes. * * @param dataSource {@link javax.sql.DataSource} to use for querying against */ public void setDataSource(DataSource dataSource) { if (namedParameterJdbcTemplate == null) { this.namedParameterJdbcTemplate = new NamedParameterJdbcTemplate(dataSource); } }
sql:执行的 SQL 语句(如 INSERT 语句,必需),用来执行写入数据到数据库。例如:
JdbcBatchItemWriter<User> writer = new JdbcBatchItemWriter<>(); writer.setDataSource(dataSource); writer.setSql("insert into users_db(id,username,password) values (:id,:username,:password)"); // 设置 insert SQL 语句 writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
itemPreparedStatementSetter:用于将 Item 对象设置到 PreparedStatement 中,适用于使用问号 (?) 占位符的 SQL 语句。接口定义如下:
public interface ItemPreparedStatementSetter<T> { /** * Set parameter values on the given PreparedStatement as determined from * the provided item. * @param item the item to obtain the values from * @param ps the PreparedStatement to invoke setter methods on * @throws SQLException if a SQLException is encountered (i.e. there is no * need to catch SQLException) */ void setValues(T item, PreparedStatement ps) throws SQLException; }
itemSqlParameterSourceProvider:用于将 Item 对象转换为 SqlParameterSource,适用于使用命名参数 (:parameter) 的 SQL 语句,接口定义如下:
public interface ItemSqlParameterSourceProvider<T> { /** * Provide parameter values in an {@link SqlParameterSource} based on values from * the provided item. * @param item the item to use for parameter values * @return parameters extracted from the item */ SqlParameterSource createSqlParameterSource(T item); }
使用的 SQL 语句如下:
JdbcBatchItemWriter<User> writer = new JdbcBatchItemWriter<>(); writer.setSql("insert into users_db(id,username,password) values (:id,:username,:password)"); // 设置 insert SQL 语句
assertUpdates:是否验证批量操作后有记录被更新/插入,默认为 true。如果为 true,但实际没有记录被修改,会抛出异常。部分源码如下:
@Override public void write(final List<? extends T> items) throws Exception { if (!items.isEmpty()) { //... // 看这里 if (assertUpdates) { for (int i = 0; i < updateCounts.length; i++) { int value = updateCounts[i]; if (value == 0) { throw new EmptyResultDataAccessException("Item " + i + " of " + updateCounts.length + " did not update any rows: [" + items.get(i) + "]", 1); } } } } }
该实例将演示从 csv 文件读取数据,然后通过 JdbcBatchItemWriter 将数据写入到 users_db 数据库表中。详细步骤如下:
在项目的 resources 目录下面创建名为 users.csv 的文件,内容如下:
"id","username","password" 1,张三,"13BC03AC29FAC7B29736EC3BE5C2F55A" 2,李四,"5E5994FBCFA922D804DF45295AE98604" 3,王五,"6C14DA109E294D1E8155BE8AA4B1CE8E" 4,赵六,"03774AD7979A5909E78F9C9DB3A2F0B2"
连接到数据库,执行如下 SQL 语句,创建一张名为 users_db 的数据表。SQL 如下:
CREATE TABLE `users_db` ( `id` int(11) NOT NULL AUTO_INCREMENT, `username` varchar(100) DEFAULT NULL COMMENT '用户名', `password` varchar(100) DEFAULT NULL COMMENT '密码', PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
数据表创建完成,结构如下图:
创建一个简单的 POJO 用于映射 users_db 表,如下:
package com.hxstrive.spring_batch.dbItemWriterDemo.dto; import lombok.Data; import lombok.ToString; /** * 用户DTO * @author hxstrive.com */ @Data @ToString public class User { private int id; private String username; private String password; }
使用 @Configuration 注解创建配置类,配置 Spring Batch 的 Job、Step 等对象,代码如下:
package com.hxstrive.spring_batch.dbItemWriterDemo.config; import com.hxstrive.spring_batch.dbItemWriterDemo.dto.User; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepScope; import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider; import org.springframework.batch.item.database.JdbcBatchItemWriter; import org.springframework.batch.item.file.FlatFileItemReader; import org.springframework.batch.item.file.mapping.DefaultLineMapper; import org.springframework.batch.item.file.mapping.FieldSetMapper; import org.springframework.batch.item.file.transform.DelimitedLineTokenizer; import org.springframework.batch.item.file.transform.FieldSet; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.io.ClassPathResource; import org.springframework.validation.BindException; import javax.sql.DataSource; /** * Spring Batch 配置类 * @author hxstrive.com */ @Configuration public class BatchConfig { // 用于创建和配置 Job 对象的工厂类 @Autowired private JobBuilderFactory jobBuilderFactory; // 用于创建和配置 Step 对象的工厂类 @Autowired private StepBuilderFactory stepBuilderFactory; @Autowired private DataSource dataSource; // 创建Job对象 @Bean public Job dbItemWriterDemoJob() { return jobBuilderFactory.get("dbItemWriterDemoJob-" + System.currentTimeMillis()) .start(dbItemWriterDemoStep()) .build(); } // 创建Step对象 @Bean public Step dbItemWriterDemoStep() { return stepBuilderFactory.get("dbItemWriterDemoStep") .<User,User>chunk(2) .reader(flatFileItemReader()) .writer(dbItemWriteDemo()) .build(); } // 创建ItemReader对象 @Bean @StepScope //将 Bean 的生命周期与 Step 执行上下文 绑定 public FlatFileItemReader<? extends User> flatFileItemReader() { FlatFileItemReader<User> reader = new FlatFileItemReader<>(); reader.setResource(new ClassPathResource("users.csv")); reader.setLinesToSkip(1); // 跳过文件第一行,因为第一行是字段名 // 解析数据 DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer(); tokenizer.setNames("id", "username", "password"); // 把解析出的数据映射到 User 对象中 DefaultLineMapper<User> lineMapper = new DefaultLineMapper<>(); lineMapper.setLineTokenizer(tokenizer); lineMapper.setFieldSetMapper(new FieldSetMapper<User>(){ @Override public User mapFieldSet(FieldSet fieldSet) throws BindException { User user = new User(); user.setId(fieldSet.readInt("id")); user.setUsername(fieldSet.readString("username")); user.setPassword(fieldSet.readString("password")); System.out.println("reading user: " + user); return user; } }); lineMapper.afterPropertiesSet(); reader.setLineMapper(lineMapper); return reader; } // 创建ItemWriter对象 @Bean public JdbcBatchItemWriter<User> dbItemWriteDemo() { JdbcBatchItemWriter<User> writer = new JdbcBatchItemWriter<>(); writer.setDataSource(dataSource); writer.setSql("insert into users_db(id,username,password) values (:id,:username,:password)"); writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>()); return writer; } }
上述代码,主要查看 dbItemWriteDemo() 方法,其中,手动创建了一个 JdbcBatchItemWriter 对象,通过 setDataSource() 方法设置数据源,通过 setSql() 方法设置插入数据的 SQL INSERT 语句,最后手动创建 BeanPropertyItemSqlParameterSourceProvider 对象,实现根据参数名占位符,而非使用 ? 占位符。
关于 flatFileItemReader() 方法,可以参考Spring Batch4 从普通文件读取数据。
创建一个 Spring Boot 启动类,记得添加 @EnableBatchProcessing 注解,开启批处理功能。代码如下:
package com.hxstrive.spring_batch.dbItemWriterDemo; import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication @EnableBatchProcessing // 开启批处理 public class SpringBatchDemoApplication { public static void main(String[] args) { SpringApplication.run(SpringBatchDemoApplication.class, args); } }
运行应用,输出日志如下:
通过观察日志,成功读取了 csv 中的数据,继续查看数据库表,如下图:
成功将数据写入到数据库表,更多关于 JdbcBatchItemWriter 的用法请参考官方文档。