错误处理在编程中是无法避免的,那么如何处理错误呢?Spring Batch4 提供了完善的错误处理机制,使批处理作业能够优雅地处理各种异常情况。
以下是 Spring Batch4 中错误处理的全面解析:
跳过策略允许指定在遇到特定异常时(自定义数据校验异常)是否跳过当前项,而不是终止整个作业。例如:
public BusinessException extends {
public BusinessException() {
}
public BusinessException(String message) {
super(message);
}
public BusinessException(String message, Throwable cause) {
super(message, cause);
}
public BusinessException(Throwable cause) {
super(cause);
}
}创建一个 Step,设置跳过策略,以及添加跳过监听器,如下:
// 创建Step对象
@Bean
public Step skipDemoStep() {
return stepBuilderFactory.get("skipDemoStep")
.<User, User>chunk(1)
.reader(flatFileItemReader())
.processor(itemProcessor())
.writer(new ItemWriter<User>() {
@Override
public void write(List<? extends User> items) throws Exception {
System.out.println(Arrays.toString(items.toArray()));
}
})
.faultTolerant() // 开启容错机制
.skip(BusinessException.class) // 指定需要跳过的异常类型
.skipLimit(10) // 最多跳过10条记录
.listener(new SkipListener<User, User>() { // 指定跳过监听器
@Override
public void onSkipInRead(Throwable t) {
System.out.println(">> skip in read: " + t); // 跳过读
}
@Override
public void onSkipInWrite(User item, Throwable t) {
System.out.println(">> skip in write: " + t); // 跳过写
}
@Override
public void onSkipInProcess(User item, Throwable t) {
System.out.println(">> skip in process: " + t); // 跳过处理
}
})
.build();
}
在进行批处理时,如果处理的任务较多,那么处理周期会比较长。在处理周期内,很有可能遇上网络波动,导致数据库连接失败、与其他服务通信失败等错误。Spring Batch4 中,对于这种暂时性的错误,可以配置重试机制。通过重试来解决该类问题。例如:
// 创建Step对象
@Bean
public Step retryDemoStep() {
return stepBuilderFactory.get("retryDemoStep")
.<User, User>chunk(1)
.reader(flatFileItemReader())
.processor(itemProcessor())
.writer(new ItemWriter<User>() {
@Override
public void write(List<? extends User> items) throws Exception {
System.out.println(Arrays.toString(items.toArray()));
}
})
.faultTolerant() // 开启容错机制
.retry(BusinessException.class) // 指定需要重试的异常类
.retryLimit(3) // 重试次数
.listener(new RetryListener() { // 注册重试监听器,下面仅仅打印了日志
@Override
public <T, E extends Throwable> boolean open(RetryContext retryContext, RetryCallback<T, E> retryCallback) {
System.out.println("RetryListener :: open");
return true;
}
@Override
public <T, E extends Throwable> void close(RetryContext retryContext, RetryCallback<T, E> retryCallback, Throwable throwable) {
System.out.println("RetryListener :: close");
}
@Override
public <T, E extends Throwable> void onError(RetryContext retryContext, RetryCallback<T, E> retryCallback, Throwable throwable) {
System.out.println("RetryListener :: error");
}
})
.build();
}Spring Batch4 还提供了一个 noSkip(Class<? extends Throwable> type) 方法,用于显式指定某些异常(及其子类)在发生时不应该被跳过,即当这些异常发生时,批处理步骤会终止执行,而不是继续处理后续数据。
ExceptionClassifierSkipPolicy 是一个灵活的跳过策略实现,它允许根据异常类型来决定是否跳过某个项(item)的处理。它通过分类器(classifier)将不同类型的异常映射到不同的 SkipPolicy,从而实现对不同异常的差异化处理。
ExceptionClassifierSkipPolicy 的主要作用是:
根据抛出的异常类型进行分类
为不同类型的异常指定不同的跳过策略
实现复杂的异常处理逻辑,满足多样化的业务需求
例如:
@Bean
public Step skipDemoStep() {
// 配置异常映射规则
Map<Class<? extends Throwable>, SkipPolicy> policyMap = new HashMap<>();
// 遇到 NullPointerException 时,使用 NeverSkipItemSkipPolicy 策略
// 表示永远不会跳过任何项
policyMap.put(NullPointerException.class, new NeverSkipItemSkipPolicy());
// 遇到 IllegalArgumentException 时,使用 NeverSkipItemSkipPolicy 策略
// 表示永远不会跳过任何项
policyMap.put(IllegalArgumentException.class, new NeverSkipItemSkipPolicy());
// 遇到 BusinessException 时,使用 AlwaysSkipItemSkipPolicy 策略
// 表示总是会跳过遇到异常的项
policyMap.put(BusinessException.class, new AlwaysSkipItemSkipPolicy());
ExceptionClassifierSkipPolicy skipPolicy = new ExceptionClassifierSkipPolicy();
skipPolicy.setPolicyMap(policyMap);
return stepBuilderFactory.get("skipDemoStep")
.<User, User>chunk(1)
.reader(flatFileItemReader())
.processor(itemProcessor())
.writer(new ItemWriter<User>() {
@Override
public void write(List<? extends User> items) throws Exception {
System.out.println(Arrays.toString(items.toArray()));
}
})
.faultTolerant() // 开启容错机制
.skipPolicy(skipPolicy) // 跳过策略【看这里】
.listener(new SkipListener<User, User>() { // 指定跳过监听器
@Override
public void onSkipInRead(Throwable t) {
System.out.println(">> skip in read: " + t);
}
@Override
public void onSkipInWrite(User item, Throwable t) {
System.out.println(">> skip in write: " + t);
}
@Override
public void onSkipInProcess(User item, Throwable t) {
System.out.println(">> skip in process: " + t);
}
})
.build();
}这些机制可以根据实际需求灵活组合使用,帮助构建健壮的批处理应用程序,能够优雅地处理各种错误情况,同时保证数据的一致性和处理的效率。