本章将借助 Spring Batch4 的 ItemStreamReader 接口实现当 ItemReader 读取数据失败时,记录状态信息(当前行号)。重启时,将接着出现异常的位置继续读取数据。
ItemStreamReader 接口继承自 ItemReader 接口,主要用于处理那些需要状态管理的读取操作。并且把 ItemReader 的基础读取能力和 ItemStream 的状态管理功能结合起来,这一设计使得它能够在批处理作业重启时记住之前的状态。
读取大文件:像 CSV、XML 这类大文件,在处理过程中可能需要记录已读取的位置。
数据库查询:对于分批次查询数据库的操作,要记录查询的偏移量。
远程 API 调用:调用远程 API 获取数据时,需要记录分页信息或者最后处理的记录。
事务性资源:处理事务性资源时,要保证重启后数据不会重复处理。
ItemStreamReader 接口新增了以下方法:
open(ExecutionContext executionContext):在读取操作开始前初始化资源,例如打开文件或者建立数据库连接。
update(ExecutionContext executionContext):更新执行上下文,将当前的读取状态保存起来。
close():读取操作结束后释放资源,例如关闭文件或者断开数据库连接。
如果我们使用 ItemReader 从 users.csv 文件读取数据,当遇见用户 id 为 2 的数据,抛出异常。Spring Batch4 将会使用 ExecutionContext 的 putLong() 方法保存当期处理的行号。当再次启动任务时将从 ExecutionContext 中读取上次的行号信息,并在 read() 方法中使用 setLinesToSkip() 方法跳过已经读取的行。
创建 users.csv 文件,内容如下:
"id","username","password" 1,张三,"13BC03AC29FAC7B29736EC3BE5C2F55A" 2,李四,"5E5994FBCFA922D804DF45295AE98604" 3,王五,"6C14DA109E294D1E8155BE8AA4B1CE8E" 4,赵六,"03774AD7979A5909E78F9C9DB3A2F0B2"
创建名为 User 的实体类,如下:
package com.hxstrive.spring_batch.restartItemReaderDemo.dto; import lombok.Data; import lombok.ToString; /** * 用户DTO * @author hxstrive.com */ @Data @ToString public class User { private int id; private String username; private String password; }
创建名为 RestartReader 的 ItemReader,注意该 ItemReader 实现了 ItemStreamReader 接口。支持
package com.hxstrive.spring_batch.restartItemReaderDemo; import com.hxstrive.spring_batch.restartItemReaderDemo.dto.User; import org.springframework.batch.item.*; import org.springframework.batch.item.file.FlatFileItemReader; import org.springframework.batch.item.file.mapping.DefaultLineMapper; import org.springframework.batch.item.file.mapping.FieldSetMapper; import org.springframework.batch.item.file.transform.DelimitedLineTokenizer; import org.springframework.batch.item.file.transform.FieldSet; import org.springframework.core.io.ClassPathResource; import org.springframework.stereotype.Component; import org.springframework.validation.BindException; import java.util.Objects; /** * 支持重启的 Reader * @author hxstrive.com */ @Component("restartReader") public class RestartReader implements ItemStreamReader<User> { // 用于读取 csv 文件 private FlatFileItemReader<User> userFileItemReader = new FlatFileItemReader<>(); // 当前行,行号 private Long curLine = 0L; // 是否重启(true-重启;false-为重启) private boolean restart = false; // 用于在批处理作业的不同阶段(Step 之间、任务重启时)保存和恢复状态信息 private ExecutionContext executionContext; /** * 创建 RestartReader,并且进行是数据解析 */ public RestartReader() { // 设置资源文件 userFileItemReader.setResource(new ClassPathResource("users.csv")); userFileItemReader.setLinesToSkip(1); // 跳过文件第一行,因为第一行是字段名 // 解析数据 DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer(); tokenizer.setNames("id", "username", "password"); // 把解析出的数据映射到 User 对象中 DefaultLineMapper<User> lineMapper = new DefaultLineMapper<>(); lineMapper.setLineTokenizer(tokenizer); lineMapper.setFieldSetMapper(new FieldSetMapper<User>(){ @Override public User mapFieldSet(FieldSet fieldSet) throws BindException { User user = new User(); user.setId(fieldSet.readInt("id")); user.setUsername(fieldSet.readString("username")); user.setPassword(fieldSet.readString("password")); return user; } }); lineMapper.afterPropertiesSet(); userFileItemReader.setLineMapper(lineMapper); } // 读取数据 @Override public User read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException { User user = null; this.curLine++; // 行号加1 // 如果是重启,则跳过已经读取过的行号 if(this.restart) { this.userFileItemReader.setLinesToSkip(this.curLine.intValue() - 1); this.restart = false; // 恢复状态 System.out.println("Restart reading from line: " + this.curLine); } this.userFileItemReader.open(this.executionContext); user = this.userFileItemReader.read(); // 故意抛出异常信息,重启的时候再修改一下代码,将 2 换成 12 if(Objects.nonNull(user) && user.getId() == 2) { throw new RuntimeException("Read data error"); } return user; } @Override public void open(ExecutionContext executionContext) throws ItemStreamException { this.executionContext = executionContext; // 如果 ExecutionContext 中包含 curLine 键,说明任务重启 if(executionContext.containsKey("curLine")) { this.curLine = executionContext.getLong("curLine"); this.restart = true; } else { // 保存行号信息 this.curLine = 0L; executionContext.putLong("curLine", this.curLine); System.out.println("Start reading from line: " + (this.curLine + 1)); } } @Override public void update(ExecutionContext executionContext) throws ItemStreamException { // 更新行号信息 executionContext.putLong("curLine", this.curLine); System.out.println("update()"); } @Override public void close() throws ItemStreamException { System.out.println("close()"); } }
创建名为 BatchConfig 的配置类,然后通过 @Autowired 注解自动注入自定义的 RestartReader,代码如下:
package com.hxstrive.spring_batch.restartItemReaderDemo.config; import com.hxstrive.spring_batch.restartItemReaderDemo.dto.User; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemWriter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.Arrays; import java.util.List; /** * Spring Batch 配置类 * @author hxstrive.com */ @Configuration public class BatchConfig { // 用于创建和配置 Job 对象的工厂类 @Autowired private JobBuilderFactory jobBuilderFactory; // 用于创建和配置 Step 对象的工厂类 @Autowired private StepBuilderFactory stepBuilderFactory; @Autowired private ItemReader<User> restartReader; // 创建Job对象 @Bean public Job restartReaderDemoJob() { return jobBuilderFactory.get("restartReaderDemoJob2") .start(restartReaderDemoStep()) .build(); } // 创建Step对象 @Bean public Step restartReaderDemoStep() { return stepBuilderFactory.get("restartReaderDemoStep") .<User, User>chunk(1) .reader(restartReader) .writer(new ItemWriter<User>() { @Override public void write(List<? extends User> list) throws Exception { System.out.println(Arrays.toString(list.toArray())); } }) .build(); } }
运行应用程序,第一次将会抛出异常,输出日志如下:
修改代码,将如下代码:
if(Objects.nonNull(user) && user.getId() == 3) { throw new RuntimeException("Read data error"); }
修改为
if(Objects.nonNull(user) && user.getId() == 13) { throw new RuntimeException("Read data error"); }
修改完成后。重新运行应用程序,输出日志如下:
从上图可知,上次读取到第3行出现了错误,当前将从第2行开始读取。