Spring Batch4 教程

Spring Batch4 多处理器联合 CompositeItemProcessor

在 Spring Batch 4 中,CompositeItemProcessor 是一个特殊的 ItemProcessor 实现,用于将多个处理器按顺序组合成一个处理链,实现复杂的数据处理流程。它允许开发者将不同的处理逻辑拆分到独立的处理器中,再通过组合方式协同工作,符合 "单一职责" 设计原则。

工作原理如下图:

image.png

上图中,处理器1直接接收 ItemReader 读取的原始数据,处理后输出的数据作为处理器2的输入数据,以此类推,直到所有的处理器都执行完成,最后将数据返回给 ItemWriter 写出到目的地。查看 CompositeItemProcessor  的核心代码:

public O process(I item) throws Exception {
    Object result = item; // 原始结果

    for (ItemProcessor<?, ?> delegate : delegates) {
       if (result == null) {
          return null; // 如果其中某个处理器返回 null,则直接退出,过滤掉这条数据
       }

       // 调用处理器,将上个处理器的结果作为参数传递给它(第一个处理器是原始数据)
       result = processItem(delegate, result);
    }
    return (O) result; // 返回最后处理结果
}

核心作用

  • 处理器链式调用:按顺序执行多个 ItemProcessor,前一个处理器的输出作为下一个的输入。

  • 责任分离:将数据验证、转换、过滤等逻辑拆分到不同处理器,提高代码复用性和可维护性。

  • 灵活组合:动态调整处理器顺序或增减处理器,适应不同业务场景。

源码预览&分析

下面是 CompositeItemProcessor 的源码,源码较少,容易读懂。如下:

package org.springframework.batch.item.support;

import org.springframework.batch.item.ItemProcessor;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import java.util.List;

/**
 * 组合式 ItemProcessor 实现,将项目依次通过注入的处理器链进行转换
 * (前一个处理器的输出作为下一个处理器的输入)。
 * 
 * 注意:使用者需要确保注入的处理器链的输入/输出类型匹配声明类型。
 * 
 * 典型使用场景:
 * 1. 需要多个处理步骤串联执行的复杂业务逻辑
 * 2. 分阶段的数据转换和验证流程
 * 3. 可配置的处理管道
 * 
 * 实现特性:
 * 1. 自动传播 null 值(某个处理器返回 null 时立即终止处理链)
 * 2. 支持泛型类型转换
 * 3. 提供基础的参数校验
 * 
 * @author Robert Kasanicky
 */
public class CompositeItemProcessor<I, O> implements ItemProcessor<I, O>, InitializingBean {

    /**
     * 处理器委托链,按顺序执行
     * 
     * 类型说明:
     * ? extends ItemProcessor<?, ?> 表示允许任何ItemProcessor的子类型,
     * 且其输入输出类型可以是任意匹配的类型
     */
    private List<? extends ItemProcessor<?, ?>> delegates;

    /**
     * 处理项目,依次通过所有委托处理器
     * 
     * @param item 要处理的输入项(类型 I)
     * @return 最终处理结果(类型 O),如果任何处理器返回null则整体返回null
     * @throws Exception 如果任何处理器抛出异常
     * 
     * 实现说明:
     * 使用中间变量 Object result 保存处理过程中的类型转换
     */
    @Nullable
    @Override
    @SuppressWarnings("unchecked")
    public O process(I item) throws Exception {
        Object result = item;  // 初始值为输入项

        // 依次执行每个处理器
        for (ItemProcessor<?, ?> delegate : delegates) {
            if (result == null) {
                // 如果中间结果为null,立即终止处理链
                return null;
            }

            // 调用当前处理器处理中间结果
            result = processItem(delegate, result);
        }
        
        // 将最终结果转换为声明的输出类型
        return (O) result;
    }
    
    /**
     * 处理单个处理器阶段的辅助方法
     * 
     * 解决泛型通配符捕获问题(参见Java泛型教程中的通配符捕获部分)
     * 
     * @param <T> 处理器输入类型的推断类型
     * @param processor 当前处理器
     * @param input 要处理的输入
     * @return 处理结果
     * @throws Exception 如果处理器抛出异常
     * 
     * 技术说明:
     * 此方法通过独立类型参数<T>解决编译器无法推断通配符具体类型的问题
     */
    @SuppressWarnings("unchecked")
    private <T> Object processItem(ItemProcessor<T, ?> processor, Object input) throws Exception {
        return processor.process((T) input);
    }

    /**
     * Spring属性设置后的校验方法
     * 
     * @throws Exception 如果委托链为空或null
     * 
     * 校验逻辑:
     * 1. delegates不能为null
     * 2. delegates不能是空列表
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        Assert.notNull(delegates, "The 'delegates' may not be null");
        Assert.notEmpty(delegates, "The 'delegates' may not be empty");
    }

    /**
     * 设置处理器委托链
     * 
     * @param delegates 要使用的处理器列表,顺序决定执行顺序
     * 
     * 使用示例:
     * List<ItemProcessor<?,?>> processors = new ArrayList<>();
     * processors.add(processor1);
     * processors.add(processor2);
     * compositeProcessor.setDelegates(processors);
     */
    public void setDelegates(List<? extends ItemProcessor<?, ?>> delegates) {
        this.delegates = delegates;
    }
}

  

简单示例

准备数据

我们依然通过在 resources 目录下面创建的 users.csv 文件提供数据,内容如下:

"id","username","password"
1,张三,"13BC03AC29FAC7B29736EC3BE5C2F55A"
2,李四,"5E5994FBCFA922D804DF45295AE98604"
3,王五,"6C14DA109E294D1E8155BE8AA4B1CE8E"
4,赵六,"03774AD7979A5909E78F9C9DB3A2F0B2"

创建实体

创建一个 Java POJO 类,用来映射从 CSV 读取的每行数据,如下:

package com.hxstrive.spring_batch.compositeItemProcessorDemo.dto;

import lombok.Data;
import lombok.ToString;

/**
 * 用户DTO
 * @author hxstrive.com
 */
@Data
@ToString
public class User {
    private int id;
    private String username;
    private String password;
}

创建配置类

使用 @Configuration 注解创建配置类,创建组合式(CompositeItemProcessor )ItemProcessor Bean,将多个处理器按顺序串联执行,代码如下:

package com.hxstrive.spring_batch.compositeItemProcessorDemo.config;

import com.hxstrive.spring_batch.compositeItemProcessorDemo.dto.User;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.batch.item.support.CompositeItemProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.validation.BindException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Spring Batch 配置类
 * @author hxstrive.com
 */
@Configuration
public class BatchConfig {

    // 用于创建和配置 Job 对象的工厂类
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    // 用于创建和配置 Step 对象的工厂类
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    // 创建Job对象
    @Bean
    public Job compositeItemProcessorDemoJob() {
        return jobBuilderFactory.get("compositeItemProcessorDemoJob-" + System.currentTimeMillis())
                .start(compositeItemProcessorDemoStep())
                .build();
    }

    // 创建Step对象
    @Bean
    public Step compositeItemProcessorDemoStep() {
        return stepBuilderFactory.get("compositeItemProcessorDemoStep")
                .<User, User>chunk(2)
                .reader(flatFileItemReader())
                .processor(compositeItemProcessor())
                .writer(new ItemWriter<User>() {
                    @Override
                    public void write(List<? extends User> items) throws Exception {
                        System.out.println(Arrays.toString(items.toArray()));
                    }
                })
                .build();
    }

    @Bean
    public ItemProcessor<User,User> compositeItemProcessor() {
        CompositeItemProcessor<User, User> compositeItemProcessor = new CompositeItemProcessor<>();
        compositeItemProcessor.setDelegates(new ArrayList<ItemProcessor<User,User>>(){{
            add(itemProcessor());
            add(itemProcessor2());
        }});
        return compositeItemProcessor;
    }

    @Bean
    public ItemProcessor<User, User> itemProcessor() {
        return new ItemProcessor<User, User>() {
            @Override
            public User process(User item) throws Exception {
                System.out.println("itemProcessor() " + item);
                item.setPassword(item.getPassword().toLowerCase());
                return item;
            }
        };
    }

    @Bean
    public ItemProcessor<User, User> itemProcessor2() {
        return new ItemProcessor<User, User>() {
            @Override
            public User process(User item) throws Exception {
                System.out.println("itemProcessor2() " + item);
                if(item.getId() % 2 == 0) {
                    return item;
                } else {
                    return null;
                }
            }
        };
    }

    // 创建ItemReader对象
    @Bean
    @StepScope //将 Bean 的生命周期与 Step 执行上下文 绑定
    public FlatFileItemReader<? extends User> flatFileItemReader() {
        FlatFileItemReader<User> reader = new FlatFileItemReader<>();
        reader.setResource(new ClassPathResource("users.csv"));
        reader.setLinesToSkip(1); // 跳过文件第一行,因为第一行是字段名

        // 解析数据
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames("id", "username", "password");
        // 把解析出的数据映射到 User 对象中
        DefaultLineMapper<User> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper(new FieldSetMapper<User>(){
            @Override
            public User mapFieldSet(FieldSet fieldSet) throws BindException {
                User user = new User();
                user.setId(fieldSet.readInt("id"));
                user.setUsername(fieldSet.readString("username"));
                user.setPassword(fieldSet.readString("password"));
                System.out.println("reading user: " + user);
                return user;
            }
        });

        lineMapper.afterPropertiesSet();
        reader.setLineMapper(lineMapper);
        return reader;
    }

}

上述代码,重点关注 compositeItemProcessor() 方法,该方法创建组合式 ItemProcessor Bean,将多个处理器按顺序串联执行。简单过程如下:

(1)创建一个CompositeItemProcessor实例。

(2)配置处理器链(按添加顺序执行)

(3)返回可注入Spring容器的组合处理器。

执行流程如下图:

image.png

代码如下:

@Bean
public ItemProcessor<User,User> compositeItemProcessor() {
    // 1. 创建组合处理器实例
    CompositeItemProcessor<User, User> compositeItemProcessor = new CompositeItemProcessor<>();

    // 2. 配置处理器委托链(使用双括号初始化语法)
    compositeItemProcessor.setDelegates(new ArrayList<ItemProcessor<User,User>>(){{
            // 第一阶段处理
            add(itemProcessor());  // 假设进行数据校验
            // 第二阶段处理 
            add(itemProcessor2()); // 假设进行数据增强
        }}
    );
    
    // 3. 返回组合后的处理器
    return compositeItemProcessor;
}

创建启动类

启动类很简单,仅添加了 @EnableBatchProcessing  注解,用来开启批处理功能。代码如下:

package com.hxstrive.spring_batch.compositeItemProcessorDemo;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@EnableBatchProcessing // 开启批处理
public class SpringBatchDemoApplication {

    public static void main(String[] args) {
       SpringApplication.run(SpringBatchDemoApplication.class, args);
    }

}

到这里,代码写完了,运行应用程序,输出日志如下:

image.png

关于 CompositeItemProcessor 类更多用法,请参考官方文档。

  

说说我的看法
全部评论(
没有评论
关于
本网站专注于 Java、数据库(MySQL、Oracle)、Linux、软件架构及大数据等多领域技术知识分享。涵盖丰富的原创与精选技术文章,助力技术传播与交流。无论是技术新手渴望入门,还是资深开发者寻求进阶,这里都能为您提供深度见解与实用经验,让复杂编码变得轻松易懂,携手共赴技术提升新高度。如有侵权,请来信告知:hxstrive@outlook.com
其他应用
公众号