本章节将介绍如何使用 Spring Batch4 中的 ItemReader 实现类 JdbcPagingItemReader 从数据库中读取数据,JdbcPagingItemReader 是一个强大的数据库读取器,专门用于从关系型数据库中分页读取大量数据。它结合了 JDBC 的高效性和分页查询的优势,避免了一次性加载整个数据集到内存的问题。
setDataSource(DataSource dataSource) 设置数据库连接的 DataSource,必须配置。
setFetchSize(int fetchSize) 设置每次数据库查询获取的记录数,通常与 chunkSize 保持一致以优化性能。例如:
reader.setFetchSize(1000); // 每次查询 1000 条记录
setQueryProvider(PagingQueryProvider queryProvider) 设置分页查询提供者,负责生成分页 SQL。常见实现类:
(1)MySqlPagingQueryProvider 用于 MySQL 数据库
(2)OraclePagingQueryProvider 用于 Oracle 数据库
(3)SqlServerPagingQueryProvider 用于 SqlServer 数据库
简单示例:
MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
queryProvider.setSelectClause("SELECT id, name");
queryProvider.setFromClause("FROM users");
reader.setQueryProvider(queryProvider);setRowMapper(RowMapper<T> rowMapper) 将数据库记录映射为 Java 对象。可使用:
(1)自定义 RowMapper 实现
(2)BeanPropertyRowMapper 自动映射字段到属性
简单示例:
reader.setRowMapper(new BeanPropertyRowMapper<>(User.class));
setSortKeys(Map<String, Order> sortKeys) 指定分页排序字段(必须配置),确保结果集有序。例如:
Map<String, Order> sortKeys = new HashMap<>();
sortKeys.put("id", Order.ASCENDING); // 按 ID 升序排序
queryProvider.setSortKeys(sortKeys);setParameterValues(Map<String, Object> parameterValues) 设置 SQL 查询的静态参数(如固定条件)。简单示例:
reader.setParameterValues(Map.of("status", "ACTIVE"));
setSaveState(boolean saveState) 是否保存读取状态(默认 true),用于作业重启时恢复进度。例如:
reader.setSaveState(true);
setName(String name) 设置读取器名称(用于状态保存)。例如:
reader.setName("userPagingReader");setPageSize(int pageSize) 设置每页记录数(默认与 fetchSize 相同)。例如:
reader.setPageSize(500);
在 spring_batch4 数据库中创建数据表 users,并且插入几条数据,SQL 语句如下:
CREATE TABLE `users` (
`id` int NOT NULL AUTO_INCREMENT COMMENT '用户ID',
`username` varchar(100) NOT NULL COMMENT '用户名',
`password` varchar(128) NOT NULL COMMENT '密码',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='用户表';
INSERT INTO users(username, `password`)VALUES('张三', '13BC03AC29FAC7B29736EC3BE5C2F55A');
INSERT INTO users(username, `password`)VALUES('李四', '5E5994FBCFA922D804DF45295AE98604');
INSERT INTO users(username, `password`)VALUES('王五', '6C14DA109E294D1E8155BE8AA4B1CE8E');
INSERT INTO users(username, `password`)VALUES('赵六', '03774AD7979A5909E78F9C9DB3A2F0B2');执行上述 SQL 后,效果如下图:

创建 users 表的实体对象 User,代码如下:
package com.hxstrive.spring_batch.dbItemReaderDemo.dto;
import lombok.Data;
import lombok.ToString;
/**
* 用户DTO
* @author hxstrive.com
*/
@Data
@ToString
public class User {
private int id;
private String username;
private String password;
}
使用 @Configuration 注解创建 BatchConfig 配置类,对 Spring Batch 进行配置,代码如下:
package com.hxstrive.spring_batch.dbItemReaderDemo.config;
import com.hxstrive.spring_batch.dbItemReaderDemo.dto.User;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.RowMapper;
import javax.sql.DataSource;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Spring Batch 配置类
* @author hxstrive.com
*/
@Configuration
public class BatchConfig {
// 用于创建和配置 Job 对象的工厂类
@Autowired
private JobBuilderFactory jobBuilderFactory;
// 用于创建和配置 Step 对象的工厂类
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private DataSource dataSource;
// 创建Job对象
@Bean
public Job dbItemReaderDemoJob() {
return jobBuilderFactory.get("dbItemReaderDemoJob")
.start(dbItemReaderDemoStep())
.build();
}
// 创建Step对象
@Bean
public Step dbItemReaderDemoStep() {
return stepBuilderFactory.get("dbItemReaderDemoStep-" + System.currentTimeMillis())
.<User,User>chunk(2)
.reader(jdbcItemReader())
.writer(new ItemWriter<User>() {
@Override
public void write(List<? extends User> list) throws Exception {
System.out.println("=========== 分隔符 ===========");
for (User user : list) {
System.out.println(user);
}
}
})
.build();
}
// 创建ItemReader对象,使用 JdbcPagingItemReader 实现从数据库读取数据
@Bean
@StepScope //将 Bean 的生命周期与 Step 执行上下文 绑定
public ItemReader<? extends User> jdbcItemReader() {
JdbcPagingItemReader<User> jdbcPagingItemReader = new JdbcPagingItemReader<>();
jdbcPagingItemReader.setDataSource(dataSource); // 设置数据源
jdbcPagingItemReader.setFetchSize(2); // 设置一次从数据库中读取的记录数
// 设置行映射器,将数据库中的记录映射为User对象
jdbcPagingItemReader.setRowMapper(new RowMapper<User>(){
@Override
public User mapRow(ResultSet rs, int rowNum) throws SQLException {
User user = new User();
user.setId(rs.getInt("id"));
user.setUsername(rs.getString("username"));
user.setPassword(rs.getString("password"));
return user;
}
});
// 设置查询语句,从数据库中读取数据
MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
queryProvider.setSelectClause("select id, username, password");
queryProvider.setFromClause("from users");
// 指定根据哪个字段进行排序
Map<String, Order> sort = new HashMap<>();
sort.put("id", Order.ASCENDING);
queryProvider.setSortKeys(sort);
jdbcPagingItemReader.setQueryProvider(queryProvider);
return jdbcPagingItemReader;
}
}上述代码,配置了一个 JdbcPagingItemReader 实例,用于从 MySQL 数据库分页读取用户数据。下面是对上述代码的详细说明:
@StepScope:用于将 Bean 的作用域限定为 Step 执行期间,支持在运行时注入 Step 级别的参数(如 JobParameters)。
RowMapper:用于将数据库查询结果集(ResultSet)中的单行记录映射为 Java 对象。它通常与 JDBC 相关的 ItemReader 实现(如 JdbcCursorItemReader 和 JdbcPagingItemReader)配合使用。接口定义如下:
package org.springframework.jdbc.core;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.springframework.lang.Nullable;
@FunctionalInterface
public interface RowMapper<T> {
@Nullable
T mapRow(ResultSet rs, int rowNum) throws SQLException;
}MySqlPagingQueryProvider:用于生成符合 MySQL 语法的分页 SQL 语句。它是 PagingQueryProvider 接口的实现类,主要配合 JdbcPagingItemReader 使用,解决大数据量下的高效分页读取问题。部分源码如下:
public class MySqlPagingQueryProvider extends AbstractSqlPagingQueryProvider {
//...
private String buildLimitClause(int pageSize) {
return new StringBuilder().append("LIMIT ").append(pageSize).toString();
}
@Override
public String generateJumpToItemQuery(int itemIndex, int pageSize) {
// 计算分页信息
int page = itemIndex / pageSize;
int offset = (page * pageSize) - 1;
offset = offset<0 ? 0 : offset;
// 构建分页语句,如 limit 1,10
String limitClause = new StringBuilder().append("LIMIT ").append(offset).append(", 1").toString();
return SqlPagingQueryUtils.generateLimitJumpToQuery(this, limitClause);
}
}
启动服务,删除日志如下图:

注意,上面 BatchConfig 配置中创建 Job的 stepBuilderFactory.get("dbItemReaderDemoStep-" + System.currentTimeMillis()) 代码,后面添加 System.currentTimeMillis() 是为了任务执行后,下次还能继续执行,方便演示。