Spring Batch4 从数据库中读取数据 JdbcPagingItemReader

本章节将介绍如何使用 Spring Batch4 中的 ItemReader 实现类 JdbcPagingItemReader 从数据库中读取数据,JdbcPagingItemReader 是一个强大的数据库读取器,专门用于从关系型数据库中分页读取大量数据。它结合了 JDBC 的高效性和分页查询的优势,避免了一次性加载整个数据集到内存的问题。

JdbcPagingItemReader 常用方法

数据源与连接配置

  • setDataSource(DataSource dataSource)  设置数据库连接的 DataSource,必须配置。

  • setFetchSize(int fetchSize)  设置每次数据库查询获取的记录数,通常与 chunkSize 保持一致以优化性能。例如:

reader.setFetchSize(1000); // 每次查询 1000 条记录
  • setQueryProvider(PagingQueryProvider queryProvider)   设置分页查询提供者,负责生成分页 SQL。常见实现类:

    (1)MySqlPagingQueryProvider  用于 MySQL 数据库

    (2)OraclePagingQueryProvider  用于 Oracle 数据库

    (3)SqlServerPagingQueryProvider  用于 SqlServer 数据库

简单示例:

MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
queryProvider.setSelectClause("SELECT id, name");
queryProvider.setFromClause("FROM users");
reader.setQueryProvider(queryProvider);
  • setRowMapper(RowMapper<T> rowMapper)  将数据库记录映射为 Java 对象。可使用:

    (1)自定义 RowMapper 实现

    (2)BeanPropertyRowMapper  自动映射字段到属性

简单示例:

reader.setRowMapper(new BeanPropertyRowMapper<>(User.class));
  • setSortKeys(Map<String, Order> sortKeys)  指定分页排序字段(必须配置),确保结果集有序。例如:

Map<String, Order> sortKeys = new HashMap<>();
sortKeys.put("id", Order.ASCENDING); // 按 ID 升序排序
queryProvider.setSortKeys(sortKeys);
  • setParameterValues(Map<String, Object> parameterValues)  设置 SQL 查询的静态参数(如固定条件)。简单示例:

reader.setParameterValues(Map.of("status", "ACTIVE"));

  

高级配置方法

  • setSaveState(boolean saveState)  是否保存读取状态(默认 true),用于作业重启时恢复进度。例如:

reader.setSaveState(true);
  • setName(String name)  设置读取器名称(用于状态保存)。例如:

reader.setName("userPagingReader");
  • setPageSize(int pageSize)  设置每页记录数(默认与 fetchSize 相同)。例如:

reader.setPageSize(500);

  

简单示例

准备工作

在 spring_batch4 数据库中创建数据表 users,并且插入几条数据,SQL 语句如下:

CREATE TABLE `users` (
     `id` int NOT NULL AUTO_INCREMENT COMMENT '用户ID',
     `username` varchar(100) NOT NULL COMMENT '用户名',
     `password` varchar(128) NOT NULL COMMENT '密码',
     PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='用户表';

INSERT INTO users(username, `password`)VALUES('张三', '13BC03AC29FAC7B29736EC3BE5C2F55A');
INSERT INTO users(username, `password`)VALUES('李四', '5E5994FBCFA922D804DF45295AE98604');
INSERT INTO users(username, `password`)VALUES('王五', '6C14DA109E294D1E8155BE8AA4B1CE8E');
INSERT INTO users(username, `password`)VALUES('赵六', '03774AD7979A5909E78F9C9DB3A2F0B2');

执行上述 SQL 后,效果如下图:

image.png

  

创建实体

创建 users 表的实体对象 User,代码如下:

package com.hxstrive.spring_batch.dbItemReaderDemo.dto;

import lombok.Data;
import lombok.ToString;

/**
 * 用户DTO
 * @author hxstrive.com
 */
@Data
@ToString
public class User {
    private int id;
    private String username;
    private String password;
}

  

创建 BatchConfig 配置

使用 @Configuration 注解创建 BatchConfig 配置类,对 Spring Batch 进行配置,代码如下:

package com.hxstrive.spring_batch.dbItemReaderDemo.config;

import com.hxstrive.spring_batch.dbItemReaderDemo.dto.User;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.RowMapper;

import javax.sql.DataSource;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Spring Batch 配置类
 * @author hxstrive.com
 */
@Configuration
public class BatchConfig {

    // 用于创建和配置 Job 对象的工厂类
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    // 用于创建和配置 Step 对象的工厂类
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private DataSource dataSource;


    // 创建Job对象
    @Bean
    public Job dbItemReaderDemoJob() {
        return jobBuilderFactory.get("dbItemReaderDemoJob")
                .start(dbItemReaderDemoStep())
                .build();
    }

    // 创建Step对象
    @Bean
    public Step dbItemReaderDemoStep() {
        return stepBuilderFactory.get("dbItemReaderDemoStep-" + System.currentTimeMillis())
                .<User,User>chunk(2)
                .reader(jdbcItemReader())
                .writer(new ItemWriter<User>() {
                    @Override
                    public void write(List<? extends User> list) throws Exception {
                        System.out.println("=========== 分隔符 ===========");
                        for (User user : list) {
                            System.out.println(user);
                        }
                    }
                })
                .build();
    }

    // 创建ItemReader对象,使用 JdbcPagingItemReader 实现从数据库读取数据
    @Bean
    @StepScope //将 Bean 的生命周期与 Step 执行上下文 绑定
    public ItemReader<? extends User> jdbcItemReader() {
        JdbcPagingItemReader<User> jdbcPagingItemReader = new JdbcPagingItemReader<>();
        jdbcPagingItemReader.setDataSource(dataSource); // 设置数据源
        jdbcPagingItemReader.setFetchSize(2); // 设置一次从数据库中读取的记录数

        // 设置行映射器,将数据库中的记录映射为User对象
        jdbcPagingItemReader.setRowMapper(new RowMapper<User>(){
            @Override
            public User mapRow(ResultSet rs, int rowNum) throws SQLException {
                User user = new User();
                user.setId(rs.getInt("id"));
                user.setUsername(rs.getString("username"));
                user.setPassword(rs.getString("password"));
                return user;
            }
        });

        // 设置查询语句,从数据库中读取数据
        MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
        queryProvider.setSelectClause("select id, username, password");
        queryProvider.setFromClause("from users");

        // 指定根据哪个字段进行排序
        Map<String, Order> sort = new HashMap<>();
        sort.put("id", Order.ASCENDING);
        queryProvider.setSortKeys(sort);

        jdbcPagingItemReader.setQueryProvider(queryProvider);
        return jdbcPagingItemReader;
    }

}

上述代码,配置了一个 JdbcPagingItemReader 实例,用于从 MySQL 数据库分页读取用户数据。下面是对上述代码的详细说明:

  • @StepScope:用于将 Bean 的作用域限定为 Step 执行期间,支持在运行时注入 Step 级别的参数(如 JobParameters)。

  • RowMapper:用于将数据库查询结果集(ResultSet)中的单行记录映射为 Java 对象。它通常与 JDBC 相关的 ItemReader 实现(如 JdbcCursorItemReader 和 JdbcPagingItemReader)配合使用。接口定义如下:

package org.springframework.jdbc.core;

import java.sql.ResultSet;
import java.sql.SQLException;
import org.springframework.lang.Nullable;

@FunctionalInterface
public interface RowMapper<T> {

    @Nullable
    T mapRow(ResultSet rs, int rowNum) throws SQLException;

}
  • MySqlPagingQueryProvider:用于生成符合 MySQL 语法的分页 SQL 语句。它是 PagingQueryProvider 接口的实现类,主要配合 JdbcPagingItemReader 使用,解决大数据量下的高效分页读取问题。部分源码如下:

public class MySqlPagingQueryProvider extends AbstractSqlPagingQueryProvider {

    //...

    private String buildLimitClause(int pageSize) {
       return new StringBuilder().append("LIMIT ").append(pageSize).toString();
    }

    @Override
    public String generateJumpToItemQuery(int itemIndex, int pageSize) {
       // 计算分页信息
       int page = itemIndex / pageSize;
       int offset = (page * pageSize) - 1;
       offset = offset<0 ? 0 : offset;

       // 构建分页语句,如 limit 1,10
       String limitClause = new StringBuilder().append("LIMIT ").append(offset).append(", 1").toString();
       return SqlPagingQueryUtils.generateLimitJumpToQuery(this, limitClause);
    }

}

  

运行&验证

启动服务,删除日志如下图:

image.png

注意,上面 BatchConfig 配置中创建 Job的 stepBuilderFactory.get("dbItemReaderDemoStep-" + System.currentTimeMillis()) 代码,后面添加 System.currentTimeMillis() 是为了任务执行后,下次还能继续执行,方便演示。

  

说说我的看法
全部评论(
没有评论
关于
本网站专注于 Java、数据库(MySQL、Oracle)、Linux、软件架构及大数据等多领域技术知识分享。涵盖丰富的原创与精选技术文章,助力技术传播与交流。无论是技术新手渴望入门,还是资深开发者寻求进阶,这里都能为您提供深度见解与实用经验,让复杂编码变得轻松易懂,携手共赴技术提升新高度。如有侵权,请来信告知:hxstrive@outlook.com
其他应用
公众号