在 Spring Batch 4 中,ItemReader 是批处理作业 读取 数据的核心抽象接口。它是 "读-处理-写" 模式中的第一个关键环节,负责从各种数据源(数据库、文件、队列、API 等)逐条或分块地读取数据,并将这些数据作为 Item 提供给后续的 ItemProcessor 和 ItemWriter。
ItemReader 组件的核心职责是把数据从外部数据源读取出来,然后将其转换为应用程序能够处理的对象。在每次调用 read() 方法返回一个单独的 Item 对象。当所有数据读取完毕时返回 null,标志着读取步骤的结束。
ItemReader 对各类数据源进行了抽象,使批处理作业能够轻松地与不同类型的数据源进行交互,这些数据源包括但不限于:
文件(如 CSV、XML、JSON)
数据库(通过 JDBC、JPA 或者 Hibernate)
消息队列(像 JMS)
REST API
其他系统(借助自定义适配器)
ItemReader 每次执行 read() 方法时,会读取一条记录/数据,并将其转换为 Java 对象。具体的读取逻辑如下:
当有数据可读时,返回一个对象。
当所有数据都读取完毕后,返回 null。
如果读取过程中出现异常,会抛出相应的异常,如 UnexpectedInputException 或者 ParseException。
为了确保批处理作业能够可靠地执行,ItemReader 具备以下特性:
事务管理:读取操作在事务范围内进行,保证数据的一致性。
状态保存:通过实现 ItemStream 接口,ItemReader 可以保存当前的读取位置。
作业重启:在作业重启时,能够从上次中断的位置继续读取数据。
ItemReader 还需负责将原始数据转换为应用程序可以处理的对象,常见的转换方式有:
对于 CSV 文件,使用 DelimitedLineTokenizer 进行解析。
对于 XML 文件,采用 StaxEventItemReader 结合 XStream 或者 JAXB 进行解析。
对于数据库记录,利用 RowMapper 或者 JPA 实体映射。
对于自定义格式的数据,实现 FieldSetMapper 接口进行自定义映射。
在数据读取过程中,ItemReader 需要对可能出现的异常情况进行处理:
处理格式错误的数据,例如解析失败的 CSV 行。
处理缺失的数据,如必填字段为空的情况。
处理不可恢复的错误,如文件不存在或者数据库连接失败。
结合 SkipPolicy 和 RetryTemplate 实现跳过机制和重试机制。
下面是对常见 ItemReader 接口实现类的介绍,按数据源类型分类说明:
FlatFileItemReader:用于读取平面文件(如 CSV、固定格式文件),支持自定义分隔符、行前缀过滤,可通过 setLineToSkip 从文件中间位置开始读取,并能通过 LineMapper 接口自定义行映射逻辑。
JsonItemReader:用于读取 JSON 文件(每行一个 JSON 对象),基于 Jackson 或 Gson 解析 JSON,需配置 JsonObjectReader 指定解析器。
XmlItemReader:基于 StAX 解析器读取 XML 文件,支持大文件流式处理,可通过 XPath 表达式定位数据节点。
AvroItemReader:用于读取 Avro 格式文件,需提供 Avro 模式(Schema)或使用反射推断,支持二进制或 JSON 编码的 Avro 文件。
JdbcCursorItemReader:通过 JDBC 游标读取数据库,使用 ResultSet 逐行读取,适合大数据量,需手动管理数据库连接和事务。
JdbcPagingItemReader:通过 SQL 分页查询读取数据库,基于 PagingQueryProvider 自动生成分页 SQL,支持不同数据库方言(如 MySQL、Oracle)。
JpaPagingItemReader:通过 JPA 分页读取数据库,基于 JPA EntityManager,无需手动编写 SQL,支持 JPA 实体映射和查询。
HibernateCursorItemReader:通过 Hibernate 游标读取数据库,使用 Hibernate ScrollableResults 逐行读取,需配置 Hibernate SessionFactory。
HibernatePagingItemReader:通过 Hibernate 分页读取数据库,基于 Hibernate Query 分页,支持 setMaxResults,需手动管理 Hibernate 会话。
StoredProcedureItemReader:通过存储过程读取数据库,支持调用数据库存储过程并获取结果集,需配置SqlParameter和RowMapper。
AmqpItemReader:从 RabbitMQ 等 AMQP 消息队列读取消息,基于 Spring AMQP,支持事务性消费,可配置 MessageConverter 转换消息格式。
JmsItemReader:从 JMS 消息队列读取消息,支持 JMS 规范的消息中间件(如 ActiveMQ),需配置 JmsTemplate。
KafkaItemReader:从 Kafka 读取消息,基于 Spring Kafka,支持批量消费和偏移量管理,需配置 ConsumerFactory和RecordMessageConverter。
ListItemReader:从 Java 集合(如 List)读取数据,适合测试或小数据量场景,数据需预先加载到内存。
IteratorItemReader:从 Java Iterator 读取数据,支持惰性加载数据(如通过生成器函数),需确保Iterator线程安全。
RepositoryItemReader:从 Spring Data Repository 读取数据,直接调用 Repository 方法(如 findAll()),支持分页和排序。
MongoItemReader:从 MongoDB 读取数据,基于 Spring Data MongoDB,支持 MongoDB 查询,需配置 MongoTemplate 或 ReactiveMongoTemplate。
Neo4jItemReader:从 Neo4j 图数据库读取数据,基于 Spring Data Neo4j,支持 Cypher 查询,需配置 Neo4jTemplate。
LdifReader / MappingLdifReader:从 LDIF(LDAP 数据交换格式)文件读取数据,支持 LDAP 条目解析和映射,MappingLdifReader可自定义条目映射逻辑。
MultiResourceItemReader:从多个资源(如多个文件)读取数据,自动按顺序读取多个资源,支持资源排序和失败恢复。
ResourcesItemReader:读取资源列表(如文件路径集合),返回资源对象(如 Resource)而非具体内容,通常作为其他读取器的前置步骤。
SynchronizedItemStreamReader:装饰其他 ItemReader,通过同步块包装 read() 方法提供线程安全,适用于多线程步骤。
SingleItemPeekableItemReader:允许预读取下一条记录而不消耗它,通过 peek() 方法查看下一条记录,常用于条件处理场景。
ItemReaderAdapter:适配自定义对象为 ItemReader,可包装任意对象的方法为读取逻辑(如将 MyService::fetchNext 转换为 ItemReader)。
StaxEventItemReader:基于 StAX 事件模型读取 XML,内存高效,适合大文件,通过 XmlEventReader 解析 XML 流。
AvroItemReader:读取 Avro 格式数据,支持 Schema 演化和数据类型转换,需配置 AvroItemReaderBuilder。
ItemReader 接口源代码如下:
package org.springframework.batch.item; import org.springframework.lang.Nullable; /** * 提供数据的策略接口。 * * 实现类应该是有状态的,并且对于每个批次会被多次调用,每次调用 read() 方法会返回 * 不同的值,当所有输入数据耗尽时最终返回 null。 * * 实现类无需保证线程安全,ItemReader 的客户端需要注意这一点。 * * 更丰富的接口(例如带有前瞻或窥视功能)并不可行,因为我们需要在异步批处理中支持事务。 * * @author Rob Harrop * @author Dave Syer * @author Lucas Ward * @author Mahmoud Ben Hassine * @since 1.0 */ public interface ItemReader<T> { /** * 读取一段输入数据并推进到下一个数据。实现必须在输入数据集末尾返回 null。 * 在事务环境中,如果第一次调用处于回滚的事务中,调用者可能会从连续调 * 用(或以其他方式)中两次获取相同的项。 * * @throws ParseException 如果解析当前记录时出现问题(但下一条记录可能仍然有效) * @throws NonTransientResourceException 如果底层资源中出现致命异常。 * 抛出此异常后,实现应尽量在后续的读取调用中返回null。 * @throws UnexpectedInputException 如果输入数据存在未分类的问题。 * 假定该问题可能是暂时的,因此后续对read的调用可能会成功。 * @throws Exception 如果存在非特定错误。 * @return T 要处理的项;如果数据源已耗尽,则为 null */ @Nullable T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException; }
源码解析:
T 表示读取的数据项的类型,例如 Customer, User, String 等等。
read() 该方法是接口的核心方法,每次调用返回一个类型为 T 的 Item 对象。当没有更多数据可读时,返回 null。该接口将抛出如下异常:
UnexpectedInputException 表示读取时遇到意外错误,如文件格式错误等
ParseException 表示解析读取到的数据失败,如 CSV 行解析错误、JSON 解析失败等
NonTransientResourceException 表示遇到不可恢复的错误,如数据源连接永久丢失等
下面是 ItemReader 的执行流程图:
到这里,ItemReader 接口已经介绍完,后续章节将介绍 ItemReader 实现类到用法。