在 Spring Batch 里,Flow 是用于编排步骤(Step)的组件,能构建出复杂的批处理工作流。Flow 的核心概念如下:
Flow:Flow 是由多个 Step 组成的逻辑单元,可定义 Step 之间的执行顺序。
Step编排:支持顺序执行、条件分支、并行处理以及流程嵌套。
状态转换:能根据执行结果(像 COMPLETED、FAILED)来决定后续执行的步骤。
顺序流程(Sequential Process)是指一系列按照固定、线性顺序依次执行的操作或步骤,前一步骤的输出作为后一步骤的输入,且不可跳过或颠倒顺序。如下图:
下面是顺序流程的简单示例:
@Configuration public class BatchConfig { @Autowired private JobBuilderFactory jobBuilderFactory; @Autowired private StepBuilderFactory stepBuilderFactory; @Bean public Step step1() { return stepBuilderFactory.get("step1") .tasklet((contribution, chunkContext) -> { System.out.println("执行步骤1"); return RepeatStatus.FINISHED; }).build(); } @Bean public Step step2() { return stepBuilderFactory.get("step2") .tasklet((contribution, chunkContext) -> { System.out.println("执行步骤2"); return RepeatStatus.FINISHED; }).build(); } @Bean public Flow simpleFlow() { return new FlowBuilder<Flow>("simpleFlow") .start(step1()) // 步骤1 .next(step2()) // 步骤2 .end(); } @Bean public Job job() { return jobBuilderFactory.get("job") .start(simpleFlow()) // 设置 Job 的首个步骤 .end() .build(); } }
条件分支流程(Conditional Branching Process)是指根据特定条件判断决定执行不同路径的流程,其核心是"如果...则...否则..."的逻辑结构。如下图:
简单示例如下:
@Bean public Flow conditionalFlow() { return new FlowBuilder<Flow>("conditionalFlow") .start(step1()) .on("COMPLETED").to(step2()) // 步骤1完成后执行步骤2 .from(step1()).on("FAILED").to(step3()) // 步骤1失败后执行步骤3 .from(step2()).on("*").end() // 步骤2结束后结束流程 .from(step3()).on("*").end() // 步骤3结束后结束流程 .build(); }
并行流程(Parallel Process)是指多个任务或操作同时执行,彼此独立且无需等待其他任务完成的流程模式。其核心是"同时做多件事",通过资源拆分提升效率。
简单示例如下:
@Bean public Flow parallelFlow() { // 定义两个 Flow Flow flow1 = new FlowBuilder<Flow>("flow1").start(step1()).build(); Flow flow2 = new FlowBuilder<Flow>("flow2").start(step2()).build(); return new FlowBuilder<Flow>("parallelFlow") .split(new SimpleAsyncTaskExecutor()) // 使用异步任务执行器 .add(flow1, flow2) .build(); }
可以将一个 Flow 嵌套到另一个 Flow 中,如下图:
示例代码如下:
// 创建 Flow 对象 @Bean public Flow simpleFlow() { return new FlowBuilder<Flow>("simpleFlow") .start(flowDemoStep1()) .next(flowDemoStep2()).build(); } // 父 Flow @Bean public Flow parentFlow() { return new FlowBuilder<Flow>("parentFlow") .start(simpleFlow()) // 嵌套子流程 .next(step3()) .end(); }
使用 JobExecutionDecider 能够实现复杂的决策逻辑,示例如下:
public class MyDecider implements JobExecutionDecider { @Override public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) { // 根据条件返回不同的状态 return new FlowExecutionStatus("CONDITION_MET"); } } @Bean public Flow decisionFlow() { return new FlowBuilder<Flow>("decisionFlow") .start(step1()) .next(myDecider()) // 使用决策器 .on("CONDITION_MET").to(step2()) .from(myDecider()).on("CONDITION_NOT_MET").to(step3()) .end(); }
下面通过一个完整示例介绍如何使用 Flow,代码如下:
package com.hxstrive.spring_batch.flowDemo.config; import org.springframework.batch.core.*; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.job.builder.FlowBuilder; import org.springframework.batch.core.job.flow.Flow; import org.springframework.batch.core.scope.context.ChunkContext; import org.springframework.batch.core.step.tasklet.Tasklet; import org.springframework.batch.repeat.RepeatStatus; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * Spring Batch 配置类 * @author hxstrive.com */ @Configuration public class BatchConfig { // 用于创建和配置 Job 对象的工厂类 @Autowired private JobBuilderFactory jobBuilderFactory; // 用于创建和配置 Step 对象的工厂类 @Autowired private StepBuilderFactory stepBuilderFactory; // 创建 Step 对象 @Bean public Step flowDemoStep1() { return stepBuilderFactory.get("flowDemoStep1").tasklet(new Tasklet() { @Override public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception { System.out.println("flowDemoStep1"); return RepeatStatus.FINISHED; // 返回 FINISHED 表明任务执行结束 } }).build(); } @Bean public Step flowDemoStep2() { return stepBuilderFactory.get("flowDemoStep2").tasklet(new Tasklet() { @Override public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception { System.out.println("flowDemoStep2"); return RepeatStatus.FINISHED; // 返回 FINISHED 表明任务执行结束 } }).build(); } @Bean public Step flowDemoStep3() { return stepBuilderFactory.get("flowDemoStep3").tasklet(new Tasklet() { @Override public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception { System.out.println("flowDemoStep3"); return RepeatStatus.FINISHED; // 返回 FINISHED 表明任务执行结束 } }).build(); } // 创建 Flow 对象 @Bean public Flow flowDemoFlow() { return new FlowBuilder<Flow>("flowDemoFlow") .start(flowDemoStep1()) .next(flowDemoStep2()).build(); } // 创建 Job 任务 @Bean public Job flowDemoJob() { return jobBuilderFactory.get("flowDemoJob") .start(flowDemoFlow()) // 创建 Flow 对象,将 Flow 作为 Job 的首个 Step .next(flowDemoStep3()) .end() .build(); } }