在 Spring Batch 中,JobExecutionDecider 接口用于实现复杂的决策逻辑,允许作业根据运行时条件动态选择执行路径。通过自定义决策逻辑,可实现灵活的动态路由。合理使用决策器能使批处理作业更好地应对业务变化,提升系统的可维护性和扩展性。以下是对该接口的详细介绍:
JobExecutionDecider 接口定义了单个方法:
public interface JobExecutionDecider { FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution); }
参数说明:
jobExecution:当前作业的执行上下文,包含作业参数、状态等信息。
stepExecution:触发决策的步骤执行上下文(可能为 null,取决于调用方式)。
返回值:
FlowExecutionStatus:决策结果,可自定义状态码(如 "CONTINUE"、"FAILED")。
工作原理:作为流程中的一个节点,决策器可嵌入 Flow 中,替代普通 Step。然后,根据决策结果(如数据校验、业务规则),引导作业 Job 执行不同路径。
以下是一个根据作业参数决定执行路径的决策器:
(1)实现 JobExecutionDecider 接口,自定义决策器,如下:
package com.hxstrive.spring_batch.deciderDemo; import org.springframework.batch.core.JobExecution; import org.springframework.batch.core.StepExecution; import org.springframework.batch.core.job.flow.FlowExecutionStatus; import org.springframework.batch.core.job.flow.JobExecutionDecider; import java.util.concurrent.atomic.AtomicLong; /** * 自定义决策器 * @author hxstrive.com */ public class MyDecider implements JobExecutionDecider { private final static AtomicLong count = new AtomicLong(0); @Override public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) { long val = count.incrementAndGet(); if(val % 2 == 0){ return new FlowExecutionStatus("even"); } else { return new FlowExecutionStatus("odd"); } } }
(2)创建 BatchConfig 配置类,创建一个名为 deciderDemoJob 的 Job,然后通过自定义的 MyDecider 决策器动态决定执行 哪个 Step,代码如下:
package com.hxstrive.spring_batch.deciderDemo.config; import com.hxstrive.spring_batch.deciderDemo.MyDecider; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.StepContribution; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.job.builder.FlowBuilder; import org.springframework.batch.core.job.flow.Flow; import org.springframework.batch.core.job.flow.JobExecutionDecider; import org.springframework.batch.core.scope.context.ChunkContext; import org.springframework.batch.core.step.tasklet.Tasklet; import org.springframework.batch.repeat.RepeatStatus; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.task.SimpleAsyncTaskExecutor; /** * Spring Batch 配置类 * @author hxstrive.com */ @Configuration public class BatchConfig { // 用于创建和配置 Job 对象的工厂类 @Autowired private JobBuilderFactory jobBuilderFactory; // 用于创建和配置 Step 对象的工厂类 @Autowired private StepBuilderFactory stepBuilderFactory; // 创建决策器 @Bean public JobExecutionDecider myDecider() { return new MyDecider(); } // 创建 Step 对象 @Bean public Step deciderDemoStep1() { return stepBuilderFactory.get("deciderDemoStep1").tasklet(new Tasklet() { @Override public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception { System.out.println(Thread.currentThread().getName() + " deciderDemoStep1"); return RepeatStatus.FINISHED; // 返回 FINISHED 表明任务执行结束 } }).build(); } @Bean public Step deciderDemoStep2() { return stepBuilderFactory.get("deciderDemoStep2").tasklet(new Tasklet() { @Override public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception { System.out.println(Thread.currentThread().getName() + " deciderDemoStep2"); return RepeatStatus.FINISHED; // 返回 FINISHED 表明任务执行结束 } }).build(); } @Bean public Step deciderDemoStep3() { return stepBuilderFactory.get("deciderDemoStep3").tasklet(new Tasklet() { @Override public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception { System.out.println(Thread.currentThread().getName() + " deciderDemoStep3"); return RepeatStatus.FINISHED; // 返回 FINISHED 表明任务执行结束 } }).build(); } // 创建 Job 任务 @Bean public Job deciderDemoJob() { return jobBuilderFactory.get("deciderDemoJob") .start(deciderDemoStep1()) .next(myDecider()) .from(myDecider()).on("even").to(deciderDemoStep2()) // 决策器返回 event,执行 deciderDemoStep2() .from(myDecider()).on("odd").to(deciderDemoStep3()) // 决策器返回 odd,执行 deciderDemoStep3() .from(deciderDemoStep3()).on("*").to(myDecider()) .end() .build(); } }
上述代码执行流程如下:
初始步骤:执行 deciderDemoStep1(),完成后进入决策器 myDecider()。
决策器逻辑:myDecider() 根据业务逻辑返回 "even" 或 "odd"。
分支处理:
若返回 "even":执行 deciderDemoStep2(),然后作业结束。
若返回 "odd":执行 deciderDemoStep3(),然后进入下一步。
循环逻辑:deciderDemoStep3() 执行后,通过 .on("*") 捕获所有状态,强制回到 myDecider()。
注意:这会形成一个循环:myDecider() → deciderDemoStep3() → myDecider(),直到返回 "even" 跳出循环。