JobLauncher 是 Spring Batch 4 中用于启动批处理作业(Job)的核心接口,它负责协调作业的执行并返回执行结果。
JobLauncher 提供了控制 Spring Batch4 作业执行的简单接口,支持基于不同运行时标识符的临时执行控制。注意,此接口不保证其调用是同步执行还是异步执行。调用者必须查阅具体实现类的文档,以明确作业的运行方式。
JobLauncher 的默认实现是 SimpleJobLauncher,它提供了同步和异步两种作业执行方式。类继承关系如下图:
JobLauncher 接口仅定义了一个方法 run(),用来启动指定的 Job 作业。接口定义如下:
package org.springframework.batch.core.launch; import org.springframework.batch.core.Job; import org.springframework.batch.core.JobExecution; import org.springframework.batch.core.JobParameters; import org.springframework.batch.core.JobParametersInvalidException; import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException; import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException; import org.springframework.batch.core.repository.JobRestartException; public interface JobLauncher { /** * 启动指定作业的执行,并传入相应的作业参数。 * * @param job 要执行的作业实例,不能为null * @param jobParameters 传递给本次作业执行的参数,包含启动所需的配置信息 * @return 作业执行实例: * - 同步实现中会返回实际执行结果 * - 异步实现中可能返回未知状态的实例 * * @throws JobExecutionAlreadyRunningException 当指定作业实例已有一个正在运行的执行时抛出 * @throws IllegalArgumentException 当作业实例或作业参数为null时抛出 * @throws JobRestartException 当作业之前已运行过且当前无法重启时抛出 * @throws JobInstanceAlreadyCompleteException 当作业之前已使用相同参数运行并成功完成时抛出 * @throws JobParametersInvalidException 当传入的参数对当前作业无效时抛出 */ public JobExecution run(Job job, JobParameters jobParameters) throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException, JobParametersInvalidException; }
上述代码中,run() 方法用来启动指定作业的执行,并传入相应的作业参数。
注意事项:
如果成功创建了JobExecution实例,无论执行是否成功,该方法都会返回此实例。
如果存在一个已暂停的历史 JobExecution,将返回该实例而非创建新实例。
仅当启动作业失败时才会抛出异常。
如果作业在处理过程中遇到错误,仍会返回 JobExecution,但需要检查其状态获取详细信息。
SimpleJobLauncher 是 Spring Batch 4 中 JobLauncher 接口的默认实现,负责协调和管理批处理作业的执行。它提供了灵活的作业启动方式,支持同步和异步两种执行模式,是 Spring Batch 作业执行的核心组件。
与 JobRepository 集成:依赖 JobRepository 存储和检索作业执行信息,部分源码:
public class SimpleJobLauncher implements JobLauncher, InitializingBean { // 日志记录器,用于记录作业启动和执行过程中的关键信息 protected static final Log logger = LogFactory.getLog(SimpleJobLauncher.class); // 作业仓库,用于管理作业实例、执行记录等元数据 private JobRepository jobRepository; //... }
执行模式切换:通过 TaskExecutor 支持同步和异步执行,默认为同步,部分源码:
public class SimpleJobLauncher implements JobLauncher, InitializingBean { //.... @Override public void afterPropertiesSet() throws Exception { // 校验 JobRepository 必须已设置 Assert.state(jobRepository != null, "A JobRepository has not been set."); // 若未设置 TaskExecutor,默认使用同步执行器 if (taskExecutor == null) { logger.info("No TaskExecutor has been set, defaulting to synchronous executor."); taskExecutor = new SyncTaskExecutor(); // 默认为同步执行 } } }
在 Spring 的核心包中,分别提供了 TaskExecutor 的同步和异步实现,如下图:
作业执行生命周期管理:负责作业的启动、状态跟踪和结果返回。
异常处理:对作业执行过程中可能出现的异常进行规范化处理,部分源码如下:
public class SimpleJobLauncher implements JobLauncher, InitializingBean { //.... // 将异常转换为合适的类型重新抛出 private void rethrow(Throwable t) { if (t instanceof RuntimeException) { throw (RuntimeException) t; } else if (t instanceof Error) { throw (Error) t; } throw new IllegalStateException(t); } }
SimpleJobLauncher 的核心逻辑在 run() 方法中实现,下面通过阅读整体源码来了解其基本流程。源码如下:
package org.springframework.batch.core.launch.support; import java.time.Duration; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.springframework.batch.core.BatchStatus; import org.springframework.batch.core.ExitStatus; import org.springframework.batch.core.Job; import org.springframework.batch.core.JobExecution; import org.springframework.batch.core.JobInstance; import org.springframework.batch.core.JobParameters; import org.springframework.batch.core.JobParametersInvalidException; import org.springframework.batch.core.StepExecution; import org.springframework.batch.core.launch.JobLauncher; import org.springframework.batch.core.metrics.BatchMetrics; import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException; import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException; import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.core.repository.JobRestartException; import org.springframework.beans.factory.InitializingBean; import org.springframework.core.task.SyncTaskExecutor; import org.springframework.core.task.TaskExecutor; import org.springframework.core.task.TaskRejectedException; import org.springframework.util.Assert; /** * JobLauncher接口的简单实现类,用于启动和管理Spring Batch作业的执行。 * * 该实现通过Spring Core的{@link TaskExecutor}接口来启动作业,这意味着所设置的执行器类型 * 对作业的运行方式至关重要。如果使用{@link SyncTaskExecutor}(默认),作业将在调用启动器的 * 同一线程中同步执行;若使用异步执行器,则作业会在独立线程中异步运行。 * * 此类唯一必需的依赖是{@link JobRepository},它用于获取有效的JobExecution实例。由于作业可能是 * 现有JobInstance的重启,只有通过JobRepository才能可靠地重建执行上下文。 * * @author Lucas Ward * @author Dave Syer * @author Will Schipp * @author Michael Minella * @author Mahmoud Ben Hassine * * @since 1.0 * * @see JobRepository 作业仓库,用于管理作业执行的元数据 * @see TaskExecutor 任务执行器,控制作业的执行方式(同步/异步) */ public class SimpleJobLauncher implements JobLauncher, InitializingBean { // 日志记录器,用于记录作业启动和执行过程中的关键信息 protected static final Log logger = LogFactory.getLog(SimpleJobLauncher.class); // 作业仓库,用于管理作业实例、执行记录等元数据 private JobRepository jobRepository; // 任务执行器,负责实际执行作业(默认使用同步执行器) private TaskExecutor taskExecutor; /** * 运行指定的作业并传入对应的作业参数。 * 作业参数将用于判断是执行现有作业实例的重启,还是创建新的作业实例。 * * @param job 要运行的作业实例 * @param jobParameters 本次作业执行的参数(如时间戳、业务ID等) * @return 作业执行实例(同步执行时返回实际执行结果;异步执行时状态可能为未知) * * @throws JobExecutionAlreadyRunningException 如果作业实例已存在且有正在运行的执行 * @throws JobRestartException 如果尝试重启作业但不允许重启或无需重启 * @throws JobInstanceAlreadyCompleteException 如果作业实例已通过相同参数执行并成功完成 * @throws JobParametersInvalidException 如果传入的作业参数无效 */ @Override public JobExecution run(final Job job, final JobParameters jobParameters) throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException, JobParametersInvalidException { // 校验参数合法性:作业和作业参数不能为null Assert.notNull(job, "The Job must not be null."); Assert.notNull(jobParameters, "The JobParameters must not be null."); // 作业执行实例(最终返回的结果) final JobExecution jobExecution; // 获取该作业和参数对应的最后一次执行记录 JobExecution lastExecution = jobRepository.getLastJobExecution(job.getName(), jobParameters); // 如果存在历史执行记录,判断是否可以重启 if (lastExecution != null) { // 若作业不支持重启,则抛出异常 if (!job.isRestartable()) { throw new JobRestartException("JobInstance already exists and is not restartable"); } /* * 校验历史执行中是否存在状态为UNKNOWN、STARTING、STARTED或STOPPING的步骤执行 * 若存在,则不允许重启,需处理异常状态 */ for (StepExecution execution : lastExecution.getStepExecutions()) { BatchStatus status = execution.getStatus(); // 若步骤处于运行中或停止中状态,说明有执行正在进行,不允许重复启动 if (status.isRunning() || status == BatchStatus.STOPPING) { throw new JobExecutionAlreadyRunningException("A job execution for this job is already running: " + lastExecution); } // 若步骤状态为UNKNOWN,说明上次执行失败且无法回滚,不允许直接重启 else if (status == BatchStatus.UNKNOWN) { throw new JobRestartException( "Cannot restart step [" + execution.getStepName() + "] from UNKNOWN status. " + "The last execution ended with a failure that could not be rolled back, " + "so it may be dangerous to proceed. Manual intervention is probably necessary."); } } } // 校验作业参数是否有效(如参数类型、必填项等) job.getJobParametersValidator().validate(jobParameters); /* * 此处存在极小概率的并发问题: * 在检查历史执行和创建新执行之间,若其他线程/进程启动并失败了同一作业实例, * 可能导致非可重启作业被意外重启。但实际场景中该概率极低。 */ jobExecution = jobRepository.createJobExecution(job.getName(), jobParameters); try { // 通过任务执行器执行作业(同步或异步取决于执行器类型) taskExecutor.execute(new Runnable() { @Override public void run() { try { // 记录作业启动日志 if (logger.isInfoEnabled()) { logger.info("Job: [" + job + "] launched with the following parameters: [" + jobParameters + "]"); } // 执行作业逻辑 job.execute(jobExecution); // 记录作业完成日志(包含执行时长) if (logger.isInfoEnabled()) { // 计算作业执行时长 Duration jobExecutionDuration = BatchMetrics.calculateDuration( jobExecution.getStartTime(), jobExecution.getEndTime()); logger.info("Job: [" + job + "] completed with the following parameters: [" + jobParameters + "] and the following status: [" + jobExecution.getStatus() + "]" + (jobExecutionDuration == null ? "" : " in " + BatchMetrics.formatDuration(jobExecutionDuration))); } } catch (Throwable t) { // 记录作业失败日志 if (logger.isInfoEnabled()) { logger.info("Job: [" + job + "] failed unexpectedly and fatally with the following parameters: [" + jobParameters + "]", t); } // 重新抛出异常(转换为运行时异常或错误) rethrow(t); } } // 将异常转换为合适的类型重新抛出 private void rethrow(Throwable t) { if (t instanceof RuntimeException) { throw (RuntimeException) t; } else if (t instanceof Error) { throw (Error) t; } throw new IllegalStateException(t); } }); } // 处理任务被执行器拒绝的情况(如线程池满) catch (TaskRejectedException e) { // 更新作业执行状态为失败 jobExecution.upgradeStatus(BatchStatus.FAILED); // 设置退出状态(若之前为UNKNOWN,则添加异常描述) if (jobExecution.getExitStatus().equals(ExitStatus.UNKNOWN)) { jobExecution.setExitStatus(ExitStatus.FAILED.addExitDescription(e)); } // 持久化更新后的作业执行状态 jobRepository.update(jobExecution); } return jobExecution; } public void setJobRepository(JobRepository jobRepository) { this.jobRepository = jobRepository; } public void setTaskExecutor(TaskExecutor taskExecutor) { this.taskExecutor = taskExecutor; } /** * 确保必要的依赖(JobRepository)已被设置,若未设置任务执行器则使用默认值。 */ @Override public void afterPropertiesSet() throws Exception { // 校验JobRepository必须已设置 Assert.state(jobRepository != null, "A JobRepository has not been set."); // 若未设置TaskExecutor,默认使用同步执行器 if (taskExecutor == null) { logger.info("No TaskExecutor has been set, defaulting to synchronous executor."); taskExecutor = new SyncTaskExecutor(); } } }
假设我们已经创建了一个名为 jobLauncherDemoJob 的 Job 任务,配置类如下:
package com.hxstrive.spring_batch.jobLauncherDemo.config; import org.springframework.batch.core.*; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.item.*; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.*; /** * Spring Batch 配置类 * @author hxstrive.com */ @Configuration public class BatchConfig implements StepExecutionListener { // 用于创建和配置 Job 对象的工厂类 @Autowired private JobBuilderFactory jobBuilderFactory; // 用于创建和配置 Step 对象的工厂类 @Autowired private StepBuilderFactory stepBuilderFactory; // JOB参数 private Map<String, JobParameter> jobParameters; // 创建Job对象 @Bean public Job jobLauncherDemoJob() { return jobBuilderFactory.get("jobLauncherDemoJob") .start(jobLauncherDemoStep()) .build(); } // 创建Step对象 @Bean public Step jobLauncherDemoStep() { final List<String> dataList = Arrays.asList( "{\"id\":1,\"username\":\"张三\",\"password\":\"13BC03AC29FAC7B29736EC3BE5C2F55A\"}", "{\"id\":2,\"username\":\"李四\",\"password\":\"5E5994FBCFA922D804DF45295AE98604\"}", "{\"id\":3,\"username\":\"王五\",\"password\":\"6C14DA109E294D1E8155BE8AA4B1CE8E\"}", "{\"id\":4,\"username\":\"赵六\",\"password\":\"03774AD7979A5909E78F9C9DB3A2F0B2\"}" ); final Iterator<String> iterator = dataList.iterator(); return stepBuilderFactory.get("jobLauncherDemoStep") .<String, String>chunk(2) .reader(new ItemReader<String>() { @Override public String read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException { if(iterator.hasNext()) { String value = iterator.next(); System.out.println(" read() " + value); return value; } else { return null; } } }) .processor(new ItemProcessor<String, String>() { @Override public String process(String item) throws Exception { String message = ""; if(Objects.nonNull(jobParameters.get("message"))) { message = String.valueOf(jobParameters.get("message").getValue()); } System.out.println(" process() message=" + message + ", item=" + item); return item.toUpperCase(); } }) .writer(new ItemWriter<String>() { @Override public void write(List<? extends String> items) throws Exception { System.out.println(" write() " + Arrays.toString(items.toArray())); } }) .listener(this) .build(); } @Override public void beforeStep(StepExecution stepExecution) { System.out.println("beforeStep(StepExecution stepExecution)"); jobParameters = stepExecution.getJobParameters().getParameters(); } @Override public ExitStatus afterStep(StepExecution stepExecution) { System.out.println("afterStep(StepExecution stepExecution)"); return null; } }
同步执行 Job(Synchronous Job Execution) 指的是任务按顺序执行,前一个任务完成后才会开始执行下一个任务的模式。在这种模式下,多个任务之间存在明确的先后依赖关系,整个执行过程是串行的、阻塞的。例如:
@RestController @SpringBootApplication @EnableBatchProcessing // 开启批处理 public class SpringBatchDemoApplication { @Autowired private JobLauncher jobLauncher; @Autowired private Job jobLauncherDemoJob; // 注入我们创建的 jobLauncherDemoJob 任务 @GetMapping("/runJob") public void runJob(@RequestParam String message) throws Exception { // 构建作业参数,确保每次执行参数唯一 JobParameters jobParameters = new JobParametersBuilder() .addString("message", message) .addLong("timestamp", System.currentTimeMillis()) // 添加时间戳确保唯一性 .toJobParameters(); // 执行作业 System.out.println("jobLauncher=" + jobLauncher); JobExecution jobExecution = jobLauncher.run(jobLauncherDemoJob, jobParameters); System.out.println("作业已启动,ID: " + jobExecution.getId()); } public static void main(String[] args) { SpringApplication.run(SpringBatchDemoApplication.class, args); } }
启动应用,浏览器运行 http://localhost:8080/runJob?message=hello 地址,输出日志如下:
上图中,“作业已启动,ID:40”是在任务处理完成后输出的,这就是同步任务执行,需要等待任务完成。
异步执行 Job(Asynchronous Job Execution) 指的是任务启动后无需等待其完成,调用方可以立即继续执行其他操作的模式。在这种模式下,Job 的执行过程与调用方的逻辑是分离的,两者并行运行,互不阻塞。例如:
(1)修改 BatchConfig 配置文件,添加如下配置:
/** * 配置异步任务执行器 * @return SimpleAsyncTaskExecutor */ @Bean public TaskExecutor taskExecutor() { SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor(); taskExecutor.setConcurrencyLimit(5); // 设置最大并发数 return taskExecutor; } /** * 配置异步 JobLauncher * @param jobRepository 用于任务数据、状态持久化 * @param taskExecutor 任务执行器 * @return SimpleJobLauncher */ @Bean public JobLauncher asyncJobLauncher(JobRepository jobRepository, TaskExecutor taskExecutor) { SimpleJobLauncher jobLauncher = new SimpleJobLauncher(); jobLauncher.setJobRepository(jobRepository); jobLauncher.setTaskExecutor(taskExecutor); // 设置异步执行器 return jobLauncher; }
上述代码,自己配置了一个 JobLauncher,而不是使用默认的 JobLauncher。并且将该 JobLauncher 的 TaskExecutor 配置为 SimpleAsyncTaskExecutor(异步执行任务)。
(2)使用 JobLauncher 启动作业时,需要使用 @Qualifier("asyncJobLauncher") 注解注入我们自己创建的 JobLauncher,代码如下:
package com.hxstrive.spring_batch.jobLauncherDemo2; import org.springframework.batch.core.*; import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; import org.springframework.batch.core.launch.JobLauncher; import org.springframework.batch.core.launch.support.SimpleJobLauncher; import org.springframework.batch.core.repository.JobRepository; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.task.SimpleAsyncTaskExecutor; import org.springframework.core.task.TaskExecutor; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; @Configuration @RestController @SpringBootApplication @EnableBatchProcessing // 开启批处理 public class SpringBatchDemoApplication { @Autowired @Qualifier("asyncJobLauncher") private JobLauncher asyncJobLauncher; @Autowired private Job jobLauncherDemoJob; // 注入我们创建的 jobLauncherDemoJob 任务 @GetMapping("/runJob") public void runJob(@RequestParam String message) throws Exception { // 构建作业参数,确保每次执行参数唯一 JobParameters jobParameters = new JobParametersBuilder() .addString("message", message) .addLong("timestamp", System.currentTimeMillis()) // 添加时间戳确保唯一性 .toJobParameters(); // 执行作业 System.out.println("asyncJobLauncher=" + asyncJobLauncher); JobExecution jobExecution = asyncJobLauncher.run(jobLauncherDemoJob, jobParameters); System.out.println("作业已启动,ID: " + jobExecution.getId()); } public static void main(String[] args) { SpringApplication.run(SpringBatchDemoApplication.class, args); } }
启动应用,浏览器运行 http://localhost:8080/runJob?message=hello 地址,输出日志如下:
上图中,“作业已启动,ID:41”日志信息在调用 asyncJobLauncher.run() 方法后立即输出,并没有等待 Job 完成,这就是异步任务。
📢注意:JobLauncher 是 Spring Batch 中启动批处理作业的关键组件。通过不同的配置方式,可以实现同步或异步的作业执行。在实际应用中,可以根据业务需求选择合适的启动方式,并结合定时任务、REST API 或命令行等方式来触发批处理作业。