Spring Batch4 教程

Spring Batch4 ItemProcessor 完整示例

前面在 Spring Batch4 核心组件 ItemProcessor 介绍 章节详细介绍了 ItemProcessor 组件的作用,下面通过一个完整的例子介绍如何使用它。详细步骤如下:

准备数据

我们依然通过在 resources 目录下面创建的 users.csv 文件提供数据,内容如下:

"id","username","password"
1,张三,"13BC03AC29FAC7B29736EC3BE5C2F55A"
2,李四,"5E5994FBCFA922D804DF45295AE98604"
3,王五,"6C14DA109E294D1E8155BE8AA4B1CE8E"
4,赵六,"03774AD7979A5909E78F9C9DB3A2F0B2"

创建实体

创建一个 Java POJO 类,用来映射从 CSV 读取的每行数据,如下:

package com.hxstrive.spring_batch.itemProcessorDemo.dto;

import lombok.Data;
import lombok.ToString;

/**
 * 用户DTO
 * @author hxstrive.com
 */
@Data
@ToString
public class User {
    private int id;
    private String username;
    private String password;
}

创建 Spring Batch 配置类

💯重点,我们通过 @Configuration 注解创建一个名为 BatchConfig 的配置类。该类用来配置 Job、Step 以及 ItemReader 读取数据,ItemProcessor 处理数据,ItemWriter 写出数据。代码如下:

package com.hxstrive.spring_batch.itemProcessorDemo.config;

import com.hxstrive.spring_batch.itemProcessorDemo.dto.User;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.validation.BindException;
import java.util.Arrays;
import java.util.List;

/**
 * Spring Batch 配置类
 * @author hxstrive.com
 */
@Configuration
public class BatchConfig {

    // 用于创建和配置 Job 对象的工厂类
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    // 用于创建和配置 Step 对象的工厂类
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    // 创建Job对象
    @Bean
    public Job itemProcessorDemoJob() {
        return jobBuilderFactory.get("itemProcessorDemoJob-" + System.currentTimeMillis())
                .start(itemProcessorDemoStep())
                .build();
    }

    // 创建Step对象
    @Bean
    public Step itemProcessorDemoStep() {
        return stepBuilderFactory.get("flatFileItemWriterDemoStep")
                .<User, User>chunk(2)
                .reader(flatFileItemReader())
                .processor(itemProcessor())
                .writer(new ItemWriter<User>() {
                    @Override
                    public void write(List<? extends User> items) throws Exception {
                        System.out.println(Arrays.toString(items.toArray()));
                    }
                })
                .build();
    }

    // 看这里,重点
    @Bean
    public ItemProcessor<? super User,? extends User> itemProcessor() {
        return new ItemProcessor<User, User>() {
            @Override
            public User process(User item) throws Exception {
                System.out.println("processing user: " + item);
                item.setPassword(item.getPassword().toLowerCase());
                return item;
            }
        };
    }

    // 创建ItemReader对象
    @Bean
    @StepScope //将 Bean 的生命周期与 Step 执行上下文 绑定
    public FlatFileItemReader<? extends User> flatFileItemReader() {
        FlatFileItemReader<User> reader = new FlatFileItemReader<>();
        reader.setResource(new ClassPathResource("users.csv"));
        reader.setLinesToSkip(1); // 跳过文件第一行,因为第一行是字段名

        // 解析数据
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames("id", "username", "password");
        // 把解析出的数据映射到 User 对象中
        DefaultLineMapper<User> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper(new FieldSetMapper<User>(){
            @Override
            public User mapFieldSet(FieldSet fieldSet) throws BindException {
                User user = new User();
                user.setId(fieldSet.readInt("id"));
                user.setUsername(fieldSet.readString("username"));
                user.setPassword(fieldSet.readString("password"));
                System.out.println("reading user: " + user);
                return user;
            }
        });

        lineMapper.afterPropertiesSet();
        reader.setLineMapper(lineMapper);
        return reader;
    }

}

主要关注 itemProcessor() 方法,该方法主要含义如下:

(1)先看方法定义

public ItemProcessor<? super User, ? extends User> itemProcessor()

该方法定义返回类型为 ItemProcessor 接口,且使用了泛型通配符:

  • ? super User:表示输入类型可以是 User 或其父类

  • ? extends User:表示输出类型可以是 User 或其子类

这种泛型定义让处理器具有更好的兼容性,可处理 User 及其相关类型。

(2)方法实现

该方法通过匿名内部类直接实现 ItemProcessor<User, User> 接口,指定输入和输出类型都是 User。在 process() 方法中,仅将用户密码进行转换,转化成小写操作,读者可以根据自己的业务进行业务处理。

item.setPassword(item.getPassword().toLowerCase());

创建启动类

启动类很简单,仅添加了 @EnableBatchProcessing  注解,用来开启批处理功能。代码如下:

package com.hxstrive.spring_batch.itemProcessorDemo;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@EnableBatchProcessing // 开启批处理
public class SpringBatchDemoApplication {

    public static void main(String[] args) {
       SpringApplication.run(SpringBatchDemoApplication.class, args);
    }

}

到这里,代码写完了,运行应用程序,输出日志如下:

image.png

到这里,简单的 ItemProcessor 就完成了,更多关于 ItemProcessor 的用法请参考官方文档。

  

说说我的看法
全部评论(
没有评论
关于
本网站专注于 Java、数据库(MySQL、Oracle)、Linux、软件架构及大数据等多领域技术知识分享。涵盖丰富的原创与精选技术文章,助力技术传播与交流。无论是技术新手渴望入门,还是资深开发者寻求进阶,这里都能为您提供深度见解与实用经验,让复杂编码变得轻松易懂,携手共赴技术提升新高度。如有侵权,请来信告知:hxstrive@outlook.com
其他应用
公众号