在 Spring Batch 4 中,CompositeItemWriter 是一个核心的多路输出组件,它允许将处理后的数据同时写入多个不同的目标(如数据库、文件、消息队列等)。通过 CompositeItemWriter,Spring Batch 可以轻松实现 "一次处理,多处写入" 的复杂场景,大幅提升批处理作业的灵活性和功能性。
CompositeItemWriter 核心特性:
组合多个写入器:可以包含一个 ItemWriter 列表,当 write() 方法被调用时,会按顺序调用所有包含的写入器的 write() 方法。
事务一致性:所有包含的写入器将在同一个事务上下文中工作,确保数据写入的原子性(要么全部成功,要么全部失败)。
顺序执行:写入器按添加顺序依次执行,前一个写入器完成后才会执行下一个。
CompositeItemWriter 主要方法:
setDelegates(List<ItemWriter<? super T>> delegates) 设置要组合的 ItemWriter 列表,源码如下:
private List<ItemWriter<? super T>> delegates; public void setIgnoreItemStream(boolean ignoreItemStream) { this.ignoreItemStream = ignoreItemStream; }
使用示例代码如下:
CompositeItemWriter<User> compositeItemWriter = new CompositeItemWriter<>(); compositeItemWriter.setDelegates(Arrays.<ItemWriter<? super User>>asList(flatFileItemWriter(), dbItemWriteDemo()));
write(List<? extends T> items) 重写的写入方法,会依次调用所有 delegate 的 write() 方法,源码如下:
// 添加的 ItemWrite 列表 private List<ItemWriter<? super T>> delegates; @Override public void write(List<? extends T> item) throws Exception { for (ItemWriter<? super T> writer : delegates) { writer.write(item); } }
该示例将从 csv 读取数据,然后使用 CompositeItemWriter 将数据分别写入到数据库和 text 文件。下面是详细步骤:
在项目的 resources 目录下面创建名为 users.csv 的文件,内容如下:
"id","username","password" 1,张三,"13BC03AC29FAC7B29736EC3BE5C2F55A" 2,李四,"5E5994FBCFA922D804DF45295AE98604" 3,王五,"6C14DA109E294D1E8155BE8AA4B1CE8E" 4,赵六,"03774AD7979A5909E78F9C9DB3A2F0B2"
创建一个简单的 POJO 用于映射 csv 读取的数据,如下:
package com.hxstrive.spring_batch.compositeItemWriterDemo.dto; import lombok.Data; import lombok.ToString; /** * 用户DTO * @author hxstrive.com */ @Data @ToString public class User { private int id; private String username; private String password; }
使用 @Configuration 注解创建配置类,然后分别创建 FlatFileItemWriter 和 JdbcBatchItemWriter 对象,最后使用前面两个 ItemWriter 创建一个 CompositeItemWriter 对象。代码如下:
package com.hxstrive.spring_batch.compositeItemWriterDemo.config; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.hxstrive.spring_batch.compositeItemWriterDemo.dto.User; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepScope; import org.springframework.batch.item.ItemWriter; import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider; import org.springframework.batch.item.database.JdbcBatchItemWriter; import org.springframework.batch.item.file.FlatFileItemReader; import org.springframework.batch.item.file.FlatFileItemWriter; import org.springframework.batch.item.file.mapping.DefaultLineMapper; import org.springframework.batch.item.file.mapping.FieldSetMapper; import org.springframework.batch.item.file.transform.DelimitedLineTokenizer; import org.springframework.batch.item.file.transform.FieldSet; import org.springframework.batch.item.file.transform.LineAggregator; import org.springframework.batch.item.support.CompositeItemWriter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.io.ClassPathResource; import org.springframework.core.io.FileSystemResource; import org.springframework.validation.BindException; import javax.sql.DataSource; import java.util.Arrays; /** * Spring Batch 配置类 * @author hxstrive.com */ @Configuration public class BatchConfig { // 用于创建和配置 Job 对象的工厂类 @Autowired private JobBuilderFactory jobBuilderFactory; // 用于创建和配置 Step 对象的工厂类 @Autowired private StepBuilderFactory stepBuilderFactory; @Autowired private DataSource dataSource; // 创建Job对象 @Bean public Job compositeItemWriterDemoJob() { return jobBuilderFactory.get("compositeItemWriterDemoJob-" + System.currentTimeMillis()) .start(compositeItemWriterDemoStep()) .build(); } // 创建Step对象 @Bean public Step compositeItemWriterDemoStep() { return stepBuilderFactory.get("compositeItemWriterDemoStep") .<User, User>chunk(2) .reader(flatFileItemReader()) .writer(compositeItemWriter()) .build(); } // 创建ItemReader对象 @Bean @StepScope //将 Bean 的生命周期与 Step 执行上下文 绑定 public FlatFileItemReader<? extends User> flatFileItemReader() { FlatFileItemReader<User> reader = new FlatFileItemReader<>(); reader.setResource(new ClassPathResource("users.csv")); reader.setLinesToSkip(1); // 跳过文件第一行,因为第一行是字段名 // 解析数据 DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer(); tokenizer.setNames("id", "username", "password"); // 把解析出的数据映射到 User 对象中 DefaultLineMapper<User> lineMapper = new DefaultLineMapper<>(); lineMapper.setLineTokenizer(tokenizer); lineMapper.setFieldSetMapper(new FieldSetMapper<User>(){ @Override public User mapFieldSet(FieldSet fieldSet) throws BindException { User user = new User(); user.setId(fieldSet.readInt("id")); user.setUsername(fieldSet.readString("username")); user.setPassword(fieldSet.readString("password")); System.out.println("reading user: " + user); return user; } }); lineMapper.afterPropertiesSet(); reader.setLineMapper(lineMapper); return reader; } // 创建ItemWriter对象,写入数据到 text 文件 @Bean public ItemWriter<? super User> flatFileItemWriter() { System.out.println("flatFileItemWriter()"); FlatFileItemWriter<User> writer = new FlatFileItemWriter<>(); String path = "F:\\customer.txt"; writer.setResource(new FileSystemResource(path)); //把User对象转换成字符串输出到文件 writer.setLineAggregator(new LineAggregator<User>() { ObjectMapper mapper = new ObjectMapper(); @Override public String aggregate(User item) { String str = null; try { str = mapper.writeValueAsString(item); } catch (JsonProcessingException e) { throw new RuntimeException(e); } return str; } }); return writer; } // 创建ItemWriter对象,写入数据到数据库 @Bean public JdbcBatchItemWriter<User> dbItemWriteDemo() { JdbcBatchItemWriter<User> writer = new JdbcBatchItemWriter<>(); writer.setDataSource(dataSource); writer.setSql("insert into users_db(id,username,password) values (:id,:username,:password)"); writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>()); return writer; } @Bean public CompositeItemWriter<User> compositeItemWriter() { CompositeItemWriter<User> compositeItemWriter = new CompositeItemWriter<>(); // 设置多个 ItemWriter compositeItemWriter.setDelegates(Arrays.<ItemWriter<? super User>>asList(flatFileItemWriter(), dbItemWriteDemo())); return compositeItemWriter; } }
这里没有什么新东西,仅仅在 Spring Boot 启动类上多添加 @EnableBatchProcessing 注解,开启批处理功能。代码如下:
package com.hxstrive.spring_batch.flatFileItemWriterDemo; import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication @EnableBatchProcessing // 开启批处理 public class SpringBatchDemoApplication { public static void main(String[] args) { SpringApplication.run(SpringBatchDemoApplication.class, args); } }
运行程序,输出日志如下图:
关于更多 CompositeItemWriter 的用法,请参考官方文档。