Bizim için bir ExecutorService hazırlayan newWorkStealingPool yöntemini bulalım .

Bu iş parçacığı havuzu özeldir. Davranışı, işi "çalmak" fikrine dayanmaktadır.

Görevler sıraya alınır ve işlemciler arasında dağıtılır. Ancak bir işlemci meşgulse, başka bir ücretsiz işlemci ondan bir görevi çalabilir ve yürütebilir. Bu biçim, çok iş parçacıklı uygulamalardaki çakışmaları azaltmak için Java'da tanıtıldı. Çatal/birleştirme çerçevesi üzerine inşa edilmiştir .

çatal/birleştirme

Çatal/birleştirme çerçevesinde , görevler yinelemeli olarak ayrıştırılır, yani alt görevlere bölünürler. Daha sonra alt görevler ayrı ayrı yürütülür ve alt görevlerin sonuçları orijinal görevin sonucunu oluşturmak için birleştirilir.

Çatal yöntemi, bir iş parçacığında eşzamansız olarak bir görev başlatır ve birleştirme yöntemi , bu görevin bitmesini beklemenizi sağlar.

yeniİşÇalmaHavuzu

newWorkStealingPool yönteminin iki uygulaması vardır:

public static ExecutorService newWorkStealingPool(int parallelism) {
        return new ForkJoinPool
            (parallelism,
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

En başından beri, başlığın altında ThreadPoolExecutor yapıcısını çağırmadığımıza dikkat ediyoruz . Burada ForkJoinPool varlığı ile çalışıyoruz . ThreadPoolExecutor gibi , bir AbstractExecutorService uygulamasıdır .

Seçebileceğimiz 2 yöntemimiz var. İlkinde, hangi düzeyde paralellik görmek istediğimizi kendimiz belirtiyoruz. Bu değeri belirtmezsek, havuzumuzun paralelliği, Java sanal makinesinin kullanabileceği işlemci çekirdeği sayısına eşit olacaktır.

Pratikte nasıl çalıştığını anlamaya devam ediyor:

Collection<Callable<Void>> tasks = new ArrayList<>();
        ExecutorService executorService = Executors.newWorkStealingPool(10);

        for (int i = 0; i < 10; i++) {
            int taskNumber = i;
            Callable<Void> callable = () -> {
                System.out.println("Processed user request #" + taskNumber + " on thread " + Thread.currentThread().getName());
                return null;
            };
            tasks.add(callable);
        }
        executorService.invokeAll(tasks);

Kendi tamamlanma durumlarını gösteren 10 görev oluşturuyoruz. Daha sonra invokeAll metodunu kullanarak tüm görevleri başlatıyoruz .

Havuzdaki 10 iş parçacığında 10 görev yürütülürken elde edilen sonuçlar:

ForkJoinPool-1-worker-10 iş parçacığında
işlenen kullanıcı isteği #9 ForkJoinPool-1-worker-5 iş parçacığında
işlenen kullanıcı isteği #4 ForkJoinPool-1-worker-8 iş parçacığında işlenen kullanıcı isteği #7
ForkJoinPool- üzerinde işlenen kullanıcı isteği #1 1-worker-2 iş parçacığı
ForkJoinPool-1-worker-3 iş parçacığında
işlenen kullanıcı isteği #2 ForkJoinPool-1-worker-4 iş parçacığında
işlenen kullanıcı isteği #3 ForkJoinPool-1-worker-7 iş parçacığında işlenen kullanıcı isteği #6
İşlenen kullanıcı ForkJoinPool-1-worker-1 iş parçacığında istek #0
ForkJoinPool-1-worker-6 iş parçacığında işlenen kullanıcı isteği #5
ForkJoinPool-1-işçi-9 iş parçacığında işlenen kullanıcı isteği #8

Sıra oluşturulduktan sonra iş parçacıklarının yürütme için görevler aldığını görüyoruz. Ayrıca 10 iş parçacığından oluşan bir havuzda 20 görevin nasıl dağıtılacağını da kontrol edebilirsiniz.

ForkJoinPool-1-worker-4 iş parçacığında işlenen kullanıcı isteği #3
ForkJoinPool-1-worker-8 iş parçacığında
işlenen kullanıcı isteği #7 ForkJoinPool-1-worker-3 iş parçacığında
işlenen kullanıcı isteği #2 ForkJoinPool- üzerinde işlenen kullanıcı isteği #4 1-worker-5 iş parçacığı
ForkJoinPool-1-worker-2 iş parçacığında
işlenen kullanıcı isteği #1 ForkJoinPool-1-worker-6 iş parçacığında
işlenen kullanıcı isteği #5 ForkJoinPool-1-worker-9 iş parçacığında işlenen kullanıcı isteği #8
İşlenen kullanıcı ForkJoinPool-1-worker-10 iş parçacığında istek #9
ForkJoinPool-1-worker-1 iş parçacığında işlenen kullanıcı isteği #0
ForkJoinPool-1-worker-7 iş parçacığında işlenen kullanıcı isteği #6
ForkJoinPool-1- üzerinde İşlenen kullanıcı isteği #10 işçi-9 iş parçacığı
ForkJoinPool-1-worker-1 iş parçacığında
işlenen kullanıcı isteği #12 ForkJoinPool-1-worker-8 iş parçacığında
işlenen kullanıcı isteği #13 ForkJoinPool-1-worker-6 iş parçacığında
işlenen kullanıcı isteği #11 ForkJoinPool- üzerinde işlenen kullanıcı isteği #15 1-worker-8 iş parçacığı
ForkJoinPool-1-worker-1 iş parçacığında
işlenen kullanıcı isteği #14 ForkJoinPool-1-worker-6 iş parçacığında
işlenen kullanıcı isteği #17 ForkJoinPool-1-worker-7 iş parçacığında işlenen kullanıcı isteği #16
İşlenen kullanıcı ForkJoinPool-1-worker-6 dizisindeki istek #19
ForkJoinPool-1-worker-1 dizisindeki işlenen kullanıcı isteği #18

Çıktıdan, bazı iş parçacıklarının birkaç görevi tamamlamayı başardığını görebiliriz ( ForkJoinPool-1-worker-6 4 görevi tamamladı), bazılarının ise yalnızca birini tamamladığını ( ForkJoinPool-1-worker-2 ). Çağrı yönteminin uygulanmasına 1 saniyelik bir gecikme eklenirse resim değişir.

Callable<Void> callable = () -> {
   System.out.println("Processed user request #" + taskNumber + " on thread " + Thread.currentThread().getName());
   TimeUnit.SECONDS.sleep(1);
   return null;
};

Deneme amacıyla aynı kodu başka bir makinede çalıştıralım. Ortaya çıkan çıktı:

ForkJoinPool-1-worker-23 iş parçacığında işlenen kullanıcı isteği #2
ForkJoinPool-1-worker-31 iş parçacığında
işlenen kullanıcı isteği #7 ForkJoinPool-1-worker-27 iş parçacığında
işlenen kullanıcı isteği #4 ForkJoinPool- üzerinde işlenen kullanıcı isteği #5 1-worker-13 iş parçacığı
ForkJoinPool-1-worker-19 iş parçacığında
işlenen kullanıcı isteği #0 ForkJoinPool-1-worker-3 iş parçacığında
işlenen kullanıcı isteği #8 ForkJoinPool-1-worker-21 iş parçacığında işlenen kullanıcı isteği #9
İşlenen kullanıcı ForkJoinPool-1-worker-17 iş parçacığında istek #6
ForkJoinPool-1-worker-9 iş parçacığında işlenen kullanıcı isteği #3
ForkJoinPool-1-worker-5 iş parçacığında
işlenen kullanıcı isteği #1 işçi-23 iş parçacığı
ForkJoinPool-1-worker-19 iş parçacığında işlenen kullanıcı isteği
#15 ForkJoinPool-1-worker-27 iş parçacığında
işlenen kullanıcı isteği #14 ForkJoinPool-1-worker-3 iş parçacığında
işlenen kullanıcı isteği #11 ForkJoinPool'da işlenen kullanıcı isteği #13- 1-worker-13 iş parçacığı
ForkJoinPool-1-worker-31 iş parçacığında
işlenen kullanıcı isteği #10 ForkJoinPool-1-worker-5 iş parçacığında
işlenen kullanıcı isteği #18 ForkJoinPool-1-worker-9 iş parçacığında işlenen kullanıcı isteği #16
İşlenen kullanıcı ForkJoinPool-1-worker-21 dizisindeki istek #17
ForkJoinPool-1-worker-17 dizisindeki işlenen kullanıcı isteği #19

Bu çıktıda, havuzdaki konuları "sorduğumuz" dikkat çekicidir. Dahası, çalışan iş parçacıklarının adları birden ona gitmez, bunun yerine bazen ondan daha yüksektir. Benzersiz isimlere baktığımızda gerçekten on işçi olduğunu görüyoruz (3, 5, 9, 13, 17, 19, 21, 23, 27 ve 31). Burada bunun neden olduğunu sormak oldukça mantıklı? Neler olduğunu anlamadığınızda, hata ayıklayıcıyı kullanın.

Yapacağımız şey bu. hadi atalımyürütücüHizmetbir ForkJoinPool'a nesne :

final ForkJoinPool forkJoinPool = (ForkJoinPool) executorService;

invokeAll yöntemini çağırdıktan sonra bu nesneyi incelemek için İfadeyi Değerlendir eylemini kullanacağız . Bunu yapmak için, invokeAll yönteminden sonra boş bir sout gibi herhangi bir ifade ekleyin ve üzerinde bir kesme noktası ayarlayın.

Havuzun 10 iş parçacığı olduğunu görebiliriz, ancak işçi iş parçacığı dizisinin boyutu 32'dir. Garip, ama tamam. Kazmaya devam edelim. Havuz oluştururken paralellik seviyesini 32 den fazla diyelim 40 yapmaya çalışalım.

ExecutorService executorService = Executors.newWorkStealingPool(40);

Hata ayıklayıcıda şuna bakalım:tekrar forkJoinPool nesnesi.

Artık çalışan iş parçacığı dizisinin boyutu 128'dir. Bunun JVM'nin dahili optimizasyonlarından biri olduğunu varsayabiliriz. JDK (openjdk-14) kodunda bulmaya çalışalım:

Tam da şüphelendiğimiz gibi: çalışan iş parçacığı dizisinin boyutu, paralellik değeri üzerinde bit düzeyinde işlemler gerçekleştirilerek hesaplanır. Burada tam olarak ne olduğunu anlamaya çalışmamıza gerek yok. Böyle bir optimizasyonun var olduğunu bilmek yeterlidir.

Örneğimizin bir başka ilginç yönü de invokeAll yönteminin kullanılmasıdır . invokeAll yönteminin , görevlerin her birinin sonucunu bulabileceğimiz bir sonuç veya daha doğrusu bir sonuç listesi (bizim durumumuzda bir List<Future<Void>>) döndürebileceğini belirtmekte fayda var .

var results = executorService.invokeAll(tasks);
        for (Future<Void> result : results) {
            // Process the task's result
        }

Bu özel tür hizmet ve iş parçacığı havuzu, öngörülebilir veya en azından örtük bir eşzamanlılık düzeyine sahip görevlerde kullanılabilir.