在 Spring Batch4 中,ItemProcessor 是批处理流程中的核心组件之一,主要负责数据的转换、过滤和验证工作。它位于 ItemReader(数据读取)和 ItemWriter(数据写入)之间,形成了 "读取 → 处理 → 写入" 经典流程。如下图:

注意:ItemProcessor 数据处理环节可以省略,即可以不进行数据处理,直接读取数据,输出数据。
数据转换:将读取到的原始数据转换为目标格式,例如:类型转换、字段映射、格式调整。
数据过滤:根据业务规则筛选数据(返回 null 表示该数据将被过滤,不进入写入阶段)。
数据验证:检查数据合法性,不合法的数据可被过滤或抛出异常。
业务逻辑处理:实现复杂的业务规则和验证。
数据增强:为原始数据添加额外信息。
ItemProcessor 是一个泛型接口,定义如下:
package org.springframework.batch.item;
import org.springframework.lang.NonNull;
import org.springframework.lang.Nullable;
/**
* 项目(Item)转换接口。给定一个输入项,该接口提供了一个扩展点,
* 允许在面向项目的处理场景中应用业务逻辑。
*
* 需要注意:
* 1. 可以返回与提供类型不同的类型,但不是必须的。
* 2. 返回 null 表示该项不应继续处理。
*
* @author Robert Kasanicky
* @author Dave Syer
* @author Mahmoud Ben Hassine
*
* @param <I> 输入项的类型
* @param <O> 输出项的类型
*/
public interface ItemProcessor<I, O> {
/**
* 处理提供的项目,返回可能被修改或新的项目以继续处理。
* 如果返回结果为 null,则表示该项的处理不应继续。
*
* 该方法永远不会收到 null 的 item 参数,因为 null 可能的来源只有:
* (1)ItemReader 返回 null,表示没有更多项目可读。
* (2)组合处理器中前一个 ItemProcessor,表示该项已被过滤。
*
* @param item 要处理的项目,保证不为 {@code null}
* @return 可能被修改或新的项目以继续处理,如果返回 null 则表示不应继续处理提供的项目。
* @throws Exception 处理过程中发生异常时抛出
*/
@Nullable
O process(@NonNull I item) throws Exception;
}该接口只提供了一个 process() 方法,用来处理数据。该方法的参数说明:
I:输入数据类型,通常是 ItemReader 读取的数据类型。
O:输出数据类型,通常是 ItemWriter 接收的数据类型。不一定要和 ItemReader 保持一致,可以是其他类型。处理后的对象,返回 null 表示该数据被过滤,即不会传递给 ItemWriter。
实现注意事项:
(1)该方法应保持无状态,以确保线程安全。
(2)抛出的任何异常都将导致当前事务回滚。
(3)复杂的业务逻辑应在此方法中实现。
(4)对于性能敏感的操作,应考虑批量处理。
下面提供多个 ItemProcessor 简单示例。
将输入数据转换为另一种格式或类型,例如:
/**
* 将字符串转换为大写
*/
public class UpperCaseProcessor implements ItemProcessor<String, String> {
@Override
public String process(@NonNull String item) {
return item.toUpperCase();
}
}或者
/**
* 将 CSV 字符串转换为对象
*/
public class CsvToPersonProcessor implements ItemProcessor<String, Person> {
@Override
public Person process(@NonNull String csvLine) {
String[] fields = csvLine.split(",");
Person person = new Person();
person.setName(fields[0]);
person.setAge(Integer.parseInt(fields[1]));
return person;
}
}通过返回 null 跳过不符合条件的记录,例如:
/**
* 只处理偶数ID的记录
*/
public class EvenIdFilterProcessor implements ItemProcessor<Order, Order> {
@Override
public Order process(@NonNull Order order) {
return order.getId() % 2 == 0 ? order : null;
}
}
/**
* 过滤掉无效邮箱
*/
public class EmailValidationProcessor implements ItemProcessor<User, User> {
@Override
public User process(@NonNull User user) {
return user.getEmail().contains("@") ? user : null;
}
}从外部系统获取附加信息,下面示例通过 ItemReader 读取数据的 ID 从数据库查询详细的用户信息,然后将用户的地址和电话设置到 User 返回。如下:
/**
* 根据用户ID查询数据库补充用户详情
*/
public class UserEnrichmentProcessor implements ItemProcessor<User, User> {
@Autowired
private UserRepository userRepository;
@Override
public User process(@NonNull User basicUser) {
UserDetail detail = userRepository.findDetailById(basicUser.getId());
basicUser.setAddress(detail.getAddress());
basicUser.setPhone(detail.getPhone());
return basicUser;
}
}转换对象类型,process() 方法可以返回与 ItemReader 不同的数据类型,下面将 Person 类型转换成 Employee 类型返回。如下:
/**
* 将 Person 转换为 Employee
*/
public class PersonToEmployeeProcessor implements ItemProcessor<Person, Employee> {
@Override
public Employee process(@NonNull Person person) {
Employee employee = new Employee();
employee.setEmployeeId("EMP-" + person.getId());
employee.setFullName(person.getFirstName() + " " + person.getLastName());
return employee;
}
}一个处理器内实现多个操作,如验证数据、转换格式、计算哈希值等等,如下:
/**
* 1. 验证数据 -> 2. 转换格式 -> 3. 计算哈希值
*/
public class MultiStepProcessor implements ItemProcessor<RawData, ProcessedData> {
@Override
public ProcessedData process(@NonNull RawData raw) {
// 1. 验证
if (raw.getTimestamp() == null) {
return null;
}
// 2. 转换
ProcessedData processed = new ProcessedData();
processed.setId(raw.getId().toUpperCase());
processed.setValue(raw.getValue() * 100);
// 3. 计算哈希
processed.setHash(DigestUtils.md5Hex(processed.toString()));
return processed;
}
}process() 方法的异常处理是批处理流程稳定性的关键环节。异常处理策略需要根据业务需求(如是否跳过错误数据、是否重试)来设计,主要有以下几种方式:
(1)直接抛出异常(默认行为)
process() 方法声明了 throws Exception,允许直接抛出任何异常。默认情况下,抛出的异常会导致:
当前当前批次(Chunk)处理中断
事务回滚
整个 Step 失败(除非配置了容错机制)
@Override
public User process(UserDTO item) throws Exception {
if (item.getAge() == null) {
// 直接抛出异常,终止处理
throw new InvalidDataException("年龄不能为空");
}
// 正常处理逻辑...
return user;
}(2)配置跳过策略(Skip Policy)
通过 SkipPolicy 可忽略特定异常,让流程继续处理后续数据,适用于非致命错误(如数据格式错误)。
步骤 1:定义异常类型
// 自定义异常
public class InvalidDataException extends Exception {
public InvalidDataException(String message) {
super(message);
}
}步骤 2:在 Step 中配置跳过策略
@Bean
public Step myStep(StepBuilderFactory stepBuilderFactory) {
return stepBuilderFactory.get("myStep")
.<UserDTO, User>chunk(10)
.reader(reader())
.processor(processor())
.writer(writer())
// 配置可跳过的异常
.faultTolerant() // 要先调用该方法
.skip(InvalidDataException.class) // 跳过特定异常
.skipLimit(100) // 最多跳过100条记录
.build();
}其中,skip() 方法定义如下:
public FaultTolerantStepBuilder<I, O> skip(Class<? extends Throwable> type) {
skippableExceptionClasses.put(type, true);
return this;
}注意,除了通过配置跳过策略,我们也可以自行进行处理,捕获特定类型异常,然后返回 null 进行过滤。如下,捕获 NumberFormatException 类型异常,然后返回 null 过滤数据。如下:
/**
* 处理可能格式错误的数据
*/
public class SafeNumberProcessor implements ItemProcessor<String, Integer> {
@Override
public Integer process(@NonNull String item) {
try {
// 转换为数字
return Integer.parseInt(item.trim());
} catch (NumberFormatException e) {
// 如果不是数字,则跳过处理
log.warn("Invalid number format: {}", item);
return null; // 跳过无效数据
}
}
}更多 ItemProcess 处理请参考官方文档。