CodeGym/Blog Java/Random-PL/Razem lepiej: Java i klasa Thread. Część V — Executor, Th...
John Squirrels
Poziom 41
San Francisco

Razem lepiej: Java i klasa Thread. Część V — Executor, ThreadPool, Fork/Join

Opublikowano w grupie Random-PL

Wstęp

Wiemy więc, że Java ma wątki. Można o tym przeczytać w recenzji zatytułowanej Better together: Java and the Thread class. Część I — Wątki egzekucyjne . Razem lepiej: Java i klasa Thread.  Część V — Executor, Pula wątków, Fork/Join — 1Przyjrzyjmy się jeszcze raz typowemu kodowi:
public static void main(String[] args) throws Exception {
	Runnable task = () -> {
		System.out.println("Task executed");
	};
	Thread thread = new Thread(task);
	thread.start();
}
Jak widać, kod uruchamiający zadanie jest dość typowy, ale musimy go powtórzyć dla nowego zadania. Jednym z rozwiązań jest umieszczenie go w osobnej metodzie, np execute(Runnable runnable). . Ale twórcy Javy wzięli pod uwagę naszą trudną sytuację i wymyślili interfejs Executor:
public static void main(String[] args) throws Exception {
	Runnable task = () -> System.out.println("Task executed");
	Executor executor = (runnable) -> {
		new Thread(runnable).start();
	};
	executor.execute(task);
}
Ten kod jest wyraźnie bardziej zwięzły: teraz po prostu piszemy kod uruchamiający Runnablewątek. To wspaniale, prawda? Ale to dopiero początek: Razem lepiej: Java i klasa Thread.  Część V — Executor, Pula wątków, Fork/Join — 2

https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html

Jak widać, Executorinterfejs ma ExecutorServicepodinterfejs. Javadoc dla tego interfejsu mówi, że an ExecutorServiceopisuje konkret Executor, który zapewnia metody zamykania Executor. Umożliwia również uzyskanie java.util.concurrent.Futurew celu śledzenia procesu realizacji. Wcześniej w Better together: Java i klasie Thread. Część IV — Callable, Future i friends , pokrótce omówiliśmy możliwości platformy Future. Jeśli zapomniałeś lub nigdy go nie przeczytałeś, sugeruję odświeżenie pamięci ;) Co jeszcze mówi Javadoc? Mówi nam, że mamy specjalną java.util.concurrent.Executorsfabrykę, która pozwala nam tworzyć domyślne implementacje ExecutorService.

Usługa Wykonawcy

Przejrzyjmy. Musimy Executorwykonać (tzn. wywołać execute()) określone zadanie na wątku, a kod tworzący wątek jest przed nami ukryty. Mamy ExecutorService— specyfik Executor, który ma kilka opcji kontrolowania postępów. I mamy Executorsfabrykę, która pozwala nam tworzyć ExecutorService. Teraz zróbmy to sami:
public static void main(String[] args) throws ExecutionException, InterruptedException {
	Callable<String> task = () -> Thread.currentThread().getName();
	ExecutorService service = Executors.newFixedThreadPool(2);
	for (int i = 0; i < 5; i++) {
		Future result = service.submit(task);
		System.out.println(result.get());
	}
	service.shutdown();
}
Widać, że określiliśmy stałą pulę wątków, której rozmiar to 2. Następnie przesyłamy zadania do puli jedno po drugim. Każde zadanie zwraca wiadomość Stringzawierającą nazwę wątku ( currentThread().GetName()). Ważne jest, aby zamknąć program ExecutorServicena samym końcu, ponieważ inaczej nasz program się nie skończy. Fabryka Executorsma dodatkowe metody fabryczne. Na przykład możemy utworzyć pulę składającą się tylko z jednego wątku ( newSingleThreadExecutor) lub pulę zawierającą pamięć podręczną ( newCachedThreadPool), z której usuwane są wątki po 1 minucie bezczynności. W rzeczywistości ExecutorServicesą one wspierane przez kolejkę blokującą , w której umieszczane są zadania i z której zadania są wykonywane. Więcej informacji na temat blokowania kolejek można znaleźć w tym filmie . Możesz to również przeczytaćopinia o BlockingQueue . I sprawdź odpowiedź na pytanie „Kiedy preferować LinkedBlockingQueue zamiast ArrayBlockingQueue?” Mówiąc najprościej, a BlockingQueueblokuje wątek w dwóch przypadkach:
  • wątek próbuje pobrać elementy z pustej kolejki
  • wątek próbuje umieścić elementy w pełnej kolejce
Jeśli przyjrzymy się implementacji metod fabrycznych, zobaczymy, jak one działają. Na przykład:
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}
Lub
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}
Jak widać, implementacje ExecutorServicesą tworzone wewnątrz metod fabrycznych. I w większości mówimy o ThreadPoolExecutor. Zmieniają się tylko parametry wpływające na pracę. Razem lepiej: Java i klasa Thread.  Część V — Executor, Pula wątków, Fork/Join — 3

https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg

ThreadPoolExecutor

Jak widzieliśmy wcześniej, ThreadPoolExecutorjest to, co zwykle powstaje wewnątrz metod fabrycznych. Na funkcjonalność wpływają argumenty, które przekazujemy jako maksymalną i minimalną liczbę wątków, a także rodzaj używanej kolejki. java.util.concurrent.BlockingQueueAle można użyć dowolnej implementacji interfejsu . Mówiąc o ThreadPoolExecutor, powinniśmy wspomnieć o kilku ciekawych funkcjach. Na przykład nie możesz przesyłać zadań do, ThreadPoolExecutorjeśli nie ma dostępnego miejsca:
public static void main(String[] args) throws ExecutionException, InterruptedException {
	int threadBound = 2;
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
            0L, TimeUnit.SECONDS, new SynchronousQueue<>());
	Callable<String> task = () -> {
		Thread.sleep(1000);
		return Thread.currentThread().getName();
	};
	for (int i = 0; i < threadBound + 1; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
Ten kod ulegnie awarii z takim błędem:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
Innymi słowy, tasknie można go złożyć, ponieważ SynchronousQueuejest tak zaprojektowany, że faktycznie składa się z jednego elementu i nie pozwala na dodanie do niego niczego więcej. Widzimy, że mamy queued taskstutaj zero („zadań w kolejce = 0”). Ale nie ma w tym nic dziwnego, ponieważ jest to szczególna cecha SynchronousQueue, która w rzeczywistości jest 1-elementową kolejką, która jest zawsze pusta! Gdy jeden wątek umieści element w kolejce, będzie czekał, aż inny wątek pobierze element z kolejki. W związku z tym możemy go zastąpić, new LinkedBlockingQueue<>(1)a błąd zmieni się na teraz pokaż queued tasks = 1. Ponieważ kolejka składa się tylko z 1 elementu, nie możemy dodać drugiego elementu. I to jest przyczyną niepowodzenia programu. Kontynuując naszą dyskusję o kolejce, warto zauważyć, żeThreadPoolExecutorclass ma dodatkowe metody obsługi kolejki. Na przykład threadPoolExecutor.purge()metoda usunie wszystkie anulowane zadania z kolejki, aby zwolnić miejsce w kolejce. Inną interesującą funkcją związaną z kolejką jest procedura obsługi odrzuconych zadań:
public static void main(String[] args) {
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
            0L, TimeUnit.SECONDS, new SynchronousQueue());
	Callable<String> task = () -> Thread.currentThread().getName();
	threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
	for (int i = 0; i < 5; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
W tym przykładzie nasz program obsługi po prostu wyświetla Rejectedza każdym razem, gdy zadanie w kolejce zostanie odrzucone. Wygodne, prawda? Ponadto ThreadPoolExecutorma interesującą podklasę: ScheduledThreadPoolExecutor, która jest ScheduledExecutorService. Zapewnia możliwość wykonania zadania w oparciu o timer.

ScheduledExecutorService

ScheduledExecutorService(który jest rodzajem ExecutorService) pozwala nam uruchamiać zadania zgodnie z harmonogramem. Spójrzmy na przykład:
public static void main(String[] args) {
	ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		return Thread.currentThread().getName();
	};
	scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
	scheduledExecutorService.shutdown();
}
Tutaj wszystko jest proste. Zadania są przesyłane, a następnie otrzymujemy plik java.util.concurrent.ScheduledFuture. Harmonogram może być również pomocny w następującej sytuacji:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
	System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
Tutaj przekazujemy Runnablezadanie do wykonania ze stałą częstotliwością („FixedRate”) z pewnym początkowym opóźnieniem. W takim przypadku po 1 sekundzie zadanie zacznie być wykonywane co 2 sekundy. Jest podobna opcja:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
Ale w tym przypadku zadania są wykonywane w określonych odstępach czasu POMIĘDZY każdym wykonaniem. Oznacza to, że taskzostanie wykonany po 1 sekundzie. Następnie, gdy tylko zostanie ukończone, miną 2 sekundy, a następnie rozpocznie się nowe zadanie. Oto kilka dodatkowych zasobów na ten temat: Razem lepiej: Java i klasa Thread.  Część V — Executor, Pula wątków, Fork/Join — 4

https://dzone.com/articles/diving-into-java-8s-newworkstealingpools

Pula kradzieży pracy

Oprócz powyższych pul wątków istnieje jeszcze jedna. Możemy szczerze powiedzieć, że jest trochę wyjątkowy. Nazywa się to pulą kradnącą pracę. Krótko mówiąc, kradzież pracy to algorytm, w którym bezczynne wątki zaczynają pobierać zadania z innych wątków lub zadania z kolejki współdzielonej. Spójrzmy na przykład:
public static void main(String[] args) {
	Object lock = new Object();
	ExecutorService executorService = Executors.newCachedThreadPool();
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		lock.wait(2000);
		System.out.println("Finished");
		return "result";
	};
	for (int i = 0; i < 5; i++) {
		executorService.submit(task);
	}
	executorService.shutdown();
}
Jeśli uruchomimy ten kod, to ExecutorServiceutworzy nam 5 wątków, ponieważ każdy wątek zostanie umieszczony w kolejce oczekiwania na obiekt blokady. Opracowaliśmy już monitory i blokady w Better together: Java i klasa Thread. Część II — Synchronizacja . Teraz zamieńmy Executors.newCachedThreadPool()na Executors.newWorkStealingPool(). Co się zmieni? Zobaczymy, że nasze zadania są wykonywane na mniej niż 5 wątkach. Pamiętaj, że CachedThreadPooltworzy wątek dla każdego zadania? To dlatego wait(), że wątek został zablokowany, kolejne zadania chcą się realizować, aw puli zostały utworzone dla nich nowe wątki. Dzięki puli kradzieży wątki nie pozostają bezczynne na zawsze. Zaczynają wykonywać zadania swoich sąsiadów. Co WorkStealingPooltak bardzo różni się od innych pul wątków? Fakt, że magiczneForkJoinPoolmieszka w nim:
public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
}
Właściwie jest jeszcze jedna różnica. Domyślnie wątki utworzone dla a ForkJoinPoolsą wątkami demonów, w przeciwieństwie do wątków utworzonych przez onrdinary ThreadPool. Ogólnie rzecz biorąc, powinieneś pamiętać o wątkach demonów, ponieważ na przykład CompletableFutureużywa również wątków demonów, chyba że określisz swój własny ThreadFactory, który tworzy wątki inne niż demony. Oto niespodzianki, które mogą czaić się w nieoczekiwanych miejscach! :)

Widelec Dołącz do puli

W tej części ponownie porozmawiamy o ForkJoinPool(nazywanym również frameworkiem fork/join), który żyje „pod maską” WorkStealingPool. Ogólnie rzecz biorąc, framework fork/join pojawił się w Javie 1.7. I chociaż Java 11 jest już blisko, warto o niej pamiętać. Nie jest to najczęstsza implementacja, ale jest dość interesująca. Jest dobra recenzja na ten temat w Internecie: Zrozumienie Java Fork-Join Framework z przykładami . Opiera ForkJoinPoolsię na java.util.concurrent.RecursiveTask. Jest też java.util.concurrent.RecursiveAction. RecursiveActionnie zwraca wyniku. Zatem RecursiveTaskjest podobny do Callablei RecursiveActionjest podobny do unnable. Widzimy, że nazwa zawiera nazwy dwóch ważnych metod: forki join. TheforkMetoda uruchamia niektóre zadania asynchronicznie w osobnym wątku. A joinmetoda pozwala czekać na wykonanie pracy. Aby uzyskać najlepsze zrozumienie, należy przeczytać Od programowania imperatywnego do rozwidlenia/łączenia do strumieni równoległych w Javie 8 .

Streszczenie

Cóż, na tym kończy się ta część recenzji. Dowiedzieliśmy się, że Executorzostał pierwotnie wymyślony do wykonywania wątków. Następnie twórcy Javy postanowili kontynuować ten pomysł i wymyślili ExecutorService. ExecutorServicepozwala nam przesyłać zadania do wykonania za pomocą submit()i invoke(), a także zamykać usługę. Ponieważ ExecutorServicewymaga implementacji, napisali klasę z metodami fabrycznymi i nazwali ją Executors. Pozwala tworzyć pule wątków ( ThreadPoolExecutor). Dodatkowo istnieją pule wątków, które pozwalają nam również określić harmonogram wykonywania. A ForkJoinPoolukrywa się za WorkStealingPool. Mam nadzieję, że to, co napisałem powyżej, było dla Ciebie nie tylko interesujące, ale również zrozumiałe :) Zawsze cieszę się z Twoich sugestii i komentarzy. Razem lepiej: Java i klasa Thread. Część I — Wątki wykonania Lepiej razem: Java i klasa Thread. Część II — Synchronizacja Lepiej razem: Java i klasa Thread. Część III — Interakcja Lepiej razem: Java i klasa Thread. Część IV — Callable, Future i friends Razem lepiej: Java i klasa Thread. Część VI — Odpalaj!
Komentarze
  • Popularne
  • Najnowsze
  • Najstarsze
Musisz się zalogować, aby dodać komentarz
Ta strona nie ma jeszcze żadnych komentarzy