Spring Batch4 自定义决策器

在 Spring Batch 中,JobExecutionDecider 接口用于实现复杂的决策逻辑,允许作业根据运行时条件动态选择执行路径。通过自定义决策逻辑,可实现灵活的动态路由。合理使用决策器能使批处理作业更好地应对业务变化,提升系统的可维护性和扩展性。以下是对该接口的详细介绍:

接口定义&核心方法

JobExecutionDecider 接口定义了单个方法:

public interface JobExecutionDecider {
    FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution);
}

参数说明:

  • jobExecution:当前作业的执行上下文,包含作业参数、状态等信息。

  • stepExecution:触发决策的步骤执行上下文(可能为 null,取决于调用方式)。

返回值:

  • FlowExecutionStatus:决策结果,可自定义状态码(如 "CONTINUE"、"FAILED")。

工作原理:作为流程中的一个节点,决策器可嵌入 Flow 中,替代普通 Step。然后,根据决策结果(如数据校验、业务规则),引导作业 Job 执行不同路径。

自定义决策器示例

以下是一个根据作业参数决定执行路径的决策器:

(1)实现 JobExecutionDecider  接口,自定义决策器,如下:

package com.hxstrive.spring_batch.deciderDemo;

import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.job.flow.FlowExecutionStatus;
import org.springframework.batch.core.job.flow.JobExecutionDecider;
import java.util.concurrent.atomic.AtomicLong;

/**
 * 自定义决策器
 * @author hxstrive.com
 */
public class MyDecider implements JobExecutionDecider {
    private final static AtomicLong count = new AtomicLong(0);

    @Override
    public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
        long val = count.incrementAndGet();
        if(val % 2 == 0){
            return new FlowExecutionStatus("even");
        } else {
            return new FlowExecutionStatus("odd");
        }
    }

}

(2)创建 BatchConfig 配置类,创建一个名为 deciderDemoJob 的 Job,然后通过自定义的 MyDecider  决策器动态决定执行 哪个 Step,代码如下:

package com.hxstrive.spring_batch.deciderDemo.config;

import com.hxstrive.spring_batch.deciderDemo.MyDecider;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.job.flow.JobExecutionDecider;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

/**
 * Spring Batch 配置类
 * @author hxstrive.com
 */
@Configuration
public class BatchConfig {

    // 用于创建和配置 Job 对象的工厂类
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    // 用于创建和配置 Step 对象的工厂类
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    // 创建决策器
    @Bean
    public JobExecutionDecider myDecider() {
        return new MyDecider();
    }

    // 创建 Step 对象
    @Bean
    public Step deciderDemoStep1() {
        return stepBuilderFactory.get("deciderDemoStep1").tasklet(new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                System.out.println(Thread.currentThread().getName() + " deciderDemoStep1");
                return RepeatStatus.FINISHED; // 返回 FINISHED 表明任务执行结束
            }
        }).build();
    }

    @Bean
    public Step deciderDemoStep2() {
        return stepBuilderFactory.get("deciderDemoStep2").tasklet(new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                System.out.println(Thread.currentThread().getName() + " deciderDemoStep2");
                return RepeatStatus.FINISHED; // 返回 FINISHED 表明任务执行结束
            }
        }).build();
    }

    @Bean
    public Step deciderDemoStep3() {
        return stepBuilderFactory.get("deciderDemoStep3").tasklet(new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                System.out.println(Thread.currentThread().getName() + " deciderDemoStep3");
                return RepeatStatus.FINISHED; // 返回 FINISHED 表明任务执行结束
            }
        }).build();
    }

    // 创建 Job 任务
    @Bean
    public Job deciderDemoJob() {
        return jobBuilderFactory.get("deciderDemoJob")
                .start(deciderDemoStep1())
                .next(myDecider())
                .from(myDecider()).on("even").to(deciderDemoStep2()) // 决策器返回 event,执行 deciderDemoStep2()
                .from(myDecider()).on("odd").to(deciderDemoStep3()) // 决策器返回 odd,执行 deciderDemoStep3()
                .from(deciderDemoStep3()).on("*").to(myDecider())
                .end()
                .build();
    }

}

上述代码执行流程如下:

  • 初始步骤:执行 deciderDemoStep1(),完成后进入决策器 myDecider()。

  • 决策器逻辑:myDecider() 根据业务逻辑返回 "even" 或 "odd"。

  • 分支处理:

    • 若返回 "even":执行 deciderDemoStep2(),然后作业结束。

    • 若返回 "odd":执行 deciderDemoStep3(),然后进入下一步。

  • 循环逻辑:deciderDemoStep3() 执行后,通过 .on("*") 捕获所有状态,强制回到 myDecider()。

注意:这会形成一个循环:myDecider() → deciderDemoStep3() → myDecider(),直到返回 "even" 跳出循环。

  

说说我的看法
全部评论(
没有评论
关于
本网站专注于 Java、数据库(MySQL、Oracle)、Linux、软件架构及大数据等多领域技术知识分享。涵盖丰富的原创与精选技术文章,助力技术传播与交流。无论是技术新手渴望入门,还是资深开发者寻求进阶,这里都能为您提供深度见解与实用经验,让复杂编码变得轻松易懂,携手共赴技术提升新高度。如有侵权,请来信告知:hxstrive@outlook.com
其他应用
公众号