FlatFileItemWriter 是 Spring Batch 中用于将数据写入平面文件(如 CSV、TXT 等文本文件)的核心组件,实现了 ItemWriter 接口,专门处理批量数据到文本文件的输出操作。
在使用该类前,我们先简单看看它的源代码:
package org.springframework.batch.item.file;
import java.util.List;
import org.springframework.batch.item.file.transform.LineAggregator;
import org.springframework.batch.item.support.AbstractFileItemWriter;
import org.springframework.core.io.Resource;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
/**
* 平面文件项写入器,用于将数据项写入平面文件(如文本文件)
* 继承自AbstractFileItemWriter,泛型T表示要写入的数据的类型
*/
public class FlatFileItemWriter<T> extends AbstractFileItemWriter<T> {
/**
* 行聚合器,用于将数据项T转换为字符串行
* 负责将单个数据项转换为文件中对应的一行内容
*/
protected LineAggregator<T> lineAggregator;
/**
* 构造方法,初始化执行上下文名称
* 设置执行上下文名称为当前类的短名称(不包含包名)
*/
public FlatFileItemWriter() {
this.setExecutionContextName(ClassUtils.getShortName(FlatFileItemWriter.class));
}
/**
* 断言必要的属性(lineAggregator)已被设置
*
* 实现 InitializingBean 接口的方法,在 Bean 属性设置完成后调用
*
* @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet()
*/
@Override
public void afterPropertiesSet() throws Exception {
// 确保lineAggregator 不为 null,否则抛出异常
Assert.notNull(lineAggregator, "A LineAggregator must be provided.");
// 如果是追加模式,则不应在文件存在时删除它
if (append) {
shouldDeleteIfExists = false;
}
}
/**
* LineAggregator 的公共设置方法
* 用于注入行聚合器,它将被用来将数据项转换为输出行
*
* @param lineAggregator 要设置的LineAggregator
*/
public void setLineAggregator(LineAggregator<T> lineAggregator) {
this.lineAggregator = lineAggregator;
}
/**
* 执行实际的写入操作,将数据项列表转换为字符串
* 重写父类方法,将多个数据项转换为对应的字符串行
*
* @param items 要写入的项列表
* @return 转换后的字符串,包含所有项对应的行,每行用行分隔符分隔
*/
@Override
public String doWrite(List<? extends T> items) {
// 用于构建所有行的字符串构建器
StringBuilder lines = new StringBuilder();
// 遍历所有数据项
for (T item : items) {
// 使用行聚合器将项转换为字符串,并添加行分隔符
lines.append(this.lineAggregator.aggregate(item)).append(this.lineSeparator);
}
// 返回构建好的字符串
return lines.toString();
}
}
该示例将从 csv 文件读取数据,然后利用 FlatFileItemWriter 将数据写入到磁盘的 txt 文件。
在项目的 resources 目录下面创建名为 users.csv 的文件,内容如下:
"id","username","password" 1,张三,"13BC03AC29FAC7B29736EC3BE5C2F55A" 2,李四,"5E5994FBCFA922D804DF45295AE98604" 3,王五,"6C14DA109E294D1E8155BE8AA4B1CE8E" 4,赵六,"03774AD7979A5909E78F9C9DB3A2F0B2"
创建一个简单的 POJO 用于映射 csv 读取的数据,如下:
package com.hxstrive.spring_batch.flatFileItemWriterDemo.dto;
import lombok.Data;
import lombok.ToString;
/**
* 用户DTO
* @author hxstrive.com
*/
@Data
@ToString
public class User {
private int id;
private String username;
private String password;
}使用 @Configuration 注解创建配置类,配置 Spring Batch 的 Job、Step、ItemReader 和 ItemWriter,代码如下:
package com.hxstrive.spring_batch.flatFileItemWriterDemo.config;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.hxstrive.spring_batch.flatFileItemWriterDemo.dto.User;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.batch.item.file.transform.LineAggregator;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.FileSystemResource;
import org.springframework.validation.BindException;
import javax.sql.DataSource;
/**
* Spring Batch 配置类
* @author hxstrive.com
*/
@Configuration
public class BatchConfig {
// 用于创建和配置 Job 对象的工厂类
@Autowired
private JobBuilderFactory jobBuilderFactory;
// 用于创建和配置 Step 对象的工厂类
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private DataSource dataSource;
// 创建Job对象
@Bean
public Job flatFileItemWriterDemoJob() {
return jobBuilderFactory.get("flatFileItemWriterDemoJob-" + System.currentTimeMillis())
.start(flatFileItemWriterDemoStep())
.build();
}
// 创建Step对象
@Bean
public Step flatFileItemWriterDemoStep() {
return stepBuilderFactory.get("flatFileItemWriterDemoStep")
.<User, User>chunk(2)
.reader(flatFileItemReader())
.writer(flatFileItemWriter())
.build();
}
// 创建ItemReader对象,从 users.csv 文件读取数据
@Bean
@StepScope //将 Bean 的生命周期与 Step 执行上下文 绑定
public FlatFileItemReader<? extends User> flatFileItemReader() {
FlatFileItemReader<User> reader = new FlatFileItemReader<>();
reader.setResource(new ClassPathResource("users.csv"));
reader.setLinesToSkip(1); // 跳过文件第一行,因为第一行是字段名
// 解析数据
DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
tokenizer.setNames("id", "username", "password");
// 把解析出的数据映射到 User 对象中
DefaultLineMapper<User> lineMapper = new DefaultLineMapper<>();
lineMapper.setLineTokenizer(tokenizer);
lineMapper.setFieldSetMapper(new FieldSetMapper<User>(){
@Override
public User mapFieldSet(FieldSet fieldSet) throws BindException {
User user = new User();
user.setId(fieldSet.readInt("id"));
user.setUsername(fieldSet.readString("username"));
user.setPassword(fieldSet.readString("password"));
System.out.println("reading user: " + user);
return user;
}
});
lineMapper.afterPropertiesSet();
reader.setLineMapper(lineMapper);
return reader;
}
// 创建ItemWriter对象,将读取的数据写入到 F:\customer.txt 文件
@Bean
public ItemWriter<? super User> flatFileItemWriter() {
System.out.println("flatFileItemWriter()");
FlatFileItemWriter<User> writer = new FlatFileItemWriter<>();
String path = "F:\\customer.txt";
writer.setResource(new FileSystemResource(path));
//把User对象转换成字符串输出到文件
writer.setLineAggregator(new LineAggregator<User>() {
ObjectMapper mapper = new ObjectMapper();
@Override
public String aggregate(User item) {
String str = null;
try {
str = mapper.writeValueAsString(item);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
return str;
}
});
return writer;
}
}上述代码,重点关注 flatFileItemWriter() 方法,它使用 FlatFileItemWriter 将 User 类型的数据写入到文本文件中,具体如下:
1、创建 FlatFileItemWriter 实例
FlatFileItemWriter<User> writer = new FlatFileItemWriter<>();
创建了一个专门处理 User 类型的平面文件写入器。
2、设置输出文件路径
String path = "F:\\customer.txt"; writer.setResource(new FileSystemResource(path));
指定输出文件为本地文件系统中的 F:\\customer.txt,所有数据将写入该文件。
3、配置行聚合器(核心逻辑)
writer.setLineAggregator(new LineAggregator<User>() {
ObjectMapper mapper = new ObjectMapper(); // Jackson库的JSON处理对象
@Override
public String aggregate(User item) {
String str = null;
try {
// 将User对象序列化为JSON字符串
str = mapper.writeValueAsString(item);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
return str; // 返回JSON字符串作为文件中的一行
}
});上述代码,自定义了 LineAggregator 接口的实现,用于将 User 对象转换为字符串。这里使用 Jackson 库的 ObjectMapper 将 User 对象序列化为 JSON 格式字符串,每个 User 对象会被转换为一行 JSON 文本写入文件。
这里没有什么新东西,仅仅在 Spring Boot 启动类上多添加 @EnableBatchProcessing 注解,开启批处理功能。代码如下:
package com.hxstrive.spring_batch.flatFileItemWriterDemo;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
@EnableBatchProcessing // 开启批处理
public class SpringBatchDemoApplication {
public static void main(String[] args) {
SpringApplication.run(SpringBatchDemoApplication.class, args);
}
}运行程序,输出日志如下图:

成功从 csv 文件读取到数据。
继续查看 F:/ 盘,是否多了 customer.txt 文件,如下图:

打开 customer.txt 文件,查看内容,如下图:

更多关于 FlatFileItemWriter 的用法,请参考官方文档。