Spring Batch4 教程

Spring Batch4 写数据到数据库 JdbcBatchItemWriter

在 Spring Batch4 中,JdbcBatchItemWriter 是一个常用的 ItemWriter 实现,用于将数据批量写入关系型数据库,它基于 JDBC 实现并利用批量操作来提高性能。

JdbcBatchItemWriter 通过 JDBC 的批量操作(addBatch() 和 executeBatch())减少数据库交互次数,提升写入效率。且提供多种方式将 Item 对象的属性映射到 SQL 语句的参数(支持 ? 占位符和 :paramName 参数名占位符),还与 Spring 的事务管理无缝集成,确保数据一致性。

核心配置属性

查看 JdbcBatchItemWriter 的源码:

public class JdbcBatchItemWriter<T> implements ItemWriter<T>, InitializingBean {
    // 日志记录器,用于记录操作过程中的日志信息
    protected static final Log logger = LogFactory.getLog(JdbcBatchItemWriter.class);
    
    // 命名参数JDBC操作模板,用于执行带命名参数的SQL语句
    // 封装了JDBC的批量操作逻辑
    protected NamedParameterJdbcOperations namedParameterJdbcTemplate;
    
    // ItemPreparedStatementSetter接口实现,用于将Item对象设置到PreparedStatement中
    // 适用于使用问号(?)占位符的SQL语句
    protected ItemPreparedStatementSetter<T> itemPreparedStatementSetter;
    
    // ItemSqlParameterSourceProvider接口实现,用于将Item对象转换为SqlParameterSource
    // 适用于使用命名参数(:parameter)的SQL语句
    protected ItemSqlParameterSourceProvider<T> itemSqlParameterSourceProvider;
    
    // 要执行的SQL语句(通常是INSERT、UPDATE等)
    // 可以包含命名参数或问号占位符
    protected String sql;
    
    // 是否验证批量操作后有记录被更新/插入
    // 默认为true,如果为true但实际没有记录被修改,会抛出异常
    protected boolean assertUpdates = true;
    
    // SQL语句中的参数数量,用于内部验证
    protected int parameterCount;
    
    // 标记是否使用命名参数(true)还是问号占位符(false)
    protected boolean usingNamedParameters;
    
    //...省略...
}

主要提供的可配置参数如下:

  • dataSource:数据库连接源(必需),上面成员变量中没有直接引用 DataSource,而是在 NamedParameterJdbcOperations 对象创建时提供,代码如下:

/**
 * Public setter for the data source for injection purposes.
 *
 * @param dataSource {@link javax.sql.DataSource} to use for querying against
 */
public void setDataSource(DataSource dataSource) {
    if (namedParameterJdbcTemplate == null) {
       this.namedParameterJdbcTemplate = new NamedParameterJdbcTemplate(dataSource);
    }
}
  • sql:执行的 SQL 语句(如 INSERT 语句,必需),用来执行写入数据到数据库。例如:

JdbcBatchItemWriter<User> writer = new JdbcBatchItemWriter<>();
writer.setDataSource(dataSource);
writer.setSql("insert into users_db(id,username,password) values (:id,:username,:password)"); // 设置 insert SQL 语句
writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
  • itemPreparedStatementSetter:用于将 Item 对象设置到 PreparedStatement 中,适用于使用问号 (?) 占位符的 SQL 语句。接口定义如下:

public interface ItemPreparedStatementSetter<T> {
    /**
     * Set parameter values on the given PreparedStatement as determined from
     * the provided item.
     * @param item the item to obtain the values from
     * @param ps the PreparedStatement to invoke setter methods on
     * @throws SQLException if a SQLException is encountered (i.e. there is no
     * need to catch SQLException)
     */
    void setValues(T item, PreparedStatement ps) throws SQLException;

}
  • itemSqlParameterSourceProvider:用于将 Item 对象转换为 SqlParameterSource,适用于使用命名参数 (:parameter) 的 SQL 语句,接口定义如下:

public interface ItemSqlParameterSourceProvider<T> {

    /**
     * Provide parameter values in an {@link SqlParameterSource} based on values from  
     * the provided item.
     * @param item the item to use for parameter values
     * @return parameters extracted from the item
     */
    SqlParameterSource createSqlParameterSource(T item);

}

使用的 SQL 语句如下:

JdbcBatchItemWriter<User> writer = new JdbcBatchItemWriter<>();
writer.setSql("insert into users_db(id,username,password) values (:id,:username,:password)"); // 设置 insert SQL 语句
  • assertUpdates:是否验证批量操作后有记录被更新/插入,默认为 true。如果为 true,但实际没有记录被修改,会抛出异常。部分源码如下:

@Override
public void write(final List<? extends T> items) throws Exception {

    if (!items.isEmpty()) {

       //...
       // 看这里
       if (assertUpdates) {
          for (int i = 0; i < updateCounts.length; i++) {
             int value = updateCounts[i];
             if (value == 0) {
                throw new EmptyResultDataAccessException("Item " + i + " of " + updateCounts.length
                      + " did not update any rows: [" + items.get(i) + "]", 1);
             }
          }
       }
    }
}

 

简单示例

该实例将演示从 csv 文件读取数据,然后通过 JdbcBatchItemWriter 将数据写入到 users_db 数据库表中。详细步骤如下:

准备数据

在项目的 resources 目录下面创建名为 users.csv 的文件,内容如下:

"id","username","password"
1,张三,"13BC03AC29FAC7B29736EC3BE5C2F55A"
2,李四,"5E5994FBCFA922D804DF45295AE98604"
3,王五,"6C14DA109E294D1E8155BE8AA4B1CE8E"
4,赵六,"03774AD7979A5909E78F9C9DB3A2F0B2"

 

创建数据表

连接到数据库,执行如下 SQL 语句,创建一张名为 users_db 的数据表。SQL 如下:

CREATE TABLE `users_db` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `username` varchar(100) DEFAULT NULL COMMENT '用户名',
  `password` varchar(100) DEFAULT NULL COMMENT '密码',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

数据表创建完成,结构如下图:

image.png

 

创建实体

创建一个简单的 POJO 用于映射 users_db 表,如下:

package com.hxstrive.spring_batch.dbItemWriterDemo.dto;

import lombok.Data;
import lombok.ToString;

/**
 * 用户DTO
 * @author hxstrive.com
 */
@Data
@ToString
public class User {
    private int id;
    private String username;
    private String password;
}

 

创建配置类

使用 @Configuration 注解创建配置类,配置 Spring Batch 的 Job、Step 等对象,代码如下:

package com.hxstrive.spring_batch.dbItemWriterDemo.config;

import com.hxstrive.spring_batch.dbItemWriterDemo.dto.User;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.validation.BindException;
import javax.sql.DataSource;

/**
 * Spring Batch 配置类
 * @author hxstrive.com
 */
@Configuration
public class BatchConfig {

    // 用于创建和配置 Job 对象的工厂类
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    // 用于创建和配置 Step 对象的工厂类
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private DataSource dataSource;

    // 创建Job对象
    @Bean
    public Job dbItemWriterDemoJob() {
        return jobBuilderFactory.get("dbItemWriterDemoJob-" + System.currentTimeMillis())
                .start(dbItemWriterDemoStep())
                .build();
    }

    // 创建Step对象
    @Bean
    public Step dbItemWriterDemoStep() {
        return stepBuilderFactory.get("dbItemWriterDemoStep")
                .<User,User>chunk(2)
                .reader(flatFileItemReader())
                .writer(dbItemWriteDemo())
                .build();
    }

    // 创建ItemReader对象
    @Bean
    @StepScope //将 Bean 的生命周期与 Step 执行上下文 绑定
    public FlatFileItemReader<? extends User> flatFileItemReader() {
        FlatFileItemReader<User> reader = new FlatFileItemReader<>();
        reader.setResource(new ClassPathResource("users.csv"));
        reader.setLinesToSkip(1); // 跳过文件第一行,因为第一行是字段名

        // 解析数据
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames("id", "username", "password");
        // 把解析出的数据映射到 User 对象中
        DefaultLineMapper<User> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper(new FieldSetMapper<User>(){
            @Override
            public User mapFieldSet(FieldSet fieldSet) throws BindException {
                User user = new User();
                user.setId(fieldSet.readInt("id"));
                user.setUsername(fieldSet.readString("username"));
                user.setPassword(fieldSet.readString("password"));
                System.out.println("reading user: " + user);
                return user;
            }
        });

        lineMapper.afterPropertiesSet();
        reader.setLineMapper(lineMapper);
        return reader;
    }

    // 创建ItemWriter对象
    @Bean
    public JdbcBatchItemWriter<User> dbItemWriteDemo() {
        JdbcBatchItemWriter<User> writer = new JdbcBatchItemWriter<>();
        writer.setDataSource(dataSource);
        writer.setSql("insert into users_db(id,username,password) values (:id,:username,:password)");
        writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
        return writer;
    }

}

上述代码,主要查看 dbItemWriteDemo() 方法,其中,手动创建了一个 JdbcBatchItemWriter 对象,通过 setDataSource() 方法设置数据源,通过 setSql() 方法设置插入数据的 SQL INSERT 语句,最后手动创建 BeanPropertyItemSqlParameterSourceProvider 对象,实现根据参数名占位符,而非使用 ? 占位符。

关于 flatFileItemReader() 方法,可以参考Spring Batch4 从普通文件读取数据

 

创建启动类

创建一个 Spring Boot 启动类,记得添加 @EnableBatchProcessing 注解,开启批处理功能。代码如下:

package com.hxstrive.spring_batch.dbItemWriterDemo;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@EnableBatchProcessing // 开启批处理
public class SpringBatchDemoApplication {
    public static void main(String[] args) {
       SpringApplication.run(SpringBatchDemoApplication.class, args);
    }

}

运行应用,输出日志如下:

image.png

通过观察日志,成功读取了 csv 中的数据,继续查看数据库表,如下图:

image.png

成功将数据写入到数据库表,更多关于 JdbcBatchItemWriter 的用法请参考官方文档。

  

说说我的看法
全部评论(
没有评论
关于
本网站专注于 Java、数据库(MySQL、Oracle)、Linux、软件架构及大数据等多领域技术知识分享。涵盖丰富的原创与精选技术文章,助力技术传播与交流。无论是技术新手渴望入门,还是资深开发者寻求进阶,这里都能为您提供深度见解与实用经验,让复杂编码变得轻松易懂,携手共赴技术提升新高度。如有侵权,请来信告知:hxstrive@outlook.com
其他应用
公众号