Pambuka
Dadi, kita ngerti yen Jawa duwe benang. Sampeyan bisa maca babagan kasebut ing review kanthi irah-irahan Better together: Java and the Thread class. Bagian I - Utas eksekusi .
public static void main(String[] args) throws Exception {
Runnable task = () -> {
System.out.println("Task executed");
};
Thread thread = new Thread(task);
thread.start();
}
Nalika sampeyan bisa ndeleng, kode kanggo miwiti tugas cukup khas, nanging kita kudu mbaleni kanggo tugas anyar. Salah siji solusi kanggo sijine iku ing cara kapisah, contone execute(Runnable runnable)
. Nanging pangripta Jawa wis nganggep kahanan kita lan nggawe antarmuka Executor
:
public static void main(String[] args) throws Exception {
Runnable task = () -> System.out.println("Task executed");
Executor executor = (runnable) -> {
new Thread(runnable).start();
};
executor.execute(task);
}
Kode iki cetha luwih ringkes: saiki kita mung nulis kode kanggo miwiti ing Runnable
thread. Apik banget, ta? Nanging iki mung wiwitan: 
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html
Executor
antarmuka duwe ExecutorService
subinterface. Javadoc kanggo antarmuka iki ngandhakake yen an ExecutorService
njlèntrèhaké khusus Executor
sing nyedhiyakake cara kanggo mateni file Executor
. Iku uga ndadekake iku bisa kanggo njaluk java.util.concurrent.Future
supaya trek proses eksekusi. Sadurunge, ing Better together: Java and the Thread class. Part IV — Callable, Future, lan kanca-kanca , kita sedhela dideleng kemampuan saka Future
. Yen sampeyan kelalen utawa ora tau maca, Aku suggest sing refresh memori;) Apa maneh Javadoc ngandika? Iku ngandhani yen kita duwe java.util.concurrent.Executors
pabrik khusus sing ngidini kita nggawe implementasi standar saka ExecutorService
.
PelaksanaLayanan
Ayo dideleng. Kita kuduExecutor
nglakokake (yaiku nelpon execute()
) tugas tartamtu ing thread, lan kode sing nggawe thread didhelikake saka kita. Kita duwe ExecutorService
- khusus Executor
sing duwe sawetara opsi kanggo ngontrol kemajuan. Lan kita duwe Executors
pabrik sing ngidini kita nggawe ExecutorService
. Saiki ayo nindakake dhewe:
public static void main(String[] args) throws ExecutionException, InterruptedException {
Callable<String> task = () -> Thread.currentThread().getName();
ExecutorService service = Executors.newFixedThreadPool(2);
for (int i = 0; i < 5; i++) {
Future result = service.submit(task);
System.out.println(result.get());
}
service.shutdown();
}
Sampeyan bisa ndeleng manawa kita nemtokake blumbang benang tetep sing ukurane 2. Banjur kita ngirim tugas menyang blumbang siji-siji. Saben tugas ngasilake a String
ngemot jeneng utas ( currentThread().GetName()
). Penting kanggo mateni ExecutorService
ing pungkasan, amarga yen program kita ora bakal mungkasi. Pabrik Executors
duwe cara pabrik tambahan. Contone, kita bisa nggawe blumbang sing kasusun saka mung siji utas ( newSingleThreadExecutor
) utawa blumbang sing kalebu cache ( newCachedThreadPool
) saka Utas dibusak sawise padha nganggur kanggo 1 menit. Ing kasunyatan, iki ExecutorService
didhukung dening antrian pamblokiran , menyang ngendi tugas diselehake lan saka ngendi tugas dieksekusi. Informasi liyane babagan mblokir antrian bisa ditemokake ing video iki . Sampeyan uga bisa maca ikireview babagan BlockingQueue . Lan priksa jawaban kanggo pitakonan "Nalika luwih seneng LinkedBlockingQueue tinimbang ArrayBlockingQueue?" Ing istilah sing paling gampang, BlockingQueue
mblokir thread ing rong kasus:
- thread nyoba kanggo njaluk item saka antrian kosong
- thread nyoba kanggo sijine item menyang antrian lengkap
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
utawa
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
Kaya sing kita deleng, implementasine ExecutorService
digawe ing metode pabrik. Lan umume, kita ngomong babagan ThreadPoolExecutor
. Mung paramèter sing mengaruhi karya diganti. 
https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg
ThreadPoolExecutor
Kaya sing wis dingerteni sadurunge,ThreadPoolExecutor
biasane digawe ing metode pabrik. Fungsi kasebut kena pengaruh argumen sing kita lewati minangka jumlah utas maksimal lan minimal, uga jinis antrian sing digunakake. Nanging apa wae implementasine antarmuka java.util.concurrent.BlockingQueue
bisa digunakake. Ngomong babagan ThreadPoolExecutor
, kita kudu nyebutake sawetara fitur sing menarik. Contone, sampeyan ora bisa ngirim tugas menyang ThreadPoolExecutor
yen ora ana papan sing kasedhiya:
public static void main(String[] args) throws ExecutionException, InterruptedException {
int threadBound = 2;
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
0L, TimeUnit.SECONDS, new SynchronousQueue<>());
Callable<String> task = () -> {
Thread.sleep(1000);
return Thread.currentThread().getName();
};
for (int i = 0; i < threadBound + 1; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
Kode iki bakal nabrak kanthi kesalahan kaya iki:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
Ing tembung liyane, task
ora bisa diajukake, amarga SynchronousQueue
wis dirancang supaya bener kasusun saka siji unsur lan ora ngidini kita kanggo sijine liyane menyang. Kita bisa ndeleng manawa ana nol queued tasks
("tugas antrian = 0") ing kene. Nanging ora ana sing aneh, amarga iki minangka fitur khusus SynchronousQueue
, sing sejatine minangka antrian 1 unsur sing tansah kosong! Nalika siji thread nempatno unsur ing antrian, iku bakal ngenteni nganti thread liyane njupuk unsur saka antrian. Patut, kita bisa ngganti karo new LinkedBlockingQueue<>(1)
lan kesalahan bakal diganti saiki nuduhake queued tasks = 1
. Amarga antrian mung 1 unsur, kita ora bisa nambah unsur liya. Lan sing nyebabake program gagal. Terus diskusi kita antrian, iku worth kang lagi nyimak singThreadPoolExecutor
kelas wis cara tambahan kanggo layanan antrian. Contone, threadPoolExecutor.purge()
cara kasebut bakal mbusak kabeh tugas sing dibatalake saka antrian kanggo mbebasake spasi ing antrian. Fungsi liyane sing gegandhengan karo antrian yaiku panangan kanggo tugas sing ditolak:
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.SECONDS, new SynchronousQueue());
Callable<String> task = () -> Thread.currentThread().getName();
threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
for (int i = 0; i < 5; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
Ing conto iki, pawang kita mung nampilake Rejected
saben tugas ing antrian ditolak. Trep, ta? Kajaba iku, ThreadPoolExecutor
wis subclass menarik: ScheduledThreadPoolExecutor
, kang a ScheduledExecutorService
. Iki menehi kemampuan kanggo nindakake tugas adhedhasar timer.
ScheduledExecutorService
ScheduledExecutorService
(kang jinis ExecutorService
) ngijini kita mbukak tugas ing jadwal. Ayo katon ing conto:
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
return Thread.currentThread().getName();
};
scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
scheduledExecutorService.shutdown();
}
Kabeh iku prasaja ing kene. Tugas dikirim banjur entuk java.util.concurrent.ScheduledFuture
. Jadwal uga bisa mbantu ing kahanan ing ngisor iki:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
Ing kene kita ngirim Runnable
tugas kanggo eksekusi kanthi frekuensi tetep ("FixedRate") kanthi wektu tundha awal. Ing kasus iki, sawise 1 detik, tugas bakal diwiwiti saben 2 detik. Ana pilihan sing padha:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
Nanging ing kasus iki, tugas ditindakake kanthi interval tartamtu ANTARA saben eksekusi. Yaiku, task
bakal dieksekusi sawise 1 detik. Banjur, yen wis rampung, 2 detik bakal liwati, banjur tugas anyar bakal diwiwiti. Mangkene sawetara sumber tambahan babagan topik iki:
- Pambuka kanggo pools thread ing Jawa
- Pambuka Kolam Utas ing Jawa
- Java Multithreading Steeplechase: Mbatalake Tugas ing Pelaksana
- Nggunakake Java Executors kanggo Tugas Latar mburi

https://dzone.com/articles/diving-into-java-8s-newworkstealingpools
WorkStealingPool
Saliyane pools thread ndhuwur, ana siji liyane. Kita bisa ngomong kanthi jujur yen iki rada khusus. Iki diarani kolam nyolong karya. Singkatipun, nyolong karya minangka algoritma ing ngendi benang nganggur miwiti njupuk tugas saka utas utawa tugas liyane saka antrian sing dienggo bareng. Ayo katon ing conto:
public static void main(String[] args) {
Object lock = new Object();
ExecutorService executorService = Executors.newCachedThreadPool();
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
lock.wait(2000);
System.out.println("Finished");
return "result";
};
for (int i = 0; i < 5; i++) {
executorService.submit(task);
}
executorService.shutdown();
}
Yen kita mbukak kode iki, banjur ExecutorService
bakal nggawe 5 thread kanggo kita, amarga saben thread bakal dilebokake ing antrian Enteni kanggo obyek kunci. Kita wis ngerti monitor lan kunci ing Better together: Java lan kelas Thread. Bagean II - Sinkronisasi . Saiki ayo diganti Executors.newCachedThreadPool()
karo Executors.newWorkStealingPool()
. Apa sing bakal diganti? Kita bakal weruh yen tugas kita dieksekusi ing kurang saka 5 utas. Elinga yen CachedThreadPool
nggawe thread kanggo saben tugas? Iku amarga wait()
diblokir thread, tugas sakteruse pengin rampung, lan thread anyar digawe kanggo wong-wong mau ing blumbang. Kanthi kolam nyolong, benang ora mandheg ing salawas-lawase. Dheweke wiwit nindakake tugas tanggane. Apa sing ndadekake WorkStealingPool
beda saka pools thread liyane? Kasunyatan sing gaibForkJoinPool
manggon ing njero:
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
Bener, ana siji liyane bedane. Kanthi gawan, utas sing digawe kanggo a ForkJoinPool
yaiku benang daemon, ora kaya benang sing digawe liwat ThreadPool
. Umumé, sampeyan kudu ngelingi benang daemon, amarga, contone, CompletableFuture
uga nggunakake benang daemon kajaba sampeyan nemtokake dhewe ThreadFactory
sing nggawe benang non-daemon. Iki minangka kejutan sing bisa kedadeyan ing papan sing ora dikarepake! :)
ForkJoinPool
Ing bagean iki, kita bakal maneh pirembagan babForkJoinPool
(uga disebut garpu / framework gabung), kang urip "ing hood" saka WorkStealingPool
. Umumé, framework fork/join muncul maneh ing Java 1.7. Lan sanajan Jawa 11 wis cedhak, nanging isih kudu dieling-eling. Iki dudu implementasine sing paling umum, nanging cukup menarik. Ana review apik babagan iki ing web: Understanding Java Fork-Join Framework with Conto . The ForkJoinPool
gumantung ing java.util.concurrent.RecursiveTask
. Ana uga java.util.concurrent.RecursiveAction
. RecursiveAction
ora ngasilake asil. Mangkono, RecursiveTask
padha karo Callable
, lan RecursiveAction
padha karo unnable
. Kita bisa ndeleng manawa jeneng kasebut kalebu jeneng rong cara penting: fork
lan join
. Ingfork
cara miwiti sawetara tugas asynchronously ing thread kapisah. Lan join
cara kasebut ngidini sampeyan ngenteni karya rampung. Kanggo entuk pangerten sing paling apik, sampeyan kudu maca Saka Imperative Programming to Fork/Join to Parallel Streams in Java 8 .
Ringkesan
Inggih, sing mbungkus bagean review iki. Kita wis sinau singExecutor
asline diciptakake kanggo nglakokake benang. Banjur pangripta Jawa mutusake kanggo nerusake gagasan lan nggawe ExecutorService
. ExecutorService
ngidini kita ngirim tugas kanggo eksekusi nggunakake submit()
lan invoke()
, lan uga mateni layanan. Amarga ExecutorService
mbutuhake implementasine, dheweke nulis kelas kanthi metode pabrik lan diarani Executors
. Ngidini sampeyan nggawe pools thread ( ThreadPoolExecutor
). Kajaba iku, ana pools thread sing uga ngidini kita nemtokake jadwal eksekusi. Lan ForkJoinPool
ndhelikake konco a WorkStealingPool
. Muga-muga sampeyan nemokake apa sing daktulis ing ndhuwur ora mung menarik, nanging uga bisa dingerteni :) Aku tansah seneng krungu saran lan komentar sampeyan. Luwih apik bebarengan: Jawa lan kelas Utas. Bagian I - Utas eksekusi Luwih apik bebarengan: Jawa lan kelas Utas. Part II - Sinkronisasi Luwih apik bebarengan: Jawa lan kelas Utas. Bagean III - Interaksi Better bebarengan: Jawa lan kelas Utas. Part IV - Callable, Future, lan kanca-kanca Luwih apik bebarengan: Jawa lan kelas Utas. Bab VI - Mbusak!
GO TO FULL VERSION