5.1 Job kun på kort

Det er tid til at beskrive forskellige teknikker, der giver dig mulighed for effektivt at bruge MapReduce til at løse praktiske problemer, samt vise nogle af funktionerne i Hadoop, der kan forenkle udviklingen eller markant fremskynde udførelsen af ​​en MapReduce-opgave på en klynge.

Som vi husker, består MapReduce af Map-, Shuffle- og Reduce-stadier. Som regel viser Shuffle-stadiet sig at være det sværeste i praktiske opgaver, da data sorteres på dette stadium. Faktisk er der en række opgaver, hvor man alene kan undvære Map-stadiet. Her er eksempler på sådanne opgaver:

  • Datafiltrering (f.eks. "Find alle poster fra IP-adressen 123.123.123.123" i webserverlogfilerne);
  • Datatransformation ("Slet kolonne i csv-logs");
  • Indlæsning og aflæsning af data fra en ekstern kilde ("Indsæt alle poster fra loggen i databasen").

Sådanne opgaver løses ved hjælp af Map-Only. Når du opretter en Map-Only-opgave i Hadoop, skal du angive nul antal reducerere:

Et eksempel på en opgavekonfiguration, der kun er til kort på hadoop:

native grænseflade Hadoop Streaming Interface

Angiv nul antal reduktioner, når du konfigurerer job'a:

job.setNumReduceTasks(0); 

Vi specificerer ikke en reducering og angiver et nul antal af reduceringe. Eksempel:

hadoop jar hadoop-streaming.jar \ 
 -D mapred.reduce.tasks=0\ 
-input input_dir\ 
-output output_dir\ 
-mapper "python mapper.py"\ 
-file "mapper.py"

Jobs, der kun er kort, kan faktisk være meget nyttige. For eksempel, i Facetz.DCA-platformen, for at identificere brugernes egenskaber ved deres adfærd, er det netop én stor map-only, der bruges, hvor hver mapper tager en bruger som input og returnerer hans egenskaber som output.

5.2 Kombiner

Som jeg allerede har skrevet, er shuffle-fasen normalt den sværeste fase, når du udfører en Map-Reduce-opgave. Dette sker, fordi de mellemliggende resultater (mapper's output) skrives til disken, sorteres og transmitteres over netværket. Der er dog opgaver, hvor en sådan adfærd ikke virker særlig rimelig. For eksempel, i den samme opgave med at tælle ord i dokumenter, kan du forudaggregere resultaterne af output fra flere kortlæggere på en kortreducerende node af opgaven og videregive de allerede summerede værdier for hver maskine til reduceringen .

I hadoop kan du til dette definere en kombinationsfunktion, der behandler output fra en del af kortlæggerne. Kombinationsfunktionen minder meget om at reducere - den tager output fra nogle kortlæggere som input og producerer et aggregeret resultat for disse kortlæggere, så reducereren bruges ofte også som en kombinerer. En vigtig forskel fra reducere er , at ikke alle værdier, der svarer til én nøgle, kommer til kombinationsfunktionen .

Desuden garanterer hadoop ikke, at kombinationsfunktionen overhovedet vil blive udført for output fra mapperen. Derfor er kombinationsfunktionen ikke altid anvendelig, for eksempel ved søgning efter medianværdien med nøgle. Ikke desto mindre, i de opgaver, hvor kombinationsfunktionen er anvendelig, gør det det muligt at opnå en betydelig stigning i hastigheden af ​​MapReduce-opgaven.

Brug af Combiner på hadoop:

native grænseflade Hadoop streaming

Når du konfigurerer job-a, skal du angive klasse-kombineren. Som regel er det det samme som Reducer:

job.setMapperClass(TokenizerMapper.class); 
job.setCombinerClass(IntSumReducer.class); 
job.setReducerClass(IntSumReducer.class); 

Angiv kommandoen -combiner i kommandolinjeindstillingerne. Typisk er denne kommando den samme som reduceringskommandoen. Eksempel:

hadoop jar hadoop-streaming.jar \ 
-input input_dir\ 
-output output_dir\ 
-mapper "python mapper.py"\ 
-reducer "python reducer.py"\ 
-combiner "python reducer.py"\ 
-file "mapper.py"\ 
-file "reducer.py"\

5.3 KortReducer opgavekæder

Der er situationer, hvor en MapReduce ikke er nok til at løse et problem. Overvej for eksempel en let ændret WordCount-opgave: der er et sæt tekstdokumenter, du skal tælle, hvor mange ord der forekom fra 1 til 1000 gange i sættet, hvor mange ord fra 1001 til 2000, hvor mange fra 2001 til 3000, og så videre. Til løsningen har vi brug for 2 MapReduce-job:

  • Ændret ordtal, som for hvert ord vil beregne hvilket af intervallerne det faldt ind i;
  • En MapReduce, der tæller, hvor mange gange hvert interval blev stødt på i outputtet af den første MapReduce.

Pseudokode løsning:

#map1 
def map(doc): 
for word in doc: 
yield word, 1
#reduce1 
def reduce(word, values): 
yield int(sum(values)/1000), 1 
#map2 
def map(doc): 
interval, cnt = doc.split() 
yield interval, cnt 
#reduce2 
def reduce(interval, values): 
yield interval*1000, sum(values) 

For at udføre en sekvens af MapReduce-opgaver på hadoop er det nok bare at angive den mappe, der blev angivet som output for den første som input til den anden opgave, og køre dem på skift.

I praksis kan kæder af MapReduce-opgaver være ret komplekse sekvenser, hvor MapReduce-opgaver kan forbindes både sekventielt og parallelt med hinanden. For at forenkle styringen af ​​sådanne opgaveudførelsesplaner er der separate værktøjer som oozie og luigi, som vil blive diskuteret i en separat artikel i denne serie.

5.4 Distribueret cache

En vigtig mekanisme i Hadoop er den distribuerede cache. Distribueret cache giver dig mulighed for at tilføje filer (f.eks. tekstfiler, arkiver, jar-filer) til det miljø, hvor MapReduce-opgaven kører.

Du kan tilføje filer gemt på HDFS, lokale filer (lokale til den maskine, hvorfra opgaven startes). Jeg har allerede implicit vist, hvordan man bruger distribueret cache med hadoop-streaming ved at tilføje filerne mapper.py og reducer.py via -file-indstillingen. Faktisk kan du tilføje ikke kun mapper.py og reducer.py, men vilkårlige filer generelt, og derefter bruge dem, som om de var i en lokal mappe.

Brug af distribueret cache:

Native API
//Job configuration
JobConf job = new JobConf();
DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"),  job);
DistributedCache.addCacheArchive(new URI("/myapp/map.zip", job);
DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job);
DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar", job);
DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz", job);

//example of usage in mapper-e:
public static class MapClass extends MapReduceBase
implements Mapper<K, V, K, V> {

 private Path[] localArchives;
 private Path[] localFiles;

 public void configure(JobConf job) {
   // get cached data from archives
   File f = new File("./map.zip/some/file/in/zip.txt");
 }

 public void map(K key, V value,
             	OutputCollector<K, V> output, Reporter reporter)
 throws IOException {
   // use data here
   // ...
   // ...
   output.collect(k, v);
 }
}
Hadoop streaming

#vi lister de filer, der skal tilføjes til den distribuerede cache i parameteren –files. Indstillingen --files skal komme før de andre muligheder.

yarn  hadoop-streaming.jar\ 
-files mapper.py,reducer.py,some_cached_data.txt\ 
-input '/some/input/path' \ 
-output '/some/output/path' \  
-mapper 'python mapper.py' \ 
-reducer 'python reducer.py' \

eksempel på brug:

import sys 
#just read file from local folder 
data = open('some_cached_data.txt').read() 
 
for line in sys.stdin() 
#processing input 
#use data here

5.5 Reducer tilslutning

De, der er vant til at arbejde med relationelle databaser, bruger ofte den meget praktiske Join-operation, som giver dem mulighed for i fællesskab at behandle indholdet af nogle tabeller ved at forbinde dem efter en eller anden nøgle. Når man arbejder med big data, opstår dette problem også nogle gange. Overvej følgende eksempel:

Der er logfiler for to webservere, hver log ser sådan ud:

t\t

Eksempel på loguddrag:

1446792139	
178.78.82.1	
/sphingosine/unhurrying.css 
1446792139	
126.31.163.222	
/accentually.js 
1446792139	
154.164.149.83	
/pyroacid/unkemptly.jpg 
1446792139	
202.27.13.181	
/Chawia.js 
1446792139	
67.123.248.174	
/morphographical/dismain.css 
1446792139	
226.74.123.135	
/phanerite.php 
1446792139	
157.109.106.104	
/bisonant.css

Det er nødvendigt at beregne for hver IP-adresse, hvilken af ​​de 2 servere den besøgte oftest. Resultatet skal være i form:

\t

Et eksempel på en del af resultatet:

178.78.82.1	
first 
126.31.163.222	
second 
154.164.149.83	
second 
226.74.123.135	
first

Desværre, i modsætning til relationelle databaser, er det generelt en ret tung operation at forbinde to logfiler med nøgle (i dette tilfælde efter IP-adresse) og løses ved hjælp af 3 MapReduce og Reduce Join-mønsteret:

ReduceJoin fungerer sådan her:

1) For hver af inputlogfilerne lanceres en separat MapReduce (kun Map), som konverterer inputdataene til følgende form:

key -> (type, value

Hvor nøgle er nøglen til at forbinde tabeller på, Type er tabellens type (første eller anden i vores tilfælde), og Værdi er enhver yderligere data bundet til nøglen.

2) Udgangene fra begge MapReduces føres til input fra den 3. MapReduce, som faktisk udfører sammenslutningen. Denne MapReduce indeholder en tom Mapper, der blot kopierer inputtet. Derefter opdeler shuffle dataene til nøgler og fører dem til reduceringen som input:

key -> [(type, value)]

Det er vigtigt, at reducereren i dette øjeblik modtager poster fra begge logfiler, og det er på samme tid muligt at identificere ved typefeltet, hvilken af ​​de to logfiler en bestemt værdi kom fra. Så der er nok data til at løse det oprindelige problem. I vores tilfælde skal reducereren simpelthen beregne for hver postnøgle, hvilken type der er stødt på mere og udlæse denne type.

5.6 MapJoins

ReduceJoin-mønsteret beskriver det generelle tilfælde med at forbinde to logfiler med nøgle. Der er dog et særligt tilfælde, hvor opgaven kan forenkles og fremskyndes væsentligt. Dette er tilfældet, hvor en af ​​stokkene er væsentligt mindre end den anden. Overvej følgende problem:

Der er 2 logs. Den første log indeholder webserverloggen (samme som i den forrige opgave), den anden fil (100kb i størrelse) indeholder URL-> Theme match. Eksempel 2. fil:

/toyota.php 	
auto 
/football/spartak.html 	
sport 
/cars 	
auto 
/finances/money 	
business

For hver IP-adresse er det nødvendigt at beregne siderne i hvilken kategori fra denne IP-adresse, der blev indlæst oftest.

I dette tilfælde skal vi også tilslutte 2 logfiler efter URL. Men i dette tilfælde behøver vi ikke at køre 3 MapReduce, da den anden log vil passe helt ind i hukommelsen. For at løse problemet ved hjælp af 1. MapReduce, kan vi indlæse den anden log ind i den distribuerede cache, og når Mapperen er initialiseret, skal du blot læse den ind i hukommelsen og lægge den i -> emneordbogen.

Yderligere er problemet løst som følger:

kort:

# find the subject of each of the pages of the first log 
input_line -> [ip,  topic] 

reducere:


Ip -> [topics] -> [ip, most_popular_topic]

Reduce modtager en ip og en liste over alle emner som input, den beregner blot hvilket af emnerne der blev stødt på oftest. Således løses opgaven ved hjælp af 1. MapReduce, og selve Join foregår generelt inde i kortet (derfor, hvis yderligere aggregering med nøgle ikke var nødvendig, kunne MapOnly-jobbet undværes):