Einführung
Wir wissen also, dass Java Threads hat. Darüber können Sie in der Rezension mit dem Titel „ Besser zusammen: Java und die Thread-Klasse“ nachlesen. Teil I – Threads zur Ausführung . Schauen wir uns den typischen Code noch einmal an:
public static void main(String[] args) throws Exception {
Runnable task = () -> {
System.out.println("Task executed");
};
Thread thread = new Thread(task);
thread.start();
}
Wie Sie sehen, ist der Code zum Starten einer Aufgabe ziemlich typisch, aber wir müssen ihn für neue Aufgaben wiederholen. Eine Lösung besteht darin, es in eine separate Methode einzufügen, z. B. execute(Runnable runnable)
. Aber die Entwickler von Java haben unsere Notlage berücksichtigt und die Schnittstelle entwickelt Executor
:
public static void main(String[] args) throws Exception {
Runnable task = () -> System.out.println("Task executed");
Executor executor = (runnable) -> {
new Thread(runnable).start();
};
executor.execute(task);
}
Dieser Code ist deutlich prägnanter: Jetzt schreiben wir einfach Code, um den Runnable
Thread zu starten. Das ist großartig, nicht wahr? Aber das ist erst der Anfang:
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html
Executor
verfügt die Schnittstelle über eine ExecutorService
Unterschnittstelle. Im Javadoc für diese Schnittstelle heißt es, dass an ExecutorService
eine bestimmte Methode beschrieben wird Executor
, die Methoden zum Herunterfahren der . bereitstellt Executor
. Außerdem ist es möglich, eine Datei zu erhalten, java.util.concurrent.Future
um den Ausführungsprozess zu verfolgen. Zuvor in Better Together: Java und die Thread-Klasse. In Teil IV – Callable, Future und Freunde haben wir kurz die Fähigkeiten von besprochen Future
. Wenn Sie es vergessen oder nie gelesen haben, empfehle ich Ihnen, Ihr Gedächtnis aufzufrischen ;) Was sagt das Javadoc sonst noch? Es sagt uns, dass wir über eine spezielle java.util.concurrent.Executors
Factory verfügen, mit der wir Standardimplementierungen von erstellen können ExecutorService
.
ExecutorService
Lassen Sie uns einen Rückblick geben. Wir müssen eine bestimmte Aufgabe in einem ThreadExecutor
ausführen (dh aufrufen execute()
), und der Code, der den Thread erstellt, bleibt uns verborgen. Wir haben ExecutorService
– ein Spezifisches Executor
, das mehrere Optionen zur Steuerung des Fortschritts bietet. Und wir haben die Executors
Fabrik, mit der wir eine erstellen können ExecutorService
. Jetzt machen wir es selbst:
public static void main(String[] args) throws ExecutionException, InterruptedException {
Callable<String> task = () -> Thread.currentThread().getName();
ExecutorService service = Executors.newFixedThreadPool(2);
for (int i = 0; i < 5; i++) {
Future result = service.submit(task);
System.out.println(result.get());
}
service.shutdown();
}
Sie können sehen, dass wir einen festen Thread-Pool mit einer Größe von 2 angegeben haben. Anschließend senden wir Aufgaben nacheinander an den Pool. Jede Aufgabe gibt eine zurück, die String
den Thread-Namen enthält ( currentThread().GetName()
). ExecutorService
Es ist wichtig, den ganz am Ende herunterzufahren , da sonst unser Programm nicht beendet wird. Die Executors
Fabrik verfügt über zusätzliche Fabrikmethoden. Beispielsweise können wir einen Pool erstellen, der nur aus einem Thread besteht ( newSingleThreadExecutor
), oder einen Pool, der einen Cache enthält ( newCachedThreadPool
), aus dem Threads entfernt werden, nachdem sie eine Minute lang inaktiv waren. In Wirklichkeit werden diese ExecutorService
durch eine Blockierungswarteschlange unterstützt , in die Aufgaben gestellt und von der aus Aufgaben ausgeführt werden. Weitere Informationen zum Blockieren von Warteschlangen finden Sie in diesem Video . Sie können dies auch lesenRezension zu BlockingQueue . Und schauen Sie sich die Antwort auf die Frage „Wann ist LinkedBlockingQueue dem ArrayBlockingQueue vorzuziehen?“ an. Vereinfacht ausgedrückt BlockingQueue
blockiert a einen Thread in zwei Fällen:
- Der Thread versucht, Elemente aus einer leeren Warteschlange abzurufen
- Der Thread versucht, Elemente in eine volle Warteschlange zu stellen
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
oder
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
Wie wir sehen können, werden Implementierungen von ExecutorService
innerhalb der Factory-Methoden erstellt. Und größtenteils reden wir darüber ThreadPoolExecutor
. Es werden nur die Parameter geändert, die die Arbeit beeinflussen.
https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg
ThreadPoolExecutor
Wie wir bereits gesehen haben,ThreadPoolExecutor
wird dies normalerweise innerhalb der Factory-Methoden erstellt. Die Funktionalität wird durch die Argumente beeinflusst, die wir als maximale und minimale Anzahl von Threads übergeben, sowie durch den verwendeten Warteschlangentyp. Es kann jedoch jede Implementierung der java.util.concurrent.BlockingQueue
Schnittstelle verwendet werden. Apropos ThreadPoolExecutor
: Wir sollten einige interessante Funktionen erwähnen. Beispielsweise können Sie keine Aufgaben an senden, ThreadPoolExecutor
wenn kein Speicherplatz verfügbar ist:
public static void main(String[] args) throws ExecutionException, InterruptedException {
int threadBound = 2;
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
0L, TimeUnit.SECONDS, new SynchronousQueue<>());
Callable<String> task = () -> {
Thread.sleep(1000);
return Thread.currentThread().getName();
};
for (int i = 0; i < threadBound + 1; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
Dieser Code stürzt mit einem Fehler wie diesem ab:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
Mit anderen Worten, task
kann nicht eingereicht werden, da SynchronousQueue
es so konzipiert ist, dass es tatsächlich aus einem einzigen Element besteht und es uns nicht erlaubt, mehr hineinzufügen. Wir können sehen, dass wir queued tasks
hier Null haben („Queued Tasks = 0“). Daran ist aber nichts Seltsames, denn das ist eine Besonderheit von SynchronousQueue
, bei der es sich tatsächlich um eine 1-Element-Warteschlange handelt, die immer leer ist! Wenn ein Thread ein Element in die Warteschlange stellt, wartet er, bis ein anderer Thread das Element aus der Warteschlange übernimmt. Dementsprechend können wir es durch ersetzen new LinkedBlockingQueue<>(1)
und der Fehler wird in now show geändert queued tasks = 1
. Da die Warteschlange nur aus einem Element besteht, können wir kein zweites Element hinzufügen. Und genau das führt zum Scheitern des Programms. Wenn wir unsere Diskussion über die Warteschlange fortsetzen, ist es erwähnenswert, dass dieThreadPoolExecutor
Die Klasse verfügt über zusätzliche Methoden zum Bedienen der Warteschlange. Die Methode entfernt beispielsweise threadPoolExecutor.purge()
alle abgebrochenen Aufgaben aus der Warteschlange, um Platz in der Warteschlange freizugeben. Eine weitere interessante warteschlangenbezogene Funktion ist der Handler für abgelehnte Aufgaben:
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.SECONDS, new SynchronousQueue());
Callable<String> task = () -> Thread.currentThread().getName();
threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
for (int i = 0; i < 5; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
In diesem Beispiel zeigt unser Handler einfach Rejected
jedes Mal an, wenn eine Aufgabe in der Warteschlange abgelehnt wird. Praktisch, nicht wahr? Darüber hinaus ThreadPoolExecutor
gibt es eine interessante Unterklasse: ScheduledThreadPoolExecutor
, eine ScheduledExecutorService
. Es bietet die Möglichkeit, eine Aufgabe basierend auf einem Timer auszuführen.
ScheduledExecutorService
ScheduledExecutorService
(was eine Art von ist ExecutorService
) ermöglicht es uns, Aufgaben nach einem Zeitplan auszuführen. Schauen wir uns ein Beispiel an:
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
return Thread.currentThread().getName();
};
scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
scheduledExecutorService.shutdown();
}
Hier ist alles einfach. Die Aufgaben werden eingereicht und dann bekommen wir eine java.util.concurrent.ScheduledFuture
. Ein Zeitplan kann auch in der folgenden Situation hilfreich sein:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
Hier übermitteln wir eine Runnable
Aufgabe zur Ausführung in einer festen Häufigkeit („FixedRate“) mit einer bestimmten Anfangsverzögerung. In diesem Fall wird die Aufgabe nach 1 Sekunde alle 2 Sekunden ausgeführt. Es gibt eine ähnliche Option:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
In diesem Fall werden die Aufgaben jedoch mit einem bestimmten Intervall ZWISCHEN jeder Ausführung ausgeführt. Das heißt, der Befehl task
wird nach 1 Sekunde ausgeführt. Sobald es abgeschlossen ist, vergehen 2 Sekunden und dann wird eine neue Aufgabe gestartet. Hier sind einige zusätzliche Ressourcen zu diesem Thema:
- Eine Einführung in Thread-Pools in Java
- Einführung in Thread-Pools in Java
- Java Multithreading Steeplechase: Abbrechen von Aufgaben in Executors
- Verwendung von Java-Executoren für Hintergrundaufgaben
https://dzone.com/articles/diving-into-java-8s-newworkstealingpools
WorkStealingPool
Zusätzlich zu den oben genannten Thread-Pools gibt es noch einen weiteren. Wir können ehrlich sagen, dass es etwas Besonderes ist. Man nennt es einen arbeitsraubenden Pool. Kurz gesagt, Work-Stealing ist ein Algorithmus, bei dem inaktive Threads damit beginnen, Aufgaben von anderen Threads oder Aufgaben aus einer gemeinsam genutzten Warteschlange zu übernehmen. Schauen wir uns ein Beispiel an:
public static void main(String[] args) {
Object lock = new Object();
ExecutorService executorService = Executors.newCachedThreadPool();
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
lock.wait(2000);
System.out.println("Finished");
return "result";
};
for (int i = 0; i < 5; i++) {
executorService.submit(task);
}
executorService.shutdown();
}
Wenn wir diesen Code ausführen, werden ExecutorService
5 Threads für uns erstellt, da jeder Thread in die Warteschlange für das Sperrobjekt gestellt wird. Wir haben die Monitore und Sperren bereits besser zusammen herausgefunden : Java und die Thread-Klasse. Teil II – Synchronisierung . Jetzt ersetzen wir Executors.newCachedThreadPool()
durch Executors.newWorkStealingPool()
. Was wird sich ändern? Wir werden sehen, dass unsere Aufgaben in weniger als 5 Threads ausgeführt werden. Denken Sie daran, dass CachedThreadPool
für jede Aufgabe ein Thread erstellt wird? Das liegt daran, dass wait()
der Thread blockiert wurde, nachfolgende Aufgaben abgeschlossen werden müssen und für sie neue Threads im Pool erstellt wurden. Bei einem Diebstahlpool bleiben Threads nicht ewig untätig. Sie beginnen, die Aufgaben ihrer Nachbarn zu erledigen. Was unterscheidet einen WorkStealingPool
so von anderen Thread-Pools? Die Tatsache, dass das MagischeForkJoinPool
lebt darin:
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
Tatsächlich gibt es noch einen weiteren Unterschied. Standardmäßig sind die für a erstellten Threads ForkJoinPool
Daemon-Threads, im Gegensatz zu den Threads, die über onrdinary erstellt wurden ThreadPool
. Im Allgemeinen sollten Sie sich Daemon-Threads merken, da beispielsweise CompletableFuture
auch Daemon-Threads verwendet werden, es sei denn, Sie geben einen eigenen Thread an ThreadFactory
, der Nicht-Daemon-Threads erstellt. Das sind die Überraschungen, die an unerwarteten Orten lauern können! :) :)
ForkJoinPool
In diesem Teil werden wir noch einmal überForkJoinPool
(auch Fork/Join-Framework genannt) sprechen, das „unter der Haube“ von WorkStealingPool
. Im Allgemeinen erschien das Fork/Join-Framework bereits in Java 1.7. Und auch wenn Java 11 in greifbarer Nähe ist, lohnt es sich dennoch, daran zu denken. Dies ist nicht die häufigste Implementierung, aber sie ist sehr interessant. Es gibt eine gute Rezension dazu im Internet: Understanding Java Fork-Join Framework with Examples . Das ForkJoinPool
beruht auf java.util.concurrent.RecursiveTask
. Es gibt auch java.util.concurrent.RecursiveAction
. RecursiveAction
gibt kein Ergebnis zurück. Daher RecursiveTask
ähnelt es Callable
, und RecursiveAction
ähnelt unnable
. Wir können sehen, dass der Name die Namen von zwei wichtigen Methoden enthält: fork
und join
. Derfork
Die Methode startet eine Aufgabe asynchron in einem separaten Thread. Und mit dieser join
Methode können Sie warten, bis die Arbeit erledigt ist. Um das beste Verständnis zu erhalten, sollten Sie „ Von imperativer Programmierung über Fork/Join bis hin zu parallelen Streams in Java 8“ lesen .
Zusammenfassung
Damit ist dieser Teil der Rezension abgeschlossen. Wir haben erfahren, dassExecutor
es ursprünglich zur Ausführung von Threads erfunden wurde. Dann beschlossen die Entwickler von Java, die Idee fortzusetzen und entwickelten ExecutorService
. ExecutorService
ermöglicht es uns, Aufgaben mit submit()
und zur Ausführung zu übermitteln invoke()
und den Dienst auch herunterzufahren. Da ExecutorService
Implementierungen erforderlich sind, haben sie eine Klasse mit Factory-Methoden geschrieben und sie genannt Executors
. Damit können Sie Thread-Pools erstellen ( ThreadPoolExecutor
). Darüber hinaus gibt es Thread-Pools, mit denen wir auch einen Ausführungsplan festlegen können. Und ein ForkJoinPool
versteckt sich hinter einem WorkStealingPool
. Ich hoffe, Sie fanden das, was ich oben geschrieben habe, nicht nur interessant, sondern auch verständlich :) Ich freue mich immer über Ihre Anregungen und Kommentare. Besser zusammen: Java und die Thread-Klasse. Teil I – Ausführungsthreads Gemeinsam besser: Java und die Thread-Klasse. Teil II – Synchronisierung Gemeinsam besser: Java und die Thread-Klasse. Teil III – Gemeinsam besser interagieren: Java und die Thread-Klasse. Teil IV – Callable, Future und Freunde Gemeinsam besser: Java und die Thread-Klasse. Teil VI – Feuer los!
GO TO FULL VERSION