详解批处理框架之Spring Batch

详解批处理框架之Spring Batch

Spring Batch是一个轻量级的、完美的批处理框架。作为弹簧系统的一员,它在生产中具有灵活、方便、可用的特点。处理高效处理大量信息、定期处理大量数据等场景非常简单。结合调度框架可以充分发挥春批的作用。

目录

一、概念知识1.1、层次架构1.2、关键概念1.2.1、JobRepository1.2.2、JobLauncher1.2.3、Job1.2.4、Step1.2.5、输入——、过程——、输出2。输入——,进程3354,输出2.2.1,读取ItemReader2.2.2,进程ItemProcessor2.2.3,输出ItremWriter2.3,步骤2.4,作业2.5,运行3,监听监听器。

一、Spring Batch的概念知识

1.1、分层架构

Spring的层次架构图如下:

可以看出,它分为三层,即:

应用层:包含所有任务、批处理作业和开发者自定义代码,主要是根据项目需求开发的业务流程等。

批处理核心层:包含启动和管理任务的运行环境类,如JobLauncher等。

批处理基础架构基础层:以上两层都是基于基础层,包括基本的读入读取器和写出写入器、重试框架等。

1.2、关键概念

理解下图所涉及的概念至关重要,否则很难进行后续的开发和问题分析。

1.2.1、JobRepository

负责处理数据库,记录整批的添加、更新和执行。因此,Spring Batch需要由数据库来管理。

1.2.2、任务启动器JobLauncher

负责启动任务作业。

1.2.3、任务Job

作业是封装整个批处理过程的单元。运行批处理任务意味着运行由作业定义的内容。

上图介绍了Job的一些相关概念:

Job:封装处理实体并定义流程逻辑。

作业实例:作业的运行实例,不同的实例,不同的参数,所以一个作业定义后,可以用不同的参数运行多次。

JobParameters:与JobInstance关联的参数。

JobExecution:表示作业的实际执行,可能成功也可能失败。

所以,开发者要做的就是定义Job。

1.2.4、步骤Step

步骤是对某个作业的某个过程的封装。一个作业可以包含一个或多个步骤。一步一步,按照特定的逻辑,代表作业执行的完成。

通过定义组装作业的步骤,可以更灵活地实现复杂的业务逻辑。

1.2.5、输入——处理——输出

因此,定义作业的关键是定义一个或多个步骤,然后组装它们。定义Step的方法有很多,但一个常见的模型是输入——,处理——输出,即项阅读器、项处理器和项写入器。比如通过项阅读器从文件中输入数据,然后通过项处理器进行业务处理和数据转换,最后通过项写入器写入数据库。

Spring Batch为我们提供了许多开箱即用的阅读器和编写器,非常方便。

二、代码实例

理解基本概念后,直接通过代码感受。整个项目的功能是从多个csv文件中读取数据,处理后输出到一个csv文件中。

2.1、基本框架

添加依赖项:

属国

groupIdorg.springframework.boot/groupId

artifactId spring-boot-starter-batch/artifactId

/依赖关系

属国

groupIdcom.h2database/groupId

artifactIdh2/artifactId

范围运行时间/范围

/依赖关系

添加Spring Batch的依赖项并使用H2作为内存数据库是很方便的。在实际生产中,必须使用外部数据库,如Oracle和PostgreSQL。

主入口类:

@SpringBootApplication

@EnableBatchProcessing

公共类PkslowBatchJobMain {

公共静态void main(String[] args) {

spring application . run(pkslowbatchjobmain . class,args);

}

}

也很简单,在Springboot的基础上添加注释@EnableBatchProcessing即可。

实体类员工:

包com . PK slow . batch . entity;

公共类员工{

字符串id;

字符串firstName

字符串lastName

}

相应的csv文件内容如下:

id,名字,姓氏

1、洛凯什、古普塔

2、阿米特、米什拉

3,潘卡杰,库马尔

4、大卫米勒

2.2、输入——处理——输出

2.2.1、读取ItemReader

因为有多个输入文件,所以定义如下:

@Value(input/inputData* .CSV’)

私有资源[]投入资源;

@Bean

public multiresourceitemreaderempleemultiresourceitemreader()

{

MultiResourceItemReaderEmployee resourceItemReader=new MultiResourceItemReaderEmployee();

资源项阅读器。设置资源(输入资源);

资源项阅读器。设置委托(reader());

返回resourceItemReader

}

@Bean

公共FlatFileItemReaderEmployee读取器()

{

FlatFileItemReaderEmployee reader=new FlatFileItemReaderEmployee();

//跳过战斗支援车文件第一行,为表头

读者。setlinestoskip(1);

reader.setLineMapper(新的DefaultLineMapper() {

{

setLineTokenizer(新分隔的linetokenizer(){

{

//字段名

setNames(new String[] { id , firstName , last name });

}

});

setFieldSetMapper(new BeanWrapperFieldSetMapperEmployee(){

{

//转换化后的目标类

setTargetType(雇员。类);

}

});

}

});

回报读者;

}

这里使用了FlatFileItemReader,方便我们从文件读取数据。

2.2.2、处理ItemProcessor

为了简单演示,处理很简单,就是把最后一列转为大写:

公共ItemProcessorEmployee,雇员项目处理器(){

退回员工- {

员工。设置姓氏(员工。获取姓氏().toupper case());

退回员工;

};

}

2.2.3、输出ItremWriter

比较简单,代码及注释如下:

私有资源输出资源=新文件系统资源(输出/输出数据。CSV’);

@Bean

公共FlatFileItemWriterEmployee编写器()

{

FlatFileItemWriterEmployee writer=new FlatFileItemWriter();

作家。设置资源(输出资源);

//是否为追加模式

作家。setappendallowed(true);

作家。setlineaggregator(新分隔行聚合器雇员(){

{

//设置分割符

setDelimiter(,);

setfield提取器(new BeanWrapperFieldExtractorEmployee(){

{

//设置字段

setNames(new String[] { id , firstName , last name });

}

});

}

});

回归作家;

}

2.3、Step

有了读者-处理器-作者后,就可以定义步骤了:

@Bean

公共步骤csvStep() {

返回stepbuilderfactory。获取(“CSV步骤”).雇员,雇员群(5)。reader(multiResourceItemReader())。处理器(项目处理器())。writer(writer())。build();

}

这里有一个矮胖的人或物的设置,值为5,意思是5条记录后再提交输出,可以根据自己需求定义。

2.4、Job

完成了步骤的编码,定义职位就容易了:

@Bean

公共作业pkslowCsvJob() {

返回jobBuilderFactory。get(pkslowCsvJob )。增量器(新的RunIdIncrementer())。开始(csvStep())。build();

}

2.5、运行

完成以上编码后,执行程序,结果如下:

成功读取数据,并将最后字段转为大写,并输出到outputData.csv文件。

三、监听Listener

可以通过听众接口对特定事件进行监听,以实现更多业务功能。比如如果处理失败,就记录一条失败日志;处理完成,就通知下游拿数据等。

我们分别对阅读、处理和写事件进行监听,对应分别要实现ItemReadListener接口、ItemProcessListener接口和ItemWriteListener接口。因为代码比较简单,就是打印一下日志,这里只贴出ItemWriteListener的实现代码:

公共类PkslowWriteListener实现ItemWriteListenerEmployee {

私有静态最终日志记录器=日志工厂。获取日志(pkslowwritelistener。类);

@覆盖

写前公共无效(列表?扩展员工列表){

伐木工。info(写之前: list );

}

@覆盖

public void afterWrite(List?扩展员工列表){

伐木工。info(写后: list );

}

@覆盖

public void onWriteError(异常e,列表?扩展员工列表){

伐木工。info( onwriterror: list );

}

}

把实现的监听器听众整合到步骤中去:

@Bean

公共步骤csvStep() {

返回stepbuilderfactory . get( CSV step )。雇员,雇员群(5)。reader(multiResourceItemReader())。监听器(新的PkslowReadListener())。处理器(itemProcessor())。监听器(新的PkslowProcessListener())。writer(writer())。侦听器(新的PkslowWriteListener())。build();

}

看一下执行后的日志:

这里可以清楚的看到之前组块集的作用。Writer一次处理5条记录。如果一条记录输出一次,会对IO造成压力。

以上是优批处理框架对Spring Batch的介绍的详细内容。更多关于Spring批处理框架的信息,请关注我们的其他相关文章!

详解批处理框架之Spring Batch