Wstęp
Wiemy więc, że Java ma wątki. Można o tym przeczytać w recenzji zatytułowanej Better together: Java and the Thread class. Część I — Wątki egzekucyjne .
public static void main(String[] args) throws Exception {
Runnable task = () -> {
System.out.println("Task executed");
};
Thread thread = new Thread(task);
thread.start();
}
Jak widać, kod uruchamiający zadanie jest dość typowy, ale musimy go powtórzyć dla nowego zadania. Jednym z rozwiązań jest umieszczenie go w osobnej metodzie, np execute(Runnable runnable)
. . Ale twórcy Javy wzięli pod uwagę naszą trudną sytuację i wymyślili interfejs Executor
:
public static void main(String[] args) throws Exception {
Runnable task = () -> System.out.println("Task executed");
Executor executor = (runnable) -> {
new Thread(runnable).start();
};
executor.execute(task);
}
Ten kod jest wyraźnie bardziej zwięzły: teraz po prostu piszemy kod uruchamiający Runnable
wątek. To wspaniale, prawda? Ale to dopiero początek: 
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html
Executor
interfejs ma ExecutorService
podinterfejs. Javadoc dla tego interfejsu mówi, że an ExecutorService
opisuje konkret Executor
, który zapewnia metody zamykania Executor
. Umożliwia również uzyskanie java.util.concurrent.Future
w celu śledzenia procesu realizacji. Wcześniej w Better together: Java i klasie Thread. Część IV — Callable, Future i friends , pokrótce omówiliśmy możliwości platformy Future
. Jeśli zapomniałeś lub nigdy go nie przeczytałeś, sugeruję odświeżenie pamięci ;) Co jeszcze mówi Javadoc? Mówi nam, że mamy specjalną java.util.concurrent.Executors
fabrykę, która pozwala nam tworzyć domyślne implementacje ExecutorService
.
Usługa Wykonawcy
Przejrzyjmy. MusimyExecutor
wykonać (tzn. wywołać execute()
) określone zadanie na wątku, a kod tworzący wątek jest przed nami ukryty. Mamy ExecutorService
— specyfik Executor
, który ma kilka opcji kontrolowania postępów. I mamy Executors
fabrykę, która pozwala nam tworzyć ExecutorService
. Teraz zróbmy to sami:
public static void main(String[] args) throws ExecutionException, InterruptedException {
Callable<String> task = () -> Thread.currentThread().getName();
ExecutorService service = Executors.newFixedThreadPool(2);
for (int i = 0; i < 5; i++) {
Future result = service.submit(task);
System.out.println(result.get());
}
service.shutdown();
}
Widać, że określiliśmy stałą pulę wątków, której rozmiar to 2. Następnie przesyłamy zadania do puli jedno po drugim. Każde zadanie zwraca wiadomość String
zawierającą nazwę wątku ( currentThread().GetName()
). Ważne jest, aby zamknąć program ExecutorService
na samym końcu, ponieważ inaczej nasz program się nie skończy. Fabryka Executors
ma dodatkowe metody fabryczne. Na przykład możemy utworzyć pulę składającą się tylko z jednego wątku ( newSingleThreadExecutor
) lub pulę zawierającą pamięć podręczną ( newCachedThreadPool
), z której usuwane są wątki po 1 minucie bezczynności. W rzeczywistości ExecutorService
są one wspierane przez kolejkę blokującą , w której umieszczane są zadania i z której zadania są wykonywane. Więcej informacji na temat blokowania kolejek można znaleźć w tym filmie . Możesz to również przeczytaćopinia o BlockingQueue . I sprawdź odpowiedź na pytanie „Kiedy preferować LinkedBlockingQueue zamiast ArrayBlockingQueue?” Mówiąc najprościej, a BlockingQueue
blokuje wątek w dwóch przypadkach:
- wątek próbuje pobrać elementy z pustej kolejki
- wątek próbuje umieścić elementy w pełnej kolejce
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
Lub
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
Jak widać, implementacje ExecutorService
są tworzone wewnątrz metod fabrycznych. I w większości mówimy o ThreadPoolExecutor
. Zmieniają się tylko parametry wpływające na pracę. 
https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg
ThreadPoolExecutor
Jak widzieliśmy wcześniej,ThreadPoolExecutor
jest to, co zwykle powstaje wewnątrz metod fabrycznych. Na funkcjonalność wpływają argumenty, które przekazujemy jako maksymalną i minimalną liczbę wątków, a także rodzaj używanej kolejki. java.util.concurrent.BlockingQueue
Ale można użyć dowolnej implementacji interfejsu . Mówiąc o ThreadPoolExecutor
, powinniśmy wspomnieć o kilku ciekawych funkcjach. Na przykład nie możesz przesyłać zadań do, ThreadPoolExecutor
jeśli nie ma dostępnego miejsca:
public static void main(String[] args) throws ExecutionException, InterruptedException {
int threadBound = 2;
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
0L, TimeUnit.SECONDS, new SynchronousQueue<>());
Callable<String> task = () -> {
Thread.sleep(1000);
return Thread.currentThread().getName();
};
for (int i = 0; i < threadBound + 1; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
Ten kod ulegnie awarii z takim błędem:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
Innymi słowy, task
nie można go złożyć, ponieważ SynchronousQueue
jest tak zaprojektowany, że faktycznie składa się z jednego elementu i nie pozwala na dodanie do niego niczego więcej. Widzimy, że mamy queued tasks
tutaj zero („zadań w kolejce = 0”). Ale nie ma w tym nic dziwnego, ponieważ jest to szczególna cecha SynchronousQueue
, która w rzeczywistości jest 1-elementową kolejką, która jest zawsze pusta! Gdy jeden wątek umieści element w kolejce, będzie czekał, aż inny wątek pobierze element z kolejki. W związku z tym możemy go zastąpić, new LinkedBlockingQueue<>(1)
a błąd zmieni się na teraz pokaż queued tasks = 1
. Ponieważ kolejka składa się tylko z 1 elementu, nie możemy dodać drugiego elementu. I to jest przyczyną niepowodzenia programu. Kontynuując naszą dyskusję o kolejce, warto zauważyć, żeThreadPoolExecutor
class ma dodatkowe metody obsługi kolejki. Na przykład threadPoolExecutor.purge()
metoda usunie wszystkie anulowane zadania z kolejki, aby zwolnić miejsce w kolejce. Inną interesującą funkcją związaną z kolejką jest procedura obsługi odrzuconych zadań:
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.SECONDS, new SynchronousQueue());
Callable<String> task = () -> Thread.currentThread().getName();
threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
for (int i = 0; i < 5; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
W tym przykładzie nasz program obsługi po prostu wyświetla Rejected
za każdym razem, gdy zadanie w kolejce zostanie odrzucone. Wygodne, prawda? Ponadto ThreadPoolExecutor
ma interesującą podklasę: ScheduledThreadPoolExecutor
, która jest ScheduledExecutorService
. Zapewnia możliwość wykonania zadania w oparciu o timer.
ScheduledExecutorService
ScheduledExecutorService
(który jest rodzajem ExecutorService
) pozwala nam uruchamiać zadania zgodnie z harmonogramem. Spójrzmy na przykład:
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
return Thread.currentThread().getName();
};
scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
scheduledExecutorService.shutdown();
}
Tutaj wszystko jest proste. Zadania są przesyłane, a następnie otrzymujemy plik java.util.concurrent.ScheduledFuture
. Harmonogram może być również pomocny w następującej sytuacji:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
Tutaj przekazujemy Runnable
zadanie do wykonania ze stałą częstotliwością („FixedRate”) z pewnym początkowym opóźnieniem. W takim przypadku po 1 sekundzie zadanie zacznie być wykonywane co 2 sekundy. Jest podobna opcja:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
Ale w tym przypadku zadania są wykonywane w określonych odstępach czasu POMIĘDZY każdym wykonaniem. Oznacza to, że task
zostanie wykonany po 1 sekundzie. Następnie, gdy tylko zostanie ukończone, miną 2 sekundy, a następnie rozpocznie się nowe zadanie. Oto kilka dodatkowych zasobów na ten temat:
- Wprowadzenie do pul wątków w Javie
- Wprowadzenie do pul wątków w Javie
- Java Multithreading Steeplechase: Anulowanie zadań w executorach
- Używanie executorów Java do zadań w tle

https://dzone.com/articles/diving-into-java-8s-newworkstealingpools
Pula kradzieży pracy
Oprócz powyższych pul wątków istnieje jeszcze jedna. Możemy szczerze powiedzieć, że jest trochę wyjątkowy. Nazywa się to pulą kradnącą pracę. Krótko mówiąc, kradzież pracy to algorytm, w którym bezczynne wątki zaczynają pobierać zadania z innych wątków lub zadania z kolejki współdzielonej. Spójrzmy na przykład:
public static void main(String[] args) {
Object lock = new Object();
ExecutorService executorService = Executors.newCachedThreadPool();
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
lock.wait(2000);
System.out.println("Finished");
return "result";
};
for (int i = 0; i < 5; i++) {
executorService.submit(task);
}
executorService.shutdown();
}
Jeśli uruchomimy ten kod, to ExecutorService
utworzy nam 5 wątków, ponieważ każdy wątek zostanie umieszczony w kolejce oczekiwania na obiekt blokady. Opracowaliśmy już monitory i blokady w Better together: Java i klasa Thread. Część II — Synchronizacja . Teraz zamieńmy Executors.newCachedThreadPool()
na Executors.newWorkStealingPool()
. Co się zmieni? Zobaczymy, że nasze zadania są wykonywane na mniej niż 5 wątkach. Pamiętaj, że CachedThreadPool
tworzy wątek dla każdego zadania? To dlatego wait()
, że wątek został zablokowany, kolejne zadania chcą się realizować, aw puli zostały utworzone dla nich nowe wątki. Dzięki puli kradzieży wątki nie pozostają bezczynne na zawsze. Zaczynają wykonywać zadania swoich sąsiadów. Co WorkStealingPool
tak bardzo różni się od innych pul wątków? Fakt, że magiczneForkJoinPool
mieszka w nim:
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
Właściwie jest jeszcze jedna różnica. Domyślnie wątki utworzone dla a ForkJoinPool
są wątkami demonów, w przeciwieństwie do wątków utworzonych przez onrdinary ThreadPool
. Ogólnie rzecz biorąc, powinieneś pamiętać o wątkach demonów, ponieważ na przykład CompletableFuture
używa również wątków demonów, chyba że określisz swój własny ThreadFactory
, który tworzy wątki inne niż demony. Oto niespodzianki, które mogą czaić się w nieoczekiwanych miejscach! :)
Widelec Dołącz do puli
W tej części ponownie porozmawiamy oForkJoinPool
(nazywanym również frameworkiem fork/join), który żyje „pod maską” WorkStealingPool
. Ogólnie rzecz biorąc, framework fork/join pojawił się w Javie 1.7. I chociaż Java 11 jest już blisko, warto o niej pamiętać. Nie jest to najczęstsza implementacja, ale jest dość interesująca. Jest dobra recenzja na ten temat w Internecie: Zrozumienie Java Fork-Join Framework z przykładami . Opiera ForkJoinPool
się na java.util.concurrent.RecursiveTask
. Jest też java.util.concurrent.RecursiveAction
. RecursiveAction
nie zwraca wyniku. Zatem RecursiveTask
jest podobny do Callable
i RecursiveAction
jest podobny do unnable
. Widzimy, że nazwa zawiera nazwy dwóch ważnych metod: fork
i join
. Thefork
Metoda uruchamia niektóre zadania asynchronicznie w osobnym wątku. A join
metoda pozwala czekać na wykonanie pracy. Aby uzyskać najlepsze zrozumienie, należy przeczytać Od programowania imperatywnego do rozwidlenia/łączenia do strumieni równoległych w Javie 8 .
Streszczenie
Cóż, na tym kończy się ta część recenzji. Dowiedzieliśmy się, żeExecutor
został pierwotnie wymyślony do wykonywania wątków. Następnie twórcy Javy postanowili kontynuować ten pomysł i wymyślili ExecutorService
. ExecutorService
pozwala nam przesyłać zadania do wykonania za pomocą submit()
i invoke()
, a także zamykać usługę. Ponieważ ExecutorService
wymaga implementacji, napisali klasę z metodami fabrycznymi i nazwali ją Executors
. Pozwala tworzyć pule wątków ( ThreadPoolExecutor
). Dodatkowo istnieją pule wątków, które pozwalają nam również określić harmonogram wykonywania. A ForkJoinPool
ukrywa się za WorkStealingPool
. Mam nadzieję, że to, co napisałem powyżej, było dla Ciebie nie tylko interesujące, ale również zrozumiałe :) Zawsze cieszę się z Twoich sugestii i komentarzy. Razem lepiej: Java i klasa Thread. Część I — Wątki wykonania Lepiej razem: Java i klasa Thread. Część II — Synchronizacja Lepiej razem: Java i klasa Thread. Część III — Interakcja Lepiej razem: Java i klasa Thread. Część IV — Callable, Future i friends Razem lepiej: Java i klasa Thread. Część VI — Odpalaj!
GO TO FULL VERSION