KodeGym/Blog Jawa/Acak/Luwih apik bebarengan: Jawa lan kelas Utas. Bagian V - Pe...
John Squirrels
tingkat
San Francisco

Luwih apik bebarengan: Jawa lan kelas Utas. Bagian V - Pelaksana, ThreadPool, Fork/Join

Diterbitake ing grup

Pambuka

Dadi, kita ngerti yen Jawa duwe benang. Sampeyan bisa maca babagan kasebut ing review kanthi irah-irahan Better together: Java and the Thread class. Bagian I - Utas eksekusi . Luwih apik bebarengan: Jawa lan kelas Utas.  Bagian V - Pelaksana, ThreadPool, Fork/Gabung - 1Ayo goleki maneh kode khas:
public static void main(String[] args) throws Exception {
	Runnable task = () -> {
		System.out.println("Task executed");
	};
	Thread thread = new Thread(task);
	thread.start();
}
Nalika sampeyan bisa ndeleng, kode kanggo miwiti tugas cukup khas, nanging kita kudu mbaleni kanggo tugas anyar. Salah siji solusi kanggo sijine iku ing cara kapisah, contone execute(Runnable runnable). Nanging pangripta Jawa wis nganggep kahanan kita lan nggawe antarmuka Executor:
public static void main(String[] args) throws Exception {
	Runnable task = () -> System.out.println("Task executed");
	Executor executor = (runnable) -> {
		new Thread(runnable).start();
	};
	executor.execute(task);
}
Kode iki cetha luwih ringkes: saiki kita mung nulis kode kanggo miwiti ing Runnablethread. Apik banget, ta? Nanging iki mung wiwitan: Luwih apik bebarengan: Jawa lan kelas Utas.  Bagian V - Pelaksana, ThreadPool, Fork/Join - 2

https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html

Nalika sampeyan bisa ndeleng, Executorantarmuka duwe ExecutorServicesubinterface. Javadoc kanggo antarmuka iki ngandhakake yen an ExecutorServicenjlèntrèhaké khusus Executorsing nyedhiyakake cara kanggo mateni file Executor. Iku uga ndadekake iku bisa kanggo njaluk java.util.concurrent.Futuresupaya trek proses eksekusi. Sadurunge, ing Better together: Java and the Thread class. Part IV — Callable, Future, lan kanca-kanca , kita sedhela dideleng kemampuan saka Future. Yen sampeyan kelalen utawa ora tau maca, Aku suggest sing refresh memori;) Apa maneh Javadoc ngandika? Iku ngandhani yen kita duwe java.util.concurrent.Executorspabrik khusus sing ngidini kita nggawe implementasi standar saka ExecutorService.

PelaksanaLayanan

Ayo dideleng. Kita kudu Executornglakokake (yaiku nelpon execute()) tugas tartamtu ing thread, lan kode sing nggawe thread didhelikake saka kita. Kita duwe ExecutorService- khusus Executorsing duwe sawetara opsi kanggo ngontrol kemajuan. Lan kita duwe Executorspabrik sing ngidini kita nggawe ExecutorService. Saiki ayo nindakake dhewe:
public static void main(String[] args) throws ExecutionException, InterruptedException {
	Callable<String> task = () -> Thread.currentThread().getName();
	ExecutorService service = Executors.newFixedThreadPool(2);
	for (int i = 0; i < 5; i++) {
		Future result = service.submit(task);
		System.out.println(result.get());
	}
	service.shutdown();
}
Sampeyan bisa ndeleng manawa kita nemtokake blumbang benang tetep sing ukurane 2. Banjur kita ngirim tugas menyang blumbang siji-siji. Saben tugas ngasilake a Stringngemot jeneng utas ( currentThread().GetName()). Penting kanggo mateni ExecutorServiceing pungkasan, amarga yen program kita ora bakal mungkasi. Pabrik Executorsduwe cara pabrik tambahan. Contone, kita bisa nggawe blumbang sing kasusun saka mung siji utas ( newSingleThreadExecutor) utawa blumbang sing kalebu cache ( newCachedThreadPool) saka Utas dibusak sawise padha nganggur kanggo 1 menit. Ing kasunyatan, iki ExecutorServicedidhukung dening antrian pamblokiran , menyang ngendi tugas diselehake lan saka ngendi tugas dieksekusi. Informasi liyane babagan mblokir antrian bisa ditemokake ing video iki . Sampeyan uga bisa maca ikireview babagan BlockingQueue . Lan priksa jawaban kanggo pitakonan "Nalika luwih seneng LinkedBlockingQueue tinimbang ArrayBlockingQueue?" Ing istilah sing paling gampang, BlockingQueuemblokir thread ing rong kasus:
  • thread nyoba kanggo njaluk item saka antrian kosong
  • thread nyoba kanggo sijine item menyang antrian lengkap
Yen kita ndeleng implementasine metode pabrik, kita bisa ndeleng cara kerjane. Tuladhane:
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}
utawa
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}
Kaya sing kita deleng, implementasine ExecutorServicedigawe ing metode pabrik. Lan umume, kita ngomong babagan ThreadPoolExecutor. Mung paramèter sing mengaruhi karya diganti. Luwih apik bebarengan: Jawa lan kelas Utas.  Bagian V - Pelaksana, ThreadPool, Fork/Join - 3

https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg

ThreadPoolExecutor

Kaya sing wis dingerteni sadurunge, ThreadPoolExecutorbiasane digawe ing metode pabrik. Fungsi kasebut kena pengaruh argumen sing kita lewati minangka jumlah utas maksimal lan minimal, uga jinis antrian sing digunakake. Nanging apa wae implementasine antarmuka java.util.concurrent.BlockingQueuebisa digunakake. Ngomong babagan ThreadPoolExecutor, kita kudu nyebutake sawetara fitur sing menarik. Contone, sampeyan ora bisa ngirim tugas menyang ThreadPoolExecutoryen ora ana papan sing kasedhiya:
public static void main(String[] args) throws ExecutionException, InterruptedException {
	int threadBound = 2;
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
            0L, TimeUnit.SECONDS, new SynchronousQueue<>());
	Callable<String> task = () -> {
		Thread.sleep(1000);
		return Thread.currentThread().getName();
	};
	for (int i = 0; i < threadBound + 1; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
Kode iki bakal nabrak kanthi kesalahan kaya iki:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
Ing tembung liyane, taskora bisa diajukake, amarga SynchronousQueuewis dirancang supaya bener kasusun saka siji unsur lan ora ngidini kita kanggo sijine liyane menyang. Kita bisa ndeleng manawa ana nol queued tasks("tugas antrian = 0") ing kene. Nanging ora ana sing aneh, amarga iki minangka fitur khusus SynchronousQueue, sing sejatine minangka antrian 1 unsur sing tansah kosong! Nalika siji thread nempatno unsur ing antrian, iku bakal ngenteni nganti thread liyane njupuk unsur saka antrian. Patut, kita bisa ngganti karo new LinkedBlockingQueue<>(1)lan kesalahan bakal diganti saiki nuduhake queued tasks = 1. Amarga antrian mung 1 unsur, kita ora bisa nambah unsur liya. Lan sing nyebabake program gagal. Terus diskusi kita antrian, iku worth kang lagi nyimak singThreadPoolExecutorkelas wis cara tambahan kanggo layanan antrian. Contone, threadPoolExecutor.purge()cara kasebut bakal mbusak kabeh tugas sing dibatalake saka antrian kanggo mbebasake spasi ing antrian. Fungsi liyane sing gegandhengan karo antrian yaiku panangan kanggo tugas sing ditolak:
public static void main(String[] args) {
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
            0L, TimeUnit.SECONDS, new SynchronousQueue());
	Callable<String> task = () -> Thread.currentThread().getName();
	threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
	for (int i = 0; i < 5; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
Ing conto iki, pawang kita mung nampilake Rejectedsaben tugas ing antrian ditolak. Trep, ta? Kajaba iku, ThreadPoolExecutorwis subclass menarik: ScheduledThreadPoolExecutor, kang a ScheduledExecutorService. Iki menehi kemampuan kanggo nindakake tugas adhedhasar timer.

ScheduledExecutorService

ScheduledExecutorService(kang jinis ExecutorService) ngijini kita mbukak tugas ing jadwal. Ayo katon ing conto:
public static void main(String[] args) {
	ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		return Thread.currentThread().getName();
	};
	scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
	scheduledExecutorService.shutdown();
}
Kabeh iku prasaja ing kene. Tugas dikirim banjur entuk java.util.concurrent.ScheduledFuture. Jadwal uga bisa mbantu ing kahanan ing ngisor iki:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
	System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
Ing kene kita ngirim Runnabletugas kanggo eksekusi kanthi frekuensi tetep ("FixedRate") kanthi wektu tundha awal. Ing kasus iki, sawise 1 detik, tugas bakal diwiwiti saben 2 detik. Ana pilihan sing padha:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
Nanging ing kasus iki, tugas ditindakake kanthi interval tartamtu ANTARA saben eksekusi. Yaiku, taskbakal dieksekusi sawise 1 detik. Banjur, yen wis rampung, 2 detik bakal liwati, banjur tugas anyar bakal diwiwiti. Mangkene sawetara sumber tambahan babagan topik iki: Luwih apik bebarengan: Jawa lan kelas Utas.  Bagian V — Pelaksana, ThreadPool, Fork/Join - 4

https://dzone.com/articles/diving-into-java-8s-newworkstealingpools

WorkStealingPool

Saliyane pools thread ndhuwur, ana siji liyane. Kita bisa ngomong kanthi jujur ​​yen iki rada khusus. Iki diarani kolam nyolong karya. Singkatipun, nyolong karya minangka algoritma ing ngendi benang nganggur miwiti njupuk tugas saka utas utawa tugas liyane saka antrian sing dienggo bareng. Ayo katon ing conto:
public static void main(String[] args) {
	Object lock = new Object();
	ExecutorService executorService = Executors.newCachedThreadPool();
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		lock.wait(2000);
		System.out.println("Finished");
		return "result";
	};
	for (int i = 0; i < 5; i++) {
		executorService.submit(task);
	}
	executorService.shutdown();
}
Yen kita mbukak kode iki, banjur ExecutorServicebakal nggawe 5 thread kanggo kita, amarga saben thread bakal dilebokake ing antrian Enteni kanggo obyek kunci. Kita wis ngerti monitor lan kunci ing Better together: Java lan kelas Thread. Bagean II - Sinkronisasi . Saiki ayo diganti Executors.newCachedThreadPool()karo Executors.newWorkStealingPool(). Apa sing bakal diganti? Kita bakal weruh yen tugas kita dieksekusi ing kurang saka 5 utas. Elinga yen CachedThreadPoolnggawe thread kanggo saben tugas? Iku amarga wait()diblokir thread, tugas sakteruse pengin rampung, lan thread anyar digawe kanggo wong-wong mau ing blumbang. Kanthi kolam nyolong, benang ora mandheg ing salawas-lawase. Dheweke wiwit nindakake tugas tanggane. Apa sing ndadekake WorkStealingPoolbeda saka pools thread liyane? Kasunyatan sing gaibForkJoinPoolmanggon ing njero:
public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
}
Bener, ana siji liyane bedane. Kanthi gawan, utas sing digawe kanggo a ForkJoinPoolyaiku benang daemon, ora kaya benang sing digawe liwat ThreadPool. Umumé, sampeyan kudu ngelingi benang daemon, amarga, contone, CompletableFutureuga nggunakake benang daemon kajaba sampeyan nemtokake dhewe ThreadFactorysing nggawe benang non-daemon. Iki minangka kejutan sing bisa kedadeyan ing papan sing ora dikarepake! :)

ForkJoinPool

Ing bagean iki, kita bakal maneh pirembagan bab ForkJoinPool(uga disebut garpu / framework gabung), kang urip "ing hood" saka WorkStealingPool. Umumé, framework fork/join muncul maneh ing Java 1.7. Lan sanajan Jawa 11 wis cedhak, nanging isih kudu dieling-eling. Iki dudu implementasine sing paling umum, nanging cukup menarik. Ana review apik babagan iki ing web: Understanding Java Fork-Join Framework with Conto . The ForkJoinPoolgumantung ing java.util.concurrent.RecursiveTask. Ana uga java.util.concurrent.RecursiveAction. RecursiveActionora ngasilake asil. Mangkono, RecursiveTaskpadha karo Callable, lan RecursiveActionpadha karo unnable. Kita bisa ndeleng manawa jeneng kasebut kalebu jeneng rong cara penting: forklan join. Ingforkcara miwiti sawetara tugas asynchronously ing thread kapisah. Lan joincara kasebut ngidini sampeyan ngenteni karya rampung. Kanggo entuk pangerten sing paling apik, sampeyan kudu maca Saka Imperative Programming to Fork/Join to Parallel Streams in Java 8 .

Ringkesan

Inggih, sing mbungkus bagean review iki. Kita wis sinau sing Executorasline diciptakake kanggo nglakokake benang. Banjur pangripta Jawa mutusake kanggo nerusake gagasan lan nggawe ExecutorService. ExecutorServicengidini kita ngirim tugas kanggo eksekusi nggunakake submit()lan invoke(), lan uga mateni layanan. Amarga ExecutorServicembutuhake implementasine, dheweke nulis kelas kanthi metode pabrik lan diarani Executors. Ngidini sampeyan nggawe pools thread ( ThreadPoolExecutor). Kajaba iku, ana pools thread sing uga ngidini kita nemtokake jadwal eksekusi. Lan ForkJoinPoolndhelikake konco a WorkStealingPool. Muga-muga sampeyan nemokake apa sing daktulis ing ndhuwur ora mung menarik, nanging uga bisa dingerteni :) Aku tansah seneng krungu saran lan komentar sampeyan. Luwih apik bebarengan: Jawa lan kelas Utas. Bagian I - Utas eksekusi Luwih apik bebarengan: Jawa lan kelas Utas. Part II - Sinkronisasi Luwih apik bebarengan: Jawa lan kelas Utas. Bagean III - Interaksi Better bebarengan: Jawa lan kelas Utas. Part IV - Callable, Future, lan kanca-kanca Luwih apik bebarengan: Jawa lan kelas Utas. Bab VI - Mbusak!
Komentar
  • Popular
  • Anyar
  • lawas
Sampeyan kudu mlebu kanggo ninggalake komentar
Kaca iki durung duwe komentar