CodeGym /وبلاگ جاوا /Random-FA /بهتر با هم: جاوا و کلاس Thread. قسمت پنجم - مجری، ThreadP...
John Squirrels
مرحله
San Francisco

بهتر با هم: جاوا و کلاس Thread. قسمت پنجم - مجری، ThreadPool، Fork/Join

در گروه منتشر شد

معرفی

بنابراین، ما می دانیم که جاوا دارای موضوعات است. شما می توانید در مورد آن در بررسی با عنوان Better together: Java and the Thread کلاس بخوانید. بخش اول - موضوعات اجرا . بهتر با هم: جاوا و کلاس Thread.  قسمت پنجم - مجری، ThreadPool، Fork/Join - 1بیایید نگاهی دیگر به کد معمولی بیندازیم:
public static void main(String[] args) throws Exception {
	Runnable task = () -> {
		System.out.println("Task executed");
	};
	Thread thread = new Thread(task);
	thread.start();
}
همانطور که می بینید، کد شروع یک کار بسیار معمولی است، اما ما باید آن را برای کار جدید تکرار کنیم. یک راه حل این است که آن را در یک روش جداگانه قرار دهید، به عنوان مثال execute(Runnable runnable). اما سازندگان جاوا مشکلات ما را در نظر گرفته اند و رابط کاربری را ارائه کرده اند Executor:
public static void main(String[] args) throws Exception {
	Runnable task = () -> System.out.println("Task executed");
	Executor executor = (runnable) -> {
		new Thread(runnable).start();
	};
	executor.execute(task);
}
این کد به وضوح مختصرتر است: اکنون ما به سادگی کدی را برای شروع Runnableروی موضوع می نویسیم. این عالی است، اینطور نیست؟ اما این تنها آغاز است: بهتر با هم: جاوا و کلاس Thread.  قسمت پنجم - مجری، ThreadPool، Fork/Join - 2

https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html

همانطور که می بینید، Executorاینترفیس دارای یک ExecutorServiceرابط فرعی است. Javadoc برای این رابط می‌گوید که an ExecutorServiceیک ویژگی خاص را توصیف می‌کند Executorکه روش‌هایی را برای خاموش کردن آن ارائه می‌کند Executor. همچنین این امکان را فراهم می کند که java.util.concurrent.Futureبرای ردیابی روند اجرا، یک را دریافت کنید. قبلاً در کلاس بهتر با هم: Java and Thread. قسمت چهارم — Callable، Future و دوستان ، ما به طور خلاصه قابلیت های Future. اگر فراموشش کردید یا نخوانده اید، پیشنهاد می کنم که حافظه خود را تازه کنید ;) جاوادوک چه می گوید؟ به ما می گوید که ما یک java.util.concurrent.Executorsکارخانه ویژه داریم که به ما امکان می دهد پیاده سازی های پیش فرض را ایجاد کنیم ExecutorService.

سرویس مجری

بیاید مرور کنیم. ما باید یک کار خاص را روی یک رشته Executorاجرا کنیم (یعنی فراخوانی کنیم execute())، و کدی که این رشته را ایجاد می کند از ما پنهان می شود. ما داریم ExecutorService- یک ویژگی خاص Executorکه چندین گزینه برای کنترل پیشرفت دارد. و ما Executorsکارخانه ای داریم که به ما امکان می دهد یک ExecutorService. حالا بیایید خودمان این کار را انجام دهیم:
public static void main(String[] args) throws ExecutionException, InterruptedException {
	Callable<String> task = () -> Thread.currentThread().getName();
	ExecutorService service = Executors.newFixedThreadPool(2);
	for (int i = 0; i < 5; i++) {
		Future result = service.submit(task);
		System.out.println(result.get());
	}
	service.shutdown();
}
می بینید که ما یک استخر رشته ثابت را مشخص کردیم که اندازه آن 2 است. سپس وظایف را یکی یکی به استخر ارسال می کنیم. هر وظیفه یک Stringنام حاوی رشته ( currentThread().GetName()) را برمی گرداند. ExecutorServiceمهم این است که در پایان آن را خاموش کنیم ، زیرا در غیر این صورت برنامه ما تمام نخواهد شد. کارخانه Executorsدارای روش های کارخانه ای اضافی است. برای مثال، می‌توانیم یک Pool متشکل از فقط یک رشته ( newSingleThreadExecutor) یا یک Pool که شامل یک حافظه پنهان ( newCachedThreadPool) است ایجاد کنیم که رشته‌ها پس از 1 دقیقه بیکار ماندن از آن حذف می‌شوند. در واقع، اینها ExecutorServiceتوسط یک صف مسدود کننده پشتیبانی می شوند ، که وظایف در آن قرار می گیرند و وظایف از آن اجرا می شوند. اطلاعات بیشتر در مورد مسدود کردن صف ها را می توانید در این ویدیو بیابید . همچنین می توانید این بررسی را در مورد BlockingQueue بخوانید . و پاسخ سوال "چه زمانی LinkedBlockingQueue را به ArrayBlockingQueue ترجیح دهیم؟" به عبارت ساده تر، BlockingQueueیک رشته در دو حالت مسدود می شود:
  • thread سعی می کند موارد را از یک صف خالی دریافت کند
  • thread سعی می کند موارد را در یک صف کامل قرار دهد
اگر به اجرای روش های کارخانه نگاه کنیم، می توانیم نحوه عملکرد آنها را مشاهده کنیم. مثلا:
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}
یا
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}
همانطور که می بینیم، پیاده سازی ها ExecutorServiceدر داخل روش های کارخانه ایجاد می شوند. و در بیشتر موارد، ما در مورد ThreadPoolExecutor. فقط پارامترهای مؤثر بر کار تغییر می کنند. بهتر با هم: جاوا و کلاس Thread.  قسمت پنجم - مجری، ThreadPool، Fork/Join - 3

https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg

ThreadPoolExecutor

همانطور که قبلا دیدیم، ThreadPoolExecutorچیزی است که معمولاً در روش های کارخانه ایجاد می شود. عملکرد تحت تأثیر آرگومان هایی است که ما به عنوان حداکثر و حداقل تعداد رشته ها ارسال می کنیم و همچنین نوع صف مورد استفاده قرار می گیرد. اما هر پیاده سازی java.util.concurrent.BlockingQueueرابط را می توان استفاده کرد. صحبت از ThreadPoolExecutor, باید به چند ویژگی جالب اشاره کنیم. ThreadPoolExecutorبه عنوان مثال، اگر فضای موجود وجود نداشته باشد، نمی‌توانید وظایف را به a ارسال کنید :
public static void main(String[] args) throws ExecutionException, InterruptedException {
	int threadBound = 2;
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
            0L, TimeUnit.SECONDS, new SynchronousQueue<>());
	Callable<String> task = () -> {
		Thread.sleep(1000);
		return Thread.currentThread().getName();
	};
	for (int i = 0; i < threadBound + 1; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
این کد با خطای زیر خراب می شود:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
به عبارت دیگر، taskنمی توان آن را ارسال کرد، زیرا SynchronousQueueبه گونه ای طراحی شده است که در واقع از یک عنصر تشکیل شده است و به ما اجازه نمی دهد چیز بیشتری در آن قرار دهیم. می‌توانیم ببینیم که در اینجا صفر داریم queued tasks("کارهای صف = 0"). اما هیچ چیز عجیبی در این مورد وجود ندارد، زیرا این یک ویژگی خاص است SynchronousQueueکه در واقع یک صف 1 عنصری است که همیشه خالی است! هنگامی که یک رشته یک عنصر را در صف قرار می دهد، منتظر می ماند تا رشته دیگری عنصر را از صف خارج کند. بر این اساس، می‌توانیم آن را با آن جایگزین کنیم new LinkedBlockingQueue<>(1)و خطا به now show تغییر می‌کند queued tasks = 1. از آنجایی که صف فقط 1 عنصر است، نمی توانیم عنصر دوم اضافه کنیم. و این همان چیزی است که باعث از کار افتادن برنامه می شود. در ادامه بحث ما در مورد صف، شایان ذکر است که ThreadPoolExecutorکلاس دارای متدهای اضافی برای سرویس دهی به صف است. به عنوان مثال، این threadPoolExecutor.purge()روش تمام کارهای لغو شده را از صف حذف می کند تا فضایی در صف آزاد شود. یکی دیگر از عملکردهای جالب مرتبط با صف، کنترل کننده وظایف رد شده است:
public static void main(String[] args) {
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
            0L, TimeUnit.SECONDS, new SynchronousQueue());
	Callable<String> task = () -> Thread.currentThread().getName();
	threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
	for (int i = 0; i < 5; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
در این مثال، کنترل کننده ما به سادگی Rejectedهر بار که یک وظیفه در صف رد می شود، نمایش می دهد. راحت است، اینطور نیست؟ علاوه بر این، ThreadPoolExecutorیک زیر کلاس جالب دارد: ScheduledThreadPoolExecutorکه یک است ScheduledExecutorService. این امکان را برای انجام یک کار بر اساس تایمر فراهم می کند.

ScheduledExecutorService

ScheduledExecutorService(که یک نوع ExecutorService) است به ما امکان می دهد وظایف را بر اساس یک زمانبندی اجرا کنیم. بیایید به یک مثال نگاه کنیم:
public static void main(String[] args) {
	ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		return Thread.currentThread().getName();
	};
	scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
	scheduledExecutorService.shutdown();
}
اینجا همه چیز ساده است. وظایف ارسال می شوند و سپس ما یک java.util.concurrent.ScheduledFuture. یک برنامه زمانی نیز ممکن است در شرایط زیر مفید باشد:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
	System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
در اینجا ما یک Runnableکار را برای اجرا در یک فرکانس ثابت ("FixedRate") با تاخیر اولیه مشخص ارائه می دهیم. در این صورت پس از 1 ثانیه، هر 2 ثانیه کار شروع به اجرا می کند. گزینه مشابهی وجود دارد:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
اما در این حالت، وظایف با فاصله زمانی مشخص بین هر اجرا انجام می شود. یعنی اراده taskبعد از 1 ثانیه اجرا می شود. سپس به محض اتمام، 2 ثانیه می گذرد و سپس یک کار جدید شروع می شود. در اینجا چند منبع اضافی در مورد این موضوع وجود دارد: بهتر با هم: جاوا و کلاس Thread.  قسمت پنجم - مجری، ThreadPool، Fork/Join - 4

https://dzone.com/articles/diving-into-java-8s-newworkstealingpools

WorkStealingPool

علاوه بر استخرهای نخ فوق، یک مورد دیگر نیز وجود دارد. به جرات می توان گفت که کمی خاص است. به آن می گویند حوض کار دزدی. به طور خلاصه، کار سرقت الگوریتمی است که در آن موضوعات بیکار شروع به گرفتن وظایف از رشته های دیگر یا وظایف از یک صف مشترک می کنند. بیایید به یک مثال نگاه کنیم:
public static void main(String[] args) {
	Object lock = new Object();
	ExecutorService executorService = Executors.newCachedThreadPool();
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		lock.wait(2000);
		System.out.println("Finished");
		return "result";
	};
	for (int i = 0; i < 5; i++) {
		executorService.submit(task);
	}
	executorService.shutdown();
}
اگر این کد را اجرا کنیم، ExecutorService5 رشته برای ما ایجاد می کند، زیرا هر رشته در صف انتظار شی lock قرار می گیرد. ما قبلاً مانیتورها و قفل‌های Better را با هم کشف کرده‌ایم : کلاس جاوا و Thread. بخش دوم - همگام سازی حالا بیایید Executors.newCachedThreadPool()با Executors.newWorkStealingPool(). چه چیزی تغییر خواهد کرد؟ خواهیم دید که وظایف ما در کمتر از 5 رشته اجرا می شود. به یاد داشته باشید که CachedThreadPoolبرای هر کار یک موضوع ایجاد می کند؟ به این دلیل که wait()موضوع مسدود شده است، کارهای بعدی می‌خواهند تکمیل شوند و رشته‌های جدیدی برای آنها در استخر ایجاد شده است. با استخر دزدی، نخ ها برای همیشه بیکار نمی مانند. آنها شروع به انجام وظایف همسایگان خود می کنند. چه چیزی یک WorkStealingPoolاستخر نخ را تا این حد از سایر استخرهای نخ متفاوت می کند؟ این واقعیت که جادو ForkJoinPoolدر درون آن زندگی می کند:
public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
}
در واقع، یک تفاوت دیگر وجود دارد. به‌طور پیش‌فرض، رشته‌های ایجاد شده برای a ForkJoinPool، رشته‌های شبح هستند، برخلاف رشته‌هایی که از طریق یک onrdinary ایجاد می‌شوند ThreadPool. به طور کلی، شما باید رشته‌های شبح را به خاطر بسپارید، زیرا، برای مثال، CompletableFutureاز نخ‌های شبح نیز استفاده می‌کند، مگر اینکه رشته‌های خود را مشخص کنید ThreadFactoryکه رشته‌های غیر دیمون ایجاد می‌کند. اینها شگفتی هایی هستند که ممکن است در کمین مکان های غیرمنتظره باشند! :)

ForkJoinPool

در این بخش، ما دوباره در مورد ForkJoinPool(که به آن چارچوب چنگال/پیوستن نیز گفته می‌شود) صحبت خواهیم کرد که در «زیر کاپوت» زندگی می‌کند WorkStealingPool. به طور کلی، چارچوب fork/join در جاوا 1.7 ظاهر شد. و حتی اگر جاوا 11 نزدیک است، هنوز هم ارزش به خاطر سپردن دارد. این رایج ترین پیاده سازی نیست، اما بسیار جالب است. یک بررسی خوب در مورد این در وب وجود دارد: درک چارچوب جاوا Fork-Join با مثال ها . متکی ForkJoinPoolبر java.util.concurrent.RecursiveTask. همچنین وجود دارد java.util.concurrent.RecursiveAction. RecursiveActionنتیجه ای بر نمی گرداند بنابراین، RecursiveTaskشبیه به Callable، و RecursiveActionمشابه است unnable. می بینیم که نام شامل نام دو روش مهم است: forkو join. این forkروش برخی از کارها را به صورت ناهمزمان در یک رشته جداگانه شروع می کند. و این joinروش به شما امکان می‌دهد منتظر بمانید تا کار انجام شود. برای دریافت بهترین درک، باید از برنامه‌نویسی ضروری تا فورک/پیوستن به جریان‌های موازی در جاوا 8 را بخوانید .

خلاصه

خوب، این بخش از بررسی را به پایان می رساند. ما آموخته ایم که Executorدر ابتدا برای اجرای رشته ها اختراع شد. سپس سازندگان جاوا تصمیم گرفتند که این ایده را ادامه دهند و به ExecutorService. به ما اجازه می دهد تا با استفاده از و ExecutorServiceوظایف را برای اجرا ارسال کنیم و همچنین سرویس را خاموش کنیم. چون نیاز به پیاده سازی دارد، کلاسی با متدهای کارخانه ای نوشتند و نام آن را گذاشتند . این به شما امکان می دهد استخرهای نخ ایجاد کنید ( ). علاوه بر این، استخرهای رشته ای وجود دارد که به ما اجازه می دهد تا یک زمان بندی اجرا را مشخص کنیم. و یک پنهان پشت یک . امیدوارم مطالبی را که در بالا نوشتم نه تنها جالب، بلکه قابل فهم هم پیدا کرده باشید :) من همیشه از شنیدن پیشنهادات و نظرات شما خوشحالم. بهتر با هم: جاوا و کلاس Thread. قسمت اول - موضوعات اجرا بهتر با هم: جاوا و کلاس Thread. بخش دوم - همگام سازی بهتر با هم: کلاس جاوا و Thread. بخش سوم - تعامل بهتر با هم: کلاس جاوا و Thread. بخش چهارم - Callable، Future، و دوستان بهتر با هم: Java and the Thread کلاس. قسمت ششم - آتش دور شوید! submit()invoke()ExecutorServiceExecutorsThreadPoolExecutorForkJoinPoolWorkStealingPool
نظرات
TO VIEW ALL COMMENTS OR TO MAKE A COMMENT,
GO TO FULL VERSION