Spring Batch4 使用 split() 方法实现并发执行

在 Spring Batch 里,split() 方法是一个非常重要的功能,它能够让多个 Flow 并行执行,split() 方法主要作用是将工作流分解为多个并行的执行路径,以此提升批处理的性能。其关键信息如下:

  • 并行执行:多个 Flow 会同时启动,不过每个 Flow 内部的步骤依旧按顺序执行。

  • 任务协调:所有并行的 Flow 都执行完毕后,主流程才会继续往下执行。

  • 线程管理:需要借助 TaskExecutor 来管理并行任务的线程。

split() 方法的基本用法:

Flow splitFlow = new FlowBuilder<Flow>("splitFlow")
    .split(taskExecutor)  // 指定任务执行器
    .add(flow1, flow2, ...)  // 添加要并行执行的 Flow
    .build();

参数说明:

  • taskExecutor  该参数用于定义并行任务的执行策略,常见的选择有:

    • SimpleAsyncTaskExecutor:为每个任务创建一个新线程(适用于开发和测试环境)。

    • ThreadPoolTaskExecutor:使用线程池来管理线程(推荐在生产环境中使用)。

    • SyncTaskExecutor:以同步方式执行任务(会使并行失效,主要用于调试)。

示例代码

下面是一个并行执行两个 Flow 的完整示例:

package com.hxstrive.spring_batch.splitDemo.config;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

/**
 * Spring Batch 配置类
 * @author hxstrive.com
 */
@Configuration
public class BatchConfig {

    // 用于创建和配置 Job 对象的工厂类
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    // 用于创建和配置 Step 对象的工厂类
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    // 创建 Step 对象
    @Bean
    public Step splitDemoStep1() {
        return stepBuilderFactory.get("splitDemoStep1").tasklet(new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                System.out.println(Thread.currentThread().getName() + " splitDemoStep1");
                return RepeatStatus.FINISHED; // 返回 FINISHED 表明任务执行结束
            }
        }).build();
    }

    @Bean
    public Step splitDemoStep2() {
        return stepBuilderFactory.get("splitDemoStep2").tasklet(new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                System.out.println(Thread.currentThread().getName() + " splitDemoStep2");
                return RepeatStatus.FINISHED; // 返回 FINISHED 表明任务执行结束
            }
        }).build();
    }

    @Bean
    public Step splitDemoStep3() {
        return stepBuilderFactory.get("splitDemoStep3").tasklet(new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                System.out.println(Thread.currentThread().getName() + " splitDemoStep3");
                return RepeatStatus.FINISHED; // 返回 FINISHED 表明任务执行结束
            }
        }).build();
    }


    // 创建 Flow 对象
    @Bean
    public Flow splitDemoFlow1() {
        return new FlowBuilder<Flow>("splitDemoFlow1")
                .start(splitDemoStep1())
                .next(splitDemoStep2()).build();
    }

    @Bean
    public Flow splitDemoFlow2() {
        return new FlowBuilder<Flow>("splitDemoFlow2")
                .start(splitDemoStep3()).build();
    }

    // 创建 Job 任务
    @Bean
    public Job splitDemoJob() {
        return jobBuilderFactory.get("splitDemoJob")
                .start(splitDemoFlow1()) // 第一个 Flow
                .split(new SimpleAsyncTaskExecutor())
                .add(splitDemoFlow2()) // 第二个 Flow
                .end()
                .build();
    }

}

上面代码的执行流程如下:

  • 先执行 splitDemoFlow1():这是主流程的起始点,会按顺序执行。

  • 再通过 split() 进行并行处理:

    • 使用 SimpleAsyncTaskExecutor 创建新线程执行后续 Flow。

    • splitDemoFlow2() 会与后续可能的步骤(代码中未显示)并行执行。

  • 等待所有并行 Flow 完成:主线程会等待 splitDemoFlow2() 执行完毕,然后继续后续步骤(若有)。

  

说说我的看法
全部评论(
没有评论
关于
本网站专注于 Java、数据库(MySQL、Oracle)、Linux、软件架构及大数据等多领域技术知识分享。涵盖丰富的原创与精选技术文章,助力技术传播与交流。无论是技术新手渴望入门,还是资深开发者寻求进阶,这里都能为您提供深度见解与实用经验,让复杂编码变得轻松易懂,携手共赴技术提升新高度。如有侵权,请来信告知:hxstrive@outlook.com
其他应用
公众号