Spring Batch4 核心组件 ItemReader 介绍

在 Spring Batch 4 中,ItemReader 是批处理作业 读取 数据的核心抽象接口。它是 "读-处理-写" 模式中的第一个关键环节,负责从各种数据源(数据库、文件、队列、API 等)逐条或分块地读取数据,并将这些数据作为 Item 提供给后续的 ItemProcessor ItemWriter

ItemReader 职责

ItemReader 组件的核心职责是把数据从外部数据源读取出来,然后将其转换为应用程序能够处理的对象。在每次调用 read() 方法返回一个单独的 Item 对象。当所有数据读取完毕时返回 null,标志着读取步骤的结束。

数据源抽象化

ItemReader 对各类数据源进行了抽象,使批处理作业能够轻松地与不同类型的数据源进行交互,这些数据源包括但不限于:

  • 文件(如 CSV、XML、JSON)

  • 数据库(通过 JDBC、JPA 或者 Hibernate)

  • 消息队列(像 JMS)

  • REST API

  • 其他系统(借助自定义适配器)

数据读取操作

ItemReader 每次执行 read() 方法时,会读取一条记录/数据,并将其转换为 Java 对象。具体的读取逻辑如下:

  • 当有数据可读时,返回一个对象。

  • 当所有数据都读取完毕后,返回 null。

  • 如果读取过程中出现异常,会抛出相应的异常,如 UnexpectedInputException 或者 ParseException。

事务与重启支持

为了确保批处理作业能够可靠地执行,ItemReader 具备以下特性:

  • 事务管理:读取操作在事务范围内进行,保证数据的一致性。

  • 状态保存:通过实现 ItemStream 接口,ItemReader 可以保存当前的读取位置。

  • 作业重启:在作业重启时,能够从上次中断的位置继续读取数据。

数据格式转换

ItemReader 还需负责将原始数据转换为应用程序可以处理的对象,常见的转换方式有:

  • 对于 CSV 文件,使用 DelimitedLineTokenizer 进行解析。

  • 对于 XML 文件,采用 StaxEventItemReader 结合 XStream 或者 JAXB 进行解析。

  • 对于数据库记录,利用 RowMapper 或者 JPA 实体映射。

  • 对于自定义格式的数据,实现 FieldSetMapper 接口进行自定义映射。

异常处理

在数据读取过程中,ItemReader 需要对可能出现的异常情况进行处理:

  • 处理格式错误的数据,例如解析失败的 CSV 行。

  • 处理缺失的数据,如必填字段为空的情况。

  • 处理不可恢复的错误,如文件不存在或者数据库连接失败。

  • 结合 SkipPolicy 和 RetryTemplate 实现跳过机制和重试机制。

  

ItemReader  实现类

下面是对常见 ItemReader 接口实现类的介绍,按数据源类型分类说明:

文件读取器

  • FlatFileItemReader:用于读取平面文件(如 CSV、固定格式文件),支持自定义分隔符、行前缀过滤,可通过 setLineToSkip 从文件中间位置开始读取,并能通过 LineMapper 接口自定义行映射逻辑。

  • JsonItemReader:用于读取 JSON 文件(每行一个 JSON 对象),基于 Jackson 或 Gson 解析 JSON,需配置 JsonObjectReader 指定解析器。

  • XmlItemReader:基于 StAX 解析器读取 XML 文件,支持大文件流式处理,可通过 XPath 表达式定位数据节点。

  • AvroItemReader:用于读取 Avro 格式文件,需提供 Avro 模式(Schema)或使用反射推断,支持二进制或 JSON 编码的 Avro 文件。

数据库读取器

  • JdbcCursorItemReader:通过 JDBC 游标读取数据库,使用 ResultSet 逐行读取,适合大数据量,需手动管理数据库连接和事务。

  • JdbcPagingItemReader:通过 SQL 分页查询读取数据库,基于 PagingQueryProvider 自动生成分页 SQL,支持不同数据库方言(如 MySQL、Oracle)。

  • JpaPagingItemReader:通过 JPA 分页读取数据库,基于 JPA EntityManager,无需手动编写 SQL,支持 JPA 实体映射和查询。

  • HibernateCursorItemReader:通过 Hibernate 游标读取数据库,使用 Hibernate ScrollableResults 逐行读取,需配置 Hibernate SessionFactory。

  • HibernatePagingItemReader:通过 Hibernate 分页读取数据库,基于 Hibernate Query 分页,支持 setMaxResults,需手动管理 Hibernate 会话。

  • StoredProcedureItemReader:通过存储过程读取数据库,支持调用数据库存储过程并获取结果集,需配置SqlParameter和RowMapper。

消息队列读取器

  • AmqpItemReader:从 RabbitMQ 等 AMQP 消息队列读取消息,基于 Spring AMQP,支持事务性消费,可配置 MessageConverter 转换消息格式。

  • JmsItemReader:从 JMS 消息队列读取消息,支持 JMS 规范的消息中间件(如 ActiveMQ),需配置 JmsTemplate。

  • KafkaItemReader:从 Kafka 读取消息,基于 Spring Kafka,支持批量消费和偏移量管理,需配置 ConsumerFactory和RecordMessageConverter。

内存与集合读取器

  • ListItemReader:从 Java 集合(如 List)读取数据,适合测试或小数据量场景,数据需预先加载到内存。

  • IteratorItemReader:从 Java Iterator 读取数据,支持惰性加载数据(如通过生成器函数),需确保Iterator线程安全。

其他数据源读取器

  • RepositoryItemReader:从 Spring Data Repository 读取数据,直接调用 Repository 方法(如 findAll()),支持分页和排序。

  • MongoItemReader:从 MongoDB 读取数据,基于 Spring Data MongoDB,支持 MongoDB 查询,需配置 MongoTemplate 或 ReactiveMongoTemplate。

  • Neo4jItemReader:从 Neo4j 图数据库读取数据,基于 Spring Data Neo4j,支持 Cypher 查询,需配置 Neo4jTemplate。

  • LdifReader MappingLdifReader:从 LDIF(LDAP 数据交换格式)文件读取数据,支持 LDAP 条目解析和映射,MappingLdifReader可自定义条目映射逻辑。

复合与增强读取器

  • MultiResourceItemReader:从多个资源(如多个文件)读取数据,自动按顺序读取多个资源,支持资源排序和失败恢复。

  • ResourcesItemReader:读取资源列表(如文件路径集合),返回资源对象(如 Resource)而非具体内容,通常作为其他读取器的前置步骤。

  • SynchronizedItemStreamReader:装饰其他 ItemReader,通过同步块包装 read() 方法提供线程安全,适用于多线程步骤。

  • SingleItemPeekableItemReader:允许预读取下一条记录而不消耗它,通过 peek() 方法查看下一条记录,常用于条件处理场景。

  • ItemReaderAdapter:适配自定义对象为 ItemReader,可包装任意对象的方法为读取逻辑(如将 MyService::fetchNext 转换为 ItemReader)。

特殊场景读取器

  • StaxEventItemReader:基于 StAX 事件模型读取 XML,内存高效,适合大文件,通过 XmlEventReader 解析 XML 流。

  • AvroItemReader:读取 Avro 格式数据,支持 Schema 演化和数据类型转换,需配置 AvroItemReaderBuilder。

  

ItemReader 接口定义

ItemReader 接口源代码如下:

package org.springframework.batch.item;

import org.springframework.lang.Nullable;

/**
 * 提供数据的策略接口。 
 * 
 * 实现类应该是有状态的,并且对于每个批次会被多次调用,每次调用 read() 方法会返回
 * 不同的值,当所有输入数据耗尽时最终返回 null。
 * 
 * 实现类无需保证线程安全,ItemReader 的客户端需要注意这一点。
 * 
 * 更丰富的接口(例如带有前瞻或窥视功能)并不可行,因为我们需要在异步批处理中支持事务。
 * 
 * @author Rob Harrop
 * @author Dave Syer
 * @author Lucas Ward
 * @author Mahmoud Ben Hassine
 * @since 1.0
 */
public interface ItemReader<T> {

    /**
     * 读取一段输入数据并推进到下一个数据。实现必须在输入数据集末尾返回 null。
     * 在事务环境中,如果第一次调用处于回滚的事务中,调用者可能会从连续调
     * 用(或以其他方式)中两次获取相同的项。 
     * 
     * @throws ParseException 如果解析当前记录时出现问题(但下一条记录可能仍然有效)
     * @throws NonTransientResourceException 如果底层资源中出现致命异常。
     *       抛出此异常后,实现应尽量在后续的读取调用中返回null。
     * @throws UnexpectedInputException 如果输入数据存在未分类的问题。
     *       假定该问题可能是暂时的,因此后续对read的调用可能会成功。
     * @throws Exception 如果存在非特定错误。
     * @return T 要处理的项;如果数据源已耗尽,则为 null
     */
    @Nullable
    T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException;

}

源码解析:

  • T  表示读取的数据项的类型,例如 Customer, User, String 等等。

  • read()  该方法是接口的核心方法,每次调用返回一个类型为 T 的 Item 对象。当没有更多数据可读时,返回 null。该接口将抛出如下异常:

    • UnexpectedInputException  表示读取时遇到意外错误,如文件格式错误等

    • ParseException  表示解析读取到的数据失败,如 CSV 行解析错误、JSON 解析失败等

    • NonTransientResourceException  表示遇到不可恢复的错误,如数据源连接永久丢失等

  

ItemReader  执行流程

下面是 ItemReader 的执行流程图:

image.png

到这里,ItemReader 接口已经介绍完,后续章节将介绍 ItemReader 实现类到用法。

  

说说我的看法
全部评论(
没有评论
关于
本网站专注于 Java、数据库(MySQL、Oracle)、Linux、软件架构及大数据等多领域技术知识分享。涵盖丰富的原创与精选技术文章,助力技术传播与交流。无论是技术新手渴望入门,还是资深开发者寻求进阶,这里都能为您提供深度见解与实用经验,让复杂编码变得轻松易懂,携手共赴技术提升新高度。如有侵权,请来信告知:hxstrive@outlook.com
其他应用
公众号