Spring Batch4 Flow 的创建和使用

在 Spring Batch 里,Flow 是用于编排步骤(Step)的组件,能构建出复杂的批处理工作流。Flow 的核心概念如下:

  • Flow:Flow 是由多个 Step 组成的逻辑单元,可定义 Step 之间的执行顺序。

  • Step编排:支持顺序执行、条件分支、并行处理以及流程嵌套。

  • 状态转换:能根据执行结果(像 COMPLETED、FAILED)来决定后续执行的步骤。

  

Step编排

顺序流程

顺序流程(Sequential Process)是指一系列按照固定、线性顺序依次执行的操作或步骤,前一步骤的输出作为后一步骤的输入,且不可跳过或颠倒顺序。如下图:

image.png

下面是顺序流程的简单示例:

@Configuration
public class BatchConfig {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .tasklet((contribution, chunkContext) -> {
                    System.out.println("执行步骤1");
                    return RepeatStatus.FINISHED;
                }).build();
    }

    @Bean
    public Step step2() {
        return stepBuilderFactory.get("step2")
                .tasklet((contribution, chunkContext) -> {
                    System.out.println("执行步骤2");
                    return RepeatStatus.FINISHED;
                }).build();
    }

    @Bean
    public Flow simpleFlow() {
        return new FlowBuilder<Flow>("simpleFlow")
                .start(step1()) // 步骤1
                .next(step2()) // 步骤2
                .end();
    }

    @Bean
    public Job job() {
        return jobBuilderFactory.get("job")
                .start(simpleFlow()) // 设置 Job 的首个步骤
                .end()
                .build();
    }
}

  

条件分支流程

条件分支流程(Conditional Branching Process)是指根据特定条件判断决定执行不同路径的流程,其核心是"如果...则...否则..."的逻辑结构。如下图:

image.png

简单示例如下:

@Bean
public Flow conditionalFlow() {
    return new FlowBuilder<Flow>("conditionalFlow")
            .start(step1())
            .on("COMPLETED").to(step2())  // 步骤1完成后执行步骤2
            .from(step1()).on("FAILED").to(step3())  // 步骤1失败后执行步骤3
            .from(step2()).on("*").end()  // 步骤2结束后结束流程
            .from(step3()).on("*").end()  // 步骤3结束后结束流程
            .build();
}

 

并行流程

并行流程(Parallel Process)是指多个任务或操作同时执行,彼此独立且无需等待其他任务完成的流程模式。其核心是"同时做多件事",通过资源拆分提升效率。

简单示例如下:

@Bean
public Flow parallelFlow() {
    // 定义两个 Flow
    Flow flow1 = new FlowBuilder<Flow>("flow1").start(step1()).build();
    Flow flow2 = new FlowBuilder<Flow>("flow2").start(step2()).build();

    return new FlowBuilder<Flow>("parallelFlow")
            .split(new SimpleAsyncTaskExecutor())  // 使用异步任务执行器
            .add(flow1, flow2)
            .build();
}

 

嵌套流程

可以将一个 Flow 嵌套到另一个 Flow 中,如下图:

image.png

示例代码如下:

// 创建 Flow 对象
@Bean
public Flow simpleFlow() {
    return new FlowBuilder<Flow>("simpleFlow")
        .start(flowDemoStep1())
        .next(flowDemoStep2()).build();
}

// 父 Flow
@Bean
public Flow parentFlow() {
    return new FlowBuilder<Flow>("parentFlow")
            .start(simpleFlow())  // 嵌套子流程
            .next(step3())
            .end();
}

  

决策状态

使用 JobExecutionDecider 能够实现复杂的决策逻辑,示例如下:

public class MyDecider implements JobExecutionDecider {
    @Override
    public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
        // 根据条件返回不同的状态
        return new FlowExecutionStatus("CONDITION_MET");
    }
}

@Bean
public Flow decisionFlow() {
    return new FlowBuilder<Flow>("decisionFlow")
            .start(step1())
            .next(myDecider())  // 使用决策器
            .on("CONDITION_MET").to(step2())
            .from(myDecider()).on("CONDITION_NOT_MET").to(step3())
            .end();
}

 

完整示例

下面通过一个完整示例介绍如何使用 Flow,代码如下:

package com.hxstrive.spring_batch.flowDemo.config;

import org.springframework.batch.core.*;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring Batch 配置类
 * @author hxstrive.com
 */
@Configuration
public class BatchConfig {

    // 用于创建和配置 Job 对象的工厂类
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    // 用于创建和配置 Step 对象的工厂类
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    // 创建 Step 对象
    @Bean
    public Step flowDemoStep1() {
        return stepBuilderFactory.get("flowDemoStep1").tasklet(new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                System.out.println("flowDemoStep1");
                return RepeatStatus.FINISHED; // 返回 FINISHED 表明任务执行结束
            }
        }).build();
    }

    @Bean
    public Step flowDemoStep2() {
        return stepBuilderFactory.get("flowDemoStep2").tasklet(new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                System.out.println("flowDemoStep2");
                return RepeatStatus.FINISHED; // 返回 FINISHED 表明任务执行结束
            }
        }).build();
    }

    @Bean
    public Step flowDemoStep3() {
        return stepBuilderFactory.get("flowDemoStep3").tasklet(new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                System.out.println("flowDemoStep3");
                return RepeatStatus.FINISHED; // 返回 FINISHED 表明任务执行结束
            }
        }).build();
    }


    // 创建 Flow 对象
    @Bean
    public Flow flowDemoFlow() {
        return new FlowBuilder<Flow>("flowDemoFlow")
                .start(flowDemoStep1())
                .next(flowDemoStep2()).build();
    }

    // 创建 Job 任务
    @Bean
    public Job flowDemoJob() {
        return jobBuilderFactory.get("flowDemoJob")
                .start(flowDemoFlow()) // 创建 Flow 对象,将 Flow 作为 Job 的首个 Step
                .next(flowDemoStep3())
                .end()
                .build();
    }

}

  

说说我的看法
全部评论(
没有评论
关于
本网站专注于 Java、数据库(MySQL、Oracle)、Linux、软件架构及大数据等多领域技术知识分享。涵盖丰富的原创与精选技术文章,助力技术传播与交流。无论是技术新手渴望入门,还是资深开发者寻求进阶,这里都能为您提供深度见解与实用经验,让复杂编码变得轻松易懂,携手共赴技术提升新高度。如有侵权,请来信告知:hxstrive@outlook.com
其他应用
公众号