Java線程池的原理和作用是什么

這篇文章主要講解了“Java線程池的原理和作用是什么”,文中的講解內(nèi)容簡(jiǎn)單清晰,易于學(xué)習(xí)與理解,下面請(qǐng)大家跟著小編的思路慢慢深入,一起來研究和學(xué)習(xí)“Java線程池的原理和作用是什么”吧!

十載的公安網(wǎng)站建設(shè)經(jīng)驗(yàn),針對(duì)設(shè)計(jì)、前端、開發(fā)、售后、文案、推廣等六對(duì)一服務(wù),響應(yīng)快,48小時(shí)及時(shí)工作處理。網(wǎng)絡(luò)營(yíng)銷推廣的優(yōu)勢(shì)是能夠根據(jù)用戶設(shè)備顯示端的尺寸不同,自動(dòng)調(diào)整公安建站的顯示方式,使網(wǎng)站能夠適用不同顯示終端,在瀏覽器中調(diào)整網(wǎng)站的寬度,無論在任何一種瀏覽器上瀏覽網(wǎng)站,都能展現(xiàn)優(yōu)雅布局與設(shè)計(jì),從而大程度地提升瀏覽體驗(yàn)。創(chuàng)新互聯(lián)建站從事“公安網(wǎng)站設(shè)計(jì)”,“公安網(wǎng)站推廣”以來,每個(gè)客戶項(xiàng)目都認(rèn)真落實(shí)執(zhí)行。

一、線程的復(fù)用

“首當(dāng)其沖的問題就是,如何復(fù)用線程。”

/**
 * Created by Anur IjuoKaruKas on 2019/7/16
 */
public class ThreadPoolExecutor {
    private final BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();

    private final Runnable runnable = () -> {
        try {
            while (true) {
                Runnable take = workQueue.poll();

                if (take == null) {
                    Thread.sleep(200);
                } else {
                    take.run();
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    };

    public ThreadPoolExecutor() {
        new Thread(runnable).start();
    }

    public void execute(Runnable command) {
        workQueue.offer(command);
    }
}

肥宅埋過了一眼,很快就發(fā)現(xiàn)其中玄妙之處:在小奈的 ThreadPoolExecutor 中,定制了一套 runnable 流程,負(fù)責(zé)不斷從 workQueue 這個(gè)隊(duì)列中拉取由 #execute 方法提交過來的任務(wù),并執(zhí)行其 run() 方法。這樣,無論提交過來多少個(gè)任務(wù),始終都是這個(gè)線程池內(nèi)置的線程在執(zhí)行任務(wù)。當(dāng)獲取不到任務(wù)的時(shí)候,線程池會(huì)自己進(jìn)入休眠狀態(tài)。

二、worker線程的自動(dòng)創(chuàng)建、銷毀以及最大 worker 數(shù)

“雖然這達(dá)到了線程復(fù)用,但是你的這個(gè)線程完全沒辦法自動(dòng)創(chuàng)建和銷毀???甚至它的線程池?cái)?shù)量都是不可控制的?!狈收耠m然感嘆于對(duì)方可以這么快實(shí)現(xiàn)線程復(fù)用,但還是持續(xù)展開攻勢(shì)。

“既然要實(shí)現(xiàn)線程池可控,最直截了當(dāng)?shù)南敕ū闶菍⒎讲诺哪翘?runnable 流程封裝成一個(gè)對(duì)象,我們只需控制這個(gè)對(duì)象的創(chuàng)建、銷毀、以及復(fù)用即可。”作為一只長(zhǎng)期浸泡在 OOP 思維中的程序媛,這種問題難不倒小奈。她很快就寫出了一個(gè)內(nèi)部類,叫做 Worker,其中 #runWorker(this); 就是剛才那個(gè) runnable 流程,負(fù)責(zé)不斷從隊(duì)列中獲取任務(wù),并調(diào)用它的 #run() 方法。

    private final class Worker implements Runnable {

        final Thread thread;

        Runnable firstTask;

        Worker(Runnable firstTask) {
            this.firstTask = firstTask;
            this.thread = threadFactory.newThread(this);
        }

        @Override
        public void run() {
            runWorker(this);
        }
    }

小奈為后續(xù)將要完成的 worker 線程數(shù)量控制打下了基石:ThreadPoolExecutor 中增加了一個(gè)散列集,用于存放 worker,增加了一個(gè) ThreadFactory,供使用者定制化 worker 線程的創(chuàng)建。

其中比較核心的方法叫做 #addWorker(),負(fù)責(zé)創(chuàng)建并初始化 worker 線程,并將其納入散列集中管理。當(dāng)然,這個(gè)線程池還無法自動(dòng)創(chuàng)建,不過已經(jīng)可以自動(dòng)銷毀了??梢钥吹剑诶〔坏饺蝿?wù)時(shí),#getTask() 則返回空,會(huì)跳出 #runWorker()while 循環(huán),之后調(diào)用 #processWorkerExit();,將 worker 線程從散列集中移除。

/**
 * Created by Anur IjuoKaruKas on 2019/7/16
 */
public class ThreadPoolExecutor {

    private final HashSet<Worker> workers = new HashSet<>();

    private volatile ThreadFactory threadFactory;

    private final BlockingQueue<Runnable> workQueue;

    public ThreadPoolExecutor(BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
        this.workQueue = workQueue;
    }

    public void execute(Runnable command) {
        workQueue.offer(command);
    }

    /**
     * 新建一個(gè) worker 線程、啟動(dòng)并納入 workers
     */
    private boolean addWorker(Runnable firstTask) {
        Worker w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            workers.add(w);
            t.start();
        }
        return true;
    }

    /**
     * worker 線程池不斷從 workQueue 中拉取 task 進(jìn)行消費(fèi)
     */
    private void runWorker(Worker w) {
        Runnable task = w.firstTask;
        w.firstTask = null;
        while (task != null || (task = getTask()) != null) {
            task.run();
        }

        processWorkerExit(w);
    }

    /**
     * 當(dāng)線程執(zhí)行完畢之前,將其從 workers 中移除
     */
    private void processWorkerExit(Worker w) {
        workers.remove(w);
    }

    private Runnable getTask() {
        return workQueue.poll();
    }
}

看到這里,肥宅埋已經(jīng)能預(yù)測(cè)到接下來的思路了。

Java線程池的原理和作用是什么

線程池需要加入一個(gè)變量 maximumPoolSize,以防無限創(chuàng)建線程,每次進(jìn)行 #addWorker() 時(shí),需要判斷一下是否可以繼續(xù)添加 worker,如果可以,則添加新的 worker,否則將任務(wù)丟入隊(duì)列:

#addWorker() 中加入拒絕的邏輯,確保不能無限創(chuàng)建 worker

再修改一下 #execute() 方法,優(yōu)先創(chuàng)建 worker,如果創(chuàng)建 worker 失?。?workers.size() >= maximumPoolSize),則直接將任務(wù)丟入隊(duì)列。

    public void execute(Runnable command) {
        if (addWorker(command)) {
            return;
        }
        workQueue.offer(command);
    }

    /**
     * 新建一個(gè) worker 線程、啟動(dòng)并納入 workers
     */
    private boolean addWorker(Runnable firstTask) {
        int ws = workers.size();
        if (ws >= maximumPoolSize) {
            return false;
        }

        Worker w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            workers.add(w);
            t.start();
        }
        return true;
    }

三、核心線程、最大線程與 keepAliveTime

已經(jīng)寫到這里小奈可謂是趾高氣揚(yáng),仿佛實(shí)現(xiàn)一個(gè)線程池已經(jīng)不在話下。

Java線程池的原理和作用是什么

“這樣貌似有點(diǎn)問題???雖然說你已經(jīng)實(shí)現(xiàn)了線程的動(dòng)態(tài)創(chuàng)建與銷毀,但在任務(wù)沒有那么緊湊的情況下,基本是每個(gè)任務(wù)都進(jìn)來都需要?jiǎng)?chuàng)建一次線程,再銷毀一次線程,說好的復(fù)用到哪里去了?”肥宅埋給了膨脹的小奈當(dāng)頭一棒。

“咳咳......嘛,銷毀的時(shí)候做一下判斷就可以了,我們加入一個(gè)新的變量,叫做 keepAliveTime,當(dāng)拿不到任務(wù)的時(shí)候,就進(jìn)行阻塞休眠,比如 20ms,每次對(duì) keepAliveTime20ms,直到小于等于 0 ,再銷毀線程?!毙∧畏磻?yīng)迅速,很快給出了答案,并準(zhǔn)備動(dòng)手對(duì)線程池進(jìn)行改動(dòng)。

肥宅埋嘆了一口氣,“我看你是被膨脹蒙蔽了雙眼,既然我們已經(jīng)使用了阻塞隊(duì)列,那么就可以充分利用阻塞隊(duì)列的特性!阻塞隊(duì)列中內(nèi)置了一個(gè)顯式鎖,利用鎖的 condition 對(duì)象,使用它的 #awaitNanos()#notify() 方法,就可以直接精準(zhǔn)地實(shí)現(xiàn)線程調(diào)度了。”畢竟肥宅埋也是一只學(xué)霸,聽到小奈的想法后提出了更具有建設(shè)性的設(shè)計(jì)。

小奈也很快反應(yīng)過來,阻塞隊(duì)列有一個(gè) #poll() 方法,底層是借助 condition 對(duì)象封裝的 LockSupport.parkNanos(this, nanosTimeout); 來實(shí)現(xiàn)的,會(huì)阻塞直到有新的元素加入,當(dāng)有新的元素加入,這個(gè) condition 就會(huì)被喚醒,來實(shí)現(xiàn) 當(dāng)調(diào)用阻塞隊(duì)列的 #poll() 時(shí),如果阻塞隊(duì)列為空,會(huì)進(jìn)行一段時(shí)間的休眠,直到被喚醒,或者休眠超時(shí)。

肥宅埋一手接管了改造線程池的大權(quán),馬上大刀闊斧地改了起來。

改動(dòng)十分簡(jiǎn)單,原先的 #getTask() 是直接調(diào)用阻塞隊(duì)列的 #take() 方法,如果隊(duì)列為空,則直接返回,只要將其改為 #poll 方法即可。

    /**
     * 當(dāng) runWorker 一定時(shí)間內(nèi)獲取不到任務(wù)時(shí),就會(huì) processWorkerExit 銷毀
     */
    private Runnable getTask() {
        boolean timedOut = false;
        while (true) {
            try {
                if (timedOut) {
                    return null;
                }

                Runnable r = workQueue.poll(keepAliveTime, unit);
                if (r != null) {
                    return r;
                } else {
                    timedOut = true;
                }
            } catch (InterruptedException e) {
                timedOut = false;
            }
        }
    }

“一般來說,我們的任務(wù)提交都不會(huì)太過于均勻,如果我們平常不需要那么多線程來消費(fèi),但又想避免任務(wù)一直被堆積導(dǎo)致某些任務(wù)遲遲不被消費(fèi),就需要引入**核心線程 corePoolSize ** 與 **最大線程 maximumPoolSize ** 的概念。”肥宅埋想到了一個(gè)簡(jiǎn)單的可以優(yōu)化的點(diǎn),頭頭是道地分析道:“我們可以不用做那么復(fù)雜的動(dòng)態(tài) worker 消費(fèi)池,最簡(jiǎn)單的,如果我們的阻塞隊(duì)列滿了,就繼續(xù)創(chuàng)建更多的線程池,這樣,堆積的任務(wù)能比以往更快速的降下來。”

說起來好像復(fù)雜,實(shí)際上代碼十分簡(jiǎn)單。小奈看見肥宅埋修改了 #addWorker() 方法,增加了一個(gè)參數(shù) core,其作用只有一個(gè),如果是核心線程,則創(chuàng)建時(shí),數(shù)量必須小于等于 corePoolSize,否則數(shù)量必須小于等于 maximumPoolSize

另外, #execute() 方法的改動(dòng)也十分簡(jiǎn)單,前面的改動(dòng)不大,主要是,當(dāng)任務(wù) #offer() 失敗后,創(chuàng)建非核心 worker 線程。

    /**
     * 優(yōu)先創(chuàng)建核心線程,核心線程滿了以后,則優(yōu)先將任務(wù)放入隊(duì)列
     * 
     * 隊(duì)列滿了以后,則啟用非核心線程池,以防任務(wù)堆積
     */
    public void execute(Runnable command) {
        if (getPoolSize() < corePoolSize) {
            if (addWorker(command, true)) {
                return;
            }
        }

        if (!workQueue.offer(command)) {
            addWorker(command, false);
        }
    }

    /**
     * 新建一個(gè) worker 線程、啟動(dòng)并納入 workers
     */
    private boolean addWorker(Runnable firstTask, boolean core) {
        int ws = workers.size();
        if (ws >= (core ? corePoolSize : maximumPoolSize)) {
            return false;
        }

        Worker w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            workers.add(w);
            t.start();
        }
        return true;
    }

四、拒絕策略

“現(xiàn)在這個(gè)版本的線程池看起來真是有模有樣呢 ~ 可以動(dòng)態(tài)創(chuàng)建與銷毀線程,線程也能復(fù)用,還可以動(dòng)態(tài)增加更多的線程來消費(fèi)堆積的線程!” 肥宅埋滿意地看著兩人的杰作,“其實(shí)我還發(fā)現(xiàn)有個(gè)地方不太友好,在推送任務(wù)時(shí),調(diào)用方可能并不知道自己的任務(wù)是否失敗?!?/p>

“這個(gè)簡(jiǎn)單鴨,只需要在調(diào)用 #execute() 時(shí)返回 flase 來代表添加失敗,或者拋出對(duì)應(yīng)的異常即可。”小奈給出了很直觀的設(shè)計(jì)。

“這確實(shí)不失為一個(gè)好方法,但是對(duì)于調(diào)用方來說,如果所有使用線程池的地方都需要去做這個(gè)判斷,那豈不是太麻煩了!”肥宅埋對(duì)方案進(jìn)行了補(bǔ)充:“這個(gè)是面向切面編程的一種思想,我們可以提供一個(gè)如何處理這些隊(duì)列已經(jīng)放不下,且無法創(chuàng)建更多消費(fèi)線程的切面入口,就叫它 AbortPolicy 吧!”

肥宅埋修改了一下 #execute() 方法,如果在創(chuàng)建非核心線程池的時(shí)候失敗,就直接將任務(wù)拒絕掉。

    /**
     * 優(yōu)先創(chuàng)建核心線程,核心線程滿了以后,則優(yōu)先將任務(wù)放入隊(duì)列
     *
     * 隊(duì)列滿了以后,則啟用非核心線程池,以防任務(wù)堆積
     *
     * 如果非核心線程池創(chuàng)建失敗,則拒絕這個(gè)任務(wù)
     */
    public void execute(Runnable command) {
        if (getPoolSize() < corePoolSize) {
            if (addWorker(command, true)) {
                return;
            }
        }

        if (!workQueue.offer(command)) {
            if (!addWorker(command, false)) {
                reject(command);
            }
        }
    }

如何去拒絕任務(wù),交給調(diào)用者去實(shí)現(xiàn),#reject() 的實(shí)現(xiàn)非常簡(jiǎn)單,就是調(diào)用一下 BiConsumer,這個(gè)可以供調(diào)用方自由定制。

    private void reject(Runnable command) {
        abortPolicy.accept(command, this);
    }

五、執(zhí)行線程池

小奈與肥宅埋已經(jīng)完成了她們的線程池,現(xiàn)在需要測(cè)試一下線程池是否可以正常使用,比較細(xì)心的肥宅埋寫了測(cè)試用例如下:

核心線程數(shù)為5,最大線程數(shù)為10,緊接著每個(gè)線程在拉取不到任務(wù)時(shí)會(huì)存活一分鐘,有一個(gè)長(zhǎng)度為 5 的并發(fā)阻塞隊(duì)列,采用默認(rèn)的 ThreadFactory,最后,使用了 DiscardPolicy,當(dāng)任務(wù)被拒絕后,直接丟棄任務(wù),并打印日志。

Java線程池的原理和作用是什么

她們運(yùn)行了代碼,日志打印如下。完全符合預(yù)期,在阻塞隊(duì)列還未裝滿之前,只有 5 個(gè)核心線程在消費(fèi)任務(wù),當(dāng)阻塞隊(duì)列滿了以后,會(huì)逐步創(chuàng)建更多的線程,而當(dāng)無法創(chuàng)建更多線程后,則觸發(fā)丟棄策略。

Java線程池的原理和作用是什么

感謝各位的閱讀,以上就是“Java線程池的原理和作用是什么”的內(nèi)容了,經(jīng)過本文的學(xué)習(xí)后,相信大家對(duì)Java線程池的原理和作用是什么這一問題有了更深刻的體會(huì),具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是創(chuàng)新互聯(lián),小編將為大家推送更多相關(guān)知識(shí)點(diǎn)的文章,歡迎關(guān)注!

文章標(biāo)題:Java線程池的原理和作用是什么
本文路徑:http://bm7419.com/article32/geiepc.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供微信公眾號(hào)、商城網(wǎng)站服務(wù)器托管、品牌網(wǎng)站制作、Google、網(wǎng)站策劃

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)

h5響應(yīng)式網(wǎng)站建設(shè)