CodeGym /Java Blog /Toto sisi /更好的結合:Java 和 Thread 類。第四部分 — Callable、Future 和朋友
John Squirrels
等級 41
San Francisco

更好的結合:Java 和 Thread 類。第四部分 — Callable、Future 和朋友

在 Toto sisi 群組發布

介紹

第 I 部分中,我們回顧了線程是如何創建的。讓我們再回憶一次。 更好的結合:Java 和 Thread 類。 第四部分 — Callable、Future 和朋友 - 1線程由 Thread 類表示,它的run()方法被調用。因此,讓我們使用Tutorialspoint 在線 Java 編譯器並執行以下代碼:

public class HelloWorld {
    
    public static void main(String[] args) {
        Runnable task = () -> {
            System.out.println("Hello World");
        };
        new Thread(task).start();
    }
}
這是在線程上啟動任務的唯一選擇嗎?

java.util.concurrent.Callable

原來java.lang.Runnable有一個兄弟叫java.util.concurrent.Callable,他是在Java 1.5中誕生的。有什麼區別?如果您仔細查看此接口的 Javadoc,我們會發現,與 不同的是Runnable,新接口聲明了一個call()返回結果的方法。此外,它默認拋出 Exception。也就是說,它使我們不必try-catch為已檢查的異常而阻塞。不錯,對吧?現在我們有一個新任務而不是Runnable

Callable task = () -> {
	return "Hello, World!";
};
但是我們用它做什麼呢?為什麼我們需要在返回結果的線程上運行任務?顯然,對於將來執行的任何操作,我們都希望在將來收到這些操作的結果。我們有一個具有相應名稱的接口:java.util.concurrent.Future

java.util.concurrent.Future

java.util.concurrent.Future接口定義了一個 API,用於處理我們計劃在未來接收其結果的任務:獲取結果的方法和檢查狀態的方法。關於Future,我們感興趣的是它在java.util.concurrent.FutureTask類中的實現。這是將在 中執行的“任務” Future。讓這個實現更有趣的是它還實現了 Runnable。您可以認為這是在線程上處理任務的舊模型和新模型(在 Java 1.5 中出現的意義上的新模型)之間的一種適配器。這是一個例子:

import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

public class HelloWorld {
    
    public static void main(String[] args) throws Exception {
        Callable task = () -> {
            return "Hello, World!";
        };
        FutureTask<String> future = new FutureTask<>(task);
        new Thread(future).start();
        System.out.println(future.get());
    }
}
從示例中可以看出,我們使用 方法get從任務中獲取結果。 筆記:當您使用該方法獲得結果時get(),執行變得同步!你認為這裡會用到什麼機制?沒錯,沒有同步塊。這就是為什麼我們不會在 JVisualVM 中看到WAITING作為monitoror wait,而是作為熟悉的park()方法(因為LockSupport正在使用該機制)。

功能接口

接下來,我們將討論 Java 1.8 中的類,因此我們最好提供一個簡短的介紹。看下面的代碼:

Supplier<String> supplier = new Supplier<String>() {
	@Override
	public String get() {
		return "String";
	}
};
Consumer<String> consumer = new Consumer<String>() {
	@Override
	public void accept(String s) {
		System.out.println(s);
	}
};
Function<String, Integer> converter = new Function<String, Integer>() {
	@Override
	public Integer apply(String s) {
		return Integer.valueOf(s);
	}
};
很多很多額外的代碼,你不覺得嗎?每個聲明的類執行一個功能,但我們使用一堆額外的支持代碼來定義它。這就是 Java 開發人員的想法。因此,他們引入了一組“功能接口”(@FunctionalInterface),並決定現在 Java 自己來做“思考”,只留下重要的事情讓我們擔心:

Supplier<String> supplier = () -> "String";
Consumer<String> consumer = s -> System.out.println(s);
Function<String, Integer> converter = s -> Integer.valueOf(s);
Supplier用品。它沒有參數,但會返回一些東西。這就是它供應東西的方式。一個Consumer消費。它需要一些東西作為輸入(一個參數)並用它做一些事情。參數是它消耗的東西。那麼我們也有Function。它接受輸入(參數),做一些事情,然後返回一些東西。您可以看到我們正在積極使用泛型。如果您不確定,可以通過閱讀“ Java 中的泛型:如何在實踐中使用尖括號” 來複習一下。

可完成的未來

CompletableFuture時間過去了, Java 1.8 中出現了一個名為的新類。它實現了Future接口,即以後我們的任務完成,我們可以調用get()得到結果。但它也實現了CompletionStage接口。顧名思義:這是一組計算的某個階段。可以在此處的評論中找到對該主題的簡要介紹:CompletionStage 和 CompletableFuture 簡介。讓我們開門見山。讓我們看一下可幫助我們入門的可用靜態方法列表: 更好的結合:Java 和 Thread 類。 第四部分 — Callable、Future 和朋友 - 2以下是使用它們的選項:

import java.util.concurrent.CompletableFuture;
public class App {
    public static void main(String[] args) throws Exception {
        // A CompletableFuture that already contains a Result
        CompletableFuture<String> completed;
        completed = CompletableFuture.completedFuture("Just a value");
        // A CompletableFuture that runs a new thread from Runnable. That's why it's Void
        CompletableFuture<Void> voidCompletableFuture;
        voidCompletableFuture = CompletableFuture.runAsync(() -> {
            System.out.println("run " + Thread.currentThread().getName());
        });
        // A CompletableFuture that starts a new thread whose result we'll get from a Supplier 
        CompletableFuture<String> supplier;
        supplier = CompletableFuture.supplyAsync(() -> {
            System.out.println("supply " + Thread.currentThread().getName());
            return "Value";
        });
    }
}
如果我們執行此代碼,我們將看到創建一個CompletableFuture還涉及啟動整個管道。因此,與 Java8 中的 SteamAPI 有一定的相似性,這就是我們發現這些方法之間的區別的地方。例如:

List<String> array = Arrays.asList("one", "two");
Stream<String> stringStream = array.stream().map(value -> {
	System.out.println("Executed");
	return value.toUpperCase();
});
這是 Java 8 的 Stream API 的示例。如果運行此代碼,您會看到“已執行”不會顯示。換句話說,在 Java 中創建流時,流不會立即啟動。相反,它等待有人想要從它那裡獲得價值。但是CompletableFuture立即開始執行管道,而不是等待有人向它詢問一個值。我認為理解這一點很重要。所以,我們有一個CompletableFuture. 我們如何製作管道(或鏈)以及我們有什麼機制?回想一下我們之前寫過的那些函數式接口。
  • 我們有一個Function接受 A 並返回 B 的 a。它只有一個方法:apply()
  • 我們有一個Consumer接受 A 並且不返回任何內容的 a (Void)。它只有一個方法:accept().
  • 我們有Runnable,它在線程上運行,不接受任何東西,也不返回任何東西。它只有一個方法:run().
接下來要記住的是在其工作中CompletableFuture使用Runnable, Consumers, 和Functions。因此,您始終可以知道您可以執行以下操作CompletableFuture

public static void main(String[] args) throws Exception {
        AtomicLong longValue = new AtomicLong(0);
        Runnable task = () -> longValue.set(new Date().getTime());
        Function<Long, Date> dateConverter = (longvalue) -> new Date(longvalue);
        Consumer<Date> printer = date -> {
            System.out.println(date);
            System.out.flush();
        };
        // CompletableFuture computation
        CompletableFuture.runAsync(task)
                         .thenApply((v) -> longValue.get())
                         .thenApply(dateConverter)
                         .thenAccept(printer);
}
、和方法具有thenRun()“異步”版本。這意味著這些階段將在不同的線程上完成。這個線程將從一個特殊的池中獲取——所以我們不會提前知道它是新線程還是舊線程。這完全取決於任務的計算密集程度。除了這些方法之外,還有三種更有趣的可能性。為了清楚起見,讓我們假設我們有一個特定的服務從某個地方接收某種消息——這需要時間: thenApply()thenAccept()

public static class NewsService {
	public static String getMessage() {
		try {
			Thread.currentThread().sleep(3000);
			return "Message";
		} catch (InterruptedException e) {
			throw new IllegalStateException(e);
		}
	}
}
現在,讓我們來看看CompletableFuture提供的其他能力。我們可以將 a 的結果CompletableFuture與 another 的結果結合起來CompletableFuture

Supplier newsSupplier = () -> NewsService.getMessage();
        
CompletableFuture<String> reader = CompletableFuture.supplyAsync(newsSupplier);
CompletableFuture.completedFuture("!!")
				 .thenCombine(reader, (a, b) -> b + a)
				 .thenAccept(result -> System.out.println(result))
				 .get();
請注意,默認情況下線程是守護線程,因此為了清楚起見,我們使用get()等待結果。我們不僅可以合併CompletableFutures,還可以返回一個CompletableFuture

CompletableFuture.completedFuture(2L)
				.thenCompose((val) -> CompletableFuture.completedFuture(val + 2))
                               .thenAccept(result -> System.out.println(result));
在這裡我想指出,該CompletableFuture.completedFuture()方法是為簡潔起見而使用的。此方法不會創建新線程,因此管道的其餘部分將在completedFuture調用的同一線程上執行。還有一個thenAcceptBoth()方法。它與 非常相似accept(),但如果thenAccept()接受 a ConsumerthenAcceptBoth()則接受另一個CompletableStage+BiConsumer作為輸入,即 aconsumer需要 2 個源而不是一個。名稱中包含單詞“Either”的方法還提供了另一個有趣的功能: 更好的結合:Java 和 Thread 類。 第四部分 — Callable、Future 和朋友 - 3這些方法接受一個替代方法並在最先執行的方法CompletableStage上執行。CompletableStage最後,我想用另一個有趣的特性來結束這篇評論CompletableFuture:錯誤處理。

CompletableFuture.completedFuture(2L)
				 .thenApply((a) -> {
					throw new IllegalStateException("error");
				 }).thenApply((a) -> 3L)
				 //.exceptionally(ex -> 0L)
				 .thenAccept(val -> System.out.println(val));
這段代碼什麼都不做,因為會有一個異常,其他什麼也不會發生。但是通過取消註釋“exceptionally”語句,我們定義了預期的行為。說到CompletableFuture,我也推薦你看下面的視頻: 以我的拙見,這些是互聯網上最具解釋性的視頻之一。他們應該清楚這一切是如何運作的,我們有什麼工具包,以及為什麼需要所有這些。

結論

希望現在您已經清楚如何使用線程在完成計算後進行計算。附加材料: 更好的結合:Java 和 Thread 類。第 I 部分 — 執行的線程 更好地結合:Java 和 Thread 類。第二部分 — 同步 更好地結合:Java 和 Thread 類。第 III 部分 — 更好地交互:Java 和 Thread 類。第五部分 — Executor、ThreadPool、Fork/Join 更好地結合在一起:Java 和 Thread 類。第六部分——開火!
留言
TO VIEW ALL COMMENTS OR TO MAKE A COMMENT,
GO TO FULL VERSION