這篇文章主要講解了“Java線程池的原理和作用是什么”,文中的講解內(nèi)容簡(jiǎn)單清晰,易于學(xué)習(xí)與理解,下面請(qǐng)大家跟著小編的思路慢慢深入,一起來研究和學(xué)習(xí)“Java線程池的原理和作用是什么”吧!
十載的公安網(wǎng)站建設(shè)經(jīng)驗(yàn),針對(duì)設(shè)計(jì)、前端、開發(fā)、售后、文案、推廣等六對(duì)一服務(wù),響應(yīng)快,48小時(shí)及時(shí)工作處理。網(wǎng)絡(luò)營(yíng)銷推廣的優(yōu)勢(shì)是能夠根據(jù)用戶設(shè)備顯示端的尺寸不同,自動(dòng)調(diào)整公安建站的顯示方式,使網(wǎng)站能夠適用不同顯示終端,在瀏覽器中調(diào)整網(wǎng)站的寬度,無論在任何一種瀏覽器上瀏覽網(wǎng)站,都能展現(xiàn)優(yōu)雅布局與設(shè)計(jì),從而大程度地提升瀏覽體驗(yàn)。創(chuàng)新互聯(lián)建站從事“公安網(wǎng)站設(shè)計(jì)”,“公安網(wǎng)站推廣”以來,每個(gè)客戶項(xiàng)目都認(rèn)真落實(shí)執(zhí)行。
“首當(dāng)其沖的問題就是,如何復(fù)用線程。”
/** * Created by Anur IjuoKaruKas on 2019/7/16 */ public class ThreadPoolExecutor { private final BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(); private final Runnable runnable = () -> { try { while (true) { Runnable take = workQueue.poll(); if (take == null) { Thread.sleep(200); } else { take.run(); } } } catch (InterruptedException e) { e.printStackTrace(); } }; public ThreadPoolExecutor() { new Thread(runnable).start(); } public void execute(Runnable command) { workQueue.offer(command); } }
肥宅埋過了一眼,很快就發(fā)現(xiàn)其中玄妙之處:在小奈的 ThreadPoolExecutor
中,定制了一套 runnable
流程,負(fù)責(zé)不斷從 workQueue
這個(gè)隊(duì)列中拉取由 #execute
方法提交過來的任務(wù),并執(zhí)行其 run()
方法。這樣,無論提交過來多少個(gè)任務(wù),始終都是這個(gè)線程池內(nèi)置的線程在執(zhí)行任務(wù)。當(dāng)獲取不到任務(wù)的時(shí)候,線程池會(huì)自己進(jìn)入休眠狀態(tài)。
“雖然這達(dá)到了線程復(fù)用,但是你的這個(gè)線程完全沒辦法自動(dòng)創(chuàng)建和銷毀???甚至它的線程池?cái)?shù)量都是不可控制的?!狈收耠m然感嘆于對(duì)方可以這么快實(shí)現(xiàn)線程復(fù)用,但還是持續(xù)展開攻勢(shì)。
“既然要實(shí)現(xiàn)線程池可控,最直截了當(dāng)?shù)南敕ū闶菍⒎讲诺哪翘?runnable
流程封裝成一個(gè)對(duì)象,我們只需控制這個(gè)對(duì)象的創(chuàng)建、銷毀、以及復(fù)用即可。”作為一只長(zhǎng)期浸泡在 OOP
思維中的程序媛,這種問題難不倒小奈。她很快就寫出了一個(gè)內(nèi)部類,叫做 Worker
,其中 #runWorker(this);
就是剛才那個(gè) runnable
流程,負(fù)責(zé)不斷從隊(duì)列中獲取任務(wù),并調(diào)用它的 #run()
方法。
private final class Worker implements Runnable { final Thread thread; Runnable firstTask; Worker(Runnable firstTask) { this.firstTask = firstTask; this.thread = threadFactory.newThread(this); } @Override public void run() { runWorker(this); } }
小奈為后續(xù)將要完成的 worker
線程數(shù)量控制打下了基石:ThreadPoolExecutor
中增加了一個(gè)散列集,用于存放 worker
,增加了一個(gè) ThreadFactory
,供使用者定制化 worker
線程的創(chuàng)建。
其中比較核心的方法叫做 #addWorker()
,負(fù)責(zé)創(chuàng)建并初始化 worker
線程,并將其納入散列集中管理。當(dāng)然,這個(gè)線程池還無法自動(dòng)創(chuàng)建,不過已經(jīng)可以自動(dòng)銷毀了??梢钥吹剑诶〔坏饺蝿?wù)時(shí),#getTask()
則返回空,會(huì)跳出 #runWorker()
的 while
循環(huán),之后調(diào)用 #processWorkerExit();
,將 worker
線程從散列集中移除。
/** * Created by Anur IjuoKaruKas on 2019/7/16 */ public class ThreadPoolExecutor { private final HashSet<Worker> workers = new HashSet<>(); private volatile ThreadFactory threadFactory; private final BlockingQueue<Runnable> workQueue; public ThreadPoolExecutor(BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { this.threadFactory = threadFactory; this.workQueue = workQueue; } public void execute(Runnable command) { workQueue.offer(command); } /** * 新建一個(gè) worker 線程、啟動(dòng)并納入 workers */ private boolean addWorker(Runnable firstTask) { Worker w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { workers.add(w); t.start(); } return true; } /** * worker 線程池不斷從 workQueue 中拉取 task 進(jìn)行消費(fèi) */ private void runWorker(Worker w) { Runnable task = w.firstTask; w.firstTask = null; while (task != null || (task = getTask()) != null) { task.run(); } processWorkerExit(w); } /** * 當(dāng)線程執(zhí)行完畢之前,將其從 workers 中移除 */ private void processWorkerExit(Worker w) { workers.remove(w); } private Runnable getTask() { return workQueue.poll(); } }
看到這里,肥宅埋已經(jīng)能預(yù)測(cè)到接下來的思路了。
線程池需要加入一個(gè)變量 maximumPoolSize
,以防無限創(chuàng)建線程,每次進(jìn)行 #addWorker()
時(shí),需要判斷一下是否可以繼續(xù)添加 worker
,如果可以,則添加新的 worker
,否則將任務(wù)丟入隊(duì)列:
#addWorker()
中加入拒絕的邏輯,確保不能無限創(chuàng)建 worker
。
再修改一下 #execute()
方法,優(yōu)先創(chuàng)建 worker
,如果創(chuàng)建 worker
失?。?workers.size() >= maximumPoolSize
),則直接將任務(wù)丟入隊(duì)列。
public void execute(Runnable command) { if (addWorker(command)) { return; } workQueue.offer(command); } /** * 新建一個(gè) worker 線程、啟動(dòng)并納入 workers */ private boolean addWorker(Runnable firstTask) { int ws = workers.size(); if (ws >= maximumPoolSize) { return false; } Worker w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { workers.add(w); t.start(); } return true; }
已經(jīng)寫到這里小奈可謂是趾高氣揚(yáng),仿佛實(shí)現(xiàn)一個(gè)線程池已經(jīng)不在話下。
“這樣貌似有點(diǎn)問題???雖然說你已經(jīng)實(shí)現(xiàn)了線程的動(dòng)態(tài)創(chuàng)建與銷毀,但在任務(wù)沒有那么緊湊的情況下,基本是每個(gè)任務(wù)都進(jìn)來都需要?jiǎng)?chuàng)建一次線程,再銷毀一次線程,說好的復(fù)用到哪里去了?”肥宅埋給了膨脹的小奈當(dāng)頭一棒。
“咳咳......嘛,銷毀的時(shí)候做一下判斷就可以了,我們加入一個(gè)新的變量,叫做 keepAliveTime
,當(dāng)拿不到任務(wù)的時(shí)候,就進(jìn)行阻塞休眠,比如 20ms
,每次對(duì) keepAliveTime
減 20ms
,直到小于等于 0
,再銷毀線程?!毙∧畏磻?yīng)迅速,很快給出了答案,并準(zhǔn)備動(dòng)手對(duì)線程池進(jìn)行改動(dòng)。
肥宅埋嘆了一口氣,“我看你是被膨脹蒙蔽了雙眼,既然我們已經(jīng)使用了阻塞隊(duì)列,那么就可以充分利用阻塞隊(duì)列的特性!阻塞隊(duì)列中內(nèi)置了一個(gè)顯式鎖,利用鎖的 condition
對(duì)象,使用它的 #awaitNanos()
與 #notify()
方法,就可以直接精準(zhǔn)地實(shí)現(xiàn)線程調(diào)度了。”畢竟肥宅埋也是一只學(xué)霸,聽到小奈的想法后提出了更具有建設(shè)性的設(shè)計(jì)。
小奈也很快反應(yīng)過來,阻塞隊(duì)列有一個(gè) #poll()
方法,底層是借助 condition
對(duì)象封裝的 LockSupport.parkNanos(this, nanosTimeout);
來實(shí)現(xiàn)的,會(huì)阻塞直到有新的元素加入,當(dāng)有新的元素加入,這個(gè) condition
就會(huì)被喚醒,來實(shí)現(xiàn) 當(dāng)調(diào)用阻塞隊(duì)列的 #poll()
時(shí),如果阻塞隊(duì)列為空,會(huì)進(jìn)行一段時(shí)間的休眠,直到被喚醒,或者休眠超時(shí)。
肥宅埋一手接管了改造線程池的大權(quán),馬上大刀闊斧地改了起來。
改動(dòng)十分簡(jiǎn)單,原先的 #getTask()
是直接調(diào)用阻塞隊(duì)列的 #take()
方法,如果隊(duì)列為空,則直接返回,只要將其改為 #poll
方法即可。
/** * 當(dāng) runWorker 一定時(shí)間內(nèi)獲取不到任務(wù)時(shí),就會(huì) processWorkerExit 銷毀 */ private Runnable getTask() { boolean timedOut = false; while (true) { try { if (timedOut) { return null; } Runnable r = workQueue.poll(keepAliveTime, unit); if (r != null) { return r; } else { timedOut = true; } } catch (InterruptedException e) { timedOut = false; } } }
“一般來說,我們的任務(wù)提交都不會(huì)太過于均勻,如果我們平常不需要那么多線程來消費(fèi),但又想避免任務(wù)一直被堆積導(dǎo)致某些任務(wù)遲遲不被消費(fèi),就需要引入**核心線程 corePoolSize
** 與 **最大線程 maximumPoolSize
** 的概念。”肥宅埋想到了一個(gè)簡(jiǎn)單的可以優(yōu)化的點(diǎn),頭頭是道地分析道:“我們可以不用做那么復(fù)雜的動(dòng)態(tài) worker
消費(fèi)池,最簡(jiǎn)單的,如果我們的阻塞隊(duì)列滿了,就繼續(xù)創(chuàng)建更多的線程池,這樣,堆積的任務(wù)能比以往更快速的降下來。”
說起來好像復(fù)雜,實(shí)際上代碼十分簡(jiǎn)單。小奈看見肥宅埋修改了 #addWorker()
方法,增加了一個(gè)參數(shù) core
,其作用只有一個(gè),如果是核心線程,則創(chuàng)建時(shí),數(shù)量必須小于等于 corePoolSize
,否則數(shù)量必須小于等于 maximumPoolSize
。
另外, #execute()
方法的改動(dòng)也十分簡(jiǎn)單,前面的改動(dòng)不大,主要是,當(dāng)任務(wù) #offer()
失敗后,創(chuàng)建非核心 worker
線程。
/** * 優(yōu)先創(chuàng)建核心線程,核心線程滿了以后,則優(yōu)先將任務(wù)放入隊(duì)列 * * 隊(duì)列滿了以后,則啟用非核心線程池,以防任務(wù)堆積 */ public void execute(Runnable command) { if (getPoolSize() < corePoolSize) { if (addWorker(command, true)) { return; } } if (!workQueue.offer(command)) { addWorker(command, false); } } /** * 新建一個(gè) worker 線程、啟動(dòng)并納入 workers */ private boolean addWorker(Runnable firstTask, boolean core) { int ws = workers.size(); if (ws >= (core ? corePoolSize : maximumPoolSize)) { return false; } Worker w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { workers.add(w); t.start(); } return true; }
“現(xiàn)在這個(gè)版本的線程池看起來真是有模有樣呢 ~ 可以動(dòng)態(tài)創(chuàng)建與銷毀線程,線程也能復(fù)用,還可以動(dòng)態(tài)增加更多的線程來消費(fèi)堆積的線程!” 肥宅埋滿意地看著兩人的杰作,“其實(shí)我還發(fā)現(xiàn)有個(gè)地方不太友好,在推送任務(wù)時(shí),調(diào)用方可能并不知道自己的任務(wù)是否失敗?!?/p>
“這個(gè)簡(jiǎn)單鴨,只需要在調(diào)用 #execute()
時(shí)返回 flase
來代表添加失敗,或者拋出對(duì)應(yīng)的異常即可。”小奈給出了很直觀的設(shè)計(jì)。
“這確實(shí)不失為一個(gè)好方法,但是對(duì)于調(diào)用方來說,如果所有使用線程池的地方都需要去做這個(gè)判斷,那豈不是太麻煩了!”肥宅埋對(duì)方案進(jìn)行了補(bǔ)充:“這個(gè)是面向切面編程的一種思想,我們可以提供一個(gè)如何處理這些隊(duì)列已經(jīng)放不下,且無法創(chuàng)建更多消費(fèi)線程的切面入口,就叫它 AbortPolicy
吧!”
肥宅埋修改了一下 #execute()
方法,如果在創(chuàng)建非核心線程池的時(shí)候失敗,就直接將任務(wù)拒絕掉。
/** * 優(yōu)先創(chuàng)建核心線程,核心線程滿了以后,則優(yōu)先將任務(wù)放入隊(duì)列 * * 隊(duì)列滿了以后,則啟用非核心線程池,以防任務(wù)堆積 * * 如果非核心線程池創(chuàng)建失敗,則拒絕這個(gè)任務(wù) */ public void execute(Runnable command) { if (getPoolSize() < corePoolSize) { if (addWorker(command, true)) { return; } } if (!workQueue.offer(command)) { if (!addWorker(command, false)) { reject(command); } } }
如何去拒絕任務(wù),交給調(diào)用者去實(shí)現(xiàn),#reject()
的實(shí)現(xiàn)非常簡(jiǎn)單,就是調(diào)用一下 BiConsumer
,這個(gè)可以供調(diào)用方自由定制。
private void reject(Runnable command) { abortPolicy.accept(command, this); }
小奈與肥宅埋已經(jīng)完成了她們的線程池,現(xiàn)在需要測(cè)試一下線程池是否可以正常使用,比較細(xì)心的肥宅埋寫了測(cè)試用例如下:
核心線程數(shù)為5,最大線程數(shù)為10,緊接著每個(gè)線程在拉取不到任務(wù)時(shí)會(huì)存活一分鐘,有一個(gè)長(zhǎng)度為 5 的并發(fā)阻塞隊(duì)列,采用默認(rèn)的 ThreadFactory
,最后,使用了 DiscardPolicy
,當(dāng)任務(wù)被拒絕后,直接丟棄任務(wù),并打印日志。
她們運(yùn)行了代碼,日志打印如下。完全符合預(yù)期,在阻塞隊(duì)列還未裝滿之前,只有 5 個(gè)核心線程在消費(fèi)任務(wù),當(dāng)阻塞隊(duì)列滿了以后,會(huì)逐步創(chuàng)建更多的線程,而當(dāng)無法創(chuàng)建更多線程后,則觸發(fā)丟棄策略。
感謝各位的閱讀,以上就是“Java線程池的原理和作用是什么”的內(nèi)容了,經(jīng)過本文的學(xué)習(xí)后,相信大家對(duì)Java線程池的原理和作用是什么這一問題有了更深刻的體會(huì),具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是創(chuàng)新互聯(lián),小編將為大家推送更多相關(guān)知識(shí)點(diǎn)的文章,歡迎關(guān)注!
文章標(biāo)題:Java線程池的原理和作用是什么
本文路徑:http://bm7419.com/article32/geiepc.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供微信公眾號(hào)、商城網(wǎng)站、服務(wù)器托管、品牌網(wǎng)站制作、Google、網(wǎng)站策劃
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)