5.1 Jobbet endast på kartan
Det är dags att beskriva olika tekniker som gör att du effektivt kan använda MapReduce för att lösa praktiska problem, samt visa några av funktionerna i Hadoop som kan förenkla utvecklingen eller avsevärt påskynda utförandet av en MapReduce-uppgift i ett kluster.
Som vi minns består MapReduce av Map-, Shuffle- och Reduce-steg. Som regel visar sig Shuffle-steget vara det svåraste i praktiska uppgifter, eftersom data sorteras i detta skede. Faktum är att det finns ett antal uppgifter där endast kartstadiet kan undvaras. Här är exempel på sådana uppgifter:
- Datafiltrering (till exempel "Hitta alla poster från IP-adressen 123.123.123.123" i webbserverloggarna);
- Datatransformation ("Ta bort kolumn i csv-loggar");
- Ladda och ta bort data från en extern källa ("Infoga alla poster från loggen i databasen").
Sådana uppgifter löses med Map-Only. När du skapar en Map-Only-uppgift i Hadoop måste du ange noll antal reducerare:

Ett exempel på en uppgiftskonfiguration för endast kartan på hadoop:
inbyggt gränssnitt | Hadoop Streaming Interface |
---|---|
Ange noll antal reducerare när du konfigurerar jobb'a:
|
Vi specificerar ingen reducering och specificerar ett noll antal reducerare. Exempel:
|
Map Only-jobb kan faktiskt vara väldigt användbara. Till exempel, i Facetz.DCA-plattformen, för att identifiera användarnas egenskaper genom deras beteende, är det just en stor map-only som används, vars mappare tar en användare som en input och returnerar hans egenskaper som en output.
5.2 Kombinera
Som jag redan skrivit är det vanligaste steget när man utför en Map-Reduce-uppgift blandstadiet. Detta beror på att mellanresultaten (mapparens utdata) skrivs till disk, sorteras och överförs över nätverket. Det finns dock uppgifter där ett sådant beteende inte verkar särskilt rimligt. Till exempel, i samma uppgift att räkna ord i dokument, kan du föraggregera resultaten av utdata från flera kartläggare på en kartreducerande nod för uppgiften och skicka de redan summerade värdena för varje maskin till reduceraren .

I hadoop, för detta, kan du definiera en kombinationsfunktion som kommer att bearbeta utdata från en del av kartläggarna. Kombinationsfunktionen är mycket lik reducering - den tar utdata från vissa mappare som indata och producerar ett aggregerat resultat för dessa mappare, så reduceraren används ofta också som en combiner. En viktig skillnad mot reducera är att inte alla värden som motsvarar en nyckel kommer till kombinationsfunktionen .
Dessutom garanterar hadoop inte att kombinationsfunktionen alls kommer att exekveras för utmatningen av mapparen. Därför är kombinationsfunktionen inte alltid tillämpbar, till exempel vid sökning efter medianvärdet med nyckel. Icke desto mindre, i de uppgifter där kombinationsfunktionen är tillämplig, gör användningen det möjligt att uppnå en betydande ökning av hastigheten för MapReduce-uppgiften.
Använda Combiner på hadoop:
inbyggt gränssnitt | Hadoop streaming |
---|---|
När du konfigurerar jobb-a, ange class-Combiner. Som regel är det samma som Reducer:
|
Ange kommandot -combiner i kommandoradsalternativen. Vanligtvis är detta kommando detsamma som reduceringskommandot. Exempel:
|
5.3 KartaReducera uppgiftskedjor
Det finns situationer när en MapReduce inte räcker för att lösa ett problem. Tänk till exempel på en något modifierad WordCount-uppgift: det finns en uppsättning textdokument, du måste räkna hur många ord som förekom från 1 till 1000 gånger i uppsättningen, hur många ord från 1001 till 2000, hur många från 2001 till 3000, och så vidare. För lösningen behöver vi 2 MapReduce-jobb:
- Modifierat ordantal, som för varje ord kommer att beräkna vilket av intervallen det föll in i;
- En MapReduce som räknar hur många gånger varje intervall påträffades i utdata från den första MapReduce.
Pseudokodlösning:
|
|
|
|
För att utföra en sekvens av MapReduce-uppgifter på hadoop räcker det att bara ange mappen som specificerades som utdata för den första som indata för den andra uppgiften och köra dem i tur och ordning.
I praktiken kan kedjor av MapReduce-uppgifter vara ganska komplexa sekvenser där MapReduce-uppgifter kan kopplas både sekventiellt och parallellt med varandra. För att förenkla hanteringen av sådana planer för uppgiftsexekvering finns det separata verktyg som oozie och luigi, som kommer att diskuteras i en separat artikel i den här serien.

5.4 Distribuerad cache
En viktig mekanism i Hadoop är den distribuerade cachen. Distributed Cache låter dig lägga till filer (t.ex. textfiler, arkiv, jar-filer) till miljön där MapReduce-uppgiften körs.
Du kan lägga till filer lagrade på HDFS, lokala filer (lokala på maskinen från vilken uppgiften startas). Jag har redan implicit visat hur man använder distribuerad cache med hadoop-strömning genom att lägga till filerna mapper.py och reducer.py via alternativet -file. Faktum är att du kan lägga till inte bara mapper.py och reducer.py, utan godtyckliga filer i allmänhet, och sedan använda dem som om de fanns i en lokal mapp.
Använda distribuerad cache:
Native API |
---|
|
Hadoop Streaming |
---|
#vi listar filerna som behöver läggas till i den distribuerade cachen i parametern –files. Alternativet --files måste komma före de andra alternativen.
användningsexempel:
|
5.5 Minska gå med
De som är vana vid att arbeta med relationsdatabaser använder ofta den mycket bekväma Join-operationen, som tillåter dem att gemensamt bearbeta innehållet i vissa tabeller genom att sammanfoga dem enligt någon nyckel. När man arbetar med big data uppstår även detta problem ibland. Tänk på följande exempel:
Det finns loggar för två webbservrar, varje logg ser ut så här:
t\t
Exempel på loggkod:
1446792139
178.78.82.1
/sphingosine/unhurrying.css
1446792139
126.31.163.222
/accentually.js
1446792139
154.164.149.83
/pyroacid/unkemptly.jpg
1446792139
202.27.13.181
/Chawia.js
1446792139
67.123.248.174
/morphographical/dismain.css
1446792139
226.74.123.135
/phanerite.php
1446792139
157.109.106.104
/bisonant.css
Det är nödvändigt att för varje IP-adress beräkna vilken av de två servrarna den besökte oftare. Resultatet bör vara i formen:
\t
Ett exempel på en del av resultatet:
178.78.82.1
first
126.31.163.222
second
154.164.149.83
second
226.74.123.135
first
Tyvärr, till skillnad från relationsdatabaser, i allmänhet, är sammanfogning av två loggar med nyckel (i det här fallet med IP-adress) en ganska tung operation och löses med 3 MapReduce och Reduce Join-mönstret:

ReduceJoin fungerar så här:
1) För var och en av ingångsloggarna startas en separat MapReduce (endast karta) som konverterar indata till följande form:
key -> (type, value
Där nyckel är nyckeln att sammanfoga tabeller på, Typ är typen av tabell (första eller andra i vårt fall), och Value är eventuell ytterligare data bunden till nyckeln.
2) Utgångarna från båda MapReduces matas till ingången på den 3:e MapReduce, som i själva verket utför sammankopplingen. Denna MapReduce innehåller en tom Mapper som helt enkelt kopierar indata. Därefter sönderdelar shuffle data till nycklar och matar den till reduceraren som indata:
key -> [(type, value)]
Det är viktigt att reduceraren i detta ögonblick tar emot poster från båda loggarna, och samtidigt är det möjligt att identifiera med typfältet vilken av de två loggarna ett visst värde kom från. Så det finns tillräckligt med data för att lösa det ursprungliga problemet. I vårt fall måste reduceraren helt enkelt beräkna för varje postnyckel vilken typ som har stött på mer och mata ut denna typ.
5.6 MapJoins
ReduceJoin-mönstret beskriver det allmänna fallet att sammanfoga två loggar med nyckel. Det finns dock ett specialfall där uppgiften kan avsevärt förenklas och påskyndas. Detta är fallet när en av stockarna är betydligt mindre än den andra. Tänk på följande problem:
Det finns 2 stockar. Den första loggen innehåller webbserverloggen (samma som i föregående uppgift), den andra filen (100kb stor) innehåller URL-> Temamatchning. Exempel 2:a fil:
/toyota.php
auto
/football/spartak.html
sport
/cars
auto
/finances/money
business
För varje IP-adress är det nödvändigt att beräkna sidorna i vilken kategori från denna IP-adress som laddades oftast.
I det här fallet måste vi också ansluta 2 loggar via URL. Men i det här fallet behöver vi inte köra 3 MapReduce, eftersom den andra loggen kommer att passa helt in i minnet. För att lösa problemet med 1:a MapReduce kan vi ladda den andra loggen i den distribuerade cachen, och när kartläggaren är initialiserad, läs den helt enkelt in i minnet och lägg den i -> ämnesordboken.
Vidare löses problemet enligt följande:
Karta:
# find the subject of each of the pages of the first log
input_line -> [ip, topic]
minska:
Ip -> [topics] -> [ip, most_popular_topic]
Reduce får en ip och en lista över alla ämnen som input, den räknar helt enkelt ut vilket av ämnena som stöttes på oftast. Sålunda löses uppgiften med den första MapReduce, och den faktiska Join äger i allmänhet rum inuti kartan (därför, om ytterligare aggregering med nyckel inte behövdes, kunde MapOnly-jobbet undvaras):

GO TO FULL VERSION