Spring Batch4 从多个文件中读取数据

本章将介绍怎样利用 Spring Batch4 的 MultiResourceItemReader 类从多个文件中读取数据。

MultiResourceItemReader 用于同时处理多个资源文件的核心组件,它将多个资源(如多个 CSV/TXT 文件)视为单个连续的数据流进行读取。特别适合处理分布式文件、分片数据或批量输入文件。

该类核心功能:

  • 多文件顺序处理:顺序读取多个资源文件(如 file1.csv → file2.csv → ... → fileN.csv)

  • 统一接口:将多个文件包装为单一 ItemReader

  • 资源感知:自动传递当前处理的资源给委托读取器

  • 重启恢复:记住最后处理的文件和位置

常用方法:

  • void setResources(Resource[] resources)   设置要读取的资源数组

  • void setSkipEmptyResources(boolean skipEmptyResources)   设置是否在遇到空资源时跳过(默认false,遇到空资源会抛出异常)

  • void setComparator(Comparator<Resource> comparator)   设置资源的排序策略(当资源顺序重要时使用)

  • void setDelegate(ItemReader<T> delegate)   设置实际处理每个资源的ItemReader(必须配置)

  • void setSaveState(boolean saveState)   设置是否保存当前读取的资源位置(用于重启任务)

  • void setName(String name)   设置状态键(用于区分不同的读取器实例)

  • void setStrict(boolean strict)  设置是否在读取到EOF时自动切换到下一个资源。默认true,通常不需要修改

使用 MultiResourceItemReader  的核心代码如下:

第一步:创建 MultiResourceItemReader

@Bean
public MultiResourceItemReader<Customer> multiResourceReader() {
    MultiResourceItemReader<Customer> reader = new MultiResourceItemReader<>();
    
    // 1. 设置文件资源 (支持通配符)
    Resource[] resources = new PathMatchingResourcePatternResolver()
        .getResources("file:/data/input/customers-*.csv");
    reader.setResources(resources);
    
    // 2. 设置委托读取器(需配置为原型)
    reader.setDelegate(flatFileItemReader());
    
    // 3. 设置资源排序(可选)
    reader.setComparator(new ResourceComparator());
    
    return reader;
}

第二步:创建 FlatFileItemReader,实际上 MultiResourceItemReader 还是委托给 FlatFileItemReader 进行处理。

// 重要:委托读取器必须是原型作用域
@Bean
@Scope("prototype")
public FlatFileItemReader<Customer> flatFileItemReader() {
    // 标准FlatFileItemReader配置
    FlatFileItemReader<Customer> reader = new FlatFileItemReader<>();
    reader.setLineMapper(lineMapper());
    // ...
    return reader;
}

  

简单示例

准备工作

分别创建 users-1.csv、users-2.csv 和 users-3.csv 三个文件,如下图:

image.png

users-1.csv

1,张三,"13BC03AC29FAC7B29736EC3BE5C2F55A"

users-2.csv

2,李四,"5E5994FBCFA922D804DF45295AE98604"
3,王五,"6C14DA109E294D1E8155BE8AA4B1CE8E"

users-3.csv

4,赵六,"03774AD7979A5909E78F9C9DB3A2F0B2"

  

创建实体

创建名为 User 的 Java 实体,代码如下:

package com.hxstrive.spring_batch.multiResourceItemReaderDemo.dto;

import lombok.Data;
import lombok.ToString;

/**
 * 用户DTO
 * @author hxstrive.com
 */
@Data
@ToString
public class User {
    private int id;
    private String username;
    private String password;
}

  

创建 BatchConfig 配置

创建名为 BatchConfig 的配置类,代码如下:

package com.hxstrive.spring_batch.multiResourceItemReaderDemo.config;

import com.hxstrive.spring_batch.multiResourceItemReaderDemo.dto.User;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.MultiResourceItemReader;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
import org.springframework.validation.BindException;
import java.util.Arrays;
import java.util.List;

/**
 * Spring Batch 配置类
 * @author hxstrive.com
 */
@Configuration
public class BatchConfig {

    // 用于创建和配置 Job 对象的工厂类
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    // 用于创建和配置 Step 对象的工厂类
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Value("classpath:/users-*.csv")
    private Resource[] fileResources;

    // 创建Job对象
    @Bean
    public Job multiResourceItemReaderDemoJob() {
        return jobBuilderFactory.get("multiResourceItemReaderDemoJob" + System.currentTimeMillis())
                .start(multiResourcesItemReaderDemoStep())
                .build();
    }

    // 创建Step对象
    @Bean

    public Step multiResourcesItemReaderDemoStep() {
        return stepBuilderFactory.get("multiResourcesItemReaderDemoStep")
                .<User, User>chunk(1)
                .reader(multiResourceItemReader())
                .writer(new ItemWriter<User>() {
                    @Override
                    public void write(List<? extends User> list) throws Exception {
                        System.out.println(Arrays.toString(list.toArray()));
                    }
                })
                .build();
    }

    // 创建ItemReader对象
    @Bean
    @StepScope
    public MultiResourceItemReader<User> multiResourceItemReader() {
        MultiResourceItemReader<User> reader = new MultiResourceItemReader<>();

        // flatFileItemReader() 个方法将返回一个具体的 ItemReader 实现(如 FlatFileItemReader),用于读取单个资源的内容
        // setDelegate() 方法将具体的读取器设置为 MultiResourceItemReader 的委托读取器。
        // MultiResourceItemReader 本身不处理数据,而是将读取工作委托给这个具体的读取器。
        reader.setDelegate(flatFileItemReader());

//        reader.setResources(new ClassPathResource[]{
//                new ClassPathResource("users-1.csv"),
//                new ClassPathResource("users-2.csv"),
//                new ClassPathResource("users-3.csv")
//        });

        // 或者通过 @Value 注入资源
        reader.setResources(fileResources);

        return reader;
    }


    @Bean
    @StepScope //将 Bean 的生命周期与 Step 执行上下文 绑定
    public FlatFileItemReader<? extends User> flatFileItemReader() {
        FlatFileItemReader<User> reader = new FlatFileItemReader<>();
//        reader.setResource(new ClassPathResource("users.csv"));
//        reader.setLinesToSkip(0); // 跳过文件第一行,因为第一行是字段名

        // 解析数据
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames("id", "username", "password");
        // 把解析出的数据映射到 User 对象中
        DefaultLineMapper<User> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper(new FieldSetMapper<User>(){
            @Override
            public User mapFieldSet(FieldSet fieldSet) throws BindException {
                User user = new User();
                user.setId(fieldSet.readInt("id"));
                user.setUsername(fieldSet.readString("username"));
                user.setPassword(fieldSet.readString("password"));
                return user;
            }
        });

        lineMapper.afterPropertiesSet();
        reader.setLineMapper(lineMapper);
        return reader;
    }

}

上述代码中,关键代码解释如下:

  • @Value("classpath:/users-*.csv") 的作用是将类路径根目录下所有文件名以users-开头且扩展名为.csv的文件,作为资源数组注入到 Spring 组件里。

  

运行&验证

运行代码,查看关注日志如下:

image.png

上图中,成功输出了 csv 中的所有数据。

  

说说我的看法
全部评论(
没有评论
关于
本网站专注于 Java、数据库(MySQL、Oracle)、Linux、软件架构及大数据等多领域技术知识分享。涵盖丰富的原创与精选技术文章,助力技术传播与交流。无论是技术新手渴望入门,还是资深开发者寻求进阶,这里都能为您提供深度见解与实用经验,让复杂编码变得轻松易懂,携手共赴技术提升新高度。如有侵权,请来信告知:hxstrive@outlook.com
其他应用
公众号