在 Spring Batch4 中,ItemProcessor 是批处理流程中的核心组件之一,主要负责数据的转换、过滤和验证工作。它位于 ItemReader(数据读取)和 ItemWriter(数据写入)之间,形成了 "读取 → 处理 → 写入" 经典流程。如下图:
注意:ItemProcessor 数据处理环节可以省略,即可以不进行数据处理,直接读取数据,输出数据。
数据转换:将读取到的原始数据转换为目标格式,例如:类型转换、字段映射、格式调整。
数据过滤:根据业务规则筛选数据(返回 null 表示该数据将被过滤,不进入写入阶段)。
数据验证:检查数据合法性,不合法的数据可被过滤或抛出异常。
业务逻辑处理:实现复杂的业务规则和验证。
数据增强:为原始数据添加额外信息。
ItemProcessor 是一个泛型接口,定义如下:
package org.springframework.batch.item; import org.springframework.lang.NonNull; import org.springframework.lang.Nullable; /** * 项目(Item)转换接口。给定一个输入项,该接口提供了一个扩展点, * 允许在面向项目的处理场景中应用业务逻辑。 * * 需要注意: * 1. 可以返回与提供类型不同的类型,但不是必须的。 * 2. 返回 null 表示该项不应继续处理。 * * @author Robert Kasanicky * @author Dave Syer * @author Mahmoud Ben Hassine * * @param <I> 输入项的类型 * @param <O> 输出项的类型 */ public interface ItemProcessor<I, O> { /** * 处理提供的项目,返回可能被修改或新的项目以继续处理。 * 如果返回结果为 null,则表示该项的处理不应继续。 * * 该方法永远不会收到 null 的 item 参数,因为 null 可能的来源只有: * (1)ItemReader 返回 null,表示没有更多项目可读。 * (2)组合处理器中前一个 ItemProcessor,表示该项已被过滤。 * * @param item 要处理的项目,保证不为 {@code null} * @return 可能被修改或新的项目以继续处理,如果返回 null 则表示不应继续处理提供的项目。 * @throws Exception 处理过程中发生异常时抛出 */ @Nullable O process(@NonNull I item) throws Exception; }
该接口只提供了一个 process() 方法,用来处理数据。该方法的参数说明:
I:输入数据类型,通常是 ItemReader 读取的数据类型。
O:输出数据类型,通常是 ItemWriter 接收的数据类型。不一定要和 ItemReader 保持一致,可以是其他类型。处理后的对象,返回 null 表示该数据被过滤,即不会传递给 ItemWriter。
实现注意事项:
(1)该方法应保持无状态,以确保线程安全。
(2)抛出的任何异常都将导致当前事务回滚。
(3)复杂的业务逻辑应在此方法中实现。
(4)对于性能敏感的操作,应考虑批量处理。
下面提供多个 ItemProcessor 简单示例。
将输入数据转换为另一种格式或类型,例如:
/** * 将字符串转换为大写 */ public class UpperCaseProcessor implements ItemProcessor<String, String> { @Override public String process(@NonNull String item) { return item.toUpperCase(); } }
或者
/** * 将 CSV 字符串转换为对象 */ public class CsvToPersonProcessor implements ItemProcessor<String, Person> { @Override public Person process(@NonNull String csvLine) { String[] fields = csvLine.split(","); Person person = new Person(); person.setName(fields[0]); person.setAge(Integer.parseInt(fields[1])); return person; } }
通过返回 null 跳过不符合条件的记录,例如:
/** * 只处理偶数ID的记录 */ public class EvenIdFilterProcessor implements ItemProcessor<Order, Order> { @Override public Order process(@NonNull Order order) { return order.getId() % 2 == 0 ? order : null; } } /** * 过滤掉无效邮箱 */ public class EmailValidationProcessor implements ItemProcessor<User, User> { @Override public User process(@NonNull User user) { return user.getEmail().contains("@") ? user : null; } }
从外部系统获取附加信息,下面示例通过 ItemReader 读取数据的 ID 从数据库查询详细的用户信息,然后将用户的地址和电话设置到 User 返回。如下:
/** * 根据用户ID查询数据库补充用户详情 */ public class UserEnrichmentProcessor implements ItemProcessor<User, User> { @Autowired private UserRepository userRepository; @Override public User process(@NonNull User basicUser) { UserDetail detail = userRepository.findDetailById(basicUser.getId()); basicUser.setAddress(detail.getAddress()); basicUser.setPhone(detail.getPhone()); return basicUser; } }
转换对象类型,process() 方法可以返回与 ItemReader 不同的数据类型,下面将 Person 类型转换成 Employee 类型返回。如下:
/** * 将 Person 转换为 Employee */ public class PersonToEmployeeProcessor implements ItemProcessor<Person, Employee> { @Override public Employee process(@NonNull Person person) { Employee employee = new Employee(); employee.setEmployeeId("EMP-" + person.getId()); employee.setFullName(person.getFirstName() + " " + person.getLastName()); return employee; } }
一个处理器内实现多个操作,如验证数据、转换格式、计算哈希值等等,如下:
/** * 1. 验证数据 -> 2. 转换格式 -> 3. 计算哈希值 */ public class MultiStepProcessor implements ItemProcessor<RawData, ProcessedData> { @Override public ProcessedData process(@NonNull RawData raw) { // 1. 验证 if (raw.getTimestamp() == null) { return null; } // 2. 转换 ProcessedData processed = new ProcessedData(); processed.setId(raw.getId().toUpperCase()); processed.setValue(raw.getValue() * 100); // 3. 计算哈希 processed.setHash(DigestUtils.md5Hex(processed.toString())); return processed; } }
process() 方法的异常处理是批处理流程稳定性的关键环节。异常处理策略需要根据业务需求(如是否跳过错误数据、是否重试)来设计,主要有以下几种方式:
(1)直接抛出异常(默认行为)
process() 方法声明了 throws Exception,允许直接抛出任何异常。默认情况下,抛出的异常会导致:
当前当前批次(Chunk)处理中断
事务回滚
整个 Step 失败(除非配置了容错机制)
@Override public User process(UserDTO item) throws Exception { if (item.getAge() == null) { // 直接抛出异常,终止处理 throw new InvalidDataException("年龄不能为空"); } // 正常处理逻辑... return user; }
(2)配置跳过策略(Skip Policy)
通过 SkipPolicy 可忽略特定异常,让流程继续处理后续数据,适用于非致命错误(如数据格式错误)。
步骤 1:定义异常类型
// 自定义异常 public class InvalidDataException extends Exception { public InvalidDataException(String message) { super(message); } }
步骤 2:在 Step 中配置跳过策略
@Bean public Step myStep(StepBuilderFactory stepBuilderFactory) { return stepBuilderFactory.get("myStep") .<UserDTO, User>chunk(10) .reader(reader()) .processor(processor()) .writer(writer()) // 配置可跳过的异常 .faultTolerant() // 要先调用该方法 .skip(InvalidDataException.class) // 跳过特定异常 .skipLimit(100) // 最多跳过100条记录 .build(); }
其中,skip() 方法定义如下:
public FaultTolerantStepBuilder<I, O> skip(Class<? extends Throwable> type) { skippableExceptionClasses.put(type, true); return this; }
注意,除了通过配置跳过策略,我们也可以自行进行处理,捕获特定类型异常,然后返回 null 进行过滤。如下,捕获 NumberFormatException 类型异常,然后返回 null 过滤数据。如下:
/** * 处理可能格式错误的数据 */ public class SafeNumberProcessor implements ItemProcessor<String, Integer> { @Override public Integer process(@NonNull String item) { try { // 转换为数字 return Integer.parseInt(item.trim()); } catch (NumberFormatException e) { // 如果不是数字,则跳过处理 log.warn("Invalid number format: {}", item); return null; // 跳过无效数据 } } }
更多 ItemProcess 处理请参考官方文档。