SpringBatch开发实战-2
ItemReader
ItemReader 数据读取
- 首先需要解决的就是数据来源的问题,不同项目存在有个完整的数据批处理操作中所以批处理的数据可能来自于文件,也可能是数据库或不同的应用环境以及业务背景者是消息组件
ItemReader 实现子类
- SpringBatch 框架中提供了 ItemReader 数据读取接口,同时对于常见的数据读取操作也提供有相应的实现子类
多资源数据读取
- 使用 FlatFileltemReader 每次只能够读取一个数据文件,但是在现实的开发中考虑到批处理文件过大,从而会按照文件存储的阈值进行拆分操作,若要同时读取多个资源文件则就需要通过 MultiResourceltemReader 子类进行处理,需要注意的是,该类仅仅提供资源配置,具体资源读取依然由 FlatFileltemReader 实现
1、
@Bean
public ItemReader<Bill> reader() {
// 此时的数据是批量读取进来的,但是会按照既定的格式进行拆分(每行读一次)
// 此时使用的数据输入流类,只支持单个文件的读取(后面有适合于目录的读取讲解)
FlatFileItemReader<Bill> reader = new FlatFileItemReader<>(); // 文件输入流
// 每行读取到的数据需要设置专属的数据行的处理映射实例
reader.setLineMapper(this.lineMapper()); // 数据的处理
// 当前的数据是在CLASSPATH资源下,要使用到Resource读取
PathMatchingResourcePatternResolver resolver =
new PathMatchingResourcePatternResolver(); // 资源匹配
String filePath = "classpath:data/bill.txt"; // 资源文件路径
reader.setResource(resolver.getResource(filePath));
return reader;
}
2、
package com.yootk.test;
import com.yootk.StartSpringBatchApplication;
import com.yootk.batch.vo.Bill;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit.jupiter.SpringExtension;
@ContextConfiguration(classes = StartSpringBatchApplication.class)
@ExtendWith(SpringExtension.class)
public class TestItemReader { // 定义测试类
private static final Logger LOGGER = LoggerFactory.getLogger(TestItemReader.class);
@Autowired
private FlatFileItemReader<Bill> itemReader; // 数据拆分
@Test
public void testSplit() throws Exception { // 定义作业启动方法
this.itemReader.open(new ExecutionContext()); // 创建执行上下文
Bill bill = null; // 保存读取数据
while((bill = this.itemReader.read()) != null) {
LOGGER.info("【交易信息】账户ID:{}、姓名:{}、金额:{}、交易时间:{}、交易位置:{}",
bill.getId(), bill.getName(), bill.getAmount(), bill.getTransaction(), bill.getLocation());
}
}
}
3、
@Bean
public FlatFileItemReader<Bill> reader() {
// 此时的数据是批量读取进来的,但是会按照既定的格式进行拆分(每行读一次)
// 此时使用的数据输入流类,只支持单个文件的读取(后面有适合于目录的读取讲解)
FlatFileItemReader<Bill> reader = new FlatFileItemReader<>(); // 文件输入流
// 每行读取到的数据需要设置专属的数据行的处理映射实例
reader.setLineMapper(this.lineMapper()); // 数据的处理
return reader;
}
@Bean
public ItemReader<Bill> multiReader() throws IOException { // 多资源读取
MultiResourceItemReader<Bill> reader =
new MultiResourceItemReader<>(); // 多个资源读取数据
reader.setDelegate(this.reader()); // 单个资源读取
// 当前的数据是在CLASSPATH资源下,要使用到Resource读取
PathMatchingResourcePatternResolver resolver =
new PathMatchingResourcePatternResolver(); // 资源匹配
String filePath = "classpath:data/*.txt"; // 资源文件路径
reader.setResources(resolver.getResources(filePath));
return reader;
}
4、
5、
【交易信息】账户ID:8197276813101、姓名:马云涛、金额:782830.23、交易时间:Sun Oct 11 11:21:21 CST 2026、交易位置:洛阳支行
【交易信息】账户ID:8197276813101、姓名:马云涛、金额:727265.13、交易时间:Mon Oct 12 16:16:24 CST 2026、交易位置:洛阳支行
【交易信息】账户ID:8197276813101、姓名:马云涛、金额:223891.13、交易时间:Sun Nov 01 16:05:26 CST 2026、交易位置:北京支行
【交易信息】账户ID:8197276813101、姓名:马云涛、金额:-122500.0、交易时间:Mon Nov 02 20:09:27 CST 2026、交易位置:北京ATM
【交易信息】账户ID:8197276813101、姓名:马云涛、金额:-1.622228198E7、交易时间:Tue Nov 03 15:24:28 CST 2026、交易位置:信用卡
【交易信息】账户ID:9197276813101、姓名:李兴华、金额:9830.23、交易时间:Sat Oct 11 11:21:21 CST 2025、交易位置:洛阳支行
【交易信息】账户ID:9197276813101、姓名:李兴华、金额:1265.13、交易时间:Sun Oct 12 16:16:24 CST 2025、交易位置:洛阳支行
【交易信息】账户ID:9197276813101、姓名:李兴华、金额:3891.13、交易时间:Sat Nov 01 16:05:26 CST 2025、交易位置:北京支行
【交易信息】账户ID:9197276813101、姓名:李兴华、金额:-1500.0、交易时间:Sun Nov 02 20:09:27 CST 2025、交易位置:北京ATM
【交易信息】账户ID:9197276813101、姓名:李兴华、金额:-1681.98、交易时间:Mon Nov 03 15:24:28 CST 2025、交易位置:信用卡
ItemProcessor
ItemProcessor 数据处理
- 当前的开发中已经成功的实现了文本数据与 Bill 对象实例之间的转换操作,但是由于不同批处理业务的需要,可能某些操作中要使用到 Bill 对象,而某些操作中可能会使用 Account 对象
使用 JavaScript 处理 ltemProcessor
- 由于开发中数据转换处理的使用较多,所以 ItemProcessor 接口提供了,通过 ScriptltemProcessor 实现子类,该类可以基于 Spring 的脚本支持环境 JavaScript 配置数据转换的操作代码
1、
package com.yootk.batch.vo;
public class Account {
private Long id;
private Double amount;
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public Double getAmount() {
return amount;
}
public void setAmount(Double amount) {
this.amount = amount;
}
}
2、
package com.yootk.batch.processor;
import com.yootk.batch.vo.Account;
import com.yootk.batch.vo.Bill;
import org.springframework.batch.item.ItemProcessor;
public class BillToAccountItemProcessor implements ItemProcessor<Bill, Account> {
@Override
public Account process(Bill item) throws Exception {
Account account = new Account(); // 实例化Account目标类对象
account.setId(item.getId());
account.setAmount(item.getAmount());
return account;
}
}
3、
@Bean
public ItemProcessor<Bill, Account> itemProcessor() {
return new BillToAccountItemProcessor();
}
4、
package com.yootk.test;
import com.yootk.StartSpringBatchApplication;
import com.yootk.batch.vo.Account;
import com.yootk.batch.vo.Bill;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.file.MultiResourceItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit.jupiter.SpringExtension;
@ContextConfiguration(classes = StartSpringBatchApplication.class)
@ExtendWith(SpringExtension.class)
public class TestItemProcessor { // 定义测试类
private static final Logger LOGGER = LoggerFactory.getLogger(TestItemProcessor.class);
@Autowired
private ItemProcessor<Bill, Account> itemProcessor; // 数据处理
@Autowired
private MultiResourceItemReader<Bill> multiReader; // 多资源读取
@Test
public void testSplit() throws Exception { // 定义作业启动方法
this.multiReader.open(new ExecutionContext()); // 创建执行上下文
Bill bill = null; // 保存读取数据
while((bill = this.multiReader.read()) != null) {
Account account = this.itemProcessor.process(bill); // 实现对象转换处理
LOGGER.info("【账户信息】账户ID:{}、金额:{}", bill.getId(), bill.getAmount());
}
}
}
5、
【账户信息】账户ID:8197276813101、金额:782830.23
【账户信息】账户ID:8197276813101、金额:727265.13
【账户信息】账户ID:8197276813101、金额:223891.13
【账户信息】账户ID:8197276813101、金额:-122500.0
【账户信息】账户ID:8197276813101、金额:-1.622228198E7
【账户信息】账户ID:9197276813101、金额:9830.23
【账户信息】账户ID:9197276813101、金额:1265.13
【账户信息】账户ID:9197276813101、金额:3891.13
【账户信息】账户ID:9197276813101、金额:-1500.0
【账户信息】账户ID:9197276813101、金额:-1681.98
6、
// https://mvnrepository.com/artifact/org.openjdk.nashorn/nashorn-core
implementation group: 'org.openjdk.nashorn', name: 'nashorn-core', version: '15.4'
7、
var account = new com.yootk.batch.vo.Account(); // 对象实例化
account.id = item.getId(); // 属性赋值
account.amount = item.getAmount() * 1
account;
8、
@Bean
public ItemProcessor<Bill, Account> itemProcessor() {
ScriptItemProcessor<Bill, Account> itemProcessor =
new ScriptItemProcessor<>();
PathMatchingResourcePatternResolver resolver =
new PathMatchingResourcePatternResolver(); // 资源匹配
String filePath = "classpath:js/billToAccount.js"; // 脚本文件
itemProcessor.setScript(resolver.getResource(filePath));
itemProcessor.setItemBindingVariableName("item"); // 脚本里面绑定的属性名称
return itemProcessor;
}
9、
【账户信息】账户ID:8197276813101、金额:782830.23
【账户信息】账户ID:8197276813101、金额:727265.13
【账户信息】账户ID:8197276813101、金额:223891.13
【账户信息】账户ID:8197276813101、金额:-122500.0
【账户信息】账户ID:8197276813101、金额:-1.622228198E7
【账户信息】账户ID:9197276813101、金额:9830.23
【账户信息】账户ID:9197276813101、金额:1265.13
【账户信息】账户ID:9197276813101、金额:3891.13
【账户信息】账户ID:9197276813101、金额:-1500.0
【账户信息】账户ID:9197276813101、金额:-1681.98
ItemWriter
ItemWriter 实现批处理结果存储
- 数据在经过了若干处理步骤后,已经从其中获取到了所需的重要数据信息。为了后续数据的使用方便,就需要将这些数据进行存储,考虑到实际应用的业务区别 SpringBatch 提供了消息组件存储、数据库存储或文件存储的支持
ItemWriter 接口继承结构
- 为了满足不同数据存储的需要,SpringBatch 提供了 ltemWriter 接口标准,该接口默认提供了不同的实现子类
1、
DROP DATABASE IF EXISTS yootk;
CREATE DATABASE yootk CHARACTER SET UTF8;
USE yootk;
CREATE TABLE account (
id BIGINT,
amount INT
)ENGINE = InnoDB;
2、
yootk.database.driverClassName=com.mysql.cj.jdbc.Driver
yootk.database.jdbcUrl=jdbc:mysql://localhost:3306/yootk
yootk.database.username=root
yootk.database.password=mysqladmin
yootk.database.connectionTimeOut=3000
yootk.database.readOnly=false
yootk.database.pool.idleTimeOut=3000
yootk.database.pool.maxLifetime=60000
yootk.database.pool.maximumPoolSize=60
yootk.database.pool.minimumIdle=20
3、
package com.yootk.config;
@Configuration // 配置类
@PropertySource("classpath:config/database.properties") // 配置加载
public class YootkDataSourceConfig { // 数据源配置Bean
@Value("${yootk.database.driverClassName}") // 资源文件读取配置项
private String driverClassName; // 数据库驱动程序
@Value("${yootk.database.jdbcUrl}") // 资源文件读取配置项
private String jdbcUrl; // 数据库连接地址
@Value("${yootk.database.username}") // 资源文件读取配置项
private String username; // 用户名
@Value("${yootk.database.password}") // 资源文件读取配置项
private String password; // 密码
@Value("${yootk.database.connectionTimeOut}") // 资源文件读取配置项
private long connectionTimeout; // 连接超时
@Value("${yootk.database.readOnly}") // 资源文件读取配置项
private boolean readOnly; // 只读配置
@Value("${yootk.database.pool.idleTimeOut}") // 资源文件读取配置项
private long idleTimeout; // 连接最小维持时长
@Value("${yootk.database.pool.maxLifetime}") // 资源文件读取配置项
private long maxLifetime; // 连接最大存活时长
@Value("${yootk.database.pool.maximumPoolSize}") // 资源文件读取配置项
private int maximumPoolSize; // 连接池最大维持数量
@Value("${yootk.database.pool.minimumIdle}") // 资源文件读取配置项
private int minimumIdle; // 连接池最小维持数量
@Bean("yootkDataSource") // Bean注册
public DataSource yootkDataSource() { // 配置数据源
HikariDataSource dataSource = new HikariDataSource(); // DataSource子类实例化
dataSource.setDriverClassName(this.driverClassName); // 驱动程序
dataSource.setJdbcUrl(this.jdbcUrl); // JDBC连接地址
dataSource.setUsername(this.username); // 用户名
dataSource.setPassword(this.password); // 密码
dataSource.setConnectionTimeout(this.connectionTimeout); // 连接超时
dataSource.setReadOnly(this.readOnly); // 是否为只读数据库
dataSource.setIdleTimeout(this.idleTimeout); // 最小维持时间
dataSource.setMaxLifetime(this.maxLifetime); // 连接的最大时长
dataSource.setMaximumPoolSize(this.maximumPoolSize); // 连接池最大容量
dataSource.setMinimumIdle(this.minimumIdle); // 最小维持连接量
return dataSource; // 返回Bean实例
}
}
4、
@Autowired
private DataSource yootkDataSource; // 注入指定名称的实例
@Bean
public ItemWriter<Account> itemWriter() {
JdbcBatchItemWriter<Account> itemWriter = new JdbcBatchItemWriter<>();
itemWriter.setDataSource(yootkDataSource); // 数据库的连接
String sql = "INSERT INTO account(id, amount) VALUES (:id, :amount)";
itemWriter.setSql(sql); // 追加的SQL语句
itemWriter.setItemSqlParameterSourceProvider(
new BeanPropertyItemSqlParameterSourceProvider<>()); // SQL参数的配置
return itemWriter;
}
5、
package com.yootk.test;
import com.yootk.StartSpringBatchApplication;
import com.yootk.batch.vo.Account;
import com.yootk.batch.vo.Bill;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.Chunk;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.MultiResourceItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import java.util.ArrayList;
import java.util.List;
@ContextConfiguration(classes = StartSpringBatchApplication.class)
@ExtendWith(SpringExtension.class)
public class TestItemWriter { // 定义测试类
private static final Logger LOGGER = LoggerFactory.getLogger(TestItemWriter.class);
@Autowired
private ItemProcessor<Bill, Account> itemProcessor; // 数据处理
@Autowired
private MultiResourceItemReader<Bill> multiReader; // 多资源读取
@Autowired
private ItemWriter<Account> itemWriter; // 数据输出
@Test
public void testSplit() throws Exception { // 定义作业启动方法
List<Account> accounts = new ArrayList<>(); // 保存要输出的数据
this.multiReader.open(new ExecutionContext()); // 创建执行上下文
Bill bill = null; // 保存读取数据
while((bill = this.multiReader.read()) != null) {
accounts.add(this.itemProcessor.process(bill)); // 数据处理后的保存
}
this.itemWriter.write(new Chunk<Account>(accounts)); // 新版本的特点
}
}
创建批处理作业
执行批处理作业
- SpringBatch 是以作业的形式展开处理的,所以最终的作业执行依然要通过 JobLauncher 实例进行触发,这样就需要在应用中进行作业以及步骤的配置,一个完整的作业步骤中会包含有数据的输入(ItemReader)、数据处理(ltemProcessor)以及数据输出(ltemWriter)等几项核心的操作
1、
@Bean
@StepScope // 接收作业执行的参数
public ItemReader<Bill> multiReader(
@Value("#{jobParameters[path]}") String path
) throws IOException { // 多资源读取
MultiResourceItemReader<Bill> reader =
new MultiResourceItemReader<>(); // 多个资源读取数据
reader.setDelegate(this.reader()); // 单个资源读取
// 当前的数据是在CLASSPATH资源下,要使用到Resource读取
PathMatchingResourcePatternResolver resolver =
new PathMatchingResourcePatternResolver(); // 资源匹配
// String filePath = "classpath:data/*.txt"; // 资源文件路径
reader.setResources(resolver.getResources(path));
return reader;
}
2、
@Autowired
private ItemReader<Bill> multiReader; // 自己进入自己
@Bean
public Step billStep() {
SimpleStepBuilder<Bill, Account> builder = new SimpleStepBuilder<>(
new StepBuilder("billStep", jobRepository));
builder.transactionManager(this.batchTransactionManager);
return builder
.chunk(5) // 每次处理5条数据项
.reader(this.multiReader) // 设置数据输入来源
.processor(this.itemProcessor()) // 设置数据处理
.writer(this.itemWriter()) // 设置数据的输出
.build(); // 构建Step
}
3、
@Bean
public Job billJob() { // 对账作业
return new JobBuilder("billJob", jobRepository)
.start(this.billStep()) // 自定义的异步任务处理
.build();
}
4、
package com.yootk.test;
import com.yootk.StartSpringBatchApplication;
import com.yootk.batch.vo.Account;
import com.yootk.batch.vo.Bill;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.file.MultiResourceItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit.jupiter.SpringExtension;
@ContextConfiguration(classes = StartSpringBatchApplication.class)
@ExtendWith(SpringExtension.class)
public class TestBillJob { // 定义测试类
private static final Logger LOGGER = LoggerFactory.getLogger(TestBillJob.class);
@Autowired
private Job billJob; // 定义作业
@Autowired
private JobLauncher launcher; // 作业执行器
@Test
public void testBillJob() throws Exception { // 定义作业启动方法
JobParameters jobParameters = new JobParametersBuilder()
.addString("project", "muyan-yootk-spring-batch") // 项目参数
.addString("developer", "yootk-lixinghua") // 项目参数
.addLong("timestamp", System.currentTimeMillis()) // 项目参数
.addString("path", "classpath:data/*.txt") // 项目参数
.toJobParameters(); // 创建作业参数
this.launcher.run(this.billJob, jobParameters); // 启动作业
}
}
操作监听
数据操作监听接口
- 一次完整的数据写入处理操作,需要进行大量的底层逻辑单元的配置,数据在这其中的每一步处理都可以通过监听器来获取,考虑到核心的底层单元包括读、处理以及写三个操作,所以操作监听器也分别提供有三个,分别是数据读取监听(ltemReadListener) 数据处理监听(ltemProcessListener)以及数据写入监听(ltemWriteListener)二个接口
1、
package com.yootk.batch.listener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.ItemWriteListener;
import org.springframework.batch.item.Chunk;
public class AccountItemWriteListener implements ItemWriteListener { // 数据的写入监听
private static final Logger LOGGER =
LoggerFactory.getLogger(AccountItemWriteListener.class);
@Override
public void beforeWrite(Chunk items) {
LOGGER.info("【数据写入监听】数据写入长度:{}", items.size());
}
}
2、
@Bean
public AccountItemWriteListener itemWriteListener() {
return new AccountItemWriteListener();
}
Chunk 简介
SpringBatch 逻辑组成结构
- Step 在进行批处理操作时,依靠的是 Tasklet 接口定义了具体的执行步骤,而在一个处理步骤之中,有可能会需要进行数据读取、数据处理以及数据写入三个核心逻辑,所以为了便于执行任务的定义,SpringBatch 提供了 Chunk 的概念
Chunk 配置
- Chunk 元素定义面向批处理的操作,一个完整的 Chunk 处理中会包含有数据读取、数据处理以及数据输出三种,除此之外也支持有数据的跳过策略、错误重试策略以及事务处理策略的支持,SimpleStepBuild 类中对于 Chunk 的配置提供有两种不同的定义方法
1、
@Bean
public CompletionPolicy completionPolicy() {
SimpleCompletionPolicy policy = new SimpleCompletionPolicy(); // 完成策略
policy.setChunkSize(3); // 每次执行3条处理
return policy;
}
ChunkListener
配置 ChunkListener
- Chunk 实现了每一个 Step 中的核心操作逻辑定义,在执行时会根据其配置的完成策略重复的执行 Chunk 操作,为了便于用户实现 Chunk 状态的监听操作,SpringBatch 提供了 ChunkListener 监听接口,利用该接口的实例可以在 Chunk 执行前后以及异常产生时进行所需操作
SpringBatch 监听接口
- SpringBatch 中的每一个元素几乎都提供了与之对应的监听接口,这样的设计便于开发者进行各个处理环节的监控
1、
package com.yootk.batch.listener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.ChunkListener;
import org.springframework.batch.core.scope.context.ChunkContext;
public class BillStepChunkListener implements ChunkListener {
private static final Logger LOGGER =
LoggerFactory.getLogger(BillStepChunkListener.class);
@Override
public void beforeChunk(ChunkContext context) {
// Chunk里面会包含有ItemReader、ItemProcessor、ItemWriter操作单元
// ChunkContext配置的内容一定可以在这些单元中获取
context.setAttribute("book", "《SSM开发实战》"); // 设置属性
LOGGER.info("【Chunk监听 —— beforeChunk()】步骤名称:{}、完成状态:{}",
context.getStepContext().getStepName(), context.isComplete());
}
@Override
public void afterChunk(ChunkContext context) {
LOGGER.info("【Chunk监听 —— afterChunk()】步骤名称:{}、完成状态:{}",
context.getStepContext().getStepName(), context.isComplete());
}
@Override
public void afterChunkError(ChunkContext context) {
LOGGER.info("【Chunk监听 —— afterChunkError()】步骤名称:{}、完成状态:{}",
context.getStepContext().getStepName(), context.isComplete());
}
}
2、
@Bean
public BillStepChunkListener chunkListener() {
return new BillStepChunkListener();
}
3、
@Bean
public Step billStep() {
SimpleStepBuilder<Bill, Account> builder = new SimpleStepBuilder<>(
new StepBuilder("billStep", jobRepository));
builder.transactionManager(this.batchTransactionManager);
return builder
.chunk(this.completionPolicy()) // 定义完成策略
.listener(this.itemWriteListener()) // 设置数据的写入监听
.reader(this.multiReader) // 设置数据输入来源
.processor(this.itemProcessor()) // 设置数据处理
.writer(this.itemWriter()) // 设置数据的输出
.faultTolerant() // 失败处理操作
.listener(this.chunkListener())// 配置监听
.build(); // 构建Step
}
Chunk 事务处理
Chunk 与事务配置
- 一个完整的批处理之中是以 Step 作为批处理的基本配置单位,考虑到数据处理的简洁性,每一个 Step 之间都存在有独立的事务支持,而在每一个 Step 中执行 Chunk 时,也都会有独立的事务开启,在一次完整的批处理读写过程中,事务会工作在 Chunk 之中并且随着数据量以及批处理长度的配置,事务会重复执行多次
Step 事务配置
- SpringBatch 中继续使用了 Spring 提供的 PlatformTransactionManager 实现了事务处理支持,用户可以在进行 Step 配置的时候,利用 transactionManager()方法配置具体的事务处理实例
1、
package com.yootk.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;
import javax.sql.DataSource;
@Configuration
public class SpringBatchTransactionManagerConfig { // Spring批处理的事务管理器
@Bean
public PlatformTransactionManager batchTransactionManager(
DataSource batchDataSource) {
DataSourceTransactionManager transactionManager = new DataSourceTransactionManager();
transactionManager.setDataSource(batchDataSource);
return transactionManager;
}
}
异常跳过机制
批处理作业中断
SpringBatch 数据批处理是一个庞大的处理逻辑单元的总和,默认情况下批处理中的每.个处理单元只要产生了某些异常,那么就会导致整个批处理作业的中断
ItemReader 异常跳过
由于批处理作业的事务是以 Chunk 为单元进行控制的,所以正确的批处理数据结构是可以正常存储的,但是错误产生之后的数据将无法得到正确的保存,这就导致批处理作业的不完整。所以为了可以让批处理作业正常执行完毕,在 SpringBatch 中提供了异常跳过的机制,这样当某些异常产生时就会跳过当前数据,而执行下一条数据处理
SkipListener 监听接
虽然使用异常跳过机制可以有效的解决批处理作业执行中断的问题,但是这些错误的数据本身依然会存在有某些价值,这样就需要对这些错误的数据进行有效的监听记录。为此提供了 SkipListener 监听接口,在该接口中提供有三个数据监听方法,分别对应着 ltemReader、ltemProcessor 以及 ltemWriter 三种不同的操作结构,开发者可以依据该接口的结构实现所需数据项的记录。
异常跳过配置结构
如果要想配置异常跳过机制,只需要通过 FaultTolerantStepBuilder 配置类即可,除了需要配置 SkipListener 接口实例之外,还需要配置忽略的异常类型,以及异常跳过策略(SkipPolicy)
1、
package com.yootk.batch.listener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.SkipListener;
public class BillStepSkipListener implements SkipListener {
private static final Logger LOGGER = LoggerFactory.getLogger(
BillStepSkipListener.class);
@Override
public void onSkipInRead(Throwable t) {
LOGGER.info("【SkipInRead】数据读取错误,异常信息:{}", t.getMessage());
}
@Override
public void onSkipInProcess(Object item, Throwable t) {
LOGGER.info("【SkipInProcess】数据处理错误,数据项:{}、异常信息:{}", item, t.getMessage());
}
@Override
public void onSkipInWrite(Object item, Throwable t) {
LOGGER.info("【SkipInWrite】数据写入错误,数据项:{}、异常信息:{}", item, t.getMessage());
}
}
2、
@Bean
public BillStepSkipListener skipListener() {
return new BillStepSkipListener();
}
3、
@Bean
public SkipPolicy skipPolicy() { // 追加跳过策略
return new AlwaysSkipItemSkipPolicy(); // 总是跳过错误
}
4、
@Bean
public Step billStep() {
SimpleStepBuilder<Bill, Account> builder = new SimpleStepBuilder<>(
new StepBuilder("billStep", jobRepository));
builder.transactionManager(this.batchTransactionManager);
return builder
.chunk(this.completionPolicy()) // 定义完成策略
.listener(this.itemWriteListener()) // 设置数据的写入监听
.reader(this.multiReader) // 设置数据输入来源
.processor(this.itemProcessor()) // 设置数据处理
.writer(this.itemWriter()) // 设置数据的输出
.faultTolerant() // 失败处理操作
.skip(NumberFormatException.class) // 跳过异常
.skip(DuplicateKeyException.class) // 重复主键异常
.skipLimit(Integer.MAX_VALUE) // 跳过的极限
.skipPolicy(this.skipPolicy()) // 跳过策略
.listener(this.chunkListener())// 配置监听
.listener(this.skipListener()) // 跳过监听
.build(); // 构建Step
}
错误重试机制
错误重试机制
- 一些临时性的错误,但是这些错误可能只会在短期内出现在批处理操作过程中经常会出现所以为了保证批处理的正确执行应该在批处理中追加有错误重试的机制。例如:现在批处理的数据最终都要写入到指定的数据库之中,但是由于临时性的网络故障(可能只是几秒的时间),导致服务器连接不上,那么应该在间隔一段时间后继续重新尝试写入
重拾机制实现架构
- 重试机制属于 SpringBatch 中的容错故障支持,所以需要通过 FaultTolerantStepBuilder 类的实例进行定义,在配置时还需要定义好重试策略(BRetryPolicy)以及重试间隔策略(BackoffPolicy),在每次操作重试时也可以通过 RetryListener 接口进行状态的监听
1、
package com.yootk.batch.listener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.listener.RetryListenerSupport;
public class BillStepRetryListener extends RetryListenerSupport {
private static final Logger LOGGER =
LoggerFactory.getLogger(BillStepRetryListener.class);
@Override
public <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback) {
LOGGER.info("【Retry - open()】进入到Retry操作,{}", context);
return true;
}
@Override
public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
LOGGER.info("【Retry - close()】Retry操作结束,{}", context);
}
@Override
public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
LOGGER.info("【Retry - onError()】Retry操作异常,{}", throwable.getMessage());
}
}
2、
@Bean
public RetryListener retryListener() {
return new BillStepRetryListener();
}
3、
@Bean
public RetryPolicy retryPolicy() {
SimpleRetryPolicy policy = new SimpleRetryPolicy();
policy.setMaxAttempts(3);// 最多尝试次数
return policy;
}
4、
@Bean
public BackOffPolicy backOffPolicy() {
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy(); // 间隔重试
backOffPolicy.setBackOffPeriod(5000); // 每5秒重试一次
return backOffPolicy;
}
5、
@Bean
public Step billStep() {
SimpleStepBuilder<Bill, Account> builder = new SimpleStepBuilder<>(
new StepBuilder("billStep", jobRepository));
builder.transactionManager(this.batchTransactionManager);
return builder
.chunk(this.completionPolicy()) // 定义完成策略
.listener(this.itemWriteListener()) // 设置数据的写入监听
.reader(this.multiReader) // 设置数据输入来源
.processor(this.itemProcessor()) // 设置数据处理
.writer(this.itemWriter()) // 设置数据的输出
.faultTolerant() // 失败处理操作
.skip(NumberFormatException.class) // 跳过异常
.skip(DuplicateKeyException.class) // 重复主键异常
.skipLimit(Integer.MAX_VALUE) // 跳过的极限
.skipPolicy(this.skipPolicy()) // 跳过策略
.listener(this.chunkListener())// 配置监听
.listener(this.skipListener()) // 跳过监听
.retry(SQLException.class) // 产生SQL异常
.retryPolicy(this.retryPolicy()) // 重试策略
.backOffPolicy(this.backOffPolicy()) // 回退策略
.listener(this.retryListener()) // 监听机制
.build(); // 构建Step
}
6、
DROP TABLE IF EXISTS account;
SpringTask 简介
定时任务与批处理作业
- 批处理操作由于会大量的占用服务器的硬件资源,所以在在实际的应用中,大多都是在服务器空闲的时间段内完成的,这样不仅保证了批处理作业的稳定性,同时又避免了应用高峰期内服务处理性能不足的问题出现。所以对于批处理的作业不应该采用手工触发的形式,而是应该采用自动触发的形式
JDK 定时调度任务
- 由于在应用开发中,定时调度较为常用,所以 JDK 也提供了实现结构,开发者直接通过 TimerTask 接口即可定义定义任务处理类,同时该任务将基于 Runnable 多线程接口实现,但是这一操作只支持有间隔调度的支持,同时该操作结构每次只允许一个线程任务的调度,所以并不适合于更加准确的定时任务控制。
1、
package com.yootk.test;
import java.util.Timer;
import java.util.TimerTask;
class YootkTask extends TimerTask {// 是由Java系统所提供的一个工具类
@Override
public void run() { // 定时任务的本质就是一个专属的操作线程
System.out.println("【定时任务】主体信息,输出当前时间戳:" + System.currentTimeMillis());
}
}
public class TestTask {
public static void main(String[] args) {
Timer timer = new Timer();
timer.schedule(new YootkTask(), 1, 2); // 1秒后开始执行,每2秒调度一次
}
}
SpringTask 间隔调度
SpringTask 定时任务类
- SpringTask 是 Spring 框架的中自带的子组件,当开发者在项目中引用了“spring-就可以直接使用 SpringTask 进行定时任务的编写。而具体的定 context”依赖库之后时任务处理方法的定义,则可以使用“@Scheduled”注解进行声明,这样在每次任务执行时就会根据触发模式而进行该方法的调用,而所有的定时任务处理类如果要想被 Spring 正确的调度,则可以通过“@Component”注解的形式进行 Bean 注册
定时任务启用
- 如果要想在当前的 Spring 容器中启动定时任务调度处理,那么还需要在应用启动中追加“@EnableScheduling”注解,该注解会自动进行所有定时任务方法的解析,而后基于 “@Scheduled”注解的配置进行间隔调度或 CRON 表达式调度处理,
1、
project(":task") {
dependencies{} // 因为公共的依赖库已经导入了所需要的依赖
}
2、
package com.yootk.task;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Component // 自动注册
public class YootkTask {
private static final Logger LOGGER = LoggerFactory.getLogger(YootkTask.class);
@Scheduled(fixedRate = 2000) // 间隔2秒进行业务调度
public void runTask() {
LOGGER.info("【YOOTK - 定时任务】当前时间:{}", System.currentTimeMillis());
}
}
3、
package com.yootk;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.scheduling.annotation.EnableScheduling;
@ComponentScan({"com.yootk.task"}) // 定义扫描包
@EnableScheduling // 启用SpringTask定时调度处理
public class StartSpringTaskApplication {
}
4、
<beans>
<task:annotation-driven scheduler="taskScheduler"/>
<task:scheduler id="taskScheduler" pool-size="42"/>
<task:scheduled-tasks scheduler="taskScheduler">
<task:scheduled ref="myTask" method="work" fixed-rate="1000"/>
</task:scheduled-tasks>
<bean id="myTask" class="com.foo.MyTask"/>
</beans>
5、
package com.yootk.test;
import com.yootk.StartSpringTaskApplication;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import java.util.concurrent.TimeUnit;
@ContextConfiguration(classes = StartSpringTaskApplication.class) // 定义启动类
@ExtendWith(SpringExtension.class)
public class TestSpringTask { // 定时任务的测试类
@Test
public void testRun() throws Exception {
TimeUnit.SECONDS.sleep(Long.MAX_VALUE);
}
}
CRON 表达式
CRON
- 使用 SpringTask 任务调度组件代替 JDK 组件实现的主要目的在于,SpringTask 可以实现定期调度,例如:每周五晚上 12 点启动任务,或者每年 12 月 31 日启动统计任务等而间隔调度的处理就需要结合 CRON 表达式来完成。CRON 原本是一个 Linux 系统下执行定时任务的处理工具,利用如下格式的字符串来进行任务调度的安排:
- 【格式一】七个描述域:
- 秒分时日月周年(Seconds Minutes Hours DayofMonth Month DayofWeek Year)
- 【格式二】六个描述域:
- 秒分时日月周(Seconds Minutes Hours DayofMonth Month DayofWeek)
- 【格式一】七个描述域:
1、
package com.yootk.task;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Component // 自动注册
public class YootkTask {
private static final Logger LOGGER = LoggerFactory.getLogger(YootkTask.class);
@Scheduled(cron = "* * * * * ?") // 间隔秒进行业务调度
public void runTask() {
LOGGER.info("【YOOTK - 定时任务】当前时间:{}", System.currentTimeMillis());
}
}
2、
package com.yootk.task;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Component // 自动注册
public class YootkTask {
private static final Logger LOGGER = LoggerFactory.getLogger(YootkTask.class);
@Scheduled(cron = "0 * * * * ?") // 间隔秒进行业务调度
public void runTask() {
LOGGER.info("【YOOTK - 定时任务】当前时间:{}", System.currentTimeMillis());
}
}
SpringTask 任务调度池
定时任务调度
- 根据不同业务的需求,同一个项目应用中可能会同时存在有多个并行的处理任务,然而在默认情况下 SpringTask 采用的是单线程池的方式进行处理,这就意味着每次只允许一个任务执行,而其他任务即使已经到了调度时间也需要等待其他任务执行完毕后才可以启动,这样一来就造成了任务管理的困难
- 为了解决此类问题,可以在 Spring 容器中根据调度任务的数量进行线程池的开辟,这样就可以保证多个任务并行调度的处理
1、
package com.yootk.task;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Component // 自动注册
public class MuyanTask {
private static final Logger LOGGER = LoggerFactory.getLogger(MuyanTask.class);
@Scheduled(cron = "* * * * * ?") // 间隔秒进行业务调度
public void runTask() {
LOGGER.info("【MUYAN - 定时任务】当前时间:{}", System.currentTimeMillis());
}
}
2、
package com.yootk.task;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
@Component // 自动注册
public class YootkTask {
private static final Logger LOGGER = LoggerFactory.getLogger(YootkTask.class);
@Scheduled(cron = "* * * * * ?") // 间隔秒进行业务调度
public void runTask() {
LOGGER.info("【YOOTK - 定时任务】当前时间:{}", System.currentTimeMillis());
try {
TimeUnit.SECONDS.sleep(5); // 每次休眠5秒的时间
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
3、
package com.yootk.config;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import java.util.concurrent.Executors;
@Configuration
public class ScheduleConfig implements SchedulingConfigurer { // 配置定时任务处理
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
taskRegistrar.setScheduler(Executors.newScheduledThreadPool(2)); // 线程池配置
}
}
4、
package com.yootk;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.scheduling.annotation.EnableScheduling;
@ComponentScan({"com.yootk.task", "com.yootk.config"}) // 定义扫描包
@EnableScheduling // 启用SpringTask定时调度处理
public class StartSpringTaskApplication {
}
5、
09:04:03.012 com.yootk.task.YootkTask - 【YOOTK - 定时任务】当前时间:1675472643012
09:04:03.012 com.yootk.task.MuyanTask - 【MUYAN - 定时任务】当前时间:1675472643012
09:04:04.000 com.yootk.task.MuyanTask - 【MUYAN - 定时任务】当前时间:1675472644000
09:04:05.002 com.yootk.task.MuyanTask - 【MUYAN - 定时任务】当前时间:1675472645002
09:04:06.000 com.yootk.task.MuyanTask - 【MUYAN - 定时任务】当前时间:1675472646000
09:04:07.002 com.yootk.task.MuyanTask - 【MUYAN - 定时任务】当前时间:1675472647002
09:04:08.001 com.yootk.task.MuyanTask - 【MUYAN - 定时任务】当前时间:1675472648001
09:04:09.000 com.yootk.task.YootkTask - 【YOOTK - 定时任务】当前时间:1675472649000
09:04:09.001 com.yootk.task.MuyanTask - 【MUYAN - 定时任务】当前时间:1675472649001
09:04:10.002 com.yootk.task.MuyanTask - 【MUYAN - 定时任务】当前时间:1675472650002
09:04:11.001 com.yootk.task.MuyanTask - 【MUYAN - 定时任务】当前时间:1675472651001
demo