CodeGym/Java Blogu/Rastgele/Birlikte daha iyi: Java ve Thread sınıfı. Bölüm V — Yürüt...
John Squirrels
Seviye
San Francisco

Birlikte daha iyi: Java ve Thread sınıfı. Bölüm V — Yürütücü, ThreadPool, Fork/Join

grupta yayınlandı

giriiş

Yani, Java'nın iş parçacıklarına sahip olduğunu biliyoruz. Bunun hakkında Daha İyi Birlikte: Java ve İplik sınıfı başlıklı incelemede okuyabilirsiniz . Bölüm I - Yürütme konuları . Birlikte daha iyi: Java ve Thread sınıfı.  Bölüm V — Yürütücü, ThreadPool, Fork/Join - 1Tipik koda bir kez daha göz atalım:
public static void main(String[] args) throws Exception {
	Runnable task = () -> {
		System.out.println("Task executed");
	};
	Thread thread = new Thread(task);
	thread.start();
}
Gördüğünüz gibi, bir göreve başlama kodu oldukça tipik, ancak yeni görev için onu tekrarlamamız gerekiyor. Çözümlerden biri, onu ayrı bir yönteme koymaktır, örneğin execute(Runnable runnable). Ancak Java'nın yaratıcıları içinde bulunduğumuz kötü durumu değerlendirdi ve şu arayüzü buldu Executor:
public static void main(String[] args) throws Exception {
	Runnable task = () -> System.out.println("Task executed");
	Executor executor = (runnable) -> {
		new Thread(runnable).start();
	};
	executor.execute(task);
}
RunnableBu kod açıkça daha özlü: şimdi sadece iş parçacığını başlatmak için kod yazıyoruz . Bu harika, değil mi? Ama bu sadece başlangıç: Birlikte daha iyi: Java ve Thread sınıfı.  Bölüm V — Yürütücü, ThreadPool, Fork/Join - 2

https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html

Gördüğünüz gibi arayüzün Executorbir ExecutorServicealt arayüzü var. Bu arabirim için Javadoc, an'nin ExecutorService, . Ayrıca , yürütme sürecini izlemek için a almayı da mümkün kılar . Önceden, Birlikte Daha İyi'de: Java ve Thread sınıfı. Bölüm IV — Callable, Future ve friends'in yeteneklerini kısaca gözden geçirdik . Unuttuysanız veya hiç okumadıysanız, hafızanızı tazelemenizi öneririm ;) Javadoc başka ne diyor? . _ _ ExecutorExecutorjava.util.concurrent.FutureFuturejava.util.concurrent.ExecutorsExecutorService

Yürütme Hizmeti

Hadi gözden geçirelim. ExecutorBir iş parçacığı üzerinde belirli bir görevi yürütmek (yani çağırmak ) zorundayız execute()ve iş parçacığını oluşturan kod bizden gizleniyor. Elimizde - ilerlemeyi kontrol etmek için birkaç seçeneğe sahip olan ExecutorServicebir özelliğimiz var. Executor. Executors_ ExecutorService_ Şimdi kendimiz yapalım:
public static void main(String[] args) throws ExecutionException, InterruptedException {
	Callable<String> task = () -> Thread.currentThread().getName();
	ExecutorService service = Executors.newFixedThreadPool(2);
	for (int i = 0; i < 5; i++) {
		Future result = service.submit(task);
		System.out.println(result.get());
	}
	service.shutdown();
}
Gördüğünüz gibi boyutu 2 olan sabit bir thread pool belirledik. Ardından görevleri tek tek havuza gönderiyoruz. Her görev, Stringiş parçacığı adını ( currentThread().GetName()) içeren bir döndürür. En sonunda kapatmak önemlidir ExecutorService, çünkü aksi halde programımız bitmeyecektir. Fabrikanın Executorsek fabrika yöntemleri vardır. Örneğin tek bir thread'den oluşan bir havuz ( ) veya 1 dakika boşta kaldıktan sonra thread'lerin kaldırıldığı newSingleThreadExecutorbir önbellek ( ) içeren bir havuz oluşturabiliriz . newCachedThreadPoolGerçekte bunlar, görevlerin yerleştirildiği ve yürütüldüğü bir engelleme kuyruğuExecutorService tarafından desteklenir . Kuyrukları engelleme hakkında daha fazla bilgiyi bu videoda bulabilirsiniz . Bunu da okuyabilirsinizBlockingQueue hakkında inceleme . Ve "ArrayBlockingQueue yerine LinkedBlockingQueue ne zaman tercih edilmeli?" sorusunun yanıtına bakın. En basit ifadeyle, a BlockingQueuebir ileti dizisini iki durumda engeller:
  • iş parçacığı boş bir sıradan öğeler almaya çalışır
  • iş parçacığı, öğeleri tam bir kuyruğa koymaya çalışır
Fabrika yöntemlerinin uygulanmasına bakarsak, nasıl çalıştıklarını görebiliriz. Örneğin:
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}
veya
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}
Gördüğümüz gibi, uygulamaları ExecutorServicefabrika yöntemlerinin içinde oluşturulur. Ve çoğunlukla, hakkında konuşuyoruz ThreadPoolExecutor. Sadece çalışmayı etkileyen parametreler değiştirilir. Birlikte daha iyi: Java ve Thread sınıfı.  Bölüm V — Yürütücü, ThreadPool, Fork/Join - 3

https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg

ThreadPoolYürütücü

Daha önce gördüğümüz gibi, ThreadPoolExecutorgenellikle fabrika yöntemleri içinde yaratılan şeydir. İşlevsellik, maksimum ve minimum iş parçacığı sayısı olarak ilettiğimiz argümanların yanı sıra kullanılan sıra türünden etkilenir. Ancak arayüzün herhangi bir uygulaması java.util.concurrent.BlockingQueuekullanılabilir. Bahsetmişken ThreadPoolExecutor, bazı ilginç özelliklerden bahsetmeliyiz. ThreadPoolExecutorÖrneğin, boş alan yoksa a'ya görev gönderemezsiniz :
public static void main(String[] args) throws ExecutionException, InterruptedException {
	int threadBound = 2;
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
            0L, TimeUnit.SECONDS, new SynchronousQueue<>());
	Callable<String> task = () -> {
		Thread.sleep(1000);
		return Thread.currentThread().getName();
	};
	for (int i = 0; i < threadBound + 1; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
Bu kod şöyle bir hatayla çökecek:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
Yani taskgönderilemez, çünkü SynchronousQueueaslında tek bir elemandan oluşacak şekilde tasarlanmıştır ve içine daha fazla bir şey koymamıza izin vermez. Burada sıfır ("kuyruktaki görevler = 0") olduğunu görebiliriz queued tasks. Ancak bunda garip bir şey yok, çünkü bu, SynchronousQueueaslında her zaman boş olan 1 öğeli bir sıra olan , 'nin özel bir özelliğidir! Bir iş parçacığı sıraya bir öğe koyduğunda, başka bir iş parçacığı öğeyi sıradan alana kadar bekler. Buna göre, ile değiştirebiliriz new LinkedBlockingQueue<>(1)ve hata şimdi show olarak değişecektir queued tasks = 1. Kuyruk sadece 1 eleman olduğu için ikinci bir eleman ekleyemiyoruz. Programın başarısız olmasına neden olan da budur. Kuyruk tartışmamıza devam ederken,ThreadPoolExecutorsınıfın kuyruğa hizmet vermek için ek yöntemleri vardır. Örneğin, threadPoolExecutor.purge()yöntem, kuyrukta yer açmak için iptal edilen tüm görevleri sıradan kaldıracaktır. Kuyrukla ilgili başka bir ilginç işlev, reddedilen görevler için işleyicidir:
public static void main(String[] args) {
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
            0L, TimeUnit.SECONDS, new SynchronousQueue());
	Callable<String> task = () -> Thread.currentThread().getName();
	threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
	for (int i = 0; i < 5; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
Bu örnekte işleyicimiz, Rejectedkuyruktaki bir görev her reddedildiğinde basitçe görüntüler. Uygun, değil mi? Ek olarak, ThreadPoolExecutorilginç bir alt sınıfı vardır ScheduledThreadPoolExecutor: ScheduledExecutorService. Bir zamanlayıcıya dayalı bir görevi gerçekleştirme yeteneği sağlar.

ScheduledExecutorService

ScheduledExecutorService(bir türdür ExecutorService), görevleri bir programa göre çalıştırmamıza izin verir. Bir örneğe bakalım:
public static void main(String[] args) {
	ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		return Thread.currentThread().getName();
	};
	scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
	scheduledExecutorService.shutdown();
}
Burada her şey basit. Görevler gönderilir ve ardından bir java.util.concurrent.ScheduledFuture. Bir program aşağıdaki durumlarda da yardımcı olabilir:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
	System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
RunnableBurada , belirli bir başlangıç ​​gecikmesi ile sabit bir frekansta ("FixedRate") yürütülmesi için bir görev gönderiyoruz . Bu durumda 1 saniye sonra görev her 2 saniyede bir yürütülmeye başlayacaktır. Benzer bir seçenek var:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
Ancak bu durumda, görevler her yürütme ARASINDA belirli bir aralıkla gerçekleştirilir. Yani, task1 saniye sonra yürütülecektir. Ardından, tamamlanır tamamlanmaz 2 saniye geçecek ve ardından yeni bir göreve başlanacaktır. İşte bu konuyla ilgili bazı ek kaynaklar: Birlikte daha iyi: Java ve Thread sınıfı.  Bölüm V — Yürütücü, ThreadPool, Fork/Join - 4

https://dzone.com/articles/diving-into-java-8s-newworkstealingpools

ÇalışmaÇalmaHavuzu

Yukarıdaki iş parçacığı havuzlarına ek olarak bir tane daha var. Dürüst olmak gerekirse biraz özel olduğunu söyleyebiliriz. Buna iş çalma havuzu deniyor. Kısacası, iş çalma, boştaki iş parçacıklarının diğer iş parçacıklarından veya paylaşılan bir kuyruktan görevler almaya başladığı bir algoritmadır. Bir örneğe bakalım:
public static void main(String[] args) {
	Object lock = new Object();
	ExecutorService executorService = Executors.newCachedThreadPool();
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		lock.wait(2000);
		System.out.println("Finished");
		return "result";
	};
	for (int i = 0; i < 5; i++) {
		executorService.submit(task);
	}
	executorService.shutdown();
}
Bu kodu çalıştırırsak, ExecutorServicebizim için 5 iş parçacığı oluşturacaktır, çünkü her iş parçacığı kilit nesnesi için bekleme kuyruğuna alınacaktır. Better'da monitörleri ve kilitleri zaten birlikte bulduk : Java ve Thread sınıfı. Kısım II — Senkronizasyon . Executors.newCachedThreadPool()Şimdi ile değiştirelim Executors.newWorkStealingPool(). Ne değişecek? Görevlerimizin 5'ten az iş parçacığında yürütüldüğünü göreceğiz. CachedThreadPoolHer görev için bir iş parçacığı oluşturduğunu hatırlıyor musunuz ? Bunun nedeni, wait()iş parçacığının engellenmesi, sonraki görevlerin tamamlanmak istemesi ve onlar için havuzda yeni iş parçacıklarının oluşturulmasıdır. Bir çalma havuzuyla, iş parçacıkları sonsuza kadar boşta kalmaz. Komşularının görevlerini yerine getirmeye başlarlar. WorkStealingPoola'yı diğer iş parçacığı havuzlarından bu kadar farklı kılan nedir ? Büyülü olduğu gerçeğiForkJoinPooliçinde yaşıyor:
public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
}
Aslında bir fark daha var. Varsayılan olarak, a için oluşturulan iş parçacıkları ForkJoinPool, bir onrdinary aracılığıyla oluşturulan iş parçacıklarından farklı olarak arka plan programı iş parçacıklarıdır ThreadPool. Genel olarak, daemon iş parçacıklarını hatırlamanız gerekir, çünkü örneğin, arka plan programı olmayan iş parçacıklarını oluşturan CompletableFuturekendinizinkini belirtmediğiniz sürece daemon iş parçacıklarını da kullanır . ThreadFactoryİşte beklenmedik yerlerde pusuya yatabilecek sürprizler! :)

ForkJoinPool

Bu bölümde ForkJoinPool, WorkStealingPool. Genel olarak çatal/birleştirme çerçevesi Java 1.7'de ortaya çıktı. Ve Java 11 yakında olmasına rağmen yine de hatırlamaya değer. Bu en yaygın uygulama değil, ama oldukça ilginç. Web'de bununla ilgili iyi bir inceleme var: Java Fork-Join Çerçevesini Örneklerle Anlamak . dayanır . ForkJoinPool_ java.util.concurrent.RecursiveTaskAyrıca var java.util.concurrent.RecursiveAction. RecursiveActionbir sonuç döndürmez. Böylece, RecursiveTaskbenzerdir Callableve RecursiveActionbenzerdir unnable. Adın iki önemli yöntemin adını içerdiğini görebiliriz: forkve join. buforkyöntem, bazı görevleri eşzamansız olarak ayrı bir iş parçacığında başlatır. Ve joinyöntem, işin yapılmasını beklemenizi sağlar. En iyi anlayışı elde etmek için Java 8'de Zorunlu Programlamadan Çatal/Birleştirmeye Paralel Akışlara kadar okumalısınız .

Özet

Bu, incelemenin bu bölümünü tamamlıyor. ExecutorBaşlangıçta iş parçacıklarını yürütmek için icat edildiğini öğrendik . Sonra Java'nın yaratıcıları bu fikri sürdürmeye karar verdiler ve ExecutorService. görevleri yürütme için ve ExecutorServicekullanarak göndermemize ve ayrıca hizmeti kapatmamıza izin verir. Uygulamalara ihtiyaç duyduğu için , fabrika yöntemleriyle bir sınıf yazdılar ve buna . İş parçacığı havuzları ( ) oluşturmanıza izin verir . Ek olarak, bir yürütme programı belirtmemize izin veren iş parçacığı havuzları da vardır. Ve a, a'nın arkasına gizlenir . Umarım yukarıda yazdıklarımı sadece ilginç değil, aynı zamanda anlaşılır bulmuşsunuzdur :) Önerilerinizi ve yorumlarınızı duymaktan her zaman memnuniyet duyarım. submit()invoke()ExecutorServiceExecutorsThreadPoolExecutorForkJoinPoolWorkStealingPoolBirlikte daha iyi: Java ve Thread sınıfı. Bölüm I — Yürütme konuları Birlikte daha iyi: Java ve Thread sınıfı. Bölüm II — Eşitleme Birlikte daha iyi: Java ve Thread sınıfı. Bölüm III — Etkileşim Birlikte Daha İyi: Java ve Thread sınıfı. Bölüm IV — Callable, Future ve arkadaşlar Birlikte daha iyi: Java ve Thread sınıfı. Bölüm VI - Ateş edin!
Yorumlar
  • Popüler
  • Yeni
  • Eskimiş
Yorum bırakmak için giriş yapmalısınız
Bu sayfada henüz yorum yok