CodeGym/Blog Java/rawak/Lebih baik bersama: Java dan kelas Thread. Bahagian V — P...
John Squirrels
Tahap
San Francisco

Lebih baik bersama: Java dan kelas Thread. Bahagian V — Pelaksana, ThreadPool, Fork/Join

Diterbitkan dalam kumpulan

pengenalan

Jadi, kita tahu bahawa Java mempunyai benang. Anda boleh membaca tentang itu dalam ulasan bertajuk Better together: Java and the Thread class. Bahagian I — Benang pelaksanaan . Lebih baik bersama: Java dan kelas Thread.  Bahagian V — Pelaksana, ThreadPool, Fork/Join - 1Mari kita lihat lagi kod biasa:
public static void main(String[] args) throws Exception {
	Runnable task = () -> {
		System.out.println("Task executed");
	};
	Thread thread = new Thread(task);
	thread.start();
}
Seperti yang anda lihat, kod untuk memulakan tugasan adalah agak tipikal, tetapi kita perlu mengulanginya untuk tugasan baharu. Satu penyelesaian ialah meletakkannya dalam kaedah yang berasingan, cth execute(Runnable runnable). Tetapi pencipta Java telah mempertimbangkan nasib kami dan menghasilkan antara Executormuka:
public static void main(String[] args) throws Exception {
	Runnable task = () -> System.out.println("Task executed");
	Executor executor = (runnable) -> {
		new Thread(runnable).start();
	};
	executor.execute(task);
}
Kod ini jelas lebih ringkas: kini kita hanya menulis kod untuk memulakan pada Runnablebenang. Itu hebat, bukan? Tetapi ini hanya permulaan: Lebih baik bersama: Java dan kelas Thread.  Bahagian V — Pelaksana, ThreadPool, Fork/Join - 2

https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html

Seperti yang anda lihat, Executorantara muka mempunyai ExecutorServicesubantara muka. Javadoc untuk antara muka ini mengatakan bahawa an ExecutorServicemenerangkan perkara tertentu Executoryang menyediakan kaedah untuk menutup fail Executor. Ia juga memungkinkan untuk mendapatkan java.util.concurrent.Futureuntuk menjejaki proses pelaksanaan. Sebelum ini, dalam Better together: Java dan kelas Thread. Bahagian IV — Boleh Dipanggil, Masa Depan dan rakan , kami menyemak secara ringkas keupayaan Future. Jika anda terlupa atau tidak pernah membacanya, saya cadangkan anda menyegarkan ingatan anda ;) Apa lagi yang Javadoc katakan? Ia memberitahu kami bahawa kami mempunyai java.util.concurrent.Executorskilang khas yang membolehkan kami membuat pelaksanaan lalai ExecutorService.

Perkhidmatan Pelaksana

Jom tinjau. Kita perlu Executormelaksanakan (iaitu untuk memanggil execute()) tugas tertentu pada utas, dan kod yang mencipta utas disembunyikan daripada kami. Kami ada ExecutorService— khusus Executoryang mempunyai beberapa pilihan untuk mengawal kemajuan. Dan kami mempunyai Executorskilang yang membolehkan kami mencipta ExecutorService. Sekarang mari kita lakukan sendiri:
public static void main(String[] args) throws ExecutionException, InterruptedException {
	Callable<String> task = () -> Thread.currentThread().getName();
	ExecutorService service = Executors.newFixedThreadPool(2);
	for (int i = 0; i < 5; i++) {
		Future result = service.submit(task);
		System.out.println(result.get());
	}
	service.shutdown();
}
Anda dapat melihat bahawa kami menetapkan kumpulan benang tetap yang saiznya ialah 2. Kemudian kami menyerahkan tugasan kepada kumpulan satu demi satu. Setiap tugasan mengembalikan a Stringyang mengandungi nama utas ( currentThread().GetName()). Adalah penting untuk menutupnya ExecutorServicepada penghujungnya, kerana jika tidak, program kami tidak akan tamat. Kilang Executorsmempunyai kaedah kilang tambahan. Sebagai contoh, kita boleh mencipta kumpulan yang terdiri daripada hanya satu utas ( newSingleThreadExecutor) atau kumpulan yang termasuk cache ( newCachedThreadPool) yang mana utas dialih keluar selepas ia melahu selama 1 minit. Pada hakikatnya, ini ExecutorServicedisokong oleh baris gilir menyekat , di mana tugas diletakkan dan dari mana tugas dilaksanakan. Maklumat lanjut tentang menyekat baris gilir boleh didapati dalam video ini . Anda juga boleh membaca iniulasan tentang BlockingQueue . Dan lihat jawapan kepada soalan "Bila lebih suka LinkedBlockingQueue berbanding ArrayBlockingQueue?" Dalam istilah yang paling mudah, BlockingQueuesekatan benang dalam dua kes:
  • benang cuba mendapatkan item daripada baris gilir kosong
  • benang cuba meletakkan item ke dalam baris gilir penuh
Jika kita melihat kepada pelaksanaan kaedah kilang, kita boleh melihat bagaimana ia berfungsi. Sebagai contoh:
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}
atau
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}
Seperti yang dapat kita lihat, pelaksanaan ExecutorServicedibuat di dalam kaedah kilang. Dan untuk sebahagian besar, kita bercakap tentang ThreadPoolExecutor. Hanya parameter yang mempengaruhi kerja diubah. Lebih baik bersama: Java dan kelas Thread.  Bahagian V — Pelaksana, ThreadPool, Fork/Join - 3

https://en.wikipedia.org/wiki/Thread_pool#/media/Fail:Thread_pool.svg

ThreadPoolExecutor

Seperti yang kita lihat sebelum ini, ThreadPoolExecutoradalah apa yang biasanya dibuat di dalam kaedah kilang. Kefungsian dipengaruhi oleh hujah yang kami luluskan sebagai bilangan maksimum dan minimum utas, serta jenis baris gilir yang digunakan. Tetapi sebarang pelaksanaan antara java.util.concurrent.BlockingQueuemuka boleh digunakan. Bercakap tentang ThreadPoolExecutor, kita harus menyebut beberapa ciri menarik. Sebagai contoh, anda tidak boleh menyerahkan tugas kepada a ThreadPoolExecutorjika tiada ruang yang tersedia:
public static void main(String[] args) throws ExecutionException, InterruptedException {
	int threadBound = 2;
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
            0L, TimeUnit.SECONDS, new SynchronousQueue<>());
	Callable<String> task = () -> {
		Thread.sleep(1000);
		return Thread.currentThread().getName();
	};
	for (int i = 0; i < threadBound + 1; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
Kod ini akan ranap dengan ralat seperti ini:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
Dalam erti kata lain, tasktidak boleh diserahkan, kerana SynchronousQueuedireka supaya ia sebenarnya terdiri daripada satu elemen dan tidak membenarkan kami memasukkan apa-apa lagi ke dalamnya. Kita dapat melihat bahawa kita mempunyai sifar queued tasks("tugas beratur = 0") di sini. Tetapi tidak ada yang pelik tentang ini, kerana ini adalah ciri khas SynchronousQueue, yang sebenarnya adalah baris gilir 1 elemen yang sentiasa kosong! Apabila satu utas meletakkan elemen dalam baris gilir, ia akan menunggu sehingga satu benang lain mengambil elemen itu daripada baris gilir. Sehubungan itu, kita boleh menggantikannya dengan new LinkedBlockingQueue<>(1)dan ralat akan ditukar kepada show queued tasks = 1. Oleh kerana baris gilir hanya 1 elemen, kita tidak boleh menambah elemen kedua. Dan itulah yang menyebabkan program gagal. Meneruskan perbincangan kami tentang baris gilir, perlu diperhatikan bahawaThreadPoolExecutorkelas mempunyai kaedah tambahan untuk melayan baris gilir. Sebagai contoh, threadPoolExecutor.purge()kaedah akan mengalih keluar semua tugas yang dibatalkan daripada baris gilir untuk mengosongkan ruang dalam baris gilir. Satu lagi fungsi berkaitan baris gilir yang menarik ialah pengendali untuk tugas yang ditolak:
public static void main(String[] args) {
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
            0L, TimeUnit.SECONDS, new SynchronousQueue());
	Callable<String> task = () -> Thread.currentThread().getName();
	threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
	for (int i = 0; i < 5; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
Dalam contoh ini, pengendali kami hanya memaparkan Rejectedsetiap kali tugas dalam baris gilir ditolak. Mudah, bukan? Di samping itu, ThreadPoolExecutormempunyai subkelas yang menarik: ScheduledThreadPoolExecutor, yang merupakan ScheduledExecutorService. Ia menyediakan keupayaan untuk melaksanakan tugas berdasarkan pemasa.

ScheduledExecutorService

ScheduledExecutorService(yang merupakan jenis ExecutorService) membolehkan kami menjalankan tugas mengikut jadual. Mari lihat contoh:
public static void main(String[] args) {
	ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		return Thread.currentThread().getName();
	};
	scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
	scheduledExecutorService.shutdown();
}
Semuanya mudah di sini. Tugasan diserahkan dan kemudian kami mendapat java.util.concurrent.ScheduledFuture. Jadual juga boleh membantu dalam situasi berikut:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
	System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
Di sini kami menyerahkan Runnabletugas untuk pelaksanaan pada frekuensi tetap ("Kadar Tetap") dengan kelewatan awal tertentu. Dalam kes ini, selepas 1 saat, tugasan akan mula dilaksanakan setiap 2 saat. Terdapat pilihan yang serupa:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
Tetapi dalam kes ini, tugasan dilakukan dengan selang waktu tertentu ANTARA setiap pelaksanaan. Iaitu, taskakan dilaksanakan selepas 1 saat. Kemudian, sebaik sahaja ia selesai, 2 saat akan berlalu, dan kemudian tugas baharu akan dimulakan. Berikut ialah beberapa sumber tambahan mengenai topik ini: Lebih baik bersama: Java dan kelas Thread.  Bahagian V — Pelaksana, ThreadPool, Fork/Join - 4

https://dzone.com/articles/diving-into-java-8s-newworkstealingpools

WorkStealingPool

Sebagai tambahan kepada kumpulan benang di atas, terdapat satu lagi. Sejujurnya kami boleh mengatakan bahawa ia adalah sedikit istimewa. Ia dipanggil kolam curi kerja. Ringkasnya, curi kerja ialah algoritma di mana utas terbiar mula mengambil tugasan daripada utas lain atau tugasan daripada baris gilir yang dikongsi. Mari lihat contoh:
public static void main(String[] args) {
	Object lock = new Object();
	ExecutorService executorService = Executors.newCachedThreadPool();
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		lock.wait(2000);
		System.out.println("Finished");
		return "result";
	};
	for (int i = 0; i < 5; i++) {
		executorService.submit(task);
	}
	executorService.shutdown();
}
Jika kita menjalankan kod ini, maka ia ExecutorServiceakan membuat 5 utas untuk kita, kerana setiap utas akan dimasukkan ke dalam baris gilir menunggu untuk objek kunci. Kami sudah mengetahui monitor dan kunci dalam Better together: Java dan kelas Thread. Bahagian II — Penyegerakan . Sekarang mari kita ganti Executors.newCachedThreadPool()dengan Executors.newWorkStealingPool(). Apa yang akan berubah? Kami akan melihat bahawa tugas kami dilaksanakan pada kurang daripada 5 urutan. Ingat bahawa CachedThreadPoolmencipta benang untuk setiap tugas? Itu kerana wait()telah menyekat urutan, tugasan berikutnya ingin diselesaikan dan urutan baharu telah dibuat untuk mereka dalam kumpulan. Dengan kolam mencuri, benang tidak terbiar selama-lamanya. Mereka mula melaksanakan tugas jiran mereka. Apakah yang membezakannya WorkStealingPooldaripada kumpulan benang lain? Hakikat bahawa ajaibForkJoinPooltinggal di dalamnya:
public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
}
Sebenarnya, ada satu lagi perbezaan. Secara lalai, utas yang dibuat untuk a ForkJoinPooladalah utas daemon, tidak seperti utas yang dibuat melalui onrdinary ThreadPool. Secara umum, anda harus ingat benang daemon, kerana, sebagai contoh, CompletableFuturejuga menggunakan benang daemon melainkan anda menentukan sendiri ThreadFactoryyang mencipta benang bukan daemon. Ini adalah kejutan yang mungkin mengintai di tempat yang tidak dijangka! :)

ForkJoinPool

Dalam bahagian ini, kita akan bercakap sekali lagi tentang ForkJoinPool(juga dipanggil rangka kerja fork/join), yang hidup "di bawah tudung" WorkStealingPool. Secara umum, rangka kerja fork/join muncul semula di Java 1.7. Dan walaupun Java 11 hampir dekat, ia masih patut diingati. Ini bukan pelaksanaan yang paling biasa, tetapi ia agak menarik. Terdapat ulasan yang baik tentang perkara ini di web: Memahami Rangka Kerja Java Fork-Join dengan Contoh . Bergantung ForkJoinPoolkepada java.util.concurrent.RecursiveTask. Ada juga java.util.concurrent.RecursiveAction. RecursiveActiontidak mengembalikan hasil. Oleh itu, RecursiveTaskadalah serupa dengan Callable, dan RecursiveActionserupa dengan unnable. Kita dapat melihat bahawa nama itu termasuk nama dua kaedah penting: forkdan join. Theforkkaedah memulakan beberapa tugas secara tidak segerak pada benang yang berasingan. Dan joinkaedah ini membolehkan anda menunggu kerja selesai. Untuk mendapatkan pemahaman yang terbaik, anda harus membaca Daripada Pengaturcaraan Imperatif ke Fork/Join ke Aliran Selari di Java 8 .

Ringkasan

Nah, itu menutup bahagian ulasan ini. Kami telah mengetahui bahawa Executorpada asalnya dicipta untuk melaksanakan urutan. Kemudian pencipta Java memutuskan untuk meneruskan idea dan menghasilkan ExecutorService. ExecutorServicemembolehkan kami menyerahkan tugasan untuk pelaksanaan menggunakan submit()dan invoke(), dan juga menutup perkhidmatan. Kerana ExecutorServicememerlukan pelaksanaan, mereka menulis kelas dengan kaedah kilang dan memanggilnya Executors. Ia membolehkan anda membuat kumpulan benang ( ThreadPoolExecutor). Selain itu, terdapat kumpulan benang yang turut membenarkan kami menentukan jadual pelaksanaan. Dan seorang ForkJoinPoolbersembunyi di sebalik sebuah WorkStealingPool. Saya harap anda mendapati apa yang saya tulis di atas bukan sahaja menarik, tetapi juga boleh difahami :) Saya sentiasa gembira mendengar cadangan dan komen anda. Lebih baik bersama: Java dan kelas Thread. Bahagian I — Benang pelaksanaan Lebih baik bersama: Java dan kelas Benang. Bahagian II — Penyegerakan Lebih baik bersama: Java dan kelas Thread. Bahagian III — Interaksi Lebih Baik bersama: Java dan kelas Thread. Bahagian IV — Boleh Dipanggil, Masa Depan dan rakan Lebih baik bersama: Java dan kelas Thread. Bahagian VI - Jauhkan api!
Komen
  • Popular
  • Baru
  • Tua
Anda mesti log masuk untuk meninggalkan ulasan
Halaman ini tidak mempunyai sebarang ulasan lagi