frontpage导入网站百度推广怎么样
什么是Spring-batch
-
Sping Batch 是一个轻量级的、完善的的批处理框架,旨在帮助企业建立健壮、高效的批处理应用。
-
Spring Batch 是Spring的一个子项目,基于Spring框架为基础的开发的框架
-
Spring Batch 提供大量可重用的组件,比如:日志,追踪,事务,任务作业统计,任务重启,跳过,重复,资源管理等
-
Spring Batch 是一个批处理应用框架,不提供调度框架,如果需要定时处理需要额外引入-调度框架,比如: Quartz
什么是批处理
就是将数据分批次进行处理的过程。比如:银行对账逻辑,跨系统数据同步等。
常规的批处理操作步骤:系统A从数据库中导出数据到文件,系统B读取文件数据并写入到数据库
典型批处理特点:
-
自动执行,根据系统设定的工作步骤自动完成
-
数据量大,少则百万,多则上千万甚至上亿。(如果是10亿,100亿那只能上大数据了)
-
定时执行,比如:每天,每周,每月执行。
批处理逻辑介绍
spring-batch的运行结构大概分为上图几个部分,我们重点先关注Job,Step,ItemReader,ItemProcessor,ItemWriter几个部分,为了方便理解我举一个例子:
假如.Job是我们上学时老师布置的的作业,那么Step就好比现在有好几个学科的作业,我们总得有个先后顺序,我先写哪个后写哪个,所以一个Job里面可以有多个Step, 然后比如我写到英语这一门,我不会做,怎们办呢,我想抄一抄同学的作业,这一步就是ItemReader的工作,此时我还害怕被老师发现我的作业是抄的同学的,于是我把同学的答案又加工了一下这就是ItemProcessor的作用,ItemWriter相信就大家已经猜到了,这是真正写到了自己的作业本上的答案,所以Step里面又可以分为这么几个步骤
JobLauncher:作业调度器,作业启动主要入口。
Job:作业,需要执行的任务逻辑,
Step:作业步骤,一个Job作业由1个或者多个Step组成,完成所有Step操作,一个完整Job才算执行结束。
ItemReader:Step步骤执行过程中数据输入。可以从数据源(文件系统,数据库,队列等)中读取Item(数据记录)。
ItemWriter:Step步骤执行过程中数据输出,将Item(数据记录)写入数据源(文件系统,数据库,队列等)。
ItemProcessor:Item数据加工逻辑(输入),比如:数据清洗,数据转换,数据过滤,数据校验等
JobRepository: 保存Job或者检索Job的信息。SpringBatch需要持久化Job(可以选择数据库/内存),JobRepository就是持久化的接口
小试牛刀
介绍完上面的各个逻辑,我们来写一个简单的入门案例
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.3</version><relativePath/>
</parent>
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-batch</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!--内存版--><dependency><groupId>com.h2database</groupId><artifactId>h2</artifactId><scope>runtime</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency></dependencies>
@SpringBootApplication
@EnableBatchProcessing
public class HelloJod {@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;public static void main(String[] args) {SpringApplication.run(HelloJod.class, args);}/*** 任务*/@Beanpublic Job job() {return jobBuilderFactory.get("hello-job").start(step1()).next(step2()).build();}/*** 步骤一*/@Beanpublic Step step1() {return stepBuilderFactory.get("step1").tasklet(new Tasklet() {@Overridepublic RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {System.out.println("这是第一步!");return RepeatStatus.FINISHED;}}).build();}/*** 步骤二*/@Beanpublic Step step2() {return stepBuilderFactory.get("step2").tasklet(new Tasklet() {@Overridepublic RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {System.out.println("这是第二步!");return RepeatStatus.FINISHED;}}).build();}}
这是spring-batch最简单的一个步骤处理模型,其中使用了h2内存数据库,后续要换成MySQL或其他数据库,只需要引入相应依赖,然后更改yml即可,以MySQL为例:
<!-- <dependency><groupId>com.h2database</groupId><artifactId>h2</artifactId><scope>runtime</scope>
</dependency> --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.12</version>
</dependency>
spring:datasource:username: rootpassword: 123456url: jdbc:mysql://127.0.0.1:3306/springbatch?serverTimezone=GMT%2B8&useSSL=false&allowPublicKeyRetrieval=truedriver-class-name: com.mysql.cj.jdbc.Driver
注意,在启动之前我们需要创建一下数据库的表结构,它存在于org/springframework/batch/core/schema-mysql.sql这个路径下,这里还有其他数据库的初始化语句,
作业监听器
作业监听器:用于监听作业的执行过程逻辑。在作业执行前,执行后2个时间点嵌入业务逻辑。
-
执行前:一般用于初始化操作, 作业执行前需要着手准备工作,比如:各种连接建立,线程池初始化等。
-
执行后:业务执行完后,需要做各种清理动作,比如释放资源等。
public interface JobExecutionListener {//作业执行前void beforeJob(JobExecution jobExecution);//作业执行后void afterJob(JobExecution jobExecution);
}
居于块Tasklet
居于块的Tasklet相对简单Tasklet来说,多了3个模块:ItemReader( 读模块), ItemProcessor(处理模块),ItemWriter(写模块), 跟它们名字一样, 一个负责数据读, 一个负责数据加工,一个负责数据写。
步骤监听器
步骤也有监听器,也是执行步骤执行前监听,步骤执行后监听。
步骤监听器有2个分别是:StepExecutionListener ChunkListener 意义很明显,就是step前后,chunk块执行前后监听。
public interface StepExecutionListener extends StepListener {void beforeStep(StepExecution stepExecution);@NullableExitStatus afterStep(StepExecution stepExecution);
}
带有监听器的案例
@SpringBootApplication
@EnableBatchProcessing
public class JobStepListener {@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;public static void main(String[] args) {SpringApplication.run(JobStepListener.class, args);}/*** 任务*/@Beanpublic Job job() {return jobBuilderFactory.get("job_state_job").start(step1()).incrementer(jobParametersIncrementer())//作业监听器.listener(jobExecutionListener())//步骤监听器.listener(JobListenerFactoryBean.getListener(new AnnoJobExecutionListener())).build();}/*** 步骤一*/@Beanpublic Step step1() {return stepBuilderFactory.get("step1").tasklet(tasklet1()).listener(stepExecutionListener()).build();}/*** 步骤的内容*/@Beanpublic Tasklet tasklet1() {return new Tasklet() {@Overridepublic RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {BatchStatus status = chunkContext.getStepContext().getStepExecution().getStatus();System.out.println("运行中!" + status);return RepeatStatus.FINISHED;}};}@Beanpublic JobParametersIncrementer jobParametersIncrementer() {return new DateTimeParameter();}@Beanpublic JobExecutionListener jobExecutionListener() {return new IJobExecutionListener();}@Beanpublic StepExecutionListener stepExecutionListener() {return new IStepListener();}}
public class IJobExecutionListener implements JobExecutionListener {@Overridepublic void beforeJob(JobExecution jobExecution) {BatchStatus status = jobExecution.getStatus();System.out.println("作业运行前的状态" + status);}@Overridepublic void afterJob(JobExecution jobExecution) {BatchStatus status = jobExecution.getStatus();System.out.println("作业运行后的状态" + status);}
}
public class IStepListener implements StepExecutionListener {@Overridepublic void beforeStep(StepExecution stepExecution) {System.out.println("执行了步骤前监听");}@Overridepublic ExitStatus afterStep(StepExecution stepExecution) {System.out.println("执行了步骤后监听");return stepExecution.getExitStatus();}
}
这里仅仅是介绍了spring-batch的冰山一角,其中好多细节都没涉及到,只是大体熟悉一个流程,下一篇文章将用一个综合的Demo更深入了解spring-boot的使用