前面在 Spring Batch4 核心组件 ItemProcessor 介绍 章节详细介绍了 ItemProcessor 组件的作用,下面通过一个完整的例子介绍如何使用它。详细步骤如下:
我们依然通过在 resources 目录下面创建的 users.csv 文件提供数据,内容如下:
"id","username","password" 1,张三,"13BC03AC29FAC7B29736EC3BE5C2F55A" 2,李四,"5E5994FBCFA922D804DF45295AE98604" 3,王五,"6C14DA109E294D1E8155BE8AA4B1CE8E" 4,赵六,"03774AD7979A5909E78F9C9DB3A2F0B2"
创建一个 Java POJO 类,用来映射从 CSV 读取的每行数据,如下:
package com.hxstrive.spring_batch.itemProcessorDemo.dto; import lombok.Data; import lombok.ToString; /** * 用户DTO * @author hxstrive.com */ @Data @ToString public class User { private int id; private String username; private String password; }
💯重点,我们通过 @Configuration 注解创建一个名为 BatchConfig 的配置类。该类用来配置 Job、Step 以及 ItemReader 读取数据,ItemProcessor 处理数据,ItemWriter 写出数据。代码如下:
package com.hxstrive.spring_batch.itemProcessorDemo.config; import com.hxstrive.spring_batch.itemProcessorDemo.dto.User; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepScope; import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemWriter; import org.springframework.batch.item.file.FlatFileItemReader; import org.springframework.batch.item.file.mapping.DefaultLineMapper; import org.springframework.batch.item.file.mapping.FieldSetMapper; import org.springframework.batch.item.file.transform.DelimitedLineTokenizer; import org.springframework.batch.item.file.transform.FieldSet; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.io.ClassPathResource; import org.springframework.validation.BindException; import java.util.Arrays; import java.util.List; /** * Spring Batch 配置类 * @author hxstrive.com */ @Configuration public class BatchConfig { // 用于创建和配置 Job 对象的工厂类 @Autowired private JobBuilderFactory jobBuilderFactory; // 用于创建和配置 Step 对象的工厂类 @Autowired private StepBuilderFactory stepBuilderFactory; // 创建Job对象 @Bean public Job itemProcessorDemoJob() { return jobBuilderFactory.get("itemProcessorDemoJob-" + System.currentTimeMillis()) .start(itemProcessorDemoStep()) .build(); } // 创建Step对象 @Bean public Step itemProcessorDemoStep() { return stepBuilderFactory.get("flatFileItemWriterDemoStep") .<User, User>chunk(2) .reader(flatFileItemReader()) .processor(itemProcessor()) .writer(new ItemWriter<User>() { @Override public void write(List<? extends User> items) throws Exception { System.out.println(Arrays.toString(items.toArray())); } }) .build(); } // 看这里,重点 @Bean public ItemProcessor<? super User,? extends User> itemProcessor() { return new ItemProcessor<User, User>() { @Override public User process(User item) throws Exception { System.out.println("processing user: " + item); item.setPassword(item.getPassword().toLowerCase()); return item; } }; } // 创建ItemReader对象 @Bean @StepScope //将 Bean 的生命周期与 Step 执行上下文 绑定 public FlatFileItemReader<? extends User> flatFileItemReader() { FlatFileItemReader<User> reader = new FlatFileItemReader<>(); reader.setResource(new ClassPathResource("users.csv")); reader.setLinesToSkip(1); // 跳过文件第一行,因为第一行是字段名 // 解析数据 DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer(); tokenizer.setNames("id", "username", "password"); // 把解析出的数据映射到 User 对象中 DefaultLineMapper<User> lineMapper = new DefaultLineMapper<>(); lineMapper.setLineTokenizer(tokenizer); lineMapper.setFieldSetMapper(new FieldSetMapper<User>(){ @Override public User mapFieldSet(FieldSet fieldSet) throws BindException { User user = new User(); user.setId(fieldSet.readInt("id")); user.setUsername(fieldSet.readString("username")); user.setPassword(fieldSet.readString("password")); System.out.println("reading user: " + user); return user; } }); lineMapper.afterPropertiesSet(); reader.setLineMapper(lineMapper); return reader; } }
主要关注 itemProcessor() 方法,该方法主要含义如下:
(1)先看方法定义
public ItemProcessor<? super User, ? extends User> itemProcessor()
该方法定义返回类型为 ItemProcessor 接口,且使用了泛型通配符:
? super User:表示输入类型可以是 User 或其父类
? extends User:表示输出类型可以是 User 或其子类
这种泛型定义让处理器具有更好的兼容性,可处理 User 及其相关类型。
(2)方法实现
该方法通过匿名内部类直接实现 ItemProcessor<User, User> 接口,指定输入和输出类型都是 User。在 process() 方法中,仅将用户密码进行转换,转化成小写操作,读者可以根据自己的业务进行业务处理。
item.setPassword(item.getPassword().toLowerCase());
启动类很简单,仅添加了 @EnableBatchProcessing 注解,用来开启批处理功能。代码如下:
package com.hxstrive.spring_batch.itemProcessorDemo; import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication @EnableBatchProcessing // 开启批处理 public class SpringBatchDemoApplication { public static void main(String[] args) { SpringApplication.run(SpringBatchDemoApplication.class, args); } }
到这里,代码写完了,运行应用程序,输出日志如下:
到这里,简单的 ItemProcessor 就完成了,更多关于 ItemProcessor 的用法请参考官方文档。