5.1 Nur-Map-Job
Es ist an der Zeit, verschiedene Techniken zu beschreiben, mit denen Sie MapReduce effektiv zur Lösung praktischer Probleme einsetzen können, und einige der Funktionen von Hadoop aufzuzeigen, die die Entwicklung vereinfachen oder die Ausführung einer MapReduce-Aufgabe auf einem Cluster erheblich beschleunigen können.
Wie wir uns erinnern, besteht MapReduce aus den Phasen Map, Shuffle und Reduce. In der Regel erweist sich bei praktischen Aufgaben die Shuffle-Phase als die schwierigste, da in dieser Phase die Daten sortiert werden. Tatsächlich gibt es eine Reihe von Aufgaben, bei denen allein auf die Kartenphase verzichtet werden kann. Hier sind Beispiele für solche Aufgaben:
- Datenfilterung (z. B. „Alle Datensätze der IP-Adresse 123.123.123.123 finden“ in den Webserver-Protokollen);
- Datentransformation („Spalte in CSV-Protokollen löschen“);
- Laden und Entladen von Daten aus einer externen Quelle („Alle Datensätze aus dem Protokoll in die Datenbank einfügen“).
Solche Aufgaben werden mit Map-Only gelöst. Beim Erstellen einer Nur-Map-Aufgabe in Hadoop müssen Sie die Anzahl der Reduzierer auf Null angeben:

Ein Beispiel für eine Nur-Map-Aufgabenkonfiguration auf Hadoop:
native Schnittstelle | Hadoop-Streaming-Schnittstelle |
---|---|
Geben Sie bei der Konfiguration von Job'a die Anzahl der Reduzierungen auf Null an:
|
Wir geben keinen Reduzierer an und geben eine Anzahl von Reduzierern von Null an. Beispiel:
|
Nur Map-Jobs können tatsächlich sehr nützlich sein. Um beispielsweise die Merkmale von Benutzern anhand ihres Verhaltens zu identifizieren, wird in der Facetz.DCA-Plattform genau eine große Map-Only verwendet, deren jeder Mapper einen Benutzer als Eingabe nimmt und seine Eigenschaften als Ausgabe zurückgibt.
5.2 Kombinieren
Wie ich bereits geschrieben habe, ist die Shuffle-Phase normalerweise die schwierigste Phase bei der Durchführung einer Map-Reduce-Aufgabe. Dies geschieht, weil die Zwischenergebnisse (Mapper-Ausgabe) auf die Festplatte geschrieben, sortiert und über das Netzwerk übertragen werden. Allerdings gibt es Aufgaben, bei denen ein solches Verhalten nicht sehr sinnvoll erscheint. Beispielsweise können Sie in derselben Aufgabe, Wörter in Dokumenten zu zählen, die Ergebnisse der Ausgaben mehrerer Mapper auf einem Map-Reduction-Knoten der Aufgabe vorab aggregieren und die bereits summierten Werte für jede Maschine an den Reduzierer übergeben .

In Hadoop können Sie dazu eine Kombinationsfunktion definieren, die die Ausgabe eines Teils der Mapper verarbeitet. Die Kombinationsfunktion ist der Reduzierung sehr ähnlich – sie nimmt die Ausgabe einiger Mapper als Eingabe und erzeugt ein aggregiertes Ergebnis für diese Mapper, daher wird der Reduzierer oft auch als Kombinierer verwendet. Ein wichtiger Unterschied zur Reduzierung besteht darin , dass nicht alle Werte, die einem Schlüssel entsprechen, in die Kombinationsfunktion gelangen .
Darüber hinaus garantiert Hadoop nicht, dass die Combine-Funktion für die Ausgabe des Mappers überhaupt ausgeführt wird. Daher ist die Kombinationsfunktion nicht immer anwendbar, beispielsweise bei der Suche nach dem Medianwert nach Schlüssel. Dennoch ermöglicht ihre Verwendung bei Aufgaben, bei denen die Kombinationsfunktion anwendbar ist, eine deutliche Steigerung der Geschwindigkeit der MapReduce-Aufgabe.
Verwendung des Combiners auf Hadoop:
native Schnittstelle | Hadoop-Streaming |
---|---|
Geben Sie beim Konfigurieren von Job-a den Klassen-Combiner an. In der Regel ist es dasselbe wie Reducer:
|
Geben Sie den Befehl -combiner in den Befehlszeilenoptionen an. Normalerweise ist dieser Befehl derselbe wie der Reducer-Befehl. Beispiel:
|
5.3 MapReduce-Aufgabenketten
Es gibt Situationen, in denen ein MapReduce nicht ausreicht, um ein Problem zu lösen. Betrachten Sie beispielsweise eine leicht modifizierte WordCount-Aufgabe: Es gibt eine Reihe von Textdokumenten. Sie müssen zählen, wie viele Wörter von 1 bis 1000 Mal in der Menge vorkommen, wie viele Wörter von 1001 bis 2000, wie viele von 2001 bis 3000. usw. Für die Lösung benötigen wir 2 MapReduce-Jobs:
- Modifizierte Wortanzahl, die für jedes Wort berechnet, in welches der Intervalle es fällt;
- Ein MapReduce, das zählt, wie oft jedes Intervall in der Ausgabe des ersten MapReduce angetroffen wurde.
Pseudocode-Lösung:
|
|
|
|
Um eine Folge von MapReduce-Aufgaben auf Hadoop auszuführen, reicht es aus, den Ordner, der als Ausgabe für die erste Aufgabe angegeben wurde, als Eingabe für die zweite Aufgabe anzugeben und diese nacheinander auszuführen.
In der Praxis können Ketten von MapReduce-Aufgaben recht komplexe Sequenzen sein, in denen MapReduce-Aufgaben sowohl sequentiell als auch parallel miteinander verbunden werden können. Um die Verwaltung solcher Aufgabenausführungspläne zu vereinfachen, gibt es separate Tools wie Oozie und Luigi, die in einem separaten Artikel dieser Serie besprochen werden.

5.4 Verteilter Cache
Ein wichtiger Mechanismus in Hadoop ist der Distributed Cache. Mit dem verteilten Cache können Sie Dateien (z. B. Textdateien, Archive, JAR-Dateien) zur Umgebung hinzufügen, in der die MapReduce-Aufgabe ausgeführt wird.
Sie können auf HDFS gespeicherte Dateien hinzufügen, lokale Dateien (lokal auf dem Computer, von dem aus die Aufgabe gestartet wird). Ich habe bereits implizit gezeigt, wie man Distributed Cache mit Hadoop-Streaming verwendet, indem ich die Dateien „mapper.py“ und „reducer.py“ über die Option „-file“ hinzugefügt habe. Tatsächlich können Sie nicht nur „mapper.py“ und „reducer.py“, sondern beliebige Dateien im Allgemeinen hinzufügen und diese dann verwenden, als ob sie sich in einem lokalen Ordner befänden.
Verwendung des verteilten Caches:
Native API |
---|
|
Hadoop-Streaming |
---|
#wir listen die Dateien, die dem verteilten Cache hinzugefügt werden müssen, im Parameter –files auf. Die Option --files muss vor den anderen Optionen stehen. Anwendungsbeispiel:
|
5.5 Join reduzieren
Diejenigen, die es gewohnt sind, mit relationalen Datenbanken zu arbeiten, verwenden häufig die sehr praktische Join-Operation, die es ihnen ermöglicht, den Inhalt einiger Tabellen gemeinsam zu verarbeiten, indem sie sie nach einem Schlüssel zusammenfügen. Auch bei der Arbeit mit Big Data tritt dieses Problem manchmal auf. Betrachten Sie das folgende Beispiel:
Es gibt Protokolle von zwei Webservern. Jedes Protokoll sieht folgendermaßen aus:
t\t
Beispiel für ein Log-Snippet:
1446792139
178.78.82.1
/sphingosine/unhurrying.css
1446792139
126.31.163.222
/accentually.js
1446792139
154.164.149.83
/pyroacid/unkemptly.jpg
1446792139
202.27.13.181
/Chawia.js
1446792139
67.123.248.174
/morphographical/dismain.css
1446792139
226.74.123.135
/phanerite.php
1446792139
157.109.106.104
/bisonant.css
Für jede IP-Adresse muss berechnet werden, welchen der beiden Server sie häufiger besucht. Das Ergebnis sollte in der Form vorliegen:
\t
Ein Beispiel für einen Teil des Ergebnisses:
178.78.82.1
first
126.31.163.222
second
154.164.149.83
second
226.74.123.135
first
Im Gegensatz zu relationalen Datenbanken ist das Zusammenführen zweier Protokolle per Schlüssel (in diesem Fall per IP-Adresse) leider ein ziemlich aufwändiger Vorgang und wird mit 3 MapReduce und dem Reduce Join-Muster gelöst:

ReduceJoin funktioniert so:
1) Für jedes der Eingabeprotokolle wird ein separates MapReduce (nur Karte) gestartet, das die Eingabedaten in das folgende Formular konvertiert:
key -> (type, value
Dabei ist „key“ der Schlüssel zum Verknüpfen von Tabellen, „Type“ der Typ der Tabelle (in unserem Fall „erster“ oder „zweiter“) und „Value“ alle zusätzlichen Daten, die an den Schlüssel gebunden sind.
2) Die Ausgänge beider MapReduces werden dem Eingang des dritten MapReduce zugeführt, der tatsächlich die Vereinigung durchführt. Dieses MapReduce enthält einen leeren Mapper, der einfach die Eingabe kopiert. Als nächstes zerlegt shuffle die Daten in Schlüssel und gibt sie als Eingabe an den Reduzierer weiter:
key -> [(type, value)]
Es ist wichtig, dass der Reduzierer in diesem Moment Datensätze aus beiden Protokollen erhält und gleichzeitig anhand des Typfelds identifiziert werden kann, aus welchem der beiden Protokolle ein bestimmter Wert stammt. Es liegen also genügend Daten vor, um das ursprüngliche Problem zu lösen. In unserem Fall muss der Reducer lediglich für jeden Datensatzschlüssel berechnen, welcher Typ mehr gefunden wurde, und diesen Typ ausgeben.
5.6 MapJoins
Das ReduceJoin-Muster beschreibt den allgemeinen Fall der Verknüpfung zweier Protokolle per Schlüssel. Es gibt jedoch einen Sonderfall, in dem die Aufgabe deutlich vereinfacht und beschleunigt werden kann. Dies ist der Fall, wenn einer der Stämme deutlich kleiner ist als der andere. Betrachten Sie das folgende Problem:
Es gibt 2 Protokolle. Das erste Protokoll enthält das Webserverprotokoll (dasselbe wie in der vorherigen Aufgabe), die zweite Datei (100 KB groß) enthält die URL->Theme-Übereinstimmung. Beispiel 2. Datei:
/toyota.php
auto
/football/spartak.html
sport
/cars
auto
/finances/money
business
Für jede IP-Adresse muss berechnet werden, welche Seiten welcher Kategorie von dieser IP-Adresse am häufigsten geladen wurden.
In diesem Fall müssen wir auch zwei Protokolle per URL verknüpfen. In diesem Fall müssen wir jedoch nicht 3 MapReduce ausführen, da das zweite Protokoll vollständig in den Speicher passt. Um das Problem mit dem ersten MapReduce zu lösen, können wir das zweite Protokoll in den Distributed Cache laden und es bei der Initialisierung des Mappers einfach in den Speicher einlesen und in das -> Themenwörterbuch einfügen.
Darüber hinaus wird das Problem wie folgt gelöst:
Karte:
# find the subject of each of the pages of the first log
input_line -> [ip, topic]
reduzieren:
Ip -> [topics] -> [ip, most_popular_topic]
Reduce erhält als Eingabe eine IP und eine Liste aller Themen, es berechnet lediglich, welches der Themen am häufigsten angetroffen wurde. Somit wird die Aufgabe mit dem 1. MapReduce gelöst, und der Join selbst findet im Allgemeinen innerhalb der Karte statt (daher könnte auf den MapOnly-Job verzichtet werden, wenn keine zusätzliche Aggregation nach Schlüssel erforderlich wäre):

GO TO FULL VERSION