在 Spring Batch 4 中,CompositeItemWriter 是一个核心的多路输出组件,它允许将处理后的数据同时写入多个不同的目标(如数据库、文件、消息队列等)。通过 CompositeItemWriter,Spring Batch 可以轻松实现 "一次处理,多处写入" 的复杂场景,大幅提升批处理作业的灵活性和功能性。
CompositeItemWriter 核心特性:
组合多个写入器:可以包含一个 ItemWriter 列表,当 write() 方法被调用时,会按顺序调用所有包含的写入器的 write() 方法。
事务一致性:所有包含的写入器将在同一个事务上下文中工作,确保数据写入的原子性(要么全部成功,要么全部失败)。
顺序执行:写入器按添加顺序依次执行,前一个写入器完成后才会执行下一个。
CompositeItemWriter 主要方法:
setDelegates(List<ItemWriter<? super T>> delegates) 设置要组合的 ItemWriter 列表,源码如下:
private List<ItemWriter<? super T>> delegates;
public void setIgnoreItemStream(boolean ignoreItemStream) {
this.ignoreItemStream = ignoreItemStream;
}使用示例代码如下:
CompositeItemWriter<User> compositeItemWriter = new CompositeItemWriter<>(); compositeItemWriter.setDelegates(Arrays.<ItemWriter<? super User>>asList(flatFileItemWriter(), dbItemWriteDemo()));
write(List<? extends T> items) 重写的写入方法,会依次调用所有 delegate 的 write() 方法,源码如下:
// 添加的 ItemWrite 列表
private List<ItemWriter<? super T>> delegates;
@Override
public void write(List<? extends T> item) throws Exception {
for (ItemWriter<? super T> writer : delegates) {
writer.write(item);
}
}
该示例将从 csv 读取数据,然后使用 CompositeItemWriter 将数据分别写入到数据库和 text 文件。下面是详细步骤:
在项目的 resources 目录下面创建名为 users.csv 的文件,内容如下:
"id","username","password" 1,张三,"13BC03AC29FAC7B29736EC3BE5C2F55A" 2,李四,"5E5994FBCFA922D804DF45295AE98604" 3,王五,"6C14DA109E294D1E8155BE8AA4B1CE8E" 4,赵六,"03774AD7979A5909E78F9C9DB3A2F0B2"
创建一个简单的 POJO 用于映射 csv 读取的数据,如下:
package com.hxstrive.spring_batch.compositeItemWriterDemo.dto;
import lombok.Data;
import lombok.ToString;
/**
* 用户DTO
* @author hxstrive.com
*/
@Data
@ToString
public class User {
private int id;
private String username;
private String password;
}使用 @Configuration 注解创建配置类,然后分别创建 FlatFileItemWriter 和 JdbcBatchItemWriter 对象,最后使用前面两个 ItemWriter 创建一个 CompositeItemWriter 对象。代码如下:
package com.hxstrive.spring_batch.compositeItemWriterDemo.config;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.hxstrive.spring_batch.compositeItemWriterDemo.dto.User;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.batch.item.file.transform.LineAggregator;
import org.springframework.batch.item.support.CompositeItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.FileSystemResource;
import org.springframework.validation.BindException;
import javax.sql.DataSource;
import java.util.Arrays;
/**
* Spring Batch 配置类
* @author hxstrive.com
*/
@Configuration
public class BatchConfig {
// 用于创建和配置 Job 对象的工厂类
@Autowired
private JobBuilderFactory jobBuilderFactory;
// 用于创建和配置 Step 对象的工厂类
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private DataSource dataSource;
// 创建Job对象
@Bean
public Job compositeItemWriterDemoJob() {
return jobBuilderFactory.get("compositeItemWriterDemoJob-" + System.currentTimeMillis())
.start(compositeItemWriterDemoStep())
.build();
}
// 创建Step对象
@Bean
public Step compositeItemWriterDemoStep() {
return stepBuilderFactory.get("compositeItemWriterDemoStep")
.<User, User>chunk(2)
.reader(flatFileItemReader())
.writer(compositeItemWriter())
.build();
}
// 创建ItemReader对象
@Bean
@StepScope //将 Bean 的生命周期与 Step 执行上下文 绑定
public FlatFileItemReader<? extends User> flatFileItemReader() {
FlatFileItemReader<User> reader = new FlatFileItemReader<>();
reader.setResource(new ClassPathResource("users.csv"));
reader.setLinesToSkip(1); // 跳过文件第一行,因为第一行是字段名
// 解析数据
DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
tokenizer.setNames("id", "username", "password");
// 把解析出的数据映射到 User 对象中
DefaultLineMapper<User> lineMapper = new DefaultLineMapper<>();
lineMapper.setLineTokenizer(tokenizer);
lineMapper.setFieldSetMapper(new FieldSetMapper<User>(){
@Override
public User mapFieldSet(FieldSet fieldSet) throws BindException {
User user = new User();
user.setId(fieldSet.readInt("id"));
user.setUsername(fieldSet.readString("username"));
user.setPassword(fieldSet.readString("password"));
System.out.println("reading user: " + user);
return user;
}
});
lineMapper.afterPropertiesSet();
reader.setLineMapper(lineMapper);
return reader;
}
// 创建ItemWriter对象,写入数据到 text 文件
@Bean
public ItemWriter<? super User> flatFileItemWriter() {
System.out.println("flatFileItemWriter()");
FlatFileItemWriter<User> writer = new FlatFileItemWriter<>();
String path = "F:\\customer.txt";
writer.setResource(new FileSystemResource(path));
//把User对象转换成字符串输出到文件
writer.setLineAggregator(new LineAggregator<User>() {
ObjectMapper mapper = new ObjectMapper();
@Override
public String aggregate(User item) {
String str = null;
try {
str = mapper.writeValueAsString(item);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
return str;
}
});
return writer;
}
// 创建ItemWriter对象,写入数据到数据库
@Bean
public JdbcBatchItemWriter<User> dbItemWriteDemo() {
JdbcBatchItemWriter<User> writer = new JdbcBatchItemWriter<>();
writer.setDataSource(dataSource);
writer.setSql("insert into users_db(id,username,password) values (:id,:username,:password)");
writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
return writer;
}
@Bean
public CompositeItemWriter<User> compositeItemWriter() {
CompositeItemWriter<User> compositeItemWriter = new CompositeItemWriter<>();
// 设置多个 ItemWriter
compositeItemWriter.setDelegates(Arrays.<ItemWriter<? super User>>asList(flatFileItemWriter(), dbItemWriteDemo()));
return compositeItemWriter;
}
}这里没有什么新东西,仅仅在 Spring Boot 启动类上多添加 @EnableBatchProcessing 注解,开启批处理功能。代码如下:
package com.hxstrive.spring_batch.flatFileItemWriterDemo;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
@EnableBatchProcessing // 开启批处理
public class SpringBatchDemoApplication {
public static void main(String[] args) {
SpringApplication.run(SpringBatchDemoApplication.class, args);
}
}运行程序,输出日志如下图:

关于更多 CompositeItemWriter 的用法,请参考官方文档。