CodeGym/Java blog/Véletlen/Jobb együtt: Java és a Thread osztály. V. rész – Végrehaj...
John Squirrels
Szint
San Francisco

Jobb együtt: Java és a Thread osztály. V. rész – Végrehajtó, ThreadPool, Fork/Join

Megjelent a csoportban

Bevezetés

Tehát tudjuk, hogy a Java-nak vannak szálai. Erről a Jobb együtt: Java és a szál osztály című ismertetőben olvashat . I. rész – A végrehajtás szálai . Jobb együtt: Java és a Thread osztály.  V. rész – Végrehajtó, ThreadPool, Fork/Join – 1Vessünk még egy pillantást a tipikus kódra:
public static void main(String[] args) throws Exception {
	Runnable task = () -> {
		System.out.println("Task executed");
	};
	Thread thread = new Thread(task);
	thread.start();
}
Amint láthatja, a feladat indításához szükséges kód meglehetősen tipikus, de meg kell ismételnünk az új feladathoz. Az egyik megoldás, ha külön metódusba helyezzük, pl execute(Runnable runnable). De a Java készítői figyelembe vették a helyzetünket, és kitalálták a Executorfelületet:
public static void main(String[] args) throws Exception {
	Runnable task = () -> System.out.println("Task executed");
	Executor executor = (runnable) -> {
		new Thread(runnable).start();
	};
	executor.execute(task);
}
Ez a kód egyértelműen tömörebb: most egyszerűen kódot írunk a Runnableszál elindításához. Ez nagyszerű, nem? De ez még csak a kezdet: Jobb együtt: Java és a Thread osztály.  V. rész – Végrehajtó, ThreadPool, Fork/Join – 2

https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html

Mint látható, az Executorinterfésznek van egy ExecutorServicealfelülete. A Javadoc ehhez az interfészhez azt mondja, hogy ExecutorServiceegy olyan konkrétumot ír le Executor, amely módszereket biztosít a Executor. Lehetővé teszi továbbá java.util.concurrent.Futurea végrehajtási folyamat nyomon követésére szolgáló beszerzést is. Korábban a Jobb együtt: Java és a szál osztályban. IV. rész – Hívható, jövő és barátok , röviden áttekintettük a szolgáltatás lehetőségeit Future. Ha elfelejtetted vagy nem olvastad, javaslom, hogy frissítsd fel az emlékezeted ;) Mit mond még a Javadoc? Azt mondja nekünk, hogy van egy speciális java.util.concurrent.Executorsgyárunk, amely lehetővé teszi az alapértelmezett megvalósítások létrehozását ExecutorService.

VégrehajtóSzolgálat

Nézzük át. Egy szálon egy bizonyos feladatot végre kell hajtanunk (azaz fel kell hívnunk), és a szálat létrehozó kód el van rejtve előlünk Executor. execute()Van ExecutorServiceegy specifikusunk Executor, amely számos lehetőséget kínál a haladás szabályozására. És megvan a Executorsgyárunk, amely lehetővé teszi egy ExecutorService. Most csináljuk meg mi magunk:
public static void main(String[] args) throws ExecutionException, InterruptedException {
	Callable<String> task = () -> Thread.currentThread().getName();
	ExecutorService service = Executors.newFixedThreadPool(2);
	for (int i = 0; i < 5; i++) {
		Future result = service.submit(task);
		System.out.println(result.get());
	}
	service.shutdown();
}
Látható, hogy egy fix szálkészletet adtunk meg, amelynek mérete 2. Ezután egyesével küldjük el a feladatokat a készletbe. Minden feladat Stringa szál nevét ( currentThread().GetName()) tartalmazó egyet ad vissza. Fontos, hogy a ExecutorServicelegvégén zárjuk le, mert különben a programunk nem ér véget. A Executorsgyár további gyári módszerekkel rendelkezik. Létrehozhatunk például egy szálat ( newSingleThreadExecutor) vagy egy gyorsítótárat ( newCachedThreadPool) tartalmazó készletet, amelyből a szálak 1 perc tétlenség után eltávolításra kerülnek. Valójában ezek ExecutorServicemögött egy blokkoló sor áll , amelybe a feladatok kerülnek, és ahonnan a feladatok végrehajtásra kerülnek. A sorok blokkolásával kapcsolatos további információk ebben a videóban találhatók . Ezt is elolvashatodáttekintés a BlockingQueue-ról . És nézze meg a választ arra a kérdésre, hogy "Mikor érdemes előnyben részesíteni a LinkedBlockingQueue-t az ArrayBlockingQueue helyett?" A legegyszerűbben BlockingQueueegy szálat két esetben blokkol:
  • a szál egy üres sorból próbál elemeket beszerezni
  • a szál megpróbálja az elemeket egy teljes sorba helyezni
Ha megnézzük a gyári módszerek megvalósítását, láthatjuk, hogyan működnek. Például:
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}
vagy
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}
Amint látjuk, a megvalósítások ExecutorServicea gyári metódusokon belül jönnek létre. És nagyrészt arról beszélünk ThreadPoolExecutor. Csak a munkát befolyásoló paraméterek változnak. Jobb együtt: Java és a Thread osztály.  V. rész – Végrehajtó, ThreadPool, Fork/Join – 3

https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg

ThreadPoolExecutor

Ahogy korábban láttuk, ThreadPoolExecutorez általában a gyári módszereken belül jön létre. A funkcionalitást a szálak maximális és minimális számaként átadott argumentumok, valamint a használt sor típusa befolyásolja. De az interfész bármely megvalósítása java.util.concurrent.BlockingQueuehasználható. Apropó ThreadPoolExecutor, meg kell említenünk néhány érdekes funkciót. Például nem küldhet be feladatokat a címre, ThreadPoolExecutorha nincs szabad hely:
public static void main(String[] args) throws ExecutionException, InterruptedException {
	int threadBound = 2;
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
            0L, TimeUnit.SECONDS, new SynchronousQueue<>());
	Callable<String> task = () -> {
		Thread.sleep(1000);
		return Thread.currentThread().getName();
	};
	for (int i = 0; i < threadBound + 1; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
Ez a kód összeomlik a következő hibával:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
Vagyis tasknem küldhető be, mert SynchronousQueueúgy van megtervezve, hogy valójában egyetlen elemből álljon, és ne engedjen bele többet beletenni. Láthatjuk, hogy queued tasksitt nulla ("sorban álló feladatok = 0") van. De nincs ebben semmi különös, mert ez egy speciális tulajdonsága SynchronousQueue, ami valójában egy 1 elemű sor, ami mindig üres! Amikor az egyik szál egy elemet helyez a sorba, megvárja, amíg egy másik szál kiveszi az elemet a sorból. Ennek megfelelően lecserélhetjük a következőre, new LinkedBlockingQueue<>(1)és a hiba a következőre módosul: most jelenik meg queued tasks = 1. Mivel a sor csak 1 elemből áll, nem tudunk második elemet hozzáadni. És ez okozza a program kudarcát. A sorról szóló tárgyalásunkat folytatva érdemes megjegyezni, hogy aThreadPoolExecutorosztály további metódusokkal rendelkezik a várólista kiszolgálására. Például a threadPoolExecutor.purge()metódus eltávolítja az összes törölt feladatot a sorból, hogy helyet szabadítson fel a sorban. Egy másik érdekes, sorral kapcsolatos funkció az elutasított feladatok kezelője:
public static void main(String[] args) {
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
            0L, TimeUnit.SECONDS, new SynchronousQueue());
	Callable<String> task = () -> Thread.currentThread().getName();
	threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
	for (int i = 0; i < 5; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
Ebben a példában a kezelőnk egyszerűen megjeleníti Rejectedminden alkalommal, amikor egy feladatot elutasítanak a sorban. Kényelmes, nem? Ezen kívül ThreadPoolExecutorvan egy érdekes alosztálya: ScheduledThreadPoolExecutor, ami egy ScheduledExecutorService. Lehetővé teszi a feladat időzítő alapján történő végrehajtását.

ScheduledExecutorService

ScheduledExecutorService(ami egyfajta ExecutorService) lehetővé teszi a feladatok ütemezett futtatását. Nézzünk egy példát:
public static void main(String[] args) {
	ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		return Thread.currentThread().getName();
	};
	scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
	scheduledExecutorService.shutdown();
}
Itt minden egyszerű. A feladatokat leadjuk, majd kapunk egy java.util.concurrent.ScheduledFuture. Az ütemterv a következő esetekben is hasznos lehet:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
	System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
Itt egy feladatot küldünk be Runnablemeghatározott gyakorisággal ("FixedRate") végrehajtásra bizonyos kezdeti késleltetéssel. Ebben az esetben 1 másodperc elteltével a feladat végrehajtása 2 másodpercenként indul el. Van egy hasonló lehetőség:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
De ebben az esetben a feladatokat az egyes végrehajtások KÖZÖTT meghatározott időközönként hajtják végre. Ez azt jelenti, hogy a parancs task1 másodperc múlva végrehajtódik. Ezután, amint befejeződött, eltelik 2 másodperc, és egy új feladat kezdődik. Íme néhány további forrás ebben a témában: Jobb együtt: Java és a Thread osztály.  V. rész – Végrehajtó, ThreadPool, Fork/Join – 4

https://dzone.com/articles/diving-into-java-8s-newworkstealingpools

WorkStealingPool

A fenti szálmedencék mellett van még egy. Őszintén mondhatjuk, hogy ez egy kicsit különleges. Munkalopó medencének hívják. Röviden, a munkalopás egy olyan algoritmus, amelyben a tétlen szálak más szálaktól kezdenek el feladatokat átvenni, vagy feladatokat egy megosztott sorból. Nézzünk egy példát:
public static void main(String[] args) {
	Object lock = new Object();
	ExecutorService executorService = Executors.newCachedThreadPool();
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		lock.wait(2000);
		System.out.println("Finished");
		return "result";
	};
	for (int i = 0; i < 5; i++) {
		executorService.submit(task);
	}
	executorService.shutdown();
}
Ha ezt a kódot lefuttatjuk, akkor a ExecutorService5 szálat hoz létre számunkra, mert minden szál a zárobjektum várakozási sorába kerül. Már kitaláltuk a monitorokat és a zárakat a Jobb együtt: Java és a Thread osztályban. II. rész – Szinkronizálás . Most cseréljük le Executors.newCachedThreadPool()a -ra Executors.newWorkStealingPool(). Mi fog változni? Látni fogjuk, hogy a feladatainkat 5-nél kevesebb szálon hajtják végre. Emlékszel, hogy CachedThreadPoolminden feladathoz létrehoz egy szálat? Ennek az az oka wait(), hogy blokkolta a szálat, a következő feladatokat be kell fejezni, és új szálak jöttek létre a készletben. A lopási medencénél a szálak nem állnak tétlenül örökké. Elkezdik ellátni szomszédaik feladatait. Miben különbözik WorkStealingPoola többi szálkészlettől? Az a tény, hogy a varázslatosForkJoinPoollakik benne:
public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
}
Valójában van még egy különbség. Alapértelmezés szerint az a számára létrehozott szálak ForkJoinPooldémonszálak, ellentétben az onrdinary segítségével létrehozott szálakkal ThreadPool. Általában emlékeznie kell a démonszálakra, mert például CompletableFuturea démonszálakat is használja, hacsak nem adja meg a sajátját ThreadFactory, amely nem démonszálakat hoz létre. Ezek azok a meglepetések, amelyek váratlan helyeken leselkedhetnek! :)

ForkJoinPool

Ebben a részben ismét arról fogunk beszélni ForkJoinPool(más néven fork/join framework), amely a "burkolata alatt" él WorkStealingPool. Általánosságban elmondható, hogy a fork/join keretrendszer még a Java 1.7-ben jelent meg. És bár a Java 11 kéznél van, még mindig érdemes emlékezni rá. Nem ez a leggyakoribb megvalósítás, de elég érdekes. Erről van egy jó vélemény a weben: A Java Fork-Join Framework megértése példákkal . -ra ForkJoinPooltámaszkodik java.util.concurrent.RecursiveTask. Ott is van java.util.concurrent.RecursiveAction. RecursiveActionnem ad vissza eredményt. Így RecursiveTaskhasonló a Callable, és RecursiveActionhasonló a unnable. Láthatjuk, hogy a név két fontos metódus nevét tartalmazza: forkés join. Aforkmetódus aszinkron módon indít el valamilyen feladatot egy külön szálon. És a joinmódszer lehetővé teszi, hogy megvárja a munka elvégzését. A legjobb megértés érdekében olvassa el az Imperative Programming to Fork/Join to Parallel Streams in Java 8 című részt .

Összegzés

Nos, ezzel lezárul az áttekintés ezen része. Megtudtuk, hogy ezt Executoreredetileg szálak végrehajtására találták ki. Aztán a Java alkotói úgy döntöttek, hogy folytatják az ötletet, és előálltak a ExecutorService. ExecutorServicelehetővé teszi számunkra, hogy feladatokat küldjünk el végrehajtásra submit()a és segítségével invoke(), valamint leállítsuk a szolgáltatást. Mivel ExecutorServiceimplementációkra van szükség, gyári metódusokkal írtak egy osztályt, és elhívták Executors. Lehetővé teszi szálkészletek ( ThreadPoolExecutor) létrehozását. Ezenkívül vannak olyan szálkészletek, amelyek végrehajtási ütemterv megadását is lehetővé teszik. És egy ForkJoinPoolelbújik a mögé WorkStealingPool. Remélem nem csak érdekesnek, de érthetőnek is találtad amit fent írtam :) Mindig örömmel fogadom a javaslataidat, észrevételeidet. Jobb együtt: Java és a Thread osztály. I. rész – A végrehajtás szálai Jobb együtt: Java és a Thread osztály. II. rész – Szinkronizálás Jobb együtt: Java és a Thread osztály. III. rész – Interakció Jobb együtt: Java és a szál osztály. IV. rész – Hívható, jövő és barátok Jobb együtt: Java és a szál osztály. VI. rész – Tüzet el!
Hozzászólások
  • Népszerű
  • Új
  • Régi
Hozzászólás írásához be kell jelentkeznie
Ennek az oldalnak még nincsenek megjegyzései