Problem
這個問題我使用以下範例來說明:
private ExecutorService es = Executors.newFixedThreadPool(2); @Test public void transitionTaskOfCallback(){ CompletableFuture.supplyAsync(()->{ return new Response(); }, es) .thenAcceptAsync(response->{ System.out.println(response); }, es); }
這段程式透過supplyAsync將工作送給es去執行,當執行完畢後,會再透過thenAcceptAsync把後續處理交給es執行;或許你的程式執行起來,可以很順利地拿到你要的結果,但這裡做了白工。
由於這兩段都使用了async的方式,也都是把工作交由es去執行。假如es中有多個thread,這可能會造成額外的thread context switch;因為第一個工作與第二個工作並不一定會在同一個thread上執行。
How to resolve?
首先你必須要搞清楚,你到底在做什麼.. 假如你是很單純的在supplyAsync中做些計算,然後callback是要針對計算結果做處理,那幹嘛需要多此一舉把它宣告成async的寫法呢? 只要改成以下寫法,就可以讓你維持原本的Thread繼續執行:
@Test public void fixTransitionTaskOfCallback(){ CompletableFuture.supplyAsync(()->{ return new Response(); }, es) .thenAccept(response->{ System.out.println(response); }); }
但假如你的使用情況像之前文章中,會將Blocking工作送到另外一個Thread,接著透過compose串接,就要考慮使用Async方式去延續工作:
@Test public void testSendAsync(){ CompletableFuture<Response> sendAsync = CompletableFuture.supplyAsync(()->{ dumpCurrentThreadName("supplyAsync"); return launchTaskWithAuxThread(()->new BlockingJob().invoke()); }, es) .thenCompose((CompletableFuture<Response> responseReceived)->{ dumpCurrentThreadName("thenCompose"); return responseReceived; }) .thenApplyAsync((Response x)->{ dumpCurrentThreadName("thenApply"); return x; }, es); sendAsync.join(); }
上面的是我在Java8上實做的範例。如果以作者的範例來說,可以參考我在Java11做的範例程式碼。它使用了一個共享的變數responseReceived;當Blocking工作結束後,使用responseReceived.completeAsync去串接thenApply,這樣可以讓工作的執行回到共用的ExecutorService:
public static class BlockingJob2 { private CompletableFuture<Response> responseReceived; public BlockingJob2(CompletableFuture<Response> responseReceived) { this.responseReceived = responseReceived; } public void invoke() { try { dumpCurrentThreadName("before blocking job"); Thread.sleep(2*1000); // io blocking job and get response responseReceived.completeAsync(()->new Response(), es); } catch( Exception e ) { // log } finally { dumpCurrentThreadName("after blocking job"); } } } public static CompletableFuture<Response> send2(){ return CompletableFuture.supplyAsync(()->{ dumpCurrentThreadName("supplyAsync"); CompletableFuture<Response> responseReceived = new CompletableFuture<>(); launchTaskWithAuxThread(()->{ new BlockingJob2(responseReceived).invoke(); }); return responseReceived; }, es) .thenCompose((CompletableFuture<Response> responseReceived)->{ dumpCurrentThreadName("thenCompose"); return responseReceived; }) .thenApply((Response x)->{ dumpCurrentThreadName("whenComplete: " + x.getClass().getName()); return x; }); }
async寫法帶來的是thread的控制性;sync則是因為減少context switch而帶來的則是較好的效能。要選擇哪一種方式,一定要先確定好你要的是什麼。
留言
張貼留言