Bevezetés
Tehát tudjuk, hogy a Java-nak vannak szálai. Erről a Jobb együtt: Java és a szál osztály című ismertetőben olvashat . I. rész – A végrehajtás szálai .
public static void main(String[] args) throws Exception {
Runnable task = () -> {
System.out.println("Task executed");
};
Thread thread = new Thread(task);
thread.start();
}
Amint láthatja, a feladat indításához szükséges kód meglehetősen tipikus, de meg kell ismételnünk az új feladathoz. Az egyik megoldás, ha külön metódusba helyezzük, pl execute(Runnable runnable)
. De a Java készítői figyelembe vették a helyzetünket, és kitalálták a Executor
felületet:
public static void main(String[] args) throws Exception {
Runnable task = () -> System.out.println("Task executed");
Executor executor = (runnable) -> {
new Thread(runnable).start();
};
executor.execute(task);
}
Ez a kód egyértelműen tömörebb: most egyszerűen kódot írunk a Runnable
szál elindításához. Ez nagyszerű, nem? De ez még csak a kezdet: 
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html
Executor
interfésznek van egy ExecutorService
alfelülete. A Javadoc ehhez az interfészhez azt mondja, hogy ExecutorService
egy olyan konkrétumot ír le Executor
, amely módszereket biztosít a Executor
. Lehetővé teszi továbbá java.util.concurrent.Future
a végrehajtási folyamat nyomon követésére szolgáló beszerzést is. Korábban a Jobb együtt: Java és a szál osztályban. IV. rész – Hívható, jövő és barátok , röviden áttekintettük a szolgáltatás lehetőségeit Future
. Ha elfelejtetted vagy nem olvastad, javaslom, hogy frissítsd fel az emlékezeted ;) Mit mond még a Javadoc? Azt mondja nekünk, hogy van egy speciális java.util.concurrent.Executors
gyárunk, amely lehetővé teszi az alapértelmezett megvalósítások létrehozását ExecutorService
.
VégrehajtóSzolgálat
Nézzük át. Egy szálon egy bizonyos feladatot végre kell hajtanunk (azaz fel kell hívnunk), és a szálat létrehozó kód el van rejtve előlünkExecutor
. execute()
Van ExecutorService
egy specifikusunk Executor
, amely számos lehetőséget kínál a haladás szabályozására. És megvan a Executors
gyárunk, amely lehetővé teszi egy ExecutorService
. Most csináljuk meg mi magunk:
public static void main(String[] args) throws ExecutionException, InterruptedException {
Callable<String> task = () -> Thread.currentThread().getName();
ExecutorService service = Executors.newFixedThreadPool(2);
for (int i = 0; i < 5; i++) {
Future result = service.submit(task);
System.out.println(result.get());
}
service.shutdown();
}
Látható, hogy egy fix szálkészletet adtunk meg, amelynek mérete 2. Ezután egyesével küldjük el a feladatokat a készletbe. Minden feladat String
a szál nevét ( currentThread().GetName()
) tartalmazó egyet ad vissza. Fontos, hogy a ExecutorService
legvégén zárjuk le, mert különben a programunk nem ér véget. A Executors
gyár további gyári módszerekkel rendelkezik. Létrehozhatunk például egy szálat ( newSingleThreadExecutor
) vagy egy gyorsítótárat ( newCachedThreadPool
) tartalmazó készletet, amelyből a szálak 1 perc tétlenség után eltávolításra kerülnek. Valójában ezek ExecutorService
mögött egy blokkoló sor áll , amelybe a feladatok kerülnek, és ahonnan a feladatok végrehajtásra kerülnek. A sorok blokkolásával kapcsolatos további információk ebben a videóban találhatók . Ezt is elolvashatodáttekintés a BlockingQueue-ról . És nézze meg a választ arra a kérdésre, hogy "Mikor érdemes előnyben részesíteni a LinkedBlockingQueue-t az ArrayBlockingQueue helyett?" A legegyszerűbben BlockingQueue
egy szálat két esetben blokkol:
- a szál egy üres sorból próbál elemeket beszerezni
- a szál megpróbálja az elemeket egy teljes sorba helyezni
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
vagy
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
Amint látjuk, a megvalósítások ExecutorService
a gyári metódusokon belül jönnek létre. És nagyrészt arról beszélünk ThreadPoolExecutor
. Csak a munkát befolyásoló paraméterek változnak. 
https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg
ThreadPoolExecutor
Ahogy korábban láttuk,ThreadPoolExecutor
ez általában a gyári módszereken belül jön létre. A funkcionalitást a szálak maximális és minimális számaként átadott argumentumok, valamint a használt sor típusa befolyásolja. De az interfész bármely megvalósítása java.util.concurrent.BlockingQueue
használható. Apropó ThreadPoolExecutor
, meg kell említenünk néhány érdekes funkciót. Például nem küldhet be feladatokat a címre, ThreadPoolExecutor
ha nincs szabad hely:
public static void main(String[] args) throws ExecutionException, InterruptedException {
int threadBound = 2;
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
0L, TimeUnit.SECONDS, new SynchronousQueue<>());
Callable<String> task = () -> {
Thread.sleep(1000);
return Thread.currentThread().getName();
};
for (int i = 0; i < threadBound + 1; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
Ez a kód összeomlik a következő hibával:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
Vagyis task
nem küldhető be, mert SynchronousQueue
úgy van megtervezve, hogy valójában egyetlen elemből álljon, és ne engedjen bele többet beletenni. Láthatjuk, hogy queued tasks
itt nulla ("sorban álló feladatok = 0") van. De nincs ebben semmi különös, mert ez egy speciális tulajdonsága SynchronousQueue
, ami valójában egy 1 elemű sor, ami mindig üres! Amikor az egyik szál egy elemet helyez a sorba, megvárja, amíg egy másik szál kiveszi az elemet a sorból. Ennek megfelelően lecserélhetjük a következőre, new LinkedBlockingQueue<>(1)
és a hiba a következőre módosul: most jelenik meg queued tasks = 1
. Mivel a sor csak 1 elemből áll, nem tudunk második elemet hozzáadni. És ez okozza a program kudarcát. A sorról szóló tárgyalásunkat folytatva érdemes megjegyezni, hogy aThreadPoolExecutor
osztály további metódusokkal rendelkezik a várólista kiszolgálására. Például a threadPoolExecutor.purge()
metódus eltávolítja az összes törölt feladatot a sorból, hogy helyet szabadítson fel a sorban. Egy másik érdekes, sorral kapcsolatos funkció az elutasított feladatok kezelője:
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.SECONDS, new SynchronousQueue());
Callable<String> task = () -> Thread.currentThread().getName();
threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
for (int i = 0; i < 5; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
Ebben a példában a kezelőnk egyszerűen megjeleníti Rejected
minden alkalommal, amikor egy feladatot elutasítanak a sorban. Kényelmes, nem? Ezen kívül ThreadPoolExecutor
van egy érdekes alosztálya: ScheduledThreadPoolExecutor
, ami egy ScheduledExecutorService
. Lehetővé teszi a feladat időzítő alapján történő végrehajtását.
ScheduledExecutorService
ScheduledExecutorService
(ami egyfajta ExecutorService
) lehetővé teszi a feladatok ütemezett futtatását. Nézzünk egy példát:
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
return Thread.currentThread().getName();
};
scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
scheduledExecutorService.shutdown();
}
Itt minden egyszerű. A feladatokat leadjuk, majd kapunk egy java.util.concurrent.ScheduledFuture
. Az ütemterv a következő esetekben is hasznos lehet:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
Itt egy feladatot küldünk be Runnable
meghatározott gyakorisággal ("FixedRate") végrehajtásra bizonyos kezdeti késleltetéssel. Ebben az esetben 1 másodperc elteltével a feladat végrehajtása 2 másodpercenként indul el. Van egy hasonló lehetőség:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
De ebben az esetben a feladatokat az egyes végrehajtások KÖZÖTT meghatározott időközönként hajtják végre. Ez azt jelenti, hogy a parancs task
1 másodperc múlva végrehajtódik. Ezután, amint befejeződött, eltelik 2 másodperc, és egy új feladat kezdődik. Íme néhány további forrás ebben a témában:
- Bevezetés a Java szálkészletekbe
- Bevezetés a Java szálkészletekbe
- Java Multithreading Steeplechase: Feladatok törlése a végrehajtókban
- Java végrehajtók használata háttérfeladatokhoz

https://dzone.com/articles/diving-into-java-8s-newworkstealingpools
WorkStealingPool
A fenti szálmedencék mellett van még egy. Őszintén mondhatjuk, hogy ez egy kicsit különleges. Munkalopó medencének hívják. Röviden, a munkalopás egy olyan algoritmus, amelyben a tétlen szálak más szálaktól kezdenek el feladatokat átvenni, vagy feladatokat egy megosztott sorból. Nézzünk egy példát:
public static void main(String[] args) {
Object lock = new Object();
ExecutorService executorService = Executors.newCachedThreadPool();
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
lock.wait(2000);
System.out.println("Finished");
return "result";
};
for (int i = 0; i < 5; i++) {
executorService.submit(task);
}
executorService.shutdown();
}
Ha ezt a kódot lefuttatjuk, akkor a ExecutorService
5 szálat hoz létre számunkra, mert minden szál a zárobjektum várakozási sorába kerül. Már kitaláltuk a monitorokat és a zárakat a Jobb együtt: Java és a Thread osztályban. II. rész – Szinkronizálás . Most cseréljük le Executors.newCachedThreadPool()
a -ra Executors.newWorkStealingPool()
. Mi fog változni? Látni fogjuk, hogy a feladatainkat 5-nél kevesebb szálon hajtják végre. Emlékszel, hogy CachedThreadPool
minden feladathoz létrehoz egy szálat? Ennek az az oka wait()
, hogy blokkolta a szálat, a következő feladatokat be kell fejezni, és új szálak jöttek létre a készletben. A lopási medencénél a szálak nem állnak tétlenül örökké. Elkezdik ellátni szomszédaik feladatait. Miben különbözik WorkStealingPool
a többi szálkészlettől? Az a tény, hogy a varázslatosForkJoinPool
lakik benne:
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
Valójában van még egy különbség. Alapértelmezés szerint az a számára létrehozott szálak ForkJoinPool
démonszálak, ellentétben az onrdinary segítségével létrehozott szálakkal ThreadPool
. Általában emlékeznie kell a démonszálakra, mert például CompletableFuture
a démonszálakat is használja, hacsak nem adja meg a sajátját ThreadFactory
, amely nem démonszálakat hoz létre. Ezek azok a meglepetések, amelyek váratlan helyeken leselkedhetnek! :)
ForkJoinPool
Ebben a részben ismét arról fogunk beszélniForkJoinPool
(más néven fork/join framework), amely a "burkolata alatt" él WorkStealingPool
. Általánosságban elmondható, hogy a fork/join keretrendszer még a Java 1.7-ben jelent meg. És bár a Java 11 kéznél van, még mindig érdemes emlékezni rá. Nem ez a leggyakoribb megvalósítás, de elég érdekes. Erről van egy jó vélemény a weben: A Java Fork-Join Framework megértése példákkal . -ra ForkJoinPool
támaszkodik java.util.concurrent.RecursiveTask
. Ott is van java.util.concurrent.RecursiveAction
. RecursiveAction
nem ad vissza eredményt. Így RecursiveTask
hasonló a Callable
, és RecursiveAction
hasonló a unnable
. Láthatjuk, hogy a név két fontos metódus nevét tartalmazza: fork
és join
. Afork
metódus aszinkron módon indít el valamilyen feladatot egy külön szálon. És a join
módszer lehetővé teszi, hogy megvárja a munka elvégzését. A legjobb megértés érdekében olvassa el az Imperative Programming to Fork/Join to Parallel Streams in Java 8 című részt .
Összegzés
Nos, ezzel lezárul az áttekintés ezen része. Megtudtuk, hogy eztExecutor
eredetileg szálak végrehajtására találták ki. Aztán a Java alkotói úgy döntöttek, hogy folytatják az ötletet, és előálltak a ExecutorService
. ExecutorService
lehetővé teszi számunkra, hogy feladatokat küldjünk el végrehajtásra submit()
a és segítségével invoke()
, valamint leállítsuk a szolgáltatást. Mivel ExecutorService
implementációkra van szükség, gyári metódusokkal írtak egy osztályt, és elhívták Executors
. Lehetővé teszi szálkészletek ( ThreadPoolExecutor
) létrehozását. Ezenkívül vannak olyan szálkészletek, amelyek végrehajtási ütemterv megadását is lehetővé teszik. És egy ForkJoinPool
elbújik a mögé WorkStealingPool
. Remélem nem csak érdekesnek, de érthetőnek is találtad amit fent írtam :) Mindig örömmel fogadom a javaslataidat, észrevételeidet. Jobb együtt: Java és a Thread osztály. I. rész – A végrehajtás szálai Jobb együtt: Java és a Thread osztály. II. rész – Szinkronizálás Jobb együtt: Java és a Thread osztály. III. rész – Interakció Jobb együtt: Java és a szál osztály. IV. rész – Hívható, jövő és barátok Jobb együtt: Java és a szál osztály. VI. rész – Tüzet el!
GO TO FULL VERSION