CodeGym /Java Blog /यादृच्छिक /एकत्र चांगले: Java आणि थ्रेड वर्ग. भाग V — एक्झिक्युटर, थ...
John Squirrels
पातळी 41
San Francisco

एकत्र चांगले: Java आणि थ्रेड वर्ग. भाग V — एक्झिक्युटर, थ्रेडपूल, फोर्क/जॉइन

यादृच्छिक या ग्रुपमध्ये प्रकाशित केले

परिचय

तर, आम्हाला माहित आहे की Java मध्ये थ्रेड्स आहेत. त्याबद्दल तुम्ही Better together: Java and the Thread class या शीर्षकाच्या पुनरावलोकनात वाचू शकता . भाग I - अंमलबजावणीचे धागे . एकत्र चांगले: Java आणि थ्रेड वर्ग.  भाग V — एक्झिक्युटर, थ्रेडपूल, फोर्क/जॉईन - 1चला ठराविक कोडकडे आणखी एक नजर टाकूया:

public static void main(String[] args) throws Exception {
	Runnable task = () -> {
		System.out.println("Task executed");
	};
	Thread thread = new Thread(task);
	thread.start();
}
तुम्ही बघू शकता, एखादे कार्य सुरू करण्यासाठीचा कोड अगदी सामान्य आहे, परंतु आम्हाला नवीन कार्यासाठी त्याची पुनरावृत्ती करावी लागेल. एक उपाय म्हणजे ते वेगळ्या पद्धतीने ठेवणे, उदा execute(Runnable runnable). पण जावाच्या निर्मात्यांनी आमच्या दुर्दशेचा विचार केला आणि इंटरफेस तयार केला Executor:

public static void main(String[] args) throws Exception {
	Runnable task = () -> System.out.println("Task executed");
	Executor executor = (runnable) -> {
		new Thread(runnable).start();
	};
	executor.execute(task);
}
Runnableहा कोड स्पष्टपणे अधिक संक्षिप्त आहे: आता आम्ही थ्रेडवर सुरू करण्यासाठी कोड लिहितो . ते छान आहे, नाही का? पण ही फक्त सुरुवात आहे: एकत्र चांगले: Java आणि थ्रेड वर्ग.  भाग V — एक्झिक्युटर, थ्रेडपूल, फोर्क/जॉइन - 2

https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html

जसे आपण पाहू शकता, Executorइंटरफेसमध्ये एक ExecutorServiceउप-इंटरफेस आहे. या इंटरफेससाठी Javadoc म्हणते की एक ExecutorServiceविशिष्ट वर्णन करते Executorजे बंद करण्याच्या पद्धती प्रदान करते Executor. java.util.concurrent.Futureअंमलबजावणी प्रक्रियेचा मागोवा घेण्यासाठी ते मिळवणे देखील शक्य करते . पूर्वी, Better together: Java आणि the Thread वर्ग. भाग IV — कॉल करण्यायोग्य, भविष्य आणि मित्र , आम्ही च्या क्षमतांचे थोडक्यात पुनरावलोकन केले Future. तुम्ही ते विसरलात किंवा कधीच वाचलात तर, मी तुम्हाला तुमची स्मृती ताजी करण्याचा सल्ला देतो;) Javadoc आणखी काय म्हणतो? हे आम्हाला सांगते की आमच्याकडे एक विशेष java.util.concurrent.Executorsकारखाना आहे जो आम्हाला ची डीफॉल्ट अंमलबजावणी तयार करू देतो ExecutorService.

एक्झिक्युटर सर्व्हिस

चला पुनरावलोकन करूया. आपल्याला थ्रेडवर एखादे विशिष्ट कार्य Executorकार्यान्वित करावे लागते (म्हणजे कॉल करणे), आणि थ्रेड तयार करणारा कोड आपल्यापासून लपलेला असतो. execute()आमच्याकडे आहे ExecutorService— एक विशिष्ट Executorज्यामध्ये प्रगती नियंत्रित करण्यासाठी अनेक पर्याय आहेत. आणि आमच्याकडे Executorsकारखाना आहे जो आम्हाला तयार करू देतो ExecutorService. आता ते स्वतः करूया:

public static void main(String[] args) throws ExecutionException, InterruptedException {
	Callable<String> task = () -> Thread.currentThread().getName();
	ExecutorService service = Executors.newFixedThreadPool(2);
	for (int i = 0; i < 5; i++) {
		Future result = service.submit(task);
		System.out.println(result.get());
	}
	service.shutdown();
}
तुम्ही पाहू शकता की आम्ही एक निश्चित थ्रेड पूल निर्दिष्ट केला आहे ज्याचा आकार 2 आहे. नंतर आम्ही पूलमध्ये एक एक कार्य सबमिट करतो. प्रत्येक कार्य Stringथ्रेड नाव ( currentThread().GetName()) असलेले परत करते. अगदी शेवटी बंद करणे महत्वाचे आहे ExecutorService, कारण अन्यथा आमचा कार्यक्रम संपणार नाही. कारखान्यात Executorsअतिरिक्त फॅक्टरी पद्धती आहेत. उदाहरणार्थ, आम्ही फक्त एक थ्रेड ( newSingleThreadExecutor) असलेला पूल तयार करू शकतो किंवा कॅशे ( ) असलेला पूल तयार newCachedThreadPoolकरू शकतो ज्यामधून थ्रेड 1 मिनिट निष्क्रिय राहिल्यानंतर काढले जातात. प्रत्यक्षात, हे ब्लॉकिंग रांगेद्वारेExecutorService समर्थित आहेत , ज्यामध्ये कार्ये ठेवली जातात आणि ज्यातून कार्ये अंमलात आणली जातात. रांगा अवरोधित करण्याबद्दल अधिक माहिती या व्हिडिओमध्ये आढळू शकते . आपण हे देखील वाचू शकताBlockingQueue बद्दल पुनरावलोकन करा . आणि "ArrayBlockingQueue पेक्षा LinkedBlockingQueue ला कधी प्राधान्य द्यायचे?" या प्रश्नाचे उत्तर पहा. सोप्या भाषेत, एक BlockingQueueधागा दोन प्रकरणांमध्ये अवरोधित करतो:
  • थ्रेड रिकाम्या रांगेतून आयटम मिळवण्याचा प्रयत्न करतो
  • थ्रेड आयटम पूर्ण रांगेत ठेवण्याचा प्रयत्न करतो
जर आपण फॅक्टरी पद्धतींची अंमलबजावणी पाहिली तर ते कसे कार्य करतात ते आपण पाहू शकतो. उदाहरणार्थ:

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}
किंवा

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}
जसे आपण पाहू शकतो, ची अंमलबजावणी ExecutorServiceफॅक्टरी पद्धतींमध्ये तयार केली जाते. आणि बहुतेक, आम्ही याबद्दल बोलत आहोत ThreadPoolExecutor. केवळ कामावर परिणाम करणारे पॅरामीटर्स बदलले आहेत. एकत्र चांगले: Java आणि थ्रेड वर्ग.  भाग V — एक्झिक्युटर, थ्रेडपूल, फोर्क/जॉईन - ३

https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg

ThreadPoolExecutor

आपण आधी पाहिल्याप्रमाणे, ThreadPoolExecutorजे सहसा फॅक्टरी पद्धतींमध्ये तयार होते. थ्रेड्सची जास्तीत जास्त आणि किमान संख्या, तसेच कोणत्या प्रकारची रांग वापरली जात आहे म्हणून आम्ही पास करत असलेल्या वितर्कांमुळे कार्यक्षमता प्रभावित होते. परंतु इंटरफेसची कोणतीही अंमलबजावणी java.util.concurrent.BlockingQueueवापरली जाऊ शकते. बद्दल बोलणे ThreadPoolExecutor, आपण काही मनोरंजक वैशिष्ट्यांचा उल्लेख केला पाहिजे. उदाहरणार्थ, ThreadPoolExecutorजागा उपलब्ध नसल्यास तुम्ही कार्ये सबमिट करू शकत नाही:

public static void main(String[] args) throws ExecutionException, InterruptedException {
	int threadBound = 2;
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
            0L, TimeUnit.SECONDS, new SynchronousQueue<>());
	Callable<String> task = () -> {
		Thread.sleep(1000);
		return Thread.currentThread().getName();
	};
	for (int i = 0; i < threadBound + 1; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
हा कोड यासारख्या त्रुटीसह क्रॅश होईल:

Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
दुस-या शब्दात, taskसबमिट केले जाऊ शकत नाही, कारण SynchronousQueueडिझाइन केले आहे की ते प्रत्यक्षात एक घटक समाविष्ट करते आणि आम्हाला त्यात आणखी काहीही ठेवण्याची परवानगी देत ​​​​नाही. आपण पाहू शकतो की आपल्याकडे शून्य queued tasks("रांगेत कार्य = 0") आहे. परंतु यात काही विचित्र नाही, कारण हे एक विशेष वैशिष्ट्य आहे SynchronousQueue, जे खरं तर 1-घटकांची रांग आहे जी नेहमी रिकामी असते! जेव्हा एक थ्रेड रांगेत एक घटक ठेवतो, तेव्हा तो दुसरा थ्रेड रांगेतून घटक घेत नाही तोपर्यंत प्रतीक्षा करेल. त्यानुसार, आम्ही ते बदलू शकतो new LinkedBlockingQueue<>(1)आणि त्रुटी आता दर्शविण्यामध्ये बदलली जाईल queued tasks = 1. कारण रांग फक्त 1 घटक आहे, आम्ही दुसरा घटक जोडू शकत नाही. आणि त्यामुळेच कार्यक्रम अयशस्वी होतो. रांगेची आमची चर्चा सुरू ठेवून, हे लक्षात घेण्यासारखे आहे की दThreadPoolExecutorवर्गात रांगेत सेवा देण्यासाठी अतिरिक्त पद्धती आहेत. उदाहरणार्थ, threadPoolExecutor.purge()रांगेतील जागा मोकळी करण्यासाठी ही पद्धत रांगेतील सर्व रद्द केलेली कार्ये काढून टाकेल. रांगेशी संबंधित आणखी एक मनोरंजक कार्य नाकारलेल्या कार्यांसाठी हँडलर आहे:

public static void main(String[] args) {
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
            0L, TimeUnit.SECONDS, new SynchronousQueue());
	Callable<String> task = () -> Thread.currentThread().getName();
	threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
	for (int i = 0; i < 5; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
या उदाहरणात, Rejectedप्रत्येक वेळी रांगेतील एखादे कार्य नाकारल्यावर आमचा हँडलर फक्त दाखवतो. सोयीस्कर, नाही का? याव्यतिरिक्त, ThreadPoolExecutorएक मनोरंजक उपवर्ग आहे: ScheduledThreadPoolExecutor, जे एक आहे ScheduledExecutorService. हे टायमरवर आधारित कार्य करण्याची क्षमता प्रदान करते.

शेड्यूल्ड एक्झिक्यूटरसेवा

ScheduledExecutorService(जे एक प्रकार आहे ExecutorService) आम्हाला शेड्यूलनुसार कार्ये चालवू देते. चला एक उदाहरण पाहू:

public static void main(String[] args) {
	ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		return Thread.currentThread().getName();
	};
	scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
	scheduledExecutorService.shutdown();
}
येथे सर्व काही सोपे आहे. कार्ये सबमिट केली जातात आणि नंतर आम्हाला एक मिळेल java.util.concurrent.ScheduledFuture. खालील परिस्थितीत शेड्यूल देखील उपयुक्त ठरू शकते:

ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
	System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
येथे आम्ही Runnableठराविक विलंबाने निश्चित वारंवारतेवर ("फिक्स्ड रेट") अंमलबजावणीसाठी कार्य सबमिट करतो. या प्रकरणात, 1 सेकंदानंतर, कार्य दर 2 सेकंदांनी कार्यान्वित करणे सुरू होईल. एक समान पर्याय आहे:

scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
परंतु या प्रकरणात, कार्ये प्रत्येक अंमलबजावणी दरम्यान विशिष्ट अंतराने केली जातात. म्हणजेच, task1 सेकंदानंतर कार्यान्वित होईल. मग, ते पूर्ण होताच, 2 सेकंद निघून जातील आणि नंतर एक नवीन कार्य सुरू होईल. या विषयावरील काही अतिरिक्त संसाधने येथे आहेत: एकत्र चांगले: Java आणि थ्रेड वर्ग.  भाग V — एक्झिक्युटर, थ्रेडपूल, फोर्क/जॉईन - ४

https://dzone.com/articles/diving-into-java-8s-newworkstealingpools

वर्कस्टीलिंगपूल

वरील थ्रेड पूल्स व्यतिरिक्त, आणखी एक आहे. आम्ही प्रामाणिकपणे म्हणू शकतो की ते थोडे विशेष आहे. त्याला वर्क-स्टिलिंग पूल म्हणतात. थोडक्यात, वर्क-स्टिलिंग हा एक अल्गोरिदम आहे ज्यामध्ये निष्क्रिय थ्रेड्स इतर थ्रेड्समधून किंवा सामायिक रांगेतून कार्ये घेण्यास सुरुवात करतात. चला एक उदाहरण पाहू:

public static void main(String[] args) {
	Object lock = new Object();
	ExecutorService executorService = Executors.newCachedThreadPool();
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		lock.wait(2000);
		System.out.println("Finished");
		return "result";
	};
	for (int i = 0; i < 5; i++) {
		executorService.submit(task);
	}
	executorService.shutdown();
}
जर आपण हा कोड रन केला, तर ExecutorServiceआपल्यासाठी 5 थ्रेड तयार होतील, कारण प्रत्येक थ्रेड लॉक ऑब्जेक्टसाठी प्रतीक्षा रांगेत ठेवला जाईल. आम्ही आधीपासून बेटरमध्ये मॉनिटर्स आणि लॉक एकत्र शोधून काढले आहेत : Java आणि थ्रेड क्लास. भाग II — सिंक्रोनाइझेशन . Executors.newCachedThreadPool()आता सह बदलू Executors.newWorkStealingPool(). काय बदलणार? आमची कार्ये ५ पेक्षा कमी थ्रेड्सवर कार्यान्वित झाल्याचे आपण पाहू. लक्षात ठेवा की CachedThreadPoolप्रत्येक कार्यासाठी एक धागा तयार होतो? कारण wait()थ्रेड ब्लॉक केला आहे, त्यानंतरची कामे पूर्ण करायची आहेत आणि त्यांच्यासाठी पूलमध्ये नवीन थ्रेड तयार केले आहेत. स्टिलिंग पूलसह, धागे कायमचे निष्क्रिय राहत नाहीत. ते शेजाऱ्यांची कामे करू लागतात. WorkStealingPoolइतर थ्रेड पूल्सपेक्षा वेगळे काय आहे ? जादुई वस्तुस्थितीForkJoinPoolत्याच्या आत राहतो:

public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
}
खरं तर, आणखी एक फरक आहे. डीफॉल्टनुसार, a साठी तयार केलेले थ्रेड ForkJoinPoolहे डिमन थ्रेड्स आहेत, onrdinary द्वारे तयार केलेल्या थ्रेड्सपेक्षा वेगळे ThreadPool. सर्वसाधारणपणे, तुम्ही डिमन थ्रेड्स लक्षात ठेवावे, कारण, उदाहरणार्थ, CompletableFutureडिमन धागे देखील वापरतात जोपर्यंत तुम्ही तुमचा स्वतःचा ThreadFactoryनॉन-डिमन थ्रेड तयार करत नाही. ही अशी आश्चर्ये आहेत जी अनपेक्षित ठिकाणी लपून राहू शकतात! :)

ForkJoinPool

या भागात, आम्ही पुन्हा बोलू ForkJoinPool(ज्याला फोर्क/जॉइन फ्रेमवर्क देखील म्हणतात), जे "हुड अंतर्गत" राहतात WorkStealingPool. सर्वसाधारणपणे, जावा 1.7 मध्ये फोर्क/जॉईन फ्रेमवर्क परत दिसले. आणि जरी Java 11 अगदी जवळ आहे, तरीही ते लक्षात ठेवण्यासारखे आहे. ही सर्वात सामान्य अंमलबजावणी नाही, परंतु ती खूपच मनोरंजक आहे. वेबवर याबद्दल एक चांगले पुनरावलोकन आहे: Java Fork-Join Framework with Examples समजून घेणे . ForkJoinPoolवर अवलंबून आहे java.util.concurrent.RecursiveTask. तसेच आहे java.util.concurrent.RecursiveAction. RecursiveActionनिकाल देत नाही. अशा प्रकारे, RecursiveTaskसारखे आहे Callable, आणि RecursiveActionसारखे आहे unnable. आपण पाहू शकतो की नावामध्ये दोन महत्त्वाच्या पद्धतींची नावे समाविष्ट आहेत: forkआणि join. दforkपद्धत वेगळ्या धाग्यावर असिंक्रोनसपणे काही कार्य सुरू करते. आणि joinपद्धत आपल्याला काम पूर्ण होण्याची प्रतीक्षा करू देते. उत्तम समज मिळविण्यासाठी, तुम्ही Java 8 मध्ये Imperative Programming पासून Fork/Join to Parallel Streams पर्यंत वाचले पाहिजे .

सारांश

बरं, ते पुनरावलोकनाचा हा भाग गुंडाळतो. आम्ही शिकलो आहोत की Executorमूळत: थ्रेड्स कार्यान्वित करण्यासाठी शोध लावला गेला होता. मग Java च्या निर्मात्यांनी ही कल्पना पुढे चालू ठेवण्याचा निर्णय घेतला आणि पुढे आले ExecutorService. आणि ExecutorServiceवापरून अंमलबजावणीसाठी कार्ये सबमिट करू आणि सेवा बंद करू. कारण अंमलबजावणीची गरज आहे, त्यांनी फॅक्टरी पद्धतींसह एक वर्ग लिहिला आणि त्याला म्हणतात . हे तुम्हाला थ्रेड पूल ( ) तयार करू देते. याव्यतिरिक्त, असे थ्रेड पूल आहेत जे आम्हाला अंमलबजावणीचे वेळापत्रक निर्दिष्ट करण्यास देखील परवानगी देतात. आणि एक मागे लपतो . मला आशा आहे की मी वर जे लिहिले आहे ते तुम्हाला केवळ मनोरंजकच नाही तर समजण्यासारखे देखील आहे :) तुमच्या सूचना आणि टिप्पण्या ऐकून मला नेहमीच आनंद होतो. submit()invoke()ExecutorServiceExecutorsThreadPoolExecutorForkJoinPoolWorkStealingPoolएकत्र चांगले: Java आणि थ्रेड वर्ग. भाग I — अंमलबजावणीचे धागे एकत्र चांगले: Java आणि थ्रेड क्लास. भाग II — एकत्रितपणे सिंक्रोनाइझेशन उत्तम: Java आणि थ्रेड वर्ग. भाग तिसरा — परस्परसंवाद चांगले एकत्र: Java आणि थ्रेड वर्ग. भाग IV — कॉल करण्यायोग्य, भविष्य आणि मित्र एकत्र चांगले: Java आणि थ्रेड वर्ग. भाग VI - आग दूर!
टिप्पण्या
TO VIEW ALL COMMENTS OR TO MAKE A COMMENT,
GO TO FULL VERSION