4.1 Informations générales sur Hadoop

Le paradigme MapReduce a été proposé par Google en 2004 dans son article MapReduce : Simplified Data Processing on Large Clusters . Étant donné que l'article proposé contenait une description du paradigme, mais que l'implémentation manquait, plusieurs programmeurs de Yahoo ont proposé leur implémentation dans le cadre des travaux sur le robot d'indexation nutch. Vous pouvez en savoir plus sur l'histoire d'Hadoop dans l'article L'histoire d'Hadoop : de 4 nœuds au futur des données .

Initialement, Hadoop était principalement un outil pour stocker des données et exécuter des tâches MapReduce, mais maintenant Hadoop est une grande pile de technologies liées d'une manière ou d'une autre au traitement du Big Data (pas seulement avec MapReduce).

Les principaux composants (de base) de Hadoop sont :

  • Hadoop Distributed File System (HDFS) est un système de fichiers distribué qui vous permet de stocker des informations d'une taille presque illimitée.
  • Hadoop YARN est un framework pour la gestion des ressources de cluster et la gestion des tâches, y compris le framework MapReduce.
  • Hadoop commun

Il existe également un grand nombre de projets directement liés à Hadoop, mais non inclus dans le noyau Hadoop :

  • Hive - un outil pour les requêtes de type SQL sur le Big Data (transforme les requêtes SQL en une série de tâches MapReduce) ;
  • Pig est un langage de programmation pour l'analyse de données de haut niveau. Une ligne de code dans ce langage peut se transformer en une séquence de tâches MapReduce ;
  • Hbase est une base de données en colonnes qui implémente le paradigme BigTable ;
  • Cassandra est une base de données clé-valeur distribuée hautes performances ;
  • ZooKeeper est un service de stockage de configuration distribué et de synchronisation des modifications de configuration ;
  • Mahout est une bibliothèque et un moteur d'apprentissage automatique de données volumineuses.

Par ailleurs, je voudrais noter le projet Apache Spark , qui est un moteur de traitement de données distribué. Apache Spark utilise généralement des composants Hadoop tels que HDFS et YARN pour son travail, alors que lui-même est récemment devenu plus populaire que Hadoop :

Certains de ces composants seront couverts dans des articles distincts de cette série de documents, mais pour l'instant, regardons comment vous pouvez commencer à travailler avec Hadoop et le mettre en pratique.

4.2 Exécution des programmes MapReduce sur Hadoop

Voyons maintenant comment exécuter une tâche MapReduce sur Hadoop. En tant que tâche, nous utiliserons l'exemple classique WordCount , qui a été abordé dans la leçon précédente.

Je rappelle la formulation du problème : il y a un ensemble de documents. Il est nécessaire pour chaque mot apparaissant dans l'ensemble de documents de compter combien de fois le mot apparaît dans l'ensemble.

Solution:

Map divise le document en mots et renvoie un ensemble de paires (mot, 1).

Réduire les sommes des occurrences de chaque mot :

def map(doc):  
for word in doc.split():  
	yield word, 1 
def reduce(word, values):  
	yield word, sum(values)

Maintenant, la tâche consiste à programmer cette solution sous la forme d'un code pouvant être exécuté sur Hadoop et exécuté.

4.3 Méthode numéro 1. Streaming Hadoop

Le moyen le plus simple d'exécuter un programme MapReduce sur Hadoop consiste à utiliser l'interface de diffusion Hadoop. L'interface de streaming suppose que map et reduce sont implémentés en tant que programmes qui récupèrent les données de stdin et les sortent vers stdout .

Le programme qui exécute la fonction map s'appelle mapper. Le programme qui exécute reduce est appelé, respectivement, reducer .

L'interface Streaming suppose par défaut qu'une ligne entrante dans un mappeur ou un réducteur correspond à une entrée entrante pour map .

La sortie du mappeur arrive à l'entrée du réducteur sous forme de paires (clé, valeur), alors que toutes les paires correspondent à la même clé :

  • Garanti d'être traité par un seul lancement du réducteur ;
  • Sera soumis à l'entrée dans une rangée (c'est-à-dire que si un réducteur traite plusieurs clés différentes, l'entrée sera regroupée par clé).

Implémentons donc le mappeur et le réducteur en python :

#mapper.py  
import sys  
  
def do_map(doc):  
for word in doc.split():  
	yield word.lower(), 1  
  
for line in sys.stdin:  
	for key, value in do_map(line):  
    	print(key + "\t" + str(value))  
 
#reducer.py  
import sys  
  
def do_reduce(word, values):  
	return word, sum(values)  
  
prev_key = None  
values = []  
  
for line in sys.stdin:  
	key, value = line.split("\t")  
	if key != prev_key and prev_key is not None:  
    	result_key, result_value = do_reduce(prev_key, values)  
    	print(result_key + "\t" + str(result_value))  
    	values = []  
	prev_key = key  
	values.append(int(value))  
  
if prev_key is not None:  
	result_key, result_value = do_reduce(prev_key, values)  
	print(result_key + "\t" + str(result_value))

Les données que Hadoop traitera doivent être stockées sur HDFS. Téléchargeons nos articles et mettons-les sur HDFS. Pour cela, utilisez la commande hadoop fs :

wget https://www.dropbox.com/s/opp5psid1x3jt41/lenta_articles.tar.gz  
tar xzvf lenta_articles.tar.gz  
hadoop fs -put lenta_articles 

L'utilitaire hadoop fs prend en charge un grand nombre de méthodes de manipulation du système de fichiers, dont beaucoup sont identiques aux utilitaires Linux standard.

Commençons maintenant la tâche de diffusion :

yarn jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar\  
 -input lenta_articles\  
 -output lenta_wordcount\  
 -file mapper.py\  
 -file reducer.py\  
 -mapper "python mapper.py"\  
 -reducer "python reducer.py" 

L'utilitaire yarn est utilisé pour lancer et gérer diverses applications (y compris basées sur map-reduce) sur un cluster. Hadoop-streaming.jar n'est qu'un exemple d'une telle application de fil.

Viennent ensuite les options de lancement :

  • input - dossier avec les données source sur hdfs ;
  • output - dossier sur hdfs où vous voulez mettre le résultat ;
  • fichier - fichiers nécessaires lors de l'exécution de la tâche de réduction de carte ;
  • mapper est la commande console qui sera utilisée pour l'étape map ;
  • reduce est la commande de console qui sera utilisée pour l'étape de réduction.

Après le lancement, vous pouvez voir la progression de la tâche dans la console et une URL pour afficher des informations plus détaillées sur la tâche.

Dans l'interface disponible à cette URL, vous pouvez connaître un état d'exécution plus détaillé des tâches, consulter les journaux de chaque mappeur et réducteur (ce qui est très utile en cas d'échec des tâches).

Le résultat du travail après une exécution réussie est ajouté à HDFS dans le dossier que nous avons spécifié dans le champ de sortie. Vous pouvez afficher son contenu à l'aide de la commande "hadoop fs -ls lenta_wordcount".

Le résultat lui-même peut être obtenu comme suit :

hadoop fs -text lenta_wordcount/* | sort -n -k2,2 | tail -n5  
from
41  
this
43  
on
82  
and
111  
into
194 

La commande "hadoop fs -text" affiche le contenu du dossier sous forme de texte. J'ai trié le résultat par le nombre d'occurrences des mots. Comme prévu, les mots les plus courants dans la langue sont les prépositions.

4.4 Méthode numéro 2 : utiliser Java

Hadoop lui-même est écrit en Java, et l'interface native de Hadoop est également basée sur Java. Montrons à quoi ressemble une application java native pour wordcount :

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

	public static class TokenizerMapper
        	extends Mapper<Object, Text, Text, IntWritable>{

    	private final static IntWritable one = new IntWritable(1);
    	private Text word = new Text();

    	public void map(Object key, Text value, Context context
    	) throws IOException, InterruptedException {
        	StringTokenizer itr = new StringTokenizer(value.toString());
        	while (itr.hasMoreTokens()) {
            	word.set(itr.nextToken());
            	context.write(word, one);
        	}
    	}
	}

	public static class IntSumReducer
        	extends Reducer<Text,IntWritable,Text,IntWritable> {
    	private IntWritable result = new IntWritable();

    	public void reduce(Text key, Iterable values,
                       	Context context
    	) throws IOException, InterruptedException {
        	int sum = 0;
        	for (IntWritable val : values) {
            	sum += val.get();
        	}
        	result.set(sum);
        	context.write(key, result);
    	}
	}

	public static void main(String[] args) throws Exception {
    	Configuration conf = new Configuration();
    	Job job = Job.getInstance(conf, "word count");
    	job.setJarByClass(WordCount.class);
    	job.setMapperClass(TokenizerMapper.class);
    	job.setReducerClass(IntSumReducer.class);
    	job.setOutputKeyClass(Text.class);
    	job.setOutputValueClass(IntWritable.class);
    	FileInputFormat.addInputPath(job, new Path("hdfs://localhost/user/cloudera/lenta_articles"));
    	FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost/user/cloudera/lenta_wordcount"));
    	System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

Cette classe fait exactement la même chose que notre exemple Python. Nous créons les classes TokenizerMapper et IntSumReducer en dérivant respectivement des classes Mapper et Reducer. Les classes transmises en tant que paramètres de modèle spécifient les types de valeurs d'entrée et de sortie. L'API native suppose que la fonction map reçoit une paire clé-valeur en entrée. Puisque dans notre cas la clé est vide, nous définissons simplement Object comme type de clé.

Dans la méthode Main, nous démarrons la tâche mapreduce et définissons ses paramètres - nom, mappeur et réducteur, le chemin dans HDFS, où se trouvent les données d'entrée et où placer le résultat. Pour compiler, nous avons besoin de bibliothèques hadoop. J'utilise Maven pour construire, pour lequel cloudera a un référentiel. Les instructions pour le configurer peuvent être trouvées ici. En conséquence, le fichier pom.xmp (qui est utilisé par maven pour décrire l'assemblage du projet) j'ai obtenu ce qui suit):

<?xml version="1.0" encoding="UTF-8"?>  
<project xmlns="http://maven.apache.org/POM/4.0.0"  
     	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
     	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">  
	<modelVersion>4.0.0</modelVersion>  
  
	<repositories>  
    	<repository>  
        	<id>cloudera</id>  
        	<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>  
    	</repository>  
	</repositories>  
  
	<dependencies>  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-common</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-auth</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-hdfs</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-mapreduce-client-app</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
	</dependencies>  
  
	<groupId>org.dca.examples</groupId>  
	<artifactId>wordcount</artifactId>  
	<version>1.0-SNAPSHOT</version>  
 
</project>

Compilons le projet dans un package jar :

mvn clean package

Après avoir construit le projet dans un fichier jar, le lancement se produit de la même manière, comme dans le cas de l'interface de streaming :

yarn jar wordcount-1.0-SNAPSHOT.jar  WordCount 

Nous attendons l'exécution et vérifions le résultat:

hadoop fs -text lenta_wordcount/* | sort -n -k2,2 | tail -n5  
from
41
this
43
on
82
and
111
into
194

Comme vous pouvez le deviner, le résultat de l'exécution de notre application native est le même que celui de l'application de streaming que nous avons lancée de la manière précédente.