在 Spring Batch 里,split() 方法是一个非常重要的功能,它能够让多个 Flow 并行执行,split() 方法主要作用是将工作流分解为多个并行的执行路径,以此提升批处理的性能。其关键信息如下:
并行执行:多个 Flow 会同时启动,不过每个 Flow 内部的步骤依旧按顺序执行。
任务协调:所有并行的 Flow 都执行完毕后,主流程才会继续往下执行。
线程管理:需要借助 TaskExecutor 来管理并行任务的线程。
split() 方法的基本用法:
Flow splitFlow = new FlowBuilder<Flow>("splitFlow")
.split(taskExecutor) // 指定任务执行器
.add(flow1, flow2, ...) // 添加要并行执行的 Flow
.build();参数说明:
taskExecutor 该参数用于定义并行任务的执行策略,常见的选择有:
SimpleAsyncTaskExecutor:为每个任务创建一个新线程(适用于开发和测试环境)。
ThreadPoolTaskExecutor:使用线程池来管理线程(推荐在生产环境中使用)。
SyncTaskExecutor:以同步方式执行任务(会使并行失效,主要用于调试)。
下面是一个并行执行两个 Flow 的完整示例:
package com.hxstrive.spring_batch.splitDemo.config;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
/**
* Spring Batch 配置类
* @author hxstrive.com
*/
@Configuration
public class BatchConfig {
// 用于创建和配置 Job 对象的工厂类
@Autowired
private JobBuilderFactory jobBuilderFactory;
// 用于创建和配置 Step 对象的工厂类
@Autowired
private StepBuilderFactory stepBuilderFactory;
// 创建 Step 对象
@Bean
public Step splitDemoStep1() {
return stepBuilderFactory.get("splitDemoStep1").tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
System.out.println(Thread.currentThread().getName() + " splitDemoStep1");
return RepeatStatus.FINISHED; // 返回 FINISHED 表明任务执行结束
}
}).build();
}
@Bean
public Step splitDemoStep2() {
return stepBuilderFactory.get("splitDemoStep2").tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
System.out.println(Thread.currentThread().getName() + " splitDemoStep2");
return RepeatStatus.FINISHED; // 返回 FINISHED 表明任务执行结束
}
}).build();
}
@Bean
public Step splitDemoStep3() {
return stepBuilderFactory.get("splitDemoStep3").tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
System.out.println(Thread.currentThread().getName() + " splitDemoStep3");
return RepeatStatus.FINISHED; // 返回 FINISHED 表明任务执行结束
}
}).build();
}
// 创建 Flow 对象
@Bean
public Flow splitDemoFlow1() {
return new FlowBuilder<Flow>("splitDemoFlow1")
.start(splitDemoStep1())
.next(splitDemoStep2()).build();
}
@Bean
public Flow splitDemoFlow2() {
return new FlowBuilder<Flow>("splitDemoFlow2")
.start(splitDemoStep3()).build();
}
// 创建 Job 任务
@Bean
public Job splitDemoJob() {
return jobBuilderFactory.get("splitDemoJob")
.start(splitDemoFlow1()) // 第一个 Flow
.split(new SimpleAsyncTaskExecutor())
.add(splitDemoFlow2()) // 第二个 Flow
.end()
.build();
}
}上面代码的执行流程如下:
先执行 splitDemoFlow1():这是主流程的起始点,会按顺序执行。
再通过 split() 进行并行处理:
使用 SimpleAsyncTaskExecutor 创建新线程执行后续 Flow。
splitDemoFlow2() 会与后续可能的步骤(代码中未显示)并行执行。
等待所有并行 Flow 完成:主线程会等待 splitDemoFlow2() 执行完毕,然后继续后续步骤(若有)。