4.1 Pangkalahatang impormasyon tungkol sa Hadoop

Ang paradigm ng MapReduce ay iminungkahi ng Google noong 2004 sa artikulong MapReduce: Pinasimpleng Pagproseso ng Data sa Malaking Cluster . Dahil ang iminungkahing artikulo ay naglalaman ng isang paglalarawan ng paradigm, ngunit ang pagpapatupad ay nawawala, ilang programmer mula sa Yahoo ang nagmungkahi ng kanilang pagpapatupad bilang bahagi ng gawain sa nutch web crawler. Maaari kang magbasa nang higit pa tungkol sa kasaysayan ng Hadoop sa artikulong Ang kasaysayan ng Hadoop: Mula sa 4 na node hanggang sa hinaharap ng data .

Sa una, ang Hadoop ay pangunahing kasangkapan para sa pag-iimbak ng data at pagpapatakbo ng mga gawain sa MapReduce, ngunit ngayon ang Hadoop ay isang malaking stack ng mga teknolohiyang nauugnay sa isang paraan o iba pa sa pagproseso ng malaking data (hindi lamang sa MapReduce).

Ang mga pangunahing (core) na bahagi ng Hadoop ay:

  • Ang Hadoop Distributed File System (HDFS) ay isang distributed file system na nagbibigay-daan sa iyong mag-imbak ng impormasyon ng halos walang limitasyong laki.
  • Ang Hadoop YARN ay isang framework para sa cluster resource management at task management, kasama ang MapReduce framework.
  • Karaniwan ang Hadoop

Mayroon ding malaking bilang ng mga proyektong direktang nauugnay sa Hadoop, ngunit hindi kasama sa Hadoop core:

  • Hive - isang tool para sa mga query na tulad ng SQL sa malaking data (ginagawa ang mga query sa SQL sa isang serye ng mga gawain sa MapReduce);
  • Ang baboy ay isang programming language para sa mataas na antas ng pagsusuri ng data. Ang isang linya ng code sa wikang ito ay maaaring maging isang pagkakasunud-sunod ng mga gawain sa MapReduce;
  • Ang Hbase ay isang columnar database na nagpapatupad ng BigTable paradigm;
  • Ang Cassandra ay isang database ng key-value na may mataas na pagganap na ipinamamahagi;
  • Ang ZooKeeper ay isang serbisyo para sa distributed configuration storage at pag-synchronize ng mga pagbabago sa configuration;
  • Ang Mahout ay isang malaking data machine learning library at engine.

Hiwalay, gusto kong tandaan ang proyekto ng Apache Spark , na isang makina para sa distributed data processing. Ang Apache Spark ay karaniwang gumagamit ng mga bahagi ng Hadoop tulad ng HDFS at YARN para sa trabaho nito, habang ang sarili nito ay naging mas sikat kamakailan kaysa sa Hadoop:

Ang ilan sa mga bahaging ito ay tatalakayin sa magkakahiwalay na artikulo sa seryeng ito ng mga materyales, ngunit sa ngayon, tingnan natin kung paano ka makakapagsimulang magtrabaho kasama ang Hadoop at isabuhay ito.

4.2 Pagpapatakbo ng mga programa ng MapReduce sa Hadoop

Ngayon tingnan natin kung paano magpatakbo ng isang MapReduce na gawain sa Hadoop. Bilang isang gawain, gagamitin namin ang klasikong halimbawa ng WordCount , na tinalakay sa nakaraang aralin.

Hayaan akong ipaalala sa iyo ang pagbabalangkas ng problema: mayroong isang hanay ng mga dokumento. Kinakailangan para sa bawat salita na nagaganap sa hanay ng mga dokumento upang mabilang kung gaano karaming beses naganap ang salita sa hanay.

Solusyon:

Hinahati ng mapa ang dokumento sa mga salita at ibinabalik ang isang hanay ng mga pares (salita, 1).

Bawasan ang mga kabuuan ng mga paglitaw ng bawat salita:

def map(doc):  
for word in doc.split():  
	yield word, 1 
def reduce(word, values):  
	yield word, sum(values)

Ngayon ang gawain ay i-program ang solusyon na ito sa anyo ng code na maaaring isagawa sa Hadoop at tumakbo.

4.3 Paraan numero 1. Pag-stream ng Hadoop

Ang pinakamadaling paraan upang magpatakbo ng isang MapReduce program sa Hadoop ay ang paggamit ng Hadoop streaming interface. Ipinapalagay ng streaming interface na ang mapa at reduce ay ipinatupad bilang mga program na kumukuha ng data mula sa stdin at output sa stdout .

Ang program na nagpapatupad ng function ng mapa ay tinatawag na mapper. Ang program na nagpapatupad ng reduce ay tinatawag, ayon sa pagkakabanggit, reducer .

Ipinapalagay ng interface ng Streaming bilang default na ang isang papasok na linya sa isang mapper o reducer ay tumutugma sa isang papasok na entry para sa mapa .

Ang output ng mapper ay nakukuha sa input ng reducer sa anyo ng mga pares (key, value), habang ang lahat ng mga pares ay naaayon sa parehong key:

  • Ginagarantiya na maproseso ng isang solong paglulunsad ng reducer;
  • Isusumite sa input nang sunud-sunod (iyon ay, kung ang isang reducer ay nagpoproseso ng ilang magkakaibang key, ang input ay ipapangkat ayon sa key).

Kaya't ipatupad natin ang mapper at reducer sa python:

#mapper.py  
import sys  
  
def do_map(doc):  
for word in doc.split():  
	yield word.lower(), 1  
  
for line in sys.stdin:  
	for key, value in do_map(line):  
    	print(key + "\t" + str(value))  
 
#reducer.py  
import sys  
  
def do_reduce(word, values):  
	return word, sum(values)  
  
prev_key = None  
values = []  
  
for line in sys.stdin:  
	key, value = line.split("\t")  
	if key != prev_key and prev_key is not None:  
    	result_key, result_value = do_reduce(prev_key, values)  
    	print(result_key + "\t" + str(result_value))  
    	values = []  
	prev_key = key  
	values.append(int(value))  
  
if prev_key is not None:  
	result_key, result_value = do_reduce(prev_key, values)  
	print(result_key + "\t" + str(result_value))

Ang data na ipoproseso ng Hadoop ay dapat na nakaimbak sa HDFS. I-upload natin ang ating mga artikulo at ilagay ang mga ito sa HDFS. Upang gawin ito, gamitin ang hadoop fs command :

wget https://www.dropbox.com/s/opp5psid1x3jt41/lenta_articles.tar.gz  
tar xzvf lenta_articles.tar.gz  
hadoop fs -put lenta_articles 

Ang hadoop fs utility ay sumusuporta sa isang malaking bilang ng mga pamamaraan para sa pagmamanipula ng file system, marami sa mga ito ay kapareho ng mga karaniwang linux utilities.

Ngayon simulan natin ang gawain sa streaming:

yarn jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar\  
 -input lenta_articles\  
 -output lenta_wordcount\  
 -file mapper.py\  
 -file reducer.py\  
 -mapper "python mapper.py"\  
 -reducer "python reducer.py" 

Ginagamit ang yarn utility para ilunsad at pamahalaan ang iba't ibang application (kabilang ang map-reduce based) sa isang cluster. Ang Hadoop-streaming.jar ay isa lamang halimbawa ng naturang yarn application.

Susunod ay ang mga opsyon sa paglulunsad:

  • input - folder na may source data sa hdfs;
  • output - folder sa hdfs kung saan mo gustong ilagay ang resulta;
  • file - mga file na kinakailangan sa panahon ng pagpapatakbo ng mapa-reduce na gawain;
  • mapper ay ang console command na gagamitin para sa yugto ng mapa;
  • ang reduce ay ang console command na gagamitin para sa reduce stage.

Pagkatapos ilunsad, makikita mo ang pag-usad ng gawain sa console at isang URL para sa pagtingin ng mas detalyadong impormasyon tungkol sa gawain.

Sa interface na magagamit sa URL na ito, maaari mong malaman ang isang mas detalyadong katayuan ng pagpapatupad ng gawain, tingnan ang mga log ng bawat mapper at reducer (na kung saan ay lubhang kapaki-pakinabang sa kaso ng mga nabigong gawain).

Ang resulta ng trabaho pagkatapos ng matagumpay na pagpapatupad ay idinagdag sa HDFS sa folder na tinukoy namin sa field ng output. Maaari mong tingnan ang mga nilalaman nito gamit ang command na "hadoop fs -ls lenta_wordcount".

Ang resulta mismo ay maaaring makuha tulad ng sumusunod:

hadoop fs -text lenta_wordcount/* | sort -n -k2,2 | tail -n5  
from
41  
this
43  
on
82  
and
111  
into
194 

Ipinapakita ng command na "hadoop fs -text" ang mga nilalaman ng folder sa text form. Inayos ko ang resulta ayon sa bilang ng paglitaw ng mga salita. Gaya ng inaasahan, ang pinakakaraniwang salita sa wika ay mga pang-ukol.

4.4 Paraan numero 2: gumamit ng Java

Ang Hadoop mismo ay nakasulat sa java, at ang katutubong interface ng Hadoop ay nakabatay din sa java. Ipakita natin kung ano ang hitsura ng native java application para sa wordcount:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

	public static class TokenizerMapper
        	extends Mapper<Object, Text, Text, IntWritable>{

    	private final static IntWritable one = new IntWritable(1);
    	private Text word = new Text();

    	public void map(Object key, Text value, Context context
    	) throws IOException, InterruptedException {
        	StringTokenizer itr = new StringTokenizer(value.toString());
        	while (itr.hasMoreTokens()) {
            	word.set(itr.nextToken());
            	context.write(word, one);
        	}
    	}
	}

	public static class IntSumReducer
        	extends Reducer<Text,IntWritable,Text,IntWritable> {
    	private IntWritable result = new IntWritable();

    	public void reduce(Text key, Iterable values,
                       	Context context
    	) throws IOException, InterruptedException {
        	int sum = 0;
        	for (IntWritable val : values) {
            	sum += val.get();
        	}
        	result.set(sum);
        	context.write(key, result);
    	}
	}

	public static void main(String[] args) throws Exception {
    	Configuration conf = new Configuration();
    	Job job = Job.getInstance(conf, "word count");
    	job.setJarByClass(WordCount.class);
    	job.setMapperClass(TokenizerMapper.class);
    	job.setReducerClass(IntSumReducer.class);
    	job.setOutputKeyClass(Text.class);
    	job.setOutputValueClass(IntWritable.class);
    	FileInputFormat.addInputPath(job, new Path("hdfs://localhost/user/cloudera/lenta_articles"));
    	FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost/user/cloudera/lenta_wordcount"));
    	System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

Ang klase na ito ay eksaktong pareho sa aming halimbawa ng Python. Ginagawa namin ang mga klase ng TokenizerMapper at IntSumReducer sa pamamagitan ng pagkuha mula sa mga klase ng Mapper at Reducer, ayon sa pagkakabanggit. Ang mga klase na ipinasa bilang mga parameter ng template ay tumutukoy sa mga uri ng mga halaga ng input at output. Ipinapalagay ng native na API na ang mapa function ay binibigyan ng key-value pair bilang input. Dahil sa aming kaso ang susi ay walang laman, tinutukoy lang namin ang Bagay bilang ang uri ng susi.

Sa Pangunahing pamamaraan, sinisimulan namin ang gawain ng mapreduce at tukuyin ang mga parameter nito - pangalan, mapper at reducer, ang landas sa HDFS, kung saan matatagpuan ang data ng input at kung saan ilalagay ang resulta. Upang mag-compile, kailangan namin ng hadoop na mga aklatan. Gumagamit ako ng Maven upang bumuo, kung saan may imbakan ang cloudera. Ang mga tagubilin para sa pag-set up nito ay matatagpuan dito. Bilang resulta, ang pom.xmp file (na ginagamit ng maven upang ilarawan ang pagpupulong ng proyekto) nakuha ko ang sumusunod):

<?xml version="1.0" encoding="UTF-8"?>  
<project xmlns="http://maven.apache.org/POM/4.0.0"  
     	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
     	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">  
	<modelVersion>4.0.0</modelVersion>  
  
	<repositories>  
    	<repository>  
        	<id>cloudera</id>  
        	<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>  
    	</repository>  
	</repositories>  
  
	<dependencies>  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-common</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-auth</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-hdfs</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-mapreduce-client-app</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
	</dependencies>  
  
	<groupId>org.dca.examples</groupId>  
	<artifactId>wordcount</artifactId>  
	<version>1.0-SNAPSHOT</version>  
 
</project>

Isama natin ang proyekto sa isang pakete ng garapon:

mvn clean package

Matapos itayo ang proyekto sa isang jar file, ang paglulunsad ay nangyayari sa katulad na paraan, tulad ng sa kaso ng streaming interface:

yarn jar wordcount-1.0-SNAPSHOT.jar  WordCount 

Naghihintay kami para sa pagpapatupad at suriin ang resulta:

hadoop fs -text lenta_wordcount/* | sort -n -k2,2 | tail -n5  
from
41
this
43
on
82
and
111
into
194

Tulad ng maaari mong hulaan, ang resulta ng pagpapatakbo ng aming katutubong application ay kapareho ng resulta ng streaming application na inilunsad namin sa nakaraang paraan.