Ayo goleki metodeWorkStealingPool anyar , sing nyiapake ExecutorService kanggo kita.

Kolam thread iki khusus. Prilaku kasebut adhedhasar gagasan "nyolong" karya.

Tugas diantrekake lan disebarake ing antarane prosesor. Nanging yen prosesor sibuk, banjur liyane free prosesor bisa nyolong tugas saka lan nglakokaké. Format iki dikenalaké ing Jawa kanggo ngurangi konflik ing aplikasi multi-threaded. Dibangun ing kerangka garpu / gabung .

garpu / gabung

Ing kerangka garpu / gabung , tugas diurai kanthi rekursif, yaiku, dipecah dadi subtugas. Banjur subtugas dieksekusi kanthi individu, lan asil subtugas digabung dadi asil tugas asli.

Cara garpu miwiti tugas kanthi ora sinkron ing sawetara utas, lan metode gabung ngidini sampeyan ngenteni tugas iki rampung.

newWorkStealingPool

MetodeWorkStealingPool anyar duwe rong implementasine:


public static ExecutorService newWorkStealingPool(int parallelism) {
        return new ForkJoinPool
            (parallelism,
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }
 
public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

Saka awal, kita Wigati sing ing hood kita ora nelpon konstruktor ThreadPoolExecutor . Ing kene kita nggarap entitas ForkJoinPool . Kaya ThreadPoolExecutor , iku implementasine saka AbstractExecutorService .

Kita duwe 2 cara kanggo milih. Kaping pisanan, kita dhewe nuduhake tingkat paralelisme sing pengin kita deleng. Yen kita ora nemtokake nilai iki, paralelisme blumbang kita bakal padha karo jumlah inti prosesor sing kasedhiya kanggo mesin virtual Java.

Iku tetep kanggo nemtokake cara kerjane ing praktik:


Collection<Callable<Void>> tasks = new ArrayList<>();
        ExecutorService executorService = Executors.newWorkStealingPool(10);
 
        for (int i = 0; i < 10; i++) {
            int taskNumber = i;
            Callable<Void> callable = () -> {
                System.out.println("Processed user request #" + taskNumber + " on thread " + Thread.currentThread().getName());
                return null;
            };
            tasks.add(callable);
        }
        executorService.invokeAll(tasks);

Kita nggawe 10 tugas sing nampilake status rampung dhewe. Sawise iku, kita miwiti kabeh tugas nggunakake metode invokeAll .

Asil nalika nindakake 10 tugas ing 10 utas ing blumbang:

Panjaluk pangguna sing diproses #9 ing utas ForkJoinPool-1-worker-10
Panjaluk pangguna sing diproses #4 ing utas ForkJoinPool-1-worker-5
Panjaluk pangguna sing diproses #7 ing utas ForkJoinPool-1-worker-8 Panjaluk
pangguna sing diproses #1 ing ForkJoinPool- Utas 1-worker-2
Panjaluk pangguna sing wis diproses #2 ing Utas ForkJoinPool-1-worker-3 Utas
sing wis diproses Panjaluk pangguna #3 ing Utas ForkJoinPool-1-worker-4 Panjaluk
pangguna sing wis diproses #6 ing Utas ForkJoinPool-1-worker-7 Pangguna sing
wis diproses njaluk #0 ing utas ForkJoinPool-1-worker-1
Panjaluk pangguna sing diproses #5 ing utas ForkJoinPool-1-worker-6
Panjaluk pangguna sing diproses #8 ing utas ForkJoinPool-1-worker-9

Kita weruh yen sawise antrian dibentuk, benang njupuk tugas kanggo eksekusi. Sampeyan uga bisa mriksa carane 20 tugas bakal mbagekke ing blumbang 10 Utas.

Panjaluk pangguna sing diproses #3 ing utas ForkJoinPool-1-worker-4
Panjaluk pangguna sing diproses #7 ing utas ForkJoinPool-1-worker-8
Panjaluk pangguna sing diproses #2 ing utas ForkJoinPool-1-worker-3 Panjaluk
pangguna sing diproses #4 ing ForkJoinPool- Utas 1-worker-5
Panjaluk pangguna sing diproses #1 ing Utas ForkJoinPool-1-worker-2
Panjaluk pangguna sing diproses #5 ing Utas ForkJoinPool-1-worker-6 Panjaluk
pangguna sing wis diproses #8 ing Utas ForkJoinPool-1-worker-9 Pangguna
sing diproses panjalukan #9 ing utas ForkJoinPool-1-worker-10
Panjaluk pangguna sing diproses #0 ing utas ForkJoinPool-1-worker-1
Panjaluk pangguna sing diproses #6 ing utas ForkJoinPool-1-worker-7
Panjaluk pangguna sing diproses #10 ing ForkJoinPool-1- buruh-9 thread
Panjaluk pangguna sing diproses #12 ing utas ForkJoinPool-1-worker-1
Panjaluk pangguna sing diproses #13 ing utas ForkJoinPool-1-worker-8 Panjaluk
pangguna sing diproses #11 ing utas ForkJoinPool-1-worker-6 Panjaluk
pangguna sing diproses #15 ing ForkJoinPool- Utas 1-worker-8
Panjaluk pangguna sing diproses #14 ing Utas ForkJoinPool-1-worker-1
Panjaluk pangguna sing wis diproses #17 ing Utas ForkJoinPool-1-worker-6 Panjaluk
pangguna sing wis diproses #16 ing Utas ForkJoinPool-1-worker-7 Pangguna sing
diproses panjalukan #19 ing utas ForkJoinPool-1-worker-6
Panjaluk pangguna sing diproses #18 ing utas ForkJoinPool-1-worker-1

Saka output, kita bisa ndeleng manawa sawetara utas bisa ngrampungake sawetara tugas ( ForkJoinPool-1-worker-6 ngrampungake 4 tugas), dene sawetara mung ngrampungake siji ( ForkJoinPool-1-worker-2 ). Yen wektu tundha 1 detik ditambahake ing implementasine metode telpon , gambar kasebut diganti.


Callable<Void> callable = () -> {
   System.out.println("Processed user request #" + taskNumber + " on thread " + Thread.currentThread().getName());
   TimeUnit.SECONDS.sleep(1);
   return null;
};

Kanggo eksperimen, ayo mbukak kode sing padha ing mesin liyane. Output asil:

Panjaluk pangguna sing diproses #2 ing utas ForkJoinPool-1-worker-23
Panjaluk pangguna sing diproses #7 ing utas ForkJoinPool-1-worker-31 Panjaluk
pangguna sing diproses #4 ing utas ForkJoinPool-1-worker-27 Panjaluk
pangguna sing diproses #5 ing ForkJoinPool- Utas 1-worker-13
Panjaluk pangguna sing wis diproses #0 ing Utas ForkJoinPool-1-worker-19
Panjaluk pangguna sing wis diproses #8 ing Utas ForkJoinPool-1-worker-3 Panjaluk
pangguna sing wis diproses #9 ing Utas ForkJoinPool-1-worker-21 Pangguna sing
diproses panjalukan #6 ing utas ForkJoinPool-1-worker-17
Panjaluk pangguna sing diproses #3 ing utas ForkJoinPool-1-worker-9
Panjaluk pangguna sing diproses #1 ing utas ForkJoinPool-1-worker-5
Panjaluk pangguna sing diproses #12 ing ForkJoinPool-1- buruh-23 thread
Panjaluk pangguna sing diproses #15 ing utas ForkJoinPool-1-worker-19
Panjaluk pangguna sing diproses #14 ing utas ForkJoinPool-1-worker-27 Panjaluk
pangguna sing diproses #11 ing utas ForkJoinPool-1-worker-3 Panjaluk
pangguna sing diproses #13 ing ForkJoinPool- Utas 1-worker-13
Panjaluk pangguna sing wis diproses #10 ing Utas ForkJoinPool-1-worker-31
Panjaluk pangguna sing wis diproses #18 ing Utas ForkJoinPool-1-worker-5
Panjaluk pangguna sing wis diproses #16 ing Utas ForkJoinPool-1-worker-9 Pangguna sing
diproses panjalukan #17 ing utas ForkJoinPool-1-worker-21
Panjaluk pangguna sing diproses #19 ing utas ForkJoinPool-1-worker-17

Ing output iki, kacathet yen kita "njaluk" benang ing blumbang. Punapa malih ingkang nama sêrat-sêrat pegawe wau botên satunggal dumugi sadasa, malah kadhang kala langkung saking sadasa. Ndelok jeneng unik, kita weruh yen ana sepuluh buruh (3, 5, 9, 13, 17, 19, 21, 23, 27 lan 31). Kene iku cukup cukup kanggo takon apa iki kedaden? Yen sampeyan ora ngerti apa sing kedadeyan, gunakake debugger.

Iki sing bakal kita lakoni. Ayo dadi cast ingexecutorServiceobyek menyang ForkJoinPool :


final ForkJoinPool forkJoinPool = (ForkJoinPool) executorService;

Kita bakal nggunakake tumindak Evaluate Expression kanggo mriksa obyek iki sawise nelpon cara invokeAll . Kanggo nindakake iki, sawise metode invokeAll , tambahake pratelan apa wae, kayata sout kosong, lan atur titik istirahat.

Kita bisa ndeleng sing blumbang wis 10 Utas, nanging ukuran Uploaded Utas buruh 32. Aneh, nanging oke. Ayo terus ndhudhuk. Nalika nggawe blumbang, ayo nyoba nyetel level paralelisme dadi luwih saka 32, ucapake 40.


ExecutorService executorService = Executors.newWorkStealingPool(40);

Ing debugger, ayo kang katon ingobyek forkJoinPool maneh.

Saiki ukuran Uploaded Utas buruh 128. Kita bisa nganggep manawa iki minangka salah sawijining optimasi internal JVM. Coba goleki ing kode JDK (openjdk-14):

Kaya sing diduga: ukuran susunan benang pekerja diitung kanthi nindakake manipulasi bitwise ing nilai paralelisme. Kita ora perlu nyoba ngerteni apa sing kedadeyan ing kene. Cukup ngerti yen optimasi kasebut ana.

Aspek liyane sing menarik saka conto kita yaiku nggunakake metode invokeAll . Wigati dicathet yen metode invokeAll bisa ngasilake asil, utawa malah dhaptar asil (ing kasus kita, List <Future<Void>>) , ing ngendi kita bisa nemokake asil saben tugas.


var results = executorService.invokeAll(tasks);
        for (Future<Void> result : results) {
            // Process the task's result
        }

Kolam layanan lan benang khusus iki bisa digunakake ing tugas kanthi tingkat konkurensi sing bisa ditebak, utawa paling ora implisit.