Differences

This page shows the differences between the two versions.

java:java:java8:concurrent:blockingoperationwithcompletablefuture:build_chain_before_execution [2019/01/13 22:41]
tony [How to resolve?]
java:java:java8:concurrent:blockingoperationwithcompletablefuture:build_chain_before_execution [2023/06/25 09:48] (current version)
Line 39: Line 39:
 This prevents the client thread from running work that was meant to be asynchronous, which improves the client thread's response time.\\
 \\
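The problem being avoided can be reproduced with a small sketch. It assumes nothing beyond the JDK; the class name ''CallerThreadDemo'' and its helper method are hypothetical names chosen for illustration. When a callback is attached to a future that has already completed, a non-async method such as ''thenApply'' runs the callback on the calling thread:

```java
import java.util.concurrent.CompletableFuture;

public class CallerThreadDemo {

    // Attaches a callback to an already-completed future and returns
    // the name of the thread that executed the callback.
    static String callbackThreadOnCompletedFuture() {
        CompletableFuture<Integer> done = CompletableFuture.completedFuture(1);
        String[] seen = new String[1];
        done.thenApply(v -> {
            seen[0] = Thread.currentThread().getName();
            return v + 1;
        });
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println("caller:   " + Thread.currentThread().getName());
        System.out.println("callback: " + callbackThreadOnCompletedFuture());
    }
}
```

The same thing can happen with ''runAsync'' when the task finishes before the rest of the chain is attached, which is why building the chain first matters.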
-As for Java 8, no such luck.
 +For Java 8, I currently use the following approach to achieve a lazy launch:
 +<code java>
 +// Build the whole chain on a future that has not been completed yet.
 +CompletableFuture<Response> asyncJob = new CompletableFuture<>();
 +asyncJob.thenApply((ret)->{
 +    dumpCurrentThreadName("supplyAsync");
 +    return new Response();
 +}).whenComplete((response, ex)->{
 +    dumpCurrentThreadName("whenComplete");
 +});
 +
 +// Launch the chain later by completing the future from a pool thread.
 +CompletableFuture.runAsync(()->{
 +    asyncJob.complete(null);
 +});
 +</code>
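The Java 8 approach above can be sketched as a self-contained program. This is a minimal sketch, not the author's test code: the class name ''LazyLaunchJava8'', the ''run'' method, and the string payload are assumptions made for illustration. It records which thread executes each stage so the result can be inspected:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

public class LazyLaunchJava8 {

    // Builds the chain on an uncompleted future, then triggers it from
    // an async task. Returns the names of the threads that ran each stage.
    static List<String> run() {
        List<String> threads = new CopyOnWriteArrayList<>();
        CompletableFuture<Void> trigger = new CompletableFuture<>();

        CompletableFuture<String> chain = trigger
            .thenApply(ignored -> {
                threads.add(Thread.currentThread().getName());
                return "response";
            })
            .whenComplete((response, ex) ->
                threads.add(Thread.currentThread().getName()));

        // Nothing has run yet. Completing the trigger inside runAsync
        // launches the chain on the thread that performs the completion.
        CompletableFuture<Void> launcher =
            CompletableFuture.runAsync(() -> trigger.complete(null));

        chain.join();
        launcher.join();
        return threads;
    }

    public static void main(String[] args) {
        System.out.println("caller: " + Thread.currentThread().getName());
        System.out.println("stages: " + run());
    }
}
```

Because every stage is attached before the trigger is completed, the stages run on the thread that completes the trigger rather than on the caller.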
 +===== Test =====
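 +I tested the three scenarios below. In each one, the whenComplete callback counts the context switches during submit (that is, how often the callback ends up running on the main thread) and then delays for 50 ms.
 +  * Launching directly with runAsync.
 +  * Launching lazily with JDK 9's completeAsync.
 +  * Launching lazily with the JDK 8 approach.
 +Because I use the common pool, I start all of its threads before running the tests, so the first test to run does not have to pay for thread creation. Here are my results: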
 +<code>
 +launchImmediately-submit: 306
 +launchImmediately-done: 1855
 +count for context swtching of submit operation: 6
 +launchLazilyJDK9-submit: 2
 +launchLazilyJDK9-done: 1656
 +count for context swtching of submit operation: 0
 +launchLazily-submit: 2
 +launchLazily-done: 1712
 +count for context swtching of submit operation: 0
 +</code>
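 +The results show that when the situation described in the Problem section occurs during submit, both the client thread's time and the total job time get longer. The JDK 9 and JDK 8 approaches perform about the same, but the JDK 9 version is more concise and creates fewer intermediate objects.\\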
 +\\
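For comparison, the JDK 9 variant can be sketched as follows. This is a minimal sketch that requires Java 9 or later; the class name ''LazyLaunchJava9'' and the string payload are assumptions made for illustration. ''completeAsync'' completes the existing future from an async task, so no separate trigger future or ''runAsync'' wrapper is needed:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

public class LazyLaunchJava9 {

    // Attaches the callback first, then completes the future asynchronously.
    // Returns the name of the thread that executed the callback.
    static String run() {
        AtomicReference<String> callbackThread = new AtomicReference<>();
        CompletableFuture<String> job = new CompletableFuture<>();

        CompletableFuture<String> chain = job.whenComplete((response, ex) ->
            callbackThread.set(Thread.currentThread().getName()));

        // Java 9+: runs the supplier on the default executor and
        // completes 'job' with its result.
        job.completeAsync(() -> "response");

        chain.join();
        return callbackThread.get();
    }

    public static void main(String[] args) {
        System.out.println("caller:   " + Thread.currentThread().getName());
        System.out.println("callback: " + run());
    }
}
```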
 +I've attached the test code for reference:
 +<code java>
 +package org.tonylin.test.lambda.parallel.async;
 +
 +import java.util.ArrayList;
 +import java.util.List;
 +import java.util.concurrent.CompletableFuture;
 +import java.util.concurrent.atomic.AtomicInteger;
 +
 +import org.junit.AfterClass;
 +import org.junit.BeforeClass;
 +import org.junit.Test;
 +import org.tonylin.test.util.ThreadUtil;
 +
 +public class TestLazyCompletableFuture {
 +
 +    private int jobNum = 100;
 +    private long delayTime = 50;
 +    private List<CompletableFuture<Void>> jobs = new ArrayList<>();
 +    // Number of whenComplete callbacks that ran on the main (submitting) thread.
 +    private AtomicInteger count = new AtomicInteger(0);
 +
 +    @BeforeClass
 +    public static void classSetup(){
 +        ThreadUtil.disableLoggin();
 +        // Start the common pool threads up front so no test pays for thread creation.
 +        ThreadUtil.launchThreadsOfCommonPool();
 +    }
 +
 +    @AfterClass
 +    public static void classTeardown(){
 +        ThreadUtil.enableLoggin();
 +    }
 +
 +    private void countIfMainThread(){
 +        if(Thread.currentThread().getName().equals("main")){
 +            count.incrementAndGet();
 +        }
 +    }
 +
 +    // Case 1: start the job right away, then attach the callback.
 +    @Test
 +    public void launchImmediately(){
 +        long before = System.currentTimeMillis();
 +        for( int i = 0 ; i < jobNum ; i++ ) {
 +            CompletableFuture<Void> job = CompletableFuture.runAsync(()->{
 +                ThreadUtil.dumpCurrentThreadName("runAsync");
 +            }).whenComplete((ret,ex)->{
 +                countIfMainThread();
 +                ThreadUtil.dumpCurrentThreadName("whenComplete");
 +                ThreadUtil.sleep(delayTime);
 +            });
 +            jobs.add(job);
 +        }
 +        dumpDuration("launchImmediately", before);
 +    }
 +
 +    // Case 3 (JDK 8): attach the callback first, then complete the future inside runAsync.
 +    @Test
 +    public void launchLazily(){
 +        long before = System.currentTimeMillis();
 +        for( int i = 0 ; i < jobNum ; i++ ) {
 +            CompletableFuture<Void> job = new CompletableFuture<>();
 +            job.whenComplete((ret,ex)->{
 +                countIfMainThread();
 +                ThreadUtil.dumpCurrentThreadName("whenComplete");
 +                ThreadUtil.sleep(delayTime);
 +            });
 +
 +            jobs.add(CompletableFuture.runAsync(()->{
 +                ThreadUtil.dumpCurrentThreadName("runAsync");
 +                job.complete(null);
 +            }));
 +        }
 +        dumpDuration("launchLazily", before);
 +    }
 +
 +    // Case 2 (JDK 9+): attach the callback first, then use completeAsync.
 +    @Test
 +    public void launchLazilyJDK9() {
 +        long before = System.currentTimeMillis();
 +        for( int i = 0 ; i < jobNum ; i++ ) {
 +            CompletableFuture<Void> job = new CompletableFuture<>();
 +            job.whenComplete((ret,ex)->{
 +                countIfMainThread();
 +                ThreadUtil.dumpCurrentThreadName("whenComplete");
 +                ThreadUtil.sleep(delayTime);
 +            });
 +
 +            job.completeAsync(()->{
 +                ThreadUtil.dumpCurrentThreadName("runAsync");
 +                return null;
 +            });
 +
 +            jobs.add(job);
 +        }
 +        dumpDuration("launchLazilyJDK9", before);
 +    }
 +
 +    // Prints the time spent submitting, then the total time once every job is done.
 +    private void dumpDuration(String testCase, long before){
 +        long duration_submit = System.currentTimeMillis() - before;
 +        System.out.println(testCase+"-submit: " + duration_submit);
 +
 +        CompletableFuture.allOf(jobs.toArray(new CompletableFuture[0])).join();
 +        long duration = System.currentTimeMillis() - before;
 +        System.out.println(testCase+"-done: " + duration);
 +        System.out.println("count for context swtching of submit operation: " + count);
 +    }
 +
 +}
 +</code>
 +<code java>
 +package org.tonylin.test.util;
 +
 +import java.util.ArrayList;
 +import java.util.List;
 +import java.util.concurrent.ForkJoinPool;
 +import java.util.concurrent.ForkJoinTask;
 +
 +public class ThreadUtil {
 +    private static boolean logginEnabled = true;
 +
 +    public static void dumpCurrentThreadName(String suffix) {
 +        if( logginEnabled )
 +            System.out.println(Thread.currentThread().getName() + ": " + suffix);
 +    }
 +
 +    // Submits one sleeping task per unit of parallelism so the common pool starts its threads.
 +    public static void launchThreadsOfCommonPool(){
 +
 +        List<ForkJoinTask<?>> tasks = new ArrayList<>();
 +        for( int i = 0 ; i < ForkJoinPool.getCommonPoolParallelism(); i++){
 +            ForkJoinTask<?> task = ForkJoinPool.commonPool().submit(()->{
 +                sleep(100);
 +            });
 +            tasks.add(task);
 +        }
 +        tasks.forEach(task->task.join());
 +    }
 +
 +    public static void sleep(long duration){
 +        try {
 +            Thread.sleep(duration);
 +        } catch (InterruptedException e) {
 +            // Restore the interrupt flag before rethrowing.
 +            Thread.currentThread().interrupt();
 +            throw new RuntimeException(e);
 +        }
 +    }
 +
 +    public static void enableLoggin(){
 +        logginEnabled = true;
 +    }
 +
 +    public static void disableLoggin(){
 +        logginEnabled = false;
 +    }
 +}
 +</code>
 ===== Reference =====
   * [[https://qconsf.com/sf2017/system/files/presentation-slides/cf.pdf|Asynchronous API with