Spring Batch是一个轻量级的、完美的批处理框架。作为弹簧系统的一员,它在生产中具有灵活、方便、可用的特点。处理高效处理大量信息、定期处理大量数据等场景非常简单。结合调度框架可以充分发挥春批的作用。
目录
一、概念知识1.1、层次架构1.2、关键概念1.2.1、JobRepository1.2.2、JobLauncher1.2.3、Job1.2.4、Step1.2.5、输入——、过程——、输出2。输入——,进程3354,输出2.2.1,读取ItemReader2.2.2,进程ItemProcessor2.2.3,输出ItremWriter2.3,步骤2.4,作业2.5,运行3,监听监听器。
一、Spring Batch的概念知识
1.1、分层架构Spring的层次架构图如下:
可以看出,它分为三层,即:
应用层:包含所有任务、批处理作业和开发者自定义代码,主要是根据项目需求开发的业务流程等。
批处理核心层:包含启动和管理任务的运行环境类,如JobLauncher等。
批处理基础架构基础层:以上两层都是基于基础层,包括基本的读入读取器和写出写入器、重试框架等。
1.2、关键概念理解下图所涉及的概念至关重要,否则很难进行后续的开发和问题分析。
1.2.1、JobRepository
负责处理数据库,记录整批的添加、更新和执行。因此,Spring Batch需要由数据库来管理。
1.2.2、任务启动器JobLauncher
负责启动任务作业。
1.2.3、任务Job
作业是封装整个批处理过程的单元。运行批处理任务意味着运行由作业定义的内容。
上图介绍了Job的一些相关概念:
Job:封装处理实体并定义流程逻辑。
作业实例:作业的运行实例,不同的实例,不同的参数,所以一个作业定义后,可以用不同的参数运行多次。
JobParameters:与JobInstance关联的参数。
JobExecution:表示作业的实际执行,可能成功也可能失败。
所以,开发者要做的就是定义Job。
1.2.4、步骤Step
步骤是对某个作业的某个过程的封装。一个作业可以包含一个或多个步骤。一步一步,按照特定的逻辑,代表作业执行的完成。
通过定义组装作业的步骤,可以更灵活地实现复杂的业务逻辑。
1.2.5、输入——处理——输出
因此,定义作业的关键是定义一个或多个步骤,然后组装它们。定义Step的方法有很多,但一个常见的模型是输入——,处理——输出,即项阅读器、项处理器和项写入器。比如通过项阅读器从文件中输入数据,然后通过项处理器进行业务处理和数据转换,最后通过项写入器写入数据库。
Spring Batch为我们提供了许多开箱即用的阅读器和编写器,非常方便。
二、代码实例
理解基本概念后,直接通过代码感受。整个项目的功能是从多个csv文件中读取数据,处理后输出到一个csv文件中。
2.1、基本框架添加依赖项:
属国
groupIdorg.springframework.boot/groupId
artifactId spring-boot-starter-batch/artifactId
/依赖关系
属国
groupIdcom.h2database/groupId
artifactIdh2/artifactId
范围运行时间/范围
/依赖关系
添加Spring Batch的依赖项并使用H2作为内存数据库是很方便的。在实际生产中,必须使用外部数据库,如Oracle和PostgreSQL。
主入口类:
@SpringBootApplication
@EnableBatchProcessing
公共类PkslowBatchJobMain {
公共静态void main(String[] args) {
spring application . run(pkslowbatchjobmain . class,args);
}
}
也很简单,在Springboot的基础上添加注释@EnableBatchProcessing即可。
实体类员工:
包com . PK slow . batch . entity;
公共类员工{
字符串id;
字符串firstName
字符串lastName
}
相应的csv文件内容如下:
id,名字,姓氏
1、洛凯什、古普塔
2、阿米特、米什拉
3,潘卡杰,库马尔
4、大卫米勒
2.2、输入——处理——输出2.2.1、读取ItemReader
因为有多个输入文件,所以定义如下:
@Value(input/inputData* .CSV’)
私有资源[]投入资源;
@Bean
public multiresourceitemreaderempleemultiresourceitemreader()
{
MultiResourceItemReaderEmployee resourceItemReader=new MultiResourceItemReaderEmployee();
资源项阅读器。设置资源(输入资源);
资源项阅读器。设置委托(reader());
返回resourceItemReader
}
@Bean
公共FlatFileItemReaderEmployee读取器()
{
FlatFileItemReaderEmployee reader=new FlatFileItemReaderEmployee();
//跳过战斗支援车文件第一行,为表头
读者。setlinestoskip(1);
reader.setLineMapper(新的DefaultLineMapper() {
{
setLineTokenizer(新分隔的linetokenizer(){
{
//字段名
setNames(new String[] { id , firstName , last name });
}
});
setFieldSetMapper(new BeanWrapperFieldSetMapperEmployee(){
{
//转换化后的目标类
setTargetType(雇员。类);
}
});
}
});
回报读者;
}
这里使用了FlatFileItemReader,方便我们从文件读取数据。
2.2.2、处理ItemProcessor
为了简单演示,处理很简单,就是把最后一列转为大写:
公共ItemProcessorEmployee,雇员项目处理器(){
退回员工- {
员工。设置姓氏(员工。获取姓氏().toupper case());
退回员工;
};
}
2.2.3、输出ItremWriter
比较简单,代码及注释如下:
私有资源输出资源=新文件系统资源(输出/输出数据。CSV’);
@Bean
公共FlatFileItemWriterEmployee编写器()
{
FlatFileItemWriterEmployee writer=new FlatFileItemWriter();
作家。设置资源(输出资源);
//是否为追加模式
作家。setappendallowed(true);
作家。setlineaggregator(新分隔行聚合器雇员(){
{
//设置分割符
setDelimiter(,);
setfield提取器(new BeanWrapperFieldExtractorEmployee(){
{
//设置字段
setNames(new String[] { id , firstName , last name });
}
});
}
});
回归作家;
}
2.3、Step有了读者-处理器-作者后,就可以定义步骤了:
@Bean
公共步骤csvStep() {
返回stepbuilderfactory。获取(“CSV步骤”).雇员,雇员群(5)。reader(multiResourceItemReader())。处理器(项目处理器())。writer(writer())。build();
}
这里有一个矮胖的人或物的设置,值为5,意思是5条记录后再提交输出,可以根据自己需求定义。
2.4、Job完成了步骤的编码,定义职位就容易了:
@Bean
公共作业pkslowCsvJob() {
返回jobBuilderFactory。get(pkslowCsvJob )。增量器(新的RunIdIncrementer())。开始(csvStep())。build();
}
2.5、运行完成以上编码后,执行程序,结果如下:
成功读取数据,并将最后字段转为大写,并输出到outputData.csv文件。
三、监听Listener
可以通过听众接口对特定事件进行监听,以实现更多业务功能。比如如果处理失败,就记录一条失败日志;处理完成,就通知下游拿数据等。
我们分别对阅读、处理和写事件进行监听,对应分别要实现ItemReadListener接口、ItemProcessListener接口和ItemWriteListener接口。因为代码比较简单,就是打印一下日志,这里只贴出ItemWriteListener的实现代码:
公共类PkslowWriteListener实现ItemWriteListenerEmployee {
私有静态最终日志记录器=日志工厂。获取日志(pkslowwritelistener。类);
@覆盖
写前公共无效(列表?扩展员工列表){
伐木工。info(写之前: list );
}
@覆盖
public void afterWrite(List?扩展员工列表){
伐木工。info(写后: list );
}
@覆盖
public void onWriteError(异常e,列表?扩展员工列表){
伐木工。info( onwriterror: list );
}
}
把实现的监听器听众整合到步骤中去:
@Bean
公共步骤csvStep() {
返回stepbuilderfactory . get( CSV step )。雇员,雇员群(5)。reader(multiResourceItemReader())。监听器(新的PkslowReadListener())。处理器(itemProcessor())。监听器(新的PkslowProcessListener())。writer(writer())。侦听器(新的PkslowWriteListener())。build();
}
看一下执行后的日志:
这里可以清楚的看到之前组块集的作用。Writer一次处理5条记录。如果一条记录输出一次,会对IO造成压力。
以上是优批处理框架对Spring Batch的介绍的详细内容。更多关于Spring批处理框架的信息,请关注我们的其他相关文章!