Spring Batch4 教程

Spring Batch4 JobOperator 的使用

在 Spring Batch4 中,JobOperator 是一个高级接口,提供了对批处理作业(Job)的集中式管理能力,包括启动、停止、重启、查询作业状态等操作。它简化了作业的生命周期管理,尤其适合需要通过编程方式或外部系统控制作业的场景。

JobOperator 接口常用方法如下:

方法功能描述
Long start(String jobName, String parameters)

启动指定名称的作业,参数以键值对字符串形式传入(如 param1=value1;param2=value2)

Long restart(Long executionId)重启指定执行 ID 的作业(针对失败或停止的作业)
void stop(Long executionId)停止正在执行的作业(通过设置执行状态为 STOPPED)
List<Long> getExecutions(Long instanceId)获取指定作业实例的所有执行记录 ID
List<Long> getJobInstances(String jobName, int start, int count)分页查询指定作业名称的实例 ID 列表
JobExecution getJobExecution(Long executionId)获取指定执行 ID 的作业执行详情
JobInstance getJobInstance(Long instanceId)获取指定实例 ID 的作业实例详情
Set<String> getJobNames()获取所有已注册的作业名称
String getParametersAsString(Long executionId)将指定执行的参数转换为字符串

JobOperator 定义如下:

package org.springframework.batch.core.launch;

import java.util.List;
import java.util.Map;
import java.util.Set;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobInstance;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersIncrementer;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.UnexpectedJobExecutionException;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;

/**
 * Spring Batch 中低级别接口,用于检查和控制作业,仅通过基本类型和集合类型进行访问。
 * 适用于命令行客户端(例如,每个操作启动一个新进程)或远程启动器(如JMX控制台)。
 * 
 * 提供对作业实例(JobInstance)、作业执行(JobExecution)的管理能力,
 * 包括启动、重启、停止作业,查询作业状态等操作,是 Spring Batch 中作业调度的关键接口。
 * 
 * @author Dave Syer
 * @since 2.0
 */
public interface JobOperator {

    /**
     * 获取与特定 JobInstance 关联的所有 JobExecution 的 ID,
     * 按创建时间倒序排列(通常也是执行时间的倒序)。
     * 
     * @param instanceId 作业实例(JobInstance)的ID
     * @return 与该实例关联的所有作业执行(JobExecution)的ID列表
     * @throws NoSuchJobInstanceException 如果指定ID的JobInstance不存在时抛出
     */
    List<Long> getExecutions(long instanceId) throws NoSuchJobInstanceException;

    /**
     * 获取指定作业名称的所有 JobInstance 的 ID,按创建时间倒序排列。
     * 用于分页查询某个作业的历史实例,例如在监控界面展示作业的执行历史。
     * 
     * @param jobName 作业实例所属的作业名称
     * @param start 起始索引(用于分页)
     * @param count 最大返回数量(用于分页)
     * @return 作业实例(JobInstance)的 ID 列表
     * @throws NoSuchJobException 如果指定名称的作业没有任何实例时抛出
     */
    List<Long> getJobInstances(String jobName, int start, int count) throws NoSuchJobException;

    /**
     * 获取指定作业名称的所有运行中 JobExecution 的 ID。
     * 用于监控当前正在执行的作业,例如在运维界面展示实时运行的作业列表。
     * 
     * @param jobName 要查询的作业名称
     * @return 运行中的 JobExecution 实例的 ID 集合
     * @throws NoSuchJobException 如果没有该作业名称的任何 JobExecution 时抛出
     */
    Set<Long> getRunningExecutions(String jobName) throws NoSuchJobException;

    /**
     * 将 JobParameters 转换为易读的字符串格式。
     * 作业参数通常包含复杂信息,此方法用于简化参数的展示(如在日志或界面中显示)。
     * 
     * @param executionId 已存在的 JobExecution 的 ID
     * @return 用于启动该作业实例的作业参数(字符串格式)
     * @throws NoSuchJobExecutionException 如果指定 ID 的 JobExecution 不存在时抛出
     */
    String getParameters(long executionId) throws NoSuchJobExecutionException;

    /**
     * 使用指定的参数启动一个新的作业实例。
     * 根据作业名称和参数创建新的 JobInstance 和 JobExecution,并触发作业执行。
     * 
     * @param jobName 要启动的作业({@link Job})名称
     * @param parameters 启动参数(以逗号或换行分隔的"name=value"键值对)
     * @return 启动的JobExecution的ID
     * @throws NoSuchJobException 如果不存在指定名称的 Job 时抛出
     * @throws JobInstanceAlreadyExistsException 如果相同名称和参数的 JobInstance 已存在时抛出
     * @throws JobParametersInvalidException 如果作业参数无效(如类型不匹配)时抛出
     */
    Long start(String jobName, String parameters) throws NoSuchJobException, JobInstanceAlreadyExistsException, JobParametersInvalidException;

    /**
     * 重启一个失败或已停止的 JobExecution。
     * 如果指定的ID不存在,或对应的 JobInstance 已正常完成,则抛出异常。
     * 
     * 注意:仅能重启处于失败(FAILED)或停止(STOPPED)状态的作业执行。
     * 
     * @param executionId 失败或已停止的JobExecution的ID
     * @return 重启后新的JobExecution的ID
     * @throws JobInstanceAlreadyCompleteException 如果作业实例已成功完成时抛出
     * @throws NoSuchJobExecutionException 如果指定ID的JobExecution不存在时抛出
     * @throws NoSuchJobException 如果JobExecution对应的Job已不存在时抛出
     * @throws JobRestartException 重启过程中出现非特定错误(如重启数据损坏)时抛出
     * @throws JobParametersInvalidException 重启时的参数无效时抛出
     */
    Long restart(long executionId) throws JobInstanceAlreadyCompleteException, NoSuchJobExecutionException,
            NoSuchJobException, JobRestartException, JobParametersInvalidException;

    /**
     * 启动由作业关联的 JobParametersIncrementer 确定的下一个作业实例序列。
     * 如果前一个实例处于失败状态,只要 JobParametersIncrementer 正常工作,
     * 此方法仍会创建新实例并使用不同的参数运行。
     * 
     * 适用场景:需要周期性执行的作业(如定时任务),通过递增参数确保每次执行都是新实例。
     * 
     * 注意:以下三种异常(JobRestartException、JobExecutionAlreadyRunningException、
     * JobInstanceAlreadyCompleteException)极少出现,通常由并发操作导致。
     * 
     * @param jobName 要启动的作业名称
     * @return 启动的JobExecution的ID
     * @throws NoSuchJobException 不存在该作业定义时抛出
     * @throws JobParametersNotFoundException 无法获取递增参数时抛出
     * @throws JobParametersInvalidException 作业参数无效时抛出
     * @throws UnexpectedJobExecutionException 出现意外情况时抛出
     * @throws JobRestartException 非法重启作业时抛出
     * @throws JobExecutionAlreadyRunningException 尝试重启正在执行的作业时抛出
     * @throws JobInstanceAlreadyCompleteException 尝试重启已完成的作业时抛出
     */
    Long startNextInstance(String jobName) throws NoSuchJobException, JobParametersNotFoundException,
            JobRestartException, JobExecutionAlreadyRunningException, JobInstanceAlreadyCompleteException, UnexpectedJobExecutionException, JobParametersInvalidException;

    /**
     * 向具有指定 ID 的 JobExecution 发送停止信号。
     * 若此方法返回 true,说明信号发送成功,但不保证作业已实际停止。
     * 需通过轮询作业执行状态确认是否已停止。
     * 
     * 实现原理:通常通过设置 JobExecution 的状态为 STOPPING,并通知作业线程停止。
     * 
     * @param executionId 运行中的 JobExecution 的 ID
     * @return 若信号成功发送则返回true(不保证作业已停止)
     * @throws NoSuchJobExecutionException 不存在指定ID的JobExecution时抛出
     * @throws JobExecutionNotRunningException 作业执行未处于运行状态时抛出
     */
    boolean stop(long executionId) throws NoSuchJobExecutionException, JobExecutionNotRunningException;

    /**
     * 汇总具有指定 ID 的 JobExecution 的信息,包括状态、开始和结束时间等。
     * 作用:快速获取作业执行的关键信息,用于日志输出或界面展示。
     * 
     * @param executionId 已存在的JobExecution的ID
     * @return 汇总作业执行状态的字符串
     * @throws NoSuchJobExecutionException 不存在指定ID的JobExecution时抛出
     */
    String getSummary(long executionId) throws NoSuchJobExecutionException;

    /**
     * 汇总属于指定 JobExecution 的所有 StepExecution 实例的信息,
     * 包括状态、开始和结束时间等。
     * 作用:查看作业执行包含的步骤执行详情,用于问题排查或进度监控。
     * 
     * @param executionId 已存在的JobExecution的ID
     * @return 步骤执行ID到其状态汇总字符串的映射
     * @throws NoSuchJobExecutionException 不存在指定ID的JobExecution时抛出
     */
    Map<Long, String> getStepExecutionSummaries(long executionId) throws NoSuchJobExecutionException;

    /**
     * 列出可通过 start(String, String) 方法启动的所有作业名称。
     * 作用:获取系统中已注册的所有作业,用于展示可执行的作业列表。
     * 
     * @return 作业名称的集合
     */
    Set<String> getJobNames();

    /**
     * 将 JobExecution 标记为 ABANDONED(废弃)。
     * 如果停止信号因进程崩溃而被忽略,此方法是标记作业完成的最佳方式
     * (与 STOPPED 状态区分)。被标记为废弃的作业执行不能被框架重启。
     * 
     * 适用场景:作业进程意外终止后,手动标记其状态为废弃,避免残留未处理的执行记录。
     * 
     * @param jobExecutionId 要终止的作业执行ID
     * @return 被终止的JobExecution实例
     * @throws NoSuchJobExecutionException 不存在指定ID的作业执行时抛出
     * @throws JobExecutionAlreadyRunningException 如果作业正在运行(应先停止)时抛出
     */
    JobExecution abandon(long jobExecutionId) throws NoSuchJobExecutionException, JobExecutionAlreadyRunningException;
}

  

JobOperator 的实现类

Spring Batch4 为 JobOperator 接口提供了唯一一个实现类 SimpleJobOperator,如下图:

Spring Batch4 JobOperator 的使用

SimpleJobOperator 类是 JobOperator 接口的默认实现类,它提供了一套完整的 API 用于管理和操作批处理任务(Job),包括启动、停止、重启、查询任务状态等核心功能。我们看看它成员变量的定义,如下图:

image.png

从上图可知,JobOperator 在 afterPropertiesSet() 方法中对 jobRegistry、jobExplorer、jobLauncher 和 jobRepository 进行了非 null 校验,这是必传项。它们的作用分别如下:

  • JobLauncher:用于实际启动 Job(支持同步或异步执行)。

  • JobRepository:用于访问 Job 元数据(如执行状态、参数等),是 SimpleJobOperator 的数据来源。

  • JobRegistry:用于管理已注册的 Job 定义,通过 Job 名称查找对应的 Job 实例。

  • JobExplorer:用于查询 Job 执行历史和元数据(通常与 JobRepository 关联)。

我们可以通过四个 set 方法进行设置,如下图:

image.png

然后,其他实现 JobOperator 的方法均是通过上述的四个对象进行完成的。

  

配置 JobOperator

在使用 JobOperator 之前,需要在 Spring 配置类中注册 SimpleJobOperator 并注入依赖组件,如下:

@Configuration
public class BatchConfig implements StepExecutionListener, ApplicationContextAware {
    // 用于创建和配置 Job 对象的工厂类
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    // 用于创建和配置 Step 对象的工厂类
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private JobLauncher jobLauncher; // Spring 已帮我们创建好

    @Autowired
    private JobExplorer jobExplorer; // Spring 已帮我们创建好

    @Autowired
    private JobRepository jobRepository; // Spring 已帮我们创建好

    @Autowired
    private JobRegistry jobRegistry; // Spring 已帮我们创建好

    // JOB参数
    private Map<String, JobParameter> jobParameters;
    // Spring 应用上下文对象
    private ApplicationContext applicationContext;

    // 配置 JobOperator
    @Bean
    public JobOperator jobOperator() {
        SimpleJobOperator operator = new SimpleJobOperator();
        operator.setJobParametersConverter(new DefaultJobParametersConverter());
        operator.setJobRepository(jobRepository);
        operator.setJobLauncher(jobLauncher);
        operator.setJobExplorer(jobExplorer);
        operator.setJobRegistry(jobRegistry);
        return operator;
    }

    // JobRegistryBeanPostProcessor 是一个特殊的 Bean 后置处理器
    // 主要作用是自动将 Spring 容器中定义的 Job 实例注册到 JobRegistry 中,实现 Job 定义与注册的自动化关联
    @Bean
    public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor() throws Exception {
        JobRegistryBeanPostProcessor processor = new JobRegistryBeanPostProcessor();
        processor.setJobRegistry(jobRegistry);
        processor.setBeanFactory(applicationContext.getAutowireCapableBeanFactory());
        processor.afterPropertiesSet();
        return processor;
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
   
    //...省略 Job、ItemReader、ItemProcessor 等...
}

  

简单示例

假设我们已经创建了一个名为 jobOperatorDemoJob 的 Job 任务,完整的配置如下:

package com.hxstrive.spring_batch.jobOperatorDemo.config;

import org.springframework.batch.core.*;
import org.springframework.batch.core.configuration.JobRegistry;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor;
import org.springframework.batch.core.converter.DefaultJobParametersConverter;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.JobOperator;
import org.springframework.batch.core.launch.support.SimpleJobOperator;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.item.*;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.*;

/**
 * Spring Batch 配置类
 * @author hxstrive.com
 */
@Configuration
public class BatchConfig implements StepExecutionListener, ApplicationContextAware {
    // 用于创建和配置 Job 对象的工厂类
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    // 用于创建和配置 Step 对象的工厂类
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private JobExplorer jobExplorer;

    @Autowired
    private JobRepository jobRepository;

    @Autowired
    private JobRegistry jobRegistry;

    // JOB参数
    private Map<String, JobParameter> jobParameters;
    // Spring 应用上下文对象
    private ApplicationContext applicationContext;

    // 配置 JobOperator
    @Bean
    public JobOperator jobOperator() {
        SimpleJobOperator operator = new SimpleJobOperator();
        operator.setJobParametersConverter(new DefaultJobParametersConverter());
        operator.setJobRepository(jobRepository);
        operator.setJobLauncher(jobLauncher);
        operator.setJobExplorer(jobExplorer);
        operator.setJobRegistry(jobRegistry);
        return operator;
    }

    // JobRegistryBeanPostProcessor 是一个特殊的 Bean 后置处理器
    // 主要作用是自动将 Spring 容器中定义的 Job 实例注册到 JobRegistry 中,实现 Job 定义与注册的自动化关联
    @Bean
    public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor() throws Exception {
        JobRegistryBeanPostProcessor processor = new JobRegistryBeanPostProcessor();
        processor.setJobRegistry(jobRegistry);
        processor.setBeanFactory(applicationContext.getAutowireCapableBeanFactory());
        processor.afterPropertiesSet();
        return processor;
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    @Override
    public void beforeStep(StepExecution stepExecution) {
        System.out.println("->> beforeStep(StepExecution stepExecution)");
        jobParameters = stepExecution.getJobParameters().getParameters();
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        System.out.println("->> afterStep(StepExecution stepExecution)");
        return null;
    }

    // 创建Job对象
    @Bean
    public Job jobOperatorDemoJob() {
        return jobBuilderFactory.get("jobOperatorDemoJob")
                .start(jobOperatorDemoStep())
                .listener(new JobExecutionListener() {
                    @Override
                    public void beforeJob(JobExecution jobExecution) {
                        System.out.println("->> JobExecutionListener :: beforeJob() 任务开始...");
                    }

                    @Override
                    public void afterJob(JobExecution jobExecution) {
                        if(BatchStatus.COMPLETED.equals(jobExecution.getStatus())) {
                            System.out.println("->> JobExecutionListener :: afterJob() 任务完成 - 成功");
                        } else {
                            System.err.println("->> JobExecutionListener :: afterJob() 任务完成 - 执行异常");
                        }

                    }
                })
                .build();
    }

    // 创建Step对象
    @Bean
    public Step jobOperatorDemoStep() {
        final List<String> dataList = Arrays.asList(
                "{\"id\":1,\"username\":\"张三\",\"password\":\"13BC03AC29FAC7B29736EC3BE5C2F55A\"}",
                "{\"id\":2,\"username\":\"李四\",\"password\":\"5E5994FBCFA922D804DF45295AE98604\"}",
                "{\"id\":3,\"username\":\"王五\",\"password\":\"6C14DA109E294D1E8155BE8AA4B1CE8E\"}",
                "{\"id\":4,\"username\":\"赵六\",\"password\":\"03774AD7979A5909E78F9C9DB3A2F0B2\"}"
        );
        final Iterator<String> iterator = dataList.iterator();

        return stepBuilderFactory.get("jobOperatorDemoStep")
                .<String, String>chunk(2)
                .reader(new ItemReader<String>() {
                    @Override
                    public String read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
                        if(iterator.hasNext()) {
                            String value = iterator.next();
                            System.out.println("    ->> read() "  + value);
                            return value;
                        } else {
                            return null;
                        }
                    }
                })
                .processor(new ItemProcessor<String, String>() {
                    @Override
                    public String process(String item) throws Exception {
                        String message = "";
                        if(Objects.nonNull(jobParameters.get("message"))) {
                            message = String.valueOf(jobParameters.get("message").getValue());
                        }

                        Thread.sleep(1000); // 模拟耗时操作

                        System.out.println("        ->> process() message=" + message + ", item=" + item);
                        return item.toUpperCase();
                    }
                })
                .writer(new ItemWriter<String>() {
                    @Override
                    public void write(List<? extends String> items) throws Exception {
                        System.out.println("            ->> write() " + Arrays.toString(items.toArray()));
                    }
                })
                .listener(this)
                .build();
    }

}

启动同步 Job

使用 start() 方法启动 Job,且传递两个参数(message 和 timestamp),代码如下:

@RestController
@SpringBootApplication
@EnableBatchProcessing // 开启批处理
public class SpringBatchDemoApplication {

    @Autowired
    private JobOperator jobOperator;

    public static void main(String[] args) {
       SpringApplication.run(SpringBatchDemoApplication.class, args);
    }

    @GetMapping("/runJob")
    public void runJob(@RequestParam String message) throws Exception {
       // 执行任务
       Long jobExecutionId = jobOperator.start("jobOperatorDemoJob",
             "message=" + message + ",timestamp=" + System.currentTimeMillis());
       System.out.println("->> jobExecutionId=" + jobExecutionId);
    }

}

启动项目,使用浏览器运行 http://localhost:8080/runJob?message=hello 地址,输出日志如下:

image.png

通过上面日志可知,启动的任务是一个同步任务,因为“->> jobExecutionId=44”是等待任务完成后才输出。

启动异步Job

异步 Job 需要配置 SimpleAsyncTaskExecutor 来执行,因此我们需要在 BatchConfig 配置类中添加如下配置信息:

@Configuration
public class BatchConfig implements StepExecutionListener, ApplicationContextAware {
    // 用于创建和配置 Job 对象的工厂类
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    // 用于创建和配置 Step 对象的工厂类
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private JobExplorer jobExplorer;

    @Autowired
    private JobRepository jobRepository;

    @Autowired
    private JobRegistry jobRegistry;

    // JOB参数
    private Map<String, JobParameter> jobParameters;
    // Spring 应用上下文对象
    private ApplicationContext applicationContext;

    /**
     * 配置异步任务执行器
     * @return SimpleAsyncTaskExecutor
     */
    @Bean
    public TaskExecutor taskExecutor() {
        SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
        taskExecutor.setConcurrencyLimit(5); // 设置最大并发数
        return taskExecutor;
    }

    /**
     * 配置异步 JobLauncher
     * @param jobRepository 用于任务数据、状态持久化
     * @param taskExecutor 任务执行器
     * @return SimpleJobLauncher
     */
    @Bean
    public JobLauncher asyncJobLauncher(JobRepository jobRepository, TaskExecutor taskExecutor) {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository);
        jobLauncher.setTaskExecutor(taskExecutor); // 设置异步执行器
        return jobLauncher;
    }

    // 配置 JobOperator
    @Bean
    public JobOperator jobOperator() {
        // 【看这里】使用异步 JobLauncher
        TaskExecutor taskExecutor = taskExecutor();
        JobLauncher jobLauncher = asyncJobLauncher(jobRepository, taskExecutor);

        SimpleJobOperator operator = new SimpleJobOperator();
        operator.setJobParametersConverter(new DefaultJobParametersConverter());
        operator.setJobRepository(jobRepository);
        operator.setJobLauncher(jobLauncher);
        operator.setJobExplorer(jobExplorer);
        operator.setJobRegistry(jobRegistry);
        return operator;
    }

    // ...参考上面的配置类...

}

完成配置后,使用 JobOperator 启动任务:

package com.hxstrive.spring_batch.jobOperatorDemo2;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.launch.JobOperator;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.util.Set;

@RestController
@SpringBootApplication
@EnableBatchProcessing // 开启批处理
public class SpringBatchDemoApplication {

    @Autowired
    private JobOperator jobOperator;

    public static void main(String[] args) {
       SpringApplication.run(SpringBatchDemoApplication.class, args);
    }

    @GetMapping("/runJob")
    public void runJob(@RequestParam String message) throws Exception {
       // 执行任务
       Long jobExecutionId = jobOperator.start("jobOperatorDemoJob",
             "message=" + message + ",timestamp=" + System.currentTimeMillis());
       System.out.println("->> 启动作业 jobExecutionId=" + jobExecutionId);

       // 休息片刻
       Thread.sleep(1000);

       // 停止作业
       boolean stopStatus = jobOperator.stop(jobExecutionId);
       if(stopStatus) {
          System.out.println("->> 作业 " + jobExecutionId + " 停止成功");

          // 查看作业信息
          String summary = jobOperator.getSummary(jobExecutionId);
          System.out.println("->> 作业详细信息:" + summary);

          // 获取作业参数信息
          String parameters = jobOperator.getParameters(jobExecutionId);
          System.out.println("->> 作业参数信息:" + parameters);

          // 检查是否还有实例在运行
          // 获取 jobOperatorDemoJob 名称的 Job 正在运行的实例ID
          Set<Long> runningExecutions = jobOperator.getRunningExecutions("jobOperatorDemoJob");
          while(!runningExecutions.isEmpty()) {
             Thread.sleep(100); // 如果还有实例在运行,则继续检查
             runningExecutions = jobOperator.getRunningExecutions("jobOperatorDemoJob");
          }

          // 没有实例运行,则重启作业
          long restartId = jobOperator.restart(jobExecutionId);
          System.out.println("->> 作业 " + jobExecutionId + " 重启,重启后的ID restartId=" + restartId);
       } else {
          System.err.println("->> 作业 " + jobExecutionId + " 停止失败");
       }
    }

}

上面代码启动一个任务,等待1秒又将任务停止,然后查看作业信息和参数等,最后通过 while() 循环判断名为“jobOperatorDemoJob”的任务有正在执行的实例没有,如果没有,则重启任务。

启动项目,使用浏览器运行 http://localhost:8080/runJob?message=hello 地址,输出日志如下:

image.png

获取 Job 实例和执行信息

下面通过调用 JobOperator 的 getJobInstances() 方法获取名为“jobOperatorDemoJob”的前十个实例ID,然后根据实例ID获取每个实例的执行信息。如下:

@RestController
@SpringBootApplication
@EnableBatchProcessing // 开启批处理
public class SpringBatchDemoApplication {

    @Autowired
    private JobOperator jobOperator;

    public static void main(String[] args) {
       SpringApplication.run(SpringBatchDemoApplication.class, args);
    }

    @GetMapping("/runJob")
    public void runJob(@RequestParam String message) throws Exception {
       // 获取名为 jobOperatorDemoJob 的 Job 的实例和执行信息
       List<Long> jobInstanceIdList = jobOperator.getJobInstances("jobOperatorDemoJob", 0, 10);
       for(Long jobInstanceId : jobInstanceIdList) {
          List<Long> executionIdList = jobOperator.getExecutions(jobInstanceId); // 获取实例的执行ID列表
          for(Long executionId : executionIdList) {
             System.out.println("->> ID=" + executionId);
             System.out.println("   ->> Parameter=" + jobOperator.getParameters(executionId)); // 获取某个执行的参数
             System.out.println("   ->> Summary=" + jobOperator.getSummary(executionId)); // 获取某个执行的概要信息
          }
       }

    }

}

启动项目,使用浏览器运行 http://localhost:8080/runJob?message=hello 地址,输出日志如下:

image.png

到这里,JobOperator 的用法就介绍完了,更多用法请参考官方文档和API。

  

说说我的看法
全部评论(
没有评论
关于
本网站专注于 Java、数据库(MySQL、Oracle)、Linux、软件架构及大数据等多领域技术知识分享。涵盖丰富的原创与精选技术文章,助力技术传播与交流。无论是技术新手渴望入门,还是资深开发者寻求进阶,这里都能为您提供深度见解与实用经验,让复杂编码变得轻松易懂,携手共赴技术提升新高度。如有侵权,请来信告知:hxstrive@outlook.com
其他应用
公众号