CodeGym/Java блог/Случаен/По-добре заедно: Java и клас Thread. Част V — Изпълнител,...
John Squirrels
Ниво
San Francisco

По-добре заедно: Java и клас Thread. Част V — Изпълнител, ThreadPool, Fork/Join

Публикувано в групата

Въведение

И така, знаем, че Java има нишки. Можете да прочетете за това в ревюто, озаглавено По-добре заедно: Java и класът Thread. Част I — Нишки на изпълнение . По-добре заедно: Java и клас Thread.  Част V — Изпълнител, ThreadPool, Fork/Join — 1Нека да разгледаме още веднъж типичния code:
public static void main(String[] args) throws Exception {
	Runnable task = () -> {
		System.out.println("Task executed");
	};
	Thread thread = new Thread(task);
	thread.start();
}
Както можете да видите, codeът за стартиране на задача е доста типичен, но трябва да го повторим за нова задача. Едно решение е да го поставите в отделен метод, напр execute(Runnable runnable). Но създателите на Java са обмислor нашето затруднение и са измислor интерфейса Executor:
public static void main(String[] args) throws Exception {
	Runnable task = () -> System.out.println("Task executed");
	Executor executor = (runnable) -> {
		new Thread(runnable).start();
	};
	executor.execute(task);
}
Този code е очевидно по-сбит: сега просто пишем code, за да стартираме в Runnableнишката. Това е страхотно, нали? Но това е само началото: По-добре заедно: Java и клас Thread.  Част V — Executor, ThreadPool, Fork/Join - 2

https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html

Както можете да видите, интерфейсът Executorима ExecutorServiceподинтерфейс. Javadoc за този интерфейс казва, че an ExecutorServiceописва част Executor, която предоставя методи за изключване на Executor. Той също така дава възможност да получите, java.util.concurrent.Futureза да проследите процеса на изпълнение. Преди това в Better together: Java и клас Thread. Част IV — Callable, Future и приятели , прегледахме накратко възможностите на Future. Ако сте забравor or никога не сте го чели, предлагам ви да опресните паметта си ;) Какво друго казва Javadoc? Казва ни, че имаме специална java.util.concurrent.Executorsфабрика, която ни позволява да създаваме реализации по подразбиране на ExecutorService.

ExecutorService

Нека прегледаме. Трябва Executorда изпълним (т.е. да извикаме execute()) определена задача в нишка, а codeът, който създава нишката, е скрит от нас. Имаме ExecutorService— специфичен Executor, който има няколко опции за контролиране на напредъка. И имаме Executorsфабрика, която ни позволява да създадем ExecutorService. Сега нека го направим сами:
public static void main(String[] args) throws ExecutionException, InterruptedException {
	Callable<String> task = () -> Thread.currentThread().getName();
	ExecutorService service = Executors.newFixedThreadPool(2);
	for (int i = 0; i < 5; i++) {
		Future result = service.submit(task);
		System.out.println(result.get());
	}
	service.shutdown();
}
Можете да видите, че посочихме фиксиран пул от нишки, чийто размер е 2. След това изпращаме задачи към пула една по една. Всяка задача връща Stringсъдържащо името на нишката ( currentThread().GetName()). Важно е да изключите ExecutorServiceв самия край, защото иначе програмата ни няма да приключи. Фабриката Executorsима допълнителни фабрични методи. Например, можем да създадем пул, състоящ се само от една нишка ( newSingleThreadExecutor) or пул, който включва кеш ( newCachedThreadPool), от който нишките се премахват, след като са неактивни за 1 minutesа. В действителност те ExecutorServiceса подкрепени от блокираща опашка , в която се поставят задачи и от които се изпълняват задачи. Повече информация за опашките за блокиране можете да намерите в този видеоклип . Можете също да прочетете товапреглед на BlockingQueue . И вижте отговора на въпроса „Кога да предпочетем LinkedBlockingQueue пред ArrayBlockingQueue?“ Най-просто казано, a BlockingQueueблокира нишка в два случая:
  • нишката се опитва да получи елементи от празна опашка
  • нишката се опитва да постави елементи в пълна опашка
Ако погледнем изпълнението на фабричните методи, можем да видим How работят. Например:
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}
or
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}
Както виждаме, реализациите на ExecutorServiceсе създават във фабричните методи. И в по-голямата си част говорим за ThreadPoolExecutor. Променят се само параметрите, засягащи работата. По-добре заедно: Java и клас Thread.  Част V — Executor, ThreadPool, Fork/Join - 3

https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg

ThreadPoolExecutor

Както видяхме по-рано, ThreadPoolExecutorтова обикновено се създава във фабричните методи. Функционалността се влияе от аргументите, които предаваме като максимален и минимален брой нишки, Howто и от вида на използваната опашка. Но всяка реализация на java.util.concurrent.BlockingQueueинтерфейса може да се използва. Говорейки за ThreadPoolExecutor, трябва да споменем някои интересни функции. Например, не можете да изпращате задачи на a, ThreadPoolExecutorако няма свободно място:
public static void main(String[] args) throws ExecutionException, InterruptedException {
	int threadBound = 2;
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
            0L, TimeUnit.SECONDS, new SynchronousQueue<>());
	Callable<String> task = () -> {
		Thread.sleep(1000);
		return Thread.currentThread().getName();
	};
	for (int i = 0; i < threadBound + 1; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
Този code ще се срине с грешка като тази:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
С други думи, taskне може да бъде подаден, защото SynchronousQueueе проектиран така, че всъщност се състои от един елемент и не ни позволява да влагаме нищо повече в него. Можем да видим, че тук имаме нула queued tasks(„задачи в опашка = 0“). Но в това няма нищо странно, защото това е специална функция на SynchronousQueue, която всъщност е опашка от 1 елемент, която винаги е празна! Когато една нишка постави елемент в опашката, тя ще изчака, докато друга нишка вземе елемента от опашката. Съответно можем да го заменим с new LinkedBlockingQueue<>(1)и грешката ще се промени, за да покаже сега queued tasks = 1. Тъй като опашката е само 1 елемент, не можем да добавим втори елемент. И това е причината програмата да се провали. Продължавайки нашата дискусия за опашката, заслужава да се отбележи, чеThreadPoolExecutorима допълнителни методи за обслужване на опашката. Например, threadPoolExecutor.purge()методът ще премахне всички отменени задачи от опашката, за да освободи място в опашката. Друга интересна функция, свързана с опашката, е манипулаторът за отхвърлени задачи:
public static void main(String[] args) {
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
            0L, TimeUnit.SECONDS, new SynchronousQueue());
	Callable<String> task = () -> Thread.currentThread().getName();
	threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
	for (int i = 0; i < 5; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
В този пример нашият манипулатор просто се показва Rejectedвсеки път, когато задача в опашката бъде отхвърлена. Удобно, нали? Освен това ThreadPoolExecutorима интересен подклас: ScheduledThreadPoolExecutor, който е ScheduledExecutorService. Той предоставя възможност за изпълнение на задача въз основа на таймер.

ScheduledExecutorService

ScheduledExecutorService(което е вид ExecutorService) ни позволява да изпълняваме задачи по график. Да разгледаме един пример:
public static void main(String[] args) {
	ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		return Thread.currentThread().getName();
	};
	scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
	scheduledExecutorService.shutdown();
}
Тук всичко е просто. Задачите се изпращат и след това получаваме java.util.concurrent.ScheduledFuture. Графикът може да бъде полезен и в следната ситуация:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
	System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
Тук изпращаме Runnableзадача за изпълнение с фиксирана честота ("FixedRate") с определено първоначално забавяне. В този случай след 1 секунда задачата ще започне да се изпълнява на всеки 2 секунди. Има подобен вариант:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
Но в този случай задачите се изпълняват с определен интервал МЕЖДУ всяко изпълнение. Тоест taskще се изпълни след 1 секунда. След това, веднага след като приключи, ще минат 2 секунди и след това ще бъде стартирана нова задача. Ето някои допълнителни ресурси по тази тема: По-добре заедно: Java и клас Thread.  Част V — Executor, ThreadPool, Fork/Join - 4

https://dzone.com/articles/diving-into-java-8s-newworkstealingpools

WorkStealingPool

В допълнение към горните пулове от нишки има още един. Можем честно да кажем, че е малко специално. Нарича се пул за кражба на работа. Накратко, кражбата на работа е алгоритъм, при който неактивните нишки започват да вземат задачи от други нишки or задачи от споделена опашка. Да разгледаме един пример:
public static void main(String[] args) {
	Object lock = new Object();
	ExecutorService executorService = Executors.newCachedThreadPool();
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		lock.wait(2000);
		System.out.println("Finished");
		return "result";
	};
	for (int i = 0; i < 5; i++) {
		executorService.submit(task);
	}
	executorService.shutdown();
}
Ако изпълним този code, тогава ExecutorServiceще създаде 5 нишки за нас, защото всяка нишка ще бъде поставена в опашката за изчакване за заключващия обект. Вече измислихме мониторите и заключванията в Better заедно: Java и класа Thread. Част II — Синхронизация . Сега нека заменим Executors.newCachedThreadPool()с Executors.newWorkStealingPool(). Какво ще се промени? Ще видим, че нашите задачи се изпълняват на по-малко от 5 нишки. Не забравяйте, че CachedThreadPoolсъздава нишка за всяка задача? Това е така, защото wait()нишката е блокирана, следващите задачи искат да бъдат завършени и за тях са създадени нови нишки в пула. С пул за кражби нишките не стоят празни вечно. Те започват да изпълняват задачите на своите съседи. Какво прави един WorkStealingPoolтолкова различен от другите пулове с теми? Фактът, че магическотоForkJoinPoolживее вътре в него:
public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
}
Всъщност има още една разлика. По подразбиране нишките, създадени за a, ForkJoinPoolса демон нишки, за разлика от нишките, създадени чрез onrdinary ThreadPool. По принцип трябва да помните демон нишки, защото например CompletableFutureсъщо използва демон нишки, освен ако не посочите свои собствени ThreadFactory, които създават недемон нишки. Това са изненадите, които може да дебнат на неочаквани места! :)

ForkJoinPool

В тази част отново ще говорим за ForkJoinPool(наричана също рамка fork/join), която живее „под капака“ на WorkStealingPool. Като цяло рамката fork/join се появи още в Java 1.7. И въпреки че Java 11 е под ръка, все пак си струва да си спомните. Това не е най-често срещаното изпълнение, но е доста интересно. Има добра рецензия за това в мрежата: Разбиране на Java Fork-Join Framework с примери . Разчита се ForkJoinPoolна java.util.concurrent.RecursiveTask. Има и java.util.concurrent.RecursiveAction. RecursiveActionне връща резултат. По този начин RecursiveTaskе подобно на Callableи RecursiveActionе подобно на unnable. Виждаме, че името включва имената на два важни метода: forkи join. Theforkметод стартира няHowва задача асинхронно в отделна нишка. И joinметодът ви позволява да изчакате работата да бъде свършена. За да получите най-добро разбиране, трябва да прочетете От императивното програмиране към Fork/Join към паралелни потоци в Java 8 .

Резюме

Е, това приключва тази част от прегледа. Научихме, че Executorпървоначално е изобретен за изпълнение на нишки. Тогава създателите на Java решиха да продължат идеята и излязоха с ExecutorService. ExecutorServiceни позволява да изпратим задачи за изпълнение с помощта submit()на и invoke(), а също и да изключим услугата. Тъй като ExecutorServiceима нужда от реализации, те написаха клас с фабрични методи и го нарекоха Executors. Позволява ви да създавате пулове от нишки ( ThreadPoolExecutor). Освен това има пулове от нишки, които също ни позволяват да посочим график за изпълнение. И a ForkJoinPoolсе крие зад a WorkStealingPool. Надявам се да сте намерor написаното по-горе не само интересно, но и разбираемо :) Винаги се радвам да чуя вашите предложения и коментари. По-добре заедно: Java и клас Thread. Част I — Нишки за изпълнение По-добре заедно: Java и класът Thread. Част II — По-добра синхронизация заедно: Java и класът Thread. Част III — Взаимодействието е по-добро заедно: Java и класът Thread. Част IV — Callable, Future и приятели По-добре заедно: Java и класът Thread. Част VI — Изстрелвай!
Коментари
  • Популярен
  • Нов
  • Стар
Трябва да сте влезли, за да оставите коментар
Тази страница все още няма коментари