Spring Batch4 核心组件 ItermWriter 介绍

在 Spring Batch 4 里,ItemWriter 是核心接口之一,负责将处理后的数据写入目标系统(数据库、文件、消息队列等)。与逐项读取的 ItemReader 不同,ItemWriter 通常以 块(chunk) 为单位批量写入数据。

ItemWriter 接口的定义如下:

package org.springframework.batch.item;

import java.util.List;

public interface ItemWriter<T> {

    /**
     * 对所提供的数据元素进行处理,在正常操作过程中,不会用任何空值(null)的项来调用此操作。
     * 也就是说,正常情况下传入用于处理的数据项不会是 null 值。
     *
     * @param items 要写入的项
     * @throws Exception 如果出现错误,框架将捕获异常,并根据情况进行转换或重新抛出。
     */
    void write(List<? extends T> items) throws Exception;

}

注意:ItemWriter 接口只有一个 write() 方法,该方法接收一个 List 类型的 item 集合,在执行写入操作时,这些 item 会被批量处理。write() 方法 item 的类型要和 ItemProcessor 输出的类型或者 ItemReader返回的类型保持一致。

ItemWriter 主要特性如下:

  • 批处理特性:每次调用 write 方法时,传入的是一个包含多个 item 的列表,而非单个 item,这样能有效提升写入效率。

  • 事务支持:ItemWriter 的操作处于 Spring Batch 的事务管理范畴,若写入过程中出现异常,事务会回滚。

  • 可重试机制:可以和 RetryTemplate 配合使用,对失败的写入操作进行重试。

  • Chunk 处理模式:通常与 ChunkOrientedTasklet 结合,按照块的方式处理数据,处理完一个块就提交一次。

 

内置实现类

Spring Batch4 提供了很多 ItemWriter 的内置实现,如下图:

image.png

image.png

部分实现类说明如下:

  • JdbcBatchItemWriter  通过 JDBC 批量操作将数据写入关系型数据库,支持预编译语句和参数化查询,依赖 Spring JDBC。

  • JpaItemWriter  基于 JPA 实现数据写入,使用 EntityManager 管理持久化操作,适用于 JPA 环境,自动处理事务 flush。

  • RepositoryItemWriter  基于 Spring Data Repository 接口写入数据,支持调用自定义 Repository 方法,简化 NoSQL 或关系型数据库操作。

  • FlatFileItemWriter  将数据写入文本文件,支持自定义格式(如 CSV、固定长度),提供行聚合器和资源管理功能。

  • KafkaItemWriter  将数据发送到 Kafka 消息队列,支持批量发送和自定义消息转换器,依赖 spring-kafka 模块。

  • AmqpItemWriter  将数据发送到 AMQP 消息中间件(如 RabbitMQ),支持自定义消息属性和转换器。

  • CompositeItemWriter  组合多个 ItemWriter 按顺序执行写入操作,适用于需要同时写入多个目标的场景。

  • ClassifierCompositeItemWriter  基于分类器(Classifier)动态选择不同的 ItemWriter 处理数据,支持按条件路由。

  

简单示例

该示例创建一个 ItemWriter 的匿名实现,将 ItemReader 读取的数据简单输出到控制台。

创建 BatchConfig 配置类

使用 @Configuration 注解创建 BatchConfig 配置类:

package com.hxstrive.spring_batch.itemWriterDemo.config;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.*;

/**
 * Spring Batch 配置类
 * @author hxstrive.com
 */
@Configuration
public class BatchConfig {

    // 用于创建和配置 Job 对象的工厂类
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    // 用于创建和配置 Step 对象的工厂类
    @Autowired
    private StepBuilderFactory stepBuilderFactory;


    // 创建Job对象
    @Bean
    public Job itemWriterDemoJob() {
        return jobBuilderFactory.get("itemWriterDemoJob-" + System.currentTimeMillis())
                .start(itemWriterDemoStep())
                .build();
    }

    // 创建Step对象
    @Bean
    public Step itemWriterDemoStep() {
        // 模拟数据,为了方便
        List<String> dataList = new ArrayList<>(Arrays.asList(
                "java", "c++", "php", "python", "erlang"));
        final Iterator<String> iterator = dataList.iterator();

        return stepBuilderFactory.get("itemWriterDemoStep")
                .<String,String>chunk(2) //分块大小为2,当读取2个,就传递给 ItemWriter
                .reader(new ItemReader<String>() {
                    @Override
                    public String read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
                        if(iterator.hasNext()) {
                            System.out.println("reading data...");
                            return iterator.next();
                        }
                        return null;
                    }
                })
                // 通过匿名内部类实现 ItemWriter,直观感受一下
                .writer(new ItemWriter<String>() {
                    @Override
                    public void write(List<? extends String> items) throws Exception {
                        System.out.println("write: " + Arrays.toString(items.toArray()));
                    }
                })
                .build();
    }

}

上述代码中,使用 List<String> 模拟数据,然后通过 ItemReader 逐一读取。Spring Batch4 将读取的数据批量传递给 ItemWriter,这里的 ItemWriter 只是简单的输出到控制台。

  

启动类

在启动类中,使用 @EnableBatchProcessing 注解开启批处理操作,代码如下:

package com.hxstrive.spring_batch.itemWriterDemo;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@EnableBatchProcessing // 开启批处理
public class SpringBatchDemoApplication {

    public static void main(String[] args) {
       SpringApplication.run(SpringBatchDemoApplication.class, args);
    }

}

  

运行&验证

启动项目,观察输出日志:

image.png

通过上图日志可知,读取时是一条条数据读取的,当读取的数据数量为2时,会自动将数据传递给 ItemWriter,输出数据到控制台。

  

说说我的看法
全部评论(
没有评论
关于
本网站专注于 Java、数据库(MySQL、Oracle)、Linux、软件架构及大数据等多领域技术知识分享。涵盖丰富的原创与精选技术文章,助力技术传播与交流。无论是技术新手渴望入门,还是资深开发者寻求进阶,这里都能为您提供深度见解与实用经验,让复杂编码变得轻松易懂,携手共赴技术提升新高度。如有侵权,请来信告知:hxstrive@outlook.com
其他应用
公众号