本章节将介绍如何使用 Spring Batch4 中的 ItemReader 实现类 JdbcPagingItemReader 从数据库中读取数据,JdbcPagingItemReader 是一个强大的数据库读取器,专门用于从关系型数据库中分页读取大量数据。它结合了 JDBC 的高效性和分页查询的优势,避免了一次性加载整个数据集到内存的问题。
setDataSource(DataSource dataSource) 设置数据库连接的 DataSource,必须配置。
setFetchSize(int fetchSize) 设置每次数据库查询获取的记录数,通常与 chunkSize 保持一致以优化性能。例如:
reader.setFetchSize(1000); // 每次查询 1000 条记录
setQueryProvider(PagingQueryProvider queryProvider) 设置分页查询提供者,负责生成分页 SQL。常见实现类:
(1)MySqlPagingQueryProvider 用于 MySQL 数据库
(2)OraclePagingQueryProvider 用于 Oracle 数据库
(3)SqlServerPagingQueryProvider 用于 SqlServer 数据库
简单示例:
MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider(); queryProvider.setSelectClause("SELECT id, name"); queryProvider.setFromClause("FROM users"); reader.setQueryProvider(queryProvider);
setRowMapper(RowMapper<T> rowMapper) 将数据库记录映射为 Java 对象。可使用:
(1)自定义 RowMapper 实现
(2)BeanPropertyRowMapper 自动映射字段到属性
简单示例:
reader.setRowMapper(new BeanPropertyRowMapper<>(User.class));
setSortKeys(Map<String, Order> sortKeys) 指定分页排序字段(必须配置),确保结果集有序。例如:
Map<String, Order> sortKeys = new HashMap<>(); sortKeys.put("id", Order.ASCENDING); // 按 ID 升序排序 queryProvider.setSortKeys(sortKeys);
setParameterValues(Map<String, Object> parameterValues) 设置 SQL 查询的静态参数(如固定条件)。简单示例:
reader.setParameterValues(Map.of("status", "ACTIVE"));
setSaveState(boolean saveState) 是否保存读取状态(默认 true),用于作业重启时恢复进度。例如:
reader.setSaveState(true);
setName(String name) 设置读取器名称(用于状态保存)。例如:
reader.setName("userPagingReader");
setPageSize(int pageSize) 设置每页记录数(默认与 fetchSize 相同)。例如:
reader.setPageSize(500);
在 spring_batch4 数据库中创建数据表 users,并且插入几条数据,SQL 语句如下:
CREATE TABLE `users` ( `id` int NOT NULL AUTO_INCREMENT COMMENT '用户ID', `username` varchar(100) NOT NULL COMMENT '用户名', `password` varchar(128) NOT NULL COMMENT '密码', PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='用户表'; INSERT INTO users(username, `password`)VALUES('张三', '13BC03AC29FAC7B29736EC3BE5C2F55A'); INSERT INTO users(username, `password`)VALUES('李四', '5E5994FBCFA922D804DF45295AE98604'); INSERT INTO users(username, `password`)VALUES('王五', '6C14DA109E294D1E8155BE8AA4B1CE8E'); INSERT INTO users(username, `password`)VALUES('赵六', '03774AD7979A5909E78F9C9DB3A2F0B2');
执行上述 SQL 后,效果如下图:
创建 users 表的实体对象 User,代码如下:
package com.hxstrive.spring_batch.dbItemReaderDemo.dto; import lombok.Data; import lombok.ToString; /** * 用户DTO * @author hxstrive.com */ @Data @ToString public class User { private int id; private String username; private String password; }
使用 @Configuration 注解创建 BatchConfig 配置类,对 Spring Batch 进行配置,代码如下:
package com.hxstrive.spring_batch.dbItemReaderDemo.config; import com.hxstrive.spring_batch.dbItemReaderDemo.dto.User; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepScope; import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemWriter; import org.springframework.batch.item.database.JdbcPagingItemReader; import org.springframework.batch.item.database.Order; import org.springframework.batch.item.database.support.MySqlPagingQueryProvider; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jdbc.core.RowMapper; import javax.sql.DataSource; import java.sql.ResultSet; import java.sql.SQLException; import java.util.HashMap; import java.util.List; import java.util.Map; /** * Spring Batch 配置类 * @author hxstrive.com */ @Configuration public class BatchConfig { // 用于创建和配置 Job 对象的工厂类 @Autowired private JobBuilderFactory jobBuilderFactory; // 用于创建和配置 Step 对象的工厂类 @Autowired private StepBuilderFactory stepBuilderFactory; @Autowired private DataSource dataSource; // 创建Job对象 @Bean public Job dbItemReaderDemoJob() { return jobBuilderFactory.get("dbItemReaderDemoJob") .start(dbItemReaderDemoStep()) .build(); } // 创建Step对象 @Bean public Step dbItemReaderDemoStep() { return stepBuilderFactory.get("dbItemReaderDemoStep-" + System.currentTimeMillis()) .<User,User>chunk(2) .reader(jdbcItemReader()) .writer(new ItemWriter<User>() { @Override public void write(List<? extends User> list) throws Exception { System.out.println("=========== 分隔符 ==========="); for (User user : list) { System.out.println(user); } } }) .build(); } // 创建ItemReader对象,使用 JdbcPagingItemReader 实现从数据库读取数据 @Bean @StepScope //将 Bean 的生命周期与 Step 执行上下文 绑定 public ItemReader<? extends User> jdbcItemReader() { JdbcPagingItemReader<User> jdbcPagingItemReader = new JdbcPagingItemReader<>(); jdbcPagingItemReader.setDataSource(dataSource); // 设置数据源 jdbcPagingItemReader.setFetchSize(2); // 设置一次从数据库中读取的记录数 // 设置行映射器,将数据库中的记录映射为User对象 jdbcPagingItemReader.setRowMapper(new RowMapper<User>(){ @Override public User mapRow(ResultSet rs, int rowNum) throws SQLException { User user = new User(); user.setId(rs.getInt("id")); user.setUsername(rs.getString("username")); user.setPassword(rs.getString("password")); return user; } }); // 设置查询语句,从数据库中读取数据 MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider(); queryProvider.setSelectClause("select id, username, password"); queryProvider.setFromClause("from users"); // 指定根据哪个字段进行排序 Map<String, Order> sort = new HashMap<>(); sort.put("id", Order.ASCENDING); queryProvider.setSortKeys(sort); jdbcPagingItemReader.setQueryProvider(queryProvider); return jdbcPagingItemReader; } }
上述代码,配置了一个 JdbcPagingItemReader 实例,用于从 MySQL 数据库分页读取用户数据。下面是对上述代码的详细说明:
@StepScope:用于将 Bean 的作用域限定为 Step 执行期间,支持在运行时注入 Step 级别的参数(如 JobParameters)。
RowMapper:用于将数据库查询结果集(ResultSet)中的单行记录映射为 Java 对象。它通常与 JDBC 相关的 ItemReader 实现(如 JdbcCursorItemReader 和 JdbcPagingItemReader)配合使用。接口定义如下:
package org.springframework.jdbc.core; import java.sql.ResultSet; import java.sql.SQLException; import org.springframework.lang.Nullable; @FunctionalInterface public interface RowMapper<T> { @Nullable T mapRow(ResultSet rs, int rowNum) throws SQLException; }
MySqlPagingQueryProvider:用于生成符合 MySQL 语法的分页 SQL 语句。它是 PagingQueryProvider 接口的实现类,主要配合 JdbcPagingItemReader 使用,解决大数据量下的高效分页读取问题。部分源码如下:
public class MySqlPagingQueryProvider extends AbstractSqlPagingQueryProvider { //... private String buildLimitClause(int pageSize) { return new StringBuilder().append("LIMIT ").append(pageSize).toString(); } @Override public String generateJumpToItemQuery(int itemIndex, int pageSize) { // 计算分页信息 int page = itemIndex / pageSize; int offset = (page * pageSize) - 1; offset = offset<0 ? 0 : offset; // 构建分页语句,如 limit 1,10 String limitClause = new StringBuilder().append("LIMIT ").append(offset).append(", 1").toString(); return SqlPagingQueryUtils.generateLimitJumpToQuery(this, limitClause); } }
启动服务,删除日志如下图:
注意,上面 BatchConfig 配置中创建 Job的 stepBuilderFactory.get("dbItemReaderDemoStep-" + System.currentTimeMillis()) 代码,后面添加 System.currentTimeMillis() 是为了任务执行后,下次还能继续执行,方便演示。