Spring Batch4 教程

Spring Batch4 JobLauncher 手动触发 Job

JobLauncher 是 Spring Batch 4 中用于启动批处理作业(Job)的核心接口,它负责协调作业的执行并返回执行结果。

JobLauncher 提供了控制 Spring Batch4  作业执行的简单接口,支持基于不同运行时标识符的临时执行控制。注意,此接口不保证其调用是同步执行还是异步执行。调用者必须查阅具体实现类的文档,以明确作业的运行方式。

JobLauncher 的默认实现是 SimpleJobLauncher,它提供了同步和异步两种作业执行方式。类继承关系如下图:

image.png

JobLauncher 主要方法

JobLauncher 接口仅定义了一个方法 run(),用来启动指定的 Job 作业。接口定义如下:

package org.springframework.batch.core.launch;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;

public interface JobLauncher {

	/**
	 * 启动指定作业的执行,并传入相应的作业参数。
	 *
	 * @param job 要执行的作业实例,不能为null
	 * @param jobParameters 传递给本次作业执行的参数,包含启动所需的配置信息
	 * @return 作业执行实例:
	 *         - 同步实现中会返回实际执行结果
	 *         - 异步实现中可能返回未知状态的实例
	 * 
	 * @throws JobExecutionAlreadyRunningException 当指定作业实例已有一个正在运行的执行时抛出
	 * @throws IllegalArgumentException 当作业实例或作业参数为null时抛出
	 * @throws JobRestartException 当作业之前已运行过且当前无法重启时抛出
	 * @throws JobInstanceAlreadyCompleteException 当作业之前已使用相同参数运行并成功完成时抛出
	 * @throws JobParametersInvalidException 当传入的参数对当前作业无效时抛出
	 */
	public JobExecution run(Job job, JobParameters jobParameters) throws JobExecutionAlreadyRunningException,
			JobRestartException, JobInstanceAlreadyCompleteException, JobParametersInvalidException;

}

上述代码中,run() 方法用来启动指定作业的执行,并传入相应的作业参数。

注意事项:

  • 如果成功创建了JobExecution实例,无论执行是否成功,该方法都会返回此实例。

  • 如果存在一个已暂停的历史 JobExecution,将返回该实例而非创建新实例。

  • 仅当启动作业失败时才会抛出异常。

  • 如果作业在处理过程中遇到错误,仍会返回 JobExecution,但需要检查其状态获取详细信息。

  

JobLauncher 的实现:SimpleJobLauncher

SimpleJobLauncher 是 Spring Batch 4 中 JobLauncher 接口的默认实现,负责协调和管理批处理作业的执行。它提供了灵活的作业启动方式,支持同步和异步两种执行模式,是 Spring Batch 作业执行的核心组件。

SimpleJobLauncher 的核心特性

  • 与 JobRepository 集成:依赖 JobRepository 存储和检索作业执行信息,部分源码:

public class SimpleJobLauncher implements JobLauncher, InitializingBean {
	// 日志记录器,用于记录作业启动和执行过程中的关键信息
	protected static final Log logger = LogFactory.getLog(SimpleJobLauncher.class);

	// 作业仓库,用于管理作业实例、执行记录等元数据
	private JobRepository jobRepository;
    //...
}
  • 执行模式切换:通过 TaskExecutor 支持同步和异步执行,默认为同步,部分源码:

public class SimpleJobLauncher implements JobLauncher, InitializingBean {
    //....
	@Override
	public void afterPropertiesSet() throws Exception {
		// 校验 JobRepository 必须已设置
		Assert.state(jobRepository != null, "A JobRepository has not been set.");
		// 若未设置 TaskExecutor,默认使用同步执行器
		if (taskExecutor == null) {
			logger.info("No TaskExecutor has been set, defaulting to synchronous executor.");
			taskExecutor = new SyncTaskExecutor(); // 默认为同步执行
		}
	}
}

在 Spring 的核心包中,分别提供了 TaskExecutor 的同步和异步实现,如下图:

image (1).png

  • 作业执行生命周期管理:负责作业的启动、状态跟踪和结果返回。

  • 异常处理:对作业执行过程中可能出现的异常进行规范化处理,部分源码如下:

public class SimpleJobLauncher implements JobLauncher, InitializingBean {
    //....
    // 将异常转换为合适的类型重新抛出
    private void rethrow(Throwable t) {
        if (t instanceof RuntimeException) {
            throw (RuntimeException) t;
        }
        else if (t instanceof Error) {
            throw (Error) t;
        }
        throw new IllegalStateException(t);
    }
}

SimpleJobLauncher 的工作原理

SimpleJobLauncher 的核心逻辑在 run() 方法中实现,下面通过阅读整体源码来了解其基本流程。源码如下:

package org.springframework.batch.core.launch.support;

import java.time.Duration;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobInstance;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.metrics.BatchMetrics;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.task.SyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.core.task.TaskRejectedException;
import org.springframework.util.Assert;

/**
 * JobLauncher接口的简单实现类,用于启动和管理Spring Batch作业的执行。
 * 
 * 该实现通过Spring Core的{@link TaskExecutor}接口来启动作业,这意味着所设置的执行器类型
 * 对作业的运行方式至关重要。如果使用{@link SyncTaskExecutor}(默认),作业将在调用启动器的
 * 同一线程中同步执行;若使用异步执行器,则作业会在独立线程中异步运行。
 * 
 * 此类唯一必需的依赖是{@link JobRepository},它用于获取有效的JobExecution实例。由于作业可能是
 * 现有JobInstance的重启,只有通过JobRepository才能可靠地重建执行上下文。
 *
 * @author Lucas Ward
 * @author Dave Syer
 * @author Will Schipp
 * @author Michael Minella
 * @author Mahmoud Ben Hassine
 *
 * @since 1.0
 *
 * @see JobRepository 作业仓库,用于管理作业执行的元数据
 * @see TaskExecutor 任务执行器,控制作业的执行方式(同步/异步)
 */
public class SimpleJobLauncher implements JobLauncher, InitializingBean {
    // 日志记录器,用于记录作业启动和执行过程中的关键信息
    protected static final Log logger = LogFactory.getLog(SimpleJobLauncher.class);

    // 作业仓库,用于管理作业实例、执行记录等元数据
    private JobRepository jobRepository;

    // 任务执行器,负责实际执行作业(默认使用同步执行器)
    private TaskExecutor taskExecutor;

    /**
     * 运行指定的作业并传入对应的作业参数。
     * 作业参数将用于判断是执行现有作业实例的重启,还是创建新的作业实例。
     *
     * @param job 要运行的作业实例
     * @param jobParameters 本次作业执行的参数(如时间戳、业务ID等)
     * @return 作业执行实例(同步执行时返回实际执行结果;异步执行时状态可能为未知)
     * 
     * @throws JobExecutionAlreadyRunningException 如果作业实例已存在且有正在运行的执行
     * @throws JobRestartException 如果尝试重启作业但不允许重启或无需重启
     * @throws JobInstanceAlreadyCompleteException 如果作业实例已通过相同参数执行并成功完成
     * @throws JobParametersInvalidException 如果传入的作业参数无效
     */
    @Override
    public JobExecution run(final Job job, final JobParameters jobParameters)
            throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException,
            JobParametersInvalidException {

        // 校验参数合法性:作业和作业参数不能为null
        Assert.notNull(job, "The Job must not be null.");
        Assert.notNull(jobParameters, "The JobParameters must not be null.");

        // 作业执行实例(最终返回的结果)
        final JobExecution jobExecution;
       
        // 获取该作业和参数对应的最后一次执行记录
        JobExecution lastExecution = jobRepository.getLastJobExecution(job.getName(), jobParameters);
       
        // 如果存在历史执行记录,判断是否可以重启
        if (lastExecution != null) {
            // 若作业不支持重启,则抛出异常
            if (!job.isRestartable()) {
                throw new JobRestartException("JobInstance already exists and is not restartable");
            }
           
            /*
             * 校验历史执行中是否存在状态为UNKNOWN、STARTING、STARTED或STOPPING的步骤执行
             * 若存在,则不允许重启,需处理异常状态
             */
            for (StepExecution execution : lastExecution.getStepExecutions()) {
                BatchStatus status = execution.getStatus();
                // 若步骤处于运行中或停止中状态,说明有执行正在进行,不允许重复启动
                if (status.isRunning() || status == BatchStatus.STOPPING) {
                    throw new JobExecutionAlreadyRunningException("A job execution for this job is already running: "
                            + lastExecution);
                }
                // 若步骤状态为UNKNOWN,说明上次执行失败且无法回滚,不允许直接重启
                else if (status == BatchStatus.UNKNOWN) {
                    throw new JobRestartException(
                            "Cannot restart step [" + execution.getStepName() + "] from UNKNOWN status. "
                                + "The last execution ended with a failure that could not be rolled back, "
                                + "so it may be dangerous to proceed. Manual intervention is probably necessary.");
                }
            }
        }

        // 校验作业参数是否有效(如参数类型、必填项等)
        job.getJobParametersValidator().validate(jobParameters);

        /*
         * 此处存在极小概率的并发问题:
         * 在检查历史执行和创建新执行之间,若其他线程/进程启动并失败了同一作业实例,
         * 可能导致非可重启作业被意外重启。但实际场景中该概率极低。
         */
        jobExecution = jobRepository.createJobExecution(job.getName(), jobParameters);

        try {
            // 通过任务执行器执行作业(同步或异步取决于执行器类型)
            taskExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        // 记录作业启动日志
                        if (logger.isInfoEnabled()) {
                            logger.info("Job: [" + job + "] launched with the following parameters: [" + jobParameters
                                    + "]");
                        }
                       
                        // 执行作业逻辑
                        job.execute(jobExecution);
                       
                        // 记录作业完成日志(包含执行时长)
                        if (logger.isInfoEnabled()) {
                            // 计算作业执行时长
                            Duration jobExecutionDuration = BatchMetrics.calculateDuration(
                                    jobExecution.getStartTime(), jobExecution.getEndTime());
                            logger.info("Job: [" + job + "] completed with the following parameters: [" + jobParameters
                                    + "] and the following status: [" + jobExecution.getStatus() + "]"
                                    + (jobExecutionDuration == null ? "" : " in " + BatchMetrics.formatDuration(jobExecutionDuration)));
                        }
                    }
                    catch (Throwable t) {
                        // 记录作业失败日志
                        if (logger.isInfoEnabled()) {
                            logger.info("Job: [" + job
                                    + "] failed unexpectedly and fatally with the following parameters: [" + jobParameters
                                    + "]", t);
                        }
                        // 重新抛出异常(转换为运行时异常或错误)
                        rethrow(t);
                    }
                }

                // 将异常转换为合适的类型重新抛出
                private void rethrow(Throwable t) {
                    if (t instanceof RuntimeException) {
                        throw (RuntimeException) t;
                    }
                    else if (t instanceof Error) {
                        throw (Error) t;
                    }
                    throw new IllegalStateException(t);
                }
            });
        }
        // 处理任务被执行器拒绝的情况(如线程池满)
        catch (TaskRejectedException e) {
            // 更新作业执行状态为失败
            jobExecution.upgradeStatus(BatchStatus.FAILED);
            // 设置退出状态(若之前为UNKNOWN,则添加异常描述)
            if (jobExecution.getExitStatus().equals(ExitStatus.UNKNOWN)) {
                jobExecution.setExitStatus(ExitStatus.FAILED.addExitDescription(e));
            }
            // 持久化更新后的作业执行状态
            jobRepository.update(jobExecution);
        }

        return jobExecution;
    }

    public void setJobRepository(JobRepository jobRepository) {
        this.jobRepository = jobRepository;
    }

    public void setTaskExecutor(TaskExecutor taskExecutor) {
        this.taskExecutor = taskExecutor;
    }

    /**
     * 确保必要的依赖(JobRepository)已被设置,若未设置任务执行器则使用默认值。
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        // 校验JobRepository必须已设置
        Assert.state(jobRepository != null, "A JobRepository has not been set.");
        // 若未设置TaskExecutor,默认使用同步执行器
        if (taskExecutor == null) {
            logger.info("No TaskExecutor has been set, defaulting to synchronous executor.");
            taskExecutor = new SyncTaskExecutor();
        }
    }
}

  

简单示例

假设我们已经创建了一个名为 jobLauncherDemoJob 的 Job 任务,配置类如下:

package com.hxstrive.spring_batch.jobLauncherDemo.config;

import org.springframework.batch.core.*;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.*;

/**
 * Spring Batch 配置类
 * @author hxstrive.com
 */
@Configuration
public class BatchConfig implements StepExecutionListener {

    // 用于创建和配置 Job 对象的工厂类
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    // 用于创建和配置 Step 对象的工厂类
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    // JOB参数
    private Map<String, JobParameter> jobParameters;

    // 创建Job对象
    @Bean
    public Job jobLauncherDemoJob() {
        return jobBuilderFactory.get("jobLauncherDemoJob")
                .start(jobLauncherDemoStep())
                .build();
    }

    // 创建Step对象
    @Bean
    public Step jobLauncherDemoStep() {
        final List<String> dataList = Arrays.asList(
                "{\"id\":1,\"username\":\"张三\",\"password\":\"13BC03AC29FAC7B29736EC3BE5C2F55A\"}",
                "{\"id\":2,\"username\":\"李四\",\"password\":\"5E5994FBCFA922D804DF45295AE98604\"}",
                "{\"id\":3,\"username\":\"王五\",\"password\":\"6C14DA109E294D1E8155BE8AA4B1CE8E\"}",
                "{\"id\":4,\"username\":\"赵六\",\"password\":\"03774AD7979A5909E78F9C9DB3A2F0B2\"}"
        );
        final Iterator<String> iterator = dataList.iterator();

        return stepBuilderFactory.get("jobLauncherDemoStep")
                .<String, String>chunk(2)
                .reader(new ItemReader<String>() {
                    @Override
                    public String read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
                        if(iterator.hasNext()) {
                            String value = iterator.next();
                            System.out.println("    read() "  + value);
                            return value;
                        } else {
                            return null;
                        }
                    }
                })
                .processor(new ItemProcessor<String, String>() {
                    @Override
                    public String process(String item) throws Exception {
                        String message = "";
                        if(Objects.nonNull(jobParameters.get("message"))) {
                            message = String.valueOf(jobParameters.get("message").getValue());
                        }

                        System.out.println("        process() message=" + message + ", item=" + item);
                        return item.toUpperCase();
                    }
                })
                .writer(new ItemWriter<String>() {
                    @Override
                    public void write(List<? extends String> items) throws Exception {
                        System.out.println("            write() " + Arrays.toString(items.toArray()));
                    }
                })
                .listener(this)
                .build();
    }

    @Override
    public void beforeStep(StepExecution stepExecution) {
        System.out.println("beforeStep(StepExecution stepExecution)");
        jobParameters = stepExecution.getJobParameters().getParameters();
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        System.out.println("afterStep(StepExecution stepExecution)");
        return null;
    }
}

同步执行Job

同步执行 Job(Synchronous Job Execution) 指的是任务按顺序执行,前一个任务完成后才会开始执行下一个任务的模式。在这种模式下,多个任务之间存在明确的先后依赖关系,整个执行过程是串行的、阻塞的。例如:

@RestController
@SpringBootApplication
@EnableBatchProcessing // 开启批处理
public class SpringBatchDemoApplication {

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job jobLauncherDemoJob; // 注入我们创建的 jobLauncherDemoJob 任务

    @GetMapping("/runJob")
    public void runJob(@RequestParam String message) throws Exception {
       // 构建作业参数,确保每次执行参数唯一
       JobParameters jobParameters = new JobParametersBuilder()
             .addString("message", message)
             .addLong("timestamp", System.currentTimeMillis()) // 添加时间戳确保唯一性
             .toJobParameters();

       // 执行作业
       System.out.println("jobLauncher=" + jobLauncher);
       JobExecution jobExecution = jobLauncher.run(jobLauncherDemoJob, jobParameters);
       System.out.println("作业已启动,ID: " + jobExecution.getId());
    }

    public static void main(String[] args) {
       SpringApplication.run(SpringBatchDemoApplication.class, args);
    }

}

启动应用,浏览器运行 http://localhost:8080/runJob?message=hello 地址,输出日志如下:

image.png

上图中,“作业已启动,ID:40”是在任务处理完成后输出的,这就是同步任务执行,需要等待任务完成。

异步执行Job

异步执行 Job(Asynchronous Job Execution) 指的是任务启动后无需等待其完成,调用方可以立即继续执行其他操作的模式。在这种模式下,Job 的执行过程与调用方的逻辑是分离的,两者并行运行,互不阻塞。例如:

(1)修改 BatchConfig 配置文件,添加如下配置:

/**
 * 配置异步任务执行器
 * @return SimpleAsyncTaskExecutor
 */
@Bean
public TaskExecutor taskExecutor() {
    SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
    taskExecutor.setConcurrencyLimit(5); // 设置最大并发数
    return taskExecutor;
}

/**
 * 配置异步 JobLauncher
 * @param jobRepository 用于任务数据、状态持久化
 * @param taskExecutor 任务执行器
 * @return SimpleJobLauncher
 */
@Bean
public JobLauncher asyncJobLauncher(JobRepository jobRepository, TaskExecutor taskExecutor) {
    SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
    jobLauncher.setJobRepository(jobRepository);
    jobLauncher.setTaskExecutor(taskExecutor); // 设置异步执行器
    return jobLauncher;
}

上述代码,自己配置了一个 JobLauncher,而不是使用默认的 JobLauncher。并且将该 JobLauncher 的 TaskExecutor 配置为 SimpleAsyncTaskExecutor(异步执行任务)。

(2)使用 JobLauncher 启动作业时,需要使用 @Qualifier("asyncJobLauncher") 注解注入我们自己创建的 JobLauncher,代码如下:

package com.hxstrive.spring_batch.jobLauncherDemo2;

import org.springframework.batch.core.*;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@Configuration
@RestController
@SpringBootApplication
@EnableBatchProcessing // 开启批处理
public class SpringBatchDemoApplication {

    @Autowired
    @Qualifier("asyncJobLauncher")
    private JobLauncher asyncJobLauncher;

    @Autowired
    private Job jobLauncherDemoJob; // 注入我们创建的 jobLauncherDemoJob 任务


    @GetMapping("/runJob")
    public void runJob(@RequestParam String message) throws Exception {
       // 构建作业参数,确保每次执行参数唯一
       JobParameters jobParameters = new JobParametersBuilder()
             .addString("message", message)
             .addLong("timestamp", System.currentTimeMillis()) // 添加时间戳确保唯一性
             .toJobParameters();

       // 执行作业
       System.out.println("asyncJobLauncher=" + asyncJobLauncher);
       JobExecution jobExecution = asyncJobLauncher.run(jobLauncherDemoJob, jobParameters);
       System.out.println("作业已启动,ID: " + jobExecution.getId());
    }

    public static void main(String[] args) {
       SpringApplication.run(SpringBatchDemoApplication.class, args);
    }

}

启动应用,浏览器运行 http://localhost:8080/runJob?message=hello 地址,输出日志如下:

image.png

上图中,“作业已启动,ID:41”日志信息在调用 asyncJobLauncher.run() 方法后立即输出,并没有等待 Job 完成,这就是异步任务。

📢注意:JobLauncher 是 Spring Batch 中启动批处理作业的关键组件。通过不同的配置方式,可以实现同步或异步的作业执行。在实际应用中,可以根据业务需求选择合适的启动方式,并结合定时任务、REST API 或命令行等方式来触发批处理作业。

  

说说我的看法
全部评论(
没有评论
关于
本网站专注于 Java、数据库(MySQL、Oracle)、Linux、软件架构及大数据等多领域技术知识分享。涵盖丰富的原创与精选技术文章,助力技术传播与交流。无论是技术新手渴望入门,还是资深开发者寻求进阶,这里都能为您提供深度见解与实用经验,让复杂编码变得轻松易懂,携手共赴技术提升新高度。如有侵权,请来信告知:hxstrive@outlook.com
其他应用
公众号