在 Spring Batch 里,Flow 是用于编排步骤(Step)的组件,能构建出复杂的批处理工作流。Flow 的核心概念如下:
Flow:Flow 是由多个 Step 组成的逻辑单元,可定义 Step 之间的执行顺序。
Step编排:支持顺序执行、条件分支、并行处理以及流程嵌套。
状态转换:能根据执行结果(像 COMPLETED、FAILED)来决定后续执行的步骤。
顺序流程(Sequential Process)是指一系列按照固定、线性顺序依次执行的操作或步骤,前一步骤的输出作为后一步骤的输入,且不可跳过或颠倒顺序。如下图:

下面是顺序流程的简单示例:
@Configuration
public class BatchConfig {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Step step1() {
return stepBuilderFactory.get("step1")
.tasklet((contribution, chunkContext) -> {
System.out.println("执行步骤1");
return RepeatStatus.FINISHED;
}).build();
}
@Bean
public Step step2() {
return stepBuilderFactory.get("step2")
.tasklet((contribution, chunkContext) -> {
System.out.println("执行步骤2");
return RepeatStatus.FINISHED;
}).build();
}
@Bean
public Flow simpleFlow() {
return new FlowBuilder<Flow>("simpleFlow")
.start(step1()) // 步骤1
.next(step2()) // 步骤2
.end();
}
@Bean
public Job job() {
return jobBuilderFactory.get("job")
.start(simpleFlow()) // 设置 Job 的首个步骤
.end()
.build();
}
}
条件分支流程(Conditional Branching Process)是指根据特定条件判断决定执行不同路径的流程,其核心是"如果...则...否则..."的逻辑结构。如下图:

简单示例如下:
@Bean
public Flow conditionalFlow() {
return new FlowBuilder<Flow>("conditionalFlow")
.start(step1())
.on("COMPLETED").to(step2()) // 步骤1完成后执行步骤2
.from(step1()).on("FAILED").to(step3()) // 步骤1失败后执行步骤3
.from(step2()).on("*").end() // 步骤2结束后结束流程
.from(step3()).on("*").end() // 步骤3结束后结束流程
.build();
}
并行流程(Parallel Process)是指多个任务或操作同时执行,彼此独立且无需等待其他任务完成的流程模式。其核心是"同时做多件事",通过资源拆分提升效率。
简单示例如下:
@Bean
public Flow parallelFlow() {
// 定义两个 Flow
Flow flow1 = new FlowBuilder<Flow>("flow1").start(step1()).build();
Flow flow2 = new FlowBuilder<Flow>("flow2").start(step2()).build();
return new FlowBuilder<Flow>("parallelFlow")
.split(new SimpleAsyncTaskExecutor()) // 使用异步任务执行器
.add(flow1, flow2)
.build();
}
可以将一个 Flow 嵌套到另一个 Flow 中,如下图:

示例代码如下:
// 创建 Flow 对象
@Bean
public Flow simpleFlow() {
return new FlowBuilder<Flow>("simpleFlow")
.start(flowDemoStep1())
.next(flowDemoStep2()).build();
}
// 父 Flow
@Bean
public Flow parentFlow() {
return new FlowBuilder<Flow>("parentFlow")
.start(simpleFlow()) // 嵌套子流程
.next(step3())
.end();
}
使用 JobExecutionDecider 能够实现复杂的决策逻辑,示例如下:
public class MyDecider implements JobExecutionDecider {
@Override
public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
// 根据条件返回不同的状态
return new FlowExecutionStatus("CONDITION_MET");
}
}
@Bean
public Flow decisionFlow() {
return new FlowBuilder<Flow>("decisionFlow")
.start(step1())
.next(myDecider()) // 使用决策器
.on("CONDITION_MET").to(step2())
.from(myDecider()).on("CONDITION_NOT_MET").to(step3())
.end();
}
下面通过一个完整示例介绍如何使用 Flow,代码如下:
package com.hxstrive.spring_batch.flowDemo.config;
import org.springframework.batch.core.*;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* Spring Batch 配置类
* @author hxstrive.com
*/
@Configuration
public class BatchConfig {
// 用于创建和配置 Job 对象的工厂类
@Autowired
private JobBuilderFactory jobBuilderFactory;
// 用于创建和配置 Step 对象的工厂类
@Autowired
private StepBuilderFactory stepBuilderFactory;
// 创建 Step 对象
@Bean
public Step flowDemoStep1() {
return stepBuilderFactory.get("flowDemoStep1").tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
System.out.println("flowDemoStep1");
return RepeatStatus.FINISHED; // 返回 FINISHED 表明任务执行结束
}
}).build();
}
@Bean
public Step flowDemoStep2() {
return stepBuilderFactory.get("flowDemoStep2").tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
System.out.println("flowDemoStep2");
return RepeatStatus.FINISHED; // 返回 FINISHED 表明任务执行结束
}
}).build();
}
@Bean
public Step flowDemoStep3() {
return stepBuilderFactory.get("flowDemoStep3").tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
System.out.println("flowDemoStep3");
return RepeatStatus.FINISHED; // 返回 FINISHED 表明任务执行结束
}
}).build();
}
// 创建 Flow 对象
@Bean
public Flow flowDemoFlow() {
return new FlowBuilder<Flow>("flowDemoFlow")
.start(flowDemoStep1())
.next(flowDemoStep2()).build();
}
// 创建 Job 任务
@Bean
public Job flowDemoJob() {
return jobBuilderFactory.get("flowDemoJob")
.start(flowDemoFlow()) // 创建 Flow 对象,将 Flow 作为 Job 的首个 Step
.next(flowDemoStep3())
.end()
.build();
}
}