在 Spring Batch4 中,SkipPolicy 接口及其子类是用于实现跳过策略,以处理批处理过程中遇到的异常情况。下文将对 SkipPolicy 接口以及子类进行简单介绍,它们的类关系如下图:
SkipPolicy 接口位于 org.springframework.batch.core.step.skip 包下,它定义了批处理作业在遇到异常时,决定是否跳过当前处理项的策略。该接口只有一个方法:
package org.springframework.batch.core.step.skip; /** * 决定批处理过程中是否应该跳过某个异常的策略接口。 * * 使用场景: * 1. 需要根据异常类型动态决定是否跳过 * 2. 需要根据已跳过次数做决策 * 3. 需要实现复杂的跳过逻辑 * * 实现要求: * - 应保持无状态(线程安全) * - 不应抛出未声明的异常 * - 应正确处理 skipCount < 0 的探测情况 * * @author Lucas Ward * @author Dave Syer * @author Mahmoud Ben Hassine */ public interface SkipPolicy { /** * 判断是否应该跳过当前异常并继续处理。 * * @param t 处理过程中遇到的异常,不允许为 null * @param skipCount 当前步骤已跳过的记录数(从 0 开始) * @return true 表示应该跳过并继续处理,false 表示不应跳过 * @throws SkipLimitExceededException 当跳过次数超过限制时抛出 * @throws IllegalArgumentException 当参数 t 为 null 时抛出 * * 方法契约: * 1. 实现必须能够处理 skipCount<0 的情况(用于探测可跳过异常类型) * 2. 实现不应抛出未声明的检查异常 * 3. 实现应快速失败(对 null 参数立即抛出 IllegalArgumentException) * * 典型实现示例: * // 基于异常类型的简单跳过策略 * public boolean shouldSkip(Throwable t, int skipCount) { * if (t instanceof DataFormatException) { * return true; * } * return false; * } * * // 带跳过次数限制的策略 * public boolean shouldSkip(Throwable t, int skipCount) { * if (skipCount >= maxSkips) { * throw new SkipLimitExceededException(maxSkips, t); * } * return skippableExceptions.contains(t.getClass()); * } */ boolean shouldSkip(Throwable t, int skipCount) throws SkipLimitExceededException; }
shouldSkip() 方法说明,其中:
参数:
t:表示在处理过程中抛出的异常对象。
skipCount:表示到目前为止已经跳过的项数。
返回值:
true 表示应该跳过当前项,继续处理后续项;
false 表示不应该跳过,通常会导致作业失败或采取其他处理方式(如重试)。
异常:
SkipLimitExceededException 当跳过的项数超过了预先设定的跳过限制时抛出。
从上面的类关系图可知,SkipPolicy 接口提供了5个内置的实现,分别如下:
该策略实现表示永远不会跳过任何项。无论遇到什么异常,shouldSkip() 方法都会返回 false,即遇到异常就会导致作业失败,除非有其他错误处理机制(如重试)介入。
源码如下:
package org.springframework.batch.core.step.skip; /** * {@link SkipPolicy} implementation that always returns false, * indicating that an item should not be skipped. * * @author Lucas Ward */ public class NeverSkipItemSkipPolicy implements SkipPolicy{ @Override public boolean shouldSkip(Throwable t, int skipCount) { return false; } }
此策略实现总是会跳过遇到异常的项。无论抛出何种异常,shouldSkip() 方法始终返回 true,这意味着批处理作业会跳过有问题的项,继续处理后续的项。
源码如下:
package org.springframework.batch.core.step.skip; /** * Implementation of the {@link SkipPolicy} interface that * will always return that an item should be skipped. * * @author Ben Hale * @author Lucas Ward */ public class AlwaysSkipItemSkipPolicy implements SkipPolicy { @Override public boolean shouldSkip(Throwable t, int skipCount) { return true; } }
该策略通过 ExceptionClassifier 来决定是否跳过。可以根据不同的异常类型配置不同的跳过策略。例如,你可以配置当遇到某种解析异常时跳过,遇到数据完整性异常时不跳过。它允许根据异常的具体类型进行更细粒度的控制,比简单的 “总是跳过” 或 “从不跳过” 更加灵活。
源码如下:
package org.springframework.batch.core.step.skip; import java.util.Map; import org.springframework.classify.Classifier; import org.springframework.classify.SubclassClassifier; /** * 基于异常分类器的动态跳过策略实现。 * * 核心机制: * 1. 使用SubclassClassifier根据异常类型动态选择具体的SkipPolicy * 2. 支持两种配置方式:直接注入分类器或通过Map配置 * 3. 默认使用NeverSkipItemSkipPolicy作为未匹配异常的默认策略 * * @author Dave Syer * @see SubclassClassifier 用于异常类型分类的核心组件 */ public class ExceptionClassifierSkipPolicy implements SkipPolicy { /** * 异常分类器,维护Throwable类型到SkipPolicy的映射关系 */ private SubclassClassifier<Throwable, SkipPolicy> classifier; /** * 设置异常分类器(直接注入方式) * * @param classifier 预配置好的异常分类器实例 */ public void setExceptionClassifier(SubclassClassifier<Throwable, SkipPolicy> classifier) { this.classifier = classifier; } /** * 通过策略Map配置分类器(声明式配置方式) * * @param policyMap 异常类型与策略的映射Map */ public void setPolicyMap(Map<Class<? extends Throwable>, SkipPolicy> policyMap) { this.classifier = new SubclassClassifier<>( policyMap, new NeverSkipItemSkipPolicy()); } /** * 核心跳过决策方法 * * 执行流程: * 1. 通过分类器根据异常类型查找对应的SkipPolicy * 2. 委托给具体策略执行实际判断 * * @param t 发生的异常对象(非空) * @param skipCount 当前步骤已跳过的记录数(从0开始) * @return true允许跳过,false拒绝跳过 * @throws SkipLimitExceededException 当具体策略判断跳过次数超限时抛出 */ @Override public boolean shouldSkip(Throwable t, int skipCount) throws SkipLimitExceededException { return classifier.classify(t).shouldSkip(t, skipCount); } //... }
下面是 SubclassClassifier 类的源码,了解 classify() 方法,如下:
import java.util.*; import java.util.concurrent.*; /** * 基于类层次结构的分类器实现,支持按照类/父类/接口进行分类匹配。 * * @param <T> 输入对象的类型(通常为Throwable) * @param <C> 分类结果的类型(通常为Boolean或SkipPolicy) */ public class SubclassClassifier<T, C> implements Classifier<T, C> { // 存储类类型与分类结果的映射关系(线程安全) private ConcurrentMap<Class<? extends T>, C> classified; // 默认分类结果(当无匹配时返回) private C defaultValue; //.... /** * 使用预定义映射和默认值创建分类器 * @param typeMap 初始类型映射表 * @param defaultValue 默认分类结果 */ public SubclassClassifier(Map<Class<? extends T>, C> typeMap, C defaultValue) { this.classified = new ConcurrentHashMap<>(); this.defaultValue = defaultValue; this.classified.putAll(typeMap); // 线程安全地初始化 } /** * 执行分类操作,查找顺序: * 1. 精确类型匹配 * 2. 父类层次结构匹配(直到Object类) * 3. 接口继承结构匹配 * 4. 返回默认值 * * 注意:匹配结果会被缓存以提升后续性能 * * @param classifiable 需要分类的对象(允许为null) * @return 分类结果或默认值 */ public C classify(T classifiable) { // 空对象处理 if (classifiable == null) { return this.defaultValue; } Class<? extends T> exceptionClass = classifiable.getClass(); // 第一步:精确类型匹配 if (this.classified.containsKey(exceptionClass)) { return this.classified.get(exceptionClass); } C value = null; // 第二步:父类层次结构查找 Class<?> cls = exceptionClass; while (!cls.equals(Object.class) && value == null) { value = this.classified.get(cls); cls = cls.getSuperclass(); } // 第三步:接口继承结构查找 if (value == null) { cls = exceptionClass; while (!cls.equals(Object.class) && value == null) { for (Class<?> ifc : cls.getInterfaces()) { value = this.classified.get(ifc); if (value != null) break; } cls = cls.getSuperclass(); } } // 缓存匹配结果(如果找到) if (value != null) { this.classified.put(exceptionClass, value); } // 返回结果或默认值 return value != null ? value : this.defaultValue; } }
该策略在其他 SkipPolicy 的基础上,增加了跳过次数的限制检查。它会先委托给另一个 SkipPolicy 来决定是否跳过(通过构造函数传入),然后检查当前的跳过次数是否超过了设定的限制。如果超过限制,会抛出 SkipLimitExceededException 异常,从而阻止进一步跳过。
源码如下:
package org.springframework.batch.core.step.skip; import java.io.FileNotFoundException; import java.util.Collections; import java.util.Map; import org.springframework.batch.core.Step; import org.springframework.batch.core.StepExecution; import org.springframework.batch.item.file.FlatFileParseException; import org.springframework.classify.BinaryExceptionClassifier; import org.springframework.classify.Classifier; public class LimitCheckingItemSkipPolicy implements SkipPolicy { private int skipLimit; // 跳过次数限制 private Classifier<Throwable, Boolean> skippableExceptionClassifier; // 异常分类器 //...省略其他构造方法 /** * @param skipLimit the number of skippable exceptions that are allowed to * be skipped * @param skippableExceptionClassifier exception classifier for those that * can be skipped (non-critical) */ public LimitCheckingItemSkipPolicy(int skipLimit, Classifier<Throwable, Boolean> skippableExceptionClassifier) { this.skipLimit = skipLimit; this.skippableExceptionClassifier = skippableExceptionClassifier; } // ...省略成员变量设置方法... /** * 根据提供的异常和当前跳过计数,决定是否应该跳过当前异常继续处理。 * * 处理逻辑: * 1. 首先通过异常分类器判断该异常是否属于可跳过类型 * 2. 如果是可跳过异常: * - 检查当前跳过次数是否小于允许的最大跳过限制 * - 如果未超过限制,则允许跳过(返回true) * - 如果已超过限制,则抛出SkipLimitExceededException终止处理 * 3. 如果是不可跳过异常,直接返回false(不跳过) * * @param t 当前处理过程中抛出的异常(非空) * @param skipCount 当前步骤已经跳过的记录数(从0开始计数) * @return true表示允许跳过并继续处理,false表示不允许跳过 * @throws SkipLimitExceededException 当可跳过异常但跳过次数超过限制时抛出 */ @Override public boolean shouldSkip(Throwable t, int skipCount) { // 第一步:通过预配置的异常分类器判断当前异常是否可跳过 if (skippableExceptionClassifier.classify(t)) { // 第二步:检查跳过次数是否在允许范围内 if (skipCount < skipLimit) { // 情况1:可跳过且未达上限 - 允许跳过 return true; } else { // 情况2:可跳过但已达上限 - 抛出终止异常 throw new SkipLimitExceededException(skipLimit, t); } } else { // 情况3:不可跳过类型 - 拒绝跳过 return false; } } }
这个策略允许组合多个SkipPolicy。可以按照顺序依次调用多个SkipPolicy的shouldSkip方法,只要其中一个返回true,就会跳过当前项。通过这种方式,可以将多个不同的跳过逻辑组合起来,以满足复杂的业务需求。
源码如下:
package org.springframework.batch.core.step.skip; /** * @author Dave Syer */ public class CompositeSkipPolicy implements SkipPolicy { // 定义跳过策略数组 private SkipPolicy[] skipPolicies; public CompositeSkipPolicy() { this(new SkipPolicy[0]); } public CompositeSkipPolicy(SkipPolicy[] skipPolicies) { this.skipPolicies = skipPolicies; } public void setSkipPolicies(SkipPolicy[] skipPolicies) { this.skipPolicies = skipPolicies; } @Override public boolean shouldSkip(Throwable t, int skipCount) throws SkipLimitExceededException { // 遍历策略数组,逐一调用策略的 showSkip() 方法,任何一个跳过,则跳过该异常 for (SkipPolicy policy : skipPolicies) { if (policy.shouldSkip(t, skipCount)) { return true; } } return false; } }