pengenalan
Jadi, kita tahu bahawa Java mempunyai benang. Anda boleh membaca tentang itu dalam ulasan bertajuk Better together: Java and the Thread class. Bahagian I — Benang pelaksanaan .
public static void main(String[] args) throws Exception {
Runnable task = () -> {
System.out.println("Task executed");
};
Thread thread = new Thread(task);
thread.start();
}
Seperti yang anda lihat, kod untuk memulakan tugasan adalah agak tipikal, tetapi kita perlu mengulanginya untuk tugasan baharu. Satu penyelesaian ialah meletakkannya dalam kaedah yang berasingan, cth execute(Runnable runnable)
. Tetapi pencipta Java telah mempertimbangkan nasib kami dan menghasilkan antara Executor
muka:
public static void main(String[] args) throws Exception {
Runnable task = () -> System.out.println("Task executed");
Executor executor = (runnable) -> {
new Thread(runnable).start();
};
executor.execute(task);
}
Kod ini jelas lebih ringkas: kini kita hanya menulis kod untuk memulakan pada Runnable
benang. Itu hebat, bukan? Tetapi ini hanya permulaan: 
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html
Executor
antara muka mempunyai ExecutorService
subantara muka. Javadoc untuk antara muka ini mengatakan bahawa an ExecutorService
menerangkan perkara tertentu Executor
yang menyediakan kaedah untuk menutup fail Executor
. Ia juga memungkinkan untuk mendapatkan java.util.concurrent.Future
untuk menjejaki proses pelaksanaan. Sebelum ini, dalam Better together: Java dan kelas Thread. Bahagian IV — Boleh Dipanggil, Masa Depan dan rakan , kami menyemak secara ringkas keupayaan Future
. Jika anda terlupa atau tidak pernah membacanya, saya cadangkan anda menyegarkan ingatan anda ;) Apa lagi yang Javadoc katakan? Ia memberitahu kami bahawa kami mempunyai java.util.concurrent.Executors
kilang khas yang membolehkan kami membuat pelaksanaan lalai ExecutorService
.
Perkhidmatan Pelaksana
Jom tinjau. Kita perluExecutor
melaksanakan (iaitu untuk memanggil execute()
) tugas tertentu pada utas, dan kod yang mencipta utas disembunyikan daripada kami. Kami ada ExecutorService
— khusus Executor
yang mempunyai beberapa pilihan untuk mengawal kemajuan. Dan kami mempunyai Executors
kilang yang membolehkan kami mencipta ExecutorService
. Sekarang mari kita lakukan sendiri:
public static void main(String[] args) throws ExecutionException, InterruptedException {
Callable<String> task = () -> Thread.currentThread().getName();
ExecutorService service = Executors.newFixedThreadPool(2);
for (int i = 0; i < 5; i++) {
Future result = service.submit(task);
System.out.println(result.get());
}
service.shutdown();
}
Anda dapat melihat bahawa kami menetapkan kumpulan benang tetap yang saiznya ialah 2. Kemudian kami menyerahkan tugasan kepada kumpulan satu demi satu. Setiap tugasan mengembalikan a String
yang mengandungi nama utas ( currentThread().GetName()
). Adalah penting untuk menutupnya ExecutorService
pada penghujungnya, kerana jika tidak, program kami tidak akan tamat. Kilang Executors
mempunyai kaedah kilang tambahan. Sebagai contoh, kita boleh mencipta kumpulan yang terdiri daripada hanya satu utas ( newSingleThreadExecutor
) atau kumpulan yang termasuk cache ( newCachedThreadPool
) yang mana utas dialih keluar selepas ia melahu selama 1 minit. Pada hakikatnya, ini ExecutorService
disokong oleh baris gilir menyekat , di mana tugas diletakkan dan dari mana tugas dilaksanakan. Maklumat lanjut tentang menyekat baris gilir boleh didapati dalam video ini . Anda juga boleh membaca iniulasan tentang BlockingQueue . Dan lihat jawapan kepada soalan "Bila lebih suka LinkedBlockingQueue berbanding ArrayBlockingQueue?" Dalam istilah yang paling mudah, BlockingQueue
sekatan benang dalam dua kes:
- benang cuba mendapatkan item daripada baris gilir kosong
- benang cuba meletakkan item ke dalam baris gilir penuh
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
atau
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
Seperti yang dapat kita lihat, pelaksanaan ExecutorService
dibuat di dalam kaedah kilang. Dan untuk sebahagian besar, kita bercakap tentang ThreadPoolExecutor
. Hanya parameter yang mempengaruhi kerja diubah. 
https://en.wikipedia.org/wiki/Thread_pool#/media/Fail:Thread_pool.svg
ThreadPoolExecutor
Seperti yang kita lihat sebelum ini,ThreadPoolExecutor
adalah apa yang biasanya dibuat di dalam kaedah kilang. Kefungsian dipengaruhi oleh hujah yang kami luluskan sebagai bilangan maksimum dan minimum utas, serta jenis baris gilir yang digunakan. Tetapi sebarang pelaksanaan antara java.util.concurrent.BlockingQueue
muka boleh digunakan. Bercakap tentang ThreadPoolExecutor
, kita harus menyebut beberapa ciri menarik. Sebagai contoh, anda tidak boleh menyerahkan tugas kepada a ThreadPoolExecutor
jika tiada ruang yang tersedia:
public static void main(String[] args) throws ExecutionException, InterruptedException {
int threadBound = 2;
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
0L, TimeUnit.SECONDS, new SynchronousQueue<>());
Callable<String> task = () -> {
Thread.sleep(1000);
return Thread.currentThread().getName();
};
for (int i = 0; i < threadBound + 1; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
Kod ini akan ranap dengan ralat seperti ini:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
Dalam erti kata lain, task
tidak boleh diserahkan, kerana SynchronousQueue
direka supaya ia sebenarnya terdiri daripada satu elemen dan tidak membenarkan kami memasukkan apa-apa lagi ke dalamnya. Kita dapat melihat bahawa kita mempunyai sifar queued tasks
("tugas beratur = 0") di sini. Tetapi tidak ada yang pelik tentang ini, kerana ini adalah ciri khas SynchronousQueue
, yang sebenarnya adalah baris gilir 1 elemen yang sentiasa kosong! Apabila satu utas meletakkan elemen dalam baris gilir, ia akan menunggu sehingga satu benang lain mengambil elemen itu daripada baris gilir. Sehubungan itu, kita boleh menggantikannya dengan new LinkedBlockingQueue<>(1)
dan ralat akan ditukar kepada show queued tasks = 1
. Oleh kerana baris gilir hanya 1 elemen, kita tidak boleh menambah elemen kedua. Dan itulah yang menyebabkan program gagal. Meneruskan perbincangan kami tentang baris gilir, perlu diperhatikan bahawaThreadPoolExecutor
kelas mempunyai kaedah tambahan untuk melayan baris gilir. Sebagai contoh, threadPoolExecutor.purge()
kaedah akan mengalih keluar semua tugas yang dibatalkan daripada baris gilir untuk mengosongkan ruang dalam baris gilir. Satu lagi fungsi berkaitan baris gilir yang menarik ialah pengendali untuk tugas yang ditolak:
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.SECONDS, new SynchronousQueue());
Callable<String> task = () -> Thread.currentThread().getName();
threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
for (int i = 0; i < 5; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
Dalam contoh ini, pengendali kami hanya memaparkan Rejected
setiap kali tugas dalam baris gilir ditolak. Mudah, bukan? Di samping itu, ThreadPoolExecutor
mempunyai subkelas yang menarik: ScheduledThreadPoolExecutor
, yang merupakan ScheduledExecutorService
. Ia menyediakan keupayaan untuk melaksanakan tugas berdasarkan pemasa.
ScheduledExecutorService
ScheduledExecutorService
(yang merupakan jenis ExecutorService
) membolehkan kami menjalankan tugas mengikut jadual. Mari lihat contoh:
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
return Thread.currentThread().getName();
};
scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
scheduledExecutorService.shutdown();
}
Semuanya mudah di sini. Tugasan diserahkan dan kemudian kami mendapat java.util.concurrent.ScheduledFuture
. Jadual juga boleh membantu dalam situasi berikut:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
Di sini kami menyerahkan Runnable
tugas untuk pelaksanaan pada frekuensi tetap ("Kadar Tetap") dengan kelewatan awal tertentu. Dalam kes ini, selepas 1 saat, tugasan akan mula dilaksanakan setiap 2 saat. Terdapat pilihan yang serupa:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
Tetapi dalam kes ini, tugasan dilakukan dengan selang waktu tertentu ANTARA setiap pelaksanaan. Iaitu, task
akan dilaksanakan selepas 1 saat. Kemudian, sebaik sahaja ia selesai, 2 saat akan berlalu, dan kemudian tugas baharu akan dimulakan. Berikut ialah beberapa sumber tambahan mengenai topik ini:
- Pengenalan kepada kumpulan benang di Jawa
- Pengenalan kepada Kolam Benang di Jawa
- Java Multithreading Steeplechase: Membatalkan Tugas Dalam Pelaksana
- Menggunakan Java Executors untuk Tugas Latar Belakang

https://dzone.com/articles/diving-into-java-8s-newworkstealingpools
WorkStealingPool
Sebagai tambahan kepada kumpulan benang di atas, terdapat satu lagi. Sejujurnya kami boleh mengatakan bahawa ia adalah sedikit istimewa. Ia dipanggil kolam curi kerja. Ringkasnya, curi kerja ialah algoritma di mana utas terbiar mula mengambil tugasan daripada utas lain atau tugasan daripada baris gilir yang dikongsi. Mari lihat contoh:
public static void main(String[] args) {
Object lock = new Object();
ExecutorService executorService = Executors.newCachedThreadPool();
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
lock.wait(2000);
System.out.println("Finished");
return "result";
};
for (int i = 0; i < 5; i++) {
executorService.submit(task);
}
executorService.shutdown();
}
Jika kita menjalankan kod ini, maka ia ExecutorService
akan membuat 5 utas untuk kita, kerana setiap utas akan dimasukkan ke dalam baris gilir menunggu untuk objek kunci. Kami sudah mengetahui monitor dan kunci dalam Better together: Java dan kelas Thread. Bahagian II — Penyegerakan . Sekarang mari kita ganti Executors.newCachedThreadPool()
dengan Executors.newWorkStealingPool()
. Apa yang akan berubah? Kami akan melihat bahawa tugas kami dilaksanakan pada kurang daripada 5 urutan. Ingat bahawa CachedThreadPool
mencipta benang untuk setiap tugas? Itu kerana wait()
telah menyekat urutan, tugasan berikutnya ingin diselesaikan dan urutan baharu telah dibuat untuk mereka dalam kumpulan. Dengan kolam mencuri, benang tidak terbiar selama-lamanya. Mereka mula melaksanakan tugas jiran mereka. Apakah yang membezakannya WorkStealingPool
daripada kumpulan benang lain? Hakikat bahawa ajaibForkJoinPool
tinggal di dalamnya:
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
Sebenarnya, ada satu lagi perbezaan. Secara lalai, utas yang dibuat untuk a ForkJoinPool
adalah utas daemon, tidak seperti utas yang dibuat melalui onrdinary ThreadPool
. Secara umum, anda harus ingat benang daemon, kerana, sebagai contoh, CompletableFuture
juga menggunakan benang daemon melainkan anda menentukan sendiri ThreadFactory
yang mencipta benang bukan daemon. Ini adalah kejutan yang mungkin mengintai di tempat yang tidak dijangka! :)
ForkJoinPool
Dalam bahagian ini, kita akan bercakap sekali lagi tentangForkJoinPool
(juga dipanggil rangka kerja fork/join), yang hidup "di bawah tudung" WorkStealingPool
. Secara umum, rangka kerja fork/join muncul semula di Java 1.7. Dan walaupun Java 11 hampir dekat, ia masih patut diingati. Ini bukan pelaksanaan yang paling biasa, tetapi ia agak menarik. Terdapat ulasan yang baik tentang perkara ini di web: Memahami Rangka Kerja Java Fork-Join dengan Contoh . Bergantung ForkJoinPool
kepada java.util.concurrent.RecursiveTask
. Ada juga java.util.concurrent.RecursiveAction
. RecursiveAction
tidak mengembalikan hasil. Oleh itu, RecursiveTask
adalah serupa dengan Callable
, dan RecursiveAction
serupa dengan unnable
. Kita dapat melihat bahawa nama itu termasuk nama dua kaedah penting: fork
dan join
. Thefork
kaedah memulakan beberapa tugas secara tidak segerak pada benang yang berasingan. Dan join
kaedah ini membolehkan anda menunggu kerja selesai. Untuk mendapatkan pemahaman yang terbaik, anda harus membaca Daripada Pengaturcaraan Imperatif ke Fork/Join ke Aliran Selari di Java 8 .
Ringkasan
Nah, itu menutup bahagian ulasan ini. Kami telah mengetahui bahawaExecutor
pada asalnya dicipta untuk melaksanakan urutan. Kemudian pencipta Java memutuskan untuk meneruskan idea dan menghasilkan ExecutorService
. ExecutorService
membolehkan kami menyerahkan tugasan untuk pelaksanaan menggunakan submit()
dan invoke()
, dan juga menutup perkhidmatan. Kerana ExecutorService
memerlukan pelaksanaan, mereka menulis kelas dengan kaedah kilang dan memanggilnya Executors
. Ia membolehkan anda membuat kumpulan benang ( ThreadPoolExecutor
). Selain itu, terdapat kumpulan benang yang turut membenarkan kami menentukan jadual pelaksanaan. Dan seorang ForkJoinPool
bersembunyi di sebalik sebuah WorkStealingPool
. Saya harap anda mendapati apa yang saya tulis di atas bukan sahaja menarik, tetapi juga boleh difahami :) Saya sentiasa gembira mendengar cadangan dan komen anda. Lebih baik bersama: Java dan kelas Thread. Bahagian I — Benang pelaksanaan Lebih baik bersama: Java dan kelas Benang. Bahagian II — Penyegerakan Lebih baik bersama: Java dan kelas Thread. Bahagian III — Interaksi Lebih Baik bersama: Java dan kelas Thread. Bahagian IV — Boleh Dipanggil, Masa Depan dan rakan Lebih baik bersama: Java dan kelas Thread. Bahagian VI - Jauhkan api!
GO TO FULL VERSION