CodeGym /مدونة جافا /Random-AR /أفضل معًا: Java وفئة Thread. الجزء الخامس — المنفذ، Threa...
John Squirrels
مستوى
San Francisco

أفضل معًا: Java وفئة Thread. الجزء الخامس — المنفذ، ThreadPool، الشوكة/الانضمام

نشرت في المجموعة

مقدمة

لذلك، نحن نعلم أن جافا لديها المواضيع. يمكنك أن تقرأ عن ذلك في المراجعة التي تحمل عنوان Better Together: Java and the Thread class. الجزء الأول – خيوط التنفيذ . أفضل معًا: Java وفئة Thread.  الجزء الخامس — المنفذ، ThreadPool، الشوكة/الانضمام - 1دعونا نلقي نظرة أخرى على الكود النموذجي:
public static void main(String[] args) throws Exception {
	Runnable task = () -> {
		System.out.println("Task executed");
	};
	Thread thread = new Thread(task);
	thread.start();
}
كما ترون، يعد رمز بدء المهمة نموذجيًا جدًا، ولكن يتعين علينا تكراره للمهمة الجديدة. أحد الحلول هو وضعه بطريقة منفصلة، ​​على سبيل المثال execute(Runnable runnable). لكن منشئي Java أخذوا في الاعتبار محنتنا وتوصلوا إلى الواجهة Executor:
public static void main(String[] args) throws Exception {
	Runnable task = () -> System.out.println("Task executed");
	Executor executor = (runnable) -> {
		new Thread(runnable).start();
	};
	executor.execute(task);
}
من الواضح أن هذا الرمز أكثر إيجازًا: الآن نكتب ببساطة رمزًا لبدء Runnableالخيط. هذا عظيم، أليس كذلك؟ ولكن هذه هي البداية فقط: أفضل معًا: Java وفئة Thread.  الجزء الخامس — المنفذ، ThreadPool، الشوكة/الانضمام - 2

https://docs.Oracle.com/javase/7/docs/api/Java/util/concurrent/Executor.html

كما ترون، Executorالواجهة لديها ExecutorServiceواجهة فرعية. يقول Javadoc لهذه الواجهة أن ExecutorServiceيصف عنصرًا محددًا Executorيوفر طرقًا لإيقاف تشغيل Executor. كما أنه يجعل من الممكن الحصول على أمر java.util.concurrent.Futureلتتبع عملية التنفيذ. سابقًا، في Better Together: Java وفئة Thread. الجزء الرابع — القابل للاستدعاء والمستقبل والأصدقاء ، استعرضنا بإيجاز إمكانيات Future. إذا نسيته أو لم تقرأه أبدًا، أقترح عليك تحديث ذاكرتك؛) ماذا يقول Javadoc أيضًا؟ يخبرنا أن لدينا java.util.concurrent.Executorsمصنعًا خاصًا يتيح لنا إنشاء تطبيقات افتراضية لـ ExecutorService.

ExecutorService

دعونا نراجع. يتعين علينا Executorتنفيذ (أي استدعاء execute()) مهمة معينة على سلسلة رسائل، ويكون الرمز الذي ينشئ سلسلة الرسائل مخفيًا عنا. لدينا ExecutorService— محدد Executorيحتوي على عدة خيارات للتحكم في التقدم. ولدينا Executorsالمصنع الذي يتيح لنا إنشاء ملف ExecutorService. الآن دعونا نفعل ذلك بأنفسنا:
public static void main(String[] args) throws ExecutionException, InterruptedException {
	Callable<String> task = () -> Thread.currentThread().getName();
	ExecutorService service = Executors.newFixedThreadPool(2);
	for (int i = 0; i < 5; i++) {
		Future result = service.submit(task);
		System.out.println(result.get());
	}
	service.shutdown();
}
يمكنك أن ترى أننا حددنا مجموعة مؤشرات ترابط ثابتة بحجم 2. ثم نقوم بإرسال المهام إلى المجموعة واحدة تلو الأخرى. تقوم كل مهمة بإرجاع Stringاسم مؤشر الترابط ( currentThread().GetName()). من المهم إغلاق البرنامج ExecutorServiceفي النهاية، وإلا فلن ينتهي برنامجنا. المصنع Executorsلديه طرق مصنع إضافية. على سبيل المثال، يمكننا إنشاء تجمع يتكون من مؤشر ترابط واحد فقط ( newSingleThreadExecutor) أو تجمع يتضمن ذاكرة تخزين مؤقت ( newCachedThreadPool) تتم إزالة سلاسل الرسائل منه بعد أن تكون خاملة لمدة دقيقة واحدة. في الواقع، ExecutorServiceيتم دعمها من خلال قائمة انتظار الحظر ، حيث يتم وضع المهام ومن خلالها يتم تنفيذ المهام. يمكن العثور على مزيد من المعلومات حول حظر قوائم الانتظار في هذا الفيديو . يمكنك أيضًا قراءة هذه المراجعة حول BlockingQueue . وتحقق من إجابة السؤال "متى تفضل LinkedBlockingQueue على ArrayBlockingQueue؟" في أبسط العبارات، BlockingQueueيقوم الخيط بحظر الخيط في حالتين:
  • يحاول مؤشر الترابط الحصول على عناصر من قائمة انتظار فارغة
  • يحاول الخيط وضع العناصر في قائمة انتظار كاملة
إذا نظرنا إلى تنفيذ أساليب المصنع، يمكننا أن نرى كيف تعمل. على سبيل المثال:
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}
أو
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}
كما نرى، ExecutorServiceيتم إنشاء تطبيقات داخل أساليب المصنع. وبالنسبة للجزء الأكبر، نحن نتحدث عن ThreadPoolExecutor. يتم تغيير المعلمات التي تؤثر على العمل فقط. أفضل معًا: Java وفئة Thread.  الجزء الخامس — المنفذ، ThreadPool، الشوكة/الانضمام - 3

https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg

ThreadPoolExecutor

كما رأينا سابقًا، ThreadPoolExecutorهو ما يتم إنشاؤه عادةً داخل أساليب المصنع. تتأثر الوظيفة بالوسائط التي نمررها كحد أقصى وأدنى لعدد سلاسل العمليات، بالإضافة إلى نوع قائمة الانتظار المستخدمة. ولكن java.util.concurrent.BlockingQueueيمكن استخدام أي تطبيق للواجهة. بالحديث عن ذلك ThreadPoolExecutor، يجب أن نذكر بعض الميزات المثيرة للاهتمام. على سبيل المثال، لا يمكنك إرسال المهام إلى ThreadPoolExecutorإذا لم تكن هناك مساحة متوفرة:
public static void main(String[] args) throws ExecutionException, InterruptedException {
	int threadBound = 2;
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
            0L, TimeUnit.SECONDS, new SynchronousQueue<>());
	Callable<String> task = () -> {
		Thread.sleep(1000);
		return Thread.currentThread().getName();
	};
	for (int i = 0; i < threadBound + 1; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
سوف يتعطل هذا الرمز بسبب خطأ مثل هذا:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
بمعنى آخر، taskلا يمكن تقديمه، لأنه SynchronousQueueمصمم بحيث يتكون فعليًا من عنصر واحد ولا يسمح لنا بإضافة أي شيء آخر إليه. يمكننا أن نرى أن لدينا صفر queued tasks("المهام في قائمة الانتظار = 0") هنا. ولكن لا يوجد شيء غريب في هذا، لأن هذه ميزة خاصة لـ SynchronousQueue، وهي في الواقع قائمة انتظار مكونة من عنصر واحد تكون فارغة دائمًا! عندما يقوم أحد الخيوط بوضع عنصر في قائمة الانتظار، فسوف ينتظر حتى يأخذ مؤشر ترابط آخر العنصر من قائمة الانتظار. وبناء على ذلك يمكننا استبداله new LinkedBlockingQueue<>(1)وسيتغير الخطأ ليظهر الآن queued tasks = 1. نظرًا لأن قائمة الانتظار تتكون من عنصر واحد فقط، فلا يمكننا إضافة عنصر ثانٍ. وهذا ما يتسبب في فشل البرنامج. لمواصلة مناقشتنا لقائمة الانتظار، تجدر الإشارة إلى أن الفصل ThreadPoolExecutorلديه طرق إضافية لخدمة قائمة الانتظار. على سبيل المثال، threadPoolExecutor.purge()ستقوم الطريقة بإزالة كافة المهام الملغاة من قائمة الانتظار لتحرير مساحة في قائمة الانتظار. وظيفة أخرى مثيرة للاهتمام تتعلق بقائمة الانتظار هي معالج المهام المرفوضة:
public static void main(String[] args) {
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
            0L, TimeUnit.SECONDS, new SynchronousQueue());
	Callable<String> task = () -> Thread.currentThread().getName();
	threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
	for (int i = 0; i < 5; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
في هذا المثال، يعرض معالجنا ببساطة Rejectedكل مرة يتم فيها رفض مهمة في قائمة الانتظار. مريحة، أليس كذلك؟ بالإضافة إلى ذلك، ThreadPoolExecutorيحتوي على فئة فرعية مثيرة للاهتمام: ScheduledThreadPoolExecutorوهي عبارة عن ScheduledExecutorService. يوفر القدرة على أداء مهمة بناءً على مؤقت.

خدمة تنفيذية مجدولة

ScheduledExecutorService(وهو نوع من ExecutorService) يتيح لنا تشغيل المهام وفقًا لجدول زمني. لنلقي نظرة على مثال:
public static void main(String[] args) {
	ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		return Thread.currentThread().getName();
	};
	scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
	scheduledExecutorService.shutdown();
}
كل شيء بسيط هنا. يتم إرسال المهام ومن ثم نحصل على ملف java.util.concurrent.ScheduledFuture. قد يكون الجدول الزمني مفيدًا أيضًا في الموقف التالي:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
	System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
نرسل هنا Runnableمهمة للتنفيذ بتردد ثابت ("FixedRate") مع تأخير أولي معين. في هذه الحالة، بعد ثانية واحدة، سيبدأ تنفيذ المهمة كل ثانيتين. هناك خيار مماثل:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
ولكن في هذه الحالة، يتم تنفيذ المهام بفاصل زمني محدد بين كل تنفيذ. أي أنه taskسيتم تنفيذه بعد ثانية واحدة. وبعد ذلك، بمجرد اكتمالها، ستمر ثانيتان، وبعد ذلك ستبدأ مهمة جديدة. فيما يلي بعض الموارد الإضافية حول هذا الموضوع: أفضل معًا: Java وفئة Thread.  الجزء الخامس — المنفذ، ThreadPool، الشوكة/الانضمام - 4

https://dzone.com/articles/diving-into-Java-8s-newworkstealingpools

WorkStealingPool

بالإضافة إلى تجمعات المواضيع المذكورة أعلاه، هناك واحد آخر. يمكننا أن نقول بصراحة أنه خاص قليلاً. يطلق عليه بركة سرقة العمل. باختصار، سرقة العمل هي خوارزمية تبدأ فيها سلاسل العمليات الخاملة في أخذ المهام من سلاسل رسائل أخرى أو مهام من قائمة انتظار مشتركة. لنلقي نظرة على مثال:
public static void main(String[] args) {
	Object lock = new Object();
	ExecutorService executorService = Executors.newCachedThreadPool();
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		lock.wait(2000);
		System.out.println("Finished");
		return "result";
	};
	for (int i = 0; i < 5; i++) {
		executorService.submit(task);
	}
	executorService.shutdown();
}
إذا قمنا بتشغيل هذا الكود، فسوف نقوم ExecutorServiceبإنشاء 5 سلاسل رسائل لنا، لأنه سيتم وضع كل سلسلة رسائل في قائمة انتظار كائن القفل. لقد اكتشفنا بالفعل الشاشات والأقفال بشكل أفضل معًا: Java وفئة Thread. الجزء الثاني – التزامن . الآن دعونا نستبدل Executors.newCachedThreadPool()بـ Executors.newWorkStealingPool(). ما الذي سيتغير؟ سنرى أن مهامنا يتم تنفيذها على أقل من 5 سلاسل رسائل. تذكر أن CachedThreadPoolيقوم بإنشاء موضوع لكل مهمة؟ وذلك بسبب wait()حظر سلسلة الرسائل، وترغب المهام اللاحقة في إكمالها، وتم إنشاء سلاسل رسائل جديدة لها في التجمع. مع بركة السرقة، لا تبقى الخيوط خاملة إلى الأبد. يبدأون في أداء مهام جيرانهم. ما الذي يجعل الأمر WorkStealingPoolمختلفًا تمامًا عن تجمعات الخيوط الأخرى؟ حقيقة أن السحر ForkJoinPoolيعيش بداخله:
public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
}
في الواقع، هناك فرق آخر. بشكل افتراضي، المواضيع التي تم إنشاؤها من أجل ForkJoinPoolهي سلاسل خفية، على عكس المواضيع التي تم إنشاؤها من خلال onrdinary ThreadPool. بشكل عام، يجب أن تتذكر الخيوط الخفية، لأنها، على سبيل المثال، CompletableFutureتستخدم أيضًا الخيوط الخفية ما لم تحدد سلاسلك الخاصة ThreadFactoryالتي تنشئ سلاسل رسائل غير خفية. هذه هي المفاجآت التي قد تكمن في أماكن غير متوقعة! :)

ForkJoinPool

في هذا الجزء، سنتحدث مرة أخرى عن ForkJoinPool(يسمى أيضًا إطار عمل الشوكة/الانضمام)، والذي يعيش "تحت غطاء" WorkStealingPool. بشكل عام، ظهر إطار الشوكة/الانضمام مرة أخرى في Java 1.7. وعلى الرغم من أن Java 11 أصبح في متناول اليد، إلا أنه لا يزال يستحق التذكر. هذا ليس التطبيق الأكثر شيوعًا، لكنه مثير للاهتمام للغاية. هناك مراجعة جيدة حول هذا الأمر على الويب: فهم Java Fork-Join Framework مع الأمثلة . يعتمد ForkJoinPoolعلى java.util.concurrent.RecursiveTask. هناك ايضا java.util.concurrent.RecursiveAction. RecursiveActionلا يُرجع نتيجة. وهكذا، RecursiveTaskيشبه Callable، RecursiveActionويشبه unnable. يمكننا أن نرى أن الاسم يتضمن أسماء طريقتين مهمتين: forkو join. تبدأ الطريقة forkبعض المهام بشكل غير متزامن في موضوع منفصل. وتتيح لك الطريقة joinالانتظار حتى يتم إنجاز العمل. للحصول على أفضل فهم، يجب عليك قراءة من البرمجة الحتمية إلى Fork/Join to Parallel Streams في Java 8 .

ملخص

حسنًا، هذا يختتم هذا الجزء من المراجعة. لقد تعلمنا أنه Executorتم اختراعه في الأصل لتنفيذ الخيوط. ثم قرر منشئو Java مواصلة الفكرة وتوصلوا إلى ExecutorService. ExecutorServiceيتيح لنا إرسال المهام للتنفيذ باستخدام submit()و invoke()، وكذلك إيقاف الخدمة. نظرًا لأنه ExecutorServiceيحتاج إلى تطبيقات، فقد كتبوا فصلًا دراسيًا بأساليب المصنع وأطلقوا عليه اسم Executors. يتيح لك إنشاء تجمعات مؤشرات الترابط ( ThreadPoolExecutor). بالإضافة إلى ذلك، هناك تجمعات سلاسل رسائل تسمح لنا أيضًا بتحديد جدول التنفيذ. ويختبئ ForkJoinPoolخلف أ WorkStealingPool. أتمنى أن تجد ما كتبته أعلاه ليس مثيرًا للاهتمام فحسب، بل مفهومًا أيضًا :) يسعدني دائمًا سماع اقتراحاتك وتعليقاتك. أفضل معًا: Java وفئة Thread. الجزء الأول - خيوط التنفيذ أفضل معًا: Java وفئة Thread. الجزء الثاني – التزامن بشكل أفضل معًا: Java وفئة Thread. الجزء الثالث - التفاعل بشكل أفضل معًا: Java وفئة Thread. الجزء الرابع - الأشخاص القابلون للاستدعاء والمستقبل والأصدقاء أفضل معًا: Java وفئة Thread. الجزء السادس – أطلق النار بعيدًا!
تعليقات
TO VIEW ALL COMMENTS OR TO MAKE A COMMENT,
GO TO FULL VERSION