4.1 Informații generale despre Hadoop

Paradigma MapReduce a fost propusă de Google în 2004 în articolul său MapReduce: Simplified Data Processing on Large Clusters . Întrucât articolul propus conținea o descriere a paradigmei, dar implementarea lipsea, mai mulți programatori de la Yahoo au propus implementarea lor ca parte a lucrării la crawler-ul web nutch. Puteți citi mai multe despre istoria Hadoop în articolul Istoria Hadoop: De la 4 noduri la viitorul datelor .

Inițial, Hadoop a fost în primul rând un instrument pentru stocarea datelor și rularea sarcinilor MapReduce, dar acum Hadoop este un teanc mare de tehnologii legate într-un fel sau altul de procesarea datelor mari (nu numai cu MapReduce).

Componentele principale (de bază) ale Hadoop sunt:

  • Hadoop Distributed File System (HDFS) este un sistem de fișiere distribuit care vă permite să stocați informații de dimensiune aproape nelimitată.
  • Hadoop YARN este un cadru pentru managementul resurselor cluster și managementul sarcinilor, inclusiv cadrul MapReduce.
  • Hadoop comun

Există, de asemenea, un număr mare de proiecte legate direct de Hadoop, dar care nu sunt incluse în nucleul Hadoop:

  • Hive - un instrument pentru interogări asemănătoare SQL asupra datelor mari (transformă interogările SQL într-o serie de sarcini MapReduce);
  • Pig este un limbaj de programare pentru analiza datelor la nivel înalt. O linie de cod în această limbă se poate transforma într-o secvență de sarcini MapReduce;
  • Hbase este o bază de date coloană care implementează paradigma BigTable;
  • Cassandra este o bază de date de înaltă performanță distribuită cheie-valoare;
  • ZooKeeper este un serviciu de stocare distribuită a configurației și sincronizare a modificărilor de configurare;
  • Mahout este o bibliotecă și un motor de învățare automată a datelor mari.

Separat, aș dori să notez proiectul Apache Spark , care este un motor pentru procesarea distribuită a datelor. Apache Spark folosește de obicei componente Hadoop, cum ar fi HDFS și YARN pentru munca sa, în timp ce el însuși a devenit recent mai popular decât Hadoop:

Unele dintre aceste componente vor fi acoperite în articole separate din această serie de materiale, dar, deocamdată, să vedem cum puteți începe să lucrați cu Hadoop și să-l puneți în practică.

4.2 Rularea programelor MapReduce pe Hadoop

Acum să vedem cum să rulăm o sarcină MapReduce pe Hadoop. Ca sarcină, vom folosi exemplul clasic WordCount , despre care a fost discutat în lecția anterioară.

Permiteți-mi să vă reamintesc formularea problemei: există un set de documente. Este necesar ca fiecare cuvânt care apare în setul de documente să numere de câte ori apare cuvântul în set.

Soluţie:

Harta împarte documentul în cuvinte și returnează un set de perechi (cuvânt, 1).

Reduceți sumele aparițiile fiecărui cuvânt:

def map(doc):  
for word in doc.split():  
	yield word, 1 
def reduce(word, values):  
	yield word, sum(values)

Acum sarcina este de a programa această soluție sub formă de cod care poate fi executat pe Hadoop și rulat.

4.3 Metoda numărul 1. Streaming Hadoop

Cel mai simplu mod de a rula un program MapReduce pe Hadoop este să folosești interfața de streaming Hadoop. Interfața de streaming presupune că map și reduce sunt implementate ca programe care preiau date de la stdin și ieșire la stdout .

Programul care execută funcția map se numește mapper. Programul care execută reduce se numește, respectiv, reducer .

Interfața de streaming presupune implicit că o linie de intrare într- un mapper sau reductor corespunde unei intrări de intrare pentru hartă .

Ieșirea mapatorului ajunge la intrarea reductorului sub formă de perechi (cheie, valoare), în timp ce toate perechile corespund aceleiași chei:

  • Procesat garantat printr-o singură lansare a reductorului;
  • Va fi transmisă la intrare într-un rând (adică, dacă un reductor prelucrează mai multe chei diferite, intrarea va fi grupată după cheie).

Deci, să implementăm mapper și reducer în python:

#mapper.py  
import sys  
  
def do_map(doc):  
for word in doc.split():  
	yield word.lower(), 1  
  
for line in sys.stdin:  
	for key, value in do_map(line):  
    	print(key + "\t" + str(value))  
 
#reducer.py  
import sys  
  
def do_reduce(word, values):  
	return word, sum(values)  
  
prev_key = None  
values = []  
  
for line in sys.stdin:  
	key, value = line.split("\t")  
	if key != prev_key and prev_key is not None:  
    	result_key, result_value = do_reduce(prev_key, values)  
    	print(result_key + "\t" + str(result_value))  
    	values = []  
	prev_key = key  
	values.append(int(value))  
  
if prev_key is not None:  
	result_key, result_value = do_reduce(prev_key, values)  
	print(result_key + "\t" + str(result_value))

Datele pe care le va procesa Hadoop trebuie să fie stocate pe HDFS. Să încărcăm articolele noastre și să le punem pe HDFS. Pentru a face acest lucru, utilizați comanda hadoop fs :

wget https://www.dropbox.com/s/opp5psid1x3jt41/lenta_articles.tar.gz  
tar xzvf lenta_articles.tar.gz  
hadoop fs -put lenta_articles 

Utilitarul hadoop fs acceptă un număr mare de metode de manipulare a sistemului de fișiere, dintre care multe sunt identice cu utilitarele standard Linux.

Acum să începem sarcina de streaming:

yarn jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar\  
 -input lenta_articles\  
 -output lenta_wordcount\  
 -file mapper.py\  
 -file reducer.py\  
 -mapper "python mapper.py"\  
 -reducer "python reducer.py" 

Utilitarul yarn este folosit pentru a lansa și gestiona diverse aplicații (inclusiv bazate pe hărți) pe un cluster. Hadoop-streaming.jar este doar un exemplu de astfel de aplicație de fire.

Urmează opțiunile de lansare:

  • input - folder cu date sursă pe hdfs;
  • output - folder pe hdfs unde vrei sa pui rezultatul;
  • fișier - fișiere care sunt necesare în timpul funcționării sarcinii de reducere a hărții;
  • mapper este comanda consolei care va fi folosită pentru etapa hărții;
  • reduce este comanda consolei care va fi folosită pentru etapa reduce.

După lansare, puteți vedea progresul sarcinii în consolă și o adresă URL pentru vizualizarea informațiilor mai detaliate despre sarcină.

În interfața disponibilă la acest URL, puteți afla o stare de execuție a sarcinilor mai detaliată, puteți vizualiza jurnalele fiecărui mapper și reductor (ceea ce este foarte util în cazul sarcinilor eșuate).

Rezultatul lucrării după executarea cu succes este adăugat la HDFS în folderul pe care l-am specificat în câmpul de ieșire. Puteți vizualiza conținutul acestuia folosind comanda „hadoop fs -ls lenta_wordcount”.

Rezultatul în sine poate fi obținut după cum urmează:

hadoop fs -text lenta_wordcount/* | sort -n -k2,2 | tail -n5  
from
41  
this
43  
on
82  
and
111  
into
194 

Comanda „hadoop fs -text” afișează conținutul folderului sub formă de text. Am sortat rezultatul după numărul de apariții ale cuvintelor. După cum era de așteptat, cele mai comune cuvinte din limbă sunt prepozițiile.

4.4 Metoda numărul 2: utilizați Java

Hadoop în sine este scris în java, iar interfața nativă a lui Hadoop este, de asemenea, bazată pe java. Să arătăm cum arată o aplicație nativă java pentru numărul de cuvinte:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

	public static class TokenizerMapper
        	extends Mapper<Object, Text, Text, IntWritable>{

    	private final static IntWritable one = new IntWritable(1);
    	private Text word = new Text();

    	public void map(Object key, Text value, Context context
    	) throws IOException, InterruptedException {
        	StringTokenizer itr = new StringTokenizer(value.toString());
        	while (itr.hasMoreTokens()) {
            	word.set(itr.nextToken());
            	context.write(word, one);
        	}
    	}
	}

	public static class IntSumReducer
        	extends Reducer<Text,IntWritable,Text,IntWritable> {
    	private IntWritable result = new IntWritable();

    	public void reduce(Text key, Iterable values,
                       	Context context
    	) throws IOException, InterruptedException {
        	int sum = 0;
        	for (IntWritable val : values) {
            	sum += val.get();
        	}
        	result.set(sum);
        	context.write(key, result);
    	}
	}

	public static void main(String[] args) throws Exception {
    	Configuration conf = new Configuration();
    	Job job = Job.getInstance(conf, "word count");
    	job.setJarByClass(WordCount.class);
    	job.setMapperClass(TokenizerMapper.class);
    	job.setReducerClass(IntSumReducer.class);
    	job.setOutputKeyClass(Text.class);
    	job.setOutputValueClass(IntWritable.class);
    	FileInputFormat.addInputPath(job, new Path("hdfs://localhost/user/cloudera/lenta_articles"));
    	FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost/user/cloudera/lenta_wordcount"));
    	System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

Această clasă face exact la fel ca exemplul nostru Python. Creăm clasele TokenizerMapper și IntSumReducer derivând din clasele Mapper și, respectiv, Reducer. Clasele transmise ca parametri șablon specifică tipurile de valori de intrare și de ieșire. API-ul nativ presupune că funcției de hartă i se dă o pereche cheie-valoare ca intrare. Deoarece în cazul nostru cheia este goală, definim pur și simplu Object ca tipul de cheie.

În metoda Main, începem sarcina mapreduce și definim parametrii acesteia - nume, mapper și reductor, calea în HDFS, unde se află datele de intrare și unde să punem rezultatul. Pentru a compila, avem nevoie de biblioteci Hadoop. Eu folosesc Maven pentru a construi, pentru care cloudera are un depozit. Instrucțiunile de configurare pot fi găsite aici. Ca rezultat, fișierul pom.xmp (care este folosit de Maven pentru a descrie ansamblul proiectului) am primit următoarele):

<?xml version="1.0" encoding="UTF-8"?>  
<project xmlns="http://maven.apache.org/POM/4.0.0"  
     	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
     	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">  
	<modelVersion>4.0.0</modelVersion>  
  
	<repositories>  
    	<repository>  
        	<id>cloudera</id>  
        	<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>  
    	</repository>  
	</repositories>  
  
	<dependencies>  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-common</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-auth</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-hdfs</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-mapreduce-client-app</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
	</dependencies>  
  
	<groupId>org.dca.examples</groupId>  
	<artifactId>wordcount</artifactId>  
	<version>1.0-SNAPSHOT</version>  
 
</project>

Să compilam proiectul într-un pachet jar:

mvn clean package

După construirea proiectului într-un fișier jar, lansarea are loc într-un mod similar, ca în cazul interfeței de streaming:

yarn jar wordcount-1.0-SNAPSHOT.jar  WordCount 

Așteptăm execuția și verificăm rezultatul:

hadoop fs -text lenta_wordcount/* | sort -n -k2,2 | tail -n5  
from
41
this
43
on
82
and
111
into
194

După cum ați putea ghici, rezultatul rulării aplicației noastre native este același cu rezultatul aplicației de streaming pe care am lansat-o în modul anterior.