5.1 Job bare kart

Det er på tide å beskrive ulike teknikker som lar deg effektivt bruke MapReduce til å løse praktiske problemer, samt vise noen av funksjonene til Hadoop som kan forenkle utviklingen eller betydelig fremskynde utførelsen av en MapReduce-oppgave på en klynge.

Som vi husker, består MapReduce av kart, Shuffle og Reduce-stadier. Som regel viser Shuffle-stadiet seg å være det vanskeligste i praktiske oppgaver, siden data sorteres på dette stadiet. Faktisk er det en rekke oppgaver der kartstadiet alene kan unnlates. Her er eksempler på slike oppgaver:

  • Datafiltrering (for eksempel "Finn alle poster fra IP-adressen 123.123.123.123" i webserverloggene);
  • Datatransformasjon ("Slett kolonne i csv-logger");
  • Laste og losse data fra en ekstern kilde ("Sett inn alle poster fra loggen i databasen").

Slike oppgaver løses ved hjelp av Map-Only. Når du oppretter en Map-Only-oppgave i Hadoop, må du spesifisere null antall reduksjoner:

Et eksempel på en oppgavekonfigurasjon for kun kart på hadoop:

innfødt grensesnitt Hadoop strømmegrensesnitt

Angi null antall reduksjoner når du konfigurerer jobb'a:

job.setNumReduceTasks(0); 

Vi spesifiserer ikke en reduksjon og angir et null antall reduksjonsmidler. Eksempel:

hadoop jar hadoop-streaming.jar \ 
 -D mapred.reduce.tasks=0\ 
-input input_dir\ 
-output output_dir\ 
-mapper "python mapper.py"\ 
-file "mapper.py"

Bare jobber kan faktisk være veldig nyttige. For eksempel, i Facetz.DCA-plattformen, for å identifisere egenskapene til brukere ved deres oppførsel, er det nettopp én stor kart-only som brukes, hvor hver kartlegger tar en bruker som input og returnerer egenskapene hans som en utgang.

5.2 Kombiner

Som jeg allerede har skrevet, er vanligvis det vanskeligste stadiet når du utfører en Map-Reduce-oppgave, shuffle-stadiet. Dette skjer fordi mellomresultatene (kartleggerens utgang) skrives til disk, sorteres og overføres over nettverket. Det er imidlertid oppgaver der slik oppførsel ikke virker veldig rimelig. For eksempel, i den samme oppgaven med å telle ord i dokumenter, kan du forhåndsaggregere resultatene av utdataene fra flere kartleggere på en kartreduserende node for oppgaven, og sende de allerede summerte verdiene for hver maskin til reduseringsenheten .

I hadoop kan du for dette definere en kombinasjonsfunksjon som skal behandle utdataene til deler av kartleggerne. Kombineringsfunksjonen er veldig lik redusering - den tar utdata fra noen kartleggere som input og produserer et aggregert resultat for disse kartleggerne, så reduseringsverktøyet brukes ofte også som en kombinerer. En viktig forskjell fra redusere er at ikke alle verdier som tilsvarer én nøkkel kommer til kombineringsfunksjonen .

Dessuten garanterer ikke hadoop at kombinasjonsfunksjonen i det hele tatt vil bli utført for utdata fra kartleggeren. Derfor er kombineringsfunksjonen ikke alltid anvendelig, for eksempel ved søk etter medianverdien med nøkkel. Ikke desto mindre, i de oppgavene der kombineringsfunksjonen er anvendelig, gjør bruken det mulig å oppnå en betydelig økning i hastigheten til MapReduce-oppgaven.

Bruke Combiner på hadoop:

innfødt grensesnitt Hadoop streaming

Når du konfigurerer jobb-a, spesifiser klassekombineren. Som regel er det det samme som Reducer:

job.setMapperClass(TokenizerMapper.class); 
job.setCombinerClass(IntSumReducer.class); 
job.setReducerClass(IntSumReducer.class); 

Spesifiser kommandoen -combiner i kommandolinjealternativene. Vanligvis er denne kommandoen den samme som reduseringskommandoen. Eksempel:

hadoop jar hadoop-streaming.jar \ 
-input input_dir\ 
-output output_dir\ 
-mapper "python mapper.py"\ 
-reducer "python reducer.py"\ 
-combiner "python reducer.py"\ 
-file "mapper.py"\ 
-file "reducer.py"\

5.3 KartReduser oppgavekjeder

Det er situasjoner der én MapReduce ikke er nok til å løse et problem. Tenk for eksempel på en litt modifisert WordCount-oppgave: det er et sett med tekstdokumenter, du må telle hvor mange ord som oppstod fra 1 til 1000 ganger i settet, hvor mange ord fra 1001 til 2000, hvor mange fra 2001 til 3000, og så videre. For løsningen trenger vi 2 MapReduce-jobber:

  • Modifisert ordtelling, som for hvert ord vil beregne hvilket av intervallene det falt inn i;
  • En MapReduce som teller hvor mange ganger hvert intervall ble oppdaget i utdataene fra den første MapReduce.

Pseudokodeløsning:

#map1 
def map(doc): 
for word in doc: 
yield word, 1
#reduce1 
def reduce(word, values): 
yield int(sum(values)/1000), 1 
#map2 
def map(doc): 
interval, cnt = doc.split() 
yield interval, cnt 
#reduce2 
def reduce(interval, values): 
yield interval*1000, sum(values) 

For å utføre en sekvens av MapReduce-oppgaver på hadoop, er det nok bare å spesifisere mappen som ble spesifisert som utdata for den første som input for den andre oppgaven og kjøre dem etter tur.

I praksis kan kjeder med MapReduce-oppgaver være ganske komplekse sekvenser der MapReduce-oppgaver kan kobles både sekvensielt og parallelt med hverandre. For å forenkle administrasjonen av slike oppgaveutførelsesplaner, finnes det separate verktøy som oozie og luigi, som vil bli diskutert i en egen artikkel i denne serien.

5.4 Distribuert cache

En viktig mekanisme i Hadoop er den distribuerte cachen. Distribuert cache lar deg legge til filer (f.eks. tekstfiler, arkiver, jar-filer) til miljøet der MapReduce-oppgaven kjører.

Du kan legge til filer som er lagret på HDFS, lokale filer (lokale på maskinen som oppgaven startes fra). Jeg har allerede implisitt vist hvordan du bruker distribuert cache med hadoop-streaming ved å legge til mapper.py og reducer.py-filene via -file-alternativet. Faktisk kan du legge til ikke bare mapper.py og reducer.py, men vilkårlige filer generelt, og deretter bruke dem som om de var i en lokal mappe.

Bruke distribuert cache:

Native API
//Job configuration
JobConf job = new JobConf();
DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"),  job);
DistributedCache.addCacheArchive(new URI("/myapp/map.zip", job);
DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job);
DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar", job);
DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz", job);

//example of usage in mapper-e:
public static class MapClass extends MapReduceBase
implements Mapper<K, V, K, V> {

 private Path[] localArchives;
 private Path[] localFiles;

 public void configure(JobConf job) {
   // get cached data from archives
   File f = new File("./map.zip/some/file/in/zip.txt");
 }

 public void map(K key, V value,
             	OutputCollector<K, V> output, Reporter reporter)
 throws IOException {
   // use data here
   // ...
   // ...
   output.collect(k, v);
 }
}
Hadoop Streaming

#vi viser filene som må legges til den distribuerte cachen i –files-parameteren. Alternativet --files må komme før de andre alternativene.

yarn  hadoop-streaming.jar\ 
-files mapper.py,reducer.py,some_cached_data.txt\ 
-input '/some/input/path' \ 
-output '/some/output/path' \  
-mapper 'python mapper.py' \ 
-reducer 'python reducer.py' \

brukseksempel:

import sys 
#just read file from local folder 
data = open('some_cached_data.txt').read() 
 
for line in sys.stdin() 
#processing input 
#use data here

5.5 Reduser deltakelse

De som er vant til å jobbe med relasjonsdatabaser bruker ofte den svært praktiske Join-operasjonen, som lar dem behandle innholdet i noen tabeller i fellesskap ved å slå dem sammen i henhold til en nøkkel. Når man jobber med big data, oppstår også dette problemet noen ganger. Tenk på følgende eksempel:

Det er logger av to webservere, hver logg ser slik ut:

t\t

Eksempel på loggkode:

1446792139	
178.78.82.1	
/sphingosine/unhurrying.css 
1446792139	
126.31.163.222	
/accentually.js 
1446792139	
154.164.149.83	
/pyroacid/unkemptly.jpg 
1446792139	
202.27.13.181	
/Chawia.js 
1446792139	
67.123.248.174	
/morphographical/dismain.css 
1446792139	
226.74.123.135	
/phanerite.php 
1446792139	
157.109.106.104	
/bisonant.css

Det er nødvendig å beregne for hver IP-adresse hvilken av de 2 serverne den besøkte oftere. Resultatet skal være i formen:

\t

Et eksempel på en del av resultatet:

178.78.82.1	
first 
126.31.163.222	
second 
154.164.149.83	
second 
226.74.123.135	
first

Dessverre, i motsetning til relasjonsdatabaser, generelt, er det å slå sammen to logger med nøkkel (i dette tilfellet etter IP-adresse) en ganske tung operasjon og løses ved å bruke 3 MapReduce og Reduce Join-mønsteret:

ReduceJoin fungerer slik:

1) For hver av inndataloggene lanseres en separat MapReduce (kun kart), som konverterer inndataene til følgende form:

key -> (type, value

Der nøkkel er nøkkelen å slå sammen tabeller på, er Type tabellens type (første eller andre i vårt tilfelle), og Verdi er eventuelle tilleggsdata som er bundet til nøkkelen.

2) Utgangene fra begge MapReduces mates til inngangen til den 3. MapReduce, som faktisk utfører sammenkoblingen. Denne MapReduce inneholder en tom Mapper som ganske enkelt kopierer inndataene. Deretter dekomponerer shuffle dataene til nøkler og mater dem til reduseringen som input:

key -> [(type, value)]

Det er viktig at i dette øyeblikket mottar reduseringen poster fra begge loggene, og samtidig er det mulig å identifisere ved typefeltet hvilken av de to loggene en bestemt verdi kom fra. Så det er nok data til å løse det opprinnelige problemet. I vårt tilfelle må reduseringen ganske enkelt beregne for hver postnøkkel hvilken type som har møtt mer og gi ut denne typen.

5.6 MapJoins

ReduceJoin-mønsteret beskriver det generelle tilfellet med å slå sammen to logger med nøkkel. Det er imidlertid et spesielt tilfelle der oppgaven kan forenkles og fremskyndes betydelig. Dette er tilfellet der en av stokkene er betydelig mindre enn den andre. Vurder følgende problem:

Det er 2 logger. Den første loggen inneholder webserverloggen (samme som i forrige oppgave), den andre filen (100kb stor) inneholder URL-> Temamatch. Eksempel 2. fil:

/toyota.php 	
auto 
/football/spartak.html 	
sport 
/cars 	
auto 
/finances/money 	
business

For hver IP-adresse er det nødvendig å beregne sidene i hvilken kategori fra denne IP-adressen som ble lastet oftest.

I dette tilfellet må vi også slå sammen 2 logger etter URL. Men i dette tilfellet trenger vi ikke å kjøre 3 MapReduce, siden den andre loggen vil passe fullstendig inn i minnet. For å løse problemet ved å bruke 1. MapReduce, kan vi laste den andre loggen inn i den distribuerte cachen, og når Mapper er initialisert, kan du ganske enkelt lese den inn i minnet og legge den i -> emneordboken.

Videre er problemet løst som følger:

kart:

# find the subject of each of the pages of the first log 
input_line -> [ip,  topic] 

redusere:


Ip -> [topics] -> [ip, most_popular_topic]

Reduser mottar en ip og en liste over alle emner som input, den beregner ganske enkelt hvilke av emnene som ble møtt oftest. Dermed løses oppgaven ved å bruke den 1. MapReduce, og selve Join foregår vanligvis inne i kartet (derfor, hvis ytterligere aggregering med nøkkel ikke var nødvendig, kan MapOnly-jobben utelates):