5.1 Lucrări numai pe hartă

Este timpul să descrieți diferite tehnici care vă permit să utilizați eficient MapReduce pentru a rezolva probleme practice, precum și să afișați unele dintre caracteristicile Hadoop care pot simplifica dezvoltarea sau accelera semnificativ execuția unei sarcini MapReduce pe un cluster.

După cum ne amintim, MapReduce constă din etape Map, Shuffle și Reduce. De regulă, etapa de amestecare se dovedește a fi cea mai dificilă în sarcinile practice, deoarece datele sunt sortate în această etapă. De fapt, există o serie de sarcini în care doar etapa Hartă poate fi renunțată. Iată exemple de astfel de sarcini:

  • Filtrarea datelor (de exemplu, „Găsiți toate înregistrările de la adresa IP 123.123.123.123” în jurnalele serverului web);
  • Transformarea datelor („Șterge coloana în csv-logs”);
  • Încărcarea și descărcarea datelor dintr-o sursă externă („Inserați toate înregistrările din jurnal în baza de date”).

Astfel de sarcini sunt rezolvate folosind Map-Only. Când creați o sarcină numai pentru hărți în Hadoop, trebuie să specificați un număr zero de reductoare:

Un exemplu de configurare a sarcinii numai pe hartă pe Hadoop:

interfață nativă Interfață de streaming Hadoop

Specificați numărul zero de reductoare atunci când configurați job'a:

job.setNumReduceTasks(0); 

Nu specificăm un reductor și specificăm un număr zero de reductoare. Exemplu:

hadoop jar hadoop-streaming.jar \ 
 -D mapred.reduce.tasks=0\ 
-input input_dir\ 
-output output_dir\ 
-mapper "python mapper.py"\ 
-file "mapper.py"

Lucrările numai pe hartă pot fi de fapt foarte utile. De exemplu, în platforma Facetz.DCA, pentru a identifica caracteristicile utilizatorilor după comportamentul lor, este folosită doar o hartă mare, fiecare mapper ia un utilizator ca intrare și returnează caracteristicile acestuia ca ieșire.

5.2 Combinați

După cum am scris deja, de obicei, cea mai dificilă etapă atunci când efectuați o sarcină Map-Reduce este etapa de amestecare. Acest lucru se întâmplă deoarece rezultatele intermediare (ieșirea mapper-ului) sunt scrise pe disc, sortate și transmise prin rețea. Cu toate acestea, există sarcini în care un astfel de comportament nu pare foarte rezonabil. De exemplu, în aceeași sarcină de numărare a cuvintelor în documente, puteți pre-agrega rezultatele ieșirilor mai multor cartografi pe un nod de reducere a hărții al sarcinii și puteți transmite valorile deja însumate pentru fiecare mașină la reductor .

În hadoop, pentru aceasta, puteți defini o funcție de combinare care va procesa ieșirea unei părți din mapper. Funcția de combinare este foarte asemănătoare cu reducere - ia ieșirea unor cartografi ca intrare și produce un rezultat agregat pentru acești mapatori, astfel încât reductorul este adesea folosit și ca combinator. O diferență importantă față de reducere este că nu toate valorile corespunzătoare unei taste ajung la funcția de combinare .

Mai mult, Hadoop nu garantează că funcția de combinare va fi executată deloc pentru ieșirea mapperului. Prin urmare, funcția de combinare nu este întotdeauna aplicabilă, de exemplu, în cazul căutării valorii mediane după cheie. Cu toate acestea, în acele sarcini în care funcția de combinare este aplicabilă, utilizarea acesteia permite obținerea unei creșteri semnificative a vitezei sarcinii MapReduce.

Folosind combinatorul pe Hadoop:

interfață nativă Streaming Hadoop

Când configurați job-a, specificați class-Combiner. De regulă, este același cu Reducer:

job.setMapperClass(TokenizerMapper.class); 
job.setCombinerClass(IntSumReducer.class); 
job.setReducerClass(IntSumReducer.class); 

Specificați comanda -combiner în opțiunile liniei de comandă. De obicei, această comandă este aceeași cu comanda reductor. Exemplu:

hadoop jar hadoop-streaming.jar \ 
-input input_dir\ 
-output output_dir\ 
-mapper "python mapper.py"\ 
-reducer "python reducer.py"\ 
-combiner "python reducer.py"\ 
-file "mapper.py"\ 
-file "reducer.py"\

5.3 Lanțuri de sarcini MapReduce

Există situații în care un MapReduce nu este suficient pentru a rezolva o problemă. De exemplu, luați în considerare o sarcină WordCount ușor modificată: există un set de documente text, trebuie să numărați câte cuvinte au apărut de la 1 la 1000 de ori în set, câte cuvinte de la 1001 la 2000, câte de la 2001 la 3000, și așa mai departe. Pentru soluție, avem nevoie de 2 joburi MapReduce:

  • Număr de cuvinte modificat, care pentru fiecare cuvânt va calcula în care dintre intervalele a căzut;
  • Un MapReduce care numără de câte ori a fost întâlnit fiecare interval în rezultatul primului MapReduce.

Soluție de pseudocod:

#map1 
def map(doc): 
for word in doc: 
yield word, 1
#reduce1 
def reduce(word, values): 
yield int(sum(values)/1000), 1 
#map2 
def map(doc): 
interval, cnt = doc.split() 
yield interval, cnt 
#reduce2 
def reduce(interval, values): 
yield interval*1000, sum(values) 

Pentru a executa o secvență de sarcini MapReduce pe hadoop, este suficient să specificați folderul care a fost specificat ca ieșire pentru prima ca intrare pentru a doua sarcină și să le executați pe rând.

În practică, lanțurile de sarcini MapReduce pot fi secvențe destul de complexe în care sarcinile MapReduce pot fi conectate atât secvențial, cât și în paralel unele cu altele. Pentru a simplifica gestionarea unor astfel de planuri de execuție a sarcinilor, există instrumente separate precum oozie și luigi, care vor fi discutate într-un articol separat din această serie.

5.4 Cache distribuit

Un mecanism important în Hadoop este cache-ul distribuit. Distributed Cache vă permite să adăugați fișiere (de exemplu fișiere text, arhive, fișiere jar) în mediul în care rulează sarcina MapReduce.

Puteți adăuga fișiere stocate pe HDFS, fișiere locale (locale pe mașina de pe care este lansată sarcina). Am arătat deja implicit cum să utilizați Distributed Cache cu streaming Hadoop, adăugând fișierele mapper.py și reducer.py prin opțiunea -file. De fapt, puteți adăuga nu numai mapper.py și reducer.py, ci și fișiere arbitrare în general și apoi să le utilizați ca și cum ar fi într-un folder local.

Utilizarea cache-ului distribuit:

API nativ
//Job configuration
JobConf job = new JobConf();
DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"),  job);
DistributedCache.addCacheArchive(new URI("/myapp/map.zip", job);
DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job);
DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar", job);
DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz", job);

//example of usage in mapper-e:
public static class MapClass extends MapReduceBase
implements Mapper<K, V, K, V> {

 private Path[] localArchives;
 private Path[] localFiles;

 public void configure(JobConf job) {
   // get cached data from archives
   File f = new File("./map.zip/some/file/in/zip.txt");
 }

 public void map(K key, V value,
             	OutputCollector<K, V> output, Reporter reporter)
 throws IOException {
   // use data here
   // ...
   // ...
   output.collect(k, v);
 }
}
Streaming Hadoop

#enumerăm fișierele care trebuie adăugate în memoria cache distribuită în parametrul –files. Opțiunea --files trebuie să fie înaintea celorlalte opțiuni.

yarn  hadoop-streaming.jar\ 
-files mapper.py,reducer.py,some_cached_data.txt\ 
-input '/some/input/path' \ 
-output '/some/output/path' \  
-mapper 'python mapper.py' \ 
-reducer 'python reducer.py' \

exemplu de utilizare:

import sys 
#just read file from local folder 
data = open('some_cached_data.txt').read() 
 
for line in sys.stdin() 
#processing input 
#use data here

5.5 Reduce Join

Cei care sunt obișnuiți să lucreze cu baze de date relaționale folosesc adesea operația Join foarte convenabilă, care le permite să proceseze în comun conținutul unor tabele prin alăturarea lor în funcție de o cheie. Când lucrați cu date mari, uneori apare și această problemă. Luați în considerare următorul exemplu:

Există jurnalele a două servere web, fiecare jurnal arată astfel:

t\t

Exemplu de fragment de jurnal:

1446792139	
178.78.82.1	
/sphingosine/unhurrying.css 
1446792139	
126.31.163.222	
/accentually.js 
1446792139	
154.164.149.83	
/pyroacid/unkemptly.jpg 
1446792139	
202.27.13.181	
/Chawia.js 
1446792139	
67.123.248.174	
/morphographical/dismain.css 
1446792139	
226.74.123.135	
/phanerite.php 
1446792139	
157.109.106.104	
/bisonant.css

Este necesar să se calculeze pentru fiecare adresă IP care dintre cele 2 servere a vizitat mai des. Rezultatul ar trebui să fie sub forma:

\t

Un exemplu de o parte a rezultatului:

178.78.82.1	
first 
126.31.163.222	
second 
154.164.149.83	
second 
226.74.123.135	
first

Din păcate, spre deosebire de bazele de date relaționale, în general, alăturarea a două loguri prin cheie (în acest caz, după adresa IP) este o operațiune destul de grea și se rezolvă folosind 3 MapReduce și modelul Reduce Join:

ReduceJoin funcționează astfel:

1) Pentru fiecare dintre jurnalele de intrare, se lansează un MapReduce separat (numai pentru Hartă), transformând datele de intrare în următoarea formă:

key -> (type, value

Unde cheie este cheia pentru a se alătura tabelelor, Type este tipul tabelului (primul sau al doilea în cazul nostru), iar Value este orice date suplimentare legate de cheie.

2) Ieșirile ambelor MapReduces sunt alimentate la intrarea celui de-al treilea MapReduce, care, de fapt, realizează unirea. Acest MapReduce conține un Mapper gol care pur și simplu copiază intrarea. Apoi, shuffle descompune datele în taste și le transmite reductorului ca intrare:

key -> [(type, value)]

Este important ca in acest moment reductorul sa primeasca inregistrari de la ambele loguri si, in acelasi timp, este posibil sa se identifice dupa campul tip din care dintre cele doua loguri a venit o anumita valoare. Deci există suficiente date pentru a rezolva problema inițială. În cazul nostru, reductorul trebuie pur și simplu să calculeze pentru fiecare cheie de înregistrare ce tip a întâlnit mai mult și să scoată acest tip.

5.6 MapJoins

Modelul ReduceJoin descrie cazul general al îmbinării a două jurnaluri prin cheie. Cu toate acestea, există un caz special în care sarcina poate fi simplificată și accelerată semnificativ. Acesta este cazul în care unul dintre bușteni este semnificativ mai mic decât celălalt. Luați în considerare următoarea problemă:

Sunt 2 busteni. Primul jurnal conține jurnalul serverului web (la fel ca în sarcina anterioară), al doilea fișier (dimensiune de 100 kb) conține URL-> potrivirea temei. Exemplu al doilea fișier:

/toyota.php 	
auto 
/football/spartak.html 	
sport 
/cars 	
auto 
/finances/money 	
business

Pentru fiecare adresă IP, este necesar să se calculeze paginile din care categorie din această adresă IP au fost încărcate cel mai des.

În acest caz, trebuie să ne unim și 2 jurnaluri prin URL. Cu toate acestea, în acest caz, nu trebuie să rulăm 3 MapReduce, deoarece al doilea jurnal se va încadra complet în memorie. Pentru a rezolva problema folosind primul MapReduce, putem încărca cel de-al doilea log în Distributed Cache, iar când Mapper-ul este inițializat, pur și simplu îl citim în memorie, punându-l în -> dicționarul de subiecte.

În plus, problema se rezolvă după cum urmează:

Hartă:

# find the subject of each of the pages of the first log 
input_line -> [ip,  topic] 

reduce:


Ip -> [topics] -> [ip, most_popular_topic]

Reduce primește un ip și o listă cu toate subiectele ca intrare, pur și simplu calculează care dintre subiecte a fost întâlnită cel mai des. Astfel, sarcina este rezolvată folosind primul MapReduce, iar Join-ul propriu-zis are loc în general în interiorul hărții (prin urmare, dacă nu era nevoie de agregare suplimentară după cheie, jobul MapOnly ar putea fi renunțat):