Spring Batch4 教程

Spring Batch4 跳过策略 SkipPolicy 接口

在 Spring Batch4 中,SkipPolicy 接口及其子类是用于实现跳过策略,以处理批处理过程中遇到的异常情况。下文将对 SkipPolicy 接口以及子类进行简单介绍,它们的类关系如下图:

image.png

SkipPolicy 接口

SkipPolicy 接口位于 org.springframework.batch.core.step.skip 包下,它定义了批处理作业在遇到异常时,决定是否跳过当前处理项的策略。该接口只有一个方法:

package org.springframework.batch.core.step.skip;

/**
 * 决定批处理过程中是否应该跳过某个异常的策略接口。
 * 
 * 使用场景:
 * 1. 需要根据异常类型动态决定是否跳过
 * 2. 需要根据已跳过次数做决策
 * 3. 需要实现复杂的跳过逻辑
 * 
 * 实现要求:
 * - 应保持无状态(线程安全)
 * - 不应抛出未声明的异常
 * - 应正确处理 skipCount < 0 的探测情况
 * 
 * @author Lucas Ward
 * @author Dave Syer
 * @author Mahmoud Ben Hassine
 */
public interface SkipPolicy {

    /**
     * 判断是否应该跳过当前异常并继续处理。
     * 
     * @param t 处理过程中遇到的异常,不允许为 null
     * @param skipCount 当前步骤已跳过的记录数(从 0 开始)
     * @return true 表示应该跳过并继续处理,false 表示不应跳过
     * @throws SkipLimitExceededException 当跳过次数超过限制时抛出
     * @throws IllegalArgumentException 当参数 t 为 null 时抛出
     * 
     * 方法契约:
     * 1. 实现必须能够处理 skipCount<0 的情况(用于探测可跳过异常类型)
     * 2. 实现不应抛出未声明的检查异常
     * 3. 实现应快速失败(对 null 参数立即抛出 IllegalArgumentException)
     * 
     * 典型实现示例:
     * // 基于异常类型的简单跳过策略
     * public boolean shouldSkip(Throwable t, int skipCount) {
     *     if (t instanceof DataFormatException) {
     *         return true;
     *     }
     *     return false;
     * }
     * 
     * // 带跳过次数限制的策略
     * public boolean shouldSkip(Throwable t, int skipCount) {
     *     if (skipCount >= maxSkips) {
     *         throw new SkipLimitExceededException(maxSkips, t);
     *     }
     *     return skippableExceptions.contains(t.getClass());
     * }
     */
    boolean shouldSkip(Throwable t, int skipCount) throws SkipLimitExceededException;
}

shouldSkip() 方法说明,其中:

参数:

  • t:表示在处理过程中抛出的异常对象。

  • skipCount:表示到目前为止已经跳过的项数。

返回值:

  • true 表示应该跳过当前项,继续处理后续项;

  • false 表示不应该跳过,通常会导致作业失败或采取其他处理方式(如重试)。

异常:

  • SkipLimitExceededException  当跳过的项数超过了预先设定的跳过限制时抛出。

SkipPolicy 子类

从上面的类关系图可知,SkipPolicy 接口提供了5个内置的实现,分别如下:

NeverSkipItemSkipPolicy

该策略实现表示永远不会跳过任何项。无论遇到什么异常,shouldSkip() 方法都会返回 false,即遇到异常就会导致作业失败,除非有其他错误处理机制(如重试)介入。

源码如下:

package org.springframework.batch.core.step.skip;


/**
 * {@link SkipPolicy} implementation that always returns false,
 * indicating that an item should not be skipped.
 *
 * @author Lucas Ward
 */
public class NeverSkipItemSkipPolicy implements SkipPolicy{

    @Override
    public boolean shouldSkip(Throwable t, int skipCount) {
       return false;
    }

}

AlwaysSkipItemSkipPolicy

此策略实现总是会跳过遇到异常的项。无论抛出何种异常,shouldSkip() 方法始终返回 true,这意味着批处理作业会跳过有问题的项,继续处理后续的项。

源码如下:

package org.springframework.batch.core.step.skip;


/**
 * Implementation of the {@link SkipPolicy} interface that
 * will always return that an item should be skipped.
 *
 * @author Ben Hale
 * @author Lucas Ward
 */
public class AlwaysSkipItemSkipPolicy implements SkipPolicy {

    @Override
    public boolean shouldSkip(Throwable t, int skipCount) {
       return true;
    }

}

ExceptionClassifierSkipPolicy

该策略通过 ExceptionClassifier 来决定是否跳过。可以根据不同的异常类型配置不同的跳过策略。例如,你可以配置当遇到某种解析异常时跳过,遇到数据完整性异常时不跳过。它允许根据异常的具体类型进行更细粒度的控制,比简单的 “总是跳过” 或 “从不跳过” 更加灵活。

源码如下:

package org.springframework.batch.core.step.skip;

import java.util.Map;
import org.springframework.classify.Classifier;
import org.springframework.classify.SubclassClassifier;

/**
 * 基于异常分类器的动态跳过策略实现。
 * 
 * 核心机制:
 * 1. 使用SubclassClassifier根据异常类型动态选择具体的SkipPolicy
 * 2. 支持两种配置方式:直接注入分类器或通过Map配置
 * 3. 默认使用NeverSkipItemSkipPolicy作为未匹配异常的默认策略
 * 
 * @author Dave Syer
 * @see SubclassClassifier 用于异常类型分类的核心组件
 */
public class ExceptionClassifierSkipPolicy implements SkipPolicy {

    /**
     * 异常分类器,维护Throwable类型到SkipPolicy的映射关系
     */
    private SubclassClassifier<Throwable, SkipPolicy> classifier;

    /**
     * 设置异常分类器(直接注入方式)
     * 
     * @param classifier 预配置好的异常分类器实例
     */
    public void setExceptionClassifier(SubclassClassifier<Throwable, SkipPolicy> classifier) {
       this.classifier = classifier;
    }

    /**
     * 通过策略Map配置分类器(声明式配置方式)
     * 
     * @param policyMap 异常类型与策略的映射Map
     */
    public void setPolicyMap(Map<Class<? extends Throwable>, SkipPolicy> policyMap) {
       this.classifier = new SubclassClassifier<>(
             policyMap, new NeverSkipItemSkipPolicy());
    }

    /**
     * 核心跳过决策方法
     * 
     * 执行流程:
     * 1. 通过分类器根据异常类型查找对应的SkipPolicy
     * 2. 委托给具体策略执行实际判断
     * 
     * @param t 发生的异常对象(非空)
     * @param skipCount 当前步骤已跳过的记录数(从0开始)
     * @return true允许跳过,false拒绝跳过
     * @throws SkipLimitExceededException 当具体策略判断跳过次数超限时抛出
     */
    @Override
    public boolean shouldSkip(Throwable t, int skipCount) throws SkipLimitExceededException {
       return classifier.classify(t).shouldSkip(t, skipCount);
    }

    //...
}

下面是 SubclassClassifier 类的源码,了解 classify() 方法,如下:

import java.util.*;
import java.util.concurrent.*;

/**
 * 基于类层次结构的分类器实现,支持按照类/父类/接口进行分类匹配。
 * 
 * @param <T> 输入对象的类型(通常为Throwable)
 * @param <C> 分类结果的类型(通常为Boolean或SkipPolicy)
 */
public class SubclassClassifier<T, C> implements Classifier<T, C> {
    
    // 存储类类型与分类结果的映射关系(线程安全)
    private ConcurrentMap<Class<? extends T>, C> classified;
    
    // 默认分类结果(当无匹配时返回)
    private C defaultValue;

    //....

    /**
     * 使用预定义映射和默认值创建分类器
     * @param typeMap 初始类型映射表
     * @param defaultValue 默认分类结果
     */
    public SubclassClassifier(Map<Class<? extends T>, C> typeMap, C defaultValue) {
        this.classified = new ConcurrentHashMap<>();
        this.defaultValue = defaultValue;
        this.classified.putAll(typeMap); // 线程安全地初始化
    }

    
    /**
     * 执行分类操作,查找顺序:
     * 1. 精确类型匹配
     * 2. 父类层次结构匹配(直到Object类)
     * 3. 接口继承结构匹配
     * 4. 返回默认值
     * 
     * 注意:匹配结果会被缓存以提升后续性能
     * 
     * @param classifiable 需要分类的对象(允许为null)
     * @return 分类结果或默认值
     */
    public C classify(T classifiable) {
        // 空对象处理
        if (classifiable == null) {
            return this.defaultValue;
        }

        Class<? extends T> exceptionClass = classifiable.getClass();
        
        // 第一步:精确类型匹配
        if (this.classified.containsKey(exceptionClass)) {
            return this.classified.get(exceptionClass);
        }

        C value = null;

        // 第二步:父类层次结构查找
        Class<?> cls = exceptionClass;
        while (!cls.equals(Object.class) && value == null) {
            value = this.classified.get(cls);
            cls = cls.getSuperclass();
        }

        // 第三步:接口继承结构查找
        if (value == null) {
            cls = exceptionClass;
            while (!cls.equals(Object.class) && value == null) {
                for (Class<?> ifc : cls.getInterfaces()) {
                    value = this.classified.get(ifc);
                    if (value != null) break;
                }
                cls = cls.getSuperclass();
            }
        }

        // 缓存匹配结果(如果找到)
        if (value != null) {
            this.classified.put(exceptionClass, value);
        }

        // 返回结果或默认值
        return value != null ? value : this.defaultValue;
    }
}

LimitCheckingItemSkipPolicy

该策略在其他 SkipPolicy 的基础上,增加了跳过次数的限制检查。它会先委托给另一个 SkipPolicy 来决定是否跳过(通过构造函数传入),然后检查当前的跳过次数是否超过了设定的限制。如果超过限制,会抛出 SkipLimitExceededException 异常,从而阻止进一步跳过。

源码如下:

package org.springframework.batch.core.step.skip;

import java.io.FileNotFoundException;
import java.util.Collections;
import java.util.Map;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.item.file.FlatFileParseException;
import org.springframework.classify.BinaryExceptionClassifier;
import org.springframework.classify.Classifier;

public class LimitCheckingItemSkipPolicy implements SkipPolicy {

    private int skipLimit; // 跳过次数限制

    private Classifier<Throwable, Boolean> skippableExceptionClassifier; // 异常分类器

    //...省略其他构造方法

    /**
     * @param skipLimit the number of skippable exceptions that are allowed to
     * be skipped
     * @param skippableExceptionClassifier exception classifier for those that
     * can be skipped (non-critical)
     */
    public LimitCheckingItemSkipPolicy(int skipLimit, Classifier<Throwable, Boolean> skippableExceptionClassifier) {
       this.skipLimit = skipLimit;
       this.skippableExceptionClassifier = skippableExceptionClassifier;
    }

    // ...省略成员变量设置方法...

    /**
     * 根据提供的异常和当前跳过计数,决定是否应该跳过当前异常继续处理。
     * 
     * 处理逻辑:
     * 1. 首先通过异常分类器判断该异常是否属于可跳过类型
     * 2. 如果是可跳过异常:
     *    - 检查当前跳过次数是否小于允许的最大跳过限制
     *    - 如果未超过限制,则允许跳过(返回true)
     *    - 如果已超过限制,则抛出SkipLimitExceededException终止处理
     * 3. 如果是不可跳过异常,直接返回false(不跳过)
     *
     * @param t 当前处理过程中抛出的异常(非空)
     * @param skipCount 当前步骤已经跳过的记录数(从0开始计数)
     * @return true表示允许跳过并继续处理,false表示不允许跳过
     * @throws SkipLimitExceededException 当可跳过异常但跳过次数超过限制时抛出
     */
    @Override
    public boolean shouldSkip(Throwable t, int skipCount) {
        // 第一步:通过预配置的异常分类器判断当前异常是否可跳过
        if (skippableExceptionClassifier.classify(t)) {
            // 第二步:检查跳过次数是否在允许范围内
            if (skipCount < skipLimit) {
                // 情况1:可跳过且未达上限 - 允许跳过
                return true;
            }
            else {
                // 情况2:可跳过但已达上限 - 抛出终止异常
                throw new SkipLimitExceededException(skipLimit, t);
            }
        }
        else {
            // 情况3:不可跳过类型 - 拒绝跳过
            return false;
        }
    }

}

CompositeSkipPolicy

这个策略允许组合多个SkipPolicy。可以按照顺序依次调用多个SkipPolicy的shouldSkip方法,只要其中一个返回true,就会跳过当前项。通过这种方式,可以将多个不同的跳过逻辑组合起来,以满足复杂的业务需求。

源码如下:

package org.springframework.batch.core.step.skip;

/**
 * @author Dave Syer
 */
public class CompositeSkipPolicy implements SkipPolicy {
    // 定义跳过策略数组
    private SkipPolicy[] skipPolicies;

    public CompositeSkipPolicy() {
       this(new SkipPolicy[0]);
    }

    public CompositeSkipPolicy(SkipPolicy[] skipPolicies) {
       this.skipPolicies = skipPolicies;
    }

    public void setSkipPolicies(SkipPolicy[] skipPolicies) {
       this.skipPolicies = skipPolicies;
    }

    @Override
    public boolean shouldSkip(Throwable t, int skipCount) throws SkipLimitExceededException {
       // 遍历策略数组,逐一调用策略的 showSkip() 方法,任何一个跳过,则跳过该异常
       for (SkipPolicy policy : skipPolicies) {
          if (policy.shouldSkip(t, skipCount)) {
             return true;
          }
       }
       return false;
    }

}

  

说说我的看法
全部评论(
没有评论
关于
本网站专注于 Java、数据库(MySQL、Oracle)、Linux、软件架构及大数据等多领域技术知识分享。涵盖丰富的原创与精选技术文章,助力技术传播与交流。无论是技术新手渴望入门,还是资深开发者寻求进阶,这里都能为您提供深度见解与实用经验,让复杂编码变得轻松易懂,携手共赴技术提升新高度。如有侵权,请来信告知:hxstrive@outlook.com
其他应用
公众号