Alamin natin ang bagong paraan ngWorkStealingPool , na naghahanda ng ExecutorService para sa atin.

Espesyal ang thread pool na ito. Ang pag-uugali nito ay batay sa ideya ng "pagnanakaw" ng trabaho.

Ang mga gawain ay nakapila at ipinamamahagi sa mga processor. Ngunit kung ang isang processor ay abala, kung gayon ang isa pang libreng processor ay maaaring magnakaw ng isang gawain mula dito at isagawa ito. Ang format na ito ay ipinakilala sa Java upang mabawasan ang mga salungatan sa mga multi-threaded na application. Ito ay binuo sa fork/join framework.

tinidor / sumali

Sa fork/join framework, ang mga gawain ay nabubulok nang paulit-ulit, ibig sabihin, hinahati-hati sila sa mga subtask. Pagkatapos ang mga subtasks ay isasagawa nang paisa-isa, at ang mga resulta ng mga subtasks ay pinagsama upang mabuo ang resulta ng orihinal na gawain.

Ang paraan ng fork ay nagsisimula ng isang gawain nang asynchronous sa ilang thread, at hinahayaan ka ng paraan ng pagsali na maghintay para matapos ang gawaing ito.

bagongWorkStealingPool

Ang bagongWorkStealingPool na paraan ay may dalawang pagpapatupad:

public static ExecutorService newWorkStealingPool(int parallelism) {
        return new ForkJoinPool
            (parallelism,
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

Mula sa simula, tandaan namin na sa ilalim ng hood hindi namin tinatawag ang ThreadPoolExecutor constructor. Dito kami ay nakikipagtulungan sa ForkJoinPool entity. Tulad ng ThreadPoolExecutor , isa itong pagpapatupad ng AbstractExecutorService .

Mayroon kaming 2 paraan na mapagpipilian. Sa una, tayo mismo ang nagpapahiwatig kung anong antas ng paralelismo ang gusto nating makita. Kung hindi namin tinukoy ang halagang ito, ang parallelism ng aming pool ay magiging katumbas ng bilang ng mga processor core na magagamit sa Java virtual machine.

Ito ay nananatiling malaman kung paano ito gumagana sa pagsasanay:

Collection<Callable<Void>> tasks = new ArrayList<>();
        ExecutorService executorService = Executors.newWorkStealingPool(10);

        for (int i = 0; i < 10; i++) {
            int taskNumber = i;
            Callable<Void> callable = () -> {
                System.out.println("Processed user request #" + taskNumber + " on thread " + Thread.currentThread().getName());
                return null;
            };
            tasks.add(callable);
        }
        executorService.invokeAll(tasks);

Gumagawa kami ng 10 gawain na nagpapakita ng sarili nilang katayuan sa pagkumpleto. Pagkatapos nito, inilunsad namin ang lahat ng mga gawain gamit ang invokeAll na paraan.

Mga resulta kapag nagsasagawa ng 10 gawain sa 10 thread sa pool:

Naprosesong kahilingan ng user #9 sa ForkJoinPool-1-worker-10 thread
Naprosesong kahilingan ng user #4 sa ForkJoinPool-1-worker-5 thread
Naprosesong kahilingan ng user #7 sa ForkJoinPool-1-worker-8 thread
Naprosesong kahilingan ng user #1 sa ForkJoinPool- 1-worker-2 thread
Naprosesong kahilingan ng user #2 sa ForkJoinPool-1-worker-3 thread
Naprosesong kahilingan ng user #3 sa ForkJoinPool-1-worker-4 thread
Naprosesong kahilingan ng user #6 sa ForkJoinPool-1-worker-7 thread
Naprosesong user kahilingan #0 sa ForkJoinPool-1-worker-1 thread
Naprosesong kahilingan ng user #5 sa ForkJoinPool-1-worker-6 thread
Naprosesong kahilingan ng user #8 sa ForkJoinPool-1-worker-9 thread

Nakita namin na pagkatapos mabuo ang pila, ang mga thread ay nagsasagawa ng mga gawain para sa pagpapatupad. Maaari mo ring tingnan kung paano ipapamahagi ang 20 gawain sa isang pool ng 10 thread.

Naprosesong kahilingan ng user #3 sa ForkJoinPool-1-worker-4 thread
Naprosesong kahilingan ng user #7 sa ForkJoinPool-1-worker-8 thread
Naprosesong kahilingan ng user #2 sa ForkJoinPool-1-worker-3 thread
Naprosesong kahilingan ng user #4 sa ForkJoinPool- 1-worker-5 thread
Naprosesong kahilingan ng user #1 sa ForkJoinPool-1-worker-2 thread
Naprosesong kahilingan ng user #5 sa ForkJoinPool-1-worker-6 thread
Naprosesong kahilingan ng user #8 sa ForkJoinPool-1-worker-9 thread
Naprosesong user kahilingan #9 sa ForkJoinPool-1-worker-10 thread
Naprosesong kahilingan ng user #0 sa ForkJoinPool-1-worker-1 thread
Naprosesong kahilingan ng user #6 sa ForkJoinPool-1-worker-7 thread
Naprosesong kahilingan ng user #10 sa ForkJoinPool-1- trabahador-9 thread
Naprosesong kahilingan ng user #12 sa ForkJoinPool-1-worker-1 thread
Naprosesong kahilingan ng user #13 sa ForkJoinPool-1-worker-8 thread
Naprosesong kahilingan ng user #11 sa ForkJoinPool-1-worker-6 thread
Naprosesong kahilingan ng user #15 sa ForkJoinPool- 1-worker-8 thread
Naprosesong kahilingan ng user #14 sa ForkJoinPool-1-worker-1 thread
Naprosesong kahilingan ng user #17 sa ForkJoinPool-1-worker-6 thread
Naprosesong kahilingan ng user #16 sa ForkJoinPool-1-worker-7 thread
Naprosesong user kahilingan #19 sa ForkJoinPool-1-worker-6 thread
Naprosesong kahilingan ng user #18 sa ForkJoinPool-1-worker-1 thread

Mula sa output, makikita natin na ang ilang mga thread ay namamahala upang makumpleto ang ilang mga gawain ( ForkJoinPool-1-worker-6 nakumpleto ang 4 na gawain), habang ang ilan ay kumpletuhin lamang ang isa ( ForkJoinPool-1-worker-2 ). Kung idinagdag ang 1 segundong pagkaantala sa pagpapatupad ng paraan ng pagtawag , magbabago ang larawan.

Callable<Void> callable = () -> {
   System.out.println("Processed user request #" + taskNumber + " on thread " + Thread.currentThread().getName());
   TimeUnit.SECONDS.sleep(1);
   return null;
};

Para sa kapakanan ng eksperimento, patakbuhin natin ang parehong code sa isa pang makina. Ang resultang output:

Naprosesong kahilingan ng user #2 sa ForkJoinPool-1-worker-23 thread
Naprosesong kahilingan ng user #7 sa ForkJoinPool-1-worker-31 thread
Naprosesong kahilingan ng user #4 sa ForkJoinPool-1-worker-27 thread
Naprosesong kahilingan ng user #5 sa ForkJoinPool- 1-worker-13 thread
Naprosesong kahilingan ng user #0 sa ForkJoinPool-1-worker-19 thread
Naprosesong kahilingan ng user #8 sa ForkJoinPool-1-worker-3 thread
Naprosesong kahilingan ng user #9 sa ForkJoinPool-1-worker-21 thread
Naprosesong user kahilingan #6 sa ForkJoinPool-1-worker-17 thread
Naprosesong kahilingan ng user #3 sa ForkJoinPool-1-worker-9 thread
Naprosesong kahilingan ng user #1 sa ForkJoinPool-1-worker-5 thread
Naprosesong kahilingan ng user #12 sa ForkJoinPool-1- trabahador-23 thread
Naprosesong kahilingan ng user #15 sa ForkJoinPool-1-worker-19 thread
Naprosesong kahilingan ng user #14 sa ForkJoinPool-1-worker-27 thread
Naprosesong kahilingan ng user #11 sa ForkJoinPool-1-worker-3 thread
Naprosesong kahilingan ng user #13 sa ForkJoinPool- 1-worker-13 thread
Naprosesong kahilingan ng user #10 sa ForkJoinPool-1-worker-31 thread
Naprosesong kahilingan ng user #18 sa ForkJoinPool-1-worker-5 thread
Naprosesong kahilingan ng user #16 sa ForkJoinPool-1-worker-9 thread
Naprosesong user kahilingan #17 sa ForkJoinPool-1-worker-21 thread
Naprosesong kahilingan ng user #19 sa ForkJoinPool-1-worker-17 thread

Sa output na ito, kapansin-pansin na "hiniling" namin ang mga thread sa pool. Higit pa rito, ang mga pangalan ng mga thread ng manggagawa ay hindi napupunta mula isa hanggang sampu, ngunit minsan ay mas mataas sa sampu. Kung titingnan ang mga natatanging pangalan, makikita natin na mayroon talagang sampung manggagawa (3, 5, 9, 13, 17, 19, 21, 23, 27 at 31). Dito ay lubos na makatwirang itanong kung bakit ito nangyari? Sa tuwing hindi mo naiintindihan kung ano ang nangyayari, gamitin ang debugger.

Ito ang gagawin natin. I-cast natin angexecutorServicetumutol sa isang ForkJoinPool :

final ForkJoinPool forkJoinPool = (ForkJoinPool) executorService;

Gagamitin namin ang pagkilos na Evaluate Expression upang suriin ang object na ito pagkatapos tawagan ang invokeAll na paraan. Upang gawin ito, pagkatapos ng invokeAll na paraan, magdagdag ng anumang pahayag, tulad ng walang laman na sout, at magtakda ng breakpoint dito.

Nakikita natin na ang pool ay may 10 na mga thread, ngunit ang laki ng hanay ng mga thread ng manggagawa ay 32. Kakaiba, ngunit okay. Ipagpatuloy natin ang paghuhukay. Kapag gumagawa ng pool, subukan nating itakda ang antas ng parallelism sa higit sa 32, sabihin nating 40.

ExecutorService executorService = Executors.newWorkStealingPool(40);

Sa debugger, tingnan natin angforkJoinPool object muli.

Ngayon ang laki ng hanay ng mga thread ng manggagawa ay 128. Maaari nating ipagpalagay na ito ay isa sa mga panloob na pag-optimize ng JVM. Subukan nating hanapin ito sa code ng JDK (openjdk-14):

Gaya ng hinala namin: ang laki ng hanay ng mga thread ng manggagawa ay kinakalkula sa pamamagitan ng pagsasagawa ng mga bitwise na manipulasyon sa parallelism value. Hindi natin kailangang subukang alamin kung ano ang eksaktong nangyayari dito. Ito ay sapat na upang malaman lamang na ang gayong pag-optimize ay umiiral.

Ang isa pang kawili-wiling aspeto ng aming halimbawa ay ang paggamit ng invokeAll na paraan. Kapansin-pansin na ang pamamaraang invokeAll ay maaaring magbalik ng resulta, o sa halip ay isang listahan ng mga resulta (sa aming kaso, isang List<Future<Void>>) , kung saan mahahanap namin ang resulta ng bawat isa sa mga gawain.

var results = executorService.invokeAll(tasks);
        for (Future<Void> result : results) {
            // Process the task's result
        }

Ang espesyal na uri ng serbisyo at thread pool na ito ay maaaring gamitin sa mga gawaing may predictable, o hindi bababa sa implicit, na antas ng concurrency.