Problem
下方這隻程式在Java8中應是相當常見:
CompletableFuture.supplyAsync(()->{ dumpCurrentThreadName("supplyAsync"); return new Response(); }).whenComplete((response, ex)->{ dumpCurrentThreadName("whenComplete"); // process response });
它的問題是如果supplyAsync中的工作,在執行whenComplete前就結束了;當執行whenComplete時,有可能會使用client thread去執行工作而影響到工作接收的速度。以下是我的執行結果:
ForkJoinPool.commonPool-worker-3: supplyAsync main: whenComplete
假如你的callback chain很長,遇到這個情況的機率就會越高。
How to resolve?
面對這個問題,在Java9中,允許你將實際的執行工作放到callback chain之後:
CompletableFuture<Response> asyncJob = new CompletableFuture<>(); asyncJob.whenComplete((response, ex)->{ dumpCurrentThreadName("whenComplete"); }); asyncJob.completeAsync(()->{ dumpCurrentThreadName("supplyAsync"); return new Response(); });
輸出結果如下:
ForkJoinPool.commonPool-worker-3: supplyAsync ForkJoinPool.commonPool-worker-3: whenComplete
這樣可以避免client thread跑去執行原本預期要async的工作,而增加了client thread的回應速度。
至於Java8目前我使用下面方式來達到lazy launch:
CompletableFuture<Response> asyncJob = new CompletableFuture<>(); job.thenApply((ret)->{ dumpCurrentThreadName("supplyAsync"); return new Response();rrentThreadName("runAsync"); }).whenComplete((response, ex)->{ dumpCurrentThreadName("whenComplete"); }); CompletableFuture.runAsync(()->{ asyncJob.complete(null); })
Test
我針對三種情況做測試,在whenComplete中會去計算submit時的context switch次數並delay 50ms,
- 直接透過runAsync launch程式。
- 透過JDK9 completeAsync lazy launch程式。
- JDK8的lazy launch程式。
由於我使用common的pool,因此我在執行測試之前,會先叫起所有的thread,以避免第一個執行的需要去產生thread。以下是我的測試結果:
launchImmediately-submit: 306 launchImmediately-done: 1855 count for context swtching of submit operation: 6 launchLazilyJDK9-submit: 2 launchLazilyJDK9-done: 1656 count for context swtching of submit operation: 0 launchLazily-submit: 2 launchLazily-done: 1712 count for context swtching of submit operation: 0
可以發現,如果在submit時發生了Problem中所敘述的情況,將會延長client thread與整體工作時間;JDK9與JDK8的做法其實差異不大,不過JDK9的寫法更為簡潔且少產生中繼物件。
我附上測試程式碼供大家參考:
package org.tonylin.test.lambda.parallel.async; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import org.tonylin.test.util.ThreadUtil; public class TestLazyCompletableFuture { private int jobNum = 100; private long delayTime = 50; private List<CompletableFuture<Void>> jobs = new ArrayList<>(); private AtomicInteger count = new AtomicInteger(0); @BeforeClass public static void classSetup(){ ThreadUtil.disableLoggin(); ThreadUtil.launchThreadsOfCommonPool(); } @AfterClass public static void classTeardown(){ ThreadUtil.enableLoggin(); } private void countIfMainThread(){ if(Thread.currentThread().getName().equals("main")){ count.incrementAndGet(); } } @Test public void launchImmediately(){ long before = System.currentTimeMillis(); for( int i = 0 ; i < jobNum ; i++ ) { CompletableFuture<Void> job = CompletableFuture.runAsync(()->{ ThreadUtil.dumpCurrentThreadName("runAsync"); }).whenComplete((ret,ex)->{ countIfMainThread(); ThreadUtil.dumpCurrentThreadName("whenComplete"); ThreadUtil.sleep(delayTime); }); jobs.add(job); } dumpDuration("launchImmediately", before); } @Test public void launchLazily(){ long before = System.currentTimeMillis(); for( int i = 0 ; i < jobNum ; i++ ) { CompletableFuture<Void> job = new CompletableFuture<>(); job.whenComplete((ret,ex)->{ countIfMainThread(); ThreadUtil.dumpCurrentThreadName("whenComplete"); ThreadUtil.sleep(delayTime); }); jobs.add(CompletableFuture.runAsync(()->{ ThreadUtil.dumpCurrentThreadName("runAsync"); job.complete(null); })); } dumpDuration("launchLazily", before); } @Test public void launchLazilyJDK9() { long before = System.currentTimeMillis(); for( int i = 0 ; i < jobNum ; i++ ) { CompletableFuture<Void> job = new CompletableFuture<>(); job.whenComplete((ret,ex)->{ countIfMainThread(); ThreadUtil.dumpCurrentThreadName("whenComplete"); ThreadUtil.sleep(delayTime); }); job.completeAsync(()->{ ThreadUtil.dumpCurrentThreadName("runAsync"); return null; }); jobs.add(job); } dumpDuration("launchLazilyJDK9", before); } private void dumpDuration(String testCase, long before){ long duration_submit = System.currentTimeMillis() - before; System.out.println(testCase+"-submit: " + duration_submit); CompletableFuture.allOf(jobs.toArray(new CompletableFuture[0])).join(); long duration = System.currentTimeMillis() - before; System.out.println(testCase+"-done: " + duration); System.out.println("count for context swtching of submit operation: " + count); } }
package org.tonylin.test.util; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinTask; public class ThreadUtil { private static boolean logginEnabled = true; public static void dumpCurrentThreadName(String suffix) { if( logginEnabled ) System.out.println(Thread.currentThread().getName() + ": " + suffix); } public static void launchThreadsOfCommonPool(){ List<ForkJoinTask<?>> tasks = new ArrayList<>(); for( int i = 0 ; i < ForkJoinPool.getCommonPoolParallelism(); i++){ ForkJoinTask<?> task = ForkJoinPool.commonPool().submit(()->{ sleep(100); }); tasks.add(task); } tasks.forEach(task->task.join()); } public static void sleep(long duration){ try { Thread.sleep(duration); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException(e); } } public static void enableLoggin(){ logginEnabled = true; } public static void disableLoggin(){ logginEnabled = false; } }
留言
張貼留言