Spring Batch4 教程

Spring Batch4 实现数据写入多个目标 CompositeItemWriter

在 Spring Batch 4 中,CompositeItemWriter 是一个核心的多路输出组件,它允许将处理后的数据同时写入多个不同的目标(如数据库、文件、消息队列等)。通过 CompositeItemWriter,Spring Batch 可以轻松实现 "一次处理,多处写入" 的复杂场景,大幅提升批处理作业的灵活性和功能性。

CompositeItemWriter 核心特性:

  • 组合多个写入器:可以包含一个 ItemWriter 列表,当 write() 方法被调用时,会按顺序调用所有包含的写入器的 write() 方法。

  • 事务一致性:所有包含的写入器将在同一个事务上下文中工作,确保数据写入的原子性(要么全部成功,要么全部失败)。

  • 顺序执行:写入器按添加顺序依次执行,前一个写入器完成后才会执行下一个。

CompositeItemWriter 主要方法:

  • setDelegates(List<ItemWriter<? super T>> delegates)  设置要组合的 ItemWriter 列表,源码如下:

private List<ItemWriter<? super T>> delegates;

public void setIgnoreItemStream(boolean ignoreItemStream) {
    this.ignoreItemStream = ignoreItemStream;
}

使用示例代码如下:

CompositeItemWriter<User> compositeItemWriter = new CompositeItemWriter<>();
compositeItemWriter.setDelegates(Arrays.<ItemWriter<? super User>>asList(flatFileItemWriter(), dbItemWriteDemo()));
  • write(List<? extends T> items)  重写的写入方法,会依次调用所有 delegate 的 write() 方法,源码如下:

// 添加的 ItemWrite 列表
private List<ItemWriter<? super T>> delegates;

@Override
public void write(List<? extends T> item) throws Exception {
    for (ItemWriter<? super T> writer : delegates) {
       writer.write(item);
    }
}

  

简单示例

该示例将从 csv 读取数据,然后使用 CompositeItemWriter 将数据分别写入到数据库和 text 文件。下面是详细步骤:

准备数据

在项目的 resources 目录下面创建名为 users.csv 的文件,内容如下:

"id","username","password"
1,张三,"13BC03AC29FAC7B29736EC3BE5C2F55A"
2,李四,"5E5994FBCFA922D804DF45295AE98604"
3,王五,"6C14DA109E294D1E8155BE8AA4B1CE8E"
4,赵六,"03774AD7979A5909E78F9C9DB3A2F0B2"

创建实体

创建一个简单的 POJO 用于映射 csv 读取的数据,如下:

package com.hxstrive.spring_batch.compositeItemWriterDemo.dto;

import lombok.Data;
import lombok.ToString;

/**
 * 用户DTO
 * @author hxstrive.com
 */
@Data
@ToString
public class User {
    private int id;
    private String username;
    private String password;
}

创建配置类

使用 @Configuration 注解创建配置类,然后分别创建 FlatFileItemWriter 和 JdbcBatchItemWriter 对象,最后使用前面两个 ItemWriter 创建一个 CompositeItemWriter 对象。代码如下:

package com.hxstrive.spring_batch.compositeItemWriterDemo.config;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.hxstrive.spring_batch.compositeItemWriterDemo.dto.User;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.batch.item.file.transform.LineAggregator;
import org.springframework.batch.item.support.CompositeItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.FileSystemResource;
import org.springframework.validation.BindException;
import javax.sql.DataSource;
import java.util.Arrays;

/**
 * Spring Batch 配置类
 * @author hxstrive.com
 */
@Configuration
public class BatchConfig {

    // 用于创建和配置 Job 对象的工厂类
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    // 用于创建和配置 Step 对象的工厂类
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private DataSource dataSource;

    // 创建Job对象
    @Bean
    public Job compositeItemWriterDemoJob() {
        return jobBuilderFactory.get("compositeItemWriterDemoJob-" + System.currentTimeMillis())
                .start(compositeItemWriterDemoStep())
                .build();
    }

    // 创建Step对象
    @Bean
    public Step compositeItemWriterDemoStep() {
        return stepBuilderFactory.get("compositeItemWriterDemoStep")
                .<User, User>chunk(2)
                .reader(flatFileItemReader())
                .writer(compositeItemWriter())
                .build();
    }

    // 创建ItemReader对象
    @Bean
    @StepScope //将 Bean 的生命周期与 Step 执行上下文 绑定
    public FlatFileItemReader<? extends User> flatFileItemReader() {
        FlatFileItemReader<User> reader = new FlatFileItemReader<>();
        reader.setResource(new ClassPathResource("users.csv"));
        reader.setLinesToSkip(1); // 跳过文件第一行,因为第一行是字段名

        // 解析数据
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames("id", "username", "password");
        // 把解析出的数据映射到 User 对象中
        DefaultLineMapper<User> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper(new FieldSetMapper<User>(){
            @Override
            public User mapFieldSet(FieldSet fieldSet) throws BindException {
                User user = new User();
                user.setId(fieldSet.readInt("id"));
                user.setUsername(fieldSet.readString("username"));
                user.setPassword(fieldSet.readString("password"));
                System.out.println("reading user: " + user);
                return user;
            }
        });

        lineMapper.afterPropertiesSet();
        reader.setLineMapper(lineMapper);
        return reader;
    }

    // 创建ItemWriter对象,写入数据到 text 文件
    @Bean
    public ItemWriter<? super User> flatFileItemWriter() {
        System.out.println("flatFileItemWriter()");
        FlatFileItemWriter<User> writer = new FlatFileItemWriter<>();
        String path = "F:\\customer.txt";
        writer.setResource(new FileSystemResource(path));

        //把User对象转换成字符串输出到文件
        writer.setLineAggregator(new LineAggregator<User>() {
            ObjectMapper mapper = new ObjectMapper();
            @Override
            public String aggregate(User item) {
                String str = null;
                try {
                    str = mapper.writeValueAsString(item);
                } catch (JsonProcessingException e) {
                    throw new RuntimeException(e);
                }
                return str;
            }
        });

        return writer;
    }

    // 创建ItemWriter对象,写入数据到数据库
    @Bean
    public JdbcBatchItemWriter<User> dbItemWriteDemo() {
        JdbcBatchItemWriter<User> writer = new JdbcBatchItemWriter<>();
        writer.setDataSource(dataSource);
        writer.setSql("insert into users_db(id,username,password) values (:id,:username,:password)");
        writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
        return writer;
    }

    @Bean
    public CompositeItemWriter<User> compositeItemWriter() {
        CompositeItemWriter<User> compositeItemWriter = new CompositeItemWriter<>();
        // 设置多个 ItemWriter
        compositeItemWriter.setDelegates(Arrays.<ItemWriter<? super User>>asList(flatFileItemWriter(), dbItemWriteDemo()));
        return compositeItemWriter;
    }

}

创建启动类

这里没有什么新东西,仅仅在 Spring Boot 启动类上多添加 @EnableBatchProcessing 注解,开启批处理功能。代码如下:

package com.hxstrive.spring_batch.flatFileItemWriterDemo;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@EnableBatchProcessing // 开启批处理
public class SpringBatchDemoApplication {

    public static void main(String[] args) {
       SpringApplication.run(SpringBatchDemoApplication.class, args);
    }

}

运行程序,输出日志如下图:

image.png

关于更多 CompositeItemWriter 的用法,请参考官方文档。

  

说说我的看法
全部评论(
没有评论
关于
本网站专注于 Java、数据库(MySQL、Oracle)、Linux、软件架构及大数据等多领域技术知识分享。涵盖丰富的原创与精选技术文章,助力技术传播与交流。无论是技术新手渴望入门,还是资深开发者寻求进阶,这里都能为您提供深度见解与实用经验,让复杂编码变得轻松易懂,携手共赴技术提升新高度。如有侵权,请来信告知:hxstrive@outlook.com
其他应用
公众号