本章将介绍怎样利用 Spring Batch4 的 MultiResourceItemReader 类从多个文件中读取数据。
MultiResourceItemReader 用于同时处理多个资源文件的核心组件,它将多个资源(如多个 CSV/TXT 文件)视为单个连续的数据流进行读取。特别适合处理分布式文件、分片数据或批量输入文件。
该类核心功能:
多文件顺序处理:顺序读取多个资源文件(如 file1.csv → file2.csv → ... → fileN.csv)
统一接口:将多个文件包装为单一 ItemReader
资源感知:自动传递当前处理的资源给委托读取器
重启恢复:记住最后处理的文件和位置
常用方法:
void setResources(Resource[] resources) 设置要读取的资源数组
void setSkipEmptyResources(boolean skipEmptyResources) 设置是否在遇到空资源时跳过(默认false,遇到空资源会抛出异常)
void setComparator(Comparator<Resource> comparator) 设置资源的排序策略(当资源顺序重要时使用)
void setDelegate(ItemReader<T> delegate) 设置实际处理每个资源的ItemReader(必须配置)
void setSaveState(boolean saveState) 设置是否保存当前读取的资源位置(用于重启任务)
void setName(String name) 设置状态键(用于区分不同的读取器实例)
void setStrict(boolean strict) 设置是否在读取到EOF时自动切换到下一个资源。默认true,通常不需要修改
使用 MultiResourceItemReader 的核心代码如下:
第一步:创建 MultiResourceItemReader
@Bean public MultiResourceItemReader<Customer> multiResourceReader() { MultiResourceItemReader<Customer> reader = new MultiResourceItemReader<>(); // 1. 设置文件资源 (支持通配符) Resource[] resources = new PathMatchingResourcePatternResolver() .getResources("file:/data/input/customers-*.csv"); reader.setResources(resources); // 2. 设置委托读取器(需配置为原型) reader.setDelegate(flatFileItemReader()); // 3. 设置资源排序(可选) reader.setComparator(new ResourceComparator()); return reader; }
第二步:创建 FlatFileItemReader,实际上 MultiResourceItemReader 还是委托给 FlatFileItemReader 进行处理。
// 重要:委托读取器必须是原型作用域 @Bean @Scope("prototype") public FlatFileItemReader<Customer> flatFileItemReader() { // 标准FlatFileItemReader配置 FlatFileItemReader<Customer> reader = new FlatFileItemReader<>(); reader.setLineMapper(lineMapper()); // ... return reader; }
分别创建 users-1.csv、users-2.csv 和 users-3.csv 三个文件,如下图:
users-1.csv
1,张三,"13BC03AC29FAC7B29736EC3BE5C2F55A"
users-2.csv
2,李四,"5E5994FBCFA922D804DF45295AE98604" 3,王五,"6C14DA109E294D1E8155BE8AA4B1CE8E"
users-3.csv
4,赵六,"03774AD7979A5909E78F9C9DB3A2F0B2"
创建名为 User 的 Java 实体,代码如下:
package com.hxstrive.spring_batch.multiResourceItemReaderDemo.dto; import lombok.Data; import lombok.ToString; /** * 用户DTO * @author hxstrive.com */ @Data @ToString public class User { private int id; private String username; private String password; }
创建名为 BatchConfig 的配置类,代码如下:
package com.hxstrive.spring_batch.multiResourceItemReaderDemo.config; import com.hxstrive.spring_batch.multiResourceItemReaderDemo.dto.User; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepScope; import org.springframework.batch.item.ItemWriter; import org.springframework.batch.item.file.FlatFileItemReader; import org.springframework.batch.item.file.MultiResourceItemReader; import org.springframework.batch.item.file.mapping.DefaultLineMapper; import org.springframework.batch.item.file.mapping.FieldSetMapper; import org.springframework.batch.item.file.transform.DelimitedLineTokenizer; import org.springframework.batch.item.file.transform.FieldSet; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.io.ClassPathResource; import org.springframework.core.io.Resource; import org.springframework.validation.BindException; import java.util.Arrays; import java.util.List; /** * Spring Batch 配置类 * @author hxstrive.com */ @Configuration public class BatchConfig { // 用于创建和配置 Job 对象的工厂类 @Autowired private JobBuilderFactory jobBuilderFactory; // 用于创建和配置 Step 对象的工厂类 @Autowired private StepBuilderFactory stepBuilderFactory; @Value("classpath:/users-*.csv") private Resource[] fileResources; // 创建Job对象 @Bean public Job multiResourceItemReaderDemoJob() { return jobBuilderFactory.get("multiResourceItemReaderDemoJob" + System.currentTimeMillis()) .start(multiResourcesItemReaderDemoStep()) .build(); } // 创建Step对象 @Bean public Step multiResourcesItemReaderDemoStep() { return stepBuilderFactory.get("multiResourcesItemReaderDemoStep") .<User, User>chunk(1) .reader(multiResourceItemReader()) .writer(new ItemWriter<User>() { @Override public void write(List<? extends User> list) throws Exception { System.out.println(Arrays.toString(list.toArray())); } }) .build(); } // 创建ItemReader对象 @Bean @StepScope public MultiResourceItemReader<User> multiResourceItemReader() { MultiResourceItemReader<User> reader = new MultiResourceItemReader<>(); // flatFileItemReader() 个方法将返回一个具体的 ItemReader 实现(如 FlatFileItemReader),用于读取单个资源的内容 // setDelegate() 方法将具体的读取器设置为 MultiResourceItemReader 的委托读取器。 // MultiResourceItemReader 本身不处理数据,而是将读取工作委托给这个具体的读取器。 reader.setDelegate(flatFileItemReader()); // reader.setResources(new ClassPathResource[]{ // new ClassPathResource("users-1.csv"), // new ClassPathResource("users-2.csv"), // new ClassPathResource("users-3.csv") // }); // 或者通过 @Value 注入资源 reader.setResources(fileResources); return reader; } @Bean @StepScope //将 Bean 的生命周期与 Step 执行上下文 绑定 public FlatFileItemReader<? extends User> flatFileItemReader() { FlatFileItemReader<User> reader = new FlatFileItemReader<>(); // reader.setResource(new ClassPathResource("users.csv")); // reader.setLinesToSkip(0); // 跳过文件第一行,因为第一行是字段名 // 解析数据 DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer(); tokenizer.setNames("id", "username", "password"); // 把解析出的数据映射到 User 对象中 DefaultLineMapper<User> lineMapper = new DefaultLineMapper<>(); lineMapper.setLineTokenizer(tokenizer); lineMapper.setFieldSetMapper(new FieldSetMapper<User>(){ @Override public User mapFieldSet(FieldSet fieldSet) throws BindException { User user = new User(); user.setId(fieldSet.readInt("id")); user.setUsername(fieldSet.readString("username")); user.setPassword(fieldSet.readString("password")); return user; } }); lineMapper.afterPropertiesSet(); reader.setLineMapper(lineMapper); return reader; } }
上述代码中,关键代码解释如下:
@Value("classpath:/users-*.csv") 的作用是将类路径根目录下所有文件名以users-开头且扩展名为.csv的文件,作为资源数组注入到 Spring 组件里。
运行代码,查看关注日志如下:
上图中,成功输出了 csv 中的所有数据。