SpringBatch开发实战
大约 28 分钟
数据批处理简介
数据批量处理结构
- 在项目中所有的业务都是以数据支撑的,而不同的项目平台之间也会存在有数据对接的需要而为了便于数据发送的效率,往往会将所发送的数据封装成一个庞大的数据文件发送给指定的数据处理接口。而在该接口中为了便于该批量数据文件的处理,往往会采用分批的方式进行读取与处理,最终再根据最终的需要将处理后的数据保存在指定的存储介质之中。
SpringBatch 处理架构
- SpringBatch 框架会帮助开发者提供大量的批处理支持,开发者只需要依据 SpringBatch 所提供的结构单元进行各自业务逻辑的开发即可,在 SpringBatch 考虑到程序开发的便捷性,提供了如图所示的分层技术架构,在该架构中会存在有三个重要的组成部分,每个部分的作用如下所示:
- 应用层(Application):包含所有批处理任务以及开发者使用 SpringBatch 编写的其他程序代码:
- 核心层(Batch Core):提供运行与管理批处理任务的支持,包含了批处理启动和控制所需的核心类,例如:JobLauncher(作业运行)、Job(作业)、Step(作业步骤);
- 基础架构层(Batch Infrastructure),应用层和核心层建立在基础架构层之上,该层提供数据的统一读 (ItemReader)、写(ltemWriter)和服务(例如 RetryTemplate 提供的重试支持)。
SpringBatch 数据存储结构
获取 SpringBatch 数据库脚本
- SpringBatch 项目的运行,必须提供有 JobRepository(作业仓库),同时要提供与 JobRepository 对应的数据库存储表结构,这样才可以在运行批处理作业时,实现相关信息的记录。由于 SpringBatch 中的数据表结构有明确的定义要求,就需要开发者首先通过 SpringBatch 的官方仓库地址:https://github.com/spring-projects/spring-batch
https://github.com/spring-projects/spring-batch
https://github.com/spring-projects/spring-batch/tree/main/spring-batch-core/src/main/resources/org/springframework/batch/core
1、
DROP DATABASE IF EXISTS batch;
CREATE DATABASE batch CHARACTER SET UTF8;
USE batch;
-- Autogenerated: do not edit this file
CREATE TABLE BATCH_JOB_INSTANCE (
JOB_INSTANCE_ID BIGINT NOT NULL PRIMARY KEY ,
VERSION BIGINT ,
JOB_NAME VARCHAR(100) NOT NULL,
JOB_KEY VARCHAR(32) NOT NULL,
constraint JOB_INST_UN unique (JOB_NAME, JOB_KEY)
) ENGINE=InnoDB;
CREATE TABLE BATCH_JOB_EXECUTION (
JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY ,
VERSION BIGINT ,
JOB_INSTANCE_ID BIGINT NOT NULL,
CREATE_TIME DATETIME(6) NOT NULL,
START_TIME DATETIME(6) DEFAULT NULL ,
END_TIME DATETIME(6) DEFAULT NULL ,
STATUS VARCHAR(10) ,
EXIT_CODE VARCHAR(2500) ,
EXIT_MESSAGE VARCHAR(2500) ,
LAST_UPDATED DATETIME(6),
constraint JOB_INST_EXEC_FK foreign key (JOB_INSTANCE_ID)
references BATCH_JOB_INSTANCE(JOB_INSTANCE_ID)
) ENGINE=InnoDB;
CREATE TABLE BATCH_JOB_EXECUTION_PARAMS (
JOB_EXECUTION_ID BIGINT NOT NULL ,
PARAMETER_NAME VARCHAR(100) NOT NULL ,
PARAMETER_TYPE VARCHAR(100) NOT NULL ,
PARAMETER_VALUE VARCHAR(2500) ,
IDENTIFYING CHAR(1) NOT NULL ,
constraint JOB_EXEC_PARAMS_FK foreign key (JOB_EXECUTION_ID)
references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;
CREATE TABLE BATCH_STEP_EXECUTION (
STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY ,
VERSION BIGINT NOT NULL,
STEP_NAME VARCHAR(100) NOT NULL,
JOB_EXECUTION_ID BIGINT NOT NULL,
CREATE_TIME DATETIME(6) NOT NULL,
START_TIME DATETIME(6) DEFAULT NULL ,
END_TIME DATETIME(6) DEFAULT NULL ,
STATUS VARCHAR(10) ,
COMMIT_COUNT BIGINT ,
READ_COUNT BIGINT ,
FILTER_COUNT BIGINT ,
WRITE_COUNT BIGINT ,
READ_SKIP_COUNT BIGINT ,
WRITE_SKIP_COUNT BIGINT ,
PROCESS_SKIP_COUNT BIGINT ,
ROLLBACK_COUNT BIGINT ,
EXIT_CODE VARCHAR(2500) ,
EXIT_MESSAGE VARCHAR(2500) ,
LAST_UPDATED DATETIME(6),
constraint JOB_EXEC_STEP_FK foreign key (JOB_EXECUTION_ID)
references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;
CREATE TABLE BATCH_STEP_EXECUTION_CONTEXT (
STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
SHORT_CONTEXT VARCHAR(2500) NOT NULL,
SERIALIZED_CONTEXT TEXT ,
constraint STEP_EXEC_CTX_FK foreign key (STEP_EXECUTION_ID)
references BATCH_STEP_EXECUTION(STEP_EXECUTION_ID)
) ENGINE=InnoDB;
CREATE TABLE BATCH_JOB_EXECUTION_CONTEXT (
JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
SHORT_CONTEXT VARCHAR(2500) NOT NULL,
SERIALIZED_CONTEXT TEXT ,
constraint JOB_EXEC_CTX_FK foreign key (JOB_EXECUTION_ID)
references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;
CREATE TABLE BATCH_STEP_EXECUTION_SEQ (
ID BIGINT NOT NULL,
UNIQUE_KEY CHAR(1) NOT NULL,
constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;
INSERT INTO BATCH_STEP_EXECUTION_SEQ (ID, UNIQUE_KEY) select * from (select 0 as ID, '0' as UNIQUE_KEY) as tmp where not exists(select * from BATCH_STEP_EXECUTION_SEQ);
CREATE TABLE BATCH_JOB_EXECUTION_SEQ (
ID BIGINT NOT NULL,
UNIQUE_KEY CHAR(1) NOT NULL,
constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;
INSERT INTO BATCH_JOB_EXECUTION_SEQ (ID, UNIQUE_KEY) select * from (select 0 as ID, '0' as UNIQUE_KEY) as tmp where not exists(select * from BATCH_JOB_EXECUTION_SEQ);
CREATE TABLE BATCH_JOB_SEQ (
ID BIGINT NOT NULL,
UNIQUE_KEY CHAR(1) NOT NULL,
constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;
INSERT INTO BATCH_JOB_SEQ (ID, UNIQUE_KEY) select * from (select 0 as ID, '0' as UNIQUE_KEY) as tmp where not exists(select * from BATCH_JOB_SEQ);
SpringBatch 编程起步
Job 接口实例创建结构
- 一个批处理作业之中会存在有若干个处理步骤,所以 SpringBatch 在项目中提供了 Job 与 Step 接口标准,以便于开发者进行应用的定义,在进行批处理作业定义时,开发者可以在一个 Job 接口实例中配置若干个 Step 接口实例,以描述不同的作业步骤。用户可以使用 StepBuilderFactory 构建工厂类进行 Step 接口实例的创建,处理结构如图所示而对于 Job 接口的实例创建,也采用了类似的结构,提供了 JobBuilderFactory 构建工厂类
@EnableBatchProcessing 批处理启用注解
- 如果要想创建 Job 与 Step 接口实例,那么首先就需要在当前的项目中获取到 StepBuilderFactory 与 jobBuilderFactory 两各类的实例化对象,所以为了更好的与 Spring 容器整合,SpringBatch 中提供了一个“@EnableBatchProcessing”批处理启动注解,该注解可以基于 ImportSelector 配置实现相关 Bean 的注册
JobRepository 接囗关联结构
- SpringBatch 在运行过程中,会通过 JobRepository 接口实例实现数据库中相关数据表的 CRUD 操作,该类操作的实现主要基于 JdbcTemplate 模版技术实现,针对于不同的数据表提供有不同的 DAO 接口和实现类
1、
// https://mvnrepository.com/artifact/org.springframework.batch/spring-batch-core
implementation group: 'org.springframework.batch', name: 'spring-batch-core', version: '5.0.0'
// https://mvnrepository.com/artifact/org.springframework.batch/spring-batch-infrastructure
implementation group: 'org.springframework.batch', name: 'spring-batch-infrastructure', version: '5.0.0'
// https://mvnrepository.com/artifact/com.mysql/mysql-connector-j
implementation group: 'com.mysql', name: 'mysql-connector-j', version: '8.0.31'
// https://mvnrepository.com/artifact/com.zaxxer/HikariCP
implementation group: 'com.zaxxer', name: 'HikariCP', version: '5.0.1'
2、
batch.database.driverClassName=com.mysql.cj.jdbc.Driver
batch.database.jdbcUrl=jdbc:mysql://localhost:3306/batch
batch.database.username=root
batch.database.password=mysqladmin
batch.database.connectionTimeOut=3000
batch.database.readOnly=false
batch.database.pool.idleTimeOut=3000
batch.database.pool.maxLifetime=60000
batch.database.pool.maximumPoolSize=60
batch.database.pool.minimumIdle=20
3、
package com.yootk.config;
@Configuration // 配置类
@PropertySource("classpath:config/database.properties") // 配置加载
public class BatchDataSourceConfig { // 数据源配置Bean
@Value("${batch.database.driverClassName}") // 资源文件读取配置项
private String driverClassName; // 数据库驱动程序
@Value("${batch.database.jdbcUrl}") // 资源文件读取配置项
private String jdbcUrl; // 数据库连接地址
@Value("${batch.database.username}") // 资源文件读取配置项
private String username; // 用户名
@Value("${batch.database.password}") // 资源文件读取配置项
private String password; // 密码
@Value("${batch.database.connectionTimeOut}") // 资源文件读取配置项
private long connectionTimeout; // 连接超时
@Value("${batch.database.readOnly}") // 资源文件读取配置项
private boolean readOnly; // 只读配置
@Value("${batch.database.pool.idleTimeOut}") // 资源文件读取配置项
private long idleTimeout; // 连接最小维持时长
@Value("${batch.database.pool.maxLifetime}") // 资源文件读取配置项
private long maxLifetime; // 连接最大存活时长
@Value("${batch.database.pool.maximumPoolSize}") // 资源文件读取配置项
private int maximumPoolSize; // 连接池最大维持数量
@Value("${batch.database.pool.minimumIdle}") // 资源文件读取配置项
private int minimumIdle; // 连接池最小维持数量
@Bean // Bean注册
@Primary // 注入首选
public DataSource batchDataSource() { // 配置数据源
HikariDataSource dataSource = new HikariDataSource(); // DataSource子类实例化
dataSource.setDriverClassName(this.driverClassName); // 驱动程序
dataSource.setJdbcUrl(this.jdbcUrl); // JDBC连接地址
dataSource.setUsername(this.username); // 用户名
dataSource.setPassword(this.password); // 密码
dataSource.setConnectionTimeout(this.connectionTimeout); // 连接超时
dataSource.setReadOnly(this.readOnly); // 是否为只读数据库
dataSource.setIdleTimeout(this.idleTimeout); // 最小维持时间
dataSource.setMaxLifetime(this.maxLifetime); // 连接的最大时长
dataSource.setMaximumPoolSize(this.maximumPoolSize); // 连接池最大容量
dataSource.setMinimumIdle(this.minimumIdle); // 最小维持连接量
return dataSource; // 返回Bean实例
}
}
4、
package com.yootk.batch.tasklet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
public class MessageTasklet implements Tasklet { // 创建批处理的任务
private static final Logger LOGGER = LoggerFactory.getLogger(MessageTasklet.class);
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
LOGGER.info("【数据批处理操作】沐言科技 —— 李兴华Java高薪就业编程训练营。");
// 每一个步骤都会包含一个完整的执行的状态,这个状态通过RepeatStatus表示
return RepeatStatus.FINISHED; // 处理结束
}
}
5、
package com.yootk.config;
import com.yootk.batch.tasklet.MessageTasklet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;
@Configuration // 定义配置类
public class SpringBatchConfig { // Spring批处理作业的配置
private static final Logger LOGGER = LoggerFactory.getLogger(SpringBatchConfig.class);
@Autowired // 由SpringBatch自动提供好的支持
private JobRepository jobRepository; // SpringBatch数据库的操作
@Autowired
private PlatformTransactionManager batchTransactionManager; // 事务管理器
@Bean
public Job messageJob() {
return new JobBuilder("messageJob", jobRepository)
.start(this.messageStep()).build();
}
@Bean
public Step messageStep() { // 定义消息的处理步骤
return new StepBuilder("messageStep", this.jobRepository)
.tasklet(this.messageTasklet(),
this.batchTransactionManager).build(); // 创建任务的步骤
}
@Bean
public Tasklet messageTasklet() {
return new MessageTasklet(); // 任务的具体处理逻辑
}
}
6、
package com.yootk.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;
import javax.sql.DataSource;
@Configuration
public class SpringBatchTransactionManagerConfig { // Spring批处理的事务管理器
@Bean
public PlatformTransactionManager batchTransactionManager(
DataSource batchDataSource) {
DataSourceTransactionManager transactionManager = new DataSourceTransactionManager();
transactionManager.setDataSource(batchDataSource);
return transactionManager;
}
}
7、
package com.yootk.test;
import com.yootk.StartSpringBatchApplication;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import java.util.concurrent.TimeUnit;
@ContextConfiguration(classes = StartSpringBatchApplication.class)
@ExtendWith(SpringExtension.class)
public class TestSpringBatch { // 定义测试类
private static final Logger LOGGER = LoggerFactory.getLogger(TestSpringBatch.class);
@Autowired
private Job messageJob; // 定义作业
@Autowired
private JobLauncher jobLauncher; // 定义作业执行器
@Test
public void testJobRun() throws Exception { // 定义作业启动方法
this.jobLauncher.run(this.messageJob, new JobParameters()); // 作业运行
TimeUnit.SECONDS.sleep(Long.MAX_VALUE); // 等待作业执行完毕
}
}
8、
package com.yootk;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.context.annotation.ComponentScan;
@ComponentScan({"com.yootk.config", "com.yootk.batch"})
// 简化点直接使用如下配置:@ComponentScan("com.yootk")
@EnableBatchProcessing(dataSourceRef = "batchDataSource",
transactionManagerRef = "batchTransactionManager") // 启动SpringBatch的环境
public class StartSpringBatchApplication {
}
JobParameters
作业实例组成结构
- SpringBatch 之中所有的作业都需要在数据表中进行记录,所以当一个作业要想重复执行时,就需要为其配置不同的参数,所以一个完整的作业实例(JobInstance)是由作业名称(Job Name)以及作业参数(Job Parameters)两个结构组成
SpringBatch 作业参数
- 每一个作业中都可能会存在有若干个不同的参数,所以为了便于作业参数的管理 SpringBatch 提供了 JobParameters 结构类,该类本身维护着个 Map 集合,每 Map 集合中保存的参数使用 JobParameter 对象实例进行包装允许配置的数据参数类型有四种(通过 JobParameter.ParameterType 枚举类进行定义)分别为字符串、日期、长整型以及浮点型,而对于 JobParameters 实例创建,则可以依靠 JobParametersBuilder 构建器类进行创建
1、
@Test
public void testJobRun() throws Exception { // 定义作业启动方法
JobParameters jobParameters = new JobParametersBuilder()
.addString("project", "muyan-yootk-spring-batch") // 项目参数
.addString("dataResource", "file:d:/data/") // 项目参数
.addString("developer", "yootk-lixinghua") // 项目参数
.addLong("timestamp", System.currentTimeMillis()) // 项目参数
.addDate("createDate", new Date()) // 项目参数
.toJobParameters(); // 构建作业参数
this.jobLauncher.run(this.messageJob, jobParameters); // 作业运行
TimeUnit.SECONDS.sleep(Long.MAX_VALUE); // 等待作业执行完毕
}
2、
SELECT * FROM batch_job_instance;
3、
SELECT * FROM batch_job_execution_params;
JobExplorer
作业与作业信息获取
- 作业是批处理之中的基本单元,在一个批处理的项目之中,开发者可以根据自己的需要配置多个批处理作业。每一个批处理作业之中都会存在有若干个 Step 进行该作业具体实现步骤的配置。所有用户定义的作业都可以通过 JobLauncher 接口实例启动,并可以根据需要配置所需的作业参数。
JobExplorer 类关联结构
- 由于项目中执行的批处理操作较多,所以 SpringBatch 会将每一次执行的批处理作业的信息保存在与之相关的数据表之中,由于整个的操作都是由 SpringBatch 内部的 DAO 接口实现的支持,所以为了便于用户查询作业的相关信息,对外提供了一个 JobExplorer 接口实例,利用该接口可以获取作业以及对应的作业步骤的数据项,同时每一个数据都会以实体类的形式返回
1、
package com.yootk.test;
import com.yootk.StartSpringBatchApplication;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.*;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;
@ContextConfiguration(classes = StartSpringBatchApplication.class)
@ExtendWith(SpringExtension.class)
public class TestJobExplorer { // 定义测试类
private static final Logger LOGGER = LoggerFactory.getLogger(TestJobExplorer.class);
@Autowired
private JobExplorer jobExplorer; // 获取作业浏览器
@Test
public void testJobRun() throws Exception { // 定义作业启动方法
String jobName = "messageJob"; // 定义作业名称
List<String> names = this.jobExplorer.getJobNames(); // 获取作业名称的列表
LOGGER.info("【作业信息】作业列表:{}", names);
LOGGER.info("【作业信息】messageJob作业数量:{}",
this.jobExplorer.getJobInstanceCount(jobName));
JobInstance instance = this.jobExplorer.getLastJobInstance(jobName); // 获取作业实例
LOGGER.info("【作业信息】最后一次执行作业ID:{}、作业名称:{}",
instance.getInstanceId(), instance.getJobName());
List<JobExecution> executions =
this.jobExplorer.getJobExecutions(instance); // 获取作业执行信息
for (JobExecution execution : executions) {
LOGGER.info("【作业信息】作业状态:{}、作业参数:{}",
execution.getStatus(), execution.getJobParameters());
}
}
}
Job 参数验证
批处理作业
- 在一个完整的线上应用中,由于需要保证每个应用的运行的稳定性,所以在进行批处理操作时,就需要基于参数的配置来进行作业的启动。在作业运行过程之中,也会使用到一些核心的控制参数,此时为了保证所需参数的正确配置,于是在 SpringBatch 内部提供了一个参数验证器的支持
作业监听器
- 为了便于作业参数验证处理逻辑的规范化开发,SpringBatch 内部提供了 JobParametersValidator 验证器接口,并提供了配置参数检査的 DefaultobParametersValidator 默认实现类,继承结构如图所示,在该类中配置有两个重要的属性:requiredKeys(必须包含的属性 KEY)、optionalKeys(允许出现的属性 KEY),这样在进行作业执行前,如果发现用户传递过来的 JobParameters 对象中所包含的数据 KEY 不满足验证条件,那么程序将直接抛出异常,并结束当前的作业,如果符合数据验证的配置,则可以进行作业的正确执行,由于该操作主要是进行配置定义,所以只需要修改 SpringBatch 配置类即可
1、
@Bean
public DefaultJobParametersValidator jobParametersValidator() { // 定义参数验证器
DefaultJobParametersValidator validator =
new DefaultJobParametersValidator(); // 参数验证器
validator.setRequiredKeys(new String[] {"project"}); // 设置必须传递的参数
validator.setOptionalKeys(new String[] {"developer", "timestamp"}); // 允许传递的参数
return validator;
}
2、
public Job messageJob() {
return new JobBuilder("messageJob", jobRepository)
.start(this.messageStep())
.validator(this.jobParametersValidator()) // 批处理的参数验证器
.build();
}
Job 监听器
作业监听器
- 为便于作业执行的监控,SpringBatch 提供了作业监听器,同时提供了 JobExecutionListener 监听接口标准,这样开发者只需要在作业配置时,利用 listener()方法即可将自定义的监听器实例定义到作业之中,当作业执行前和执行后会调用找到与之匹配的监听处理方法
1、
package com.yootk.batch.listener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
public class MessageJobExecutionListener implements JobExecutionListener { // 定义作业监听
private static final Logger LOGGER =
LoggerFactory.getLogger(MessageJobExecutionListener.class);
@Override
public void beforeJob(JobExecution jobExecution) { // 作业执行之前
LOGGER.info("【作业执行前】作业ID:{}、作业名称:{}",
jobExecution.getJobId(), jobExecution.getJobInstance().getJobName());
LOGGER.info("【作业执行前】作业参数:{}", jobExecution.getJobParameters());
}
@Override
public void afterJob(JobExecution jobExecution) { // 作业执行之后
LOGGER.info("【作业执行后】作业状态:{}", jobExecution.getStatus());
}
}
2、
@Bean
public Job messageJob() {
return new JobBuilder("messageJob", jobRepository)
.start(this.messageStep())
.validator(this.jobParametersValidator()) // 批处理的参数验证器
.listener(this.messageJobListener()) // 消息作业监听器
.build();
}
@Bean
public MessageJobExecutionListener messageJobListener() {
return new MessageJobExecutionListener();
}
3、
package com.yootk.batch.listener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.annotation.AfterJob;
import org.springframework.batch.core.annotation.BeforeJob;
public class MessageJobExecutionListener { // 定义作业监听
private static final Logger LOGGER =
LoggerFactory.getLogger(MessageJobExecutionListener.class);
@BeforeJob // 作业执行前的监听方法
public void beforeJobHanadler(JobExecution jobExecution) { // 作业执行之前
LOGGER.info("【作业执行前】作业ID:{}、作业名称:{}",
jobExecution.getJobId(), jobExecution.getJobInstance().getJobName());
LOGGER.info("【作业执行前】作业参数:{}", jobExecution.getJobParameters());
}
@AfterJob // 作业执行后的监听方法
public void afterJobHanadler(JobExecution jobExecution) { // 作业执行之后
LOGGER.info("【作业执行后】作业状态:{}", jobExecution.getStatus());
}
}
作业退出
SpringBatch 作业控制
- 为了便于作业状态的控制,SpringBatch 提供了 JobOperateor 接口,同时该接口又配置了 SimpleJobOperator 实现子类,在该类中可以实现作业的启动(start()方法)、作业重启(restart()方法)以及作业停止(stop()方法)的操作,在开发中可以直接基于监听器的方式来进行作业控制
1、
package com.yootk.test;
@ContextConfiguration(classes = StartSpringBatchApplication.class)
@ExtendWith(SpringExtension.class)
public class TestJobOperator { // 定义测试类
private static final Logger LOGGER = LoggerFactory.getLogger(TestJobOperator.class);
@Autowired
private Job messageJob; // 定义作业
@Autowired
private JobLauncher jobLauncher; // 定义作业执行器
@Autowired // 是由SpringBatch自动提供的对象实例
private JobOperator jobOperator; // 作业操作器
@Test
public void testJobOperator() throws Exception { // 定义作业启动方法
LOGGER.info("【JobOperator】获取全部作业信息:{}", this.jobOperator.getJobNames());
LOGGER.info("【JobOperator】获取作业的信息:{}", this.jobOperator.getParameters(6));
}
}
2、
package com.yootk.batch.tasklet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import java.util.concurrent.TimeUnit;
public class MessageTasklet implements Tasklet { // 创建批处理的任务
private static final Logger LOGGER = LoggerFactory.getLogger(MessageTasklet.class);
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
for (int x = 0 ; x < 10 ; x ++) {
TimeUnit.SECONDS.sleep(1); // 运行一段时间
LOGGER.info("【数据批处理操作】沐言科技 —— 李兴华Java高薪就业编程训练营。");
}
// 每一个步骤都会包含一个完整的执行的状态,这个状态通过RepeatStatus表示
return RepeatStatus.FINISHED; // 处理结束
}
}
3、
package com.yootk.batch.listener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.annotation.BeforeJob;
import org.springframework.batch.core.launch.JobExecutionNotRunningException;
import org.springframework.batch.core.launch.JobOperator;
import org.springframework.batch.core.launch.NoSuchJobExecutionException;
import org.springframework.beans.factory.annotation.Autowired;
public class AbortJobExecutionListener { // 中断作业执行
private static final Logger LOGGER =
LoggerFactory.getLogger(AbortJobExecutionListener.class);
@Autowired
private JobOperator jobOperator; // 作业的操作者
@BeforeJob // 作业执行之前处理
public void beforeJobHandler(JobExecution jobExecution) {
// 当前只要运行的作业才允许被中断,并且在运行的项目名称里面发现了yootk的信息
if (jobExecution.isRunning() &&
jobExecution.getJobParameters().getString("project").contains("yootk")) {
try {
this.jobOperator.stop(jobExecution.getId()); // 中断任务
} catch (NoSuchJobExecutionException e) {
throw new RuntimeException(e);
} catch (JobExecutionNotRunningException e) {
throw new RuntimeException(e);
}
}
}
}
4、
@Bean
public Job messageJob() {
return new JobBuilder("messageJob", jobRepository)
.start(this.messageStep())
.validator(this.jobParametersValidator()) // 批处理的参数验证器
.listener(this.messageJobListener()) // 消息作业监听器
.listener(this.abortJobExecutionListener()) // 配置中断监听
.build();
}
@Bean
public AbortJobExecutionListener abortJobExecutionListener() {
return new AbortJobExecutionListener(); // 中断执行监听
}
5、
@Test
public void testJobOperator() throws Exception { // 定义作业启动方法
LOGGER.info("【JobOperator】获取对象实例:{}", this.jobOperator.getClass());
LOGGER.info("【JobOperator】获取全部作业信息:{}", this.jobOperator.getJobNames());
LOGGER.info("【JobOperator】获取作业的信息:{}", this.jobOperator.getParameters(6));
}
6、
package com.yootk.batch.listener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.annotation.BeforeJob;
import org.springframework.batch.core.configuration.JobRegistry;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.JobExecutionNotRunningException;
import org.springframework.batch.core.launch.JobOperator;
import org.springframework.batch.core.launch.NoSuchJobExecutionException;
import org.springframework.batch.core.launch.support.SimpleJobOperator;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.beans.factory.annotation.Autowired;
public class AbortJobExecutionListener { // 中断作业执行
private static final Logger LOGGER =
LoggerFactory.getLogger(AbortJobExecutionListener.class);
@Autowired
private JobExplorer jobExplorer;
@Autowired
private JobRepository jobRepository;
@Autowired
private JobRegistry jobRegistry;
@BeforeJob // 作业执行之前处理
public void beforeJobHandler(JobExecution jobExecution) {
// 当前只要运行的作业才允许被中断,并且在运行的项目名称里面发现了yootk的信息
if (jobExecution.isRunning() &&
jobExecution.getJobParameters().getString("project").contains("yootk")) {
SimpleJobOperator operator = new SimpleJobOperator();
operator.setJobExplorer(this.jobExplorer);
operator.setJobRegistry(this.jobRegistry);
operator.setJobRepository(this.jobRepository);
try {
operator.stop(jobExecution.getId()); // 中断任务
} catch (NoSuchJobExecutionException e) {
throw new RuntimeException(e);
} catch (JobExecutionNotRunningException e) {
throw new RuntimeException(e);
}
}
}
}
Step 多步骤配置
配置多个作业处理步骤
- Step 是批处理作业之中的基础组成单元,每一个批处理作业之中可以包含有若干个处理步骤,每一个处理步骤都包含了实际运行的批处理任务中的所有必须信息。同时每一个处理步骤在作业运行过程中,也可以基于 RepeatStatus 枚举类来配置重复执行的状态,如果返回的状态是“RepeatStatus.FINISHED”则表示可以执行下一步骤,而如果现在某一个步骤需要被重复执行,则直接返回“RepeatStatus.CONTINUABLE”状态即可
1、
package com.yootk.config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;
@Configuration // 定义配置类
public class SpringBatchConfig { // Spring批处理作业的配置
private static final Logger LOGGER = LoggerFactory.getLogger(SpringBatchConfig.class);
@Autowired // 由SpringBatch自动提供好的支持
private JobRepository jobRepository; // SpringBatch数据库的操作
@Autowired
private PlatformTransactionManager batchTransactionManager; // 事务管理器
@Bean
public Job messageJob() {
return new JobBuilder("messageJob", jobRepository)
.start(this.messageReadStep()) // 设置作业的处理步骤
.next(this.messageHandlerStep()) // 设置作业的处理步骤
.next(this.messageWriteStep()) // 设置作业的处理步骤
.build();
}
// 现在假设说考虑到一个作业可能会同时存在有多个步骤,所以下面定义不同的步骤
@Bean
public Step messagePrepareStep() { // 准备步骤,留给后续使用
// 有可能此时需要进行某些特定服务器的连接,或者是执行一些数据库的DDL操作
return new StepBuilder("messagePrepareStep", this.jobRepository)
.tasklet(new Tasklet() { // 定义任务处理逻辑
@Override
public RepeatStatus execute(StepContribution contribution,
ChunkContext chunkContext) throws Exception {
LOGGER.info("【Step-0.消息准备步骤】初始化系统环境,连接服务器接口。");
return RepeatStatus.FINISHED; // 处理完成
}
}, this.batchTransactionManager).build();
}
@Bean
public Step messageReadStep() { // 数据读取部分
return new StepBuilder("messageReadStep", this.jobRepository)
.tasklet(new Tasklet() { // 定义任务处理逻辑
@Override
public RepeatStatus execute(StepContribution contribution,
ChunkContext chunkContext) throws Exception {
LOGGER.info("【Step-1.消息读取步骤】通过输入源读取消息内容。");
return RepeatStatus.FINISHED; // 处理完成
}
}, this.batchTransactionManager).build();
}
@Bean
public Step messageHandlerStep() { // 数据处理部分
return new StepBuilder("messageHandlerStep", this.jobRepository)
.tasklet(new Tasklet() { // 定义任务处理逻辑
@Override
public RepeatStatus execute(StepContribution contribution,
ChunkContext chunkContext) throws Exception {
LOGGER.info("【Step-2.消息处理步骤】检索出包含有“yootk”的数据内容。");
return RepeatStatus.FINISHED; // 处理完成
}
}, this.batchTransactionManager).build();
}
@Bean
public Step messageWriteStep() { // 数据处理部分
return new StepBuilder("messageWriteStep", this.jobRepository)
.tasklet(new Tasklet() { // 定义任务处理逻辑
@Override
public RepeatStatus execute(StepContribution contribution,
ChunkContext chunkContext) throws Exception {
LOGGER.info("【Step-3.消息写入步骤】将合法的消息写入到数据终端。");
return RepeatStatus.FINISHED; // 处理完成
}
}, this.batchTransactionManager).build();
}
}
Step 监听器
Step 监听器
- 由于 Step 属于一个核心的处理单元,所以为了配合该单元的使用,SpringBatch 也提供了对应的监听接口,可以在每个批处理步骤执行前后进行操作拦截。考虑到监听的标准化配置,SpringBatch 提供了 StepExecutionListener 监听接口与操作拦截方法,程序实现结松如图所示。考虑到更灵活的监听配置,也可以使用自定义方法,并结合“@BeforeStep“@AfterStep”注解,可以避免强制性接口实现所带来的代码结构限制
1、
package com.yootk.batch.listener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.annotation.AfterStep;
import org.springframework.batch.core.annotation.BeforeStep;
public class MessageStepExecutionListener {
private static final Logger LOGGER =
LoggerFactory.getLogger(MessageStepExecutionListener.class);
@BeforeStep // 步骤执行前监听
public void beforeStepHandler(StepExecution stepExecution) {
LOGGER.info("【步骤执行前】步骤名称:{}、步骤状态:{}",
stepExecution.getStepName(), stepExecution.getStatus());
}
@AfterStep // 步骤执行后监听
public ExitStatus afterStepHandler(StepExecution stepExecution) {
LOGGER.info("【步骤执行后】步骤名称:{}、步骤状态:{}",
stepExecution.getStepName(), stepExecution.getStatus());
return stepExecution.getExitStatus();
}
}
2、
@Bean
public Step messageReadStep() { // 数据读取部分
return new StepBuilder("messageReadStep", this.jobRepository)
.tasklet(new Tasklet() { // 定义任务处理逻辑
@Override
public RepeatStatus execute(StepContribution contribution,
ChunkContext chunkContext) throws Exception {
LOGGER.info("【Step-1.消息读取步骤】通过输入源读取消息内容。");
return RepeatStatus.FINISHED; // 处理完成
}
}, this.batchTransactionManager)
.listener(this.stepExecutionListener()) // 定义监听器
.build();
}
@Bean
public MessageStepExecutionListener stepExecutionListener(){
return new MessageStepExecutionListener(); // 消息步骤监听
}
Flow
基于 Flow 管理 Step
- 在一个完整的批处理作业之中,经常需要进行大量的处理步骤定义,为了便于同一类处理步骤的管理,SpringBatch 提供了 Flow 结构,每一个 Flow 可以配置一组相关的处理个作业中也可以定义多个不同的 Fow 步骤,而后在一
Flow 构建
- 在一个作业之中将基于配置的顺序执行每一个配置的 Flow,在每一个 Flow 中所有的 Step 按照配置的顺序执行,为便于 Flow 的创建,SpringBatch 提供了 FlowBuilder 构建工具类
1、
@Bean
public Flow messageFlow() { // 此时需要配置一系列的处理步骤
FlowBuilder<Flow> flowBuilder = new FlowBuilder<>("muyanMessageFlow"); // 定义一组步骤
return flowBuilder.start(this.messageReadStep()) // Flow中的执行步骤
.next(this.messageHandlerStep()) // Flow中的执行步骤
.next(this.messageWriteStep()) // Flow中的执行步骤
.build();
}
2、
public Job messageJob() {
// 如果此时设置的是一个步骤,那么可以直接返回SimpleJobBuilder实例
// 如果是start(Flow flow),则必须通过end方法才可以返回SimpleJobBuilder实例
return new JobBuilder("messageJob", jobRepository)
.start(this.messageFlow()).end() // 配置若干个处理的步骤
.build();
}
JobExecutionDecider
作业执行决策器
- 在进行作业步骤执行的处理之中,用户可以根据自己的需要进行步骤的分支策略配置为此 SpringBatch 提供了决策器的支持,,实现结构如图所示。在决策器的处理中,可以由用户定义具体的分支逻辑,当满足某一逻辑条件时,执行特定的处理步骤。
决策器实现类结构
- 为了便于作业执行决策器的实现,SpringBatch 提供了 JobExecutionDecider 接口标准该接口可以通过 JobExecution 与 StepExecution 两个接口的实例获取到作业与步骤执并通过 FlowExecutionStatus 封装执行步骤的处理标记,相关类的行的相关对象实例,结构定义如图所示,每一个处理标记都需要对应一个完整的操作步骤
1、
package com.yootk.batch.decider;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.job.flow.FlowExecutionStatus;
import org.springframework.batch.core.job.flow.JobExecutionDecider;
public class MessageJobExecutionDecider implements JobExecutionDecider { // 执行决策器
private int count = 0; // 计数标记
@Override
public FlowExecutionStatus decide(JobExecution jobExecution,
StepExecution stepExecution) {
if (this.count ++ % 2 == 0) {
return new FlowExecutionStatus("HANDLER"); // 自定义的执行名称
} else {
return new FlowExecutionStatus("WRITE"); // 自定义的执行名称
}
}
}
2、
@Bean
public MessageJobExecutionDecider messageDecider() {
return new MessageJobExecutionDecider();
}
3、
@Bean
public Job messageJob() {
// 如果此时设置的是一个步骤,那么可以直接返回SimpleJobBuilder实例
// 如果是start(Flow flow),则必须通过end方法才可以返回SimpleJobBuilder实例
return new JobBuilder("messageJob", jobRepository)
.start(this.messageReadStep()) // 开始的处理步骤
.next(this.messageDecider()) // 配置决策器
.from(this.messageDecider()).on("HANDLER")
.to(this.messageHandlerStep()) // 执行处理的步骤
.from(this.messageDecider()).on("WRITE")
.to(this.messageWriteStep()).end()
.build();
}
异步作业
同步作业与异步作业
- JobLauncher 对象实例在进行作业执行时,默认采用的是同步的处理方式,即在 run()方法执行完成后才可以执行后续的处理业务
- 考虑到程序处理性能的机制,往往会基于异步线程的方式进行处理,这样每当调用了 run()方法后,会启动一个异步线程进行批处理业务,而后无需等待即可执行后续的业务功能
配置异步作业
- JobLancher 支持同步与异步两种处理方式,Spring 为了便于异步作业的管理,提供了 TaskExecutor 接口,在该接口中提供有 SimpleAsyncTaskExecutor(异步任务)与 ThreadPoolTaskExecutor(线程池)两个常用子类
1、
@Bean
public TaskExecutor asyncTaskExecutor() { // 异步任务
SimpleAsyncTaskExecutor executor =
new SimpleAsyncTaskExecutor("spring_batch_"); // 线程前缀
// 对于并行数量肯定是由物理CPU来决定的,同时按照常规的优化方式最佳的做法:内核数量 * 2
executor.setConcurrencyLimit(
Runtime.getRuntime().availableProcessors() * 2); // 并行数量
return executor; // 返回线程配置任务
}
2、
@Bean
public Job messageJob() {
// 如果此时设置的是一个步骤,那么可以直接返回SimpleJobBuilder实例
// 如果是start(Flow flow),则必须通过end方法才可以返回SimpleJobBuilder实例
return new JobBuilder("messageJob", jobRepository)
.start(this.messageReadStep()) // 开始的处理步骤
.split(this.asyncTaskExecutor()) // 配置异步任务
.add(this.messageFlow())
.end()
.build();
}
Tasklet 简介
内置 Tasklet 实现子类
- Tasklet 主要用于实现每一个 Step 的具体处理任务定义,开发者可以基于 Tasklet 接口定义自己的任务程序,这样在每一个作业步骤执行时,就可以调用对应的 Tasklet。考虑到程序开发人员的设计需要,SpringBatch 内置了若干 Tasklet 接口实现子类
1、
@Bean
public Step messageStep() {
CallableTaskletAdapter tasklet = new CallableTaskletAdapter(); // 异步任务处理
// 如果要想实现异步的处理,就需要提供有Callable接口实例
tasklet.setCallable(()->{
LOGGER.info("【消息批处理任务】Hello,《SSM开发实战》"); // 日志输出
return RepeatStatus.FINISHED; // 该任务执行完毕
});
return new StepBuilder("messageReadStep", this.jobRepository)
.tasklet(tasklet, this.batchTransactionManager).build();
}
@Bean
public Job messageJob() {
// 如果此时设置的是一个步骤,那么可以直接返回SimpleJobBuilder实例
// 如果是start(Flow flow),则必须通过end方法才可以返回SimpleJobBuilder实例
return new JobBuilder("messageJob", jobRepository)
.start(this.messageStep()) // 自定义的异步任务处理
.build();
}
2、
@Bean
public Job messageJob() {
// 如果此时设置的是一个步骤,那么可以直接返回SimpleJobBuilder实例
// 如果是start(Flow flow),则必须通过end方法才可以返回SimpleJobBuilder实例
return new JobBuilder("messageJob", jobRepository)
.start(this.messageStep()) // 自定义的异步任务处理
.build();
}
@Bean
public Step messageStep() {
class MessageHandler { // 自定义的结构类
public RepeatStatus exec() { // 一定要注意的是其返回的结果
LOGGER.info("【消息处理任务】Hello、《Spring Boot开发实战》");
return RepeatStatus.FINISHED;
}
}
MethodInvokingTaskletAdapter tasklet =
new MethodInvokingTaskletAdapter(); // 创建自定义的方法调用的任务处理
tasklet.setTargetObject(new MessageHandler());// 调用的实例
tasklet.setTargetMethod("exec"); // 配置方法名称
return new StepBuilder("messageStep", this.jobRepository)
.tasklet(tasklet, this.batchTransactionManager).build();
}
批处理模型
批处理模型
- 完整的数据批处理开发之中,必然会存在有原始数据的读取,数据的处理逻辑以及数据的存储配置,SpringBatch 提供了完善的批处理管理结构
数据定义
- 在进行数据批处理操作之前,首先双方需要协商好数据的存储结构,这样在进行批处理操作时,才可以对数据进行正确的解析,而开发中最为常见的三种数据传输格式分别为文本数据、JSON 数据或者是 XML 数据
数据组成
- 由于一般批处理的数据文件较大,所以为了更好的提升处理性能,在开发中往往会基于文本文件方式进行发送,而为了便于文本文件的解析,会对文件进行一些格式限制,常规做法通过定长结构文件,或者基于特定分隔符配置方式处理。
9197276813101,李兴华,9830.23,2025-10-11 11:21:21,洛阳支行
9197276813101,李兴华,1265.13,2025-10-12 16:16:24,洛阳支行
9197276813101,李兴华,3891.13,2025-11-01 16:05:26,北京支行
9197276813101,李兴华,-1500.00,2025-11-02 20:09:27,北京ATM
9197276813101,李兴华,-1681.98,2025-11-03 15:24:28,信用卡
LineMapper
数据批处理操作
- LineMapper 提供了一个数据拆分的标准化操作接口,由于传入的数据信息都有着严格的组织结构,这样在进行数据处理时就需要依据既定的结构进行拆分,如果采用的是分隔符的方式定义的数据,可以通过 DelimitedLineTokenizer 类进行拆分。如果采用的是定长数据,则可以使用 FixedLengthTokenizer 类拆分,而更加繁琐的数据组成结构则可以使用正则表达式进行拆分,通过 RegexLineTokenizer 类进行配置
数据分割操作
- 在 LineTokenizer 接口中提供有 tokenize()的处理方法,该方法每次会接收一行完整的记录,而后利用不同的子类对这些传入的数据进行有效的拆分
1、
@Bean
public DelimitedLineTokenizer lineTokenizer() {
DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer(); // 定义数据分割器
tokenizer.setDelimiter(","); // 定义拆分标记
return tokenizer;
}
2、
package com.yootk.test;
import com.yootk.StartSpringBatchApplication;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit.jupiter.SpringExtension;
@ContextConfiguration(classes = StartSpringBatchApplication.class)
@ExtendWith(SpringExtension.class)
public class TestLineTokenizer { // 定义测试类
private static final Logger LOGGER = LoggerFactory.getLogger(TestLineTokenizer.class);
@Autowired
private DelimitedLineTokenizer lineTokenizer; // 数据拆分器
@Test
public void testSplit() throws Exception { // 定义作业启动方法
String lineData = "9197276813101,李兴华,9830.23,2025-10-11 11:21:21,洛阳支行"; // 要读取的一行数据
FieldSet fieldSet = this.lineTokenizer.tokenize(lineData); // 数据拆分
LOGGER.info("【交易信息】账户ID:{}、姓名:{}、金额:{}、交易时间:{}、交易位置:{}",
fieldSet.readLong(0), fieldSet.readString(1),
fieldSet.readDouble(2), fieldSet.readDate(3),
fieldSet.readString(4));
}
}
3、
package com.yootk.test;
import com.yootk.StartSpringBatchApplication;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import java.text.SimpleDateFormat;
@ContextConfiguration(classes = StartSpringBatchApplication.class)
@ExtendWith(SpringExtension.class)
public class TestLineTokenizer { // 定义测试类
private static final Logger LOGGER = LoggerFactory.getLogger(TestLineTokenizer.class);
@Autowired
private DelimitedLineTokenizer lineTokenizer; // 数据拆分器
@Test
public void testSplit() throws Exception { // 定义作业启动方法
String lineData = "9197276813101,李兴华,9830.23,2025-10-11 11:21:21,洛阳支行"; // 要读取的一行数据
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
FieldSet fieldSet = this.lineTokenizer.tokenize(lineData); // 数据拆分
LOGGER.info("【交易信息】账户ID:{}、姓名:{}、金额:{}、交易时间:{}、交易位置:{}",
fieldSet.readLong(0), fieldSet.readString(1),
fieldSet.readDouble(2), format.format(fieldSet.readDate(3)),
fieldSet.readString(4));
}
}
4、
package com.yootk.test;
import com.yootk.StartSpringBatchApplication;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import java.text.SimpleDateFormat;
@ContextConfiguration(classes = StartSpringBatchApplication.class)
@ExtendWith(SpringExtension.class)
public class TestLineTokenizer { // 定义测试类
private static final Logger LOGGER = LoggerFactory.getLogger(TestLineTokenizer.class);
@Autowired
private DelimitedLineTokenizer lineTokenizer; // 数据拆分器
@Test
public void testSplit() throws Exception { // 定义作业启动方法
String lineData = "9197276813101x,李兴华,9830.23x,2025-10-11 11:21:21,洛阳支行"; // 要读取的一行数据
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
FieldSet fieldSet = this.lineTokenizer.tokenize(lineData); // 数据拆分
LOGGER.info("【交易信息】账户ID:{}、姓名:{}、金额:{}、交易时间:{}、交易位置:{}",
fieldSet.readLong(0), fieldSet.readString(1),
fieldSet.readDouble(2),
format.format(fieldSet.readDate(3, "yyyy-MM-dd HH:mm:ss")),
fieldSet.readString(4));
}
}
FieldSetMapper
数据与对象转换
- 为了更好的将批处理文件中的数据与程序中的对象进行对接,SpringBatch 提供了对象映射转换支持,该转换器的实现核心在于通过 FieldSet 接口获取拆分信息,随后将数据填充到指定的对象之中,这样在后续的批处理操作中就可以通过数据对象完成后续的功能
数据行映射转换
- 如果要想实现这样的数据行与对象转换的处理,SpringBatch 提供了一个完善的操作流程,并且提供了 LineMapper 映射处理接口
1、
package com.yootk.batch.vo;
import java.util.Date;
public class Bill {
private Long id; // 账户ID
private String name; // 账户姓名
private Double amount; // 金额
private java.util.Date transaction; // 交易日期时间
private String location; // 交易的位置
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Double getAmount() {
return amount;
}
public void setAmount(Double amount) {
this.amount = amount;
}
public Date getTransaction() {
return transaction;
}
public void setTransaction(Date transaction) {
this.transaction = transaction;
}
public String getLocation() {
return location;
}
public void setLocation(String location) {
this.location = location;
}
}
2、
package com.yootk.batch.mapper;
import com.yootk.batch.vo.Bill;
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.validation.BindException;
public class BillMapper implements FieldSetMapper<Bill> {
@Override
public Bill mapFieldSet(FieldSet fieldSet) throws BindException {
// 需要将传入的FieldSet接口对象实例,通过手工的方式转为Bill对象
// 如果你的代码的功底更强,此时就可以使用一个你自定义的反射处理类来进行对象实例化
Bill bill = new Bill();
bill.setId(fieldSet.readLong(0));
bill.setName(fieldSet.readString(1));
bill.setAmount(fieldSet.readDouble(2));
bill.setTransaction(fieldSet.readDate(3, "yyyy-MM-dd HH:mm:ss"));
bill.setLocation(fieldSet.readString(4));
return bill;
}
}
3、
@Bean
public BillMapper billMapper() {
return new BillMapper();
}
4、
@Bean
public LineMapper<Bill> lineMapper() {
DefaultLineMapper<Bill> mapper = new DefaultLineMapper<>();
mapper.setLineTokenizer(this.lineTokenizer()); // 设置分割器
mapper.setFieldSetMapper(this.billMapper()); // 数据映射器
return mapper;
}
5、
package com.yootk.test;
import com.yootk.StartSpringBatchApplication;
import com.yootk.batch.vo.Bill;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.file.LineMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit.jupiter.SpringExtension;
@ContextConfiguration(classes = StartSpringBatchApplication.class)
@ExtendWith(SpringExtension.class)
public class TestLineMapper { // 定义测试类
private static final Logger LOGGER = LoggerFactory.getLogger(TestLineMapper.class);
@Autowired
private LineMapper<Bill> lineMapper; // 数据拆分
@Test
public void testSplit() throws Exception { // 定义作业启动方法
String lineData = "9197276813101x,李兴华,9830.23x,2025-10-11 11:21:21,洛阳支行"; // 要读取的一行数据
Bill bill = this.lineMapper.mapLine(lineData, 0);
LOGGER.info("【交易信息】账户ID:{}、姓名:{}、金额:{}、交易时间:{}、交易位置:{}",
bill.getId(), bill.getName(), bill.getAmount(), bill.getTransaction(), bill.getLocation());
}
}
demo