Spring Batch4 教程

Spring Batch4 实现数据写入多个文件 ClassifierCompositeItemWriter

ClassifierCompositeItemWriter 是 Spring Batch 4 中的一个高级组件,它允许您根据一定的分类逻辑将写操作分发到多个不同的 ItemWriter 实现。这对于需要将数据写入不同目的地或基于数据属性进行分区的场景特别有用。

ClassifierCompositeItemWriter  使用场景:

  • 多目标写入: 当您需要将数据写入多个不同的目标(如数据库表、文件系统、消息队列等)时。

  • 基于属性的分区: 当您需要根据数据的某些属性(如类型、状态等)将数据分区到不同的目标时。

工作原理

image.png

上图中,ClassifierCompositeItemWriter 通过一个分类器(Classifier)来决定每个项目(Item)应该被哪个 ItemWriter 处理。分类器是一个函数式接口,它接受一个项目作为输入,并返回一个 ItemWriter 的标识符。定义如下:

package org.springframework.classify;

import java.io.Serializable;

public interface Classifier<C, T> extends Serializable {
    T classify(C var1);
}

然后,ClassifierCompositeItemWriter 使用这个标识符来查找并调用相应的 ItemWriter。

源码分析

直接上 ClassifierCompositeItemWriter 的源码,较为简单:

/**
 * 该类实现了ItemWriter接口
 * 它根据分类器(Classifier)的逻辑将不同类型的数据项分发到对应的ItemWriter进行处理
 *
 * @param <T> 处理的数据项类型
 */
public class ClassifierCompositeItemWriter<T> implements ItemWriter<T> {

    /**
     * 分类器,用于决定每个项目应该由哪个ItemWriter处理
     * 默认为一个返回null的ClassifierSupport实例
     */
    private Classifier<T, ItemWriter<? super T>> classifier = new ClassifierSupport<>(null);

    /**
     * 设置分类器
     *
     * @param classifier 用于项目分类的Classifier实例,不能为null
     */
    public void setClassifier(Classifier<T, ItemWriter<? super T>> classifier) {
        // 验证分类器不为null,否则抛出异常
        Assert.notNull(classifier, "A classifier is required.");
        this.classifier = classifier;
    }

    /**
     * 核心写入方法,根据分类器的分类结果将项目分发给对应的ItemWriter
     *
     * @param items 待处理的项目列表
     * @throws Exception 如果处理过程中发生错误
     */
    @Override
    public void write(List<? extends T> items) throws Exception {
        // 使用LinkedHashMap保持ItemWriter的处理顺序
        // 键: ItemWriter实例,值: 该写入器需要处理的项目列表
        Map<ItemWriter<? super T>, List<T>> map = new LinkedHashMap<>();

        // 遍历所有项目,进行分类并分组
        for (T item : items) {
            // 通过分类器获取当前项目对应的ItemWriter
            ItemWriter<? super T> key = classifier.classify(item);
            // 如果是新的ItemWriter,先在map中初始化一个空列表
            if (!map.containsKey(key)) {
                map.put(key, new ArrayList<>());
            }
            // 将当前项目添加到对应ItemWriter的处理列表中
            map.get(key).add(item);
        }

        // 遍历map,让每个ItemWriter处理其对应的项目列表
        for (ItemWriter<? super T> writer : map.keySet()) {
            writer.write(map.get(writer));
        }
    }

}

  

简单示例

该示例将从 csv 读取数据,然后使用 ClassifierCompositeItemWriter 根据匿名分类器(Classifier)将数据 ID 为偶数的写入文件,奇数写入数据库。下面是详细步骤:

准备数据

在项目的 resources 目录下面创建名为 users.csv 的文件,内容如下:

"id","username","password"
1,张三,"13BC03AC29FAC7B29736EC3BE5C2F55A"
2,李四,"5E5994FBCFA922D804DF45295AE98604"
3,王五,"6C14DA109E294D1E8155BE8AA4B1CE8E"
4,赵六,"03774AD7979A5909E78F9C9DB3A2F0B2"

创建实体

创建一个简单的 POJO 用于映射 csv 读取的数据,如下:

package com.hxstrive.spring_batch.classifierCompositeItemWriterDemo.dto;

import lombok.Data;
import lombok.ToString;

/**
 * 用户DTO
 * @author hxstrive.com
 */
@Data
@ToString
public class User {
    private int id;
    private String username;
    private String password;
}

创建配置类

使用 @Configuration 注解创建配置类,然后分别创建 FlatFileItemWriter 和 JdbcBatchItemWriter 对象,最后使用前面两个 ItemWriter 创建一个 CompositeItemWriter 对象。代码如下:

package com.hxstrive.spring_batch.classifierCompositeItemWriterDemo.config;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.hxstrive.spring_batch.classifierCompositeItemWriterDemo.dto.User;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.batch.item.file.transform.LineAggregator;
import org.springframework.batch.item.support.ClassifierCompositeItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.classify.Classifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.FileSystemResource;
import org.springframework.validation.BindException;

import javax.sql.DataSource;

/**
 * Spring Batch 配置类
 * @author hxstrive.com
 */
@Configuration
public class BatchConfig {

    // 用于创建和配置 Job 对象的工厂类
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    // 用于创建和配置 Step 对象的工厂类
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private DataSource dataSource;

    // 创建Job对象
    @Bean
    public Job classifierItemWriterDemoJob() {
        return jobBuilderFactory.get("classifierItemWriterDemoJob-" + System.currentTimeMillis())
                .start(classifierItemWriterDemoStep())
                .build();
    }

    // 创建Step对象
    @Bean
    public Step classifierItemWriterDemoStep() {
        return stepBuilderFactory.get("classifierItemWriterDemoStep")
                .<User, User>chunk(2)
                .reader(flatFileItemReader())
                .writer(classifierItemWriter())
                .stream(flatFileItemWriter()) // 显式注册ItemStream组件
//                .stream(dbItemWrite())
                .build();
    }


    // 创建ItemReader对象
    @Bean
    @StepScope //将 Bean 的生命周期与 Step 执行上下文 绑定
    public FlatFileItemReader<? extends User> flatFileItemReader() {
        FlatFileItemReader<User> reader = new FlatFileItemReader<>();
        reader.setResource(new ClassPathResource("users.csv"));
        reader.setLinesToSkip(1); // 跳过文件第一行,因为第一行是字段名

        // 解析数据
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames("id", "username", "password");
        // 把解析出的数据映射到 User 对象中
        DefaultLineMapper<User> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper(new FieldSetMapper<User>(){
            @Override
            public User mapFieldSet(FieldSet fieldSet) throws BindException {
                User user = new User();
                user.setId(fieldSet.readInt("id"));
                user.setUsername(fieldSet.readString("username"));
                user.setPassword(fieldSet.readString("password"));
                System.out.println("reading user: " + user);
                return user;
            }
        });

        lineMapper.afterPropertiesSet();
        reader.setLineMapper(lineMapper);
        return reader;
    }

    // 创建ItemWriter对象
    @Bean
    public FlatFileItemWriter<? super User> flatFileItemWriter() {
        System.out.println("flatFileItemWriter()");
        FlatFileItemWriter<User> writer = new FlatFileItemWriter<>();
        String path = "F:\\customer.txt";
        writer.setResource(new FileSystemResource(path));

        //把User对象转换成字符串输出到文件
        writer.setLineAggregator(new LineAggregator<User>() {
            ObjectMapper mapper = new ObjectMapper();
            @Override
            public String aggregate(User item) {
                String str = null;
                try {
                    str = mapper.writeValueAsString(item);
                } catch (JsonProcessingException e) {
                    throw new RuntimeException(e);
                }
                return str;
            }
        });

        return writer;
    }

    // 创建ItemWriter对象
    @Bean
    public JdbcBatchItemWriter<User> dbItemWrite() {
        JdbcBatchItemWriter<User> writer = new JdbcBatchItemWriter<>();
        writer.setDataSource(dataSource);
        writer.setSql("insert into users_db(id,username,password) values (:id,:username,:password)");
        writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
        writer.afterPropertiesSet();
        return writer;
    }

    @Bean
    public ClassifierCompositeItemWriter<? super User> classifierItemWriter() {
        ClassifierCompositeItemWriter<User> writer = new ClassifierCompositeItemWriter<>();
        writer.setClassifier(new Classifier<User, ItemWriter<? super User>>() {
            @Override
            public ItemWriter<? super User> classify(User item) {
                if (item.getId() % 2 == 0){
                    return flatFileItemWriter();
                }else{
                    return dbItemWrite();
                }
            }
        });

        return writer;
    }

}

上述配置中,主要关注 classifierItemWriter() 方法,该方法返回一个 ClassifierCompositeItemWriter 实例,该实例用于根据 User 对象的某些属性将数据写入不同的目标。

(1)创建 ClassifierCompositeItemWriter 实例

ClassifierCompositeItemWriter<User> writer = new ClassifierCompositeItemWriter<>();

(2)设置分类器

// 通过调用 setClassifier 方法,为 writer 实例设置了一个自定义的分类器
writer.setClassifier(new Classifier<User, ItemWriter<? super User>>() {
    @Override
    public ItemWriter<? super User> classify(User item) {
        if (item.getId() % 2 == 0){
            return flatFileItemWriter();
        }else{
            return dbItemWrite();
        }
    }
});

上述代码中,分类器是一个匿名内部类,实现了 Classifier<User, ItemWriter<? super User>> 接口。这意味着它接受一个 User 对象作为输入,并返回一个 ItemWriter<? super User> 对象。在 classify 方法中,根据 User 对象的 id 属性来决定返回哪个 ItemWriter。如果 id 是偶数,则返回 flatFileItemWriter() 方法返回的 ItemWriter;如果 id 是奇数,则返回 dbItemWrite() 方法返回的 ItemWriter。

创建启动类

这里没有什么新东西,仅仅在 Spring Boot 启动类上多添加 @EnableBatchProcessing 注解,开启批处理功能。代码如下:

package com.hxstrive.spring_batch.flatFileItemWriterDemo;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@EnableBatchProcessing // 开启批处理
public class SpringBatchDemoApplication {

    public static void main(String[] args) {
       SpringApplication.run(SpringBatchDemoApplication.class, args);
    }

}

先将数据库表内容清空,删除存在的 customer.txt 文件。运行程序,输出日志如下图:

image.png

执行成功了,我们去看看数据库表和 customer.txt 文件,如下图:

image.png

image.png

更多关于 ClassifierCompositeItemWriter 的用法,请参考官方文档。

  

说说我的看法
全部评论(
没有评论
关于
本网站专注于 Java、数据库(MySQL、Oracle)、Linux、软件架构及大数据等多领域技术知识分享。涵盖丰富的原创与精选技术文章,助力技术传播与交流。无论是技术新手渴望入门,还是资深开发者寻求进阶,这里都能为您提供深度见解与实用经验,让复杂编码变得轻松易懂,携手共赴技术提升新高度。如有侵权,请来信告知:hxstrive@outlook.com
其他应用
公众号