CodeGym /Java 博客 /随机的 /更好的结合:Java 和 Thread 类。第四部分 — Callable、Future 和朋友
John Squirrels
第 41 级
San Francisco

更好的结合:Java 和 Thread 类。第四部分 — Callable、Future 和朋友

已在 随机的 群组中发布

介绍

第 I 部分中,我们回顾了线程是如何创建的。让我们再回忆一次。 更好的结合:Java 和 Thread 类。 第四部分 — Callable、Future 和朋友 - 1线程由 Thread 类表示,它的run()方法被调用。因此,让我们使用Tutorialspoint 在线 Java 编译器并执行以下代码:

public class HelloWorld {
    
    public static void main(String[] args) {
        Runnable task = () -> {
            System.out.println("Hello World");
        };
        new Thread(task).start();
    }
}
这是在线程上启动任务的唯一选择吗?

java.util.concurrent.Callable

原来java.lang.Runnable有一个兄弟叫java.util.concurrent.Callable,他是在Java 1.5中诞生的。有什么区别?如果您仔细查看此接口的 Javadoc,我们会发现,与 不同的是Runnable,新接口声明了一个call()返回结果的方法。此外,它默认抛出 Exception。也就是说,它使我们不必try-catch为已检查的异常而阻塞。不错,对吧?现在我们有一个新任务而不是Runnable

Callable task = () -> {
	return "Hello, World!";
};
但是我们用它做什么呢?为什么我们需要在返回结果的线程上运行任务?显然,对于将来执行的任何操作,我们都希望在将来收到这些操作的结果。我们有一个具有相应名称的接口:java.util.concurrent.Future

java.util.concurrent.Future

java.util.concurrent.Future接口定义了一个 API,用于处理我们计划在未来接收其结果任务:获取结果的方法和检查状态的方法。关于Future,我们感兴趣的是它在java.util.concurrent.FutureTask类中的实现。这是将在 中执行的“任务” Future。让这个实现更有趣的是它还实现了 Runnable。您可以认为这是在线程上处理任务的旧模型和新模型(在 Java 1.5 中出现的意义上的新模型)之间的一种适配器。这是一个例子:

import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

public class HelloWorld {
    
    public static void main(String[] args) throws Exception {
        Callable task = () -> {
            return "Hello, World!";
        };
        FutureTask<String> future = new FutureTask<>(task);
        new Thread(future).start();
        System.out.println(future.get());
    }
}
从示例中可以看出,我们使用get方法从任务中获取结果。 笔记:当您使用该方法获得结果时get(),执行变得同步!你认为这里会用到什么机制?没错,没有同步块。这就是为什么我们不会在 JVisualVM 中看到WAITING作为monitoror wait,而是作为熟悉的park()方法(因为LockSupport正在使用该机制)。

功能接口

接下来,我们将讨论 Java 1.8 中的类,因此我们最好提供一个简短的介绍。看下面的代码:

Supplier<String> supplier = new Supplier<String>() {
	@Override
	public String get() {
		return "String";
	}
};
Consumer<String> consumer = new Consumer<String>() {
	@Override
	public void accept(String s) {
		System.out.println(s);
	}
};
Function<String, Integer> converter = new Function<String, Integer>() {
	@Override
	public Integer apply(String s) {
		return Integer.valueOf(s);
	}
};
很多很多额外的代码,你不觉得吗?每个声明的类执行一个功能,但我们使用一堆额外的支持代码来定义它。这就是 Java 开发人员的想法。因此,他们引入了一组“功能接口”(@FunctionalInterface),并决定现在 Java 自己来做“思考”,只留下重要的事情让我们担心:

Supplier<String> supplier = () -> "String";
Consumer<String> consumer = s -> System.out.println(s);
Function<String, Integer> converter = s -> Integer.valueOf(s);
Supplier用品。它没有参数,但会返回一些东西。这就是它供应东西的方式。一个Consumer消费。它需要一些东西作为输入(一个参数)并用它做一些事情。参数是它消耗的东西。那么我们也有Function。它接受输入(参数),做一些事情,然后返回一些东西。您可以看到我们正在积极使用泛型。如果您不确定,可以通过阅读“ Java 中的泛型:如何在实践中使用尖括号” 来复习一下。

可完成的未来

CompletableFuture时间过去了, Java 1.8 中出现了一个名为的新类。它实现了Future接口,即以后我们的任务完成,我们可以调用get()得到结果。但它也实现了CompletionStage接口。顾名思义:这是一组计算的某个阶段。可以在此处的评论中找到对该主题的简要介绍:CompletionStage 和 CompletableFuture 简介。让我们开门见山。让我们看一下可帮助我们入门的可用静态方法列表: 更好的结合:Java 和 Thread 类。 第四部分 — Callable、Future 和朋友 - 2以下是使用它们的选项:

import java.util.concurrent.CompletableFuture;
public class App {
    public static void main(String[] args) throws Exception {
        // A CompletableFuture that already contains a Result
        CompletableFuture<String> completed;
        completed = CompletableFuture.completedFuture("Just a value");
        // A CompletableFuture that runs a new thread from Runnable. That's why it's Void
        CompletableFuture<Void> voidCompletableFuture;
        voidCompletableFuture = CompletableFuture.runAsync(() -> {
            System.out.println("run " + Thread.currentThread().getName());
        });
        // A CompletableFuture that starts a new thread whose result we'll get from a Supplier 
        CompletableFuture<String> supplier;
        supplier = CompletableFuture.supplyAsync(() -> {
            System.out.println("supply " + Thread.currentThread().getName());
            return "Value";
        });
    }
}
如果我们执行此代码,我们将看到创建一个CompletableFuture还涉及启动整个管道。因此,与 Java8 中的 SteamAPI 有一定的相似性,这就是我们发现这些方法之间的区别的地方。例如:

List<String> array = Arrays.asList("one", "two");
Stream<String> stringStream = array.stream().map(value -> {
	System.out.println("Executed");
	return value.toUpperCase();
});
这是 Java 8 的 Stream API 的示例。如果运行此代码,您会看到“已执行”不会显示。换句话说,在 Java 中创建流时,流不会立即启动。相反,它等待有人想要从它那里获得价值。但是CompletableFuture立即开始执行管道,而不是等待有人向它询问一个值。我认为理解这一点很重要。所以,我们有一个CompletableFuture. 我们如何制作管道(或链)以及我们有什么机制?回想一下我们之前写过的那些函数式接口。
  • 我们有一个Function接受 A 并返回 B 的 a。它只有一个方法:apply()
  • 我们有一个Consumer接受 A 并且不返回任何内容的 a (Void)。它只有一个方法:accept().
  • 我们有Runnable,它在线程上运行,不接受任何东西,也不返回任何东西。它只有一个方法:run().
接下来要记住的是在其工作中CompletableFuture使用Runnable, Consumers, 和Functions。因此,您始终可以知道您可以执行以下操作CompletableFuture

public static void main(String[] args) throws Exception {
        AtomicLong longValue = new AtomicLong(0);
        Runnable task = () -> longValue.set(new Date().getTime());
        Function<Long, Date> dateConverter = (longvalue) -> new Date(longvalue);
        Consumer<Date> printer = date -> {
            System.out.println(date);
            System.out.flush();
        };
        // CompletableFuture computation
        CompletableFuture.runAsync(task)
                         .thenApply((v) -> longValue.get())
                         .thenApply(dateConverter)
                         .thenAccept(printer);
}
、和方法具有thenRun()“异步”版本。这意味着这些阶段将在不同的线程上完成。这个线程将从一个特殊的池中获取——所以我们不会提前知道它是新线程还是旧线程。这完全取决于任务的计算密集程度。除了这些方法之外,还有三种更有趣的可能性。为了清楚起见,让我们假设我们有一个特定的服务从某个地方接收某种消息——这需要时间: thenApply()thenAccept()

public static class NewsService {
	public static String getMessage() {
		try {
			Thread.currentThread().sleep(3000);
			return "Message";
		} catch (InterruptedException e) {
			throw new IllegalStateException(e);
		}
	}
}
现在,让我们来看看CompletableFuture提供的其他能力。我们可以将 a 的结果CompletableFuture与 another 的结果结合起来CompletableFuture

Supplier newsSupplier = () -> NewsService.getMessage();
        
CompletableFuture<String> reader = CompletableFuture.supplyAsync(newsSupplier);
CompletableFuture.completedFuture("!!")
				 .thenCombine(reader, (a, b) -> b + a)
				 .thenAccept(result -> System.out.println(result))
				 .get();
请注意,默认情况下线程是守护线程,因此为了清楚起见,我们使用get()等待结果。我们不仅可以合并CompletableFutures,还可以返回一个CompletableFuture

CompletableFuture.completedFuture(2L)
				.thenCompose((val) -> CompletableFuture.completedFuture(val + 2))
                               .thenAccept(result -> System.out.println(result));
在这里我想指出,该CompletableFuture.completedFuture()方法是为简洁起见而使用的。此方法不会创建新线程,因此管道的其余部分将在completedFuture调用的同一线程上执行。还有一个thenAcceptBoth()方法。它与 非常相似accept(),但如果thenAccept()接受 a ConsumerthenAcceptBoth()则接受另一个CompletableStage+BiConsumer作为输入,即 aconsumer需要 2 个源而不是一个。名称中包含单词“Either”的方法还提供了另一个有趣的功能: 更好的结合:Java 和 Thread 类。 第四部分 — Callable、Future 和朋友 - 3这些方法接受一个替代方法并在最先执行的方法CompletableStage上执行。CompletableStage最后,我想用另一个有趣的特性来结束这篇评论CompletableFuture:错误处理。

CompletableFuture.completedFuture(2L)
				 .thenApply((a) -> {
					throw new IllegalStateException("error");
				 }).thenApply((a) -> 3L)
				 //.exceptionally(ex -> 0L)
				 .thenAccept(val -> System.out.println(val));
这段代码什么都不做,因为会有一个异常,其他什么也不会发生。但是通过取消注释“exceptionally”语句,我们定义了预期的行为。说到CompletableFuture,我也推荐你看下面的视频: 以我的拙见,这些是互联网上最具解释性的视频之一。他们应该清楚这一切是如何运作的,我们有什么工具包,以及为什么需要所有这些。

结论

希望现在您已经清楚如何使用线程在完成计算后进行计算。附加材料: 更好的结合:Java 和 Thread 类。第 I 部分 — 执行的线程 更好地结合:Java 和 Thread 类。第二部分 — 同步 更好地结合:Java 和 Thread 类。第 III 部分 — 更好地交互:Java 和 Thread 类。第五部分 — Executor、ThreadPool、Fork/Join 更好地结合在一起:Java 和 Thread 类。第六部分——开火!
评论
TO VIEW ALL COMMENTS OR TO MAKE A COMMENT,
GO TO FULL VERSION