Java多線程批量數(shù)據(jù)導(dǎo)入的方法是什么

這篇文章主要介紹Java多線程批量數(shù)據(jù)導(dǎo)入的方法是什么,文中介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們一定要看完!

成都創(chuàng)新互聯(lián)公司專注為客戶提供全方位的互聯(lián)網(wǎng)綜合服務(wù),包含不限于做網(wǎng)站、網(wǎng)站設(shè)計、安圖網(wǎng)絡(luò)推廣、微信小程序、安圖網(wǎng)絡(luò)營銷、安圖企業(yè)策劃、安圖品牌公關(guān)、搜索引擎seo、人物專訪、企業(yè)宣傳片、企業(yè)代運營等,從售前售中售后,我們都將竭誠為您服務(wù),您的肯定,是我們最大的嘉獎;成都創(chuàng)新互聯(lián)公司為所有大學(xué)生創(chuàng)業(yè)者提供安圖建站搭建服務(wù),24小時服務(wù)熱線:18980820575,官方網(wǎng)址:bm7419.com

前言:當(dāng)遇到大量數(shù)據(jù)導(dǎo)入時,為了提高處理的速度,可以選擇使用多線程來批量處理這些處理。常見的場景有:

  1. 大文件導(dǎo)入數(shù)據(jù)庫(這個文件不一定是標準的CSV可導(dǎo)入文件或者需要在內(nèi)存中經(jīng)過一定的處理)
  2. 數(shù)據(jù)同步(從第三方接口拉取數(shù)據(jù)處理后寫入自己的數(shù)據(jù)庫)

以上的場景有一個共性,這類數(shù)據(jù)導(dǎo)入的場景簡單來說就是將數(shù)據(jù)從一個數(shù)據(jù)源移動到另外一個數(shù)據(jù)源,而其中必定可以分為兩步

  1. 數(shù)據(jù)讀取:從數(shù)據(jù)源讀取數(shù)據(jù)到內(nèi)存
  2. 數(shù)據(jù)寫入:將內(nèi)存中的數(shù)據(jù)寫入到另外一個數(shù)據(jù)源,可能存在數(shù)據(jù)處理

而且數(shù)據(jù)讀取的速度一般會比數(shù)據(jù)寫入的速度快很多,即讀取快,寫入慢

設(shè)計思路

由于場景的特點是讀取快,寫入慢,如果是使用多線程處理,建議是數(shù)據(jù)寫入部分改造為多線程。而數(shù)據(jù)讀取可以改造成批量讀取數(shù)據(jù)。簡單來說就是兩個要點:

  1. 批量讀取數(shù)據(jù)
  2. 多線程寫入數(shù)據(jù)

示例

多線程批量處理最簡單的方案是使用線程池來進行處理,下面會通過一個模擬批量讀取和寫入的服務(wù),以及對這個服務(wù)的多線程寫入調(diào)用作為示例,展示如何多線程批量數(shù)據(jù)導(dǎo)入。

模擬服務(wù)

import java.util.concurrent.atomic.AtomicLong;

/**
 * 數(shù)據(jù)批量寫入用的模擬服務(wù)
 *
 * @author RJH
 * create at 2019-04-01
 */
public class MockService {
    /**
     * 可讀取總數(shù)
     */
    private long canReadTotal;

    /**
     * 寫入總數(shù)
     */
    private AtomicLong writeTotal=new AtomicLong(0);

    /**
     * 寫入休眠時間(單位:毫秒)
     */
    private final long sleepTime;

    /**
     * 構(gòu)造方法
     *
     * @param canReadTotal
     * @param sleepTime
     */
    public MockService(long canReadTotal, long sleepTime) {
        this.canReadTotal = canReadTotal;
        this.sleepTime = sleepTime;
    }

    /**
     * 批量讀取數(shù)據(jù)接口
     *
     * @param num
     * @return
     */
    public synchronized long readData(int num) {
        long readNum;
        if (canReadTotal >= num) {
            canReadTotal -= num;
            readNum = num;
        } else {
            readNum = canReadTotal;
            canReadTotal = 0;
        }
        //System.out.println("read data size:" + readNum);
        return readNum;
    }

    /**
     * 寫入數(shù)據(jù)接口
     */
    public void writeData() {
        try {
            // 休眠一定時間模擬寫入速度慢
            Thread.sleep(sleepTime);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // 寫入總數(shù)自增
        System.out.println("thread:" + Thread.currentThread() + " write data:" + writeTotal.incrementAndGet());
    }

    /**
     * 獲取寫入的總數(shù)
     *
     * @return
     */
    public long getWriteTotal() {
        return writeTotal.get();
    }

}

批量數(shù)據(jù)處理器

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 基于線程池的多線程批量寫入處理器
 * @author RJH
 * create at 2019-04-01
 */
public class SimpleBatchHandler {

    private ExecutorService executorService;

    private MockService service;
    /**
     * 每次批量讀取的數(shù)據(jù)量
     */
    private int batch;
    /**
     * 線程個數(shù)
     */
    private int threadNum;

    public SimpleBatchHandler(MockService service, int batch,int threadNum) {
        this.service = service;
        this.batch = batch;
        //使用固定數(shù)目的線程池
        this.executorService = Executors.newFixedThreadPool(threadNum);
    }

    /**
     * 開始處理
     */
    public void startHandle() {
        // 開始處理的時間
        long startTime = System.currentTimeMillis();
        System.out.println("start handle time:" + startTime);
        long readData;
        while ((readData = service.readData(batch)) != 0) {// 批量讀取數(shù)據(jù),知道讀取不到數(shù)據(jù)才停止
            for (long i = 0; i < readData; i++) {
                executorService.execute(() -> service.writeData());
            }
        }
        // 關(guān)閉線程池
        executorService.shutdown();
        while (!executorService.isTerminated()) {//等待線程池中的線程執(zhí)行完

        }
        // 結(jié)束時間
        long endTime = System.currentTimeMillis();
        System.out.println("end handle time:" + endTime);
        // 總耗時
        System.out.println("total handle time:" + (endTime - startTime) + "ms");
        // 寫入總數(shù)
        System.out.println("total write num:" + service.getWriteTotal());
    }

}

測試類

/**
 * SimpleBatchHandler的測試類
 * @author RJH
 * create at 2019-04-01
 */
public class SimpleBatchHandlerTest {

    public static void main(String[] args) {
        // 總數(shù)
        long total=100000;
        // 休眠時間
        long sleepTime=100;
        // 每次拉取的數(shù)量
        int batch=100;
        // 線程個數(shù)
        int threadNum=16;
        MockService mockService=new MockService(total,sleepTime);
        SimpleBatchHandler handler=new SimpleBatchHandler(mockService,batch,threadNum);
        handler.startHandle();
    }
}

運行結(jié)果

start handle time:1554298681755
thread:Thread[pool-1-thread-2,5,main] write data:1
thread:Thread[pool-1-thread-1,5,main] write data:2
...省略部分輸出
thread:Thread[pool-1-thread-4,5,main] write data:100000
end handle time:1554299330202
total handle time:648447ms
total write num:100000

分析

在單線程情況下的執(zhí)行時間應(yīng)該為total*sleepTime,即10000000ms,而改造為多線程后執(zhí)行時間為648447ms。

以上是Java多線程批量數(shù)據(jù)導(dǎo)入的方法是什么的所有內(nèi)容,感謝各位的閱讀!希望分享的內(nèi)容對大家有幫助,更多相關(guān)知識,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道!

當(dāng)前標題:Java多線程批量數(shù)據(jù)導(dǎo)入的方法是什么
本文路徑:http://bm7419.com/article10/igdjdo.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供做網(wǎng)站建站公司、網(wǎng)站營銷標簽優(yōu)化、、網(wǎng)站收錄

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

網(wǎng)站建設(shè)網(wǎng)站維護公司