forkjoin框架怎么在java中使用?相信很多沒有經驗的人對此束手無策,為此本文總結了問題出現的原因和解決方法,通過這篇文章希望你能解決這個問題。
冷水灘網站建設公司創(chuàng)新互聯,冷水灘網站設計制作,有大型網站制作公司豐富經驗。已為冷水灘數千家提供企業(yè)網站建設服務。企業(yè)網站搭建\外貿營銷網站建設要多少錢,請找那個售后服務好的冷水灘做網站的公司定做!fork join框架是java 7中引入框架,這個框架的引入主要是為了提升并行計算的能力。
fork join主要有兩個步驟,第一就是fork,將一個大任務分成很多個小任務,第二就是join,將第一個任務的結果join起來,生成最后的結果。如果第一步中并沒有任何返回值,join將會等到所有的小任務都結束。
還記得之前的文章我們講到了thread pool的基本結構嗎?
ExecutorService - ForkJoinPool 用來調用任務執(zhí)行。
workerThread - ForkJoinWorkerThread 工作線程,用來執(zhí)行具體的任務。
task - ForkJoinTask 用來定義要執(zhí)行的任務。
下面我們從這三個方面來詳細講解fork join框架。
ForkJoinPool
ForkJoinPool是一個ExecutorService的一個實現,它提供了對工作線程和線程池的一些便利管理方法。
public class ForkJoinPool extends AbstractExecutorService
一個work thread一次只能處理一個任務,但是ForkJoinPool并不會為每個任務都創(chuàng)建一個單獨的線程,它會使用一個特殊的數據結構double-ended queue來存儲任務。這樣的結構可以方便的進行工作竊取(work-stealing)。
什么是work-stealing呢?
默認情況下,work thread從分配給自己的那個隊列頭中取出任務。如果這個隊列是空的,那么這個work thread會從其他的任務隊列尾部取出任務來執(zhí)行,或者從全局隊列中取出。這樣的設計可以充分利用work thread的性能,提升并發(fā)能力。
下面看下怎么創(chuàng)建一個ForkJoinPool。
最常見的方法就是使用ForkJoinPool.commonPool()來創(chuàng)建,commonPool()為所有的ForkJoinTask提供了一個公共默認的線程池。
ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
另外一種方式是使用構造函數:
ForkJoinPool forkJoinPool = new ForkJoinPool(2);
這里的參數是并行級別,2指的是線程池將會使用2個處理器核心。
ForkJoinWorkerThread
ForkJoinWorkerThread是使用在ForkJoinPool的工作線程。
public class ForkJoinWorkerThread extends Thread }
和一般的線程不一樣的是它定義了兩個變量:
final ForkJoinPool pool; // the pool this thread works in final ForkJoinPool.WorkQueue workQueue; // work-stealing mechanics
一個是該worker thread所屬的ForkJoinPool。 另外一個是支持 work-stealing機制的Queue。
再看一下它的run方法:
public void run() { if (workQueue.array == null) { // only run once Throwable exception = null; try { onStart(); pool.runWorker(workQueue); } catch (Throwable ex) { exception = ex; } finally { try { onTermination(exception); } catch (Throwable ex) { if (exception == null) exception = ex; } finally { pool.deregisterWorker(this, exception); } } } }
簡單點講就是從Queue中取出任務執(zhí)行。
ForkJoinTask
ForkJoinTask是ForkJoinPool中運行的任務類型。通常我們會用到它的兩個子類:RecursiveAction
和RecursiveTask<V>。
他們都定義了一個需要實現的compute()方法用來實現具體的業(yè)務邏輯。不同的是RecursiveAction只是用來執(zhí)行任務,而RecursiveTask<V>可以有返回值。
既然兩個類都帶了Recursive,那么具體的實現邏輯也會跟遞歸有關,我們舉個使用RecursiveAction來打印字符串的例子:
public class CustomRecursiveAction extends RecursiveAction { private String workload = ""; private static final int THRESHOLD = 4; private static Logger logger = Logger.getAnonymousLogger(); public CustomRecursiveAction(String workload) { this.workload = workload; } @Override protected void compute() { if (workload.length() > THRESHOLD) { ForkJoinTask.invokeAll(createSubtasks()); } else { processing(workload); } } private List<CustomRecursiveAction> createSubtasks() { List<CustomRecursiveAction> subtasks = new ArrayList<>(); String partOne = workload.substring(0, workload.length() / 2); String partTwo = workload.substring(workload.length() / 2, workload.length()); subtasks.add(new CustomRecursiveAction(partOne)); subtasks.add(new CustomRecursiveAction(partTwo)); return subtasks; } private void processing(String work) { String result = work.toUpperCase(); logger.info("This result - (" + result + ") - was processed by " + Thread.currentThread().getName()); } }
上面的例子使用了二分法來打印字符串。
我們再看一個RecursiveTask<V>的例子:
public class CustomRecursiveTask extends RecursiveTask<Integer> { private int[] arr; private static final int THRESHOLD = 20; public CustomRecursiveTask(int[] arr) { this.arr = arr; } @Override protected Integer compute() { if (arr.length > THRESHOLD) { return ForkJoinTask.invokeAll(createSubtasks()) .stream() .mapToInt(ForkJoinTask::join) .sum(); } else { return processing(arr); } } private Collection<CustomRecursiveTask> createSubtasks() { List<CustomRecursiveTask> dividedTasks = new ArrayList<>(); dividedTasks.add(new CustomRecursiveTask( Arrays.copyOfRange(arr, 0, arr.length / 2))); dividedTasks.add(new CustomRecursiveTask( Arrays.copyOfRange(arr, arr.length / 2, arr.length))); return dividedTasks; } private Integer processing(int[] arr) { return Arrays.stream(arr) .filter(a -> a > 10 && a < 27) .map(a -> a * 10) .sum(); } }
和上面的例子很像,不過這里我們需要有返回值。
在ForkJoinPool中提交Task
有了上面的兩個任務,我們就可以在ForkJoinPool中提交了:
int[] intArray= {12,12,13,14,15}; CustomRecursiveTask customRecursiveTask= new CustomRecursiveTask(intArray); int result = forkJoinPool.invoke(customRecursiveTask); System.out.println(result);
上面的例子中,我們使用invoke來提交,invoke將會等待任務的執(zhí)行結果。
如果不使用invoke,我們也可以將其替換成fork()和join():
customRecursiveTask.fork(); int result2= customRecursiveTask.join(); System.out.println(result2);
fork() 是將任務提交給pool,但是并不觸發(fā)執(zhí)行, join()將會真正的執(zhí)行并且得到返回結果。
看完上述內容,你們掌握forkjoin框架怎么在java中使用的方法了嗎?如果還想學到更多技能或想了解更多相關內容,歡迎關注創(chuàng)新互聯網站建設公司行業(yè)資訊頻道,感謝各位的閱讀!
另外有需要云服務器可以了解下創(chuàng)新互聯建站bm7419.com,海內外云服務器15元起步,三天無理由+7*72小時售后在線,公司持有idc許可證,提供“云服務器、裸金屬服務器、高防服務器、香港服務器、美國服務器、虛擬主機、免備案服務器”等云主機租用服務以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡單易用、服務可用性高、性價比高”等特點與優(yōu)勢,專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應用場景需求。
網站欄目:forkjoin框架怎么在java中使用-創(chuàng)新互聯
當前鏈接:http://bm7419.com/article18/dssegp.html
成都網站建設公司_創(chuàng)新互聯,為您提供App開發(fā)、關鍵詞優(yōu)化、營銷型網站建設、App設計、小程序開發(fā)、移動網站建設
聲明:本網站發(fā)布的內容(圖片、視頻和文字)以用戶投稿、用戶轉載內容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網站立場,如需處理請聯系客服。電話:028-86922220;郵箱:631063699@qq.com。內容未經允許不得轉載,或轉載時需注明來源: 創(chuàng)新互聯