Spring Batch4 ItemReader 异常处理

本章将借助 Spring Batch4 的 ItemStreamReader 接口实现当 ItemReader 读取数据失败时,记录状态信息(当前行号)。重启时,将接着出现异常的位置继续读取数据。

ItemStreamReader 接口

ItemStreamReader 接口继承自 ItemReader 接口,主要用于处理那些需要状态管理的读取操作。并且把 ItemReader 的基础读取能力和 ItemStream 的状态管理功能结合起来,这一设计使得它能够在批处理作业重启时记住之前的状态。

主要使用场景

  • 读取大文件:像 CSV、XML 这类大文件,在处理过程中可能需要记录已读取的位置。

  • 数据库查询:对于分批次查询数据库的操作,要记录查询的偏移量。

  • 远程 API 调用:调用远程 API 获取数据时,需要记录分页信息或者最后处理的记录。

  • 事务性资源:处理事务性资源时,要保证重启后数据不会重复处理。

关键方法

ItemStreamReader 接口新增了以下方法:

  • open(ExecutionContext executionContext):在读取操作开始前初始化资源,例如打开文件或者建立数据库连接。

  • update(ExecutionContext executionContext):更新执行上下文,将当前的读取状态保存起来。

  • close():读取操作结束后释放资源,例如关闭文件或者断开数据库连接。

  

简单示例

如果我们使用 ItemReader 从 users.csv 文件读取数据,当遇见用户 id 为 2 的数据,抛出异常。Spring Batch4 将会使用 ExecutionContext 的 putLong() 方法保存当期处理的行号。当再次启动任务时将从 ExecutionContext 中读取上次的行号信息,并在 read() 方法中使用 setLinesToSkip() 方法跳过已经读取的行。

准备工作

创建 users.csv 文件,内容如下:

"id","username","password"
1,张三,"13BC03AC29FAC7B29736EC3BE5C2F55A"
2,李四,"5E5994FBCFA922D804DF45295AE98604"
3,王五,"6C14DA109E294D1E8155BE8AA4B1CE8E"
4,赵六,"03774AD7979A5909E78F9C9DB3A2F0B2"

  

创建实体

创建名为 User 的实体类,如下:

package com.hxstrive.spring_batch.restartItemReaderDemo.dto;

import lombok.Data;
import lombok.ToString;

/**
 * 用户DTO
 * @author hxstrive.com
 */
@Data
@ToString
public class User {
    private int id;
    private String username;
    private String password;
}

  

创建 ItemReader

创建名为 RestartReader 的 ItemReader,注意该 ItemReader 实现了 ItemStreamReader 接口。支持

package com.hxstrive.spring_batch.restartItemReaderDemo;

import com.hxstrive.spring_batch.restartItemReaderDemo.dto.User;
import org.springframework.batch.item.*;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.core.io.ClassPathResource;
import org.springframework.stereotype.Component;
import org.springframework.validation.BindException;
import java.util.Objects;

/**
 * 支持重启的 Reader
 * @author hxstrive.com
 */
@Component("restartReader")
public class RestartReader implements ItemStreamReader<User> {
    // 用于读取 csv 文件
    private FlatFileItemReader<User> userFileItemReader = new FlatFileItemReader<>();
    // 当前行,行号
    private Long curLine = 0L;
    // 是否重启(true-重启;false-为重启)
    private boolean restart = false;
    // 用于在批处理作业的不同阶段(Step 之间、任务重启时)保存和恢复状态信息
    private ExecutionContext executionContext;

    /**
     * 创建 RestartReader,并且进行是数据解析
     */
    public RestartReader() {
        // 设置资源文件
        userFileItemReader.setResource(new ClassPathResource("users.csv"));
        userFileItemReader.setLinesToSkip(1); // 跳过文件第一行,因为第一行是字段名

        // 解析数据
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames("id", "username", "password");
        
        // 把解析出的数据映射到 User 对象中
        DefaultLineMapper<User> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper(new FieldSetMapper<User>(){
            @Override
            public User mapFieldSet(FieldSet fieldSet) throws BindException {
                User user = new User();
                user.setId(fieldSet.readInt("id"));
                user.setUsername(fieldSet.readString("username"));
                user.setPassword(fieldSet.readString("password"));
                return user;
            }
        });

        lineMapper.afterPropertiesSet();
        userFileItemReader.setLineMapper(lineMapper);
    }

    // 读取数据
    @Override
    public User read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
        User user = null;
        this.curLine++; // 行号加1

        // 如果是重启,则跳过已经读取过的行号
        if(this.restart) {
            this.userFileItemReader.setLinesToSkip(this.curLine.intValue() - 1);
            this.restart = false; // 恢复状态
            System.out.println("Restart reading from line: " + this.curLine);
        }

        this.userFileItemReader.open(this.executionContext);
        user = this.userFileItemReader.read();

        // 故意抛出异常信息,重启的时候再修改一下代码,将 2 换成 12
        if(Objects.nonNull(user) && user.getId() == 2) {
            throw new RuntimeException("Read data error");
        }

        return user;
    }

    @Override
    public void open(ExecutionContext executionContext) throws ItemStreamException {
        this.executionContext = executionContext;
        // 如果 ExecutionContext 中包含 curLine 键,说明任务重启
        if(executionContext.containsKey("curLine")) {
            this.curLine = executionContext.getLong("curLine");
            this.restart = true;
        } else {
            // 保存行号信息
            this.curLine = 0L;
            executionContext.putLong("curLine", this.curLine);
            System.out.println("Start reading from line: " + (this.curLine + 1));
        }
    }

    @Override
    public void update(ExecutionContext executionContext) throws ItemStreamException {
        // 更新行号信息
        executionContext.putLong("curLine", this.curLine);
        System.out.println("update()");
    }

    @Override
    public void close() throws ItemStreamException {
        System.out.println("close()");
    }

}

  

创建 BatchConfig 配置

创建名为 BatchConfig 的配置类,然后通过 @Autowired 注解自动注入自定义的 RestartReader,代码如下:

package com.hxstrive.spring_batch.restartItemReaderDemo.config;

import com.hxstrive.spring_batch.restartItemReaderDemo.dto.User;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Arrays;
import java.util.List;

/**
 * Spring Batch 配置类
 * @author hxstrive.com
 */
@Configuration
public class BatchConfig {

    // 用于创建和配置 Job 对象的工厂类
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    // 用于创建和配置 Step 对象的工厂类
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private ItemReader<User> restartReader;


    // 创建Job对象
    @Bean
    public Job restartReaderDemoJob() {
        return jobBuilderFactory.get("restartReaderDemoJob2")
                .start(restartReaderDemoStep())
                .build();
    }

    // 创建Step对象
    @Bean
    public Step restartReaderDemoStep() {
        return stepBuilderFactory.get("restartReaderDemoStep")
                .<User, User>chunk(1)
                .reader(restartReader)
                .writer(new ItemWriter<User>() {
                    @Override
                    public void write(List<? extends User> list) throws Exception {
                        System.out.println(Arrays.toString(list.toArray()));
                    }
                })
                .build();
    }

}

  

运行&验证

运行应用程序,第一次将会抛出异常,输出日志如下:

image.png

修改代码,将如下代码:

if(Objects.nonNull(user) && user.getId() == 3) {
    throw new RuntimeException("Read data error");
}

修改为

if(Objects.nonNull(user) && user.getId() == 13) {
    throw new RuntimeException("Read data error");
}

修改完成后。重新运行应用程序,输出日志如下:

image.png

从上图可知,上次读取到第3行出现了错误,当前将从第2行开始读取。

  

说说我的看法
全部评论(
没有评论
关于
本网站专注于 Java、数据库(MySQL、Oracle)、Linux、软件架构及大数据等多领域技术知识分享。涵盖丰富的原创与精选技术文章,助力技术传播与交流。无论是技术新手渴望入门,还是资深开发者寻求进阶,这里都能为您提供深度见解与实用经验,让复杂编码变得轻松易懂,携手共赴技术提升新高度。如有侵权,请来信告知:hxstrive@outlook.com
其他应用
公众号