5.1 Nur-Map-Job

Es ist an der Zeit, verschiedene Techniken zu beschreiben, mit denen Sie MapReduce effektiv zur Lösung praktischer Probleme einsetzen können, und einige der Funktionen von Hadoop aufzuzeigen, die die Entwicklung vereinfachen oder die Ausführung einer MapReduce-Aufgabe auf einem Cluster erheblich beschleunigen können.

Wie wir uns erinnern, besteht MapReduce aus den Phasen Map, Shuffle und Reduce. In der Regel erweist sich bei praktischen Aufgaben die Shuffle-Phase als die schwierigste, da in dieser Phase die Daten sortiert werden. Tatsächlich gibt es eine Reihe von Aufgaben, bei denen allein auf die Kartenphase verzichtet werden kann. Hier sind Beispiele für solche Aufgaben:

  • Datenfilterung (z. B. „Alle Datensätze der IP-Adresse 123.123.123.123 finden“ in den Webserver-Protokollen);
  • Datentransformation („Spalte in CSV-Protokollen löschen“);
  • Laden und Entladen von Daten aus einer externen Quelle („Alle Datensätze aus dem Protokoll in die Datenbank einfügen“).

Solche Aufgaben werden mit Map-Only gelöst. Beim Erstellen einer Nur-Map-Aufgabe in Hadoop müssen Sie die Anzahl der Reduzierer auf Null angeben:

Ein Beispiel für eine Nur-Map-Aufgabenkonfiguration auf Hadoop:

native Schnittstelle Hadoop-Streaming-Schnittstelle

Geben Sie bei der Konfiguration von Job'a die Anzahl der Reduzierungen auf Null an:

job.setNumReduceTasks(0); 

Wir geben keinen Reduzierer an und geben eine Anzahl von Reduzierern von Null an. Beispiel:

hadoop jar hadoop-streaming.jar \ 
 -D mapred.reduce.tasks=0\ 
-input input_dir\ 
-output output_dir\ 
-mapper "python mapper.py"\ 
-file "mapper.py"

Nur Map-Jobs können tatsächlich sehr nützlich sein. Um beispielsweise die Merkmale von Benutzern anhand ihres Verhaltens zu identifizieren, wird in der Facetz.DCA-Plattform genau eine große Map-Only verwendet, deren jeder Mapper einen Benutzer als Eingabe nimmt und seine Eigenschaften als Ausgabe zurückgibt.

5.2 Kombinieren

Wie ich bereits geschrieben habe, ist die Shuffle-Phase normalerweise die schwierigste Phase bei der Durchführung einer Map-Reduce-Aufgabe. Dies geschieht, weil die Zwischenergebnisse (Mapper-Ausgabe) auf die Festplatte geschrieben, sortiert und über das Netzwerk übertragen werden. Allerdings gibt es Aufgaben, bei denen ein solches Verhalten nicht sehr sinnvoll erscheint. Beispielsweise können Sie in derselben Aufgabe, Wörter in Dokumenten zu zählen, die Ergebnisse der Ausgaben mehrerer Mapper auf einem Map-Reduction-Knoten der Aufgabe vorab aggregieren und die bereits summierten Werte für jede Maschine an den Reduzierer übergeben .

In Hadoop können Sie dazu eine Kombinationsfunktion definieren, die die Ausgabe eines Teils der Mapper verarbeitet. Die Kombinationsfunktion ist der Reduzierung sehr ähnlich – sie nimmt die Ausgabe einiger Mapper als Eingabe und erzeugt ein aggregiertes Ergebnis für diese Mapper, daher wird der Reduzierer oft auch als Kombinierer verwendet. Ein wichtiger Unterschied zur Reduzierung besteht darin , dass nicht alle Werte, die einem Schlüssel entsprechen, in die Kombinationsfunktion gelangen .

Darüber hinaus garantiert Hadoop nicht, dass die Combine-Funktion für die Ausgabe des Mappers überhaupt ausgeführt wird. Daher ist die Kombinationsfunktion nicht immer anwendbar, beispielsweise bei der Suche nach dem Medianwert nach Schlüssel. Dennoch ermöglicht ihre Verwendung bei Aufgaben, bei denen die Kombinationsfunktion anwendbar ist, eine deutliche Steigerung der Geschwindigkeit der MapReduce-Aufgabe.

Verwendung des Combiners auf Hadoop:

native Schnittstelle Hadoop-Streaming

Geben Sie beim Konfigurieren von Job-a den Klassen-Combiner an. In der Regel ist es dasselbe wie Reducer:

job.setMapperClass(TokenizerMapper.class); 
job.setCombinerClass(IntSumReducer.class); 
job.setReducerClass(IntSumReducer.class); 

Geben Sie den Befehl -combiner in den Befehlszeilenoptionen an. Normalerweise ist dieser Befehl derselbe wie der Reducer-Befehl. Beispiel:

hadoop jar hadoop-streaming.jar \ 
-input input_dir\ 
-output output_dir\ 
-mapper "python mapper.py"\ 
-reducer "python reducer.py"\ 
-combiner "python reducer.py"\ 
-file "mapper.py"\ 
-file "reducer.py"\

5.3 MapReduce-Aufgabenketten

Es gibt Situationen, in denen ein MapReduce nicht ausreicht, um ein Problem zu lösen. Betrachten Sie beispielsweise eine leicht modifizierte WordCount-Aufgabe: Es gibt eine Reihe von Textdokumenten. Sie müssen zählen, wie viele Wörter von 1 bis 1000 Mal in der Menge vorkommen, wie viele Wörter von 1001 bis 2000, wie viele von 2001 bis 3000. usw. Für die Lösung benötigen wir 2 MapReduce-Jobs:

  • Modifizierte Wortanzahl, die für jedes Wort berechnet, in welches der Intervalle es fällt;
  • Ein MapReduce, das zählt, wie oft jedes Intervall in der Ausgabe des ersten MapReduce angetroffen wurde.

Pseudocode-Lösung:

#map1 
def map(doc): 
for word in doc: 
yield word, 1
#reduce1 
def reduce(word, values): 
yield int(sum(values)/1000), 1 
#map2 
def map(doc): 
interval, cnt = doc.split() 
yield interval, cnt 
#reduce2 
def reduce(interval, values): 
yield interval*1000, sum(values) 

Um eine Folge von MapReduce-Aufgaben auf Hadoop auszuführen, reicht es aus, den Ordner, der als Ausgabe für die erste Aufgabe angegeben wurde, als Eingabe für die zweite Aufgabe anzugeben und diese nacheinander auszuführen.

In der Praxis können Ketten von MapReduce-Aufgaben recht komplexe Sequenzen sein, in denen MapReduce-Aufgaben sowohl sequentiell als auch parallel miteinander verbunden werden können. Um die Verwaltung solcher Aufgabenausführungspläne zu vereinfachen, gibt es separate Tools wie Oozie und Luigi, die in einem separaten Artikel dieser Serie besprochen werden.

5.4 Verteilter Cache

Ein wichtiger Mechanismus in Hadoop ist der Distributed Cache. Mit dem verteilten Cache können Sie Dateien (z. B. Textdateien, Archive, JAR-Dateien) zur Umgebung hinzufügen, in der die MapReduce-Aufgabe ausgeführt wird.

Sie können auf HDFS gespeicherte Dateien hinzufügen, lokale Dateien (lokal auf dem Computer, von dem aus die Aufgabe gestartet wird). Ich habe bereits implizit gezeigt, wie man Distributed Cache mit Hadoop-Streaming verwendet, indem ich die Dateien „mapper.py“ und „reducer.py“ über die Option „-file“ hinzugefügt habe. Tatsächlich können Sie nicht nur „mapper.py“ und „reducer.py“, sondern beliebige Dateien im Allgemeinen hinzufügen und diese dann verwenden, als ob sie sich in einem lokalen Ordner befänden.

Verwendung des verteilten Caches:

Native API
//Job configuration
JobConf job = new JobConf();
DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"),  job);
DistributedCache.addCacheArchive(new URI("/myapp/map.zip", job);
DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job);
DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar", job);
DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz", job);

//example of usage in mapper-e:
public static class MapClass extends MapReduceBase
implements Mapper<K, V, K, V> {

 private Path[] localArchives;
 private Path[] localFiles;

 public void configure(JobConf job) {
   // get cached data from archives
   File f = new File("./map.zip/some/file/in/zip.txt");
 }

 public void map(K key, V value,
             	OutputCollector<K, V> output, Reporter reporter)
 throws IOException {
   // use data here
   // ...
   // ...
   output.collect(k, v);
 }
}
Hadoop-Streaming

#wir listen die Dateien, die dem verteilten Cache hinzugefügt werden müssen, im Parameter –files auf. Die Option --files muss vor den anderen Optionen stehen.

yarn  hadoop-streaming.jar\ 
-files mapper.py,reducer.py,some_cached_data.txt\ 
-input '/some/input/path' \ 
-output '/some/output/path' \  
-mapper 'python mapper.py' \ 
-reducer 'python reducer.py' \

Anwendungsbeispiel:

import sys 
#just read file from local folder 
data = open('some_cached_data.txt').read() 
 
for line in sys.stdin() 
#processing input 
#use data here

5.5 Join reduzieren

Diejenigen, die es gewohnt sind, mit relationalen Datenbanken zu arbeiten, verwenden häufig die sehr praktische Join-Operation, die es ihnen ermöglicht, den Inhalt einiger Tabellen gemeinsam zu verarbeiten, indem sie sie nach einem Schlüssel zusammenfügen. Auch bei der Arbeit mit Big Data tritt dieses Problem manchmal auf. Betrachten Sie das folgende Beispiel:

Es gibt Protokolle von zwei Webservern. Jedes Protokoll sieht folgendermaßen aus:

t\t

Beispiel für ein Log-Snippet:

1446792139	
178.78.82.1	
/sphingosine/unhurrying.css 
1446792139	
126.31.163.222	
/accentually.js 
1446792139	
154.164.149.83	
/pyroacid/unkemptly.jpg 
1446792139	
202.27.13.181	
/Chawia.js 
1446792139	
67.123.248.174	
/morphographical/dismain.css 
1446792139	
226.74.123.135	
/phanerite.php 
1446792139	
157.109.106.104	
/bisonant.css

Für jede IP-Adresse muss berechnet werden, welchen der beiden Server sie häufiger besucht. Das Ergebnis sollte in der Form vorliegen:

\t

Ein Beispiel für einen Teil des Ergebnisses:

178.78.82.1	
first 
126.31.163.222	
second 
154.164.149.83	
second 
226.74.123.135	
first

Im Gegensatz zu relationalen Datenbanken ist das Zusammenführen zweier Protokolle per Schlüssel (in diesem Fall per IP-Adresse) leider ein ziemlich aufwändiger Vorgang und wird mit 3 MapReduce und dem Reduce Join-Muster gelöst:

ReduceJoin funktioniert so:

1) Für jedes der Eingabeprotokolle wird ein separates MapReduce (nur Karte) gestartet, das die Eingabedaten in das folgende Formular konvertiert:

key -> (type, value

Dabei ist „key“ der Schlüssel zum Verknüpfen von Tabellen, „Type“ der Typ der Tabelle (in unserem Fall „erster“ oder „zweiter“) und „Value“ alle zusätzlichen Daten, die an den Schlüssel gebunden sind.

2) Die Ausgänge beider MapReduces werden dem Eingang des dritten MapReduce zugeführt, der tatsächlich die Vereinigung durchführt. Dieses MapReduce enthält einen leeren Mapper, der einfach die Eingabe kopiert. Als nächstes zerlegt shuffle die Daten in Schlüssel und gibt sie als Eingabe an den Reduzierer weiter:

key -> [(type, value)]

Es ist wichtig, dass der Reduzierer in diesem Moment Datensätze aus beiden Protokollen erhält und gleichzeitig anhand des Typfelds identifiziert werden kann, aus welchem ​​der beiden Protokolle ein bestimmter Wert stammt. Es liegen also genügend Daten vor, um das ursprüngliche Problem zu lösen. In unserem Fall muss der Reducer lediglich für jeden Datensatzschlüssel berechnen, welcher Typ mehr gefunden wurde, und diesen Typ ausgeben.

5.6 MapJoins

Das ReduceJoin-Muster beschreibt den allgemeinen Fall der Verknüpfung zweier Protokolle per Schlüssel. Es gibt jedoch einen Sonderfall, in dem die Aufgabe deutlich vereinfacht und beschleunigt werden kann. Dies ist der Fall, wenn einer der Stämme deutlich kleiner ist als der andere. Betrachten Sie das folgende Problem:

Es gibt 2 Protokolle. Das erste Protokoll enthält das Webserverprotokoll (dasselbe wie in der vorherigen Aufgabe), die zweite Datei (100 KB groß) enthält die URL->Theme-Übereinstimmung. Beispiel 2. Datei:

/toyota.php 	
auto 
/football/spartak.html 	
sport 
/cars 	
auto 
/finances/money 	
business

Für jede IP-Adresse muss berechnet werden, welche Seiten welcher Kategorie von dieser IP-Adresse am häufigsten geladen wurden.

In diesem Fall müssen wir auch zwei Protokolle per URL verknüpfen. In diesem Fall müssen wir jedoch nicht 3 MapReduce ausführen, da das zweite Protokoll vollständig in den Speicher passt. Um das Problem mit dem ersten MapReduce zu lösen, können wir das zweite Protokoll in den Distributed Cache laden und es bei der Initialisierung des Mappers einfach in den Speicher einlesen und in das -> Themenwörterbuch einfügen.

Darüber hinaus wird das Problem wie folgt gelöst:

Karte:

# find the subject of each of the pages of the first log 
input_line -> [ip,  topic] 

reduzieren:


Ip -> [topics] -> [ip, most_popular_topic]

Reduce erhält als Eingabe eine IP und eine Liste aller Themen, es berechnet lediglich, welches der Themen am häufigsten angetroffen wurde. Somit wird die Aufgabe mit dem 1. MapReduce gelöst, und der Join selbst findet im Allgemeinen innerhalb der Karte statt (daher könnte auf den MapOnly-Job verzichtet werden, wenn keine zusätzliche Aggregation nach Schlüssel erforderlich wäre):