差異處

這裏顯示兩個版本的差異處。

連向這個比對檢視

java:java:java8:concurrent:blockingoperationwithcompletablefuture:blocking_inside_cf [2019/01/13 21:02]
tony
java:java:java8:concurrent:blockingoperationwithcompletablefuture:blocking_inside_cf [2023/06/25 09:48]
行 1: 行 1:
-{{tag>​java concurrency}} 
-====== CompletableFuture - Avoid blocking inside CompletableFuture chains. It costs. ====== 
-===== Problem ===== 
-如下圖所示,使用ExecutorService最害怕的就是遇到Blocking的操作,這將會降低整個ExecutorService的生產力。\\ 
-{{:​java:​java:​java8:​concurrent:​blockingoperationwithcompletablefuture:​java8_completablefuture_perf_block_es.png?​450|}}\\ 
-(本篇圖片都來自於[[https://​qconsf.com/​sf2017/​system/​files/​presentation-slides/​cf.pdf|link]]) 
-===== How to resolve? ===== 
-面對這問題,作者建議針對Blocking操作可以在發送後,透過Aux Thread執行;中間透過completableFuture的compose串接;Aux Thread執行完畢後,會將結果丟給串接的completableFuture,並讓工作繼續回到原本的ExecutorService執行。如下圖所示:​\\ 
-{{:​java:​java:​java8:​concurrent:​blockingoperationwithcompletablefuture:​java8_completablefuture_perf_block_es_solution.png?​600|}}\\ 
-\\ 
-我嘗試使用Java8實作,範例程式碼如下:​ 
-<code java> 
-private static int cpu_num = Runtime.getRuntime().availableProcessors();​ 
-private static ExecutorService es = Executors.newFixedThreadPool(cpu_num, ​ 
- CustomThreadFactory.create("​cpu_bound_thread_"​));​ 
-  
-public static class BlockingJob { 
- public Response invoke() { 
- try { 
- dumpCurrentThreadName("​before blocking job"); 
- Thread.sleep(2*1000);​ 
- } catch( Exception e ) { 
- e.printStackTrace();​ 
- } finally { 
- dumpCurrentThreadName("​after blocking job"); 
- } 
- return new Response(); 
- } 
-} 
  
-private static <T> CompletableFuture<​T>​ launchTaskWithAuxThread(Supplier<​T>​ supplier){ 
- ExecutorService auxEs =  
- Executors.newSingleThreadExecutor(CustomThreadFactory.create("​aux_blocking_thread_"​));​ 
- CompletableFuture<​T>​ task = CompletableFuture 
- .supplyAsync(supplier,​ auxEs); 
- auxEs.shutdown();​ 
- return task; 
-} 
- 
-@Test 
-public void testSendAsync(){ 
- CompletableFuture<​Response>​ sendAsync = CompletableFuture.supplyAsync(()->​{ 
- dumpCurrentThreadName("​supplyAsync"​);​ 
- return launchTaskWithAuxThread(()->​new BlockingJob().invoke());​ 
- }, es) 
- .thenCompose((CompletableFuture<​Response>​ responseReceived)->​{ 
- dumpCurrentThreadName("​thenCompose"​);​ 
- return responseReceived;​ 
- }) 
- .thenApplyAsync((Response x)->{ 
- dumpCurrentThreadName("​thenApply"​);​ 
- return x; 
- }, es); 
-  
- sendAsync.join();​ 
-} 
-</​code>​ 
-實作特別需要注意的兩個部分是, 
-  * Aux Thread儘量別使用與原本相同的ExecutorService。假如你使用的是固定大小的ExecutorService,可能會造成Deadlock;假如你是使用CachedExecutorService,就必須要注意Thread數量是否會造成OOM。 
-  * thenCompose之後的動作必須使用Async,否則會是用Aux Thread去執行後續操作,無法將CPU bound工作集中。(這部分可能會根據實作會有差異) 
-輸出如下:​ 
-<code bash> 
-cpu_bound_thread_1:​ supplyAsync 
-cpu_bound_thread_1:​ thenCompose 
-aux_blocking_thread_1:​ before blocking job 
-aux_blocking_thread_1:​ after blocking job 
-cpu_bound_thread_2:​ thenApply 
-</​code>​ 
-從結果可以發現,除了blocking的操作外,其餘的操作都是跑在我特定的ExecutorService中。我的想法是, 
-  * 如果此ExecutorService專門用於CPU Bound工作,這做法可分離出Blocking操作避免影響到ThreadPool的生產力。 
-  * 範例中針對Blocking操作是建立單一Thread的ExecutorService,但如果會有IO最大限制的情況下,這部分需要控管數量。 
-  * 這方法雖然有額外的thread context switch,但處於Blocking狀態的Thread並不會與CPU搶資源,因此工作忙碌時所帶來的效能優勢應可掩蓋此缺點。 
- 
-Note. 我的範例程式只是實現的方法之一,應要根據需求做調整。 例如原始投影片中,是透過responseReceived.completeAsync讓工作回到compose之後繼續往下處理。 
-===== Reference ===== 
-  * [[https://​qconsf.com/​sf2017/​system/​files/​presentation-slides/​cf.pdf|Asynchronous API with 
-CompletableFuture - Performance Tips and Tricks]] 
- 
-====== ​ ====== 
----- 
-\\ 
-~~DISQUS~~