5.1 Trabajo de solo mapa

Es hora de describir varias técnicas que le permiten usar MapReduce de manera efectiva para resolver problemas prácticos, así como mostrar algunas de las características de Hadoop que pueden simplificar el desarrollo o acelerar significativamente la ejecución de una tarea de MapReduce en un clúster.

Como recordamos, MapReduce consta de etapas Map, Shuffle y Reduce. Como regla general, en las tareas prácticas, la etapa Shuffle resulta ser la más difícil, ya que en esta etapa se ordenan los datos. De hecho, hay una serie de tareas en las que se puede prescindir únicamente de la etapa Mapa. Aquí hay ejemplos de tales tareas:

  • Filtrado de datos (por ejemplo, "Buscar todos los registros de la dirección IP 123.123.123.123" en los registros del servidor web);
  • Transformación de datos ("Eliminar columna en csv-logs");
  • Cargar y descargar datos de una fuente externa ("Insertar todos los registros del registro en la base de datos").

Tales tareas se resuelven usando Map-Only. Al crear una tarea de solo mapa en Hadoop, debe especificar una cantidad cero de reductores:

Un ejemplo de una configuración de tarea de solo mapa en hadoop:

interfaz nativa Interfaz de transmisión de Hadoop

Especifique cero número de reductores al configurar job'a:

job.setNumReduceTasks(0); 

No especificamos un reductor y especificamos un número cero de reductores. Ejemplo:

hadoop jar hadoop-streaming.jar \ 
 -D mapred.reduce.tasks=0\ 
-input input_dir\ 
-output output_dir\ 
-mapper "python mapper.py"\ 
-file "mapper.py"

Los trabajos Map Only pueden ser realmente muy útiles. Por ejemplo, en la plataforma Facetz.DCA, para identificar las características de los usuarios por su comportamiento, se utiliza precisamente un solo mapa grande, cada mapeador de los cuales toma un usuario como entrada y devuelve sus características como salida.

5.2 Combinar

Como ya escribí, generalmente la etapa más difícil al realizar una tarea Map-Reduce es la etapa de reproducción aleatoria. Esto sucede porque los resultados intermedios (salida del mapeador) se escriben en el disco, se clasifican y se transmiten a través de la red. Sin embargo, hay tareas en las que tal comportamiento no parece muy razonable. Por ejemplo, en la misma tarea de contar palabras en documentos, puede agregar previamente los resultados de las salidas de varios mapeadores en un nodo map-reduce de la tarea y pasar los valores ya sumados para cada máquina al reductor. .

En hadoop, para esto, puede definir una función de combinación que procesará la salida de parte de los mapeadores. La función de combinación es muy similar a reduce: toma la salida de algunos mapeadores como entrada y produce un resultado agregado para estos mapeadores, por lo que el reductor también se usa a menudo como un combinador. Una diferencia importante con reduce es que no todos los valores correspondientes a una tecla llegan a la función de combinación .

Además, hadoop no garantiza que la función de combinación se ejecutará en absoluto para la salida del mapeador. Por lo tanto, la función de combinación no siempre es aplicable, por ejemplo, en el caso de buscar el valor de la mediana por clave. No obstante, en aquellas tareas en las que es aplicable la función de combinación, su uso permite conseguir un aumento significativo de la velocidad de la tarea MapReduce.

Usando el combinador en hadoop:

interfaz nativa Transmisión de Hadoop

Al configurar el trabajo-a, especifique el combinador de clase. Como regla general, es lo mismo que Reducer:

job.setMapperClass(TokenizerMapper.class); 
job.setCombinerClass(IntSumReducer.class); 
job.setReducerClass(IntSumReducer.class); 

Especifique el comando -combiner en las opciones de la línea de comandos. Normalmente, este comando es el mismo que el comando reducer. Ejemplo:

hadoop jar hadoop-streaming.jar \ 
-input input_dir\ 
-output output_dir\ 
-mapper "python mapper.py"\ 
-reducer "python reducer.py"\ 
-combiner "python reducer.py"\ 
-file "mapper.py"\ 
-file "reducer.py"\

5.3 Cadenas de tareas de MapReduce

Hay situaciones en las que un MapReduce no es suficiente para resolver un problema. Por ejemplo, considere una tarea de WordCount ligeramente modificada: hay un conjunto de documentos de texto, necesita contar cuántas palabras ocurrieron de 1 a 1000 veces en el conjunto, cuántas palabras de 1001 a 2000, cuántas de 2001 a 3000, etcétera. Para la solución, necesitamos 2 trabajos de MapReduce:

  • Recuento de palabras modificado, que para cada palabra calculará en cuál de los intervalos cayó;
  • Un MapReduce que cuenta cuántas veces se encontró cada intervalo en la salida del primer MapReduce.

Solución de pseudocódigo:

#map1 
def map(doc): 
for word in doc: 
yield word, 1
#reduce1 
def reduce(word, values): 
yield int(sum(values)/1000), 1 
#map2 
def map(doc): 
interval, cnt = doc.split() 
yield interval, cnt 
#reduce2 
def reduce(interval, values): 
yield interval*1000, sum(values) 

Para ejecutar una secuencia de tareas de MapReduce en hadoop, basta con especificar la carpeta que se especificó como salida para la primera como entrada para la segunda tarea y ejecutarlas a su vez.

En la práctica, las cadenas de tareas de MapReduce pueden ser secuencias bastante complejas en las que las tareas de MapReduce se pueden conectar tanto secuencialmente como en paralelo entre sí. Para simplificar la gestión de dichos planes de ejecución de tareas, existen herramientas separadas como oozie y luigi, que se analizarán en un artículo separado de esta serie.

5.4 Caché distribuida

Un mecanismo importante en Hadoop es el caché distribuido. El caché distribuido le permite agregar archivos (por ejemplo, archivos de texto, archivos comprimidos, archivos jar) al entorno donde se ejecuta la tarea MapReduce.

Puede agregar archivos almacenados en HDFS, archivos locales (locales en la máquina desde la que se inicia la tarea). Ya mostré implícitamente cómo usar Distributed Cache con transmisión de hadoop agregando los archivos mapper.py y reducer.py a través de la opción -file. De hecho, puede agregar no solo mapper.py y reducer.py, sino también archivos arbitrarios en general, y luego usarlos como si estuvieran en una carpeta local.

Uso de caché distribuida:

API nativa
//Job configuration
JobConf job = new JobConf();
DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"),  job);
DistributedCache.addCacheArchive(new URI("/myapp/map.zip", job);
DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job);
DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar", job);
DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz", job);

//example of usage in mapper-e:
public static class MapClass extends MapReduceBase
implements Mapper<K, V, K, V> {

 private Path[] localArchives;
 private Path[] localFiles;

 public void configure(JobConf job) {
   // get cached data from archives
   File f = new File("./map.zip/some/file/in/zip.txt");
 }

 public void map(K key, V value,
             	OutputCollector<K, V> output, Reporter reporter)
 throws IOException {
   // use data here
   // ...
   // ...
   output.collect(k, v);
 }
}
Transmisión de Hadoop

#enumeramos los archivos que deben agregarse al caché distribuido en el parámetro –files. La opción --files debe venir antes que las otras opciones.

yarn  hadoop-streaming.jar\ 
-files mapper.py,reducer.py,some_cached_data.txt\ 
-input '/some/input/path' \ 
-output '/some/output/path' \  
-mapper 'python mapper.py' \ 
-reducer 'python reducer.py' \

ejemplo de uso:

import sys 
#just read file from local folder 
data = open('some_cached_data.txt').read() 
 
for line in sys.stdin() 
#processing input 
#use data here

5.5 Reducir unión

Aquellos que están acostumbrados a trabajar con bases de datos relacionales a menudo usan la muy conveniente operación Join, que les permite procesar conjuntamente los contenidos de algunas tablas uniéndolas de acuerdo con alguna clave. Cuando se trabaja con big data, a veces también surge este problema. Considere el siguiente ejemplo:

Hay registros de dos servidores web, cada registro se ve así:

t\t

Ejemplo de fragmento de registro:

1446792139	
178.78.82.1	
/sphingosine/unhurrying.css 
1446792139	
126.31.163.222	
/accentually.js 
1446792139	
154.164.149.83	
/pyroacid/unkemptly.jpg 
1446792139	
202.27.13.181	
/Chawia.js 
1446792139	
67.123.248.174	
/morphographical/dismain.css 
1446792139	
226.74.123.135	
/phanerite.php 
1446792139	
157.109.106.104	
/bisonant.css

Es necesario calcular para cada dirección IP cuál de los 2 servidores visitó con más frecuencia. El resultado debe tener la forma:

\t

Un ejemplo de una parte del resultado:

178.78.82.1	
first 
126.31.163.222	
second 
154.164.149.83	
second 
226.74.123.135	
first

Desafortunadamente, a diferencia de las bases de datos relacionales, en general, unir dos registros por clave (en este caso, por dirección IP) es una operación bastante pesada y se resuelve usando 3 MapReduce y el patrón Reduce Join:

ReduceJoin funciona así:

1) Para cada uno de los registros de entrada, se inicia un MapReduce (solo mapa) separado, convirtiendo los datos de entrada al siguiente formulario:

key -> (type, value

Donde clave es la clave para unir tablas, Tipo es el tipo de tabla (primero o segundo en nuestro caso) y Valor es cualquier dato adicional vinculado a la clave.

2) Las salidas de ambos MapReduces se alimentan a la entrada del 3er MapReduce, que, de hecho, realiza la unión. Este MapReduce contiene un Mapeador vacío que simplemente copia la entrada. A continuación, shuffle descompone los datos en claves y los envía al reductor como entrada:

key -> [(type, value)]

Es importante que en este momento el reductor reciba registros de ambos registros, y al mismo tiempo, es posible identificar por el campo tipo de cuál de los dos registros proviene un determinado valor. Así que hay suficientes datos para resolver el problema original. En nuestro caso, el reductor simplemente tiene que calcular para cada clave de registro qué tipo ha encontrado más y generar este tipo.

5.6 Uniones de mapas

El patrón ReduceJoin describe el caso general de unir dos registros por clave. Sin embargo, hay un caso especial en el que la tarea se puede simplificar y acelerar significativamente. Este es el caso en el que uno de los registros es significativamente más pequeño que el otro. Considere el siguiente problema:

Hay 2 registros. El primer registro contiene el registro del servidor web (igual que en la tarea anterior), el segundo archivo (de 100 kb de tamaño) contiene la URL-> Coincidencia de tema. Ejemplo segundo archivo:

/toyota.php 	
auto 
/football/spartak.html 	
sport 
/cars 	
auto 
/finances/money 	
business

Para cada dirección IP, es necesario calcular las páginas de qué categoría de esta dirección IP se cargaron con mayor frecuencia.

En este caso, también necesitamos unir 2 registros por URL. Sin embargo, en este caso, no tenemos que ejecutar 3 MapReduce, ya que el segundo registro cabrá completamente en la memoria. Para resolver el problema usando el 1er MapReduce, podemos cargar el segundo registro en el Caché Distribuido, y cuando el Mapeador esté inicializado, simplemente léalo en la memoria, colocándolo en -> diccionario de temas.

Además, el problema se resuelve de la siguiente manera:

mapa:

# find the subject of each of the pages of the first log 
input_line -> [ip,  topic] 

reducir:


Ip -> [topics] -> [ip, most_popular_topic]

Reduce recibe una ip y una lista de todos los temas como entrada, simplemente calcula cuál de los temas se encontró con más frecuencia. Así, la tarea se resuelve utilizando el 1er MapReduce, y el propio Join generalmente se realiza dentro del mapa (por lo tanto, si no fuera necesaria una agregación adicional por clave, se podría prescindir del trabajo MapOnly):