ভূমিকা
সুতরাং, আমরা জানি যে জাভা থ্রেড আছে. আপনি এটি সম্পর্কে আরও ভাল একসাথে শিরোনামের পর্যালোচনাতে পড়তে পারেন : জাভা এবং থ্রেড ক্লাস। পার্ট I — থ্রেডস অফ এক্সিকিউশন । আসুন সাধারণ কোডটি আরও একবার দেখে নেওয়া যাক:
public static void main(String[] args) throws Exception {
Runnable task = () -> {
System.out.println("Task executed");
};
Thread thread = new Thread(task);
thread.start();
}
আপনি দেখতে পাচ্ছেন, একটি টাস্ক শুরু করার কোডটি বেশ সাধারণ, কিন্তু নতুন কাজের জন্য আমাদের এটি পুনরাবৃত্তি করতে হবে। একটি সমাধান হল এটিকে একটি পৃথক পদ্ধতিতে রাখা, যেমন execute(Runnable runnable)
। কিন্তু জাভার নির্মাতারা আমাদের দুর্দশা বিবেচনা করেছেন এবং ইন্টারফেস নিয়ে এসেছেন Executor
:
public static void main(String[] args) throws Exception {
Runnable task = () -> System.out.println("Task executed");
Executor executor = (runnable) -> {
new Thread(runnable).start();
};
executor.execute(task);
}
এই কোডটি স্পষ্টভাবে আরও সংক্ষিপ্ত: এখন আমরা Runnable
থ্রেডে শুরু করার জন্য কোড লিখি। এটা দারুণ, তাই না? কিন্তু এই মাত্র শুরু:
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html
Executor
ইন্টারফেসের একটি সাব-ইন্টারফেস রয়েছে ExecutorService
। এই ইন্টারফেসের জন্য Javadoc বলে যে একটি ExecutorService
একটি নির্দিষ্ট বর্ণনা করে Executor
যা বন্ধ করার পদ্ধতি প্রদান করে Executor
। java.util.concurrent.Future
এটি কার্যকর করার প্রক্রিয়াটি ট্র্যাক করার জন্য একটি পেতেও সম্ভব করে তোলে । পূর্বে, বেটার একসাথে: জাভা এবং থ্রেড ক্লাস। পার্ট IV — কলযোগ্য, ভবিষ্যত, এবং বন্ধুরা , আমরা সংক্ষিপ্তভাবে এর ক্ষমতা পর্যালোচনা করেছি Future
। যদি আপনি ভুলে যান বা এটি পড়েন না, আমি আপনাকে আপনার স্মৃতি রিফ্রেশ করার পরামর্শ দিচ্ছি ;) জাভাডোক আর কি বলে? এটি আমাদের বলে যে আমাদের একটি বিশেষ java.util.concurrent.Executors
কারখানা রয়েছে যা আমাদের ডিফল্ট বাস্তবায়ন তৈরি করতে দেয় ExecutorService
।
এক্সিকিউটর সার্ভিস
এর পর্যালোচনা করা যাক. আমাদের একটি থ্রেডে একটি নির্দিষ্ট কাজExecutor
সম্পাদন করতে হবে (অর্থাৎ কল করতে ) এবং যে কোডটি থ্রেড তৈরি করে তা আমাদের কাছ থেকে লুকানো থাকে। execute()
আমাদের আছে ExecutorService
— একটি নির্দিষ্ট Executor
যাতে অগ্রগতি নিয়ন্ত্রণের জন্য বিভিন্ন বিকল্প রয়েছে। এবং আমাদের একটি Executors
কারখানা রয়েছে যা আমাদের একটি তৈরি করতে দেয় ExecutorService
। এখন আসুন এটি নিজেরাই করি:
public static void main(String[] args) throws ExecutionException, InterruptedException {
Callable<String> task = () -> Thread.currentThread().getName();
ExecutorService service = Executors.newFixedThreadPool(2);
for (int i = 0; i < 5; i++) {
Future result = service.submit(task);
System.out.println(result.get());
}
service.shutdown();
}
আপনি দেখতে পাচ্ছেন যে আমরা একটি নির্দিষ্ট থ্রেড পুল নির্দিষ্ট করেছি যার আকার 2। তারপর আমরা একটি করে পুলে কাজ জমা দিই। প্রতিটি টাস্ক String
থ্রেড নাম ( currentThread().GetName()
) ধারণকারী একটি প্রদান করে। একেবারে শেষে বন্ধ করা গুরুত্বপূর্ণ ExecutorService
, কারণ অন্যথায় আমাদের প্রোগ্রাম শেষ হবে না। কারখানায় Executors
অতিরিক্ত কারখানা পদ্ধতি রয়েছে। উদাহরণস্বরূপ, আমরা শুধুমাত্র একটি থ্রেড ( newSingleThreadExecutor
) নিয়ে গঠিত একটি পুল তৈরি করতে পারি বা একটি পুল যাতে একটি ক্যাশে ( newCachedThreadPool
) অন্তর্ভুক্ত থাকে যা থেকে থ্রেডগুলি 1 মিনিটের জন্য নিষ্ক্রিয় থাকার পরে সরানো হয়। বাস্তবে, এগুলি একটি ব্লকিং কিউExecutorService
দ্বারা সমর্থিত , যেখানে কাজগুলি স্থাপন করা হয় এবং যেগুলি থেকে কাজগুলি সম্পাদন করা হয়৷ সারি ব্লক করা সম্পর্কে আরও তথ্য এই ভিডিওতে পাওয়া যাবে । আপনি এটিও পড়তে পারেনBlockingQueue সম্পর্কে পর্যালোচনা করুন । এবং "কখন ArrayBlockingQueue এর চেয়ে LinkedBlockingQueue পছন্দ করবেন?" প্রশ্নের উত্তর দেখুন। সহজ শর্তে, একটি BlockingQueue
থ্রেড দুটি ক্ষেত্রে ব্লক করে:
- থ্রেডটি একটি খালি সারি থেকে আইটেম পেতে চেষ্টা করে
- থ্রেড আইটেমগুলিকে একটি পূর্ণ সারিতে রাখার চেষ্টা করে
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
বা
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
আমরা দেখতে পাচ্ছি, এর বাস্তবায়ন ExecutorService
কারখানা পদ্ধতির ভিতরে তৈরি করা হয়। এবং বেশিরভাগ অংশের জন্য, আমরা কথা বলছি ThreadPoolExecutor
। শুধুমাত্র কাজ প্রভাবিত পরামিতি পরিবর্তন করা হয়.
https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg
ThreadPoolExecutor
আমরা আগে দেখেছি,ThreadPoolExecutor
সাধারণত কারখানা পদ্ধতির ভিতরে তৈরি হয় কি. কার্যকারিতা আমরা যে আর্গুমেন্টগুলিকে সর্বাধিক এবং সর্বনিম্ন থ্রেড হিসাবে পাস করি, সেইসাথে কোন ধরণের সারি ব্যবহার করা হচ্ছে তার দ্বারা প্রভাবিত হয়। কিন্তু ইন্টারফেসের যেকোনো বাস্তবায়ন java.util.concurrent.BlockingQueue
ব্যবহার করা যেতে পারে। কথা বলতে গেলে ThreadPoolExecutor
, আমাদের কিছু আকর্ষণীয় বৈশিষ্ট্য উল্লেখ করা উচিত। উদাহরণ স্বরূপ, আপনি কোন কাজ জমা দিতে পারবেন না ThreadPoolExecutor
যদি উপলভ্য স্থান না থাকে:
public static void main(String[] args) throws ExecutionException, InterruptedException {
int threadBound = 2;
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
0L, TimeUnit.SECONDS, new SynchronousQueue<>());
Callable<String> task = () -> {
Thread.sleep(1000);
return Thread.currentThread().getName();
};
for (int i = 0; i < threadBound + 1; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
এই কোড এই মত একটি ত্রুটি সঙ্গে ক্র্যাশ হবে:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
অন্য কথায়, task
জমা দেওয়া যাবে না, কারণ SynchronousQueue
এমনভাবে ডিজাইন করা হয়েছে যাতে এটি আসলে একটি একক উপাদান নিয়ে গঠিত এবং আমাদের এতে আরও কিছু রাখার অনুমতি দেয় না। আমরা দেখতে পাচ্ছি যে আমাদের queued tasks
এখানে শূন্য ("সারিবদ্ধ কাজ = 0") আছে। তবে এটিতে অদ্ভুত কিছু নেই, কারণ এটি একটি বিশেষ বৈশিষ্ট্য SynchronousQueue
, যা আসলে একটি 1-উপাদানের সারি যা সবসময় খালি থাকে! যখন একটি থ্রেড সারিতে একটি উপাদান রাখে, তখন এটি অপেক্ষা করবে যতক্ষণ না অন্য থ্রেডটি সারি থেকে উপাদানটি নেয়। তদনুসারে, আমরা এটির সাথে প্রতিস্থাপন করতে পারি new LinkedBlockingQueue<>(1)
এবং ত্রুটিটি এখন দেখাতে পরিবর্তিত হবে queued tasks = 1
। কারণ সারিটি শুধুমাত্র 1টি উপাদান, আমরা একটি দ্বিতীয় উপাদান যোগ করতে পারি না। এবং যে কারণে প্রোগ্রাম ব্যর্থ হয়. আমাদের সারি আলোচনা অব্যাহত, এটা লক্ষনীয় যে মূল্যThreadPoolExecutor
সারিতে সেবা দেওয়ার জন্য ক্লাসের অতিরিক্ত পদ্ধতি রয়েছে। উদাহরণস্বরূপ, threadPoolExecutor.purge()
পদ্ধতিটি সারিতে স্থান খালি করার জন্য সারি থেকে সমস্ত বাতিল করা কাজগুলি সরিয়ে দেবে। আরেকটি আকর্ষণীয় সারি-সম্পর্কিত ফাংশন হল প্রত্যাখ্যান করা কাজের জন্য হ্যান্ডলার:
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.SECONDS, new SynchronousQueue());
Callable<String> task = () -> Thread.currentThread().getName();
threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
for (int i = 0; i < 5; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
এই উদাহরণে, আমাদের হ্যান্ডলারটি Rejected
প্রতিবার সারিতে থাকা একটি টাস্ক প্রত্যাখ্যান করার সময় কেবল প্রদর্শন করে। সুবিধাজনক, তাই না? উপরন্তু, ThreadPoolExecutor
একটি আকর্ষণীয় উপশ্রেণী আছে: ScheduledThreadPoolExecutor
, যা একটি ScheduledExecutorService
. এটি একটি টাইমারের উপর ভিত্তি করে একটি কাজ সম্পাদন করার ক্ষমতা প্রদান করে।
নির্ধারিত এক্সিকিউটর সার্ভিস
ScheduledExecutorService
(যা এক প্রকার ExecutorService
) আমাদের একটি সময়সূচীতে কাজ চালাতে দেয়। আসুন একটি উদাহরণ দেখি:
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
return Thread.currentThread().getName();
};
scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
scheduledExecutorService.shutdown();
}
এখানে সবকিছু সহজ. কাজ জমা দেওয়া হয় এবং তারপর আমরা একটি পেতে java.util.concurrent.ScheduledFuture
. একটি সময়সূচী নিম্নলিখিত পরিস্থিতিতে সহায়ক হতে পারে:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
এখানে আমরা Runnable
একটি নির্দিষ্ট প্রাথমিক বিলম্বের সাথে একটি নির্দিষ্ট ফ্রিকোয়েন্সিতে ("FixedRate") সম্পাদনের জন্য একটি টাস্ক জমা দিই। এই ক্ষেত্রে, 1 সেকেন্ড পরে, টাস্কটি প্রতি 2 সেকেন্ডে কার্যকর করা শুরু হবে। একটি অনুরূপ বিকল্প আছে:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
কিন্তু এই ক্ষেত্রে, কাজগুলি প্রতিটি নির্বাহের মধ্যে একটি নির্দিষ্ট ব্যবধানের সাথে সঞ্চালিত হয়। অর্থাৎ, task
1 সেকেন্ড পরে কার্যকর করা হবে। তারপর, এটি সম্পূর্ণ হওয়ার সাথে সাথে, 2 সেকেন্ড কেটে যাবে এবং তারপরে একটি নতুন কাজ শুরু হবে। এখানে এই বিষয়ে কিছু অতিরিক্ত সংস্থান রয়েছে:
- জাভাতে থ্রেড পুলের একটি ভূমিকা
- জাভাতে থ্রেড পুলের পরিচিতি
- জাভা মাল্টিথ্রেডিং স্টিপলচেজ: এক্সিকিউটরগুলিতে কাজগুলি বাতিল করা
- ব্যাকগ্রাউন্ড টাস্কের জন্য জাভা এক্সিকিউটর ব্যবহার করা
https://dzone.com/articles/diving-into-java-8s-newworkstealingpools
ওয়ার্কস্টেলিংপুল
উপরের থ্রেড পুলগুলি ছাড়াও, আরও একটি রয়েছে। আমরা সত্যই বলতে পারি যে এটি একটু বিশেষ। এটাকে ওয়ার্ক-স্টিলিং পুল বলা হয়। সংক্ষেপে, ওয়ার্ক-স্টিলিং হল একটি অ্যালগরিদম যেখানে নিষ্ক্রিয় থ্রেডগুলি অন্য থ্রেড থেকে কাজগুলি বা একটি ভাগ করা সারি থেকে কাজগুলি নেওয়া শুরু করে। আসুন একটি উদাহরণ দেখি:
public static void main(String[] args) {
Object lock = new Object();
ExecutorService executorService = Executors.newCachedThreadPool();
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
lock.wait(2000);
System.out.println("Finished");
return "result";
};
for (int i = 0; i < 5; i++) {
executorService.submit(task);
}
executorService.shutdown();
}
যদি আমরা এই কোডটি চালাই, তাহলে ExecutorService
আমাদের জন্য 5টি থ্রেড তৈরি করবে, কারণ প্রতিটি থ্রেড লক অবজেক্টের জন্য অপেক্ষার সারিতে রাখা হবে। আমরা ইতিমধ্যেই একসাথে বেটারে মনিটর এবং লক খুঁজে বের করেছি : জাভা এবং থ্রেড ক্লাস। পার্ট II — সিঙ্ক্রোনাইজেশন । Executors.newCachedThreadPool()
এখন এর সাথে প্রতিস্থাপন করা যাক Executors.newWorkStealingPool()
। কি পরিবর্তন হবে? আমরা দেখব যে আমাদের কাজগুলি 5টিরও কম থ্রেডে কার্যকর করা হয়েছে। মনে রাখবেন যে CachedThreadPool
প্রতিটি কাজের জন্য একটি থ্রেড তৈরি করে? কারণ wait()
থ্রেড ব্লক করা হয়েছে, পরবর্তী কাজগুলি সম্পূর্ণ হতে চায় এবং পুলে তাদের জন্য নতুন থ্রেড তৈরি করা হয়েছে। একটি চুরি পুল সঙ্গে, থ্রেড চিরকাল নিষ্ক্রিয় দাঁড়ানো না. তারা তাদের প্রতিবেশীদের কাজ সম্পাদন করতে শুরু করে। কি WorkStealingPool
অন্য থ্রেড পুল থেকে এত আলাদা করে তোলে? যে জাদুকরী সত্যForkJoinPool
এর ভিতরে বাস করে:
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
আসলে, আরও একটি পার্থক্য আছে। ডিফল্টরূপে, a-এর জন্য তৈরি থ্রেডগুলি ForkJoinPool
হল ডেমন থ্রেড, onrdinary-এর মাধ্যমে তৈরি থ্রেডের বিপরীতে ThreadPool
। সাধারণভাবে, আপনার ডেমন থ্রেডগুলি মনে রাখা উচিত, কারণ, উদাহরণস্বরূপ, CompletableFuture
ডেমন থ্রেডগুলিও ব্যবহার করা হয় যদি না আপনি আপনার নিজস্ব নির্দিষ্ট না করেন ThreadFactory
যা নন-ডেমন থ্রেড তৈরি করে। এই যে বিস্ময় অপ্রত্যাশিত জায়গায় লুকিয়ে থাকতে পারে! :)
ForkJoinPool
এই অংশে, আমরা আবার কথা বলবForkJoinPool
(যাকে ফর্ক/জইন ফ্রেমওয়ার্কও বলা হয়), যা "হুডের নিচে" থাকে WorkStealingPool
। সাধারণভাবে, জাভা 1.7-এ কাঁটা/জইন ফ্রেমওয়ার্ক ফিরে এসেছে। এবং জাভা 11 হাতের কাছে থাকলেও, এটি এখনও মনে রাখার মতো। এটি সবচেয়ে সাধারণ বাস্তবায়ন নয়, তবে এটি বেশ আকর্ষণীয়। ওয়েবে এটি সম্পর্কে একটি ভাল পর্যালোচনা রয়েছে: উদাহরণ সহ Java Fork-Join Framework বোঝা । ForkJoinPool
উপর নির্ভর করে java.util.concurrent.RecursiveTask
। এছাড়াও আছে java.util.concurrent.RecursiveAction
. RecursiveAction
ফলাফল ফেরত দেয় না। সুতরাং, RecursiveTask
অনুরূপ Callable
, এবং RecursiveAction
অনুরূপ unnable
. আমরা দেখতে পাচ্ছি যে নামটিতে দুটি গুরুত্বপূর্ণ পদ্ধতির নাম অন্তর্ভুক্ত রয়েছে: fork
এবং join
. দ্যfork
পদ্ধতি একটি পৃথক থ্রেডে অ্যাসিঙ্ক্রোনাসভাবে কিছু কাজ শুরু করে। এবং join
পদ্ধতিটি আপনাকে কাজ করার জন্য অপেক্ষা করতে দেয়। সর্বোত্তম বোঝার জন্য, আপনাকে জাভা 8-এ ইম্পেরেটিভ প্রোগ্রামিং থেকে ফর্ক/জোইন টু প্যারালাল স্ট্রীম পর্যন্ত পড়তে হবে ।
সারসংক্ষেপ
ভাল, যে পর্যালোচনা এই অংশ আপ মোড়ানো. আমরা শিখেছি যেExecutor
মূলত থ্রেড চালানোর জন্য উদ্ভাবিত হয়েছিল। তারপর জাভার নির্মাতারা ধারণাটি চালিয়ে যাওয়ার সিদ্ধান্ত নেন এবং এর সাথে আসেন ExecutorService
। ExecutorService
আমাদের submit()
এবং ব্যবহার করে কার্য সম্পাদনের জন্য জমা দিতে দেয় invoke()
এবং পরিষেবাটি বন্ধ করে দেয়। কারণ ExecutorService
বাস্তবায়নের প্রয়োজন, তারা কারখানার পদ্ধতি সহ একটি ক্লাস লিখেছিল এবং এটিকে বলে Executors
। এটি আপনাকে থ্রেড পুল ( ThreadPoolExecutor
) তৈরি করতে দেয়। অতিরিক্তভাবে, থ্রেড পুল রয়েছে যা আমাদের একটি কার্যকর করার সময়সূচী নির্দিষ্ট করার অনুমতি দেয়। এবং একটি ForkJoinPool
আড়ালে একটি WorkStealingPool
. আমি আশা করি আপনি উপরে আমি যা লিখেছি তা কেবল আকর্ষণীয়ই নয়, বোধগম্যও পেয়েছি :) আমি আপনার পরামর্শ এবং মন্তব্য শুনে সর্বদা আনন্দিত। একসাথে ভাল: জাভা এবং থ্রেড ক্লাস। পার্ট I — এক্সিকিউশনের থ্রেডগুলি একসাথে ভাল: জাভা এবং থ্রেড ক্লাস। পার্ট II — সিঙ্ক্রোনাইজেশন আরও ভাল একসাথে: জাভা এবং থ্রেড ক্লাস। পার্ট III — মিথস্ক্রিয়া আরও ভাল একসাথে: জাভা এবং থ্রেড ক্লাস। পার্ট IV — কলযোগ্য, ভবিষ্যত এবং বন্ধুরা একসাথে আরও ভাল: জাভা এবং থ্রেড ক্লাস। পার্ট VI — আগুন দূরে!
GO TO FULL VERSION