错误处理在编程中是无法避免的,那么如何处理错误呢?Spring Batch4 提供了完善的错误处理机制,使批处理作业能够优雅地处理各种异常情况。
以下是 Spring Batch4 中错误处理的全面解析:
跳过策略允许指定在遇到特定异常时(自定义数据校验异常)是否跳过当前项,而不是终止整个作业。例如:
public BusinessException extends { public BusinessException() { } public BusinessException(String message) { super(message); } public BusinessException(String message, Throwable cause) { super(message, cause); } public BusinessException(Throwable cause) { super(cause); } }
创建一个 Step,设置跳过策略,以及添加跳过监听器,如下:
// 创建Step对象 @Bean public Step skipDemoStep() { return stepBuilderFactory.get("skipDemoStep") .<User, User>chunk(1) .reader(flatFileItemReader()) .processor(itemProcessor()) .writer(new ItemWriter<User>() { @Override public void write(List<? extends User> items) throws Exception { System.out.println(Arrays.toString(items.toArray())); } }) .faultTolerant() // 开启容错机制 .skip(BusinessException.class) // 指定需要跳过的异常类型 .skipLimit(10) // 最多跳过10条记录 .listener(new SkipListener<User, User>() { // 指定跳过监听器 @Override public void onSkipInRead(Throwable t) { System.out.println(">> skip in read: " + t); // 跳过读 } @Override public void onSkipInWrite(User item, Throwable t) { System.out.println(">> skip in write: " + t); // 跳过写 } @Override public void onSkipInProcess(User item, Throwable t) { System.out.println(">> skip in process: " + t); // 跳过处理 } }) .build(); }
在进行批处理时,如果处理的任务较多,那么处理周期会比较长。在处理周期内,很有可能遇上网络波动,导致数据库连接失败、与其他服务通信失败等错误。Spring Batch4 中,对于这种暂时性的错误,可以配置重试机制。通过重试来解决该类问题。例如:
// 创建Step对象 @Bean public Step retryDemoStep() { return stepBuilderFactory.get("retryDemoStep") .<User, User>chunk(1) .reader(flatFileItemReader()) .processor(itemProcessor()) .writer(new ItemWriter<User>() { @Override public void write(List<? extends User> items) throws Exception { System.out.println(Arrays.toString(items.toArray())); } }) .faultTolerant() // 开启容错机制 .retry(BusinessException.class) // 指定需要重试的异常类 .retryLimit(3) // 重试次数 .listener(new RetryListener() { // 注册重试监听器,下面仅仅打印了日志 @Override public <T, E extends Throwable> boolean open(RetryContext retryContext, RetryCallback<T, E> retryCallback) { System.out.println("RetryListener :: open"); return true; } @Override public <T, E extends Throwable> void close(RetryContext retryContext, RetryCallback<T, E> retryCallback, Throwable throwable) { System.out.println("RetryListener :: close"); } @Override public <T, E extends Throwable> void onError(RetryContext retryContext, RetryCallback<T, E> retryCallback, Throwable throwable) { System.out.println("RetryListener :: error"); } }) .build(); }
Spring Batch4 还提供了一个 noSkip(Class<? extends Throwable> type) 方法,用于显式指定某些异常(及其子类)在发生时不应该被跳过,即当这些异常发生时,批处理步骤会终止执行,而不是继续处理后续数据。
ExceptionClassifierSkipPolicy 是一个灵活的跳过策略实现,它允许根据异常类型来决定是否跳过某个项(item)的处理。它通过分类器(classifier)将不同类型的异常映射到不同的 SkipPolicy,从而实现对不同异常的差异化处理。
ExceptionClassifierSkipPolicy 的主要作用是:
根据抛出的异常类型进行分类
为不同类型的异常指定不同的跳过策略
实现复杂的异常处理逻辑,满足多样化的业务需求
例如:
@Bean public Step skipDemoStep() { // 配置异常映射规则 Map<Class<? extends Throwable>, SkipPolicy> policyMap = new HashMap<>(); // 遇到 NullPointerException 时,使用 NeverSkipItemSkipPolicy 策略 // 表示永远不会跳过任何项 policyMap.put(NullPointerException.class, new NeverSkipItemSkipPolicy()); // 遇到 IllegalArgumentException 时,使用 NeverSkipItemSkipPolicy 策略 // 表示永远不会跳过任何项 policyMap.put(IllegalArgumentException.class, new NeverSkipItemSkipPolicy()); // 遇到 BusinessException 时,使用 AlwaysSkipItemSkipPolicy 策略 // 表示总是会跳过遇到异常的项 policyMap.put(BusinessException.class, new AlwaysSkipItemSkipPolicy()); ExceptionClassifierSkipPolicy skipPolicy = new ExceptionClassifierSkipPolicy(); skipPolicy.setPolicyMap(policyMap); return stepBuilderFactory.get("skipDemoStep") .<User, User>chunk(1) .reader(flatFileItemReader()) .processor(itemProcessor()) .writer(new ItemWriter<User>() { @Override public void write(List<? extends User> items) throws Exception { System.out.println(Arrays.toString(items.toArray())); } }) .faultTolerant() // 开启容错机制 .skipPolicy(skipPolicy) // 跳过策略【看这里】 .listener(new SkipListener<User, User>() { // 指定跳过监听器 @Override public void onSkipInRead(Throwable t) { System.out.println(">> skip in read: " + t); } @Override public void onSkipInWrite(User item, Throwable t) { System.out.println(">> skip in write: " + t); } @Override public void onSkipInProcess(User item, Throwable t) { System.out.println(">> skip in process: " + t); } }) .build(); }
这些机制可以根据实际需求灵活组合使用,帮助构建健壮的批处理应用程序,能够优雅地处理各种错误情况,同时保证数据的一致性和处理的效率。