4.1 Hadoop बद्दल सामान्य माहिती

Google ने 2004 मध्ये MapReduce: Simplified Data Processing on Large Clusters या लेखात MapReduce नमुना प्रस्तावित केला होता . प्रस्तावित लेखात प्रतिमानाचे वर्णन असल्याने, परंतु अंमलबजावणी गहाळ असल्याने, Yahoo मधील अनेक प्रोग्रामरनी नच वेब क्रॉलरवरील कामाचा भाग म्हणून त्यांची अंमलबजावणी प्रस्तावित केली. हडूपच्या इतिहासाबद्दल आपण लेखात अधिक वाचू शकता हडूपचा इतिहास: 4 नोड्सपासून डेटाच्या भविष्यापर्यंत .

सुरुवातीला, Hadoop हे प्रामुख्याने डेटा संचयित करण्यासाठी आणि MapReduce कार्ये चालविण्यासाठी एक साधन होते, परंतु आता Hadoop हे मोठ्या डेटावर प्रक्रिया करण्याशी संबंधित तंत्रज्ञानाचा एक मोठा स्टॅक आहे (केवळ MapReduce सह नाही).

हडूपचे मुख्य (मुख्य) घटक आहेत:

  • हडूप डिस्ट्रिब्युटेड फाइल सिस्टम (HDFS) ही एक वितरित फाइल सिस्टम आहे जी तुम्हाला जवळजवळ अमर्यादित आकाराची माहिती साठवण्याची परवानगी देते.
  • Hadoop YARN हे MapReduce फ्रेमवर्कसह क्लस्टर रिसोर्स मॅनेजमेंट आणि टास्क मॅनेजमेंटसाठी फ्रेमवर्क आहे.
  • हडूप सामान्य

हडूपशी थेट संबंधित मोठ्या संख्येने प्रकल्प देखील आहेत, परंतु हडूप कोरमध्ये समाविष्ट नाहीत:

  • Hive - मोठ्या डेटावर SQL सारख्या प्रश्नांसाठी एक साधन (SQL क्वेरी MapReduce कार्यांच्या मालिकेत बदलते);
  • डुक्कर ही उच्च-स्तरीय डेटा विश्लेषणासाठी एक प्रोग्रामिंग भाषा आहे. या भाषेतील कोडची एक ओळ MapReduce कार्यांच्या क्रमात बदलू शकते;
  • Hbase हा एक स्तंभीय डेटाबेस आहे जो BigTable पॅराडाइम लागू करतो;
  • कॅसॅंड्रा एक उच्च-कार्यक्षमता वितरित की-मूल्य डेटाबेस आहे;
  • ZooKeeper ही वितरीत कॉन्फिगरेशन स्टोरेज आणि कॉन्फिगरेशन बदलांच्या सिंक्रोनाइझेशनसाठी सेवा आहे;
  • माहूत ही एक मोठी डेटा मशीन लर्निंग लायब्ररी आणि इंजिन आहे.

स्वतंत्रपणे, मी Apache Spark प्रकल्पाची नोंद घेऊ इच्छितो , जे वितरित डेटा प्रक्रियेसाठी इंजिन आहे. अपाचे स्पार्क त्याच्या कामासाठी एचडीएफएस आणि यार्न सारख्या हडूप घटकांचा वापर करते, तर अलीकडेच हडूपपेक्षा अधिक लोकप्रिय झाले आहे:

यातील काही घटक सामग्रीच्या या मालिकेतील स्वतंत्र लेखांमध्ये समाविष्ट केले जातील, परंतु आत्तासाठी, आपण हडूपसह कार्य कसे सुरू करू शकता आणि ते प्रत्यक्षात कसे आणू शकता ते पाहू या.

4.2 Hadoop वर MapReduce प्रोग्राम चालवणे

आता Hadoop वर MapReduce कार्य कसे चालवायचे ते पाहू. एक कार्य म्हणून, आम्ही क्लासिक WordCount उदाहरण वापरू , ज्याची चर्चा मागील पाठात केली होती.

मी तुम्हाला समस्येचे फॉर्म्युलेशन स्मरण करून देतो: कागदपत्रांचा एक संच आहे. दस्तऐवजांच्या संचामध्ये येणार्‍या प्रत्येक शब्दासाठी संचामध्ये शब्द किती वेळा येतो हे मोजणे आवश्यक आहे.

उपाय:

नकाशा दस्तऐवजाचे शब्दांमध्ये विभाजन करतो आणि जोड्यांचा संच देतो (शब्द, 1).

प्रत्येक शब्दाच्या घटनांची बेरीज कमी करा:

def map(doc):  
for word in doc.split():  
	yield word, 1 
def reduce(word, values):  
	yield word, sum(values)

आता हे सोल्यूशन कोडच्या स्वरूपात प्रोग्राम करणे हे काम आहे जे हडूप आणि रनवर कार्यान्वित केले जाऊ शकते.

४.३ पद्धत क्रमांक १. हडूप स्ट्रीमिंग

Hadoop वर MapReduce प्रोग्राम चालवण्याचा सर्वात सोपा मार्ग म्हणजे Hadoop स्ट्रीमिंग इंटरफेस वापरणे. स्ट्रीमिंग इंटरफेस असे गृहीत धरते की मॅप आणि रिड्यूस हे प्रोग्राम्स म्हणून लागू केले जातात जे stdin आणि आउटपुट पासून stdout मध्ये डेटा घेतात .

मॅप फंक्शन कार्यान्वित करणार्‍या प्रोग्रामला मॅपर म्हणतात. कमी कार्यान्वित करणार्‍या प्रोग्रामला अनुक्रमे रेड्यूसर म्हणतात .

स्ट्रीमिंग इंटरफेस डीफॉल्टनुसार असे गृहीत धरतो की मॅपर किंवा रीड्यूसरमधील एक इनकमिंग लाइन नकाशासाठी येणाऱ्या एका एंट्रीशी संबंधित आहे .

मॅपरचे आउटपुट रिड्यूसरच्या इनपुटला जोड्यांच्या स्वरूपात मिळते (की, मूल्य), तर सर्व जोड्या समान कीशी संबंधित आहेत:

  • रेड्यूसरच्या एकाच प्रक्षेपणाद्वारे प्रक्रिया करण्याची हमी;
  • एका ओळीत इनपुटवर सबमिट केले जाईल (म्हणजेच, जर एक रेड्यूसर अनेक भिन्न की प्रक्रिया करत असेल, तर इनपुट की द्वारे गटबद्ध केले जाईल).

तर पायथनमध्ये मॅपर आणि रेड्यूसर लागू करूया:

#mapper.py  
import sys  
  
def do_map(doc):  
for word in doc.split():  
	yield word.lower(), 1  
  
for line in sys.stdin:  
	for key, value in do_map(line):  
    	print(key + "\t" + str(value))  
 
#reducer.py  
import sys  
  
def do_reduce(word, values):  
	return word, sum(values)  
  
prev_key = None  
values = []  
  
for line in sys.stdin:  
	key, value = line.split("\t")  
	if key != prev_key and prev_key is not None:  
    	result_key, result_value = do_reduce(prev_key, values)  
    	print(result_key + "\t" + str(result_value))  
    	values = []  
	prev_key = key  
	values.append(int(value))  
  
if prev_key is not None:  
	result_key, result_value = do_reduce(prev_key, values)  
	print(result_key + "\t" + str(result_value))

हडूप ज्या डेटावर प्रक्रिया करेल तो HDFS वर संग्रहित करणे आवश्यक आहे. चला आमचे लेख अपलोड करू आणि HDFS वर टाकू. हे करण्यासाठी, hadoop fs कमांड वापरा :

wget https://www.dropbox.com/s/opp5psid1x3jt41/lenta_articles.tar.gz  
tar xzvf lenta_articles.tar.gz  
hadoop fs -put lenta_articles 

hadoop fs युटिलिटी फाईल सिस्टीममध्ये फेरफार करण्यासाठी मोठ्या संख्येने पद्धतींना समर्थन देते, ज्यापैकी अनेक मानक लिनक्स युटिलिटिज प्रमाणेच आहेत.

आता स्ट्रीमिंग कार्य सुरू करूया:

yarn jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar\  
 -input lenta_articles\  
 -output lenta_wordcount\  
 -file mapper.py\  
 -file reducer.py\  
 -mapper "python mapper.py"\  
 -reducer "python reducer.py" 

यार्न युटिलिटीचा उपयोग क्लस्टरवर विविध ऍप्लिकेशन्स (नकाशा-रिड्यूस आधारित) लाँच आणि व्यवस्थापित करण्यासाठी केला जातो. Hadoop-streaming.jar हे अशा यार्न ऍप्लिकेशनचे फक्त एक उदाहरण आहे.

पुढील लॉन्च पर्याय आहेत:

  • इनपुट - hdfs वर स्त्रोत डेटा असलेले फोल्डर;
  • आउटपुट - hdfs वर फोल्डर जेथे तुम्हाला निकाल लावायचा आहे;
  • फाइल - मॅप-रिड्यूस टास्कच्या ऑपरेशन दरम्यान आवश्यक असलेल्या फाइल्स;
  • मॅपर ही कन्सोल कमांड आहे जी नकाशा स्टेजसाठी वापरली जाईल;
  • reduce ही कन्सोल कमांड आहे जी कमी स्टेजसाठी वापरली जाईल.

लॉन्च केल्यानंतर, तुम्ही कन्सोलमध्ये टास्कची प्रगती पाहू शकता आणि टास्कबद्दल अधिक तपशीलवार माहिती पाहण्यासाठी URL पाहू शकता.

या URL वर उपलब्ध असलेल्या इंटरफेसमध्ये, आपण अधिक तपशीलवार कार्य अंमलबजावणी स्थिती शोधू शकता, प्रत्येक मॅपर आणि रेड्यूसरचे लॉग पाहू शकता (जे अयशस्वी कार्यांच्या बाबतीत खूप उपयुक्त आहे).

आम्ही आउटपुट फील्डमध्ये निर्दिष्ट केलेल्या फोल्डरमध्ये यशस्वी अंमलबजावणीनंतर कामाचा परिणाम HDFS मध्ये जोडला जातो. तुम्ही "hadoop fs -ls lenta_wordcount" कमांड वापरून त्यातील सामग्री पाहू शकता.

परिणाम स्वतः खालीलप्रमाणे मिळू शकतो:

hadoop fs -text lenta_wordcount/* | sort -n -k2,2 | tail -n5  
from
41  
this
43  
on
82  
and
111  
into
194 

"hadoop fs -text" कमांड फोल्डरमधील मजकूर मजकूर स्वरूपात प्रदर्शित करते. मी शब्दांच्या घटनांच्या संख्येनुसार निकाल क्रमवारी लावला. अपेक्षेप्रमाणे, भाषेतील सर्वात सामान्य शब्द हे प्रीपोजिशन आहेत.

४.४ पद्धत क्रमांक २: Java वापरा

हडूप स्वतः जावामध्ये लिहिलेले आहे आणि हडूपचा मूळ इंटरफेस देखील जावा-आधारित आहे. शब्दसंख्येसाठी मूळ जावा अनुप्रयोग कसा दिसतो ते दाखवूया:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

	public static class TokenizerMapper
        	extends Mapper<Object, Text, Text, IntWritable>{

    	private final static IntWritable one = new IntWritable(1);
    	private Text word = new Text();

    	public void map(Object key, Text value, Context context
    	) throws IOException, InterruptedException {
        	StringTokenizer itr = new StringTokenizer(value.toString());
        	while (itr.hasMoreTokens()) {
            	word.set(itr.nextToken());
            	context.write(word, one);
        	}
    	}
	}

	public static class IntSumReducer
        	extends Reducer<Text,IntWritable,Text,IntWritable> {
    	private IntWritable result = new IntWritable();

    	public void reduce(Text key, Iterable values,
                       	Context context
    	) throws IOException, InterruptedException {
        	int sum = 0;
        	for (IntWritable val : values) {
            	sum += val.get();
        	}
        	result.set(sum);
        	context.write(key, result);
    	}
	}

	public static void main(String[] args) throws Exception {
    	Configuration conf = new Configuration();
    	Job job = Job.getInstance(conf, "word count");
    	job.setJarByClass(WordCount.class);
    	job.setMapperClass(TokenizerMapper.class);
    	job.setReducerClass(IntSumReducer.class);
    	job.setOutputKeyClass(Text.class);
    	job.setOutputValueClass(IntWritable.class);
    	FileInputFormat.addInputPath(job, new Path("hdfs://localhost/user/cloudera/lenta_articles"));
    	FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost/user/cloudera/lenta_wordcount"));
    	System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

हा वर्ग आमच्या पायथनच्या उदाहरणाप्रमाणेच करतो. आम्ही TokenizerMapper आणि IntSumReducer वर्ग अनुक्रमे मॅपर आणि Reducer वर्गांमधून तयार करतो. टेम्पलेट पॅरामीटर्स म्हणून उत्तीर्ण केलेले वर्ग इनपुट आणि आउटपुट मूल्यांचे प्रकार निर्दिष्ट करतात. नेटिव्ह API असे गृहीत धरते की नकाशा फंक्शनला इनपुट म्हणून एक की-व्हॅल्यू जोडी दिली जाते. आमच्या बाबतीत की रिकामी असल्याने, आम्ही फक्त ऑब्जेक्टला की प्रकार म्हणून परिभाषित करतो.

मुख्य पद्धतीमध्ये, आम्ही मॅप्रेड्यूस टास्क सुरू करतो आणि त्याचे पॅरामीटर्स परिभाषित करतो - नाव, मॅपर आणि रीड्यूसर, HDFS मधील पथ, इनपुट डेटा कुठे आहे आणि परिणाम कुठे ठेवायचा. संकलित करण्यासाठी, आम्हाला हॅडूप लायब्ररीची आवश्यकता आहे. मी तयार करण्यासाठी मावेन वापरतो, ज्यासाठी क्लाउडेराकडे भांडार आहे. ते सेट करण्यासाठी सूचना येथे आढळू शकतात. परिणामी, pom.xmp फाईल (जी प्रकल्पाच्या असेंब्लीचे वर्णन करण्यासाठी मॅवेनद्वारे वापरली जाते) मला खालील गोष्टी मिळाल्या):

<?xml version="1.0" encoding="UTF-8"?>  
<project xmlns="http://maven.apache.org/POM/4.0.0"  
     	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
     	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">  
	<modelVersion>4.0.0</modelVersion>  
  
	<repositories>  
    	<repository>  
        	<id>cloudera</id>  
        	<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>  
    	</repository>  
	</repositories>  
  
	<dependencies>  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-common</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-auth</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-hdfs</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-mapreduce-client-app</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
	</dependencies>  
  
	<groupId>org.dca.examples</groupId>  
	<artifactId>wordcount</artifactId>  
	<version>1.0-SNAPSHOT</version>  
 
</project>

चला प्रकल्प एका जार पॅकेजमध्ये संकलित करूया:

mvn clean package

प्रोजेक्टला जार फाइलमध्ये तयार केल्यानंतर, लाँच त्याच प्रकारे होते, जसे की स्ट्रीमिंग इंटरफेसच्या बाबतीत:

yarn jar wordcount-1.0-SNAPSHOT.jar  WordCount 

आम्ही अंमलबजावणीची प्रतीक्षा करतो आणि निकाल तपासतो:

hadoop fs -text lenta_wordcount/* | sort -n -k2,2 | tail -n5  
from
41
this
43
on
82
and
111
into
194

तुम्ही अंदाज लावू शकता की, आमचे मूळ ॲप्लिकेशन चालवण्याचा परिणाम आम्ही मागील मार्गाने सुरू केलेल्या स्ट्रीमिंग ऍप्लिकेशनच्या परिणामासारखाच आहे.