Spring Batch4 教程

Spring Batch4 错误处理

错误处理在编程中是无法避免的,那么如何处理错误呢?Spring Batch4 提供了完善的错误处理机制,使批处理作业能够优雅地处理各种异常情况。

以下是 Spring Batch4 中错误处理的全面解析:

跳过策略(Skip Policy)

跳过策略允许指定在遇到特定异常时(自定义数据校验异常)是否跳过当前项,而不是终止整个作业。例如:

public BusinessException extends {

    public BusinessException() {
    }
    
    public BusinessException(String message) {
        super(message);
    }
    
    public BusinessException(String message, Throwable cause) {
        super(message, cause);
    }
    
    public BusinessException(Throwable cause) {
        super(cause);
    }

}

创建一个 Step,设置跳过策略,以及添加跳过监听器,如下:

// 创建Step对象
@Bean
public Step skipDemoStep() {
    return stepBuilderFactory.get("skipDemoStep")
            .<User, User>chunk(1)
            .reader(flatFileItemReader())
            .processor(itemProcessor())
            .writer(new ItemWriter<User>() {
                @Override
                public void write(List<? extends User> items) throws Exception {
                    System.out.println(Arrays.toString(items.toArray()));
                }
            })
            .faultTolerant() // 开启容错机制
            .skip(BusinessException.class) // 指定需要跳过的异常类型
            .skipLimit(10) // 最多跳过10条记录
            .listener(new SkipListener<User, User>() { // 指定跳过监听器
                @Override
                public void onSkipInRead(Throwable t) {
                    System.out.println(">> skip in read: " + t); // 跳过读
                }

                @Override
                public void onSkipInWrite(User item, Throwable t) {
                    System.out.println(">> skip in write: " + t); // 跳过写
                }

                @Override
                public void onSkipInProcess(User item, Throwable t) {
                    System.out.println(">> skip in process: " + t); // 跳过处理
                }
            })
            .build();
}

  

重试机制(Retry Mechanism)

在进行批处理时,如果处理的任务较多,那么处理周期会比较长。在处理周期内,很有可能遇上网络波动,导致数据库连接失败、与其他服务通信失败等错误。Spring Batch4 中,对于这种暂时性的错误,可以配置重试机制。通过重试来解决该类问题。例如:

// 创建Step对象
@Bean
public Step retryDemoStep() {
    return stepBuilderFactory.get("retryDemoStep")
            .<User, User>chunk(1)
            .reader(flatFileItemReader())
            .processor(itemProcessor())
            .writer(new ItemWriter<User>() {
                @Override
                public void write(List<? extends User> items) throws Exception {
                    System.out.println(Arrays.toString(items.toArray()));
                }
            })
            .faultTolerant() // 开启容错机制
            .retry(BusinessException.class) // 指定需要重试的异常类
            .retryLimit(3) // 重试次数
            .listener(new RetryListener() { // 注册重试监听器,下面仅仅打印了日志
                @Override
                public <T, E extends Throwable> boolean open(RetryContext retryContext, RetryCallback<T, E> retryCallback) {
                    System.out.println("RetryListener :: open");
                    return true;
                }

                @Override
                public <T, E extends Throwable> void close(RetryContext retryContext, RetryCallback<T, E> retryCallback, Throwable throwable) {
                    System.out.println("RetryListener :: close");
                }

                @Override
                public <T, E extends Throwable> void onError(RetryContext retryContext, RetryCallback<T, E> retryCallback, Throwable throwable) {
                    System.out.println("RetryListener :: error");
                }
            })
            .build();
}

Spring Batch4 还提供了一个 noSkip(Class<? extends Throwable> type) 方法,用于显式指定某些异常(及其子类)在发生时不应该被跳过,即当这些异常发生时,批处理步骤会终止执行,而不是继续处理后续数据。

  

异常分类器(Exception Classifier)

ExceptionClassifierSkipPolicy 是一个灵活的跳过策略实现,它允许根据异常类型来决定是否跳过某个项(item)的处理。它通过分类器(classifier)将不同类型的异常映射到不同的 SkipPolicy,从而实现对不同异常的差异化处理。

ExceptionClassifierSkipPolicy 的主要作用是:

  • 根据抛出的异常类型进行分类

  • 为不同类型的异常指定不同的跳过策略

  • 实现复杂的异常处理逻辑,满足多样化的业务需求

例如:

@Bean
public Step skipDemoStep() {
    // 配置异常映射规则
    Map<Class<? extends Throwable>, SkipPolicy> policyMap = new HashMap<>();
    // 遇到 NullPointerException 时,使用 NeverSkipItemSkipPolicy 策略
    // 表示永远不会跳过任何项
    policyMap.put(NullPointerException.class, new NeverSkipItemSkipPolicy());

    // 遇到 IllegalArgumentException 时,使用 NeverSkipItemSkipPolicy 策略
    // 表示永远不会跳过任何项
    policyMap.put(IllegalArgumentException.class, new NeverSkipItemSkipPolicy());

    // 遇到 BusinessException 时,使用 AlwaysSkipItemSkipPolicy 策略
    // 表示总是会跳过遇到异常的项
    policyMap.put(BusinessException.class, new AlwaysSkipItemSkipPolicy());

    ExceptionClassifierSkipPolicy skipPolicy = new ExceptionClassifierSkipPolicy();
    skipPolicy.setPolicyMap(policyMap);

    return stepBuilderFactory.get("skipDemoStep")
            .<User, User>chunk(1)
            .reader(flatFileItemReader())
            .processor(itemProcessor())
            .writer(new ItemWriter<User>() {
                @Override
                public void write(List<? extends User> items) throws Exception {
                    System.out.println(Arrays.toString(items.toArray()));
                }
            })
            .faultTolerant() // 开启容错机制
            .skipPolicy(skipPolicy) // 跳过策略【看这里】
            .listener(new SkipListener<User, User>() { // 指定跳过监听器
                @Override
                public void onSkipInRead(Throwable t) {
                    System.out.println(">> skip in read: " + t);
                }

                @Override
                public void onSkipInWrite(User item, Throwable t) {
                    System.out.println(">> skip in write: " + t);
                }

                @Override
                public void onSkipInProcess(User item, Throwable t) {
                    System.out.println(">> skip in process: " + t);
                }
            })
            .build();
}

这些机制可以根据实际需求灵活组合使用,帮助构建健壮的批处理应用程序,能够优雅地处理各种错误情况,同时保证数据的一致性和处理的效率。

  

说说我的看法
全部评论(
没有评论
关于
本网站专注于 Java、数据库(MySQL、Oracle)、Linux、软件架构及大数据等多领域技术知识分享。涵盖丰富的原创与精选技术文章,助力技术传播与交流。无论是技术新手渴望入门,还是资深开发者寻求进阶,这里都能为您提供深度见解与实用经验,让复杂编码变得轻松易懂,携手共赴技术提升新高度。如有侵权,请来信告知:hxstrive@outlook.com
其他应用
公众号