在 Spring Batch 4 中,CompositeItemProcessor 是一个特殊的 ItemProcessor 实现,用于将多个处理器按顺序组合成一个处理链,实现复杂的数据处理流程。它允许开发者将不同的处理逻辑拆分到独立的处理器中,再通过组合方式协同工作,符合 "单一职责" 设计原则。
工作原理如下图:
上图中,处理器1直接接收 ItemReader 读取的原始数据,处理后输出的数据作为处理器2的输入数据,以此类推,直到所有的处理器都执行完成,最后将数据返回给 ItemWriter 写出到目的地。查看 CompositeItemProcessor 的核心代码:
public O process(I item) throws Exception { Object result = item; // 原始结果 for (ItemProcessor<?, ?> delegate : delegates) { if (result == null) { return null; // 如果其中某个处理器返回 null,则直接退出,过滤掉这条数据 } // 调用处理器,将上个处理器的结果作为参数传递给它(第一个处理器是原始数据) result = processItem(delegate, result); } return (O) result; // 返回最后处理结果 }
处理器链式调用:按顺序执行多个 ItemProcessor,前一个处理器的输出作为下一个的输入。
责任分离:将数据验证、转换、过滤等逻辑拆分到不同处理器,提高代码复用性和可维护性。
灵活组合:动态调整处理器顺序或增减处理器,适应不同业务场景。
下面是 CompositeItemProcessor 的源码,源码较少,容易读懂。如下:
package org.springframework.batch.item.support; import org.springframework.batch.item.ItemProcessor; import org.springframework.beans.factory.InitializingBean; import org.springframework.lang.Nullable; import org.springframework.util.Assert; import java.util.List; /** * 组合式 ItemProcessor 实现,将项目依次通过注入的处理器链进行转换 * (前一个处理器的输出作为下一个处理器的输入)。 * * 注意:使用者需要确保注入的处理器链的输入/输出类型匹配声明类型。 * * 典型使用场景: * 1. 需要多个处理步骤串联执行的复杂业务逻辑 * 2. 分阶段的数据转换和验证流程 * 3. 可配置的处理管道 * * 实现特性: * 1. 自动传播 null 值(某个处理器返回 null 时立即终止处理链) * 2. 支持泛型类型转换 * 3. 提供基础的参数校验 * * @author Robert Kasanicky */ public class CompositeItemProcessor<I, O> implements ItemProcessor<I, O>, InitializingBean { /** * 处理器委托链,按顺序执行 * * 类型说明: * ? extends ItemProcessor<?, ?> 表示允许任何ItemProcessor的子类型, * 且其输入输出类型可以是任意匹配的类型 */ private List<? extends ItemProcessor<?, ?>> delegates; /** * 处理项目,依次通过所有委托处理器 * * @param item 要处理的输入项(类型 I) * @return 最终处理结果(类型 O),如果任何处理器返回null则整体返回null * @throws Exception 如果任何处理器抛出异常 * * 实现说明: * 使用中间变量 Object result 保存处理过程中的类型转换 */ @Nullable @Override @SuppressWarnings("unchecked") public O process(I item) throws Exception { Object result = item; // 初始值为输入项 // 依次执行每个处理器 for (ItemProcessor<?, ?> delegate : delegates) { if (result == null) { // 如果中间结果为null,立即终止处理链 return null; } // 调用当前处理器处理中间结果 result = processItem(delegate, result); } // 将最终结果转换为声明的输出类型 return (O) result; } /** * 处理单个处理器阶段的辅助方法 * * 解决泛型通配符捕获问题(参见Java泛型教程中的通配符捕获部分) * * @param <T> 处理器输入类型的推断类型 * @param processor 当前处理器 * @param input 要处理的输入 * @return 处理结果 * @throws Exception 如果处理器抛出异常 * * 技术说明: * 此方法通过独立类型参数<T>解决编译器无法推断通配符具体类型的问题 */ @SuppressWarnings("unchecked") private <T> Object processItem(ItemProcessor<T, ?> processor, Object input) throws Exception { return processor.process((T) input); } /** * Spring属性设置后的校验方法 * * @throws Exception 如果委托链为空或null * * 校验逻辑: * 1. delegates不能为null * 2. delegates不能是空列表 */ @Override public void afterPropertiesSet() throws Exception { Assert.notNull(delegates, "The 'delegates' may not be null"); Assert.notEmpty(delegates, "The 'delegates' may not be empty"); } /** * 设置处理器委托链 * * @param delegates 要使用的处理器列表,顺序决定执行顺序 * * 使用示例: * List<ItemProcessor<?,?>> processors = new ArrayList<>(); * processors.add(processor1); * processors.add(processor2); * compositeProcessor.setDelegates(processors); */ public void setDelegates(List<? extends ItemProcessor<?, ?>> delegates) { this.delegates = delegates; } }
我们依然通过在 resources 目录下面创建的 users.csv 文件提供数据,内容如下:
"id","username","password" 1,张三,"13BC03AC29FAC7B29736EC3BE5C2F55A" 2,李四,"5E5994FBCFA922D804DF45295AE98604" 3,王五,"6C14DA109E294D1E8155BE8AA4B1CE8E" 4,赵六,"03774AD7979A5909E78F9C9DB3A2F0B2"
创建一个 Java POJO 类,用来映射从 CSV 读取的每行数据,如下:
package com.hxstrive.spring_batch.compositeItemProcessorDemo.dto; import lombok.Data; import lombok.ToString; /** * 用户DTO * @author hxstrive.com */ @Data @ToString public class User { private int id; private String username; private String password; }
使用 @Configuration 注解创建配置类,创建组合式(CompositeItemProcessor )ItemProcessor Bean,将多个处理器按顺序串联执行,代码如下:
package com.hxstrive.spring_batch.compositeItemProcessorDemo.config; import com.hxstrive.spring_batch.compositeItemProcessorDemo.dto.User; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepScope; import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemWriter; import org.springframework.batch.item.file.FlatFileItemReader; import org.springframework.batch.item.file.mapping.DefaultLineMapper; import org.springframework.batch.item.file.mapping.FieldSetMapper; import org.springframework.batch.item.file.transform.DelimitedLineTokenizer; import org.springframework.batch.item.file.transform.FieldSet; import org.springframework.batch.item.support.CompositeItemProcessor; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.io.ClassPathResource; import org.springframework.validation.BindException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; /** * Spring Batch 配置类 * @author hxstrive.com */ @Configuration public class BatchConfig { // 用于创建和配置 Job 对象的工厂类 @Autowired private JobBuilderFactory jobBuilderFactory; // 用于创建和配置 Step 对象的工厂类 @Autowired private StepBuilderFactory stepBuilderFactory; // 创建Job对象 @Bean public Job compositeItemProcessorDemoJob() { return jobBuilderFactory.get("compositeItemProcessorDemoJob-" + System.currentTimeMillis()) .start(compositeItemProcessorDemoStep()) .build(); } // 创建Step对象 @Bean public Step compositeItemProcessorDemoStep() { return stepBuilderFactory.get("compositeItemProcessorDemoStep") .<User, User>chunk(2) .reader(flatFileItemReader()) .processor(compositeItemProcessor()) .writer(new ItemWriter<User>() { @Override public void write(List<? extends User> items) throws Exception { System.out.println(Arrays.toString(items.toArray())); } }) .build(); } @Bean public ItemProcessor<User,User> compositeItemProcessor() { CompositeItemProcessor<User, User> compositeItemProcessor = new CompositeItemProcessor<>(); compositeItemProcessor.setDelegates(new ArrayList<ItemProcessor<User,User>>(){{ add(itemProcessor()); add(itemProcessor2()); }}); return compositeItemProcessor; } @Bean public ItemProcessor<User, User> itemProcessor() { return new ItemProcessor<User, User>() { @Override public User process(User item) throws Exception { System.out.println("itemProcessor() " + item); item.setPassword(item.getPassword().toLowerCase()); return item; } }; } @Bean public ItemProcessor<User, User> itemProcessor2() { return new ItemProcessor<User, User>() { @Override public User process(User item) throws Exception { System.out.println("itemProcessor2() " + item); if(item.getId() % 2 == 0) { return item; } else { return null; } } }; } // 创建ItemReader对象 @Bean @StepScope //将 Bean 的生命周期与 Step 执行上下文 绑定 public FlatFileItemReader<? extends User> flatFileItemReader() { FlatFileItemReader<User> reader = new FlatFileItemReader<>(); reader.setResource(new ClassPathResource("users.csv")); reader.setLinesToSkip(1); // 跳过文件第一行,因为第一行是字段名 // 解析数据 DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer(); tokenizer.setNames("id", "username", "password"); // 把解析出的数据映射到 User 对象中 DefaultLineMapper<User> lineMapper = new DefaultLineMapper<>(); lineMapper.setLineTokenizer(tokenizer); lineMapper.setFieldSetMapper(new FieldSetMapper<User>(){ @Override public User mapFieldSet(FieldSet fieldSet) throws BindException { User user = new User(); user.setId(fieldSet.readInt("id")); user.setUsername(fieldSet.readString("username")); user.setPassword(fieldSet.readString("password")); System.out.println("reading user: " + user); return user; } }); lineMapper.afterPropertiesSet(); reader.setLineMapper(lineMapper); return reader; } }
上述代码,重点关注 compositeItemProcessor() 方法,该方法创建组合式 ItemProcessor Bean,将多个处理器按顺序串联执行。简单过程如下:
(1)创建一个CompositeItemProcessor实例。
(2)配置处理器链(按添加顺序执行)
(3)返回可注入Spring容器的组合处理器。
执行流程如下图:
代码如下:
@Bean public ItemProcessor<User,User> compositeItemProcessor() { // 1. 创建组合处理器实例 CompositeItemProcessor<User, User> compositeItemProcessor = new CompositeItemProcessor<>(); // 2. 配置处理器委托链(使用双括号初始化语法) compositeItemProcessor.setDelegates(new ArrayList<ItemProcessor<User,User>>(){{ // 第一阶段处理 add(itemProcessor()); // 假设进行数据校验 // 第二阶段处理 add(itemProcessor2()); // 假设进行数据增强 }} ); // 3. 返回组合后的处理器 return compositeItemProcessor; }
启动类很简单,仅添加了 @EnableBatchProcessing 注解,用来开启批处理功能。代码如下:
package com.hxstrive.spring_batch.compositeItemProcessorDemo; import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication @EnableBatchProcessing // 开启批处理 public class SpringBatchDemoApplication { public static void main(String[] args) { SpringApplication.run(SpringBatchDemoApplication.class, args); } }
到这里,代码写完了,运行应用程序,输出日志如下:
关于 CompositeItemProcessor 类更多用法,请参考官方文档。