Spring Batch4 教程

Spring Batch4 核心组件 ItemProcessor 介绍

在 Spring Batch4 中,ItemProcessor 是批处理流程中的核心组件之一,主要负责数据的转换、过滤和验证工作。它位于 ItemReader(数据读取)和 ItemWriter(数据写入)之间,形成了 "读取 → 处理  写入" 经典流程。如下图:

image.png

注意:ItemProcessor 数据处理环节可以省略,即可以不进行数据处理,直接读取数据,输出数据。

ItemProcessor 的核心作用

  • 数据转换:将读取到的原始数据转换为目标格式,例如:类型转换、字段映射、格式调整。

  • 数据过滤:根据业务规则筛选数据(返回 null 表示该数据将被过滤,不进入写入阶段)。

  • 数据验证:检查数据合法性,不合法的数据可被过滤或抛出异常。

  • 业务逻辑处理:实现复杂的业务规则和验证。

  • 数据增强:为原始数据添加额外信息。

ItemProcessor 接口定义

ItemProcessor 是一个泛型接口,定义如下:

package org.springframework.batch.item;

import org.springframework.lang.NonNull;
import org.springframework.lang.Nullable;

/**
 * 项目(Item)转换接口。给定一个输入项,该接口提供了一个扩展点,
 * 允许在面向项目的处理场景中应用业务逻辑。
 * 
 * 需要注意:
 * 1. 可以返回与提供类型不同的类型,但不是必须的。
 * 2. 返回 null 表示该项不应继续处理。
 *  
 * @author Robert Kasanicky
 * @author Dave Syer
 * @author Mahmoud Ben Hassine
 * 
 * @param <I> 输入项的类型
 * @param <O> 输出项的类型
 */
public interface ItemProcessor<I, O> {

    /**
     * 处理提供的项目,返回可能被修改或新的项目以继续处理。
     * 如果返回结果为 null,则表示该项的处理不应继续。
     * 
     * 该方法永远不会收到 null 的 item 参数,因为 null 可能的来源只有:
     * (1)ItemReader 返回 null,表示没有更多项目可读。
     * (2)组合处理器中前一个 ItemProcessor,表示该项已被过滤。
     *
     * @param item 要处理的项目,保证不为 {@code null}
     * @return 可能被修改或新的项目以继续处理,如果返回 null 则表示不应继续处理提供的项目。
     * @throws Exception 处理过程中发生异常时抛出
     */
    @Nullable
    O process(@NonNull I item) throws Exception;
}

该接口只提供了一个 process() 方法,用来处理数据。该方法的参数说明:

  • I:输入数据类型,通常是 ItemReader 读取的数据类型。

  • O:输出数据类型,通常是 ItemWriter 接收的数据类型。不一定要和 ItemReader 保持一致,可以是其他类型。处理后的对象,返回 null 表示该数据被过滤,即不会传递给 ItemWriter。

实现注意事项:

(1)该方法应保持无状态,以确保线程安全。

(2)抛出的任何异常都将导致当前事务回滚。

(3)复杂的业务逻辑应在此方法中实现。

(4)对于性能敏感的操作,应考虑批量处理。

  

ItemProcessor  示例

下面提供多个 ItemProcessor 简单示例。

基本数据转换

将输入数据转换为另一种格式或类型,例如:

/**
 * 将字符串转换为大写
 */
public class UpperCaseProcessor implements ItemProcessor<String, String> {
    @Override
    public String process(@NonNull String item) {
        return item.toUpperCase();
    }
}

或者

/**
 * 将 CSV 字符串转换为对象
 */
public class CsvToPersonProcessor implements ItemProcessor<String, Person> {
    @Override
    public Person process(@NonNull String csvLine) {
        String[] fields = csvLine.split(",");
        Person person = new Person();
        person.setName(fields[0]);
        person.setAge(Integer.parseInt(fields[1]));
        return person;
    }
}

数据过滤

通过返回 null 跳过不符合条件的记录,例如:

/**
 * 只处理偶数ID的记录
 */
public class EvenIdFilterProcessor implements ItemProcessor<Order, Order> {
    @Override
    public Order process(@NonNull Order order) {
        return order.getId() % 2 == 0 ? order : null;
    }
}

/**
 * 过滤掉无效邮箱
 */
public class EmailValidationProcessor implements ItemProcessor<User, User> {
    @Override
    public User process(@NonNull User user) {
        return user.getEmail().contains("@") ? user : null;
    }
}

数据增强

从外部系统获取附加信息,下面示例通过 ItemReader 读取数据的 ID 从数据库查询详细的用户信息,然后将用户的地址和电话设置到 User 返回。如下:

/**
 * 根据用户ID查询数据库补充用户详情
 */
public class UserEnrichmentProcessor implements ItemProcessor<User, User> {
    @Autowired
    private UserRepository userRepository;

    @Override
    public User process(@NonNull User basicUser) {
        UserDetail detail = userRepository.findDetailById(basicUser.getId());
        basicUser.setAddress(detail.getAddress());
        basicUser.setPhone(detail.getPhone());
        return basicUser;
    }
}

类型转换

转换对象类型,process() 方法可以返回与 ItemReader 不同的数据类型,下面将 Person 类型转换成 Employee 类型返回。如下:

/**
 * 将 Person 转换为 Employee
 */
public class PersonToEmployeeProcessor implements ItemProcessor<Person, Employee> {
    @Override
    public Employee process(@NonNull Person person) {
        Employee employee = new Employee();
        employee.setEmployeeId("EMP-" + person.getId());
        employee.setFullName(person.getFirstName() + " " + person.getLastName());
        return employee;
    }
}

组合操作

一个处理器内实现多个操作,如验证数据、转换格式、计算哈希值等等,如下:

/**
 * 1. 验证数据 -> 2. 转换格式 -> 3. 计算哈希值
 */
public class MultiStepProcessor implements ItemProcessor<RawData, ProcessedData> {
    @Override
    public ProcessedData process(@NonNull RawData raw) {
        // 1. 验证
        if (raw.getTimestamp() == null) {
            return null;
        }

        // 2. 转换
        ProcessedData processed = new ProcessedData();
        processed.setId(raw.getId().toUpperCase());
        processed.setValue(raw.getValue() * 100);

        // 3. 计算哈希
        processed.setHash(DigestUtils.md5Hex(processed.toString()));
        
        return processed;
    }
}

异常处理

process() 方法的异常处理是批处理流程稳定性的关键环节。异常处理策略需要根据业务需求(如是否跳过错误数据、是否重试)来设计,主要有以下几种方式:

(1)直接抛出异常(默认行为)

process() 方法声明了 throws Exception,允许直接抛出任何异常。默认情况下,抛出的异常会导致:

  • 当前当前批次(Chunk)处理中断

  • 事务回滚

  • 整个 Step 失败(除非配置了容错机制)

@Override
public User process(UserDTO item) throws Exception {
    if (item.getAge() == null) {
        // 直接抛出异常,终止处理
        throw new InvalidDataException("年龄不能为空");
    }
    // 正常处理逻辑...
    return user;
}

(2)配置跳过策略(Skip Policy)

通过 SkipPolicy 可忽略特定异常,让流程继续处理后续数据,适用于非致命错误(如数据格式错误)。

步骤 1:定义异常类型

// 自定义异常
public class InvalidDataException extends Exception {
    public InvalidDataException(String message) {
        super(message);
    }
}

步骤 2:在 Step 中配置跳过策略

@Bean
public Step myStep(StepBuilderFactory stepBuilderFactory) {
    return stepBuilderFactory.get("myStep")
            .<UserDTO, User>chunk(10)
            .reader(reader())
            .processor(processor())
            .writer(writer())
            // 配置可跳过的异常
            .faultTolerant() // 要先调用该方法
            .skip(InvalidDataException.class) // 跳过特定异常
            .skipLimit(100) // 最多跳过100条记录
            .build();
}

其中,skip() 方法定义如下:

public FaultTolerantStepBuilder<I, O> skip(Class<? extends Throwable> type) {
    skippableExceptionClasses.put(type, true);
    return this;
}

注意,除了通过配置跳过策略,我们也可以自行进行处理,捕获特定类型异常,然后返回 null 进行过滤。如下,捕获 NumberFormatException 类型异常,然后返回 null 过滤数据。如下:

/**
 * 处理可能格式错误的数据
 */
public class SafeNumberProcessor implements ItemProcessor<String, Integer> {
    @Override
    public Integer process(@NonNull String item) {
        try {
            // 转换为数字
            return Integer.parseInt(item.trim());
        } catch (NumberFormatException e) {
            // 如果不是数字,则跳过处理
            log.warn("Invalid number format: {}", item);
            return null; // 跳过无效数据
        }
    }
}

更多 ItemProcess 处理请参考官方文档。

  

说说我的看法
全部评论(
没有评论
关于
本网站专注于 Java、数据库(MySQL、Oracle)、Linux、软件架构及大数据等多领域技术知识分享。涵盖丰富的原创与精选技术文章,助力技术传播与交流。无论是技术新手渴望入门,还是资深开发者寻求进阶,这里都能为您提供深度见解与实用经验,让复杂编码变得轻松易懂,携手共赴技术提升新高度。如有侵权,请来信告知:hxstrive@outlook.com
其他应用
公众号