介紹
在
第 I 部分中,我們回顧了線程是如何創建的。讓我們再回憶一次。
線程由 Thread 類表示,它的
run()
方法被調用。因此,讓我們使用
Tutorialspoint 在線 Java 編譯器並執行以下代碼:
public class HelloWorld {
public static void main(String[] args) {
Runnable task = () -> {
System.out.println("Hello World");
};
new Thread(task).start();
}
}
這是在線程上啟動任務的唯一選擇嗎?
java.util.concurrent.Callable
原來
java.lang.Runnable有一個兄弟叫
java.util.concurrent.Callable,他是在Java 1.5中誕生的。有什麼區別?如果您仔細查看此接口的 Javadoc,我們會發現,與 不同的是
Runnable
,新接口聲明了一個
call()
返回結果的方法。此外,它默認拋出 Exception。也就是說,它使我們不必
try-catch
為已檢查的異常而阻塞。不錯,對吧?現在我們有一個新任務而不是
Runnable
:
Callable task = () -> {
return "Hello, World!";
};
但是我們用它做什麼呢?為什麼我們需要在返回結果的線程上運行任務?顯然,對於將來執行的任何操作,我們都希望在將來收到這些操作的結果。我們有一個具有相應名稱的接口:
java.util.concurrent.Future
java.util.concurrent.Future
java.util.concurrent.Future接口定義了一個 API,
用於處理我們計劃在未來接收其結果的任務:獲取結果的方法和檢查狀態的方法。關於
Future
,我們感興趣的是它在
java.util.concurrent.FutureTask類中的實現。這是將在 中執行的“任務”
Future
。讓這個實現更有趣的是它還實現了 Runnable。您可以認為這是在線程上處理任務的舊模型和新模型(在 Java 1.5 中出現的意義上的新模型)之間的一種適配器。這是一個例子:
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
public class HelloWorld {
public static void main(String[] args) throws Exception {
Callable task = () -> {
return "Hello, World!";
};
FutureTask<String> future = new FutureTask<>(task);
new Thread(future).start();
System.out.println(future.get());
}
}
從示例中可以看出,我們使用 方法
get
從任務中獲取結果。
筆記:當您使用該方法獲得結果時
get()
,執行變得同步!你認為這裡會用到什麼機制?沒錯,沒有同步塊。這就是為什麼我們不會在 JVisualVM 中看到
WAITING作為
monitor
or
wait
,而是作為熟悉的
park()
方法(因為
LockSupport
正在使用該機制)。
功能接口
接下來,我們將討論 Java 1.8 中的類,因此我們最好提供一個簡短的介紹。看下面的代碼:
Supplier<String> supplier = new Supplier<String>() {
@Override
public String get() {
return "String";
}
};
Consumer<String> consumer = new Consumer<String>() {
@Override
public void accept(String s) {
System.out.println(s);
}
};
Function<String, Integer> converter = new Function<String, Integer>() {
@Override
public Integer apply(String s) {
return Integer.valueOf(s);
}
};
很多很多額外的代碼,你不覺得嗎?每個聲明的類執行一個功能,但我們使用一堆額外的支持代碼來定義它。這就是 Java 開發人員的想法。因此,他們引入了一組“功能接口”(
@FunctionalInterface
),並決定現在 Java 自己來做“思考”,只留下重要的事情讓我們擔心:
Supplier<String> supplier = () -> "String";
Consumer<String> consumer = s -> System.out.println(s);
Function<String, Integer> converter = s -> Integer.valueOf(s);
一
Supplier
用品。它沒有參數,但會返回一些東西。這就是它供應東西的方式。一個
Consumer
消費。它需要一些東西作為輸入(一個參數)並用它做一些事情。參數是它消耗的東西。那麼我們也有
Function
。它接受輸入(參數),做一些事情,然後返回一些東西。您可以看到我們正在積極使用泛型。
如果您不確定,可以通過閱讀“ Java 中的泛型:如何在實踐中使用尖括號” 來複習一下。
可完成的未來
CompletableFuture
時間過去了, Java 1.8 中出現了一個名為的新類。它實現了
Future
接口,即以後我們的任務完成,我們可以調用
get()
得到結果。但它也實現了
CompletionStage
接口。顧名思義:這是一組計算的某個階段。可以在此處的評論中找到對該主題的簡要介紹:CompletionStage 和 CompletableFuture 簡介。讓我們開門見山。讓我們看一下可幫助我們入門的可用靜態方法列表:
以下是使用它們的選項:
import java.util.concurrent.CompletableFuture;
public class App {
public static void main(String[] args) throws Exception {
// A CompletableFuture that already contains a Result
CompletableFuture<String> completed;
completed = CompletableFuture.completedFuture("Just a value");
// A CompletableFuture that runs a new thread from Runnable. That's why it's Void
CompletableFuture<Void> voidCompletableFuture;
voidCompletableFuture = CompletableFuture.runAsync(() -> {
System.out.println("run " + Thread.currentThread().getName());
});
// A CompletableFuture that starts a new thread whose result we'll get from a Supplier
CompletableFuture<String> supplier;
supplier = CompletableFuture.supplyAsync(() -> {
System.out.println("supply " + Thread.currentThread().getName());
return "Value";
});
}
}
如果我們執行此代碼,我們將看到創建一個
CompletableFuture
還涉及啟動整個管道。因此,與 Java8 中的 SteamAPI 有一定的相似性,這就是我們發現這些方法之間的區別的地方。例如:
List<String> array = Arrays.asList("one", "two");
Stream<String> stringStream = array.stream().map(value -> {
System.out.println("Executed");
return value.toUpperCase();
});
這是 Java 8 的 Stream API 的示例。如果運行此代碼,您會看到“已執行”不會顯示。換句話說,在 Java 中創建流時,流不會立即啟動。相反,它等待有人想要從它那裡獲得價值。但是
CompletableFuture
立即開始執行管道,而不是等待有人向它詢問一個值。我認為理解這一點很重要。所以,我們有一個
CompletableFuture
. 我們如何製作管道(或鏈)以及我們有什麼機制?回想一下我們之前寫過的那些函數式接口。
- 我們有一個
Function
接受 A 並返回 B 的 a。它只有一個方法:apply()
。
- 我們有一個
Consumer
接受 A 並且不返回任何內容的 a (Void)。它只有一個方法:accept()
.
- 我們有
Runnable
,它在線程上運行,不接受任何東西,也不返回任何東西。它只有一個方法:run()
.
接下來要記住的是在其工作中
CompletableFuture
使用
Runnable
,
Consumers
, 和
Functions
。因此,您始終可以知道您可以執行以下操作
CompletableFuture
:
public static void main(String[] args) throws Exception {
AtomicLong longValue = new AtomicLong(0);
Runnable task = () -> longValue.set(new Date().getTime());
Function<Long, Date> dateConverter = (longvalue) -> new Date(longvalue);
Consumer<Date> printer = date -> {
System.out.println(date);
System.out.flush();
};
// CompletableFuture computation
CompletableFuture.runAsync(task)
.thenApply((v) -> longValue.get())
.thenApply(dateConverter)
.thenAccept(printer);
}
、和方法具有
thenRun()
“異步”版本。這意味著這些階段將在不同的線程上完成。這個線程將從一個特殊的池中獲取——所以我們不會提前知道它是新線程還是舊線程。這完全取決於任務的計算密集程度。除了這些方法之外,還有三種更有趣的可能性。為了清楚起見,讓我們假設我們有一個特定的服務從某個地方接收某種消息——這需要時間:
thenApply()
thenAccept()
public static class NewsService {
public static String getMessage() {
try {
Thread.currentThread().sleep(3000);
return "Message";
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
}
}
現在,讓我們來看看
CompletableFuture
提供的其他能力。我們可以將 a 的結果
CompletableFuture
與 another 的結果結合起來
CompletableFuture
:
Supplier newsSupplier = () -> NewsService.getMessage();
CompletableFuture<String> reader = CompletableFuture.supplyAsync(newsSupplier);
CompletableFuture.completedFuture("!!")
.thenCombine(reader, (a, b) -> b + a)
.thenAccept(result -> System.out.println(result))
.get();
請注意,默認情況下線程是守護線程,因此為了清楚起見,我們使用
get()
等待結果。我們不僅可以合併
CompletableFutures
,還可以返回一個
CompletableFuture
:
CompletableFuture.completedFuture(2L)
.thenCompose((val) -> CompletableFuture.completedFuture(val + 2))
.thenAccept(result -> System.out.println(result));
在這裡我想指出,該
CompletableFuture.completedFuture()
方法是為簡潔起見而使用的。此方法不會創建新線程,因此管道的其餘部分將在
completedFuture
調用的同一線程上執行。還有一個
thenAcceptBoth()
方法。它與 非常相似
accept()
,但如果
thenAccept()
接受 a
Consumer
,
thenAcceptBoth()
則接受另一個
CompletableStage
+
BiConsumer
作為輸入,即 a
consumer
需要 2 個源而不是一個。名稱中包含單詞“Either”的方法還提供了另一個有趣的功能:
這些方法接受一個替代方法並在最先執行的方法
CompletableStage
上執行。
CompletableStage
最後,我想用另一個有趣的特性來結束這篇評論
CompletableFuture
:錯誤處理。
CompletableFuture.completedFuture(2L)
.thenApply((a) -> {
throw new IllegalStateException("error");
}).thenApply((a) -> 3L)
//.exceptionally(ex -> 0L)
.thenAccept(val -> System.out.println(val));
這段代碼什麼都不做,因為會有一個異常,其他什麼也不會發生。但是通過取消註釋“exceptionally”語句,我們定義了預期的行為。說到
CompletableFuture
,我也推薦你看下面的視頻:
以我的拙見,這些是互聯網上最具解釋性的視頻之一。他們應該清楚這一切是如何運作的,我們有什麼工具包,以及為什麼需要所有這些。
結論
希望現在您已經清楚如何使用線程在完成計算後進行計算。附加材料:
更好的結合:Java 和 Thread 類。第 I 部分 — 執行的線程 更好地結合:Java 和 Thread 類。第二部分 — 同步 更好地結合:Java 和 Thread 類。第 III 部分 — 更好地交互:Java 和 Thread 類。第五部分 — Executor、ThreadPool、Fork/Join 更好地結合在一起:Java 和 Thread 類。第六部分——開火!
GO TO FULL VERSION