CompletableFuture - Carefully avoid transition task from one thread to another. It costs.

這個問題我使用以下範例來說明:

private ExecutorService es = Executors.newFixedThreadPool(2);
@Test
public void transitionTaskOfCallback(){
	CompletableFuture.supplyAsync(()->{
		return new Response();
	}, es)
	.thenAcceptAsync(response->{
		System.out.println(response);
	}, es);
}
這段程式透過supplyAsync將工作送給es去執行,當執行完畢後,會再透過thenAcceptAsync把後續處理交給es執行;或許你的程式執行起來,可以很順利地拿到你要的結果,但這裡做了白工。

由於這兩段都使用了async的方式,也都是把工作交由es去執行。假如es中有多個thread,這可能會造成額外的thread context switch;因為第一個工作與第二個工作並不一定會在同一個thread上執行。

首先你必須要搞清楚,你到底在做什麼.. 假如你是很單純的在supplyAsync中做些計算,然後callback是要針對計算結果做處理,那幹嘛需要多此一舉把它宣告成async的寫法呢? 只要改成以下寫法,就可以讓你維持原本的Thread繼續執行:

@Test
public void fixTransitionTaskOfCallback(){
	CompletableFuture.supplyAsync(()->{
		return new Response();
	}, es)
	.thenAccept(response->{
		System.out.println(response);
	});
}
但假如你的使用情況像之前文章中,會將Blocking工作送到另外一個Thread,接著透過compose串接,就要考慮使用Async方式去延續工作:
@Test
public void testSendAsync(){
	CompletableFuture<Response> sendAsync = CompletableFuture.supplyAsync(()->{
		dumpCurrentThreadName("supplyAsync");
		return launchTaskWithAuxThread(()->new BlockingJob().invoke());
	}, es)
	.thenCompose((CompletableFuture<Response> responseReceived)->{
		dumpCurrentThreadName("thenCompose");
		return responseReceived;
	})
	.thenApplyAsync((Response x)->{
		dumpCurrentThreadName("thenApply");
		return x;
	}, es);
 
	sendAsync.join();
}
上面的是我在Java8上實做的範例。如果以作者的範例來說,可以參考我在Java11做的範例程式碼。它使用了一個共享的變數responseReceived;當Blocking工作結束後,使用responseReceived.completeAsync去串接thenApply,這樣可以讓工作的執行回到共用的ExecutorService:
public static class BlockingJob2 {
	private CompletableFuture<Response> responseReceived;
	public BlockingJob2(CompletableFuture<Response> responseReceived) {
		this.responseReceived = responseReceived;
	}
	public void invoke() {
		try {
			dumpCurrentThreadName("before blocking job");
			Thread.sleep(2*1000);
			// io blocking job and get response
			responseReceived.completeAsync(()->new Response(), es);
		} catch( Exception e ) {
			// log
		} finally {
			dumpCurrentThreadName("after blocking job");
		}
	}
}
 
public static CompletableFuture<Response> send2(){
	return CompletableFuture.supplyAsync(()->{
		dumpCurrentThreadName("supplyAsync");
 
		CompletableFuture<Response> responseReceived = new CompletableFuture<>();
		launchTaskWithAuxThread(()->{
			new BlockingJob2(responseReceived).invoke();
		});
		return responseReceived;
	}, es)
	.thenCompose((CompletableFuture<Response> responseReceived)->{
		dumpCurrentThreadName("thenCompose");
		return responseReceived;
	})
	.thenApply((Response x)->{
		dumpCurrentThreadName("whenComplete: " + x.getClass().getName());
		return x;
	});
}
async寫法帶來的是thread的控制性;sync則是因為減少context switch而帶來的則是較好的效能。要選擇哪一種方式,一定要先確定好你要的是什麼。