JDK8 - CompletableFuture 非同步處理簡介
前言
在編寫非同步處理的程式時,若遇到較複雜的連續組合或者Exception Handling等等的狀況,
很容易就會寫出可讀性不佳的程式碼。在JDK 8中,java.util.concurret新增了CompletableFuture,
它提供了許多強大的功能讓非同步處理的調用變得更加便利且清晰。
使用
1.創建非同步任務
我們可以透過CompletableFuture提供的 runAsync 及 supplyAsync 兩個靜態方法來創建非同步的任務
而兩個方法主要的差別是:
runAsync - 沒有回傳值
supplyAsync - 有回傳值
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
如果我們在調用runAsync及supplyAsync時,若沒有特別指定調用的執行緒池的話,
預設則會使用 ForkJoinPool ,當然我們也可以使用自訂的執行緒池。
如果在方法中帶入我們自訂的Executor,CompletableFuture將會調用我們自定義的執行緒池去創建非同步任務,
並且可以使用 get() 來取得它的回傳結果。
以下是簡單的例子:
CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
LOGGER.info("supplyAsync START");
LOGGER.info("say Hello World");
return "Hello World";
});
LOGGER.info(hello.get());
沒有指定 Executor的話,則會取用預設的Executor,若我們額外設定Executor的話,它將會取用我們所定義的
CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
LOGGER.info("supplyAsync START");
LOGGER.info("say Hello World");
return "Hello World";
}, sampleExecutor);
這樣我們就可以讓 CompletableFuture 使用自定義的Executor執行非同步任務。
2.同時執行多個非同步任務
上面已經成功利用 CompletableFuture 創建非同步任務了,
當然也可以同時創建多個非同步的任務來同時執行,
所以利用簡單的範例來測試是否同時使用多個 CompletableFuture 時,可以同時執行不同任務。
CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
try {
LOGGER.info("supplyAsync 1 Sleep");
Thread.sleep(500);
LOGGER.info("supplyAsync 1 wake up");
} catch (InterruptedException e) {
}
LOGGER.info("say Hello");
return "Hello";
}, sampleExecutor);
CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> {
try {
LOGGER.info("supplyAsync 2 Sleep");
Thread.sleep(500);
LOGGER.info("supplyAsync 2 wake up");
} catch (InterruptedException e) {
}
LOGGER.info("say world");
return "World";
}, sampleExecutor);
LOGGER.info("All OK");
執行後可以看到,CompletableFuture 使用了兩個 sample-thread執行,並且同時正在執行自己的任務,
也發現到主執行緒也同時執行自己的工作,因此如果遇到需要等待子執行緒結束的時候再繼續執行的話,
我們將可以利用CompletableFuture 的allOf以及join的功能來等待它們結束。
CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
try {
LOGGER.info("supplyAsync 1 Sleep");
Thread.sleep(500);
LOGGER.info("supplyAsync 1 wake up");
} catch (InterruptedException e) {
}
LOGGER.info("say Hello");
return "Hello";
}, sampleExecutor);
CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> {
try {
LOGGER.info("supplyAsync 2 Sleep");
Thread.sleep(500);
LOGGER.info("supplyAsync 2 wake up");
} catch (InterruptedException e) {
}
LOGGER.info("say world");
return "World";
}, sampleExecutor);
CompletableFuture allOf = CompletableFuture.allOf(hello, world);
allOf.join();
LOGGER.info("All OK");
加上allOf及join之後,就可以使主執行緒等待allOf裡面所帶入的非同步任務結束後再繼續執行,
主執行緒就不會在子執行緒還在執行的時候就搶跑了。
3.whenComplete
我們也可以利用CompletableFuture裡的whenComplete來使非同步任務結束之後執行他後續的程式,
像下方簡單的範例可以看到,在whenComplete裡的程式會等到supplyAsync結束之後,
將supplyAsync回傳的內容代入whenComplete裡,並繼續處理後面的程序。
CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
LOGGER.info("say Hello");
return "Hello";
}, sampleExecutor).whenComplete((ok , e) -> {
if(e != null ) {
LOGGER.error("e : ", e);
}
LOGGER.info("whenComplete : " + ok);
});
可以看到whenComplete會在supplyAsync結束後取得其回傳值並開始後續的作業,
若希望在完成後繼續以Aysnc執行任務的話,也可以使用whenCompleteAsync來繼續執行,
這樣就可以在完成第一個非同步任務之後繼續處理後續的任務,也不會使程式碼看起來太過雜亂且複雜。
結論
CompletableFuture 實現了Future<T>, CompletionStage<T>後,
更進一步的整合及擴展其功能使非同步處理更加得容易。
像是completeExceptionally、whenComplete等等,許多功能讓開發變得更加好控制,並且讓程式更加得簡潔易讀,
也可以結合ThreadPoolTaskExecutor來自行定義專案內的執行緒池,在非同步處理上相當好用且強大。