ClassifierCompositeItemWriter 是 Spring Batch 4 中的一个高级组件,它允许您根据一定的分类逻辑将写操作分发到多个不同的 ItemWriter 实现。这对于需要将数据写入不同目的地或基于数据属性进行分区的场景特别有用。
ClassifierCompositeItemWriter 使用场景:
多目标写入: 当您需要将数据写入多个不同的目标(如数据库表、文件系统、消息队列等)时。
基于属性的分区: 当您需要根据数据的某些属性(如类型、状态等)将数据分区到不同的目标时。
上图中,ClassifierCompositeItemWriter 通过一个分类器(Classifier)来决定每个项目(Item)应该被哪个 ItemWriter 处理。分类器是一个函数式接口,它接受一个项目作为输入,并返回一个 ItemWriter 的标识符。定义如下:
package org.springframework.classify; import java.io.Serializable; public interface Classifier<C, T> extends Serializable { T classify(C var1); }
然后,ClassifierCompositeItemWriter 使用这个标识符来查找并调用相应的 ItemWriter。
直接上 ClassifierCompositeItemWriter 的源码,较为简单:
/** * 该类实现了ItemWriter接口 * 它根据分类器(Classifier)的逻辑将不同类型的数据项分发到对应的ItemWriter进行处理 * * @param <T> 处理的数据项类型 */ public class ClassifierCompositeItemWriter<T> implements ItemWriter<T> { /** * 分类器,用于决定每个项目应该由哪个ItemWriter处理 * 默认为一个返回null的ClassifierSupport实例 */ private Classifier<T, ItemWriter<? super T>> classifier = new ClassifierSupport<>(null); /** * 设置分类器 * * @param classifier 用于项目分类的Classifier实例,不能为null */ public void setClassifier(Classifier<T, ItemWriter<? super T>> classifier) { // 验证分类器不为null,否则抛出异常 Assert.notNull(classifier, "A classifier is required."); this.classifier = classifier; } /** * 核心写入方法,根据分类器的分类结果将项目分发给对应的ItemWriter * * @param items 待处理的项目列表 * @throws Exception 如果处理过程中发生错误 */ @Override public void write(List<? extends T> items) throws Exception { // 使用LinkedHashMap保持ItemWriter的处理顺序 // 键: ItemWriter实例,值: 该写入器需要处理的项目列表 Map<ItemWriter<? super T>, List<T>> map = new LinkedHashMap<>(); // 遍历所有项目,进行分类并分组 for (T item : items) { // 通过分类器获取当前项目对应的ItemWriter ItemWriter<? super T> key = classifier.classify(item); // 如果是新的ItemWriter,先在map中初始化一个空列表 if (!map.containsKey(key)) { map.put(key, new ArrayList<>()); } // 将当前项目添加到对应ItemWriter的处理列表中 map.get(key).add(item); } // 遍历map,让每个ItemWriter处理其对应的项目列表 for (ItemWriter<? super T> writer : map.keySet()) { writer.write(map.get(writer)); } } }
该示例将从 csv 读取数据,然后使用 ClassifierCompositeItemWriter 根据匿名分类器(Classifier)将数据 ID 为偶数的写入文件,奇数写入数据库。下面是详细步骤:
在项目的 resources 目录下面创建名为 users.csv 的文件,内容如下:
"id","username","password" 1,张三,"13BC03AC29FAC7B29736EC3BE5C2F55A" 2,李四,"5E5994FBCFA922D804DF45295AE98604" 3,王五,"6C14DA109E294D1E8155BE8AA4B1CE8E" 4,赵六,"03774AD7979A5909E78F9C9DB3A2F0B2"
创建一个简单的 POJO 用于映射 csv 读取的数据,如下:
package com.hxstrive.spring_batch.classifierCompositeItemWriterDemo.dto; import lombok.Data; import lombok.ToString; /** * 用户DTO * @author hxstrive.com */ @Data @ToString public class User { private int id; private String username; private String password; }
使用 @Configuration 注解创建配置类,然后分别创建 FlatFileItemWriter 和 JdbcBatchItemWriter 对象,最后使用前面两个 ItemWriter 创建一个 CompositeItemWriter 对象。代码如下:
package com.hxstrive.spring_batch.classifierCompositeItemWriterDemo.config; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.hxstrive.spring_batch.classifierCompositeItemWriterDemo.dto.User; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepScope; import org.springframework.batch.item.ItemWriter; import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider; import org.springframework.batch.item.database.JdbcBatchItemWriter; import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder; import org.springframework.batch.item.file.FlatFileItemReader; import org.springframework.batch.item.file.FlatFileItemWriter; import org.springframework.batch.item.file.mapping.DefaultLineMapper; import org.springframework.batch.item.file.mapping.FieldSetMapper; import org.springframework.batch.item.file.transform.DelimitedLineTokenizer; import org.springframework.batch.item.file.transform.FieldSet; import org.springframework.batch.item.file.transform.LineAggregator; import org.springframework.batch.item.support.ClassifierCompositeItemWriter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.classify.Classifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.io.ClassPathResource; import org.springframework.core.io.FileSystemResource; import org.springframework.validation.BindException; import javax.sql.DataSource; /** * Spring Batch 配置类 * @author hxstrive.com */ @Configuration public class BatchConfig { // 用于创建和配置 Job 对象的工厂类 @Autowired private JobBuilderFactory jobBuilderFactory; // 用于创建和配置 Step 对象的工厂类 @Autowired private StepBuilderFactory stepBuilderFactory; @Autowired private DataSource dataSource; // 创建Job对象 @Bean public Job classifierItemWriterDemoJob() { return jobBuilderFactory.get("classifierItemWriterDemoJob-" + System.currentTimeMillis()) .start(classifierItemWriterDemoStep()) .build(); } // 创建Step对象 @Bean public Step classifierItemWriterDemoStep() { return stepBuilderFactory.get("classifierItemWriterDemoStep") .<User, User>chunk(2) .reader(flatFileItemReader()) .writer(classifierItemWriter()) .stream(flatFileItemWriter()) // 显式注册ItemStream组件 // .stream(dbItemWrite()) .build(); } // 创建ItemReader对象 @Bean @StepScope //将 Bean 的生命周期与 Step 执行上下文 绑定 public FlatFileItemReader<? extends User> flatFileItemReader() { FlatFileItemReader<User> reader = new FlatFileItemReader<>(); reader.setResource(new ClassPathResource("users.csv")); reader.setLinesToSkip(1); // 跳过文件第一行,因为第一行是字段名 // 解析数据 DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer(); tokenizer.setNames("id", "username", "password"); // 把解析出的数据映射到 User 对象中 DefaultLineMapper<User> lineMapper = new DefaultLineMapper<>(); lineMapper.setLineTokenizer(tokenizer); lineMapper.setFieldSetMapper(new FieldSetMapper<User>(){ @Override public User mapFieldSet(FieldSet fieldSet) throws BindException { User user = new User(); user.setId(fieldSet.readInt("id")); user.setUsername(fieldSet.readString("username")); user.setPassword(fieldSet.readString("password")); System.out.println("reading user: " + user); return user; } }); lineMapper.afterPropertiesSet(); reader.setLineMapper(lineMapper); return reader; } // 创建ItemWriter对象 @Bean public FlatFileItemWriter<? super User> flatFileItemWriter() { System.out.println("flatFileItemWriter()"); FlatFileItemWriter<User> writer = new FlatFileItemWriter<>(); String path = "F:\\customer.txt"; writer.setResource(new FileSystemResource(path)); //把User对象转换成字符串输出到文件 writer.setLineAggregator(new LineAggregator<User>() { ObjectMapper mapper = new ObjectMapper(); @Override public String aggregate(User item) { String str = null; try { str = mapper.writeValueAsString(item); } catch (JsonProcessingException e) { throw new RuntimeException(e); } return str; } }); return writer; } // 创建ItemWriter对象 @Bean public JdbcBatchItemWriter<User> dbItemWrite() { JdbcBatchItemWriter<User> writer = new JdbcBatchItemWriter<>(); writer.setDataSource(dataSource); writer.setSql("insert into users_db(id,username,password) values (:id,:username,:password)"); writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>()); writer.afterPropertiesSet(); return writer; } @Bean public ClassifierCompositeItemWriter<? super User> classifierItemWriter() { ClassifierCompositeItemWriter<User> writer = new ClassifierCompositeItemWriter<>(); writer.setClassifier(new Classifier<User, ItemWriter<? super User>>() { @Override public ItemWriter<? super User> classify(User item) { if (item.getId() % 2 == 0){ return flatFileItemWriter(); }else{ return dbItemWrite(); } } }); return writer; } }
上述配置中,主要关注 classifierItemWriter() 方法,该方法返回一个 ClassifierCompositeItemWriter 实例,该实例用于根据 User 对象的某些属性将数据写入不同的目标。
(1)创建 ClassifierCompositeItemWriter 实例
ClassifierCompositeItemWriter<User> writer = new ClassifierCompositeItemWriter<>();
(2)设置分类器
// 通过调用 setClassifier 方法,为 writer 实例设置了一个自定义的分类器 writer.setClassifier(new Classifier<User, ItemWriter<? super User>>() { @Override public ItemWriter<? super User> classify(User item) { if (item.getId() % 2 == 0){ return flatFileItemWriter(); }else{ return dbItemWrite(); } } });
上述代码中,分类器是一个匿名内部类,实现了 Classifier<User, ItemWriter<? super User>> 接口。这意味着它接受一个 User 对象作为输入,并返回一个 ItemWriter<? super User> 对象。在 classify 方法中,根据 User 对象的 id 属性来决定返回哪个 ItemWriter。如果 id 是偶数,则返回 flatFileItemWriter() 方法返回的 ItemWriter;如果 id 是奇数,则返回 dbItemWrite() 方法返回的 ItemWriter。
这里没有什么新东西,仅仅在 Spring Boot 启动类上多添加 @EnableBatchProcessing 注解,开启批处理功能。代码如下:
package com.hxstrive.spring_batch.flatFileItemWriterDemo; import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication @EnableBatchProcessing // 开启批处理 public class SpringBatchDemoApplication { public static void main(String[] args) { SpringApplication.run(SpringBatchDemoApplication.class, args); } }
先将数据库表内容清空,删除存在的 customer.txt 文件。运行程序,输出日志如下图:
执行成功了,我们去看看数据库表和 customer.txt 文件,如下图:
更多关于 ClassifierCompositeItemWriter 的用法,请参考官方文档。