JobLauncher 是 Spring Batch 4 中用于启动批处理作业(Job)的核心接口,它负责协调作业的执行并返回执行结果。
JobLauncher 提供了控制 Spring Batch4 作业执行的简单接口,支持基于不同运行时标识符的临时执行控制。注意,此接口不保证其调用是同步执行还是异步执行。调用者必须查阅具体实现类的文档,以明确作业的运行方式。
JobLauncher 的默认实现是 SimpleJobLauncher,它提供了同步和异步两种作业执行方式。类继承关系如下图:

JobLauncher 接口仅定义了一个方法 run(),用来启动指定的 Job 作业。接口定义如下:
package org.springframework.batch.core.launch;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
public interface JobLauncher {
/**
* 启动指定作业的执行,并传入相应的作业参数。
*
* @param job 要执行的作业实例,不能为null
* @param jobParameters 传递给本次作业执行的参数,包含启动所需的配置信息
* @return 作业执行实例:
* - 同步实现中会返回实际执行结果
* - 异步实现中可能返回未知状态的实例
*
* @throws JobExecutionAlreadyRunningException 当指定作业实例已有一个正在运行的执行时抛出
* @throws IllegalArgumentException 当作业实例或作业参数为null时抛出
* @throws JobRestartException 当作业之前已运行过且当前无法重启时抛出
* @throws JobInstanceAlreadyCompleteException 当作业之前已使用相同参数运行并成功完成时抛出
* @throws JobParametersInvalidException 当传入的参数对当前作业无效时抛出
*/
public JobExecution run(Job job, JobParameters jobParameters) throws JobExecutionAlreadyRunningException,
JobRestartException, JobInstanceAlreadyCompleteException, JobParametersInvalidException;
}上述代码中,run() 方法用来启动指定作业的执行,并传入相应的作业参数。
注意事项:
如果成功创建了JobExecution实例,无论执行是否成功,该方法都会返回此实例。
如果存在一个已暂停的历史 JobExecution,将返回该实例而非创建新实例。
仅当启动作业失败时才会抛出异常。
如果作业在处理过程中遇到错误,仍会返回 JobExecution,但需要检查其状态获取详细信息。
SimpleJobLauncher 是 Spring Batch 4 中 JobLauncher 接口的默认实现,负责协调和管理批处理作业的执行。它提供了灵活的作业启动方式,支持同步和异步两种执行模式,是 Spring Batch 作业执行的核心组件。
与 JobRepository 集成:依赖 JobRepository 存储和检索作业执行信息,部分源码:
public class SimpleJobLauncher implements JobLauncher, InitializingBean {
// 日志记录器,用于记录作业启动和执行过程中的关键信息
protected static final Log logger = LogFactory.getLog(SimpleJobLauncher.class);
// 作业仓库,用于管理作业实例、执行记录等元数据
private JobRepository jobRepository;
//...
}执行模式切换:通过 TaskExecutor 支持同步和异步执行,默认为同步,部分源码:
public class SimpleJobLauncher implements JobLauncher, InitializingBean {
//....
@Override
public void afterPropertiesSet() throws Exception {
// 校验 JobRepository 必须已设置
Assert.state(jobRepository != null, "A JobRepository has not been set.");
// 若未设置 TaskExecutor,默认使用同步执行器
if (taskExecutor == null) {
logger.info("No TaskExecutor has been set, defaulting to synchronous executor.");
taskExecutor = new SyncTaskExecutor(); // 默认为同步执行
}
}
}在 Spring 的核心包中,分别提供了 TaskExecutor 的同步和异步实现,如下图:

作业执行生命周期管理:负责作业的启动、状态跟踪和结果返回。
异常处理:对作业执行过程中可能出现的异常进行规范化处理,部分源码如下:
public class SimpleJobLauncher implements JobLauncher, InitializingBean {
//....
// 将异常转换为合适的类型重新抛出
private void rethrow(Throwable t) {
if (t instanceof RuntimeException) {
throw (RuntimeException) t;
}
else if (t instanceof Error) {
throw (Error) t;
}
throw new IllegalStateException(t);
}
}SimpleJobLauncher 的核心逻辑在 run() 方法中实现,下面通过阅读整体源码来了解其基本流程。源码如下:
package org.springframework.batch.core.launch.support;
import java.time.Duration;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobInstance;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.metrics.BatchMetrics;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.task.SyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.core.task.TaskRejectedException;
import org.springframework.util.Assert;
/**
* JobLauncher接口的简单实现类,用于启动和管理Spring Batch作业的执行。
*
* 该实现通过Spring Core的{@link TaskExecutor}接口来启动作业,这意味着所设置的执行器类型
* 对作业的运行方式至关重要。如果使用{@link SyncTaskExecutor}(默认),作业将在调用启动器的
* 同一线程中同步执行;若使用异步执行器,则作业会在独立线程中异步运行。
*
* 此类唯一必需的依赖是{@link JobRepository},它用于获取有效的JobExecution实例。由于作业可能是
* 现有JobInstance的重启,只有通过JobRepository才能可靠地重建执行上下文。
*
* @author Lucas Ward
* @author Dave Syer
* @author Will Schipp
* @author Michael Minella
* @author Mahmoud Ben Hassine
*
* @since 1.0
*
* @see JobRepository 作业仓库,用于管理作业执行的元数据
* @see TaskExecutor 任务执行器,控制作业的执行方式(同步/异步)
*/
public class SimpleJobLauncher implements JobLauncher, InitializingBean {
// 日志记录器,用于记录作业启动和执行过程中的关键信息
protected static final Log logger = LogFactory.getLog(SimpleJobLauncher.class);
// 作业仓库,用于管理作业实例、执行记录等元数据
private JobRepository jobRepository;
// 任务执行器,负责实际执行作业(默认使用同步执行器)
private TaskExecutor taskExecutor;
/**
* 运行指定的作业并传入对应的作业参数。
* 作业参数将用于判断是执行现有作业实例的重启,还是创建新的作业实例。
*
* @param job 要运行的作业实例
* @param jobParameters 本次作业执行的参数(如时间戳、业务ID等)
* @return 作业执行实例(同步执行时返回实际执行结果;异步执行时状态可能为未知)
*
* @throws JobExecutionAlreadyRunningException 如果作业实例已存在且有正在运行的执行
* @throws JobRestartException 如果尝试重启作业但不允许重启或无需重启
* @throws JobInstanceAlreadyCompleteException 如果作业实例已通过相同参数执行并成功完成
* @throws JobParametersInvalidException 如果传入的作业参数无效
*/
@Override
public JobExecution run(final Job job, final JobParameters jobParameters)
throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException,
JobParametersInvalidException {
// 校验参数合法性:作业和作业参数不能为null
Assert.notNull(job, "The Job must not be null.");
Assert.notNull(jobParameters, "The JobParameters must not be null.");
// 作业执行实例(最终返回的结果)
final JobExecution jobExecution;
// 获取该作业和参数对应的最后一次执行记录
JobExecution lastExecution = jobRepository.getLastJobExecution(job.getName(), jobParameters);
// 如果存在历史执行记录,判断是否可以重启
if (lastExecution != null) {
// 若作业不支持重启,则抛出异常
if (!job.isRestartable()) {
throw new JobRestartException("JobInstance already exists and is not restartable");
}
/*
* 校验历史执行中是否存在状态为UNKNOWN、STARTING、STARTED或STOPPING的步骤执行
* 若存在,则不允许重启,需处理异常状态
*/
for (StepExecution execution : lastExecution.getStepExecutions()) {
BatchStatus status = execution.getStatus();
// 若步骤处于运行中或停止中状态,说明有执行正在进行,不允许重复启动
if (status.isRunning() || status == BatchStatus.STOPPING) {
throw new JobExecutionAlreadyRunningException("A job execution for this job is already running: "
+ lastExecution);
}
// 若步骤状态为UNKNOWN,说明上次执行失败且无法回滚,不允许直接重启
else if (status == BatchStatus.UNKNOWN) {
throw new JobRestartException(
"Cannot restart step [" + execution.getStepName() + "] from UNKNOWN status. "
+ "The last execution ended with a failure that could not be rolled back, "
+ "so it may be dangerous to proceed. Manual intervention is probably necessary.");
}
}
}
// 校验作业参数是否有效(如参数类型、必填项等)
job.getJobParametersValidator().validate(jobParameters);
/*
* 此处存在极小概率的并发问题:
* 在检查历史执行和创建新执行之间,若其他线程/进程启动并失败了同一作业实例,
* 可能导致非可重启作业被意外重启。但实际场景中该概率极低。
*/
jobExecution = jobRepository.createJobExecution(job.getName(), jobParameters);
try {
// 通过任务执行器执行作业(同步或异步取决于执行器类型)
taskExecutor.execute(new Runnable() {
@Override
public void run() {
try {
// 记录作业启动日志
if (logger.isInfoEnabled()) {
logger.info("Job: [" + job + "] launched with the following parameters: [" + jobParameters
+ "]");
}
// 执行作业逻辑
job.execute(jobExecution);
// 记录作业完成日志(包含执行时长)
if (logger.isInfoEnabled()) {
// 计算作业执行时长
Duration jobExecutionDuration = BatchMetrics.calculateDuration(
jobExecution.getStartTime(), jobExecution.getEndTime());
logger.info("Job: [" + job + "] completed with the following parameters: [" + jobParameters
+ "] and the following status: [" + jobExecution.getStatus() + "]"
+ (jobExecutionDuration == null ? "" : " in " + BatchMetrics.formatDuration(jobExecutionDuration)));
}
}
catch (Throwable t) {
// 记录作业失败日志
if (logger.isInfoEnabled()) {
logger.info("Job: [" + job
+ "] failed unexpectedly and fatally with the following parameters: [" + jobParameters
+ "]", t);
}
// 重新抛出异常(转换为运行时异常或错误)
rethrow(t);
}
}
// 将异常转换为合适的类型重新抛出
private void rethrow(Throwable t) {
if (t instanceof RuntimeException) {
throw (RuntimeException) t;
}
else if (t instanceof Error) {
throw (Error) t;
}
throw new IllegalStateException(t);
}
});
}
// 处理任务被执行器拒绝的情况(如线程池满)
catch (TaskRejectedException e) {
// 更新作业执行状态为失败
jobExecution.upgradeStatus(BatchStatus.FAILED);
// 设置退出状态(若之前为UNKNOWN,则添加异常描述)
if (jobExecution.getExitStatus().equals(ExitStatus.UNKNOWN)) {
jobExecution.setExitStatus(ExitStatus.FAILED.addExitDescription(e));
}
// 持久化更新后的作业执行状态
jobRepository.update(jobExecution);
}
return jobExecution;
}
public void setJobRepository(JobRepository jobRepository) {
this.jobRepository = jobRepository;
}
public void setTaskExecutor(TaskExecutor taskExecutor) {
this.taskExecutor = taskExecutor;
}
/**
* 确保必要的依赖(JobRepository)已被设置,若未设置任务执行器则使用默认值。
*/
@Override
public void afterPropertiesSet() throws Exception {
// 校验JobRepository必须已设置
Assert.state(jobRepository != null, "A JobRepository has not been set.");
// 若未设置TaskExecutor,默认使用同步执行器
if (taskExecutor == null) {
logger.info("No TaskExecutor has been set, defaulting to synchronous executor.");
taskExecutor = new SyncTaskExecutor();
}
}
}
假设我们已经创建了一个名为 jobLauncherDemoJob 的 Job 任务,配置类如下:
package com.hxstrive.spring_batch.jobLauncherDemo.config;
import org.springframework.batch.core.*;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.*;
/**
* Spring Batch 配置类
* @author hxstrive.com
*/
@Configuration
public class BatchConfig implements StepExecutionListener {
// 用于创建和配置 Job 对象的工厂类
@Autowired
private JobBuilderFactory jobBuilderFactory;
// 用于创建和配置 Step 对象的工厂类
@Autowired
private StepBuilderFactory stepBuilderFactory;
// JOB参数
private Map<String, JobParameter> jobParameters;
// 创建Job对象
@Bean
public Job jobLauncherDemoJob() {
return jobBuilderFactory.get("jobLauncherDemoJob")
.start(jobLauncherDemoStep())
.build();
}
// 创建Step对象
@Bean
public Step jobLauncherDemoStep() {
final List<String> dataList = Arrays.asList(
"{\"id\":1,\"username\":\"张三\",\"password\":\"13BC03AC29FAC7B29736EC3BE5C2F55A\"}",
"{\"id\":2,\"username\":\"李四\",\"password\":\"5E5994FBCFA922D804DF45295AE98604\"}",
"{\"id\":3,\"username\":\"王五\",\"password\":\"6C14DA109E294D1E8155BE8AA4B1CE8E\"}",
"{\"id\":4,\"username\":\"赵六\",\"password\":\"03774AD7979A5909E78F9C9DB3A2F0B2\"}"
);
final Iterator<String> iterator = dataList.iterator();
return stepBuilderFactory.get("jobLauncherDemoStep")
.<String, String>chunk(2)
.reader(new ItemReader<String>() {
@Override
public String read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
if(iterator.hasNext()) {
String value = iterator.next();
System.out.println(" read() " + value);
return value;
} else {
return null;
}
}
})
.processor(new ItemProcessor<String, String>() {
@Override
public String process(String item) throws Exception {
String message = "";
if(Objects.nonNull(jobParameters.get("message"))) {
message = String.valueOf(jobParameters.get("message").getValue());
}
System.out.println(" process() message=" + message + ", item=" + item);
return item.toUpperCase();
}
})
.writer(new ItemWriter<String>() {
@Override
public void write(List<? extends String> items) throws Exception {
System.out.println(" write() " + Arrays.toString(items.toArray()));
}
})
.listener(this)
.build();
}
@Override
public void beforeStep(StepExecution stepExecution) {
System.out.println("beforeStep(StepExecution stepExecution)");
jobParameters = stepExecution.getJobParameters().getParameters();
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
System.out.println("afterStep(StepExecution stepExecution)");
return null;
}
}同步执行 Job(Synchronous Job Execution) 指的是任务按顺序执行,前一个任务完成后才会开始执行下一个任务的模式。在这种模式下,多个任务之间存在明确的先后依赖关系,整个执行过程是串行的、阻塞的。例如:
@RestController
@SpringBootApplication
@EnableBatchProcessing // 开启批处理
public class SpringBatchDemoApplication {
@Autowired
private JobLauncher jobLauncher;
@Autowired
private Job jobLauncherDemoJob; // 注入我们创建的 jobLauncherDemoJob 任务
@GetMapping("/runJob")
public void runJob(@RequestParam String message) throws Exception {
// 构建作业参数,确保每次执行参数唯一
JobParameters jobParameters = new JobParametersBuilder()
.addString("message", message)
.addLong("timestamp", System.currentTimeMillis()) // 添加时间戳确保唯一性
.toJobParameters();
// 执行作业
System.out.println("jobLauncher=" + jobLauncher);
JobExecution jobExecution = jobLauncher.run(jobLauncherDemoJob, jobParameters);
System.out.println("作业已启动,ID: " + jobExecution.getId());
}
public static void main(String[] args) {
SpringApplication.run(SpringBatchDemoApplication.class, args);
}
}启动应用,浏览器运行 http://localhost:8080/runJob?message=hello 地址,输出日志如下:

上图中,“作业已启动,ID:40”是在任务处理完成后输出的,这就是同步任务执行,需要等待任务完成。
异步执行 Job(Asynchronous Job Execution) 指的是任务启动后无需等待其完成,调用方可以立即继续执行其他操作的模式。在这种模式下,Job 的执行过程与调用方的逻辑是分离的,两者并行运行,互不阻塞。例如:
(1)修改 BatchConfig 配置文件,添加如下配置:
/**
* 配置异步任务执行器
* @return SimpleAsyncTaskExecutor
*/
@Bean
public TaskExecutor taskExecutor() {
SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
taskExecutor.setConcurrencyLimit(5); // 设置最大并发数
return taskExecutor;
}
/**
* 配置异步 JobLauncher
* @param jobRepository 用于任务数据、状态持久化
* @param taskExecutor 任务执行器
* @return SimpleJobLauncher
*/
@Bean
public JobLauncher asyncJobLauncher(JobRepository jobRepository, TaskExecutor taskExecutor) {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(jobRepository);
jobLauncher.setTaskExecutor(taskExecutor); // 设置异步执行器
return jobLauncher;
}上述代码,自己配置了一个 JobLauncher,而不是使用默认的 JobLauncher。并且将该 JobLauncher 的 TaskExecutor 配置为 SimpleAsyncTaskExecutor(异步执行任务)。
(2)使用 JobLauncher 启动作业时,需要使用 @Qualifier("asyncJobLauncher") 注解注入我们自己创建的 JobLauncher,代码如下:
package com.hxstrive.spring_batch.jobLauncherDemo2;
import org.springframework.batch.core.*;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@Configuration
@RestController
@SpringBootApplication
@EnableBatchProcessing // 开启批处理
public class SpringBatchDemoApplication {
@Autowired
@Qualifier("asyncJobLauncher")
private JobLauncher asyncJobLauncher;
@Autowired
private Job jobLauncherDemoJob; // 注入我们创建的 jobLauncherDemoJob 任务
@GetMapping("/runJob")
public void runJob(@RequestParam String message) throws Exception {
// 构建作业参数,确保每次执行参数唯一
JobParameters jobParameters = new JobParametersBuilder()
.addString("message", message)
.addLong("timestamp", System.currentTimeMillis()) // 添加时间戳确保唯一性
.toJobParameters();
// 执行作业
System.out.println("asyncJobLauncher=" + asyncJobLauncher);
JobExecution jobExecution = asyncJobLauncher.run(jobLauncherDemoJob, jobParameters);
System.out.println("作业已启动,ID: " + jobExecution.getId());
}
public static void main(String[] args) {
SpringApplication.run(SpringBatchDemoApplication.class, args);
}
}启动应用,浏览器运行 http://localhost:8080/runJob?message=hello 地址,输出日志如下:

上图中,“作业已启动,ID:41”日志信息在调用 asyncJobLauncher.run() 方法后立即输出,并没有等待 Job 完成,这就是异步任务。
📢注意:JobLauncher 是 Spring Batch 中启动批处理作业的关键组件。通过不同的配置方式,可以实现同步或异步的作业执行。在实际应用中,可以根据业务需求选择合适的启动方式,并结合定时任务、REST API 或命令行等方式来触发批处理作业。