如何使用SpringBatch批處理框架

這篇文章主要講解了“如何使用Spring Batch批處理框架”,文中的講解內(nèi)容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“如何使用Spring Batch批處理框架”吧!

網(wǎng)站的建設創(chuàng)新互聯(lián)建站專注網(wǎng)站定制,經(jīng)驗豐富,不做模板,主營網(wǎng)站定制開發(fā).小程序定制開發(fā),H5頁面制作!給你煥然一新的設計體驗!已為成都辦公空間設計等企業(yè)提供專業(yè)服務。

1 前言

Spring Batch是一個輕量級的、完善的批處理框架,作為Spring體系中的一員,它擁有靈活、方便、生產(chǎn)可用的特點。在應對高效處理大量信息、定時處理大量數(shù)據(jù)等場景十分簡便。

結(jié)合調(diào)度框架能更大地發(fā)揮Spring Batch的作用。

2 Spring Batch的概念知識

2.1 分層架構(gòu)

Spring Batch的分層架構(gòu)圖如下:

如何使用Spring Batch批處理框架

通過例子講解Spring Batch入門,優(yōu)秀的批處理框架

可以看到它分為三層,分別是:

  •  Application應用層:包含了所有任務batch jobs和開發(fā)人員自定義的代碼,主要是根據(jù)項目需要開發(fā)的業(yè)務流程等。

  •  Batch Core核心層:包含啟動和管理任務的運行環(huán)境類,如JobLauncher等。

  •  Batch Infrastructure基礎層:上面兩層是建立在基礎層之上的,包含基礎的讀入reader和寫出writer、重試框架等。

2.2 關(guān)鍵概念

理解下圖所涉及的概念至關(guān)重要,不然很難進行后續(xù)開發(fā)和問題分析。

如何使用Spring Batch批處理框架

通過例子講解Spring Batch入門,優(yōu)秀的批處理框架

2.2.1 JobRepository

專門負責與數(shù)據(jù)庫打交道,對整個批處理的新增、更新、執(zhí)行進行記錄。所以Spring Batch是需要依賴數(shù)據(jù)庫來管理的。

2.2.2 任務啟動器JobLauncher

負責啟動任務Job。

2.2.3 任務Job

Job是封裝整個批處理過程的單位,跑一個批處理任務,就是跑一個Job所定義的內(nèi)容。

如何使用Spring Batch批處理框架

通過例子講解Spring Batch入門,優(yōu)秀的批處理框架

上圖介紹了Job的一些相關(guān)概念:

  •  Job:封裝處理實體,定義過程邏輯。

  •  JobInstance:Job的運行實例,不同的實例,參數(shù)不同,所以定義好一個Job后可以通過不同參數(shù)運行多次。

  •  JobParameters:與JobInstance相關(guān)聯(lián)的參數(shù)。

  •  JobExecution:代表Job的一次實際執(zhí)行,可能成功、可能失敗。

所以,開發(fā)人員要做的事情,就是定義Job。

2.2.4 步驟Step

Step是對Job某個過程的封裝,一個Job可以包含一個或多個Step,一步步的Step按特定邏輯執(zhí)行,才代表Job執(zhí)行完成。

如何使用Spring Batch批處理框架

通過例子講解Spring Batch入門,優(yōu)秀的批處理框架

通過定義Step來組裝Job可以更靈活地實現(xiàn)復雜的業(yè)務邏輯。

2.2.5 輸入——處理——輸出

所以,定義一個Job關(guān)鍵是定義好一個或多個Step,然后把它們組裝好即可。而定義Step有多種方法,但有一種常用的模型就是輸入——處理——輸出,即Item Reader、Item Processor和Item Writer。比如通過Item Reader從文件輸入數(shù)據(jù),然后通過Item Processor進行業(yè)務處理和數(shù)據(jù)轉(zhuǎn)換,最后通過Item Writer寫到數(shù)據(jù)庫中去。

Spring Batch為我們提供了許多開箱即用的Reader和Writer,非常方便。

3 代碼實例

理解了基本概念后,就直接通過代碼來感受一下吧。整個項目的功能是從多個csv文件中讀數(shù)據(jù),處理后輸出到一個csv文件。

3.1 基本框架

添加依賴:

<dependency>    <groupId>org.springframework.boot</groupId>    <artifactId>spring-boot-starter-batch</artifactId>  </dependency>  <dependency>    <groupId>com.h3database</groupId>    <artifactId>h3</artifactId>    <scope>runtime</scope>  </dependency>

需要添加Spring Batch的依賴,同時使用H2作為內(nèi)存數(shù)據(jù)庫比較方便,實際生產(chǎn)肯定是要使用外部的數(shù)據(jù)庫,如Oracle、PostgreSQL。

入口主類:

@SpringBootApplication  @EnableBatchProcessing  public class PkslowBatchJobMain {      public static void main(String[] args) {          SpringApplication.run(PkslowBatchJobMain.class, args);      }  }

也很簡單,只是在Springboot的基礎上添加注解@EnableBatchProcessing。

領(lǐng)域?qū)嶓w類Employee:

package com.pkslow.batch.entity;  public class Employee {      String id;      String firstName;      String lastName;  }

對應的csv文件內(nèi)容如下:

id,firstName,lastName  1,Lokesh,Gupta  2,Amit,Mishra  3,Pankaj,Kumar  4,David,Miller

3.2 輸入&mdash;&mdash;處理&mdash;&mdash;輸出

3.2.1 讀取ItemReader

因為有多個輸入文件,所以定義如下:

@Value("input/inputData*.csv")  private Resource[] inputResources;  @Bean  public MultiResourceItemReader<Employee> multiResourceItemReader()  {   MultiResourceItemReader<Employee> resourceItemReader = new MultiResourceItemReader<Employee>();    resourceItemReader.setResources(inputResources);    resourceItemReader.setDelegate(reader());   return resourceItemReader;  }  @Bean public FlatFileItemReader<Employee> reader()  {    FlatFileItemReader<Employee> reader = new FlatFileItemReader<Employee>();    //跳過csv文件第一行,為表頭    reader.setLinesToSkip(1);    reader.setLineMapper(new DefaultLineMapper() {      {        setLineTokenizer(new DelimitedLineTokenizer() {          {            //字段名            setNames(new String[] { "id", "firstName", "lastName" });          }        });        setFieldSetMapper(new BeanWrapperFieldSetMapper<Employee>() {          {            //轉(zhuǎn)換化后的目標類            setTargetType(Employee.class);          }        });      }    });    return reader;  }

這里使用了FlatFileItemReader,方便我們從文件讀取數(shù)據(jù)。

3.2.2 處理ItemProcessor

為了簡單演示,處理很簡單,就是把最后一列轉(zhuǎn)為大寫:

public ItemProcessor<Employee, Employee> itemProcessor() {    return employee -> {      employee.setLastName(employee.getLastName().toUpperCase());      return employee;    };  }

3.2.3 輸出ItremWriter

比較簡單,代碼及注釋如下:

private Resource outputResource = new FileSystemResource("output/outputData.csv");  @Bean  public FlatFileItemWriter<Employee> writer()  {    FlatFileItemWriter<Employee> writer = new FlatFileItemWriter<>();    writer.setResource(outputResource);    //是否為追加模式    writer.setAppendAllowed(true);    writer.setLineAggregator(new DelimitedLineAggregator<Employee>() {      {        //設置分割符        setDelimiter(",");        setFieldExtractor(new BeanWrapperFieldExtractor<Employee>() {          {            //設置字段            setNames(new String[] { "id", "firstName", "lastName" });          }        });      }    });    return writer;  }

3.3 Step

有了Reader-Processor-Writer后,就可以定義Step了:

@Bean  public Step csvStep() {    return stepBuilderFactory.get("csvStep").<Employee, Employee>chunk(5)      .reader(multiResourceItemReader())      .processor(itemProcessor())      .writer(writer())      .build();  }

這里有一個chunk的設置,值為5,意思是5條記錄后再提交輸出,可以根據(jù)自己需求定義。

3.4 Job

完成了Step的編碼,定義Job就容易了:

@Bean  public Job pkslowCsvJob() {    return jobBuilderFactory      .get("pkslowCsvJob")      .incrementer(new RunIdIncrementer())      .start(csvStep())      .build();  }

3.5 運行

完成以上編碼后,執(zhí)行程序,結(jié)果如下:

如何使用Spring Batch批處理框架

通過例子講解Spring Batch入門,優(yōu)秀的批處理框架

成功讀取數(shù)據(jù),并將最后字段轉(zhuǎn)為大寫,并輸出到outputData.csv文件。

4 監(jiān)聽Listener

可以通過Listener接口對特定事件進行監(jiān)聽,以實現(xiàn)更多業(yè)務功能。比如如果處理失敗,就記錄一條失敗日志;處理完成,就通知下游拿數(shù)據(jù)等。

我們分別對Read、Process和Write事件進行監(jiān)聽,對應分別要實現(xiàn)ItemReadListener接口、ItemProcessListener接口和ItemWriteListener接口。因為代碼比較簡單,就是打印一下日志,這里只貼出ItemWriteListener的實現(xiàn)代碼:

public class PkslowWriteListener implements ItemWriteListener<Employee> {      private static final Log logger = LogFactory.getLog(PkslowWriteListener.class);      @Override      public void beforeWrite(List<? extends Employee> list) {          logger.info("beforeWrite: " + list);      }      @Override      public void afterWrite(List<? extends Employee> list) {          logger.info("afterWrite: " + list);      }      @Override      public void onWriteError(Exception e, List<? extends Employee> list) {          logger.info("onWriteError: " + list);      }  }

把實現(xiàn)的監(jiān)聽器listener整合到Step中去:

@Bean  public Step csvStep() {    return stepBuilderFactory.get("csvStep").<Employee, Employee>chunk(5)      .reader(multiResourceItemReader())      .listener(new PkslowReadListener())      .processor(itemProcessor())      .listener(new PkslowProcessListener())      .writer(writer())      .listener(new PkslowWriteListener())      .build();  }

執(zhí)行后看一下日志:

如何使用Spring Batch批處理框架

通過例子講解Spring Batch入門,優(yōu)秀的批處理框架

這里就能明顯看到之前設置的chunk的作用了。Writer每次是處理5條記錄,如果一條輸出一次,會對IO造成壓力。

感謝各位的閱讀,以上就是“如何使用Spring Batch批處理框架”的內(nèi)容了,經(jīng)過本文的學習后,相信大家對如何使用Spring Batch批處理框架這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是創(chuàng)新互聯(lián),小編將為大家推送更多相關(guān)知識點的文章,歡迎關(guān)注!

分享文章:如何使用SpringBatch批處理框架
新聞來源:http://bm7419.com/article20/jcisco.html

成都網(wǎng)站建設公司_創(chuàng)新互聯(lián),為您提供關(guān)鍵詞優(yōu)化云服務器、自適應網(wǎng)站、外貿(mào)網(wǎng)站建設、移動網(wǎng)站建設靜態(tài)網(wǎng)站

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

網(wǎng)站建設網(wǎng)站維護公司