5.1 Mapper uniquement le travail

Il est temps de décrire diverses techniques qui vous permettent d'utiliser efficacement MapReduce pour résoudre des problèmes pratiques, ainsi que de montrer certaines des fonctionnalités de Hadoop qui peuvent simplifier le développement ou accélérer considérablement l'exécution d'une tâche MapReduce sur un cluster.

Comme nous nous en souvenons, MapReduce se compose des étapes Map, Shuffle et Reduce. En règle générale, l'étape Shuffle s'avère être la plus difficile dans les tâches pratiques, car les données sont triées à ce stade. En fait, il existe un certain nombre de tâches dans lesquelles l'étape Carte seule peut être supprimée. Voici des exemples de telles tâches :

  • Filtrage des données (par exemple, "Rechercher tous les enregistrements de l'adresse IP 123.123.123.123" dans les journaux du serveur Web) ;
  • Transformation des données ("Supprimer la colonne dans les journaux csv");
  • Chargement et déchargement de données depuis une source externe ("Insérer tous les enregistrements du journal dans la base de données").

Ces tâches sont résolues à l'aide de Map-Only. Lors de la création d'une tâche Map-Only dans Hadoop, vous devez spécifier un nombre nul de réducteurs :

Un exemple de configuration de tâche map-only sur hadoop :

interface native Interface de diffusion Hadoop

Spécifiez zéro nombre de réducteurs lors de la configuration de job'a :

job.setNumReduceTasks(0); 

Nous ne spécifions pas de réducteur et spécifions un nombre nul de réducteurs. Exemple:

hadoop jar hadoop-streaming.jar \ 
 -D mapred.reduce.tasks=0\ 
-input input_dir\ 
-output output_dir\ 
-mapper "python mapper.py"\ 
-file "mapper.py"

Les travaux Map Only peuvent en fait être très utiles. Par exemple, dans la plateforme Facetz.DCA, pour identifier les caractéristiques des utilisateurs par leur comportement, c'est précisément une grande map-only qui est utilisée, dont chaque mappeur prend un utilisateur en entrée et retourne ses caractéristiques en sortie.

5.2 Combiner

Comme je l'ai déjà écrit, l'étape la plus difficile lors de l'exécution d'une tâche Map-Reduce est généralement l'étape de mélange. Cela se produit parce que les résultats intermédiaires (sortie du mappeur) sont écrits sur le disque, triés et transmis sur le réseau. Cependant, il existe des tâches dans lesquelles un tel comportement ne semble pas très raisonnable. Par exemple, dans la même tâche de comptage de mots dans des documents, vous pouvez pré-agréger les résultats des sorties de plusieurs mappeurs sur un nœud map-reduce de la tâche, et transmettre les valeurs déjà additionnées pour chaque machine au réducteur .

Dans hadoop, pour cela, vous pouvez définir une fonction de combinaison qui traitera la sortie d'une partie des mappeurs. La fonction de combinaison est très similaire à réduire - elle prend la sortie de certains mappeurs en entrée et produit un résultat agrégé pour ces mappeurs, de sorte que le réducteur est souvent également utilisé comme combineur. Une différence importante par rapport à reduce est que toutes les valeurs correspondant à une clé n'atteignent pas la fonction de combinaison .

De plus, hadoop ne garantit pas du tout que la fonction de combinaison sera exécutée pour la sortie du mappeur. Par conséquent, la fonction de combinaison n'est pas toujours applicable, par exemple dans le cas d'une recherche de la valeur médiane par clé. Néanmoins, dans les tâches où la fonction de combinaison est applicable, son utilisation permet d'obtenir une augmentation significative de la vitesse de la tâche MapReduce.

Utilisation du Combiner sur hadoop :

interface native Diffusion Hadoop

Lors de la configuration de job-a, spécifiez le class-Combiner. En règle générale, c'est la même chose que Reducer :

job.setMapperClass(TokenizerMapper.class); 
job.setCombinerClass(IntSumReducer.class); 
job.setReducerClass(IntSumReducer.class); 

Spécifiez la commande -combiner dans les options de ligne de commande. En règle générale, cette commande est identique à la commande reducer. Exemple:

hadoop jar hadoop-streaming.jar \ 
-input input_dir\ 
-output output_dir\ 
-mapper "python mapper.py"\ 
-reducer "python reducer.py"\ 
-combiner "python reducer.py"\ 
-file "mapper.py"\ 
-file "reducer.py"\

5.3 Chaînes de tâches MapReduce

Il y a des situations où un MapReduce n'est pas suffisant pour résoudre un problème. Par exemple, considérons une tâche WordCount légèrement modifiée : il y a un ensemble de documents texte, vous devez compter combien de mots sont apparus de 1 à 1000 fois dans l'ensemble, combien de mots de 1001 à 2000, combien de 2001 à 3000, et ainsi de suite. Pour la solution, nous avons besoin de 2 tâches MapReduce :

  • Nombre de mots modifié, qui pour chaque mot calculera dans lequel des intervalles il est tombé ;
  • Un MapReduce qui compte combien de fois chaque intervalle a été rencontré dans la sortie du premier MapReduce.

Solution de pseudo-code :

#map1 
def map(doc): 
for word in doc: 
yield word, 1
#reduce1 
def reduce(word, values): 
yield int(sum(values)/1000), 1 
#map2 
def map(doc): 
interval, cnt = doc.split() 
yield interval, cnt 
#reduce2 
def reduce(interval, values): 
yield interval*1000, sum(values) 

Pour exécuter une séquence de tâches MapReduce sur hadoop, il suffit de spécifier le dossier qui a été spécifié en sortie pour la première en entrée pour la seconde tâche et de les exécuter à tour de rôle.

En pratique, les chaînes de tâches MapReduce peuvent être des séquences assez complexes dans lesquelles les tâches MapReduce peuvent être connectées à la fois séquentiellement et parallèlement les unes aux autres. Pour simplifier la gestion de tels plans d'exécution de tâches, il existe des outils distincts comme oozie et luigi, qui seront abordés dans un article séparé de cette série.

5.4 Cache distribué

Un mécanisme important dans Hadoop est le cache distribué. Le cache distribué vous permet d'ajouter des fichiers (par exemple, des fichiers texte, des archives, des fichiers jar) à l'environnement dans lequel la tâche MapReduce est en cours d'exécution.

Vous pouvez ajouter des fichiers stockés sur HDFS, des fichiers locaux (locaux à la machine à partir de laquelle la tâche est lancée). J'ai déjà implicitement montré comment utiliser le cache distribué avec le streaming hadoop en ajoutant les fichiers mapper.py et reducer.py via l'option -file. En fait, vous pouvez ajouter non seulement mapper.py et reducer.py, mais des fichiers arbitraires en général, puis les utiliser comme s'ils se trouvaient dans un dossier local.

Utilisation du cache distribué :

API native
//Job configuration
JobConf job = new JobConf();
DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"),  job);
DistributedCache.addCacheArchive(new URI("/myapp/map.zip", job);
DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job);
DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar", job);
DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz", job);

//example of usage in mapper-e:
public static class MapClass extends MapReduceBase
implements Mapper<K, V, K, V> {

 private Path[] localArchives;
 private Path[] localFiles;

 public void configure(JobConf job) {
   // get cached data from archives
   File f = new File("./map.zip/some/file/in/zip.txt");
 }

 public void map(K key, V value,
             	OutputCollector<K, V> output, Reporter reporter)
 throws IOException {
   // use data here
   // ...
   // ...
   output.collect(k, v);
 }
}
Streaming Hadoop

#nous listons les fichiers qui doivent être ajoutés au cache distribué dans le paramètre –files. L'option --files doit précéder les autres options.

yarn  hadoop-streaming.jar\ 
-files mapper.py,reducer.py,some_cached_data.txt\ 
-input '/some/input/path' \ 
-output '/some/output/path' \  
-mapper 'python mapper.py' \ 
-reducer 'python reducer.py' \

exemple d'utilisation :

import sys 
#just read file from local folder 
data = open('some_cached_data.txt').read() 
 
for line in sys.stdin() 
#processing input 
#use data here

5.5 Réduire la jointure

Ceux qui sont habitués à travailler avec des bases de données relationnelles utilisent souvent l'opération Join très pratique, qui leur permet de traiter conjointement le contenu de certaines tables en les joignant selon une clé. Lorsque vous travaillez avec des données volumineuses, ce problème se pose également parfois. Considérez l'exemple suivant :

Il existe des journaux de deux serveurs Web, chaque journal ressemble à ceci :

t\t

Exemple d'extrait de journal :

1446792139	
178.78.82.1	
/sphingosine/unhurrying.css 
1446792139	
126.31.163.222	
/accentually.js 
1446792139	
154.164.149.83	
/pyroacid/unkemptly.jpg 
1446792139	
202.27.13.181	
/Chawia.js 
1446792139	
67.123.248.174	
/morphographical/dismain.css 
1446792139	
226.74.123.135	
/phanerite.php 
1446792139	
157.109.106.104	
/bisonant.css

Il faut calculer pour chaque adresse IP lequel des 2 serveurs elle a visité le plus souvent. Le résultat doit être sous la forme :

\t

Un exemple d'une partie du résultat :

178.78.82.1	
first 
126.31.163.222	
second 
154.164.149.83	
second 
226.74.123.135	
first

Malheureusement, contrairement aux bases de données relationnelles, en général, joindre deux logs par clé (dans ce cas, par adresse IP) est une opération assez lourde et se résout à l'aide de 3 MapReduce et du pattern Reduce Join :

ReduceJoin fonctionne comme ceci :

1) Pour chacun des journaux d'entrée, un MapReduce séparé (Map uniquement) est lancé, convertissant les données d'entrée sous la forme suivante :

key -> (type, value

Où key est la clé sur laquelle joindre les tables, Type est le type de la table (premier ou deuxième dans notre cas) et Value est toute donnée supplémentaire liée à la clé.

2) Les sorties des deux MapReduce sont envoyées à l'entrée du 3ème MapReduce, qui, en fait, effectue l'union. Ce MapReduce contient un Mapper vide qui copie simplement l'entrée. Ensuite, shuffle décompose les données en clés et les transmet au réducteur en entrée :

key -> [(type, value)]

Il est important qu'à ce moment le réducteur reçoive les enregistrements des deux journaux, et en même temps, il est possible d'identifier par le champ type de lequel des deux journaux provient une valeur particulière. Il y a donc suffisamment de données pour résoudre le problème initial. Dans notre cas, le réducteur doit simplement calculer pour chaque clé d'enregistrement quel type a rencontré le plus et afficher ce type.

5.6 MapJoins

Le modèle ReduceJoin décrit le cas général de la jointure de deux journaux par clé. Cependant, il existe un cas particulier dans lequel la tâche peut être considérablement simplifiée et accélérée. C'est le cas où l'un des journaux est nettement plus petit que l'autre. Considérez le problème suivant :

Il y a 2 journaux. Le premier journal contient le journal du serveur Web (le même que dans la tâche précédente), le second fichier (taille de 100 Ko) contient l'URL-> Correspondance du thème. Exemple 2ème fichier :

/toyota.php 	
auto 
/football/spartak.html 	
sport 
/cars 	
auto 
/finances/money 	
business

Pour chaque adresse IP, il faut calculer les pages de quelle catégorie de cette adresse IP ont été chargées le plus souvent.

Dans ce cas, nous devons également joindre 2 logs par URL. Cependant, dans ce cas, nous n'avons pas besoin d'exécuter 3 MapReduce, car le deuxième journal tiendra complètement en mémoire. Afin de résoudre le problème en utilisant le 1er MapReduce, nous pouvons charger le deuxième journal dans le cache distribué, et lorsque le mappeur est initialisé, il suffit de le lire en mémoire, en le mettant dans le -> dictionnaire de sujets.

De plus, le problème est résolu comme suit :

carte:

# find the subject of each of the pages of the first log 
input_line -> [ip,  topic] 

réduire:


Ip -> [topics] -> [ip, most_popular_topic]

Reduce reçoit une adresse IP et une liste de tous les sujets en entrée, il calcule simplement lequel des sujets a été rencontré le plus souvent. Ainsi, la tâche est résolue à l'aide du 1er MapReduce, et la jointure proprement dite a généralement lieu à l'intérieur de la carte (ainsi, si une agrégation supplémentaire par clé n'était pas nécessaire, le travail MapOnly pourrait être supprimé):