5.1 मानचित्र केवल नौकरी

यह विभिन्न तकनीकों का वर्णन करने का समय है जो आपको व्यावहारिक समस्याओं को हल करने के लिए MapReduce का प्रभावी ढंग से उपयोग करने की अनुमति देती हैं, साथ ही Hadoop की कुछ विशेषताओं को दिखाती हैं जो विकास को आसान बना सकती हैं या किसी क्लस्टर पर MapReduce कार्य के निष्पादन को तेज कर सकती हैं।

जैसा कि हमें याद है, MapReduce में मैप, शफल और कम करने के चरण होते हैं। एक नियम के रूप में, फेरबदल चरण व्यावहारिक कार्यों में सबसे कठिन हो जाता है, क्योंकि इस स्तर पर डेटा को क्रमबद्ध किया जाता है। वास्तव में, ऐसे कई कार्य हैं जिनमें केवल मानचित्र चरण से ही छुटकारा पाया जा सकता है। यहाँ ऐसे कार्यों के उदाहरण दिए गए हैं:

  • डेटा फ़िल्टरिंग (उदाहरण के लिए, वेब सर्वर लॉग में "आईपी पते 123.123.123.123 से सभी रिकॉर्ड ढूंढें");
  • डेटा परिवर्तन ("सीएसवी-लॉग में कॉलम हटाएं");
  • बाहरी स्रोत से डेटा लोड और अनलोड करना ("लॉग से डेटाबेस में सभी रिकॉर्ड डालें")।

इस तरह के कार्यों को मैप-ओनली का उपयोग करके हल किया जाता है। Hadoop में मैप-ओनली टास्क बनाते समय, आपको रिड्यूसर की शून्य संख्या निर्दिष्ट करने की आवश्यकता होती है:

हडूप पर मानचित्र-केवल कार्य विन्यास का एक उदाहरण:

देशी इंटरफ़ेस Hadoop स्ट्रीमिंग इंटरफ़ेस

जॉब को कॉन्फ़िगर करते समय रेड्यूसर की शून्य संख्या निर्दिष्ट करें:

job.setNumReduceTasks(0); 

हम एक रेड्यूसर निर्दिष्ट नहीं करते हैं और रेड्यूसर की शून्य संख्या निर्दिष्ट करते हैं। उदाहरण:

hadoop jar hadoop-streaming.jar \ 
 -D mapred.reduce.tasks=0\ 
-input input_dir\ 
-output output_dir\ 
-mapper "python mapper.py"\ 
-file "mapper.py"

मानचित्र केवल नौकरियां वास्तव में बहुत उपयोगी हो सकती हैं। उदाहरण के लिए, Facetz.DCA प्लेटफॉर्म में, उपयोगकर्ताओं के व्यवहार द्वारा उनकी विशेषताओं की पहचान करने के लिए, यह केवल एक बड़े मानचित्र का उपयोग किया जाता है, जिसमें से प्रत्येक मैपर उपयोगकर्ता को इनपुट के रूप में लेता है और आउटपुट के रूप में उसकी विशेषताओं को वापस करता है।

5.2 मिलाना

जैसा कि मैंने पहले ही लिखा था, आमतौर पर मैप-कम करने का कार्य करते समय सबसे कठिन चरण फेरबदल चरण होता है। ऐसा इसलिए होता है क्योंकि इंटरमीडिएट परिणाम (मैपर का आउटपुट) डिस्क पर लिखे जाते हैं, सॉर्ट किए जाते हैं और नेटवर्क पर प्रसारित होते हैं। हालाँकि, ऐसे कार्य हैं जिनमें ऐसा व्यवहार बहुत उचित नहीं लगता है। उदाहरण के लिए, दस्तावेज़ों में शब्दों की गिनती के एक ही कार्य में, आप कार्य के एक मानचित्र-कम नोड पर कई मैपर्स के आउटपुट के परिणामों को पूर्व-एकत्रित कर सकते हैं, और प्रत्येक मशीन के लिए पहले से ही सम्‍मिलित मानों को रेड्यूसर को पास कर सकते हैं। .

हडूप में, इसके लिए, आप एक संयोजन फ़ंक्शन को परिभाषित कर सकते हैं जो मैपर्स के भाग के आउटपुट को प्रोसेस करेगा। संयोजन फ़ंक्शन कम करने के समान है - यह कुछ मैपर्स के आउटपुट को इनपुट के रूप में लेता है और इन मैपर्स के लिए एक समग्र परिणाम उत्पन्न करता है, इसलिए रेड्यूसर को अक्सर कॉम्बिनर के रूप में भी उपयोग किया जाता है। कम करने से एक महत्वपूर्ण अंतर यह है कि एक कुंजी के अनुरूप सभी मान संयोजन समारोह में नहीं आते हैं

इसके अलावा, हडूप इस बात की गारंटी नहीं देता है कि मैपर के आउटपुट के लिए कंबाइन फ़ंक्शन को बिल्कुल भी निष्पादित किया जाएगा। इसलिए, संयोजन समारोह हमेशा लागू नहीं होता है, उदाहरण के लिए, कुंजी द्वारा औसत मूल्य खोजने के मामले में। फिर भी, उन कार्यों में जहां संयोजन कार्य लागू होता है, इसका उपयोग MapReduce कार्य की गति में उल्लेखनीय वृद्धि प्राप्त करने की अनुमति देता है।

हडूप पर कंबाइनर का उपयोग करना:

देशी इंटरफ़ेस हडूप स्ट्रीमिंग

जॉब-ए को कॉन्फ़िगर करते समय, क्लास-कॉम्बिनेर निर्दिष्ट करें। एक नियम के रूप में, यह रेड्यूसर जैसा ही है:

job.setMapperClass(TokenizerMapper.class); 
job.setCombinerClass(IntSumReducer.class); 
job.setReducerClass(IntSumReducer.class); 

कमांड लाइन विकल्पों में -combiner कमांड निर्दिष्ट करें। आमतौर पर, यह कमांड रिड्यूसर कमांड के समान होता है। उदाहरण:

hadoop jar hadoop-streaming.jar \ 
-input input_dir\ 
-output output_dir\ 
-mapper "python mapper.py"\ 
-reducer "python reducer.py"\ 
-combiner "python reducer.py"\ 
-file "mapper.py"\ 
-file "reducer.py"\

5.3 MapReduce टास्क चेन

ऐसी स्थितियाँ होती हैं जब किसी समस्या को हल करने के लिए एक MapReduce पर्याप्त नहीं होता है। उदाहरण के लिए, थोड़ा संशोधित WordCount कार्य पर विचार करें: पाठ दस्तावेज़ों का एक सेट है, आपको यह गिनने की आवश्यकता है कि सेट में 1 से 1000 बार कितने शब्द आए, 1001 से 2000 तक कितने शब्द, 2001 से 3000 तक कितने शब्द, और इसी तरह। समाधान के लिए, हमें 2 MapReduce नौकरियों की आवश्यकता है:

  • संशोधित शब्द गणना, जो प्रत्येक शब्द के लिए गणना करेगी कि यह किस अंतराल में गिर गया;
  • एक MapReduce जो यह गिनता है कि पहले MapReduce के आउटपुट में प्रत्येक अंतराल कितनी बार आया था।

छद्म कोड समाधान:

#map1 
def map(doc): 
for word in doc: 
yield word, 1
#reduce1 
def reduce(word, values): 
yield int(sum(values)/1000), 1 
#map2 
def map(doc): 
interval, cnt = doc.split() 
yield interval, cnt 
#reduce2 
def reduce(interval, values): 
yield interval*1000, sum(values) 

Hadoop पर MapReduce कार्यों के अनुक्रम को निष्पादित करने के लिए, यह केवल उस फ़ोल्डर को निर्दिष्ट करने के लिए पर्याप्त है जो दूसरे कार्य के इनपुट के रूप में पहले के लिए आउटपुट के रूप में निर्दिष्ट किया गया था और उन्हें बदले में चलाएं।

व्यवहार में, MapReduce कार्यों की श्रृंखला काफी जटिल क्रम हो सकती है जिसमें MapReduce कार्यों को क्रमिक रूप से और एक दूसरे के समानांतर दोनों में जोड़ा जा सकता है। ऐसी कार्य निष्पादन योजनाओं के प्रबंधन को आसान बनाने के लिए, oozie और luigi जैसे अलग-अलग उपकरण हैं, जिनकी चर्चा इस श्रृंखला के एक अलग लेख में की जाएगी।

5.4 वितरित कैश

हडूप में एक महत्वपूर्ण तंत्र वितरित कैश है। वितरित कैश आपको उस वातावरण में फ़ाइलें (जैसे पाठ फ़ाइलें, संग्रह, जार फ़ाइलें) जोड़ने की अनुमति देता है जहाँ MapReduce कार्य चल रहा है।

आप एचडीएफएस, स्थानीय फाइलों (मशीन से स्थानीय जहां से कार्य लॉन्च किया गया है) पर संग्रहीत फ़ाइलों को जोड़ सकते हैं। मैंने पहले ही स्पष्ट रूप से दिखाया है कि -file विकल्प के माध्यम से mapper.py और reducer.py फ़ाइलों को जोड़कर हडूप स्ट्रीमिंग के साथ वितरित कैश का उपयोग कैसे करें। वास्तव में, आप न केवल mapper.py और reducer.py जोड़ सकते हैं, बल्कि सामान्य रूप से मनमानी फाइलें भी जोड़ सकते हैं, और फिर उनका उपयोग इस तरह कर सकते हैं जैसे कि वे एक स्थानीय फ़ोल्डर में हों।

वितरित कैश का उपयोग करना:

देशी एपीआई
//Job configuration
JobConf job = new JobConf();
DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"),  job);
DistributedCache.addCacheArchive(new URI("/myapp/map.zip", job);
DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job);
DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar", job);
DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz", job);

//example of usage in mapper-e:
public static class MapClass extends MapReduceBase
implements Mapper<K, V, K, V> {

 private Path[] localArchives;
 private Path[] localFiles;

 public void configure(JobConf job) {
   // get cached data from archives
   File f = new File("./map.zip/some/file/in/zip.txt");
 }

 public void map(K key, V value,
             	OutputCollector<K, V> output, Reporter reporter)
 throws IOException {
   // use data here
   // ...
   // ...
   output.collect(k, v);
 }
}
हडूप स्ट्रीमिंग

# हम उन फाइलों को सूचीबद्ध करते हैं जिन्हें -फाइल्स पैरामीटर में वितरित कैश में जोड़ने की आवश्यकता होती है। --files विकल्प को अन्य विकल्पों से पहले आना चाहिए।

yarn  hadoop-streaming.jar\ 
-files mapper.py,reducer.py,some_cached_data.txt\ 
-input '/some/input/path' \ 
-output '/some/output/path' \  
-mapper 'python mapper.py' \ 
-reducer 'python reducer.py' \

उपयोग उदाहरण:

import sys 
#just read file from local folder 
data = open('some_cached_data.txt').read() 
 
for line in sys.stdin() 
#processing input 
#use data here

5.5 ज्वाइन कम करें

जो लोग रिलेशनल डेटाबेस के साथ काम करने के आदी हैं, वे अक्सर बहुत सुविधाजनक जॉइन ऑपरेशन का उपयोग करते हैं, जो उन्हें कुछ कुंजियों के अनुसार कुछ तालिकाओं की सामग्री को संयुक्त रूप से संसाधित करने की अनुमति देता है। बड़े डेटा के साथ काम करते समय भी कभी-कभी यह समस्या उत्पन्न हो जाती है। निम्नलिखित उदाहरण पर विचार करें:

दो वेब सर्वर के लॉग हैं, प्रत्येक लॉग इस तरह दिखता है:

t\t

लॉग स्निपेट उदाहरण:

1446792139	
178.78.82.1	
/sphingosine/unhurrying.css 
1446792139	
126.31.163.222	
/accentually.js 
1446792139	
154.164.149.83	
/pyroacid/unkemptly.jpg 
1446792139	
202.27.13.181	
/Chawia.js 
1446792139	
67.123.248.174	
/morphographical/dismain.css 
1446792139	
226.74.123.135	
/phanerite.php 
1446792139	
157.109.106.104	
/bisonant.css

प्रत्येक आईपी पते के लिए यह गणना करना आवश्यक है कि वह किन 2 सर्वरों पर अधिक बार गया। परिणाम रूप में होना चाहिए:

\t

परिणाम के एक भाग का एक उदाहरण:

178.78.82.1	
first 
126.31.163.222	
second 
154.164.149.83	
second 
226.74.123.135	
first

दुर्भाग्य से, संबंधपरक डेटाबेस के विपरीत, सामान्य तौर पर, कुंजी द्वारा दो लॉग (इस मामले में, आईपी पते द्वारा) में शामिल होना एक भारी ऑपरेशन है और इसे 3 MapReduce और Reduce Join पैटर्न का उपयोग करके हल किया जाता है:

ReduceJoin इस तरह काम करता है:

1) प्रत्येक इनपुट लॉग के लिए, एक अलग MapReduce (केवल मानचित्र) लॉन्च किया जाता है, इनपुट डेटा को निम्न रूप में परिवर्तित करता है:

key -> (type, value

जहाँ कुंजी तालिकाओं में शामिल होने की कुंजी है, प्रकार तालिका का प्रकार है (हमारे मामले में पहला या दूसरा), और मान कुंजी से जुड़ा कोई अतिरिक्त डेटा है।

2) दोनों MapReduces के आउटपुट को 3rd MapReduce के इनपुट में फीड किया जाता है, जो वास्तव में यूनियन करता है। इस MapReduce में एक खाली मैपर है जो केवल इनपुट को कॉपी करता है। अगला, फेरबदल डेटा को कुंजियों में विघटित करता है और इसे इनपुट के रूप में रिड्यूसर को फीड करता है:

key -> [(type, value)]

यह महत्वपूर्ण है कि इस समय रिड्यूसर दोनों लॉग से रिकॉर्ड प्राप्त करता है, और साथ ही, टाइप फ़ील्ड द्वारा यह पहचानना संभव है कि दो लॉग में से कौन सा मान एक विशेष मान से आया है। इसलिए मूल समस्या को हल करने के लिए पर्याप्त डेटा है। हमारे मामले में, रिड्यूसर को केवल प्रत्येक रिकॉर्ड कुंजी के लिए गणना करनी होगी कि किस प्रकार का अधिक सामना करना पड़ा है और इस प्रकार का आउटपुट।

5.6 मैपजॉइन

रिड्यूसजॉइन पैटर्न कुंजी द्वारा दो लॉग में शामिल होने के सामान्य मामले का वर्णन करता है। हालाँकि, एक विशेष मामला है जिसमें कार्य को काफी सरल और त्वरित किया जा सकता है। यह वह स्थिति है जिसमें एक लॉग दूसरे की तुलना में काफी छोटा होता है। निम्नलिखित समस्या पर विचार करें:

2 लॉग हैं। पहले लॉग में वेब सर्वर लॉग होता है (पिछले कार्य के समान), दूसरी फ़ाइल (आकार में 100kb) में URL-> थीम मैच होता है। उदाहरण दूसरी फ़ाइल:

/toyota.php 	
auto 
/football/spartak.html 	
sport 
/cars 	
auto 
/finances/money 	
business

प्रत्येक आईपी पते के लिए, यह गणना करना आवश्यक है कि इस आईपी पते से किस श्रेणी के पृष्ठ सबसे अधिक बार लोड किए गए थे।

इस स्थिति में, हमें URL द्वारा 2 लॉग्स में शामिल होने की भी आवश्यकता है। हालाँकि, इस मामले में, हमें 3 MapReduce चलाने की आवश्यकता नहीं है, क्योंकि दूसरा लॉग पूरी तरह से मेमोरी में फिट हो जाएगा। पहले MapReduce का उपयोग करके समस्या को हल करने के लिए, हम दूसरे लॉग को वितरित कैश में लोड कर सकते हैं, और जब मैपर को इनिशियलाइज़ किया जाता है, तो बस इसे मेमोरी में पढ़ें, इसे -> टॉपिक डिक्शनरी में डालें।

इसके अलावा, समस्या का समाधान इस प्रकार है:

नक्शा:

# find the subject of each of the pages of the first log 
input_line -> [ip,  topic] 

कम करना:


Ip -> [topics] -> [ip, most_popular_topic]

कम एक आईपी प्राप्त करता है और इनपुट के रूप में सभी विषयों की एक सूची प्राप्त करता है, यह केवल गणना करता है कि कौन से विषयों को अक्सर सामना किया गया था। इस प्रकार, कार्य को पहले MapReduce का उपयोग करके हल किया जाता है, और वास्तविक जुड़ाव आम तौर पर मानचित्र के अंदर होता है (इसलिए, यदि कुंजी द्वारा अतिरिक्त एकत्रीकरण की आवश्यकता नहीं थी, तो MapOnly कार्य को समाप्त किया जा सकता है):