怎么創(chuàng)建Java線(xiàn)程池

這篇文章主要介紹怎么創(chuàng)建Java線(xiàn)程池,文中介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們一定要看完!

站在用戶(hù)的角度思考問(wèn)題,與客戶(hù)深入溝通,找到天全網(wǎng)站設(shè)計(jì)與天全網(wǎng)站推廣的解決方案,憑借多年的經(jīng)驗(yàn),讓設(shè)計(jì)與互聯(lián)網(wǎng)技術(shù)結(jié)合,創(chuàng)造個(gè)性化、用戶(hù)體驗(yàn)好的作品,建站類(lèi)型包括:網(wǎng)站設(shè)計(jì)、成都網(wǎng)站制作、企業(yè)官網(wǎng)、英文網(wǎng)站、手機(jī)端網(wǎng)站、網(wǎng)站推廣、域名與空間、虛擬主機(jī)、企業(yè)郵箱。業(yè)務(wù)覆蓋天全地區(qū)。

最近在改進(jìn)項(xiàng)目的并發(fā)功能,但開(kāi)發(fā)起來(lái)磕磕碰碰的。看了好多資料,總算加深了認(rèn)識(shí)。于是打算配合查看源代碼,總結(jié)并發(fā)編程的原理。

準(zhǔn)備從用得最多的線(xiàn)程池開(kāi)始,圍繞創(chuàng)建、執(zhí)行、關(guān)閉認(rèn)識(shí)線(xiàn)程池整個(gè)生命周期的實(shí)現(xiàn)原理。后續(xù)再研究原子變量、并發(fā)容器、阻塞隊(duì)列、同步工具、鎖等等主題。java.util.concurrent里的并發(fā)工具用起來(lái)不難,但不能僅僅會(huì)用,我們要read the fucking source code,哈哈。順便說(shuō)聲,我用的JDK是1.8。

Executor框架

Executor是一套線(xiàn)程池管理框架,接口里只有一個(gè)方法execute,執(zhí)行Runnable任務(wù)。ExecutorService接口擴(kuò)展了Executor,添加了線(xiàn)程生命周期的管理,提供任務(wù)終止、返回任務(wù)結(jié)果等方法。AbstractExecutorService實(shí)現(xiàn)了ExecutorService,提供例如submit方法的默認(rèn)實(shí)現(xiàn)邏輯。

然后到今天的主題ThreadPoolExecutor,繼承了AbstractExecutorService,提供線(xiàn)程池的具體實(shí)現(xiàn)。

構(gòu)造方法

下面是ThreadPoolExecutor最普通的構(gòu)造函數(shù),最多有七個(gè)參數(shù)。具體代碼不貼了,只是一些參數(shù)校驗(yàn)和設(shè)置的語(yǔ)句。

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
}

corePoolSize是線(xiàn)程池的目標(biāo)大小,即是線(xiàn)程池剛剛創(chuàng)建起來(lái),還沒(méi)有任務(wù)要執(zhí)行時(shí)的大小。maximumPoolSize是線(xiàn)程池的最大上限。keepAliveTime是線(xiàn)程的存活時(shí)間,當(dāng)線(xiàn)程池內(nèi)的線(xiàn)程數(shù)量大于corePoolSize,超出存活時(shí)間的空閑線(xiàn)程就會(huì)被回收。unit就不用說(shuō)了,剩下的三個(gè)參數(shù)看后文的分析。

預(yù)設(shè)的定制線(xiàn)程池

ThreadPoolExecutor預(yù)設(shè)了一些已經(jīng)定制好的線(xiàn)程池,由Executors里的工廠方法創(chuàng)建。下面分析newSingleThreadExecutor、newFixedThreadPool、newCachedThreadPool的創(chuàng)建參數(shù)。

newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

newFixedThreadPool的corePoolSize和maximumPoolSize都設(shè)置為傳入的固定數(shù)量,keepAliveTim設(shè)置為0。線(xiàn)程池創(chuàng)建后,線(xiàn)程數(shù)量將會(huì)固定不變,適合需要線(xiàn)程很穩(wěn)定的場(chǎng)合。

newSingleThreadExecutor

public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

newSingleThreadExecutor是線(xiàn)程數(shù)量固定為1的newFixedThreadPool版本,保證池內(nèi)的任務(wù)串行。注意到返回的是FinalizableDelegatedExecutorService,來(lái)看看源碼:

static class FinalizableDelegatedExecutorService
extends DelegatedExecutorService {
FinalizableDelegatedExecutorService(ExecutorService executor) {
super(executor);
}
protected void finalize() {
super.shutdown();
}
}

FinalizableDelegatedExecutorService繼承了DelegatedExecutorService,僅僅在gc時(shí)增加關(guān)閉線(xiàn)程池的操作,再來(lái)看看DelegatedExecutorService的源碼:

static class DelegatedExecutorService extends AbstractExecutorService {
private final ExecutorService e;
DelegatedExecutorService(ExecutorService executor) { e = executor; }
public void execute(Runnable command) { e.execute(command); }
public void shutdown() { e.shutdown(); }
public List<Runnable> shutdownNow() { return e.shutdownNow(); }
public boolean isShutdown() { return e.isShutdown(); }
public boolean isTerminated() { return e.isTerminated(); }
//...
}

代碼很簡(jiǎn)單,DelegatedExecutorService包裝了ExecutorService,使其只暴露出ExecutorService的方法,因此不能再配置線(xiàn)程池的參數(shù)。本來(lái),線(xiàn)程池創(chuàng)建的參數(shù)是可以調(diào)整的,ThreadPoolExecutor提供了set方法。使用newSingleThreadExecutor目的是生成單線(xiàn)程串行的線(xiàn)程池,如果還能配置線(xiàn)程池大小,那就沒(méi)意思了。

Executors還提供了unconfigurableExecutorService方法,將普通線(xiàn)程池包裝成不可配置的線(xiàn)程池。如果不想線(xiàn)程池被不明所以的后人修改,可以調(diào)用這個(gè)方法。

newCachedThreadPool

public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

newCachedThreadPool生成一個(gè)會(huì)緩存的線(xiàn)程池,線(xiàn)程數(shù)量可以從0到Integer.MAX_VALUE,超時(shí)時(shí)間為1分鐘。線(xiàn)程池用起來(lái)的效果是:如果有空閑線(xiàn)程,會(huì)復(fù)用線(xiàn)程;如果沒(méi)有空閑線(xiàn)程,會(huì)新建線(xiàn)程;如果線(xiàn)程空閑超過(guò)1分鐘,將會(huì)被回收。

newScheduledThreadPool

newScheduledThreadPool將會(huì)創(chuàng)建一個(gè)可定時(shí)執(zhí)行任務(wù)的線(xiàn)程池。這個(gè)不打算在本文展開(kāi),后續(xù)會(huì)另開(kāi)文章細(xì)講。

等待隊(duì)列

newCachedThreadPool的線(xiàn)程上限幾乎等同于無(wú)限,但系統(tǒng)資源是有限的,任務(wù)的處理速度總有可能比不上任務(wù)的提交速度。因此,可以為T(mén)hreadPoolExecutor提供一個(gè)阻塞隊(duì)列來(lái)保存因線(xiàn)程不足而等待的Runnable任務(wù),這就是BlockingQueue。

JDK為BlockingQueue提供了幾種實(shí)現(xiàn)方式,常用的有:

  • ArrayBlockingQueue:數(shù)組結(jié)構(gòu)的阻塞隊(duì)列

  • LinkedBlockingQueue:鏈表結(jié)構(gòu)的阻塞隊(duì)列

  • PriorityBlockingQueue:有優(yōu)先級(jí)的阻塞隊(duì)列

  • SynchronousQueue:不會(huì)存儲(chǔ)元素的阻塞隊(duì)列

newFixedThreadPool和newSingleThreadExecutor在默認(rèn)情況下使用一個(gè)無(wú)界的LinkedBlockingQueue。要注意的是,如果任務(wù)一直提交,但線(xiàn)程池又不能及時(shí)處理,等待隊(duì)列將會(huì)無(wú)限制地加長(zhǎng),系統(tǒng)資源總會(huì)有消耗殆盡的一刻。所以,推薦使用有界的等待隊(duì)列,避免資源耗盡。但解決一個(gè)問(wèn)題,又會(huì)帶來(lái)新問(wèn)題:隊(duì)列填滿(mǎn)之后,再來(lái)新任務(wù),這個(gè)時(shí)候怎么辦?后文會(huì)介紹如何處理隊(duì)列飽和。

newCachedThreadPool使用的SynchronousQueue十分有趣,看名稱(chēng)是個(gè)隊(duì)列,但它卻不能存儲(chǔ)元素。要將一個(gè)任務(wù)放進(jìn)隊(duì)列,必須有另一個(gè)線(xiàn)程去接收這個(gè)任務(wù),一個(gè)進(jìn)就有一個(gè)出,隊(duì)列不會(huì)存儲(chǔ)任何東西。因此,SynchronousQueue是一種移交機(jī)制,不能算是隊(duì)列。newCachedThreadPool生成的是一個(gè)沒(méi)有上限的線(xiàn)程池,理論上提交多少任務(wù)都可以,使用SynchronousQueue作為等待隊(duì)列正合適。

飽和策略

當(dāng)有界的等待隊(duì)列滿(mǎn)了之后,就需要用到飽和策略去處理,ThreadPoolExecutor的飽和策略通過(guò)傳入RejectedExecutionHandler來(lái)實(shí)現(xiàn)。如果沒(méi)有為構(gòu)造函數(shù)傳入,將會(huì)使用默認(rèn)的defaultHandler。

private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString());
}
}

AbortPolicy是默認(rèn)的實(shí)現(xiàn),直接拋出一個(gè)RejectedExecutionException異常,讓調(diào)用者自己處理。除此之外,還有幾種飽和策略,來(lái)看一下:

public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}

DiscardPolicy的rejectedExecution直接是空方法,什么也不干。如果隊(duì)列滿(mǎn)了,后續(xù)的任務(wù)都拋棄掉。

public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}

DiscardOldestPolicy會(huì)將等待隊(duì)列里最舊的任務(wù)踢走,讓新任務(wù)得以執(zhí)行。

public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}

最后一種飽和策略是CallerRunsPolicy,它既不拋棄新任務(wù),也不拋棄舊任務(wù),而是直接在當(dāng)前線(xiàn)程運(yùn)行這個(gè)任務(wù)。當(dāng)前線(xiàn)程一般就是主線(xiàn)程啊,讓主線(xiàn)程運(yùn)行任務(wù),說(shuō)不定就阻塞了。如果不是想清楚了整套方案,還是少用這種策略為妙。

ThreadFactory

每當(dāng)線(xiàn)程池需要?jiǎng)?chuàng)建一個(gè)新線(xiàn)程,都是通過(guò)線(xiàn)程工廠獲取。如果不為T(mén)hreadPoolExecutor設(shè)定一個(gè)線(xiàn)程工廠,就會(huì)使用默認(rèn)的defaultThreadFactory:

public static ThreadFactory defaultThreadFactory() {
return new DefaultThreadFactory();
}
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}

平時(shí)打印線(xiàn)程池里線(xiàn)程的name時(shí),會(huì)輸出形如pool-1-thread-1之類(lèi)的名稱(chēng),就是在這里設(shè)置的。這個(gè)默認(rèn)的線(xiàn)程工廠,創(chuàng)建的線(xiàn)程是普通的非守護(hù)線(xiàn)程,如果需要定制,實(shí)現(xiàn)ThreadFactory后傳給ThreadPoolExecutor即可。

不看代碼不總結(jié)不會(huì)知道,光是線(xiàn)程池的創(chuàng)建就可以引出很多學(xué)問(wèn)。別看平時(shí)創(chuàng)建線(xiàn)程池是一句代碼的事,其實(shí)ThreadPoolExecutor提供了很靈活的定制方法。

以上是“怎么創(chuàng)建Java線(xiàn)程池”這篇文章的所有內(nèi)容,感謝各位的閱讀!希望分享的內(nèi)容對(duì)大家有幫助,更多相關(guān)知識(shí),歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道!

文章名稱(chēng):怎么創(chuàng)建Java線(xiàn)程池
鏈接地址:http://bm7419.com/article46/gipjeg.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站排名、企業(yè)建站、網(wǎng)站建設(shè)、網(wǎng)站導(dǎo)航、做網(wǎng)站

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶(hù)投稿、用戶(hù)轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話(huà):028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

商城網(wǎng)站建設(shè)