Въведение
И така, знаем, че Java има нишки. Можете да прочетете за това в ревюто, озаглавено По-добре заедно: Java и класът Thread. Част I — Нишки на изпълнение . Нека да разгледаме още веднъж типичния code:public static void main(String[] args) throws Exception {
Runnable task = () -> {
System.out.println("Task executed");
};
Thread thread = new Thread(task);
thread.start();
}
Както можете да видите, codeът за стартиране на задача е доста типичен, но трябва да го повторим за нова задача. Едно решение е да го поставите в отделен метод, напр execute(Runnable runnable)
. Но създателите на Java са обмислor нашето затруднение и са измислor интерфейса Executor
:
public static void main(String[] args) throws Exception {
Runnable task = () -> System.out.println("Task executed");
Executor executor = (runnable) -> {
new Thread(runnable).start();
};
executor.execute(task);
}
Този code е очевидно по-сбит: сега просто пишем code, за да стартираме в Runnable
нишката. Това е страхотно, нали? Но това е само началото:
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html
Executor
има ExecutorService
подинтерфейс. Javadoc за този интерфейс казва, че an ExecutorService
описва част Executor
, която предоставя методи за изключване на Executor
. Той също така дава възможност да получите, java.util.concurrent.Future
за да проследите процеса на изпълнение. Преди това в Better together: Java и клас Thread. Част IV — Callable, Future и приятели , прегледахме накратко възможностите на Future
. Ако сте забравor or никога не сте го чели, предлагам ви да опресните паметта си ;) Какво друго казва Javadoc? Казва ни, че имаме специална java.util.concurrent.Executors
фабрика, която ни позволява да създаваме реализации по подразбиране на ExecutorService
.
ExecutorService
Нека прегледаме. ТрябваExecutor
да изпълним (т.е. да извикаме execute()
) определена задача в нишка, а codeът, който създава нишката, е скрит от нас. Имаме ExecutorService
— специфичен Executor
, който има няколко опции за контролиране на напредъка. И имаме Executors
фабрика, която ни позволява да създадем ExecutorService
. Сега нека го направим сами:
public static void main(String[] args) throws ExecutionException, InterruptedException {
Callable<String> task = () -> Thread.currentThread().getName();
ExecutorService service = Executors.newFixedThreadPool(2);
for (int i = 0; i < 5; i++) {
Future result = service.submit(task);
System.out.println(result.get());
}
service.shutdown();
}
Можете да видите, че посочихме фиксиран пул от нишки, чийто размер е 2. След това изпращаме задачи към пула една по една. Всяка задача връща String
съдържащо името на нишката ( currentThread().GetName()
). Важно е да изключите ExecutorService
в самия край, защото иначе програмата ни няма да приключи. Фабриката Executors
има допълнителни фабрични методи. Например, можем да създадем пул, състоящ се само от една нишка ( newSingleThreadExecutor
) or пул, който включва кеш ( newCachedThreadPool
), от който нишките се премахват, след като са неактивни за 1 minutesа. В действителност те ExecutorService
са подкрепени от блокираща опашка , в която се поставят задачи и от които се изпълняват задачи. Повече информация за опашките за блокиране можете да намерите в този видеоклип . Можете също да прочетете товапреглед на BlockingQueue . И вижте отговора на въпроса „Кога да предпочетем LinkedBlockingQueue пред ArrayBlockingQueue?“ Най-просто казано, a BlockingQueue
блокира нишка в два случая:
- нишката се опитва да получи елементи от празна опашка
- нишката се опитва да постави елементи в пълна опашка
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
or
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
Както виждаме, реализациите на ExecutorService
се създават във фабричните методи. И в по-голямата си част говорим за ThreadPoolExecutor
. Променят се само параметрите, засягащи работата.
https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg
ThreadPoolExecutor
Както видяхме по-рано,ThreadPoolExecutor
това обикновено се създава във фабричните методи. Функционалността се влияе от аргументите, които предаваме като максимален и минимален брой нишки, Howто и от вида на използваната опашка. Но всяка реализация на java.util.concurrent.BlockingQueue
интерфейса може да се използва. Говорейки за ThreadPoolExecutor
, трябва да споменем някои интересни функции. Например, не можете да изпращате задачи на a, ThreadPoolExecutor
ако няма свободно място:
public static void main(String[] args) throws ExecutionException, InterruptedException {
int threadBound = 2;
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
0L, TimeUnit.SECONDS, new SynchronousQueue<>());
Callable<String> task = () -> {
Thread.sleep(1000);
return Thread.currentThread().getName();
};
for (int i = 0; i < threadBound + 1; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
Този code ще се срине с грешка като тази:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
С други думи, task
не може да бъде подаден, защото SynchronousQueue
е проектиран така, че всъщност се състои от един елемент и не ни позволява да влагаме нищо повече в него. Можем да видим, че тук имаме нула queued tasks
(„задачи в опашка = 0“). Но в това няма нищо странно, защото това е специална функция на SynchronousQueue
, която всъщност е опашка от 1 елемент, която винаги е празна! Когато една нишка постави елемент в опашката, тя ще изчака, докато друга нишка вземе елемента от опашката. Съответно можем да го заменим с new LinkedBlockingQueue<>(1)
и грешката ще се промени, за да покаже сега queued tasks = 1
. Тъй като опашката е само 1 елемент, не можем да добавим втори елемент. И това е причината програмата да се провали. Продължавайки нашата дискусия за опашката, заслужава да се отбележи, чеThreadPoolExecutor
има допълнителни методи за обслужване на опашката. Например, threadPoolExecutor.purge()
методът ще премахне всички отменени задачи от опашката, за да освободи място в опашката. Друга интересна функция, свързана с опашката, е манипулаторът за отхвърлени задачи:
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.SECONDS, new SynchronousQueue());
Callable<String> task = () -> Thread.currentThread().getName();
threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
for (int i = 0; i < 5; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
В този пример нашият манипулатор просто се показва Rejected
всеки път, когато задача в опашката бъде отхвърлена. Удобно, нали? Освен това ThreadPoolExecutor
има интересен подклас: ScheduledThreadPoolExecutor
, който е ScheduledExecutorService
. Той предоставя възможност за изпълнение на задача въз основа на таймер.
ScheduledExecutorService
ScheduledExecutorService
(което е вид ExecutorService
) ни позволява да изпълняваме задачи по график. Да разгледаме един пример:
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
return Thread.currentThread().getName();
};
scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
scheduledExecutorService.shutdown();
}
Тук всичко е просто. Задачите се изпращат и след това получаваме java.util.concurrent.ScheduledFuture
. Графикът може да бъде полезен и в следната ситуация:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
Тук изпращаме Runnable
задача за изпълнение с фиксирана честота ("FixedRate") с определено първоначално забавяне. В този случай след 1 секунда задачата ще започне да се изпълнява на всеки 2 секунди. Има подобен вариант:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
Но в този случай задачите се изпълняват с определен интервал МЕЖДУ всяко изпълнение. Тоест task
ще се изпълни след 1 секунда. След това, веднага след като приключи, ще минат 2 секунди и след това ще бъде стартирана нова задача. Ето някои допълнителни ресурси по тази тема:
- Въведение в пуловете от нишки в Java
- Въведение в пуловете от нишки в Java
- Java Multithreading Steeplechase: Отмяна на задачи в Executors
- Използване на Java екзекутори за фонови задачи
https://dzone.com/articles/diving-into-java-8s-newworkstealingpools
WorkStealingPool
В допълнение към горните пулове от нишки има още един. Можем честно да кажем, че е малко специално. Нарича се пул за кражба на работа. Накратко, кражбата на работа е алгоритъм, при който неактивните нишки започват да вземат задачи от други нишки or задачи от споделена опашка. Да разгледаме един пример:public static void main(String[] args) {
Object lock = new Object();
ExecutorService executorService = Executors.newCachedThreadPool();
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
lock.wait(2000);
System.out.println("Finished");
return "result";
};
for (int i = 0; i < 5; i++) {
executorService.submit(task);
}
executorService.shutdown();
}
Ако изпълним този code, тогава ExecutorService
ще създаде 5 нишки за нас, защото всяка нишка ще бъде поставена в опашката за изчакване за заключващия обект. Вече измислихме мониторите и заключванията в Better заедно: Java и класа Thread. Част II — Синхронизация . Сега нека заменим Executors.newCachedThreadPool()
с Executors.newWorkStealingPool()
. Какво ще се промени? Ще видим, че нашите задачи се изпълняват на по-малко от 5 нишки. Не забравяйте, че CachedThreadPool
създава нишка за всяка задача? Това е така, защото wait()
нишката е блокирана, следващите задачи искат да бъдат завършени и за тях са създадени нови нишки в пула. С пул за кражби нишките не стоят празни вечно. Те започват да изпълняват задачите на своите съседи. Какво прави един WorkStealingPool
толкова различен от другите пулове с теми? Фактът, че магическотоForkJoinPool
живее вътре в него:
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
Всъщност има още една разлика. По подразбиране нишките, създадени за a, ForkJoinPool
са демон нишки, за разлика от нишките, създадени чрез onrdinary ThreadPool
. По принцип трябва да помните демон нишки, защото например CompletableFuture
също използва демон нишки, освен ако не посочите свои собствени ThreadFactory
, които създават недемон нишки. Това са изненадите, които може да дебнат на неочаквани места! :)
ForkJoinPool
В тази част отново ще говорим заForkJoinPool
(наричана също рамка fork/join), която живее „под капака“ на WorkStealingPool
. Като цяло рамката fork/join се появи още в Java 1.7. И въпреки че Java 11 е под ръка, все пак си струва да си спомните. Това не е най-често срещаното изпълнение, но е доста интересно. Има добра рецензия за това в мрежата: Разбиране на Java Fork-Join Framework с примери . Разчита се ForkJoinPool
на java.util.concurrent.RecursiveTask
. Има и java.util.concurrent.RecursiveAction
. RecursiveAction
не връща резултат. По този начин RecursiveTask
е подобно на Callable
и RecursiveAction
е подобно на unnable
. Виждаме, че името включва имената на два важни метода: fork
и join
. Thefork
метод стартира няHowва задача асинхронно в отделна нишка. И join
методът ви позволява да изчакате работата да бъде свършена. За да получите най-добро разбиране, трябва да прочетете От императивното програмиране към Fork/Join към паралелни потоци в Java 8 .
Резюме
Е, това приключва тази част от прегледа. Научихме, чеExecutor
първоначално е изобретен за изпълнение на нишки. Тогава създателите на Java решиха да продължат идеята и излязоха с ExecutorService
. ExecutorService
ни позволява да изпратим задачи за изпълнение с помощта submit()
на и invoke()
, а също и да изключим услугата. Тъй като ExecutorService
има нужда от реализации, те написаха клас с фабрични методи и го нарекоха Executors
. Позволява ви да създавате пулове от нишки ( ThreadPoolExecutor
). Освен това има пулове от нишки, които също ни позволяват да посочим график за изпълнение. И a ForkJoinPool
се крие зад a WorkStealingPool
. Надявам се да сте намерor написаното по-горе не само интересно, но и разбираемо :) Винаги се радвам да чуя вашите предложения и коментари. По-добре заедно: Java и клас Thread. Част I — Нишки за изпълнение По-добре заедно: Java и класът Thread. Част II — По-добра синхронизация заедно: Java и класът Thread. Част III — Взаимодействието е по-добро заедно: Java и класът Thread. Част IV — Callable, Future и приятели По-добре заедно: Java и класът Thread. Част VI — Изстрелвай!