5.1 Alleen kaarttaak

Het is tijd om verschillende technieken te beschrijven waarmee u MapReduce effectief kunt gebruiken om praktische problemen op te lossen, en om enkele kenmerken van Hadoop te laten zien die de ontwikkeling kunnen vereenvoudigen of de uitvoering van een MapReduce-taak op een cluster aanzienlijk kunnen versnellen.

Zoals we ons herinneren, bestaat MapReduce uit Map-, Shuffle- en Reduce-fasen. In de regel blijkt de Shuffle-fase de moeilijkste te zijn in praktische taken, aangezien de gegevens in deze fase worden gesorteerd. In feite zijn er een aantal taken waarbij alleen de kaartfase achterwege kan blijven. Hier zijn voorbeelden van dergelijke taken:

  • Gegevensfiltering (bijvoorbeeld "Zoek alle records van het IP-adres 123.123.123.123" in de webserverlogboeken);
  • Gegevenstransformatie ("Kolom verwijderen in csv-logs");
  • Laden en ontladen van gegevens uit een externe bron (“Insert all records from the log into the database”).

Dergelijke taken worden opgelost met Map-Only. Bij het maken van een Map-Only-taak in Hadoop, moet u nul aantal reducers specificeren:

Een voorbeeld van een taakconfiguratie met alleen kaarten op hadoop:

oorspronkelijke interface Hadoop-streaminginterface

Geef nul aantal verloopstukken op bij het configureren van job'a:

job.setNumReduceTasks(0); 

We specificeren geen verloopstuk en specificeren een nul aantal verloopstukken. Voorbeeld:

hadoop jar hadoop-streaming.jar \ 
 -D mapred.reduce.tasks=0\ 
-input input_dir\ 
-output output_dir\ 
-mapper "python mapper.py"\ 
-file "mapper.py"

Map Only-taken kunnen eigenlijk heel nuttig zijn. In het Facetz.DCA-platform wordt bijvoorbeeld, om de kenmerken van gebruikers aan de hand van hun gedrag te identificeren, precies één grote kaart gebruikt, waarvan elke mapper een gebruiker als invoer neemt en zijn kenmerken als uitvoer retourneert.

5.2 Combineren

Zoals ik al schreef, is meestal de moeilijkste fase bij het uitvoeren van een Map-Reduce-taak de shuffle-fase. Dit gebeurt omdat de tussentijdse resultaten (uitvoer van mapper) naar schijf worden geschreven, gesorteerd en via het netwerk worden verzonden. Er zijn echter taken waarbij dergelijk gedrag niet erg redelijk lijkt. Bijvoorbeeld, in dezelfde taak van het tellen van woorden in documenten, kunt u de resultaten van de uitvoer van verschillende mappers vooraf aggregeren op één map-reduce-knooppunt van de taak, en de reeds opgetelde waarden voor elke machine doorgeven aan de reducer .

In hadoop kunt u hiervoor een combinatiefunctie definiëren die de uitvoer van een deel van de mappers verwerkt. De combineerfunctie lijkt erg op reduce - het neemt de uitvoer van sommige mappers als invoer en produceert een geaggregeerd resultaat voor deze mappers, dus de reducer wordt ook vaak gebruikt als combiner. Een belangrijk verschil met verkleinen is dat niet alle waarden die overeenkomen met één sleutel bij de combineerfunctie terechtkomen .

Bovendien garandeert hadoop niet dat de combineerfunctie überhaupt wordt uitgevoerd voor de uitvoer van de mapper. Daarom is de combinatiefunctie niet altijd toepasbaar, bijvoorbeeld bij het zoeken naar de mediaanwaarde op sleutel. Niettemin, in die taken waar de combineerfunctie van toepassing is, maakt het gebruik ervan het mogelijk om de snelheid van de MapReduce-taak aanzienlijk te verhogen.

De Combiner gebruiken op hadoop:

oorspronkelijke interface Hadoop-streaming

Geef bij het configureren van job-a de class-Combiner op. In de regel is het hetzelfde als Reducer:

job.setMapperClass(TokenizerMapper.class); 
job.setCombinerClass(IntSumReducer.class); 
job.setReducerClass(IntSumReducer.class); 

Geef de opdracht -combiner op in de opdrachtregelopties. Meestal is deze opdracht hetzelfde als de opdracht reducer. Voorbeeld:

hadoop jar hadoop-streaming.jar \ 
-input input_dir\ 
-output output_dir\ 
-mapper "python mapper.py"\ 
-reducer "python reducer.py"\ 
-combiner "python reducer.py"\ 
-file "mapper.py"\ 
-file "reducer.py"\

5.3 MapReduce taakketens

Er zijn situaties waarin één MapReduce niet voldoende is om een ​​probleem op te lossen. Overweeg bijvoorbeeld een enigszins gewijzigde WordCount-taak: er is een set tekstdocumenten, u moet tellen hoeveel woorden van 1 tot 1000 keer in de set voorkomen, hoeveel woorden van 1001 tot 2000, hoeveel van 2001 tot 3000, enzovoort. Voor de oplossing hebben we 2 MapReduce-taken nodig:

  • Gewijzigd aantal woorden, dat voor elk woord berekent in welke van de intervallen het viel;
  • Een MapReduce die telt hoe vaak elk interval is aangetroffen in de uitvoer van de eerste MapReduce.

Pseudocode oplossing:

#map1 
def map(doc): 
for word in doc: 
yield word, 1
#reduce1 
def reduce(word, values): 
yield int(sum(values)/1000), 1 
#map2 
def map(doc): 
interval, cnt = doc.split() 
yield interval, cnt 
#reduce2 
def reduce(interval, values): 
yield interval*1000, sum(values) 

Om een ​​reeks MapReduce-taken op hadoop uit te voeren, volstaat het om de map die was opgegeven als uitvoer voor de eerste taak op te geven als invoer voor de tweede taak en deze beurtelings uit te voeren.

In de praktijk kunnen ketens van MapReduce-taken vrij complexe reeksen zijn waarin MapReduce-taken zowel sequentieel als parallel aan elkaar kunnen worden gekoppeld. Om het beheer van dergelijke taakuitvoeringsplannen te vereenvoudigen, zijn er aparte tools zoals oozie en luigi, die in een apart artikel in deze serie worden besproken.

5.4 Gedistribueerde cache

Een belangrijk mechanisme in Hadoop is de Distributed Cache. Met Distributed Cache kunt u bestanden (bijv. tekstbestanden, archieven, jar-bestanden) toevoegen aan de omgeving waar de MapReduce-taak wordt uitgevoerd.

U kunt bestanden toevoegen die zijn opgeslagen op HDFS, lokale bestanden (lokaal op de machine van waaruit de taak wordt gestart). Ik heb al impliciet laten zien hoe je Distributed Cache kunt gebruiken met hadoop-streaming door de bestanden mapper.py en reducer.py toe te voegen via de optie -file. In feite kunt u niet alleen mapper.py en reducer.py toevoegen, maar willekeurige bestanden in het algemeen, en ze vervolgens gebruiken alsof ze in een lokale map staan.

Gedistribueerde cache gebruiken:

Native-API
//Job configuration
JobConf job = new JobConf();
DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"),  job);
DistributedCache.addCacheArchive(new URI("/myapp/map.zip", job);
DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job);
DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar", job);
DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz", job);

//example of usage in mapper-e:
public static class MapClass extends MapReduceBase
implements Mapper<K, V, K, V> {

 private Path[] localArchives;
 private Path[] localFiles;

 public void configure(JobConf job) {
   // get cached data from archives
   File f = new File("./map.zip/some/file/in/zip.txt");
 }

 public void map(K key, V value,
             	OutputCollector<K, V> output, Reporter reporter)
 throws IOException {
   // use data here
   // ...
   // ...
   output.collect(k, v);
 }
}
Hadoop-streaming

#we vermelden de bestanden die moeten worden toegevoegd aan de gedistribueerde cache in de parameter –files. De optie --files moet voor de andere opties komen.

yarn  hadoop-streaming.jar\ 
-files mapper.py,reducer.py,some_cached_data.txt\ 
-input '/some/input/path' \ 
-output '/some/output/path' \  
-mapper 'python mapper.py' \ 
-reducer 'python reducer.py' \

gebruik voorbeeld:

import sys 
#just read file from local folder 
data = open('some_cached_data.txt').read() 
 
for line in sys.stdin() 
#processing input 
#use data here

5.5 Deelname verminderen

Degenen die gewend zijn om met relationele databases te werken, gebruiken vaak de zeer handige bewerking Join, waarmee ze de inhoud van sommige tabellen gezamenlijk kunnen verwerken door ze samen te voegen volgens een bepaalde sleutel. Bij het werken met big data doet dit probleem zich soms ook voor. Beschouw het volgende voorbeeld:

Er zijn logboeken van twee webservers, elk logboek ziet er als volgt uit:

t\t

Voorbeeld van logboekfragment:

1446792139	
178.78.82.1	
/sphingosine/unhurrying.css 
1446792139	
126.31.163.222	
/accentually.js 
1446792139	
154.164.149.83	
/pyroacid/unkemptly.jpg 
1446792139	
202.27.13.181	
/Chawia.js 
1446792139	
67.123.248.174	
/morphographical/dismain.css 
1446792139	
226.74.123.135	
/phanerite.php 
1446792139	
157.109.106.104	
/bisonant.css

Het is noodzakelijk om voor elk IP-adres te berekenen welke van de 2 servers het vaker bezocht. Het resultaat moet de vorm hebben:

\t

Een voorbeeld van een deel van het resultaat:

178.78.82.1	
first 
126.31.163.222	
second 
154.164.149.83	
second 
226.74.123.135	
first

Helaas, in tegenstelling tot relationele databases, is het samenvoegen van twee logboeken op sleutel (in dit geval op IP-adres) over het algemeen een vrij zware operatie en wordt opgelost met behulp van 3 MapReduce en het Reduce Join-patroon:

ReduceJoin werkt als volgt:

1) Voor elk van de invoerlogboeken wordt een afzonderlijke MapReduce (alleen kaart) gestart, waarbij de invoergegevens worden geconverteerd naar de volgende vorm:

key -> (type, value

Waar sleutel de sleutel is om tabellen aan te koppelen, is Type het type van de tabel (eerste of tweede in ons geval) en Waarde zijn eventuele aanvullende gegevens die aan de sleutel zijn gebonden.

2) De uitvoer van beide MapReduces wordt ingevoerd in de invoer van de 3e MapReduce, die in feite de unie uitvoert. Deze MapReduce bevat een lege Mapper die simpelweg de invoer kopieert. Shuffle ontleedt vervolgens de gegevens in sleutels en voert deze als invoer naar de reducer:

key -> [(type, value)]

Het is belangrijk dat op dit moment de verkleiner records van beide logs ontvangt en dat het tegelijkertijd mogelijk is om aan de hand van het type veld te identificeren uit welke van de twee logs een bepaalde waarde afkomstig is. Er zijn dus voldoende gegevens om het oorspronkelijke probleem op te lossen. In ons geval moet de verkleiner eenvoudigweg voor elke recordsleutel berekenen welk type meer is tegengekomen en dit type uitvoeren.

5.6 MapJoins

Het patroon ReduceJoin beschrijft het algemene geval van het samenvoegen van twee logboeken met een sleutel. Er is echter een speciaal geval waarin de taak aanzienlijk kan worden vereenvoudigd en versneld. Dit is het geval waarin een van de logboeken aanzienlijk kleiner is dan de andere. Beschouw het volgende probleem:

Er zijn 2 logboeken. Het eerste logboek bevat het webserverlogboek (hetzelfde als in de vorige taak), het tweede bestand (100 kb groot) bevat de URL-> Thema-overeenkomst. Voorbeeld 2e bestand:

/toyota.php 	
auto 
/football/spartak.html 	
sport 
/cars 	
auto 
/finances/money 	
business

Voor elk IP-adres is het nodig om te berekenen van welke categorie vanaf dit IP-adres de pagina's het vaakst werden geladen.

In dit geval moeten we ook 2 logs samenvoegen via URL. In dit geval hoeven we echter niet 3 MapReduce uit te voeren, aangezien het tweede logboek volledig in het geheugen past. Om het probleem met de 1e MapReduce op te lossen, kunnen we de tweede log in de gedistribueerde cache laden en wanneer de Mapper is geïnitialiseerd, kunnen we deze gewoon in het geheugen lezen en in de -> topic-woordenlijst plaatsen.

Verder wordt het probleem als volgt opgelost:

kaart:

# find the subject of each of the pages of the first log 
input_line -> [ip,  topic] 

verminderen:


Ip -> [topics] -> [ip, most_popular_topic]

Reduce krijgt een ip en een lijst met alle onderwerpen als invoer, het berekent eenvoudig welke van de onderwerpen het vaakst is tegengekomen. De taak wordt dus opgelost met behulp van de 1e MapReduce, en de eigenlijke samenvoeging vindt over het algemeen plaats binnen de kaart (daarom, als aanvullende aggregatie per sleutel niet nodig was, zou de MapOnly-taak achterwege kunnen blijven):