4.1 Allgemeine Informationen zu Hadoop

Das MapReduce-Paradigma wurde 2004 von Google in seinem Artikel MapReduce: Simplified Data Processing on Large Clusters vorgeschlagen . Da der vorgeschlagene Artikel eine Beschreibung des Paradigmas enthielt, die Implementierung jedoch fehlte, schlugen mehrere Programmierer von Yahoo ihre Implementierung im Rahmen der Arbeit am Nutch-Webcrawler vor. Mehr über die Geschichte von Hadoop können Sie im Artikel Die Geschichte von Hadoop: Von 4 Knoten zur Zukunft der Daten lesen .

Ursprünglich war Hadoop in erster Linie ein Tool zum Speichern von Daten und zum Ausführen von MapReduce-Aufgaben. Heute ist Hadoop jedoch ein großer Stapel von Technologien, die auf die eine oder andere Weise mit der Verarbeitung großer Datenmengen zusammenhängen (nicht nur mit MapReduce).

Die Haupt-(Kern-)Komponenten von Hadoop sind:

  • Hadoop Distributed File System (HDFS) ist ein verteiltes Dateisystem, mit dem Sie Informationen nahezu unbegrenzter Größe speichern können.
  • Hadoop YARN ist ein Framework für die Clusterressourcenverwaltung und Aufgabenverwaltung, einschließlich des MapReduce-Frameworks.
  • Hadoop üblich

Es gibt auch eine große Anzahl von Projekten, die in direktem Zusammenhang mit Hadoop stehen, aber nicht im Hadoop-Kern enthalten sind:

  • Hive – ein Tool für SQL-ähnliche Abfragen über Big Data (wandelt SQL-Abfragen in eine Reihe von MapReduce-Aufgaben um);
  • Pig ist eine Programmiersprache für die Datenanalyse auf hohem Niveau. Eine Codezeile in dieser Sprache kann in eine Folge von MapReduce-Aufgaben umgewandelt werden;
  • Hbase ist eine spaltenbasierte Datenbank, die das BigTable-Paradigma implementiert.
  • Cassandra ist eine leistungsstarke verteilte Schlüsselwertdatenbank.
  • ZooKeeper ist ein Dienst zur verteilten Konfigurationsspeicherung und Synchronisierung von Konfigurationsänderungen;
  • Mahout ist eine Big-Data-Bibliothek und Engine für maschinelles Lernen.

Unabhängig davon möchte ich das Apache Spark- Projekt erwähnen , bei dem es sich um eine Engine für die verteilte Datenverarbeitung handelt. Apache Spark verwendet für seine Arbeit typischerweise Hadoop-Komponenten wie HDFS und YARN, während es selbst in letzter Zeit beliebter geworden ist als Hadoop:

Einige dieser Komponenten werden in separaten Artikeln dieser Materialreihe behandelt, aber zunächst schauen wir uns an, wie Sie mit der Arbeit mit Hadoop beginnen und es in die Praxis umsetzen können.

4.2 Ausführen von MapReduce-Programmen auf Hadoop

Schauen wir uns nun an, wie man eine MapReduce-Aufgabe auf Hadoop ausführt. Als Aufgabe verwenden wir das klassische WordCount- Beispiel , das in der vorherigen Lektion besprochen wurde.

Ich möchte Sie an die Formulierung des Problems erinnern: Es gibt eine Reihe von Dokumenten. Für jedes in der Dokumentenmenge vorkommende Wort muss gezählt werden, wie oft das Wort in der Dokumentenmenge vorkommt.

Lösung:

Map teilt das Dokument in Wörter auf und gibt eine Reihe von Paaren zurück (Wort, 1).

Reduzieren summiert die Vorkommen jedes Wortes:

def map(doc):  
for word in doc.split():  
	yield word, 1 
def reduce(word, values):  
	yield word, sum(values)

Nun besteht die Aufgabe darin, diese Lösung in Form von Code zu programmieren, der auf Hadoop ausgeführt und ausgeführt werden kann.

4.3 Methode Nummer 1. Hadoop-Streaming

Der einfachste Weg, ein MapReduce-Programm auf Hadoop auszuführen, ist die Verwendung der Hadoop-Streaming-Schnittstelle. Die Streaming-Schnittstelle geht davon aus, dass Map und Reduce als Programme implementiert sind, die Daten von stdin übernehmen und nach stdout ausgeben .

Das Programm, das die Kartenfunktion ausführt, heißt Mapper. Das Programm, das Reduzieren ausführt , heißt Reduzierer .

Die Streaming-Schnittstelle geht standardmäßig davon aus, dass eine eingehende Zeile in einem Mapper oder Reducer einem eingehenden Eintrag für map entspricht .

Die Ausgabe des Mappers gelangt in Form von Paaren (Schlüssel, Wert) zur Eingabe des Reduzierers, wobei alle Paare demselben Schlüssel entsprechen:

  • Garantierte Verarbeitung durch einen einzigen Start des Reduzierers;
  • Wird nacheinander an die Eingabe gesendet (d. h. wenn ein Reduzierer mehrere verschiedene Schlüssel verarbeitet, wird die Eingabe nach Schlüssel gruppiert).

Also implementieren wir Mapper und Reducer in Python:

#mapper.py  
import sys  
  
def do_map(doc):  
for word in doc.split():  
	yield word.lower(), 1  
  
for line in sys.stdin:  
	for key, value in do_map(line):  
    	print(key + "\t" + str(value))  
 
#reducer.py  
import sys  
  
def do_reduce(word, values):  
	return word, sum(values)  
  
prev_key = None  
values = []  
  
for line in sys.stdin:  
	key, value = line.split("\t")  
	if key != prev_key and prev_key is not None:  
    	result_key, result_value = do_reduce(prev_key, values)  
    	print(result_key + "\t" + str(result_value))  
    	values = []  
	prev_key = key  
	values.append(int(value))  
  
if prev_key is not None:  
	result_key, result_value = do_reduce(prev_key, values)  
	print(result_key + "\t" + str(result_value))

Die von Hadoop verarbeiteten Daten müssen auf HDFS gespeichert werden. Laden wir unsere Artikel hoch und stellen sie auf HDFS. Verwenden Sie dazu den Befehl hadoop fs :

wget https://www.dropbox.com/s/opp5psid1x3jt41/lenta_articles.tar.gz  
tar xzvf lenta_articles.tar.gz  
hadoop fs -put lenta_articles 

Das Hadoop-FS-Dienstprogramm unterstützt eine große Anzahl von Methoden zur Manipulation des Dateisystems, von denen viele mit den Standard-Linux-Dienstprogrammen identisch sind.

Beginnen wir nun mit der Streaming-Aufgabe:

yarn jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar\  
 -input lenta_articles\  
 -output lenta_wordcount\  
 -file mapper.py\  
 -file reducer.py\  
 -mapper "python mapper.py"\  
 -reducer "python reducer.py" 

Das Dienstprogramm Yarn wird zum Starten und Verwalten verschiedener Anwendungen (einschließlich Map-Reduction-basierter) in einem Cluster verwendet. Hadoop-streaming.jar ist nur ein Beispiel für eine solche Garnanwendung.

Als nächstes folgen die Startoptionen:

  • Eingabe – Ordner mit Quelldaten auf HDFS;
  • Ausgabe – Ordner auf HDFS, in dem Sie das Ergebnis ablegen möchten;
  • Datei – Dateien, die während der Ausführung der Kartenreduzierungsaufgabe benötigt werden;
  • Mapper ist der Konsolenbefehl, der für die Map-Stufe verwendet wird;
  • „reduce“ ist der Konsolenbefehl, der für die Reduce-Phase verwendet wird.

Nach dem Start können Sie den Fortschritt der Aufgabe in der Konsole sehen und eine URL zum Anzeigen detaillierterer Informationen zur Aufgabe aufrufen.

In der unter dieser URL verfügbaren Schnittstelle können Sie einen detaillierteren Status der Aufgabenausführung erfahren und die Protokolle jedes Mappers und Reducers anzeigen (was bei fehlgeschlagenen Aufgaben sehr nützlich ist).

Das Ergebnis der Arbeit wird nach erfolgreicher Ausführung zu HDFS in dem Ordner hinzugefügt, den wir im Ausgabefeld angegeben haben. Sie können den Inhalt mit dem Befehl „hadoop fs -ls lenta_wordcount“ anzeigen.

Das Ergebnis selbst kann wie folgt erhalten werden:

hadoop fs -text lenta_wordcount/* | sort -n -k2,2 | tail -n5  
from
41  
this
43  
on
82  
and
111  
into
194 

Der Befehl „hadoop fs -text“ zeigt den Inhalt des Ordners in Textform an. Ich habe das Ergebnis nach der Häufigkeit des Vorkommens der Wörter sortiert. Wie erwartet sind Präpositionen die häufigsten Wörter in der Sprache.

4.4 Methode Nummer 2: Verwenden Sie Java

Hadoop selbst ist in Java geschrieben und die native Schnittstelle von Hadoop basiert ebenfalls auf Java. Lassen Sie uns zeigen, wie eine native Java-Anwendung für Wordcount aussieht:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

	public static class TokenizerMapper
        	extends Mapper<Object, Text, Text, IntWritable>{

    	private final static IntWritable one = new IntWritable(1);
    	private Text word = new Text();

    	public void map(Object key, Text value, Context context
    	) throws IOException, InterruptedException {
        	StringTokenizer itr = new StringTokenizer(value.toString());
        	while (itr.hasMoreTokens()) {
            	word.set(itr.nextToken());
            	context.write(word, one);
        	}
    	}
	}

	public static class IntSumReducer
        	extends Reducer<Text,IntWritable,Text,IntWritable> {
    	private IntWritable result = new IntWritable();

    	public void reduce(Text key, Iterable values,
                       	Context context
    	) throws IOException, InterruptedException {
        	int sum = 0;
        	for (IntWritable val : values) {
            	sum += val.get();
        	}
        	result.set(sum);
        	context.write(key, result);
    	}
	}

	public static void main(String[] args) throws Exception {
    	Configuration conf = new Configuration();
    	Job job = Job.getInstance(conf, "word count");
    	job.setJarByClass(WordCount.class);
    	job.setMapperClass(TokenizerMapper.class);
    	job.setReducerClass(IntSumReducer.class);
    	job.setOutputKeyClass(Text.class);
    	job.setOutputValueClass(IntWritable.class);
    	FileInputFormat.addInputPath(job, new Path("hdfs://localhost/user/cloudera/lenta_articles"));
    	FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost/user/cloudera/lenta_wordcount"));
    	System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

Diese Klasse macht genau das Gleiche wie unser Python-Beispiel. Wir erstellen die Klassen TokenizerMapper und IntSumReducer, indem wir von den Klassen Mapper bzw. Reducer ableiten. Die als Vorlagenparameter übergebenen Klassen geben die Typen der Eingabe- und Ausgabewerte an. Die native API geht davon aus, dass der Kartenfunktion ein Schlüssel-Wert-Paar als Eingabe gegeben wird. Da in unserem Fall der Schlüssel leer ist, definieren wir einfach Object als Schlüsseltyp.

In der Main-Methode starten wir die Mapreduce-Aufgabe und definieren ihre Parameter – Name, Mapper und Reducer, den Pfad in HDFS, wo sich die Eingabedaten befinden und wo das Ergebnis abgelegt werden soll. Zum Kompilieren benötigen wir Hadoop-Bibliotheken. Zum Erstellen verwende ich Maven, wofür Cloudera ein Repository hat. Eine Anleitung zur Einrichtung finden Sie hier. Als Ergebnis der pom.xmp-Datei (die von Maven verwendet wird, um die Zusammenstellung des Projekts zu beschreiben) habe ich Folgendes erhalten:

<?xml version="1.0" encoding="UTF-8"?>  
<project xmlns="http://maven.apache.org/POM/4.0.0"  
     	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
     	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">  
	<modelVersion>4.0.0</modelVersion>  
  
	<repositories>  
    	<repository>  
        	<id>cloudera</id>  
        	<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>  
    	</repository>  
	</repositories>  
  
	<dependencies>  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-common</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-auth</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-hdfs</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-mapreduce-client-app</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
	</dependencies>  
  
	<groupId>org.dca.examples</groupId>  
	<artifactId>wordcount</artifactId>  
	<version>1.0-SNAPSHOT</version>  
 
</project>

Kompilieren wir das Projekt in ein JAR-Paket:

mvn clean package

Nach dem Erstellen des Projekts in einer JAR-Datei erfolgt der Start auf ähnliche Weise wie im Fall der Streaming-Schnittstelle:

yarn jar wordcount-1.0-SNAPSHOT.jar  WordCount 

Wir warten auf die Ausführung und prüfen das Ergebnis:

hadoop fs -text lenta_wordcount/* | sort -n -k2,2 | tail -n5  
from
41
this
43
on
82
and
111
into
194

Wie Sie vielleicht erraten haben, ist das Ergebnis der Ausführung unserer nativen Anwendung dasselbe wie das Ergebnis der Streaming-Anwendung, die wir auf die vorherige Weise gestartet haben.