在 Spring Batch 中,JobExecutionDecider 接口用于实现复杂的决策逻辑,允许作业根据运行时条件动态选择执行路径。通过自定义决策逻辑,可实现灵活的动态路由。合理使用决策器能使批处理作业更好地应对业务变化,提升系统的可维护性和扩展性。以下是对该接口的详细介绍:
JobExecutionDecider 接口定义了单个方法:
public interface JobExecutionDecider {
FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution);
}参数说明:
jobExecution:当前作业的执行上下文,包含作业参数、状态等信息。
stepExecution:触发决策的步骤执行上下文(可能为 null,取决于调用方式)。
返回值:
FlowExecutionStatus:决策结果,可自定义状态码(如 "CONTINUE"、"FAILED")。
工作原理:作为流程中的一个节点,决策器可嵌入 Flow 中,替代普通 Step。然后,根据决策结果(如数据校验、业务规则),引导作业 Job 执行不同路径。
以下是一个根据作业参数决定执行路径的决策器:
(1)实现 JobExecutionDecider 接口,自定义决策器,如下:
package com.hxstrive.spring_batch.deciderDemo;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.job.flow.FlowExecutionStatus;
import org.springframework.batch.core.job.flow.JobExecutionDecider;
import java.util.concurrent.atomic.AtomicLong;
/**
* 自定义决策器
* @author hxstrive.com
*/
public class MyDecider implements JobExecutionDecider {
private final static AtomicLong count = new AtomicLong(0);
@Override
public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
long val = count.incrementAndGet();
if(val % 2 == 0){
return new FlowExecutionStatus("even");
} else {
return new FlowExecutionStatus("odd");
}
}
}(2)创建 BatchConfig 配置类,创建一个名为 deciderDemoJob 的 Job,然后通过自定义的 MyDecider 决策器动态决定执行 哪个 Step,代码如下:
package com.hxstrive.spring_batch.deciderDemo.config;
import com.hxstrive.spring_batch.deciderDemo.MyDecider;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.job.flow.JobExecutionDecider;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
/**
* Spring Batch 配置类
* @author hxstrive.com
*/
@Configuration
public class BatchConfig {
// 用于创建和配置 Job 对象的工厂类
@Autowired
private JobBuilderFactory jobBuilderFactory;
// 用于创建和配置 Step 对象的工厂类
@Autowired
private StepBuilderFactory stepBuilderFactory;
// 创建决策器
@Bean
public JobExecutionDecider myDecider() {
return new MyDecider();
}
// 创建 Step 对象
@Bean
public Step deciderDemoStep1() {
return stepBuilderFactory.get("deciderDemoStep1").tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
System.out.println(Thread.currentThread().getName() + " deciderDemoStep1");
return RepeatStatus.FINISHED; // 返回 FINISHED 表明任务执行结束
}
}).build();
}
@Bean
public Step deciderDemoStep2() {
return stepBuilderFactory.get("deciderDemoStep2").tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
System.out.println(Thread.currentThread().getName() + " deciderDemoStep2");
return RepeatStatus.FINISHED; // 返回 FINISHED 表明任务执行结束
}
}).build();
}
@Bean
public Step deciderDemoStep3() {
return stepBuilderFactory.get("deciderDemoStep3").tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
System.out.println(Thread.currentThread().getName() + " deciderDemoStep3");
return RepeatStatus.FINISHED; // 返回 FINISHED 表明任务执行结束
}
}).build();
}
// 创建 Job 任务
@Bean
public Job deciderDemoJob() {
return jobBuilderFactory.get("deciderDemoJob")
.start(deciderDemoStep1())
.next(myDecider())
.from(myDecider()).on("even").to(deciderDemoStep2()) // 决策器返回 event,执行 deciderDemoStep2()
.from(myDecider()).on("odd").to(deciderDemoStep3()) // 决策器返回 odd,执行 deciderDemoStep3()
.from(deciderDemoStep3()).on("*").to(myDecider())
.end()
.build();
}
}上述代码执行流程如下:
初始步骤:执行 deciderDemoStep1(),完成后进入决策器 myDecider()。
决策器逻辑:myDecider() 根据业务逻辑返回 "even" 或 "odd"。
分支处理:
若返回 "even":执行 deciderDemoStep2(),然后作业结束。
若返回 "odd":执行 deciderDemoStep3(),然后进入下一步。
循环逻辑:deciderDemoStep3() 执行后,通过 .on("*") 捕获所有状态,强制回到 myDecider()。
注意:这会形成一个循环:myDecider() → deciderDemoStep3() → myDecider(),直到返回 "even" 跳出循环。