差異處
這裏顯示兩個版本的差異處。
下次修改 | 前次修改 | ||
java:java:java8:concurrent:blockingoperationwithcompletablefuture:build_chain_before_execution [2019/01/13 22:22] tony 建立 |
java:java:java8:concurrent:blockingoperationwithcompletablefuture:build_chain_before_execution [2023/06/25 09:48] (目前版本) |
||
---|---|---|---|
行 37: | 行 37: | ||
ForkJoinPool.commonPool-worker-3: whenComplete | ForkJoinPool.commonPool-worker-3: whenComplete | ||
</code> | </code> | ||
- | 至於Java8就科科了。 | + | 這樣可以避免client thread跑去執行原本預期要async的工作,而增加了client thread的回應速度。\\ |
+ | \\ | ||
+ | 至於Java8目前我使用下面方式來達到lazy launch: | ||
+ | <code java> | ||
+ | // Build the whole chain on a not-yet-completed future; no stage runs until asyncJob is completed. | ||
+ | CompletableFuture<Response> asyncJob = new CompletableFuture<>(); | ||
+ | asyncJob.thenApply((ret)->{ | ||
+ | dumpCurrentThreadName("supplyAsync"); | ||
+ | return new Response(); | ||
+ | }).whenComplete((response, ex)->{ | ||
+ | dumpCurrentThreadName("whenComplete"); | ||
+ | }); | ||
+ | ||
+ | // Complete the source future from a pool thread, so the already-registered stages run there. | ||
+ | CompletableFuture.runAsync(()->{ | ||
+ | asyncJob.complete(null); | ||
+ | }); | ||
+ | </code> | ||
+ | ===== Test ===== | ||
+ | 我針對以下三種情況做測試。每個工作的whenComplete都會delay 50ms,並統計有多少次whenComplete是由client thread(main)執行的(也就是下方輸出中的context switching次數): | ||
+ | * 直接透過runAsync launch程式。 | ||
+ | * 透過JDK9 completeAsync lazy launch程式。 | ||
+ | * JDK8的lazy launch程式。 | ||
+ | 由於我使用common的pool,因此我在執行測試之前,會先叫起所有的thread,以避免第一個執行的需要去產生thread。以下是我的測試結果: | ||
+ | <code> | ||
+ | launchImmediately-submit: 306 | ||
+ | launchImmediately-done: 1855 | ||
+ | count for context swtching of submit operation: 6 | ||
+ | launchLazilyJDK9-submit: 2 | ||
+ | launchLazilyJDK9-done: 1656 | ||
+ | count for context swtching of submit operation: 0 | ||
+ | launchLazily-submit: 2 | ||
+ | launchLazily-done: 1712 | ||
+ | count for context swtching of submit operation: 0 | ||
+ | </code> | ||
+ | 可以發現,如果在submit時發生了Problem中所敘述的情況,將會延長client thread與整體工作時間;JDK9與JDK8的做法其實差異不大,不過JDK9的寫法更為簡潔且少產生中繼物件。\\ | ||
+ | \\ | ||
+ | 我附上測試程式碼供大家參考: | ||
+ | <code java> | ||
+ | package org.tonylin.test.lambda.parallel.async; | ||
+ | |||
+ | import java.util.ArrayList; | ||
+ | import java.util.List; | ||
+ | import java.util.concurrent.CompletableFuture; | ||
+ | import java.util.concurrent.atomic.AtomicInteger; | ||
+ | |||
+ | import org.junit.AfterClass; | ||
+ | import org.junit.BeforeClass; | ||
+ | import org.junit.Test; | ||
+ | import org.tonylin.test.util.ThreadUtil; | ||
+ | |||
+ | /** | ||
+ | * Measures how long the submitting thread needs to hand off {@code jobNum} jobs with three | ||
+ | * different ways of launching a CompletableFuture chain, and counts how many whenComplete | ||
+ | * callbacks ended up running on the thread named "main". | ||
+ | */ | ||
+ | public class TestLazyCompletableFuture { | ||
+ | ||
+ | private final int jobNum = 100; | ||
+ | private final long delayTime = 50; | ||
+ | // Futures that dumpDuration joins on; each one must complete only after its callback has run. | ||
+ | private final List<CompletableFuture<Void>> jobs = new ArrayList<>(); | ||
+ | // Number of whenComplete callbacks that were executed by the "main" thread. | ||
+ | private final AtomicInteger count = new AtomicInteger(0); | ||
+ | ||
+ | /** Silences ThreadUtil logging and starts the common-pool threads before any test runs. */ | ||
+ | @BeforeClass | ||
+ | public static void classSetup(){ | ||
+ | ThreadUtil.disableLoggin(); | ||
+ | ThreadUtil.launchThreadsOfCommonPool(); | ||
+ | } | ||
+ | ||
+ | /** Re-enables ThreadUtil logging after all tests have run. */ | ||
+ | @AfterClass | ||
+ | public static void classTeardown(){ | ||
+ | ThreadUtil.enableLoggin(); | ||
+ | } | ||
+ | ||
+ | /** Increments {@code count} when the calling thread is named "main". */ | ||
+ | private void countIfMainThread(){ | ||
+ | if(Thread.currentThread().getName().equals("main")){ | ||
+ | count.incrementAndGet(); | ||
+ | } | ||
+ | } | ||
+ | ||
+ | /** | ||
+ | * Starts each job with runAsync and attaches whenComplete afterwards. If the async task has | ||
+ | * already finished by then, the callback (including its sleep) runs on the submitting thread. | ||
+ | */ | ||
+ | @Test | ||
+ | public void launchImmediately(){ | ||
+ | long before = System.currentTimeMillis(); | ||
+ | for( int i = 0 ; i < jobNum ; i++ ) { | ||
+ | CompletableFuture<Void> job = CompletableFuture.runAsync(()->{ | ||
+ | ThreadUtil.dumpCurrentThreadName("runAsync"); | ||
+ | }).whenComplete((ret,ex)->{ | ||
+ | countIfMainThread(); | ||
+ | ThreadUtil.dumpCurrentThreadName("whenComplete"); | ||
+ | ThreadUtil.sleep(delayTime); | ||
+ | }); | ||
+ | jobs.add(job); | ||
+ | } | ||
+ | dumpDuration("launchImmediately", before); | ||
+ | } | ||
+ | ||
+ | /** | ||
+ | * JDK8-style lazy launch: registers whenComplete on an incomplete future first, then completes | ||
+ | * that future from inside a runAsync task. | ||
+ | */ | ||
+ | @Test | ||
+ | public void launchLazily(){ | ||
+ | long before = System.currentTimeMillis(); | ||
+ | for( int i = 0 ; i < jobNum ; i++ ) { | ||
+ | CompletableFuture<Void> job = new CompletableFuture<>(); | ||
+ | job.whenComplete((ret,ex)->{ | ||
+ | countIfMainThread(); | ||
+ | ThreadUtil.dumpCurrentThreadName("whenComplete"); | ||
+ | ThreadUtil.sleep(delayTime); | ||
+ | }); | ||
+ | ||
+ | // The tracked runAsync future finishes only after job.complete(null) has returned. | ||
+ | jobs.add(CompletableFuture.runAsync(()->{ | ||
+ | ThreadUtil.dumpCurrentThreadName("runAsync"); | ||
+ | job.complete(null); | ||
+ | })); | ||
+ | } | ||
+ | dumpDuration("launchLazily", before); | ||
+ | } | ||
+ | ||
+ | /** | ||
+ | * JDK9 lazy launch: registers whenComplete on an incomplete future first, then completes it | ||
+ | * with completeAsync. | ||
+ | */ | ||
+ | @Test | ||
+ | public void launchLazilyJDK9() { | ||
+ | long before = System.currentTimeMillis(); | ||
+ | for( int i = 0 ; i < jobNum ; i++ ) { | ||
+ | CompletableFuture<Void> job = new CompletableFuture<>(); | ||
+ | // Track the stage returned by whenComplete rather than job itself: job counts as done | ||
+ | // before its dependents have run, so joining on job could return while a callback is | ||
+ | // still sleeping and report a too-short duration. | ||
+ | jobs.add(job.whenComplete((ret,ex)->{ | ||
+ | countIfMainThread(); | ||
+ | ThreadUtil.dumpCurrentThreadName("whenComplete"); | ||
+ | ThreadUtil.sleep(delayTime); | ||
+ | })); | ||
+ | ||
+ | job.completeAsync(()->{ | ||
+ | ThreadUtil.dumpCurrentThreadName("runAsync"); | ||
+ | return null; | ||
+ | }); | ||
+ | } | ||
+ | dumpDuration("launchLazilyJDK9", before); | ||
+ | } | ||
+ | ||
+ | /** | ||
+ | * Prints the time spent submitting, waits for every tracked future, then prints the total | ||
+ | * time and the main-thread callback count. | ||
+ | * | ||
+ | * @param testCase label used as the prefix of the printed lines | ||
+ | * @param before start timestamp in milliseconds, as returned by System.currentTimeMillis() | ||
+ | */ | ||
+ | private void dumpDuration(String testCase, long before){ | ||
+ | long duration_submit = System.currentTimeMillis() - before; | ||
+ | System.out.println(testCase+"-submit: " + duration_submit); | ||
+ | ||
+ | CompletableFuture.allOf(jobs.toArray(new CompletableFuture[0])).join(); | ||
+ | long duration = System.currentTimeMillis() - before; | ||
+ | System.out.println(testCase+"-done: " + duration); | ||
+ | System.out.println("count for context swtching of submit operation: " + count); | ||
+ | } | ||
+ | ||
+ | } | ||
+ | </code> | ||
+ | <code java> | ||
+ | package org.tonylin.test.util; | ||
+ | |||
+ | import java.util.ArrayList; | ||
+ | import java.util.List; | ||
+ | import java.util.concurrent.CompletableFuture; | ||
+ | import java.util.concurrent.ForkJoinPool; | ||
+ | import java.util.concurrent.ForkJoinTask; | ||
+ | |||
+ | /** Small static helpers for the CompletableFuture experiments: thread-name logging, sleeping, pool warm-up. */ | ||
+ | public class ThreadUtil { | ||
+ | // volatile: toggled by the test thread and read by pool worker threads. | ||
+ | private static volatile boolean logginEnabled = true; | ||
+ | ||
+ | /** | ||
+ | * Prints "<current thread name>: <suffix>" to standard output when logging is enabled. | ||
+ | * | ||
+ | * @param suffix text appended after the thread name | ||
+ | */ | ||
+ | public static void dumpCurrentThreadName(String suffix) { | ||
+ | if( logginEnabled ) { | ||
+ | System.out.println(Thread.currentThread().getName() + ": " + suffix); | ||
+ | } | ||
+ | } | ||
+ | ||
+ | /** | ||
+ | * Submits one 100 ms sleeping task per unit of common-pool parallelism and waits for all of | ||
+ | * them; intended to get the pool's worker threads started before timing anything. | ||
+ | */ | ||
+ | public static void launchThreadsOfCommonPool(){ | ||
+ | ||
+ | List<ForkJoinTask<?>> tasks = new ArrayList<>(); | ||
+ | for( int i = 0 ; i < ForkJoinPool.getCommonPoolParallelism(); i++){ | ||
+ | ForkJoinTask<?> task = ForkJoinPool.commonPool().submit(()->{ | ||
+ | sleep(100); | ||
+ | }); | ||
+ | tasks.add(task); | ||
+ | } | ||
+ | tasks.forEach(ForkJoinTask::join); | ||
+ | } | ||
+ | ||
+ | /** | ||
+ | * Sleeps for the given number of milliseconds. | ||
+ | * | ||
+ | * @param duration sleep time in milliseconds | ||
+ | * @throws RuntimeException wrapping the InterruptedException if the thread is interrupted; | ||
+ | * the interrupt flag is restored before throwing | ||
+ | */ | ||
+ | public static void sleep(long duration){ | ||
+ | try { | ||
+ | Thread.sleep(duration); | ||
+ | } catch (InterruptedException e) { | ||
+ | Thread.currentThread().interrupt(); | ||
+ | throw new RuntimeException(e); | ||
+ | } | ||
+ | } | ||
+ | ||
+ | /** Turns dumpCurrentThreadName output on. */ | ||
+ | public static void enableLoggin(){ | ||
+ | logginEnabled = true; | ||
+ | } | ||
+ | ||
+ | /** Turns dumpCurrentThreadName output off. */ | ||
+ | public static void disableLoggin(){ | ||
+ | logginEnabled = false; | ||
+ | } | ||
+ | } | ||
+ | </code> | ||
===== Reference ===== | ===== Reference ===== | ||
* [[https://qconsf.com/sf2017/system/files/presentation-slides/cf.pdf|Asynchronous API with | * [[https://qconsf.com/sf2017/system/files/presentation-slides/cf.pdf|Asynchronous API with |