5.1 Jobbet endast på kartan

Det är dags att beskriva olika tekniker som gör att du effektivt kan använda MapReduce för att lösa praktiska problem, samt visa några av funktionerna i Hadoop som kan förenkla utvecklingen eller avsevärt påskynda utförandet av en MapReduce-uppgift i ett kluster.

Som vi minns består MapReduce av Map-, Shuffle- och Reduce-steg. Som regel visar sig Shuffle-steget vara det svåraste i praktiska uppgifter, eftersom data sorteras i detta skede. Faktum är att det finns ett antal uppgifter där endast kartstadiet kan undvaras. Här är exempel på sådana uppgifter:

  • Datafiltrering (till exempel "Hitta alla poster från IP-adressen 123.123.123.123" i webbserverloggarna);
  • Datatransformation ("Ta bort kolumn i csv-loggar");
  • Ladda och ta bort data från en extern källa ("Infoga alla poster från loggen i databasen").

Sådana uppgifter löses med Map-Only. När du skapar en Map-Only-uppgift i Hadoop måste du ange noll antal reducerare:

Ett exempel på en uppgiftskonfiguration för endast kartan på hadoop:

inbyggt gränssnitt Hadoop Streaming Interface

Ange noll antal reducerare när du konfigurerar jobb'a:

job.setNumReduceTasks(0); 

Vi specificerar ingen reducering och specificerar ett noll antal reducerare. Exempel:

hadoop jar hadoop-streaming.jar \ 
 -D mapred.reduce.tasks=0\ 
-input input_dir\ 
-output output_dir\ 
-mapper "python mapper.py"\ 
-file "mapper.py"

Map Only-jobb kan faktiskt vara väldigt användbara. Till exempel, i Facetz.DCA-plattformen, för att identifiera användarnas egenskaper genom deras beteende, är det just en stor map-only som används, vars mappare tar en användare som en input och returnerar hans egenskaper som en output.

5.2 Kombinera

Som jag redan skrivit är det vanligaste steget när man utför en Map-Reduce-uppgift blandstadiet. Detta beror på att mellanresultaten (mapparens utdata) skrivs till disk, sorteras och överförs över nätverket. Det finns dock uppgifter där ett sådant beteende inte verkar särskilt rimligt. Till exempel, i samma uppgift att räkna ord i dokument, kan du föraggregera resultaten av utdata från flera kartläggare på en kartreducerande nod för uppgiften och skicka de redan summerade värdena för varje maskin till reduceraren .

I hadoop, för detta, kan du definiera en kombinationsfunktion som kommer att bearbeta utdata från en del av kartläggarna. Kombinationsfunktionen är mycket lik reducering - den tar utdata från vissa mappare som indata och producerar ett aggregerat resultat för dessa mappare, så reduceraren används ofta också som en combiner. En viktig skillnad mot reducera är att inte alla värden som motsvarar en nyckel kommer till kombinationsfunktionen .

Dessutom garanterar hadoop inte att kombinationsfunktionen alls kommer att exekveras för utmatningen av mapparen. Därför är kombinationsfunktionen inte alltid tillämpbar, till exempel vid sökning efter medianvärdet med nyckel. Icke desto mindre, i de uppgifter där kombinationsfunktionen är tillämplig, gör användningen det möjligt att uppnå en betydande ökning av hastigheten för MapReduce-uppgiften.

Använda Combiner på hadoop:

inbyggt gränssnitt Hadoop streaming

När du konfigurerar jobb-a, ange class-Combiner. Som regel är det samma som Reducer:

job.setMapperClass(TokenizerMapper.class); 
job.setCombinerClass(IntSumReducer.class); 
job.setReducerClass(IntSumReducer.class); 

Ange kommandot -combiner i kommandoradsalternativen. Vanligtvis är detta kommando detsamma som reduceringskommandot. Exempel:

hadoop jar hadoop-streaming.jar \ 
-input input_dir\ 
-output output_dir\ 
-mapper "python mapper.py"\ 
-reducer "python reducer.py"\ 
-combiner "python reducer.py"\ 
-file "mapper.py"\ 
-file "reducer.py"\

5.3 KartaReducera uppgiftskedjor

Det finns situationer när en MapReduce inte räcker för att lösa ett problem. Tänk till exempel på en något modifierad WordCount-uppgift: det finns en uppsättning textdokument, du måste räkna hur många ord som förekom från 1 till 1000 gånger i uppsättningen, hur många ord från 1001 till 2000, hur många från 2001 till 3000, och så vidare. För lösningen behöver vi 2 MapReduce-jobb:

  • Modifierat ordantal, som för varje ord kommer att beräkna vilket av intervallen det föll in i;
  • En MapReduce som räknar hur många gånger varje intervall påträffades i utdata från den första MapReduce.

Pseudokodlösning:

#map1 
def map(doc): 
for word in doc: 
yield word, 1
#reduce1 
def reduce(word, values): 
yield int(sum(values)/1000), 1 
#map2 
def map(doc): 
interval, cnt = doc.split() 
yield interval, cnt 
#reduce2 
def reduce(interval, values): 
yield interval*1000, sum(values) 

För att utföra en sekvens av MapReduce-uppgifter på hadoop räcker det att bara ange mappen som specificerades som utdata för den första som indata för den andra uppgiften och köra dem i tur och ordning.

I praktiken kan kedjor av MapReduce-uppgifter vara ganska komplexa sekvenser där MapReduce-uppgifter kan kopplas både sekventiellt och parallellt med varandra. För att förenkla hanteringen av sådana planer för uppgiftsexekvering finns det separata verktyg som oozie och luigi, som kommer att diskuteras i en separat artikel i den här serien.

5.4 Distribuerad cache

En viktig mekanism i Hadoop är den distribuerade cachen. Distributed Cache låter dig lägga till filer (t.ex. textfiler, arkiv, jar-filer) till miljön där MapReduce-uppgiften körs.

Du kan lägga till filer lagrade på HDFS, lokala filer (lokala på maskinen från vilken uppgiften startas). Jag har redan implicit visat hur man använder distribuerad cache med hadoop-strömning genom att lägga till filerna mapper.py och reducer.py via alternativet -file. Faktum är att du kan lägga till inte bara mapper.py och reducer.py, utan godtyckliga filer i allmänhet, och sedan använda dem som om de fanns i en lokal mapp.

Använda distribuerad cache:

Native API
//Job configuration
JobConf job = new JobConf();
DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"),  job);
DistributedCache.addCacheArchive(new URI("/myapp/map.zip", job);
DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job);
DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar", job);
DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz", job);

//example of usage in mapper-e:
public static class MapClass extends MapReduceBase
implements Mapper<K, V, K, V> {

 private Path[] localArchives;
 private Path[] localFiles;

 public void configure(JobConf job) {
   // get cached data from archives
   File f = new File("./map.zip/some/file/in/zip.txt");
 }

 public void map(K key, V value,
             	OutputCollector<K, V> output, Reporter reporter)
 throws IOException {
   // use data here
   // ...
   // ...
   output.collect(k, v);
 }
}
Hadoop Streaming

#vi listar filerna som behöver läggas till i den distribuerade cachen i parametern –files. Alternativet --files måste komma före de andra alternativen.

yarn  hadoop-streaming.jar\ 
-files mapper.py,reducer.py,some_cached_data.txt\ 
-input '/some/input/path' \ 
-output '/some/output/path' \  
-mapper 'python mapper.py' \ 
-reducer 'python reducer.py' \

användningsexempel:

import sys 
#just read file from local folder 
data = open('some_cached_data.txt').read() 
 
for line in sys.stdin() 
#processing input 
#use data here

5.5 Minska gå med

De som är vana vid att arbeta med relationsdatabaser använder ofta den mycket bekväma Join-operationen, som tillåter dem att gemensamt bearbeta innehållet i vissa tabeller genom att sammanfoga dem enligt någon nyckel. När man arbetar med big data uppstår även detta problem ibland. Tänk på följande exempel:

Det finns loggar för två webbservrar, varje logg ser ut så här:

t\t

Exempel på loggkod:

1446792139	
178.78.82.1	
/sphingosine/unhurrying.css 
1446792139	
126.31.163.222	
/accentually.js 
1446792139	
154.164.149.83	
/pyroacid/unkemptly.jpg 
1446792139	
202.27.13.181	
/Chawia.js 
1446792139	
67.123.248.174	
/morphographical/dismain.css 
1446792139	
226.74.123.135	
/phanerite.php 
1446792139	
157.109.106.104	
/bisonant.css

Det är nödvändigt att för varje IP-adress beräkna vilken av de två servrarna den besökte oftare. Resultatet bör vara i formen:

\t

Ett exempel på en del av resultatet:

178.78.82.1	
first 
126.31.163.222	
second 
154.164.149.83	
second 
226.74.123.135	
first

Tyvärr, till skillnad från relationsdatabaser, i allmänhet, är sammanfogning av två loggar med nyckel (i det här fallet med IP-adress) en ganska tung operation och löses med 3 MapReduce och Reduce Join-mönstret:

ReduceJoin fungerar så här:

1) För var och en av ingångsloggarna startas en separat MapReduce (endast karta) som konverterar indata till följande form:

key -> (type, value

Där nyckel är nyckeln att sammanfoga tabeller på, Typ är typen av tabell (första eller andra i vårt fall), och Value är eventuell ytterligare data bunden till nyckeln.

2) Utgångarna från båda MapReduces matas till ingången på den 3:e MapReduce, som i själva verket utför sammankopplingen. Denna MapReduce innehåller en tom Mapper som helt enkelt kopierar indata. Därefter sönderdelar shuffle data till nycklar och matar den till reduceraren som indata:

key -> [(type, value)]

Det är viktigt att reduceraren i detta ögonblick tar emot poster från båda loggarna, och samtidigt är det möjligt att identifiera med typfältet vilken av de två loggarna ett visst värde kom från. Så det finns tillräckligt med data för att lösa det ursprungliga problemet. I vårt fall måste reduceraren helt enkelt beräkna för varje postnyckel vilken typ som har stött på mer och mata ut denna typ.

5.6 MapJoins

ReduceJoin-mönstret beskriver det allmänna fallet att sammanfoga två loggar med nyckel. Det finns dock ett specialfall där uppgiften kan avsevärt förenklas och påskyndas. Detta är fallet när en av stockarna är betydligt mindre än den andra. Tänk på följande problem:

Det finns 2 stockar. Den första loggen innehåller webbserverloggen (samma som i föregående uppgift), den andra filen (100kb stor) innehåller URL-> Temamatchning. Exempel 2:a fil:

/toyota.php 	
auto 
/football/spartak.html 	
sport 
/cars 	
auto 
/finances/money 	
business

För varje IP-adress är det nödvändigt att beräkna sidorna i vilken kategori från denna IP-adress som laddades oftast.

I det här fallet måste vi också ansluta 2 loggar via URL. Men i det här fallet behöver vi inte köra 3 MapReduce, eftersom den andra loggen kommer att passa helt in i minnet. För att lösa problemet med 1:a MapReduce kan vi ladda den andra loggen i den distribuerade cachen, och när kartläggaren är initialiserad, läs den helt enkelt in i minnet och lägg den i -> ämnesordboken.

Vidare löses problemet enligt följande:

Karta:

# find the subject of each of the pages of the first log 
input_line -> [ip,  topic] 

minska:


Ip -> [topics] -> [ip, most_popular_topic]

Reduce får en ip och en lista över alla ämnen som input, den räknar helt enkelt ut vilket av ämnena som stöttes på oftast. Sålunda löses uppgiften med den första MapReduce, och den faktiska Join äger i allmänhet rum inuti kartan (därför, om ytterligare aggregering med nyckel inte behövdes, kunde MapOnly-jobbet undvaras):