Spring Batch4 Job 的创建

前面章节介绍了 Spring Batch 的核心概念等知识,下面将展示如何创建一个基础的 Spring Batch Job 作业。

在创建 Job 作业前,我们先在 application.properties 属性配置文件中添加如下配置

spring.application.name=spring_batch_demo

spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/spring_batch4?useSSL=false&serverTimezone=UTC&allowPublicKeyRetrieval=true
spring.datasource.username=root
spring.datasource.password=aaaaaa

spring.batch.jdbc.schema=classpath:org/springframework/batch/core/schema-@@platform@@.sql
spring.batch.jdbc.platform=mysql
spring.batch.jdbc.initialize-schema=always

上述配置信息中,配置了数据源和 spring batch 配置信息。

  

使用 start()、next() 构建批处理流程

在 Spring Batch 里,start() 和 next() 方法是用来定义 Job 执行流程的关键方法。下面对它们的功能和使用方式进行详细说明:

start() 方法

此方法用于设定 Job 开始执行的首个 Step,每个 Job 都必须通过 start() 方法来开启执行流程。方法定义如下:

public SimpleJobBuilder start(Step step) {
    return (new SimpleJobBuilder(this)).start(step);
}

简单用法如下:

@Bean
public Job myJob(JobBuilderFactory jobs, Step step1) {
    return jobs.get("myJob")
            .start(step1)  // 指定 step1 为 Job 的起始 Step
            .build();
}

注意,每个 Job 有且只能有一个起始 Step,且该方法返回 JobFlowBuilder 对象,可借助它进一步定义后续的执行步骤。

  

next() 方法

该方法用于定义 Step 执行的顺序,即当一个 Step 成功完成后,接下来要执行的 Step 是哪一个。方法定义如下:

public SimpleJobBuilder next(Step step) {
    this.steps.add(step);
    return this;
}

简单用法如下:

@Bean
public Job myJob(JobBuilderFactory jobs, Step step1, Step step2, Step step3) {
    return jobs.get("myJob")
            .start(step1)
            .next(step2)  // 若 step1 成功完成,则执行 step2
            .next(step3)  // 若 step2 成功完成,则执行 step3
            .build();
}

注意,只有在前一个 Step 成功完成(状态为 COMPLETED)时,才会按照 next() 方法的定义执行后续 Step。并且还可以链式调用 next() 方法,从而构建出线性的执行流程。

下面是一个完整的示例,该实例创建了三个 Step,它们将依次执行,代码如下:

package com.hxstrive.spring_batch.demo1.config;

import org.springframework.batch.core.*;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring Batch 配置类
 * @author hxstrive.com
 */
@Configuration
public class BatchConfig {

    // 用于创建和配置 Job 对象的工厂类
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    // 用于创建和配置 Step 对象的工厂类
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    // 创建一个名为 jobDemoJob 的任务
    @Bean
    public Job jobDemoJob() {
        return jobBuilderFactory.get("jobDemoJob")
                // 配置任务要执行的步骤
                .start(loadDataStep()) // 加载数据
                .next(handleDataStep()) // 处理数据
                .next(saveDataStep()) // 保存数据
                .build();
    }

    // 创建一个名为 loadDataStep 的步骤,模拟加载要处理的数据
    @Bean
    public Step loadDataStep() {
        return stepBuilderFactory.get("loadDataStep").tasklet(new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                System.out.println("加载数据……");
                return RepeatStatus.FINISHED; // 返回 FINISHED 表明任务执行结束
            }
        }).build();
    }

    // 创建一个名为 handleDataStep 的步骤,模拟处理数据
    @Bean
    public Step handleDataStep() {
        return stepBuilderFactory.get("handleDataStep").tasklet(new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                System.out.println("处理数据……");
                return RepeatStatus.FINISHED; // 返回 FINISHED 表明任务执行结束
            }
        }).build();
    }

    // 创建一个名为 handleDataStep 的步骤,模拟保存数据到数据库
    @Bean
    public Step saveDataStep() {
        return stepBuilderFactory.get("saveDataStep").tasklet(new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                System.out.println("保存数据……");
                return RepeatStatus.FINISHED; // 返回 FINISHED 表明任务执行结束
            }
        }).build();
    }

}

执行上面示例,输出信息如下:

image.png

上图中,三个 Step 均成功执行。

    

使用 start()、on()、to()、from() 构建批处理流程

在 Spring Batch 里,除了上述的 start()、next() 方法外,还提供了 on()、to()、from() 这些方法。这些方法都是用于构建作业流程的 DSL(领域特定语言)的重要组成部分。下面为你详细介绍它们的功能和用法。

on() 方法

该方法用于定义步骤执行后的跳转条件,通常和 end()、stopAndRestart() 或者 to() 搭配使用。on() 方法的参数是一个字符串,表示步骤执行后的状态(像 COMPLETED、FAILED 等)。方法定义如下:

public FlowBuilder.TransitionBuilder<FlowJobBuilder> on(String pattern) {
    Assert.state(this.steps.size() > 0, "You have to start a job with a step");
    Iterator var2 = this.steps.iterator();

    while(var2.hasNext()) {
        Step step = (Step)var2.next();
        if (this.builder == null) {
            this.builder = new JobFlowBuilder(new FlowJobBuilder(this), step);
        } else {
            this.builder.next(step);
        }
    }

    return this.builder.on(pattern);
}

简单用法如下:

@Bean
public Job myJob(JobBuilderFactory jobs, Step step1, Step step2, Step step3) {
    return jobs.get("myJob")
            .start(step1)
            .on("COMPLETED").to(step2)  // 若 step1 完成,就转到 step2
            .from(step1).on("FAILED").to(step3)  // 若 step1 失败,就转到 step3
            .end()
            .build();
}

  

to() 方法

to() 方法用于指定当满足特定条件时要跳转的目标步骤。它要和 on() 方法配合起来使用。方法定义如下:

public FlowBuilder<Q> to(Step step) {
    State next = this.parent.createState(step);
    this.parent.addTransition(this.pattern, next);
    this.parent.currentState = next;
    return this.parent;
}

简单用法如下:

@Bean
public Job myJob(JobBuilderFactory jobs, Step step1, Step step2) {
    return jobs.get("myJob")
            .start(step1)
            .on("COMPLETED").to(step2)  // 当 step1 状态为 COMPLETED 时,跳转到 step2
            .from(step1).on("FAILED").end()  // 当 step1 状态为 FAILED 时,结束作业
            .build();
}

 

from() 方法

from() 方法有两个用途:

  • 一是从某个步骤继续构建分支流程;

  • 二是实现状态转换的循环(比如重试逻辑);

方法定义如下:

public FlowBuilder<Q> from(Step step) {
    this.doFrom(step);
    return this;
}

简单用法如下:

@Bean
public Job myJob(JobBuilderFactory jobs, Step step1, Step step2, Step retryStep) {
    return jobs.get("myJob")
            .start(step1)
            .on("FAILED").to(retryStep)  // 若 step1 失败,转到重试步骤
            .from(retryStep).on("FAILED").end()  // 若重试失败,结束作业
            .from(retryStep).on("COMPLETED").to(step2)  // 若重试成功,转到 step2
            .end()
            .build();
}

下面是一个完整的示例,该实例创建了三个 Step,它们将依次执行,代码如下:

package com.hxstrive.spring_batch.demo2.config;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring Batch 配置类
 * @author hxstrive.com
 */
@Configuration
public class BatchConfig {

    // 用于创建和配置 Job 对象的工厂类
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    // 用于创建和配置 Step 对象的工厂类
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    // 创建一个名为 jobDemoJob2 的任务
    @Bean
    public Job jobDemoJob2() {
        return jobBuilderFactory.get("jobDemoJob2")
                // 配置任务要执行的步骤
                .start(loadDataStep()) // 加载数据
                .on("COMPLETED").to(handleDataStep()) // 处理数据
                .from(handleDataStep()).on("COMPLETED").to(saveDataStep()) // 保存数据
                .from(saveDataStep()).end()
                .build();
    }

    // 创建一个名为 loadDataStep 的步骤,模拟加载要处理的数据
    @Bean
    public Step loadDataStep() {
        return stepBuilderFactory.get("loadDataStep").tasklet(new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                System.out.println("加载数据……");
                return RepeatStatus.FINISHED; // 返回 FINISHED 表明任务执行结束
            }
        }).allowStartIfComplete(true).build();
    }

    // 创建一个名为 handleDataStep 的步骤,模拟处理数据
    @Bean
    public Step handleDataStep() {
        return stepBuilderFactory.get("handleDataStep").tasklet(new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                System.out.println("处理数据……");
                return RepeatStatus.FINISHED; // 返回 FINISHED 表明任务执行结束
            }
        }).allowStartIfComplete(true).build();
    }

    // 创建一个名为 handleDataStep 的步骤,模拟保存数据到数据库
    @Bean
    public Step saveDataStep() {
        return stepBuilderFactory.get("saveDataStep").tasklet(new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                System.out.println("保存数据……");
                return RepeatStatus.FINISHED; // 返回 FINISHED 表明任务执行结束
            }
        }).allowStartIfComplete(true).build();
    }

}

 执行上面示例,输出信息如下:

image.png

上图中,所有 Step 都执行成功。

  

说说我的看法
全部评论(
没有评论
关于
本网站专注于 Java、数据库(MySQL、Oracle)、Linux、软件架构及大数据等多领域技术知识分享。涵盖丰富的原创与精选技术文章,助力技术传播与交流。无论是技术新手渴望入门,还是资深开发者寻求进阶,这里都能为您提供深度见解与实用经验,让复杂编码变得轻松易懂,携手共赴技术提升新高度。如有侵权,请来信告知:hxstrive@outlook.com
其他应用
公众号