在 Spring Batch 里,split() 方法是一个非常重要的功能,它能够让多个 Flow 并行执行,split() 方法主要作用是将工作流分解为多个并行的执行路径,以此提升批处理的性能。其关键信息如下:
并行执行:多个 Flow 会同时启动,不过每个 Flow 内部的步骤依旧按顺序执行。
任务协调:所有并行的 Flow 都执行完毕后,主流程才会继续往下执行。
线程管理:需要借助 TaskExecutor 来管理并行任务的线程。
split() 方法的基本用法:
Flow splitFlow = new FlowBuilder<Flow>("splitFlow") .split(taskExecutor) // 指定任务执行器 .add(flow1, flow2, ...) // 添加要并行执行的 Flow .build();
参数说明:
taskExecutor 该参数用于定义并行任务的执行策略,常见的选择有:
SimpleAsyncTaskExecutor:为每个任务创建一个新线程(适用于开发和测试环境)。
ThreadPoolTaskExecutor:使用线程池来管理线程(推荐在生产环境中使用)。
SyncTaskExecutor:以同步方式执行任务(会使并行失效,主要用于调试)。
下面是一个并行执行两个 Flow 的完整示例:
package com.hxstrive.spring_batch.splitDemo.config; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.StepContribution; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.job.builder.FlowBuilder; import org.springframework.batch.core.job.flow.Flow; import org.springframework.batch.core.scope.context.ChunkContext; import org.springframework.batch.core.step.tasklet.Tasklet; import org.springframework.batch.repeat.RepeatStatus; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.task.SimpleAsyncTaskExecutor; /** * Spring Batch 配置类 * @author hxstrive.com */ @Configuration public class BatchConfig { // 用于创建和配置 Job 对象的工厂类 @Autowired private JobBuilderFactory jobBuilderFactory; // 用于创建和配置 Step 对象的工厂类 @Autowired private StepBuilderFactory stepBuilderFactory; // 创建 Step 对象 @Bean public Step splitDemoStep1() { return stepBuilderFactory.get("splitDemoStep1").tasklet(new Tasklet() { @Override public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception { System.out.println(Thread.currentThread().getName() + " splitDemoStep1"); return RepeatStatus.FINISHED; // 返回 FINISHED 表明任务执行结束 } }).build(); } @Bean public Step splitDemoStep2() { return stepBuilderFactory.get("splitDemoStep2").tasklet(new Tasklet() { @Override public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception { System.out.println(Thread.currentThread().getName() + " splitDemoStep2"); return RepeatStatus.FINISHED; // 返回 FINISHED 表明任务执行结束 } }).build(); } @Bean public Step splitDemoStep3() { return stepBuilderFactory.get("splitDemoStep3").tasklet(new Tasklet() { @Override public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception { System.out.println(Thread.currentThread().getName() + " splitDemoStep3"); return RepeatStatus.FINISHED; // 返回 FINISHED 表明任务执行结束 } }).build(); } // 创建 Flow 对象 @Bean public Flow splitDemoFlow1() { return new FlowBuilder<Flow>("splitDemoFlow1") .start(splitDemoStep1()) .next(splitDemoStep2()).build(); } @Bean public Flow splitDemoFlow2() { return new FlowBuilder<Flow>("splitDemoFlow2") .start(splitDemoStep3()).build(); } // 创建 Job 任务 @Bean public Job splitDemoJob() { return jobBuilderFactory.get("splitDemoJob") .start(splitDemoFlow1()) // 第一个 Flow .split(new SimpleAsyncTaskExecutor()) .add(splitDemoFlow2()) // 第二个 Flow .end() .build(); } }
上面代码的执行流程如下:
先执行 splitDemoFlow1():这是主流程的起始点,会按顺序执行。
再通过 split() 进行并行处理:
使用 SimpleAsyncTaskExecutor 创建新线程执行后续 Flow。
splitDemoFlow2() 会与后续可能的步骤(代码中未显示)并行执行。
等待所有并行 Flow 完成:主线程会等待 splitDemoFlow2() 执行完毕,然后继续后续步骤(若有)。