5.1 মানচিত্র শুধুমাত্র কাজ

এটি বিভিন্ন কৌশল বর্ণনা করার সময় যা আপনাকে কার্যকরীভাবে ব্যবহারিক সমস্যা সমাধানের জন্য MapReduce ব্যবহার করতে দেয়, সেইসাথে Hadoop-এর কিছু বৈশিষ্ট্য দেখায় যা বিকাশকে সহজ করতে পারে বা একটি ক্লাস্টারে একটি MapReduce কার্য সম্পাদনকে উল্লেখযোগ্যভাবে দ্রুততর করতে পারে।

যেমনটি আমরা মনে রাখি, MapReduce এর মধ্যে রয়েছে মানচিত্র, শাফেল এবং রিডুস ধাপ। একটি নিয়ম হিসাবে, শাফেল পর্যায়টি ব্যবহারিক কাজগুলিতে সবচেয়ে কঠিন হতে দেখা যায়, যেহেতু এই পর্যায়ে ডেটা সাজানো হয়। প্রকৃতপক্ষে, এমন অনেকগুলি কাজ রয়েছে যেখানে মানচিত্রের পর্যায়টি একা করা যেতে পারে। এখানে এই জাতীয় কাজের উদাহরণ রয়েছে:

  • ডেটা ফিল্টারিং (উদাহরণস্বরূপ, ওয়েব সার্ভার লগগুলিতে "আইপি ঠিকানা 123.123.123.123 থেকে সমস্ত রেকর্ড খুঁজুন");
  • ডেটা ট্রান্সফরমেশন ("CSv-লগে কলাম মুছুন");
  • একটি বাহ্যিক উত্স থেকে ডেটা লোড এবং আনলোড করা হচ্ছে ("ডাটাবেসে লগ থেকে সমস্ত রেকর্ড সন্নিবেশ করুন")।

এই ধরনের কাজ শুধুমাত্র মানচিত্র ব্যবহার করে সমাধান করা হয়. Hadoop-এ শুধুমাত্র ম্যাপ টাস্ক তৈরি করার সময়, আপনাকে রিডুসারের শূন্য সংখ্যা নির্দিষ্ট করতে হবে:

হ্যাডুপে একটি মানচিত্র-শুধু টাস্ক কনফিগারেশনের একটি উদাহরণ:

নেটিভ ইন্টারফেস Hadoop স্ট্রিমিং ইন্টারফেস

job'a কনফিগার করার সময় হ্রাসকারীর শূন্য সংখ্যা নির্দিষ্ট করুন:

job.setNumReduceTasks(0); 

আমরা একটি হ্রাসকারী নির্দিষ্ট করি না এবং হ্রাসকারীর একটি শূন্য সংখ্যা নির্দিষ্ট করি না। উদাহরণ:

hadoop jar hadoop-streaming.jar \ 
 -D mapred.reduce.tasks=0\ 
-input input_dir\ 
-output output_dir\ 
-mapper "python mapper.py"\ 
-file "mapper.py"

মানচিত্র শুধুমাত্র কাজ আসলে খুব দরকারী হতে পারে. উদাহরণস্বরূপ, Facetz.DCA প্ল্যাটফর্মে, ব্যবহারকারীদের আচরণের দ্বারা তাদের বৈশিষ্ট্যগুলি সনাক্ত করতে, এটি সঠিকভাবে একটি বড় মানচিত্র-কেবল ব্যবহার করা হয়, যার প্রতিটি ম্যাপার একজন ব্যবহারকারীকে একটি ইনপুট হিসাবে নেয় এবং একটি আউটপুট হিসাবে তার বৈশিষ্ট্যগুলি ফিরিয়ে দেয়।

5.2 একত্রিত করুন

যেমনটি আমি ইতিমধ্যেই লিখেছি, সাধারণত ম্যাপ-রিডুস টাস্ক করার সময় সবচেয়ে কঠিন পর্যায়টি হল শাফেল স্টেজ। এটি ঘটে কারণ মধ্যবর্তী ফলাফল (ম্যাপারের আউটপুট) ডিস্কে লেখা হয়, বাছাই করা হয় এবং নেটওয়ার্কের মাধ্যমে প্রেরণ করা হয়। যাইহোক, এমন কিছু কাজ আছে যেখানে এই ধরনের আচরণ খুব যুক্তিসঙ্গত বলে মনে হয় না। উদাহরণস্বরূপ, নথিতে শব্দ গণনার একই কাজটিতে, আপনি টাস্কের একটি ম্যাপ-রিডুস নোডে বেশ কয়েকটি ম্যাপারের আউটপুটগুলির ফলাফল প্রাক-একত্রিত করতে পারেন এবং প্রতিটি মেশিনের জন্য ইতিমধ্যে সংকলিত মানগুলিকে রিডুসারে পাস করতে পারেন। .

হাডুপে, এর জন্য, আপনি একটি সমন্বয় ফাংশন সংজ্ঞায়িত করতে পারেন যা ম্যাপারের অংশের আউটপুট প্রক্রিয়া করবে। কম্বিনিং ফাংশনটি হ্রাস করার মতোই - এটি কিছু ম্যাপারের আউটপুটকে ইনপুট হিসাবে নেয় এবং এই ম্যাপারগুলির জন্য একটি সমষ্টিগত ফলাফল তৈরি করে, তাই রিডুসারটি প্রায়শই একটি কম্বাইনার হিসাবেও ব্যবহৃত হয়। হ্রাস থেকে একটি গুরুত্বপূর্ণ পার্থক্য হল যে একটি কী এর সাথে সম্পর্কিত সমস্ত মানগুলি কম্বিনিং ফাংশনে যায় না

অধিকন্তু, হ্যাডুপ গ্যারান্টি দেয় না যে ম্যাপারের আউটপুটের জন্য কম্বাইন ফাংশনটি আদৌ কার্যকর করা হবে। অতএব, সমন্বয় ফাংশন সর্বদা প্রযোজ্য নয়, উদাহরণস্বরূপ, কী দ্বারা মধ্যম মান অনুসন্ধানের ক্ষেত্রে। তথাপি, যে সকল কাজে কম্বিনিং ফাংশন প্রযোজ্য, সেখানে এর ব্যবহার MapReduce টাস্কের গতিতে উল্লেখযোগ্য বৃদ্ধি পেতে দেয়।

হাডুপে কম্বাইনার ব্যবহার করা:

নেটিভ ইন্টারফেস Hadoop স্ট্রিমিং

job-a কনফিগার করার সময়, class-combiner উল্লেখ করুন। একটি নিয়ম হিসাবে, এটি Reducer হিসাবে একই:

job.setMapperClass(TokenizerMapper.class); 
job.setCombinerClass(IntSumReducer.class); 
job.setReducerClass(IntSumReducer.class); 

কমান্ড লাইন বিকল্পে -combiner কমান্ড উল্লেখ করুন। সাধারণত, এই কমান্ডটি হ্রাসকারী কমান্ডের মতোই। উদাহরণ:

hadoop jar hadoop-streaming.jar \ 
-input input_dir\ 
-output output_dir\ 
-mapper "python mapper.py"\ 
-reducer "python reducer.py"\ 
-combiner "python reducer.py"\ 
-file "mapper.py"\ 
-file "reducer.py"\

5.3 MapReduce টাস্ক চেইন

এমন পরিস্থিতি রয়েছে যখন একটি MapReduce সমস্যা সমাধানের জন্য যথেষ্ট নয়। উদাহরণস্বরূপ, একটি সামান্য পরিবর্তিত WordCount টাস্ক বিবেচনা করুন: পাঠ্য নথির একটি সেট রয়েছে, আপনাকে গণনা করতে হবে সেটটিতে 1 থেকে 1000 বার কতগুলি শব্দ এসেছে, 1001 থেকে 2000 পর্যন্ত কতগুলি শব্দ, 2001 থেকে 3000 পর্যন্ত কতগুলি, এবং তাই সমাধানের জন্য, আমাদের 2টি MapReduce কাজের প্রয়োজন:

  • পরিবর্তিত শব্দসংখ্যা, যা প্রতিটি শব্দের জন্য গণনা করবে কোন ব্যবধানের মধ্যে পড়েছে;
  • একটি MapReduce যা গণনা করে যে প্রথম MapReduce-এর আউটপুটে প্রতিটি ব্যবধান কতবার সম্মুখীন হয়েছে।

ছদ্ম কোড সমাধান:

#map1 
def map(doc): 
for word in doc: 
yield word, 1
#reduce1 
def reduce(word, values): 
yield int(sum(values)/1000), 1 
#map2 
def map(doc): 
interval, cnt = doc.split() 
yield interval, cnt 
#reduce2 
def reduce(interval, values): 
yield interval*1000, sum(values) 

হ্যাডুপে MapReduce টাস্কগুলির একটি ক্রম নির্বাহ করার জন্য, প্রথমটির জন্য আউটপুট হিসাবে নির্দিষ্ট করা ফোল্ডারটিকে দ্বিতীয় টাস্কের জন্য ইনপুট হিসাবে নির্দিষ্ট করা এবং পালাক্রমে সেগুলি চালানো যথেষ্ট।

অনুশীলনে, MapReduce টাস্কগুলির চেইনগুলি বেশ জটিল সিকোয়েন্স হতে পারে যেখানে MapReduce টাস্কগুলিকে ক্রমিকভাবে এবং একে অপরের সাথে সমান্তরালভাবে সংযুক্ত করা যেতে পারে। এই ধরনের টাস্ক এক্সিকিউশন প্ল্যানের ব্যবস্থাপনাকে সহজ করার জন্য, oozie এবং luigi এর মত আলাদা টুল আছে, যেগুলো এই সিরিজের একটি আলাদা প্রবন্ধে আলোচনা করা হবে।

5.4 বিতরণ করা ক্যাশে

Hadoop-এর একটি গুরুত্বপূর্ণ প্রক্রিয়া হল ডিস্ট্রিবিউটেড ক্যাশে। ডিস্ট্রিবিউটেড ক্যাশে আপনাকে ফাইলগুলি (যেমন টেক্সট ফাইল, আর্কাইভ, জার ফাইল) যোগ করার অনুমতি দেয় যেখানে MapReduce টাস্ক চলছে।

আপনি HDFS-এ সঞ্চিত ফাইলগুলি, স্থানীয় ফাইলগুলি (যে মেশিন থেকে কাজটি চালু করা হয়েছে সেই মেশিনে স্থানীয়) যোগ করতে পারেন। -file বিকল্পের মাধ্যমে mapper.py এবং reducer.py ফাইলগুলি যোগ করে হাডুপ স্ট্রিমিংয়ের সাথে কীভাবে বিতরণ করা ক্যাশে ব্যবহার করতে হয় তা আমি ইতিমধ্যেই স্পষ্টভাবে দেখিয়েছি। প্রকৃতপক্ষে, আপনি শুধুমাত্র mapper.py এবং reducer.py নয়, সাধারণভাবে নির্বিচারে ফাইল যোগ করতে পারেন এবং তারপরে সেগুলিকে স্থানীয় ফোল্ডারে ব্যবহার করতে পারেন।

বিতরণ করা ক্যাশে ব্যবহার করা:

নেটিভ API
//Job configuration
JobConf job = new JobConf();
DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"),  job);
DistributedCache.addCacheArchive(new URI("/myapp/map.zip", job);
DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job);
DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar", job);
DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz", job);

//example of usage in mapper-e:
public static class MapClass extends MapReduceBase
implements Mapper<K, V, K, V> {

 private Path[] localArchives;
 private Path[] localFiles;

 public void configure(JobConf job) {
   // get cached data from archives
   File f = new File("./map.zip/some/file/in/zip.txt");
 }

 public void map(K key, V value,
             	OutputCollector<K, V> output, Reporter reporter)
 throws IOException {
   // use data here
   // ...
   // ...
   output.collect(k, v);
 }
}
Hadoop স্ট্রিমিং

# আমরা ফাইলের প্যারামিটারে বিতরণ করা ক্যাশে যুক্ত করা প্রয়োজন এমন ফাইলগুলির তালিকা করি। --files অপশনটি অন্যান্য অপশনের আগে অবশ্যই আসবে।

yarn  hadoop-streaming.jar\ 
-files mapper.py,reducer.py,some_cached_data.txt\ 
-input '/some/input/path' \ 
-output '/some/output/path' \  
-mapper 'python mapper.py' \ 
-reducer 'python reducer.py' \

ব্যবহার উদাহরণ:

import sys 
#just read file from local folder 
data = open('some_cached_data.txt').read() 
 
for line in sys.stdin() 
#processing input 
#use data here

5.5 যোগদান হ্রাস করুন

যারা রিলেশনাল ডাটাবেসের সাথে কাজ করতে অভ্যস্ত তারা প্রায়শই খুব সুবিধাজনক জয়েন অপারেশন ব্যবহার করে, যা তাদের কিছু টেবিলের বিষয়বস্তুগুলিকে কিছু কী অনুসারে যুক্ত করে যৌথভাবে প্রক্রিয়া করতে দেয়। বিগ ডাটা নিয়ে কাজ করার সময়ও মাঝে মাঝে এই সমস্যা দেখা দেয়। নিম্নলিখিত উদাহরণ বিবেচনা করুন:

দুটি ওয়েব সার্ভারের লগ আছে, প্রতিটি লগ এই মত দেখায়:

t\t

লগ স্নিপেট উদাহরণ:

1446792139	
178.78.82.1	
/sphingosine/unhurrying.css 
1446792139	
126.31.163.222	
/accentually.js 
1446792139	
154.164.149.83	
/pyroacid/unkemptly.jpg 
1446792139	
202.27.13.181	
/Chawia.js 
1446792139	
67.123.248.174	
/morphographical/dismain.css 
1446792139	
226.74.123.135	
/phanerite.php 
1446792139	
157.109.106.104	
/bisonant.css

প্রতিটি IP ঠিকানার জন্য এটি গণনা করা প্রয়োজন যে 2টি সার্ভারের মধ্যে কোনটি এটি প্রায়শই পরিদর্শন করেছে৷ ফলাফলটি আকারে হওয়া উচিত:

\t

ফলাফলের একটি অংশের একটি উদাহরণ:

178.78.82.1	
first 
126.31.163.222	
second 
154.164.149.83	
second 
226.74.123.135	
first

দুর্ভাগ্যবশত, রিলেশনাল ডাটাবেসের বিপরীতে, সাধারণভাবে, কী দ্বারা দুটি লগ যোগ করা (এই ক্ষেত্রে, আইপি ঠিকানার মাধ্যমে) একটি বরং ভারী অপারেশন এবং 3 MapReduce এবং Reduce Join প্যাটার্ন ব্যবহার করে সমাধান করা হয়:

ReduceJoin এই মত কাজ করে:

1) প্রতিটি ইনপুট লগের জন্য, একটি পৃথক MapReduce (শুধুমাত্র মানচিত্র) চালু করা হয়, ইনপুট ডেটাকে নিম্নলিখিত ফর্মে রূপান্তর করে:

key -> (type, value

যেখানে কী হল টেবিলে যোগদানের চাবিকাঠি, Type হল টেবিলের ধরন (আমাদের ক্ষেত্রে প্রথম বা দ্বিতীয়), এবং মান হল কী-এর সাথে আবদ্ধ কোনো অতিরিক্ত ডেটা।

2) উভয় MapReduce-এর আউটপুটগুলিকে 3য় MapReduce-এর ইনপুট দেওয়া হয়, যা আসলে মিলন সম্পাদন করে। এই MapReduce-এ একটি খালি ম্যাপার রয়েছে যা সহজভাবে ইনপুট কপি করে। এর পরে, শাফেল কীগুলিতে ডেটা পচিয়ে দেয় এবং ইনপুট হিসাবে এটিকে রিডুসারে ফিড করে:

key -> [(type, value)]

এটি গুরুত্বপূর্ণ যে এই মুহুর্তে রিডুসার উভয় লগ থেকে রেকর্ড গ্রহণ করে এবং একই সময়ে, দুটি লগের মধ্যে কোনটি একটি নির্দিষ্ট মান থেকে এসেছে তা টাইপ ফিল্ড দ্বারা সনাক্ত করা সম্ভব। তাই মূল সমস্যা সমাধানের জন্য যথেষ্ট তথ্য আছে। আমাদের ক্ষেত্রে, রিডুসারকে কেবল প্রতিটি রেকর্ড কী এর জন্য গণনা করতে হবে কোন টাইপটি বেশি সম্মুখীন হয়েছে এবং এই ধরণের আউটপুট।

5.6 MapJoins

ReduceJoin প্যাটার্ন কী দ্বারা দুটি লগ যোগ করার সাধারণ ক্ষেত্রে বর্ণনা করে। যাইহোক, একটি বিশেষ ক্ষেত্রে রয়েছে যেখানে কাজটি উল্লেখযোগ্যভাবে সরলীকৃত এবং ত্বরান্বিত করা যেতে পারে। এটি এমন ক্ষেত্রে যেখানে একটি লগ অন্যটির চেয়ে উল্লেখযোগ্যভাবে ছোট। নিম্নলিখিত সমস্যা বিবেচনা করুন:

2 লগ আছে. প্রথম লগটিতে ওয়েব সার্ভার লগ রয়েছে (আগের টাস্কের মতো), দ্বিতীয় ফাইলটিতে (100kb আকারে) URL-> থিম ম্যাচ রয়েছে। উদাহরণ ২য় ফাইল:

/toyota.php 	
auto 
/football/spartak.html 	
sport 
/cars 	
auto 
/finances/money 	
business

প্রতিটি আইপি ঠিকানার জন্য, এই আইপি ঠিকানা থেকে কোন বিভাগের পৃষ্ঠাগুলি প্রায়শই লোড হয়েছে তা গণনা করা প্রয়োজন৷

এই ক্ষেত্রে, আমাদের ইউআরএল দ্বারা 2টি লগ যোগ করতে হবে। যাইহোক, এই ক্ষেত্রে, আমাদের 3টি MapReduce চালাতে হবে না, যেহেতু দ্বিতীয় লগটি সম্পূর্ণরূপে মেমরিতে ফিট হবে। 1st MapReduce ব্যবহার করে সমস্যা সমাধানের জন্য, আমরা ডিস্ট্রিবিউটেড ক্যাশে দ্বিতীয় লগ লোড করতে পারি এবং যখন ম্যাপার আরম্ভ করা হয়, তখন এটিকে -> বিষয় অভিধানে রেখে মেমরিতে পড়তে পারি।

আরও, সমস্যাটি নিম্নরূপ সমাধান করা হয়:

মানচিত্র:

# find the subject of each of the pages of the first log 
input_line -> [ip,  topic] 

কমানো:


Ip -> [topics] -> [ip, most_popular_topic]

Reduce ইনপুট হিসাবে একটি আইপি এবং সমস্ত বিষয়ের একটি তালিকা পায়, এটি সহজভাবে হিসাব করে যে কোন বিষয়গুলি প্রায়শই সম্মুখীন হয়েছিল৷ এইভাবে, কাজটি 1ম MapReduce ব্যবহার করে সমাধান করা হয়, এবং প্রকৃত যোগদান সাধারণত মানচিত্রের ভিতরে ঘটে (অতএব, কী দ্বারা অতিরিক্ত একত্রিতকরণের প্রয়োজন না হলে, MapOnly কাজটি দিয়ে দেওয়া যেতে পারে):