5.1 फक्त नोकरीचा नकाशा

विविध तंत्रांचे वर्णन करण्याची ही वेळ आहे जी तुम्हाला व्यावहारिक समस्या सोडवण्यासाठी MapReduce प्रभावीपणे वापरण्याची परवानगी देतात, तसेच Hadoop ची काही वैशिष्ट्ये दर्शवू शकतात जी विकास सुलभ करू शकतात किंवा क्लस्टरवर MapReduce कार्याच्या अंमलबजावणीला लक्षणीय गती देऊ शकतात.

जसे आपल्याला आठवते, MapReduce मध्ये Map, Shuffle आणि Reduce चे टप्पे असतात. नियमानुसार, शफल स्टेज व्यावहारिक कार्यांमध्ये सर्वात कठीण असल्याचे दिसून येते, कारण या टप्प्यावर डेटा क्रमवारी लावला जातो. खरं तर, अशी अनेक कार्ये आहेत ज्यात केवळ नकाशाच्या टप्प्यावरच वितरीत केले जाऊ शकते. येथे अशा कार्यांची उदाहरणे आहेत:

  • डेटा फिल्टरिंग (उदाहरणार्थ, वेब सर्व्हर लॉगमध्ये "आयपी अॅड्रेस 123.123.123.123 वरून सर्व रेकॉर्ड शोधा");
  • डेटा ट्रान्सफॉर्मेशन ("csv-logs मधील स्तंभ हटवा");
  • बाह्य स्त्रोतावरून डेटा लोड करणे आणि अनलोड करणे (“डेटाबेसमध्ये लॉगमधील सर्व रेकॉर्ड घाला”).

अशी कामे फक्त मॅप वापरून सोडवली जातात. हडूपमध्ये केवळ नकाशा-कार्य तयार करताना, तुम्हाला शून्य संख्या कमी करणारे निर्दिष्ट करणे आवश्यक आहे:

हॅडूपवरील नकाशा-केवळ कार्य कॉन्फिगरेशनचे उदाहरण:

मूळ इंटरफेस हडूप स्ट्रीमिंग इंटरफेस

जॉब'ए कॉन्फिगर करताना शून्य संख्या निर्दिष्ट करा:

job.setNumReduceTasks(0); 

आम्ही रीड्यूसर निर्दिष्ट करत नाही आणि शून्य संख्या कमी करणारे निर्दिष्ट करतो. उदाहरण:

hadoop jar hadoop-streaming.jar \ 
 -D mapred.reduce.tasks=0\ 
-input input_dir\ 
-output output_dir\ 
-mapper "python mapper.py"\ 
-file "mapper.py"

नकाशा केवळ नोकर्‍या प्रत्यक्षात खूप उपयुक्त असू शकतात. उदाहरणार्थ, Facetz.DCA प्लॅटफॉर्ममध्ये, वापरकर्त्यांच्या वागणुकीद्वारे त्यांची वैशिष्ट्ये ओळखण्यासाठी, तो तंतोतंत एक मोठा नकाशा-केवळ वापरला जातो, ज्याचा प्रत्येक मॅपर वापरकर्त्याला इनपुट म्हणून घेतो आणि त्याची वैशिष्ट्ये आउटपुट म्हणून परत करतो.

5.2 एकत्र करा

मी आधीच लिहिल्याप्रमाणे, मॅप-रिड्यूस टास्क करताना सामान्यतः सर्वात कठीण टप्पा म्हणजे शफल स्टेज. असे घडते कारण इंटरमीडिएट परिणाम (मॅपरचे आउटपुट) डिस्कवर लिहिले जातात, क्रमवारी लावले जातात आणि नेटवर्कवर प्रसारित केले जातात. तथापि, अशी कार्ये आहेत ज्यात असे वर्तन फारसे वाजवी वाटत नाही. उदाहरणार्थ, दस्तऐवजांमध्ये शब्द मोजण्याच्या समान कार्यात, तुम्ही टास्कच्या एका नकाशा-रिड्यूस नोडवर अनेक मॅपर्सच्या आउटपुटचे परिणाम पूर्व-एकत्रित करू शकता आणि प्रत्येक मशीनसाठी आधीच बेरीज केलेली मूल्ये रेड्यूसरकडे पास करू शकता. .

हॅडूपमध्ये, यासाठी, तुम्ही एक संयोजन फंक्शन परिभाषित करू शकता जे मॅपर्सच्या भागाच्या आउटपुटवर प्रक्रिया करेल. कॉम्बिनिंग फंक्शन कमी करण्यासारखेच आहे - ते काही मॅपर्सचे आउटपुट इनपुट म्हणून घेते आणि या मॅपर्ससाठी एकत्रित परिणाम तयार करते, म्हणून रेड्यूसरचा वापर अनेकदा कंबाईनर म्हणून देखील केला जातो. कमी मधील महत्त्वाचा फरक असा आहे की एका कीशी संबंधित सर्व मूल्ये एकत्रित फंक्शनला मिळत नाहीत .

शिवाय, मॅपरच्या आउटपुटसाठी कंबाईन फंक्शन अजिबात कार्यान्वित केले जाईल याची हॅडूप हमी देत ​​नाही. म्हणून, संयोजन फंक्शन नेहमी लागू होत नाही, उदाहरणार्थ, की द्वारे मध्यक मूल्य शोधण्याच्या बाबतीत. असे असले तरी, ज्या कार्यांमध्ये एकत्रित कार्य लागू आहे, तेथे त्याचा वापर MapReduce कार्याच्या गतीमध्ये लक्षणीय वाढ करण्यास अनुमती देतो.

हॅडूपवर कंबाईनर वापरणे:

मूळ इंटरफेस हडूप स्ट्रीमिंग

जॉब-ए कॉन्फिगर करताना, क्लास-कॉम्बिनर निर्दिष्ट करा. नियमानुसार, ते रेड्यूसरसारखेच आहे:

job.setMapperClass(TokenizerMapper.class); 
job.setCombinerClass(IntSumReducer.class); 
job.setReducerClass(IntSumReducer.class); 

कमांड लाइन पर्यायांमध्ये -combiner कमांड निर्दिष्ट करा. सामान्यतः, ही कमांड रेड्यूसर कमांड सारखीच असते. उदाहरण:

hadoop jar hadoop-streaming.jar \ 
-input input_dir\ 
-output output_dir\ 
-mapper "python mapper.py"\ 
-reducer "python reducer.py"\ 
-combiner "python reducer.py"\ 
-file "mapper.py"\ 
-file "reducer.py"\

5.3 MapReduce टास्क चेन

अशी परिस्थिती असते जेव्हा समस्या सोडवण्यासाठी एक MapReduce पुरेसे नसते. उदाहरणार्थ, किंचित सुधारित WordCount कार्य विचारात घ्या: मजकूर दस्तऐवजांचा एक संच आहे, आपल्याला सेटमध्ये 1 ते 1000 वेळा किती शब्द आले आहेत, 1001 ते 2000 पर्यंत किती शब्द आहेत, 2001 ते 3000 पर्यंत किती शब्द आहेत हे मोजणे आवश्यक आहे. आणि असेच. समाधानासाठी, आम्हाला 2 MapReduce जॉबची आवश्यकता आहे:

  • सुधारित शब्दसंख्या, जो प्रत्येक शब्दासाठी तो कोणत्या मध्यांतरात पडला याची गणना करेल;
  • MapReduce जे पहिल्या MapReduce च्या आउटपुटमध्ये प्रत्येक मध्यांतर किती वेळा आले हे मोजते.

छद्म कोड उपाय:

#map1 
def map(doc): 
for word in doc: 
yield word, 1
#reduce1 
def reduce(word, values): 
yield int(sum(values)/1000), 1 
#map2 
def map(doc): 
interval, cnt = doc.split() 
yield interval, cnt 
#reduce2 
def reduce(interval, values): 
yield interval*1000, sum(values) 

हॅडूपवर MapReduce टास्कचा क्रम कार्यान्वित करण्यासाठी, फक्त पहिल्यासाठी आउटपुट म्हणून निर्दिष्ट केलेले फोल्डर दुसर्‍या टास्कसाठी इनपुट म्हणून निर्दिष्ट करणे आणि त्या बदल्यात चालवणे पुरेसे आहे.

सराव मध्ये, MapReduce कार्यांची साखळी खूप जटिल अनुक्रम असू शकतात ज्यामध्ये MapReduce कार्ये अनुक्रमे आणि एकमेकांना समांतर जोडली जाऊ शकतात. अशा कार्य अंमलबजावणी योजनांचे व्यवस्थापन सुलभ करण्यासाठी, oozie आणि luigi सारखी स्वतंत्र साधने आहेत, ज्यांची या मालिकेतील एका स्वतंत्र लेखात चर्चा केली जाईल.

5.4 वितरित कॅशे

हडूपमधील एक महत्त्वाची यंत्रणा म्हणजे वितरित कॅशे. वितरित कॅशे तुम्हाला फाइल्स (उदा. मजकूर फाइल्स, संग्रहण, जार फाइल्स) ज्या वातावरणात MapReduce कार्य चालू आहे तेथे जोडण्याची परवानगी देते.

तुम्ही HDFS वर संग्रहित केलेल्या फाइल्स, स्थानिक फाइल्स (ज्या मशीनवरून टास्क लाँच केले आहे त्या मशीनवर स्थानिक) जोडू शकता. -file पर्यायाद्वारे mapper.py आणि reducer.py फाइल्स जोडून हॅडूप स्ट्रीमिंगसह वितरित कॅशे कसे वापरायचे ते मी आधीच स्पष्टपणे दाखवले आहे. खरं तर, तुम्ही फक्त mapper.py आणि reducer.pyच नाही तर सर्वसाधारणपणे अनियंत्रित फाइल्स जोडू शकता आणि नंतर त्या एखाद्या स्थानिक फोल्डरमध्ये असल्याप्रमाणे वापरू शकता.

वितरित कॅशे वापरणे:

मूळ API
//Job configuration
JobConf job = new JobConf();
DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"),  job);
DistributedCache.addCacheArchive(new URI("/myapp/map.zip", job);
DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job);
DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar", job);
DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz", job);

//example of usage in mapper-e:
public static class MapClass extends MapReduceBase
implements Mapper<K, V, K, V> {

 private Path[] localArchives;
 private Path[] localFiles;

 public void configure(JobConf job) {
   // get cached data from archives
   File f = new File("./map.zip/some/file/in/zip.txt");
 }

 public void map(K key, V value,
             	OutputCollector<K, V> output, Reporter reporter)
 throws IOException {
   // use data here
   // ...
   // ...
   output.collect(k, v);
 }
}
हडूप स्ट्रीमिंग

# आम्ही फाइल्स पॅरामीटरमध्ये वितरित कॅशेमध्ये जोडण्यासाठी आवश्यक असलेल्या फाइल्सची यादी करतो. --files पर्याय इतर पर्यायांपूर्वी येणे आवश्यक आहे.

yarn  hadoop-streaming.jar\ 
-files mapper.py,reducer.py,some_cached_data.txt\ 
-input '/some/input/path' \ 
-output '/some/output/path' \  
-mapper 'python mapper.py' \ 
-reducer 'python reducer.py' \

वापर उदाहरण:

import sys 
#just read file from local folder 
data = open('some_cached_data.txt').read() 
 
for line in sys.stdin() 
#processing input 
#use data here

5.5 सामील होणे कमी करा

ज्यांना रिलेशनल डेटाबेससह काम करण्याची सवय आहे ते सहसा अतिशय सोयीस्कर जॉईन ऑपरेशन वापरतात, जे त्यांना काही टेबलच्या सामग्रीवर काही की नुसार सामील करून एकत्रितपणे प्रक्रिया करण्यास अनुमती देते. बिग डेटासह काम करताना, ही समस्या कधीकधी उद्भवते. खालील उदाहरणाचा विचार करा:

दोन वेब सर्व्हरचे लॉग आहेत, प्रत्येक लॉग असे दिसते:

t\t

लॉग स्निपेट उदाहरण:

1446792139	
178.78.82.1	
/sphingosine/unhurrying.css 
1446792139	
126.31.163.222	
/accentually.js 
1446792139	
154.164.149.83	
/pyroacid/unkemptly.jpg 
1446792139	
202.27.13.181	
/Chawia.js 
1446792139	
67.123.248.174	
/morphographical/dismain.css 
1446792139	
226.74.123.135	
/phanerite.php 
1446792139	
157.109.106.104	
/bisonant.css

प्रत्येक IP पत्त्यासाठी 2 पैकी कोणत्या सर्व्हरला अधिक वेळा भेट दिली याची गणना करणे आवश्यक आहे. परिणाम फॉर्ममध्ये असावा:

\t

निकालाच्या भागाचे उदाहरणः

178.78.82.1	
first 
126.31.163.222	
second 
154.164.149.83	
second 
226.74.123.135	
first

दुर्दैवाने, रिलेशनल डेटाबेसच्या विपरीत, सर्वसाधारणपणे, कीद्वारे दोन लॉग जोडणे (या प्रकरणात, आयपी पत्त्याद्वारे) एक ऐवजी भारी ऑपरेशन आहे आणि 3 MapReduce आणि Reduce Join पॅटर्न वापरून सोडवले जाते:

ReduceJoin असे कार्य करते:

1) प्रत्येक इनपुट लॉगसाठी, एक वेगळा MapReduce (केवळ नकाशा) लाँच केला जातो, इनपुट डेटा खालील फॉर्ममध्ये रूपांतरित करतो:

key -> (type, value

जेथे की ही टेबलमध्ये सामील होण्यासाठी की आहे, Type हा टेबलचा प्रकार आहे (आमच्या बाबतीत पहिला किंवा दुसरा), आणि मूल्य म्हणजे कीला बांधलेला कोणताही अतिरिक्त डेटा.

2) दोन्ही MapReduce चे आउटपुट 3rd MapReduce च्या इनपुटला दिले जाते, जे खरेतर, युनियन करते. या MapReduce मध्ये एक रिक्त मॅपर आहे जो फक्त इनपुट कॉपी करतो. पुढे, शफल डेटाचे की मध्ये विघटन करते आणि इनपुट म्हणून रेड्यूसरला फीड करते:

key -> [(type, value)]

हे महत्त्वाचे आहे की या क्षणी रीड्यूसरला दोन्ही लॉगमधून रेकॉर्ड प्राप्त होते आणि त्याच वेळी, विशिष्ट मूल्य दोनपैकी कोणत्या लॉगमधून आले हे टाइप फील्डद्वारे ओळखणे शक्य आहे. त्यामुळे मूळ समस्या सोडवण्यासाठी पुरेसा डेटा आहे. आमच्या बाबतीत, रीड्यूसरला प्रत्येक रेकॉर्ड कीसाठी फक्त गणना करावी लागते आणि कोणत्या प्रकाराला अधिक सामोरे जावे लागते आणि या प्रकाराला आउटपुट करावे लागते.

5.6 MapJoins

ReduceJoin पॅटर्न किल्लीद्वारे दोन लॉग जोडण्याच्या सामान्य केसचे वर्णन करतो. तथापि, एक विशेष प्रकरण आहे ज्यामध्ये कार्य लक्षणीयरीत्या सरलीकृत आणि वेगवान केले जाऊ शकते. हे असे आहे ज्यामध्ये एक लॉग इतर पेक्षा लक्षणीय लहान आहे. खालील समस्या विचारात घ्या:

2 नोंदी आहेत. पहिल्या लॉगमध्ये वेब सर्व्हर लॉग आहे (मागील टास्क प्रमाणेच), दुसऱ्या फाइलमध्ये (100kb आकारात) URL-> थीम जुळणी आहे. उदाहरण 2 रा फाइल:

/toyota.php 	
auto 
/football/spartak.html 	
sport 
/cars 	
auto 
/finances/money 	
business

प्रत्येक IP पत्त्यासाठी, या IP पत्त्यावरून कोणत्या श्रेणीची पृष्ठे बहुतेक वेळा लोड केली गेली याची गणना करणे आवश्यक आहे.

या प्रकरणात, आम्हाला URL द्वारे 2 लॉगमध्ये देखील सामील होणे आवश्यक आहे. तथापि, या प्रकरणात, आम्हाला 3 MapReduce चालवण्याची गरज नाही, कारण दुसरा लॉग पूर्णपणे मेमरीमध्ये फिट होईल. 1st MapReduce वापरून समस्या सोडवण्यासाठी, आम्ही वितरित कॅशेमध्ये दुसरा लॉग लोड करू शकतो आणि जेव्हा मॅपर सुरू होईल, तेव्हा ते -> विषय शब्दकोशात टाकून मेमरीमध्ये वाचू शकतो.

पुढे, समस्या खालीलप्रमाणे सोडविली जाते:

नकाशा:

# find the subject of each of the pages of the first log 
input_line -> [ip,  topic] 

कमी करा:


Ip -> [topics] -> [ip, most_popular_topic]

Reduce ला एक ip आणि इनपुट म्हणून सर्व विषयांची सूची प्राप्त होते, ते फक्त कोणते विषय सर्वात जास्त वेळा आले होते याची गणना करते. अशाप्रकारे, 1ल्या MapReduce वापरून कार्य सोडवले जाते आणि वास्तविक सामील होणे सामान्यत: नकाशाच्या आत होते (म्हणून, की द्वारे अतिरिक्त एकत्रीकरणाची आवश्यकता नसल्यास, MapOnly कार्य वितरीत केले जाऊ शकते):