مقدمة
لذلك، نحن نعلم أن جافا لديها المواضيع. يمكنك أن تقرأ عن ذلك في المراجعة التي تحمل عنوان Better Together: Java and the Thread class. الجزء الأول – خيوط التنفيذ . دعونا نلقي نظرة أخرى على الكود النموذجي:public static void main(String[] args) throws Exception {
Runnable task = () -> {
System.out.println("Task executed");
};
Thread thread = new Thread(task);
thread.start();
}
كما ترون، يعد رمز بدء المهمة نموذجيًا جدًا، ولكن يتعين علينا تكراره للمهمة الجديدة. أحد الحلول هو وضعه بطريقة منفصلة، على سبيل المثال execute(Runnable runnable)
. لكن منشئي Java أخذوا في الاعتبار محنتنا وتوصلوا إلى الواجهة Executor
:
public static void main(String[] args) throws Exception {
Runnable task = () -> System.out.println("Task executed");
Executor executor = (runnable) -> {
new Thread(runnable).start();
};
executor.execute(task);
}
من الواضح أن هذا الرمز أكثر إيجازًا: الآن نكتب ببساطة رمزًا لبدء Runnable
الخيط. هذا عظيم، أليس كذلك؟ ولكن هذه هي البداية فقط:
https://docs.Oracle.com/javase/7/docs/api/Java/util/concurrent/Executor.html
Executor
الواجهة لديها ExecutorService
واجهة فرعية. يقول Javadoc لهذه الواجهة أن ExecutorService
يصف عنصرًا محددًا Executor
يوفر طرقًا لإيقاف تشغيل Executor
. كما أنه يجعل من الممكن الحصول على أمر java.util.concurrent.Future
لتتبع عملية التنفيذ. سابقًا، في Better Together: Java وفئة Thread. الجزء الرابع — القابل للاستدعاء والمستقبل والأصدقاء
، استعرضنا بإيجاز إمكانيات Future
. إذا نسيته أو لم تقرأه أبدًا، أقترح عليك تحديث ذاكرتك؛) ماذا يقول Javadoc أيضًا؟ يخبرنا أن لدينا java.util.concurrent.Executors
مصنعًا خاصًا يتيح لنا إنشاء تطبيقات افتراضية لـ ExecutorService
.
ExecutorService
دعونا نراجع. يتعين عليناExecutor
تنفيذ (أي استدعاء execute()
) مهمة معينة على سلسلة رسائل، ويكون الرمز الذي ينشئ سلسلة الرسائل مخفيًا عنا. لدينا ExecutorService
— محدد Executor
يحتوي على عدة خيارات للتحكم في التقدم. ولدينا Executors
المصنع الذي يتيح لنا إنشاء ملف ExecutorService
. الآن دعونا نفعل ذلك بأنفسنا:
public static void main(String[] args) throws ExecutionException, InterruptedException {
Callable<String> task = () -> Thread.currentThread().getName();
ExecutorService service = Executors.newFixedThreadPool(2);
for (int i = 0; i < 5; i++) {
Future result = service.submit(task);
System.out.println(result.get());
}
service.shutdown();
}
يمكنك أن ترى أننا حددنا مجموعة مؤشرات ترابط ثابتة بحجم 2. ثم نقوم بإرسال المهام إلى المجموعة واحدة تلو الأخرى. تقوم كل مهمة بإرجاع String
اسم مؤشر الترابط ( currentThread().GetName()
). من المهم إغلاق البرنامج ExecutorService
في النهاية، وإلا فلن ينتهي برنامجنا. المصنع Executors
لديه طرق مصنع إضافية. على سبيل المثال، يمكننا إنشاء تجمع يتكون من مؤشر ترابط واحد فقط ( newSingleThreadExecutor
) أو تجمع يتضمن ذاكرة تخزين مؤقت ( newCachedThreadPool
) تتم إزالة سلاسل الرسائل منه بعد أن تكون خاملة لمدة دقيقة واحدة. في الواقع، ExecutorService
يتم دعمها من خلال قائمة انتظار الحظر ، حيث يتم وضع المهام ومن خلالها يتم تنفيذ المهام. يمكن العثور على مزيد من المعلومات حول حظر قوائم الانتظار في هذا الفيديو
. يمكنك أيضًا قراءة هذه المراجعة حول BlockingQueue
. وتحقق من إجابة السؤال "متى تفضل LinkedBlockingQueue على ArrayBlockingQueue؟"
في أبسط العبارات، BlockingQueue
يقوم الخيط بحظر الخيط في حالتين:
- يحاول مؤشر الترابط الحصول على عناصر من قائمة انتظار فارغة
- يحاول الخيط وضع العناصر في قائمة انتظار كاملة
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
أو
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
كما نرى، ExecutorService
يتم إنشاء تطبيقات داخل أساليب المصنع. وبالنسبة للجزء الأكبر، نحن نتحدث عن ThreadPoolExecutor
. يتم تغيير المعلمات التي تؤثر على العمل فقط.
https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg
ThreadPoolExecutor
كما رأينا سابقًا،ThreadPoolExecutor
هو ما يتم إنشاؤه عادةً داخل أساليب المصنع. تتأثر الوظيفة بالوسائط التي نمررها كحد أقصى وأدنى لعدد سلاسل العمليات، بالإضافة إلى نوع قائمة الانتظار المستخدمة. ولكن java.util.concurrent.BlockingQueue
يمكن استخدام أي تطبيق للواجهة. بالحديث عن ذلك ThreadPoolExecutor
، يجب أن نذكر بعض الميزات المثيرة للاهتمام. على سبيل المثال، لا يمكنك إرسال المهام إلى ThreadPoolExecutor
إذا لم تكن هناك مساحة متوفرة:
public static void main(String[] args) throws ExecutionException, InterruptedException {
int threadBound = 2;
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
0L, TimeUnit.SECONDS, new SynchronousQueue<>());
Callable<String> task = () -> {
Thread.sleep(1000);
return Thread.currentThread().getName();
};
for (int i = 0; i < threadBound + 1; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
سوف يتعطل هذا الرمز بسبب خطأ مثل هذا:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
بمعنى آخر، task
لا يمكن تقديمه، لأنه SynchronousQueue
مصمم بحيث يتكون فعليًا من عنصر واحد ولا يسمح لنا بإضافة أي شيء آخر إليه. يمكننا أن نرى أن لدينا صفر queued tasks
("المهام في قائمة الانتظار = 0") هنا. ولكن لا يوجد شيء غريب في هذا، لأن هذه ميزة خاصة لـ SynchronousQueue
، وهي في الواقع قائمة انتظار مكونة من عنصر واحد تكون فارغة دائمًا! عندما يقوم أحد الخيوط بوضع عنصر في قائمة الانتظار، فسوف ينتظر حتى يأخذ مؤشر ترابط آخر العنصر من قائمة الانتظار. وبناء على ذلك يمكننا استبداله new LinkedBlockingQueue<>(1)
وسيتغير الخطأ ليظهر الآن queued tasks = 1
. نظرًا لأن قائمة الانتظار تتكون من عنصر واحد فقط، فلا يمكننا إضافة عنصر ثانٍ. وهذا ما يتسبب في فشل البرنامج. لمواصلة مناقشتنا لقائمة الانتظار، تجدر الإشارة إلى أن الفصل ThreadPoolExecutor
لديه طرق إضافية لخدمة قائمة الانتظار. على سبيل المثال، threadPoolExecutor.purge()
ستقوم الطريقة بإزالة كافة المهام الملغاة من قائمة الانتظار لتحرير مساحة في قائمة الانتظار. وظيفة أخرى مثيرة للاهتمام تتعلق بقائمة الانتظار هي معالج المهام المرفوضة:
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.SECONDS, new SynchronousQueue());
Callable<String> task = () -> Thread.currentThread().getName();
threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
for (int i = 0; i < 5; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
في هذا المثال، يعرض معالجنا ببساطة Rejected
كل مرة يتم فيها رفض مهمة في قائمة الانتظار. مريحة، أليس كذلك؟ بالإضافة إلى ذلك، ThreadPoolExecutor
يحتوي على فئة فرعية مثيرة للاهتمام: ScheduledThreadPoolExecutor
وهي عبارة عن ScheduledExecutorService
. يوفر القدرة على أداء مهمة بناءً على مؤقت.
خدمة تنفيذية مجدولة
ScheduledExecutorService
(وهو نوع من ExecutorService
) يتيح لنا تشغيل المهام وفقًا لجدول زمني. لنلقي نظرة على مثال:
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
return Thread.currentThread().getName();
};
scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
scheduledExecutorService.shutdown();
}
كل شيء بسيط هنا. يتم إرسال المهام ومن ثم نحصل على ملف java.util.concurrent.ScheduledFuture
. قد يكون الجدول الزمني مفيدًا أيضًا في الموقف التالي:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
نرسل هنا Runnable
مهمة للتنفيذ بتردد ثابت ("FixedRate") مع تأخير أولي معين. في هذه الحالة، بعد ثانية واحدة، سيبدأ تنفيذ المهمة كل ثانيتين. هناك خيار مماثل:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
ولكن في هذه الحالة، يتم تنفيذ المهام بفاصل زمني محدد بين كل تنفيذ. أي أنه task
سيتم تنفيذه بعد ثانية واحدة. وبعد ذلك، بمجرد اكتمالها، ستمر ثانيتان، وبعد ذلك ستبدأ مهمة جديدة. فيما يلي بعض الموارد الإضافية حول هذا الموضوع:
- مقدمة إلى تجمعات الخيوط في Java
- مقدمة إلى تجمعات المواضيع في جافا
- Java Multithreading Steeplechase: إلغاء المهام في المنفذين
- استخدام منفذي Java لمهام الخلفية
https://dzone.com/articles/diving-into-Java-8s-newworkstealingpools
WorkStealingPool
بالإضافة إلى تجمعات المواضيع المذكورة أعلاه، هناك واحد آخر. يمكننا أن نقول بصراحة أنه خاص قليلاً. يطلق عليه بركة سرقة العمل. باختصار، سرقة العمل هي خوارزمية تبدأ فيها سلاسل العمليات الخاملة في أخذ المهام من سلاسل رسائل أخرى أو مهام من قائمة انتظار مشتركة. لنلقي نظرة على مثال:public static void main(String[] args) {
Object lock = new Object();
ExecutorService executorService = Executors.newCachedThreadPool();
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
lock.wait(2000);
System.out.println("Finished");
return "result";
};
for (int i = 0; i < 5; i++) {
executorService.submit(task);
}
executorService.shutdown();
}
إذا قمنا بتشغيل هذا الكود، فسوف نقوم ExecutorService
بإنشاء 5 سلاسل رسائل لنا، لأنه سيتم وضع كل سلسلة رسائل في قائمة انتظار كائن القفل. لقد اكتشفنا بالفعل الشاشات والأقفال بشكل أفضل معًا: Java وفئة Thread. الجزء الثاني – التزامن
. الآن دعونا نستبدل Executors.newCachedThreadPool()
بـ Executors.newWorkStealingPool()
. ما الذي سيتغير؟ سنرى أن مهامنا يتم تنفيذها على أقل من 5 سلاسل رسائل. تذكر أن CachedThreadPool
يقوم بإنشاء موضوع لكل مهمة؟ وذلك بسبب wait()
حظر سلسلة الرسائل، وترغب المهام اللاحقة في إكمالها، وتم إنشاء سلاسل رسائل جديدة لها في التجمع. مع بركة السرقة، لا تبقى الخيوط خاملة إلى الأبد. يبدأون في أداء مهام جيرانهم. ما الذي يجعل الأمر WorkStealingPool
مختلفًا تمامًا عن تجمعات الخيوط الأخرى؟ حقيقة أن السحر ForkJoinPool
يعيش بداخله:
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
في الواقع، هناك فرق آخر. بشكل افتراضي، المواضيع التي تم إنشاؤها من أجل ForkJoinPool
هي سلاسل خفية، على عكس المواضيع التي تم إنشاؤها من خلال onrdinary ThreadPool
. بشكل عام، يجب أن تتذكر الخيوط الخفية، لأنها، على سبيل المثال، CompletableFuture
تستخدم أيضًا الخيوط الخفية ما لم تحدد سلاسلك الخاصة ThreadFactory
التي تنشئ سلاسل رسائل غير خفية. هذه هي المفاجآت التي قد تكمن في أماكن غير متوقعة! :)
ForkJoinPool
في هذا الجزء، سنتحدث مرة أخرى عنForkJoinPool
(يسمى أيضًا إطار عمل الشوكة/الانضمام)، والذي يعيش "تحت غطاء" WorkStealingPool
. بشكل عام، ظهر إطار الشوكة/الانضمام مرة أخرى في Java 1.7. وعلى الرغم من أن Java 11 أصبح في متناول اليد، إلا أنه لا يزال يستحق التذكر. هذا ليس التطبيق الأكثر شيوعًا، لكنه مثير للاهتمام للغاية. هناك مراجعة جيدة حول هذا الأمر على الويب: فهم Java Fork-Join Framework مع الأمثلة
. يعتمد ForkJoinPool
على java.util.concurrent.RecursiveTask
. هناك ايضا java.util.concurrent.RecursiveAction
. RecursiveAction
لا يُرجع نتيجة. وهكذا، RecursiveTask
يشبه Callable
، RecursiveAction
ويشبه unnable
. يمكننا أن نرى أن الاسم يتضمن أسماء طريقتين مهمتين: fork
و join
. تبدأ الطريقة fork
بعض المهام بشكل غير متزامن في موضوع منفصل. وتتيح لك الطريقة join
الانتظار حتى يتم إنجاز العمل. للحصول على أفضل فهم، يجب عليك قراءة من البرمجة الحتمية إلى Fork/Join to Parallel Streams في Java 8
.
ملخص
حسنًا، هذا يختتم هذا الجزء من المراجعة. لقد تعلمنا أنهExecutor
تم اختراعه في الأصل لتنفيذ الخيوط. ثم قرر منشئو Java مواصلة الفكرة وتوصلوا إلى ExecutorService
. ExecutorService
يتيح لنا إرسال المهام للتنفيذ باستخدام submit()
و invoke()
، وكذلك إيقاف الخدمة. نظرًا لأنه ExecutorService
يحتاج إلى تطبيقات، فقد كتبوا فصلًا دراسيًا بأساليب المصنع وأطلقوا عليه اسم Executors
. يتيح لك إنشاء تجمعات مؤشرات الترابط ( ThreadPoolExecutor
). بالإضافة إلى ذلك، هناك تجمعات سلاسل رسائل تسمح لنا أيضًا بتحديد جدول التنفيذ. ويختبئ ForkJoinPool
خلف أ WorkStealingPool
. أتمنى أن تجد ما كتبته أعلاه ليس مثيرًا للاهتمام فحسب، بل مفهومًا أيضًا :) يسعدني دائمًا سماع اقتراحاتك وتعليقاتك. أفضل معًا: Java وفئة Thread. الجزء الأول - خيوط التنفيذ
أفضل معًا: Java وفئة Thread. الجزء الثاني – التزامن
بشكل أفضل معًا: Java وفئة Thread. الجزء الثالث - التفاعل
بشكل أفضل معًا: Java وفئة Thread. الجزء الرابع - الأشخاص القابلون للاستدعاء والمستقبل والأصدقاء
أفضل معًا: Java وفئة Thread. الجزء السادس – أطلق النار بعيدًا!
GO TO FULL VERSION