4.1 हडूप के बारे में सामान्य जानकारी

MapReduce प्रतिमान Google द्वारा 2004 में अपने लेख MapReduce: सरलीकृत डेटा प्रोसेसिंग ऑन लार्ज क्लस्टर्स में प्रस्तावित किया गया था । चूंकि प्रस्तावित लेख में प्रतिमान का विवरण था, लेकिन कार्यान्वयन गायब था, याहू के कई प्रोग्रामरों ने नच वेब क्रॉलर पर काम के हिस्से के रूप में उनके कार्यान्वयन का प्रस्ताव दिया। आप लेख में हडूप के इतिहास के बारे में अधिक पढ़ सकते हैं हडूप का इतिहास: डेटा के भविष्य के लिए 4 नोड्स से

प्रारंभ में, Hadoop मुख्य रूप से डेटा संग्रहीत करने और MapReduce कार्यों को चलाने के लिए एक उपकरण था, लेकिन अब Hadoop बड़े डेटा को संसाधित करने के लिए एक या दूसरे तरीके से संबंधित तकनीकों का एक बड़ा ढेर है (न केवल MapReduce के साथ)।

Hadoop के मुख्य (कोर) घटक हैं:

  • Hadoop डिस्ट्रीब्यूटेड फाइल सिस्टम (HDFS) एक डिस्ट्रीब्यूटेड फाइल सिस्टम है जो आपको लगभग असीमित आकार की जानकारी स्टोर करने की अनुमति देता है।
  • Hadoop YARN क्लस्टर संसाधन प्रबंधन और कार्य प्रबंधन के लिए एक रूपरेखा है, जिसमें MapReduce रूपरेखा भी शामिल है।
  • हडूप आम

हडूप से सीधे संबंधित बड़ी संख्या में परियोजनाएं भी हैं, लेकिन हडूप कोर में शामिल नहीं हैं:

  • हाइव - बड़े डेटा पर SQL जैसी क्वेरी के लिए एक उपकरण (SQL क्वेरी को MapReduce कार्यों की एक श्रृंखला में बदल देता है);
  • सुअर उच्च स्तरीय डेटा विश्लेषण के लिए एक प्रोग्रामिंग भाषा है। इस भाषा में कोड की एक पंक्ति MapReduce कार्यों के अनुक्रम में बदल सकती है;
  • Hbase एक स्तंभ डेटाबेस है जो BigTable प्रतिमान को लागू करता है;
  • कैसेंड्रा एक उच्च-प्रदर्शन वितरित की-वैल्यू डेटाबेस है;
  • ज़ूकीपर वितरित कॉन्फ़िगरेशन स्टोरेज और कॉन्फ़िगरेशन परिवर्तनों के सिंक्रनाइज़ेशन के लिए एक सेवा है;
  • Mahout एक बिग डेटा मशीन लर्निंग लाइब्रेरी और इंजन है।

अलग से, मैं अपाचे स्पार्क परियोजना को नोट करना चाहूंगा , जो वितरित डेटा प्रोसेसिंग के लिए एक इंजन है। Apache Spark आमतौर पर अपने काम के लिए Hadoop घटकों जैसे HDFS और YARN का उपयोग करता है, जबकि खुद हाल ही में Hadoop की तुलना में अधिक लोकप्रिय हुआ है:

सामग्री की इस श्रृंखला में इनमें से कुछ घटकों को अलग-अलग लेखों में शामिल किया जाएगा, लेकिन अभी के लिए, देखते हैं कि आप हडूप के साथ कैसे काम करना शुरू कर सकते हैं और इसे अभ्यास में ला सकते हैं।

4.2 Hadoop पर MapReduce प्रोग्राम चलाना

अब देखते हैं कि Hadoop पर MapReduce टास्क कैसे चलाया जाता है। एक कार्य के रूप में, हम क्लासिक WordCount उदाहरण का उपयोग करेंगे , जिसकी चर्चा पिछले पाठ में की गई थी।

मैं आपको समस्या का सूत्रीकरण याद दिलाता हूं: दस्तावेजों का एक सेट है। दस्तावेज़ों के सेट में आने वाले प्रत्येक शब्द के लिए यह गिनना आवश्यक है कि शब्द सेट में कितनी बार आया है।

समाधान:

नक्शा दस्तावेज़ को शब्दों में विभाजित करता है और जोड़े का एक सेट देता है (शब्द, 1)।

प्रत्येक शब्द की घटनाओं का योग कम करें:

def map(doc):  
for word in doc.split():  
	yield word, 1 
def reduce(word, values):  
	yield word, sum(values)

अब कार्य इस समाधान को कोड के रूप में प्रोग्राम करना है जिसे Hadoop पर निष्पादित किया जा सकता है और चलाया जा सकता है।

4.3 विधि संख्या 1। हडूप स्ट्रीमिंग

Hadoop पर MapReduce प्रोग्राम चलाने का सबसे आसान तरीका Hadoop स्ट्रीमिंग इंटरफ़ेस का उपयोग करना है। स्ट्रीमिंग इंटरफ़ेस मानता है कि मैप और रिड्यूस को प्रोग्राम के रूप में लागू किया जाता है जो डेटा को स्टडिन और आउटपुट से स्टडआउट तक ले जाता है ।

मैप फ़ंक्शन को निष्पादित करने वाले प्रोग्राम को मैपर कहा जाता है। कम करने वाले प्रोग्राम को क्रमशः रिड्यूसर कहा जाता है ।

स्ट्रीमिंग इंटरफ़ेस डिफ़ॉल्ट रूप से मानता है कि मैपर या रेड्यूसर में आने वाली एक पंक्ति मानचित्र के लिए एक आने वाली प्रविष्टि से मेल खाती है ।

मैपर का आउटपुट जोड़े (कुंजी, मान) के रूप में रेड्यूसर के इनपुट पर जाता है, जबकि सभी जोड़े एक ही कुंजी के अनुरूप होते हैं:

  • रेड्यूसर के एकल लॉन्च द्वारा संसाधित होने की गारंटी;
  • इनपुट को एक पंक्ति में सबमिट किया जाएगा (अर्थात, यदि एक रिड्यूसर कई अलग-अलग कुंजियों को संसाधित करता है, तो इनपुट को कुंजी द्वारा समूहीकृत किया जाएगा)।

तो चलिए अजगर में मैपर और रिड्यूसर लागू करते हैं:

#mapper.py  
import sys  
  
def do_map(doc):  
for word in doc.split():  
	yield word.lower(), 1  
  
for line in sys.stdin:  
	for key, value in do_map(line):  
    	print(key + "\t" + str(value))  
 
#reducer.py  
import sys  
  
def do_reduce(word, values):  
	return word, sum(values)  
  
prev_key = None  
values = []  
  
for line in sys.stdin:  
	key, value = line.split("\t")  
	if key != prev_key and prev_key is not None:  
    	result_key, result_value = do_reduce(prev_key, values)  
    	print(result_key + "\t" + str(result_value))  
    	values = []  
	prev_key = key  
	values.append(int(value))  
  
if prev_key is not None:  
	result_key, result_value = do_reduce(prev_key, values)  
	print(result_key + "\t" + str(result_value))

Hadoop जिस डेटा को प्रोसेस करेगा उसे HDFS पर स्टोर किया जाना चाहिए। चलिए अपने लेख अपलोड करते हैं और उन्हें HDFS पर डालते हैं। ऐसा करने के लिए, हडूप एफएस कमांड का उपयोग करें :

wget https://www.dropbox.com/s/opp5psid1x3jt41/lenta_articles.tar.gz  
tar xzvf lenta_articles.tar.gz  
hadoop fs -put lenta_articles 

हडूप एफएस उपयोगिता फ़ाइल सिस्टम में हेरफेर करने के लिए बड़ी संख्या में विधियों का समर्थन करती है, जिनमें से कई मानक लिनक्स उपयोगिताओं के समान हैं।

अब स्ट्रीमिंग कार्य शुरू करते हैं:

yarn jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar\  
 -input lenta_articles\  
 -output lenta_wordcount\  
 -file mapper.py\  
 -file reducer.py\  
 -mapper "python mapper.py"\  
 -reducer "python reducer.py" 

यार्न यूटिलिटी का उपयोग क्लस्टर पर विभिन्न अनुप्रयोगों (मानचित्र-कम आधारित सहित) को लॉन्च करने और प्रबंधित करने के लिए किया जाता है। Hadoop-streaming.jar ऐसे यार्न एप्लिकेशन का सिर्फ एक उदाहरण है।

अगला लॉन्च विकल्प हैं:

  • इनपुट - एचडीएफएस पर स्रोत डेटा वाला फ़ोल्डर;
  • आउटपुट - एचडीएफएस पर फ़ोल्डर जहां आप परिणाम डालना चाहते हैं;
  • फ़ाइल - मानचित्र-कम कार्य के संचालन के दौरान आवश्यक फ़ाइलें;
  • मैपर कंसोल कमांड है जिसका उपयोग मैप स्टेज के लिए किया जाएगा;
  • कम करें कंसोल कमांड है जिसका उपयोग कम चरण के लिए किया जाएगा।

लॉन्च करने के बाद, आप कंसोल में कार्य की प्रगति और कार्य के बारे में अधिक विस्तृत जानकारी देखने के लिए एक URL देख सकते हैं।

इस URL पर उपलब्ध इंटरफ़ेस में, आप अधिक विस्तृत कार्य निष्पादन स्थिति का पता लगा सकते हैं, प्रत्येक मैपर और रिड्यूसर के लॉग देख सकते हैं (जो विफल कार्यों के मामले में बहुत उपयोगी है)।

सफल निष्पादन के बाद कार्य का परिणाम HDFS में उस फ़ोल्डर में जोड़ा जाता है जिसे हमने आउटपुट फ़ील्ड में निर्दिष्ट किया था। आप "हडूप एफएस -एलएस लेंटा_वर्डकाउंट" कमांड का उपयोग करके इसकी सामग्री देख सकते हैं।

परिणाम स्वयं इस प्रकार प्राप्त किया जा सकता है:

hadoop fs -text lenta_wordcount/* | sort -n -k2,2 | tail -n5  
from
41  
this
43  
on
82  
and
111  
into
194 

"Hadoop fs -text" कमांड फ़ोल्डर की सामग्री को टेक्स्ट फॉर्म में प्रदर्शित करता है। मैंने परिणाम को शब्दों की घटनाओं की संख्या से क्रमबद्ध किया। जैसा कि अपेक्षित था, भाषा में सबसे आम शब्द पूर्वसर्ग हैं।

4.4 विधि संख्या 2: जावा का उपयोग करें

Hadoop स्वयं जावा में लिखा गया है, और Hadoop का मूल इंटरफ़ेस भी जावा-आधारित है। आइए दिखाते हैं कि वर्डकाउंट के लिए देशी जावा एप्लिकेशन कैसा दिखता है:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

	public static class TokenizerMapper
        	extends Mapper<Object, Text, Text, IntWritable>{

    	private final static IntWritable one = new IntWritable(1);
    	private Text word = new Text();

    	public void map(Object key, Text value, Context context
    	) throws IOException, InterruptedException {
        	StringTokenizer itr = new StringTokenizer(value.toString());
        	while (itr.hasMoreTokens()) {
            	word.set(itr.nextToken());
            	context.write(word, one);
        	}
    	}
	}

	public static class IntSumReducer
        	extends Reducer<Text,IntWritable,Text,IntWritable> {
    	private IntWritable result = new IntWritable();

    	public void reduce(Text key, Iterable values,
                       	Context context
    	) throws IOException, InterruptedException {
        	int sum = 0;
        	for (IntWritable val : values) {
            	sum += val.get();
        	}
        	result.set(sum);
        	context.write(key, result);
    	}
	}

	public static void main(String[] args) throws Exception {
    	Configuration conf = new Configuration();
    	Job job = Job.getInstance(conf, "word count");
    	job.setJarByClass(WordCount.class);
    	job.setMapperClass(TokenizerMapper.class);
    	job.setReducerClass(IntSumReducer.class);
    	job.setOutputKeyClass(Text.class);
    	job.setOutputValueClass(IntWritable.class);
    	FileInputFormat.addInputPath(job, new Path("hdfs://localhost/user/cloudera/lenta_articles"));
    	FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost/user/cloudera/lenta_wordcount"));
    	System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

यह वर्ग बिल्कुल हमारे पायथन उदाहरण के समान ही करता है। हम क्रमशः मैपर और रेड्यूसर कक्षाओं से प्राप्त करके TokenizerMapper और IntSumReducer कक्षाएं बनाते हैं। टेम्प्लेट पैरामीटर के रूप में पास की गई कक्षाएं इनपुट और आउटपुट मानों के प्रकार निर्दिष्ट करती हैं। देशी एपीआई मानता है कि मानचित्र फ़ंक्शन को इनपुट के रूप में एक कुंजी-मूल्य जोड़ी दी जाती है। चूंकि हमारे मामले में कुंजी खाली है, हम केवल वस्तु को कुंजी प्रकार के रूप में परिभाषित करते हैं।

मुख्य विधि में, हम मैप्रेड्यूस कार्य शुरू करते हैं और इसके मापदंडों को परिभाषित करते हैं - नाम, मैपर और रेड्यूसर, एचडीएफएस में पथ, जहां इनपुट डेटा स्थित है और जहां परिणाम डालना है। संकलित करने के लिए, हमें हडूप पुस्तकालयों की आवश्यकता है। मैं मेवेन का निर्माण करने के लिए उपयोग करता हूं, जिसके लिए क्लौडेरा का भंडार है। इसे स्थापित करने के निर्देश यहां देखे जा सकते हैं। परिणामस्वरूप, pom.xmp फ़ाइल (जिसका उपयोग मावेन द्वारा परियोजना की असेंबली का वर्णन करने के लिए किया जाता है) मुझे निम्नलिखित मिला:

<?xml version="1.0" encoding="UTF-8"?>  
<project xmlns="http://maven.apache.org/POM/4.0.0"  
     	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
     	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">  
	<modelVersion>4.0.0</modelVersion>  
  
	<repositories>  
    	<repository>  
        	<id>cloudera</id>  
        	<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>  
    	</repository>  
	</repositories>  
  
	<dependencies>  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-common</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-auth</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-hdfs</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-mapreduce-client-app</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
	</dependencies>  
  
	<groupId>org.dca.examples</groupId>  
	<artifactId>wordcount</artifactId>  
	<version>1.0-SNAPSHOT</version>  
 
</project>

आइए प्रोजेक्ट को एक जार पैकेज में संकलित करें:

mvn clean package

प्रोजेक्ट को जार फ़ाइल में बनाने के बाद, लॉन्च उसी तरह से होता है, जैसे स्ट्रीमिंग इंटरफ़ेस के मामले में:

yarn jar wordcount-1.0-SNAPSHOT.jar  WordCount 

हम निष्पादन की प्रतीक्षा करते हैं और परिणाम की जांच करते हैं:

hadoop fs -text lenta_wordcount/* | sort -n -k2,2 | tail -n5  
from
41
this
43
on
82
and
111
into
194

जैसा कि आप अनुमान लगा सकते हैं, हमारे मूल एप्लिकेशन को चलाने का परिणाम स्ट्रीमिंग एप्लिकेशन के परिणाम के समान है जिसे हमने पिछले तरीके से लॉन्च किया था।