在 Spring Batch 4 里,ItemWriter 是核心接口之一,负责将处理后的数据写入目标系统(数据库、文件、消息队列等)。与逐项读取的 ItemReader 不同,ItemWriter 通常以 块(chunk) 为单位批量写入数据。
ItemWriter 接口的定义如下:
package org.springframework.batch.item; import java.util.List; public interface ItemWriter<T> { /** * 对所提供的数据元素进行处理,在正常操作过程中,不会用任何空值(null)的项来调用此操作。 * 也就是说,正常情况下传入用于处理的数据项不会是 null 值。 * * @param items 要写入的项 * @throws Exception 如果出现错误,框架将捕获异常,并根据情况进行转换或重新抛出。 */ void write(List<? extends T> items) throws Exception; }
注意:ItemWriter 接口只有一个 write() 方法,该方法接收一个 List 类型的 item 集合,在执行写入操作时,这些 item 会被批量处理。write() 方法 item 的类型要和 ItemProcessor 输出的类型或者 ItemReader返回的类型保持一致。
ItemWriter 主要特性如下:
批处理特性:每次调用 write 方法时,传入的是一个包含多个 item 的列表,而非单个 item,这样能有效提升写入效率。
事务支持:ItemWriter 的操作处于 Spring Batch 的事务管理范畴,若写入过程中出现异常,事务会回滚。
可重试机制:可以和 RetryTemplate 配合使用,对失败的写入操作进行重试。
Chunk 处理模式:通常与 ChunkOrientedTasklet 结合,按照块的方式处理数据,处理完一个块就提交一次。
Spring Batch4 提供了很多 ItemWriter 的内置实现,如下图:
部分实现类说明如下:
JdbcBatchItemWriter 通过 JDBC 批量操作将数据写入关系型数据库,支持预编译语句和参数化查询,依赖 Spring JDBC。
JpaItemWriter 基于 JPA 实现数据写入,使用 EntityManager 管理持久化操作,适用于 JPA 环境,自动处理事务 flush。
RepositoryItemWriter 基于 Spring Data Repository 接口写入数据,支持调用自定义 Repository 方法,简化 NoSQL 或关系型数据库操作。
FlatFileItemWriter 将数据写入文本文件,支持自定义格式(如 CSV、固定长度),提供行聚合器和资源管理功能。
KafkaItemWriter 将数据发送到 Kafka 消息队列,支持批量发送和自定义消息转换器,依赖 spring-kafka 模块。
AmqpItemWriter 将数据发送到 AMQP 消息中间件(如 RabbitMQ),支持自定义消息属性和转换器。
CompositeItemWriter 组合多个 ItemWriter 按顺序执行写入操作,适用于需要同时写入多个目标的场景。
ClassifierCompositeItemWriter 基于分类器(Classifier)动态选择不同的 ItemWriter 处理数据,支持按条件路由。
该示例创建一个 ItemWriter 的匿名实现,将 ItemReader 读取的数据简单输出到控制台。
使用 @Configuration 注解创建 BatchConfig 配置类:
package com.hxstrive.spring_batch.itemWriterDemo.config; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.item.*; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.*; /** * Spring Batch 配置类 * @author hxstrive.com */ @Configuration public class BatchConfig { // 用于创建和配置 Job 对象的工厂类 @Autowired private JobBuilderFactory jobBuilderFactory; // 用于创建和配置 Step 对象的工厂类 @Autowired private StepBuilderFactory stepBuilderFactory; // 创建Job对象 @Bean public Job itemWriterDemoJob() { return jobBuilderFactory.get("itemWriterDemoJob-" + System.currentTimeMillis()) .start(itemWriterDemoStep()) .build(); } // 创建Step对象 @Bean public Step itemWriterDemoStep() { // 模拟数据,为了方便 List<String> dataList = new ArrayList<>(Arrays.asList( "java", "c++", "php", "python", "erlang")); final Iterator<String> iterator = dataList.iterator(); return stepBuilderFactory.get("itemWriterDemoStep") .<String,String>chunk(2) //分块大小为2,当读取2个,就传递给 ItemWriter .reader(new ItemReader<String>() { @Override public String read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException { if(iterator.hasNext()) { System.out.println("reading data..."); return iterator.next(); } return null; } }) // 通过匿名内部类实现 ItemWriter,直观感受一下 .writer(new ItemWriter<String>() { @Override public void write(List<? extends String> items) throws Exception { System.out.println("write: " + Arrays.toString(items.toArray())); } }) .build(); } }
上述代码中,使用 List<String> 模拟数据,然后通过 ItemReader 逐一读取。Spring Batch4 将读取的数据批量传递给 ItemWriter,这里的 ItemWriter 只是简单的输出到控制台。
在启动类中,使用 @EnableBatchProcessing 注解开启批处理操作,代码如下:
package com.hxstrive.spring_batch.itemWriterDemo; import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication @EnableBatchProcessing // 开启批处理 public class SpringBatchDemoApplication { public static void main(String[] args) { SpringApplication.run(SpringBatchDemoApplication.class, args); } }
启动项目,观察输出日志:
通过上图日志可知,读取时是一条条数据读取的,当读取的数据数量为2时,会自动将数据传递给 ItemWriter,输出数据到控制台。