CodeGym /Java Blog /Acak /Lebih baik bersama: Java dan kelas Thread. Bagian V — Pel...
John Squirrels
Level 41
San Francisco

Lebih baik bersama: Java dan kelas Thread. Bagian V — Pelaksana, ThreadPool, Fork/Bergabung

Dipublikasikan di grup Acak

Perkenalan

Jadi, kita tahu bahwa Java memiliki utas. Anda dapat membacanya di ulasan berjudul Lebih baik bersama: Java dan kelas Thread. Bagian I — Utas eksekusi . Lebih baik bersama: Java dan kelas Thread.  Bagian V — Pelaksana, ThreadPool, Fork/Bergabung - 1Mari kita lihat lagi kode tipikal:

public static void main(String[] args) throws Exception {
	Runnable task = () -> {
		System.out.println("Task executed");
	};
	Thread thread = new Thread(task);
	thread.start();
}
Seperti yang Anda lihat, kode untuk memulai tugas cukup umum, tetapi kita harus mengulanginya untuk tugas baru. Salah satu solusinya adalah dengan memasukkannya ke dalam metode terpisah, misalnya execute(Runnable runnable). Tapi pencipta Java telah mempertimbangkan keadaan buruk kami dan muncul dengan antarmuka Executor:

public static void main(String[] args) throws Exception {
	Runnable task = () -> System.out.println("Task executed");
	Executor executor = (runnable) -> {
		new Thread(runnable).start();
	};
	executor.execute(task);
}
Kode ini jelas lebih ringkas: sekarang kita cukup menulis kode untuk memulai di Runnableutas. Itu bagus, bukan? Tapi ini baru permulaan: Lebih baik bersama: Java dan kelas Thread.  Bagian V — Pelaksana, ThreadPool, Fork/Bergabung - 2

https://docs.Oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html

Seperti yang Anda lihat, Executorantarmuka memiliki ExecutorServicesubantarmuka. Javadoc untuk antarmuka ini mengatakan bahwa an ExecutorServicemenjelaskan tertentu Executoryang menyediakan metode untuk mematikan Executor. Itu juga memungkinkan untuk mendapatkan java.util.concurrent.Futureuntuk melacak proses eksekusi. Sebelumnya, di Lebih baik bersama: Java dan kelas Thread. Bagian IV — Callable, Future, dan teman-teman , kami meninjau secara singkat kemampuan Future. Jika Anda lupa atau tidak pernah membacanya, saya sarankan Anda menyegarkan ingatan Anda ;) Apa lagi yang dikatakan Javadoc? Ini memberi tahu kita bahwa kita memiliki java.util.concurrent.Executorspabrik khusus yang memungkinkan kita membuat implementasi default dari ExecutorService.

Layanan Pelaksana

Mari kita tinjau. Kita harus Executormenjalankan (yaitu untuk memanggil execute()) tugas tertentu pada utas, dan kode yang membuat utas disembunyikan dari kita. Kami memiliki ExecutorService— spesifik Executoryang memiliki beberapa opsi untuk mengontrol kemajuan. Dan kami memiliki Executorspabrik yang memungkinkan kami membuat file ExecutorService. Sekarang mari kita lakukan sendiri:

public static void main(String[] args) throws ExecutionException, InterruptedException {
	Callable<String> task = () -> Thread.currentThread().getName();
	ExecutorService service = Executors.newFixedThreadPool(2);
	for (int i = 0; i < 5; i++) {
		Future result = service.submit(task);
		System.out.println(result.get());
	}
	service.shutdown();
}
Anda dapat melihat bahwa kami menentukan kumpulan utas tetap yang ukurannya 2. Kemudian kami mengirimkan tugas ke kumpulan satu per satu. Setiap tugas mengembalikan a Stringyang berisi nama utas ( currentThread().GetName()). Sangat penting untuk mematikannya ExecutorServicedi bagian paling akhir, karena jika tidak, program kita tidak akan berakhir. Pabrik Executorsmemiliki metode pabrik tambahan. Misalnya, kita dapat membuat kumpulan yang hanya terdiri dari satu utas ( newSingleThreadExecutor) atau kumpulan yang menyertakan cache ( newCachedThreadPool) dari mana utas dihapus setelah tidak digunakan selama 1 menit. Pada kenyataannya, ini ExecutorServicedidukung oleh antrian pemblokiran , tempat tugas ditempatkan dan dari mana tugas dijalankan. Informasi lebih lanjut tentang pemblokiran antrean dapat ditemukan di video ini . Anda juga bisa membaca iniulasan tentang BlockingQueue . Dan lihat jawaban untuk pertanyaan "Kapan memilih LinkedBlockingQueue daripada ArrayBlockingQueue?" Dalam istilah paling sederhana, sebuah BlockingQueueutas diblokir dalam dua kasus:
  • utas mencoba mendapatkan item dari antrean kosong
  • utas mencoba memasukkan item ke antrean penuh
Jika kita melihat penerapan metode pabrik, kita dapat melihat cara kerjanya. Misalnya:

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}
atau

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}
Seperti yang bisa kita lihat, implementasi dari ExecutorServicedibuat di dalam metode pabrik. Dan sebagian besar, kita berbicara tentang ThreadPoolExecutor. Hanya parameter yang mempengaruhi pekerjaan yang diubah. Lebih baik bersama: Java dan kelas Thread.  Bagian V — Pelaksana, ThreadPool, Fork/Join - 3

https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg

ThreadPoolExecutor

Seperti yang kita lihat sebelumnya, ThreadPoolExecutorinilah yang biasanya dibuat di dalam metode pabrik. Fungsionalitas dipengaruhi oleh argumen yang kami berikan sebagai jumlah utas maksimum dan minimum, serta jenis antrean yang digunakan. Tetapi implementasi antarmuka apa pun java.util.concurrent.BlockingQueuedapat digunakan. Omong-omong ThreadPoolExecutor, kami harus menyebutkan beberapa fitur menarik. Misalnya, Anda tidak dapat mengirimkan tugas ke a ThreadPoolExecutorjika tidak ada ruang yang tersedia:

public static void main(String[] args) throws ExecutionException, InterruptedException {
	int threadBound = 2;
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
            0L, TimeUnit.SECONDS, new SynchronousQueue<>());
	Callable<String> task = () -> {
		Thread.sleep(1000);
		return Thread.currentThread().getName();
	};
	for (int i = 0; i < threadBound + 1; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
Kode ini akan macet dengan kesalahan seperti ini:

Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
Dengan kata lain, tasktidak dapat dikirimkan, karena SynchronousQueuedirancang sedemikian rupa sehingga benar-benar terdiri dari satu elemen dan tidak memungkinkan kita untuk menambahkan apa pun ke dalamnya. Kita dapat melihat bahwa kita memiliki nol queued tasks("tugas antri = 0") di sini. Tapi tidak ada yang aneh dengan ini, karena ini adalah fitur khusus dari SynchronousQueue, yang notabene adalah antrian 1 elemen yang selalu kosong! Ketika satu utas menempatkan elemen dalam antrean, ia akan menunggu hingga utas lainnya mengambil elemen dari antrean. Oleh karena itu, kita dapat menggantinya dengan new LinkedBlockingQueue<>(1)dan error akan berubah menjadi now show queued tasks = 1. Karena antriannya hanya 1 elemen, kita tidak bisa menambahkan elemen kedua. Dan itulah yang menyebabkan program gagal. Melanjutkan diskusi kita tentang antrian, perlu dicatat bahwaThreadPoolExecutorclass memiliki metode tambahan untuk melayani antrian. Misalnya, threadPoolExecutor.purge()metode ini akan menghapus semua tugas yang dibatalkan dari antrean untuk mengosongkan ruang antrean. Fungsi lain yang berhubungan dengan antrean yang menarik adalah penangan untuk tugas yang ditolak:

public static void main(String[] args) {
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
            0L, TimeUnit.SECONDS, new SynchronousQueue());
	Callable<String> task = () -> Thread.currentThread().getName();
	threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
	for (int i = 0; i < 5; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
Dalam contoh ini, penangan kami hanya menampilkan Rejectedsetiap kali tugas dalam antrean ditolak. Nyaman, bukan? Selain itu, ThreadPoolExecutormemiliki subkelas yang menarik: ScheduledThreadPoolExecutor, yaitu ScheduledExecutorService. Ini memberikan kemampuan untuk melakukan tugas berdasarkan timer.

ScheduledExecutorService

ScheduledExecutorService(yang merupakan jenis ExecutorService) memungkinkan kita menjalankan tugas sesuai jadwal. Mari kita lihat sebuah contoh:

public static void main(String[] args) {
	ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		return Thread.currentThread().getName();
	};
	scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
	scheduledExecutorService.shutdown();
}
Semuanya sederhana di sini. Tugas diserahkan dan kemudian kami mendapatkan file java.util.concurrent.ScheduledFuture. Jadwal juga dapat membantu dalam situasi berikut:

ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
	System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
Di sini kami mengirimkan Runnabletugas untuk dieksekusi pada frekuensi tetap ("FixedRate") dengan penundaan awal tertentu. Dalam hal ini, setelah 1 detik, tugas akan mulai dijalankan setiap 2 detik. Ada opsi serupa:

scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
Namun dalam kasus ini, tugas dilakukan dengan interval tertentu ANTARA setiap eksekusi. Artinya, taskakan dieksekusi setelah 1 detik. Kemudian, segera setelah selesai, 2 detik akan berlalu, dan tugas baru akan dimulai. Berikut adalah beberapa sumber daya tambahan tentang topik ini: Lebih baik bersama: Java dan kelas Thread.  Bagian V — Pelaksana, ThreadPool, Fork/Bergabung - 4

https://dzone.com/articles/diving-into-java-8s-newworkstealingpools

WorkStealingPool

Selain kumpulan utas di atas, ada satu lagi. Kami dapat dengan jujur ​​​​mengatakan bahwa itu sedikit istimewa. Ini disebut kumpulan pencuri pekerjaan. Singkatnya, work-stealing adalah algoritme di mana utas yang menganggur mulai mengambil tugas dari utas lain atau tugas dari antrean bersama. Mari kita lihat sebuah contoh:

public static void main(String[] args) {
	Object lock = new Object();
	ExecutorService executorService = Executors.newCachedThreadPool();
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		lock.wait(2000);
		System.out.println("Finished");
		return "result";
	};
	for (int i = 0; i < 5; i++) {
		executorService.submit(task);
	}
	executorService.shutdown();
}
Jika kita menjalankan kode ini, maka ExecutorServiceakan dibuatkan 5 thread untuk kita, karena setiap thread akan dimasukkan ke antrian tunggu untuk objek lock. Kami sudah menemukan monitor dan mengunci Lebih baik bersama: Java dan kelas Thread. Bagian II — Sinkronisasi . Sekarang mari kita ganti Executors.newCachedThreadPool()dengan Executors.newWorkStealingPool(). Apa yang akan berubah? Kita akan melihat bahwa tugas kita dijalankan di kurang dari 5 utas. Ingat bahwa CachedThreadPoolmembuat utas untuk setiap tugas? Itu karena wait()utas diblokir, tugas selanjutnya ingin diselesaikan, dan utas baru dibuat untuk mereka di kumpulan. Dengan kumpulan pencuri, utas tidak diam selamanya. Mereka mulai melakukan tugas tetangga mereka. Apa yang membuatnya WorkStealingPoolsangat berbeda dari kumpulan utas lainnya? Fakta bahwa magisForkJoinPoolhidup di dalamnya:

public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
}
Sebenarnya, ada satu perbedaan lagi. Secara default, utas yang dibuat untuk a ForkJoinPooladalah utas daemon, tidak seperti utas yang dibuat melalui onrdinary ThreadPool. Secara umum, Anda harus mengingat utas daemon, karena, misalnya, CompletableFutureutas daemon juga digunakan kecuali Anda menentukan utas Anda sendiri ThreadFactoryyang membuat utas non-daemon. Inilah kejutan-kejutan yang mungkin mengintai di tempat-tempat tak terduga! :)

ForkJoinPool

Pada bagian ini, kita akan berbicara lagi tentang ForkJoinPool(juga disebut fork/join framework), yang hidup "di bawah tenda" dari WorkStealingPool. Secara umum, framework fork/join muncul kembali di Java 1.7. Dan meskipun Java 11 sudah dekat, masih perlu diingat. Ini bukan implementasi yang paling umum, tetapi cukup menarik. Ada ulasan bagus tentang ini di web: Memahami Java Fork-Join Framework dengan Contoh . Itu ForkJoinPoolbergantung pada java.util.concurrent.RecursiveTask. Ada juga java.util.concurrent.RecursiveAction. RecursiveActiontidak mengembalikan hasil. Jadi, RecursiveTaskmirip dengan Callable, dan RecursiveActionmirip dengan unnable. Kita dapat melihat bahwa nama tersebut menyertakan nama dari dua metode penting: forkdan join. Ituforkmetode memulai beberapa tugas secara asinkron pada utas terpisah. Dan joinmetode ini memungkinkan Anda menunggu pekerjaan selesai. Untuk mendapatkan pemahaman terbaik, Anda harus membaca From Imperative Programming to Fork/Join to Parallel Streams in Java 8 .

Ringkasan

Nah, itu mengakhiri bagian ulasan ini. Kami telah mempelajari bahwa Executorawalnya diciptakan untuk mengeksekusi utas. Kemudian pencipta Java memutuskan untuk melanjutkan ide tersebut dan menghasilkan ExecutorService. ExecutorServicemari kita kirimkan tugas untuk dieksekusi menggunakan submit()and invoke(), dan juga mematikan layanan. Karena ExecutorServicemembutuhkan implementasi, mereka menulis kelas dengan metode pabrik dan menyebutnya Executors. Ini memungkinkan Anda membuat kumpulan utas ( ThreadPoolExecutor). Selain itu, ada kumpulan utas yang juga memungkinkan kami menentukan jadwal eksekusi. Dan a ForkJoinPoolbersembunyi di balik a WorkStealingPool. Saya harap Anda menemukan apa yang saya tulis di atas tidak hanya menarik, tetapi juga dapat dimengerti :) Saya selalu senang mendengar saran dan komentar Anda. Lebih baik bersama: Java dan kelas Thread. Bagian I — Utas eksekusi Lebih baik bersama: Java dan kelas Utas. Bagian II — Sinkronisasi Lebih baik bersama: Java dan kelas Thread. Bagian III — Interaksi Bersama yang lebih baik: Java dan kelas Thread. Bagian IV — Callable, Future, dan teman Lebih baik bersama: Java dan kelas Thread. Bagian VI — Tembak!
Komentar
TO VIEW ALL COMMENTS OR TO MAKE A COMMENT,
GO TO FULL VERSION