4.1 Información general sobre Hadoop

El paradigma MapReduce fue propuesto por Google en 2004 en su artículo MapReduce: Simplified Data Processing on Large Clusters . Dado que el artículo propuesto contenía una descripción del paradigma, pero faltaba la implementación, varios programadores de Yahoo propusieron su implementación como parte del trabajo en el rastreador web chiflado. Puedes leer más sobre la historia de Hadoop en el artículo La historia de Hadoop: De 4 nodos al futuro de los datos .

Inicialmente, Hadoop era principalmente una herramienta para almacenar datos y ejecutar tareas de MapReduce, pero ahora Hadoop es una gran cantidad de tecnologías relacionadas de una forma u otra con el procesamiento de grandes datos (no solo con MapReduce).

Los componentes principales (básicos) de Hadoop son:

  • Hadoop Distributed File System (HDFS) es un sistema de archivos distribuido que le permite almacenar información de tamaño casi ilimitado.
  • Hadoop YARN es un marco para la administración de tareas y recursos de clúster, incluido el marco MapReduce.
  • Hadoop común

También hay una gran cantidad de proyectos directamente relacionados con Hadoop, pero no incluidos en el núcleo de Hadoop:

  • Hive : una herramienta para consultas similares a SQL sobre big data (convierte las consultas SQL en una serie de tareas de MapReduce);
  • Pig es un lenguaje de programación para el análisis de datos de alto nivel. Una línea de código en este lenguaje puede convertirse en una secuencia de tareas de MapReduce;
  • Hbase es una base de datos en columnas que implementa el paradigma BigTable;
  • Cassandra es una base de datos clave-valor distribuida de alto rendimiento;
  • ZooKeeper es un servicio de almacenamiento de configuración distribuida y sincronización de cambios de configuración;
  • Mahout es una biblioteca y motor de aprendizaje automático de big data.

Por separado, me gustaría señalar el proyecto Apache Spark , que es un motor para el procesamiento de datos distribuidos. Apache Spark generalmente usa componentes de Hadoop como HDFS e YARN para su trabajo, mientras que recientemente se ha vuelto más popular que Hadoop:

Algunos de estos componentes se tratarán en artículos separados en esta serie de materiales, pero por ahora, veamos cómo puede comenzar a trabajar con Hadoop y ponerlo en práctica.

4.2 Ejecución de programas MapReduce en Hadoop

Ahora veamos cómo ejecutar una tarea de MapReduce en Hadoop. Como tarea, usaremos el ejemplo clásico de WordCount , que se discutió en la lección anterior.

Permítanme recordarles la formulación del problema: hay un conjunto de documentos. Es necesario que cada palabra que aparece en el conjunto de documentos cuente cuántas veces aparece la palabra en el conjunto.

Solución:

Mapa divide el documento en palabras y devuelve un conjunto de pares (palabra, 1).

Reducir suma las ocurrencias de cada palabra:

def map(doc):  
for word in doc.split():  
	yield word, 1 
def reduce(word, values):  
	yield word, sum(values)

Ahora la tarea es programar esta solución en forma de código que se pueda ejecutar en Hadoop y correr.

4.3 Método número 1. Transmisión de Hadoop

La forma más fácil de ejecutar un programa MapReduce en Hadoop es usar la interfaz de transmisión de Hadoop. La interfaz de transmisión asume que map y reduce se implementan como programas que toman datos de stdin y los envían a stdout .

El programa que ejecuta la función map se llama mapper. El programa que ejecuta reduce se llama, respectivamente, reducer .

La interfaz Streaming asume por defecto que una línea entrante en un mapeador o reductor corresponde a una entrada entrante para map .

La salida del mapeador llega a la entrada del reductor en forma de pares (clave, valor), mientras que todos los pares corresponden a la misma clave:

  • Garantizado para ser procesado por un solo lanzamiento del reductor;
  • Se enviará a la entrada en una fila (es decir, si un reductor procesa varias claves diferentes, la entrada se agrupará por clave).

Así que implementemos mapeador y reductor en python:

#mapper.py  
import sys  
  
def do_map(doc):  
for word in doc.split():  
	yield word.lower(), 1  
  
for line in sys.stdin:  
	for key, value in do_map(line):  
    	print(key + "\t" + str(value))  
 
#reducer.py  
import sys  
  
def do_reduce(word, values):  
	return word, sum(values)  
  
prev_key = None  
values = []  
  
for line in sys.stdin:  
	key, value = line.split("\t")  
	if key != prev_key and prev_key is not None:  
    	result_key, result_value = do_reduce(prev_key, values)  
    	print(result_key + "\t" + str(result_value))  
    	values = []  
	prev_key = key  
	values.append(int(value))  
  
if prev_key is not None:  
	result_key, result_value = do_reduce(prev_key, values)  
	print(result_key + "\t" + str(result_value))

Los datos que procesará Hadoop deben almacenarse en HDFS. Carguemos nuestros artículos y póngalos en HDFS. Para hacer esto, use el comando hadoop fs :

wget https://www.dropbox.com/s/opp5psid1x3jt41/lenta_articles.tar.gz  
tar xzvf lenta_articles.tar.gz  
hadoop fs -put lenta_articles 

La utilidad hadoop fs admite una gran cantidad de métodos para manipular el sistema de archivos, muchos de los cuales son idénticos a las utilidades estándar de Linux.

Ahora comencemos la tarea de transmisión:

yarn jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar\  
 -input lenta_articles\  
 -output lenta_wordcount\  
 -file mapper.py\  
 -file reducer.py\  
 -mapper "python mapper.py"\  
 -reducer "python reducer.py" 

La utilidad yarn se usa para iniciar y administrar varias aplicaciones (incluidas las basadas en map-reduce) en un clúster. Hadoop-streaming.jar es solo un ejemplo de una aplicación de este tipo.

A continuación se muestran las opciones de lanzamiento:

  • entrada - carpeta con datos de origen en hdfs;
  • salida - carpeta en hdfs donde desea colocar el resultado;
  • archivo: archivos que se necesitan durante la operación de la tarea de reducción de mapas;
  • mapper es el comando de la consola que se usará para la etapa del mapa;
  • reduce es el comando de la consola que se usará para la etapa de reducción.

Después del lanzamiento, puede ver el progreso de la tarea en la consola y una URL para ver información más detallada sobre la tarea.

En la interfaz disponible en esta URL, puede encontrar un estado de ejecución de tareas más detallado, ver los registros de cada mapeador y reductor (lo cual es muy útil en caso de tareas fallidas).

El resultado del trabajo después de la ejecución exitosa se agrega a HDFS en la carpeta que especificamos en el campo de salida. Puede ver su contenido usando el comando "hadoop fs -ls lenta_wordcount".

El resultado en sí se puede obtener de la siguiente manera:

hadoop fs -text lenta_wordcount/* | sort -n -k2,2 | tail -n5  
from
41  
this
43  
on
82  
and
111  
into
194 

El comando "hadoop fs -text" muestra el contenido de la carpeta en forma de texto. Ordené el resultado por el número de ocurrencias de las palabras. Como era de esperar, las palabras más comunes en el idioma son las preposiciones.

4.4 Método número 2: usar Java

Hadoop en sí está escrito en Java, y la interfaz nativa de Hadoop también está basada en Java. Vamos a mostrar cómo se ve una aplicación Java nativa para el recuento de palabras:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

	public static class TokenizerMapper
        	extends Mapper<Object, Text, Text, IntWritable>{

    	private final static IntWritable one = new IntWritable(1);
    	private Text word = new Text();

    	public void map(Object key, Text value, Context context
    	) throws IOException, InterruptedException {
        	StringTokenizer itr = new StringTokenizer(value.toString());
        	while (itr.hasMoreTokens()) {
            	word.set(itr.nextToken());
            	context.write(word, one);
        	}
    	}
	}

	public static class IntSumReducer
        	extends Reducer<Text,IntWritable,Text,IntWritable> {
    	private IntWritable result = new IntWritable();

    	public void reduce(Text key, Iterable values,
                       	Context context
    	) throws IOException, InterruptedException {
        	int sum = 0;
        	for (IntWritable val : values) {
            	sum += val.get();
        	}
        	result.set(sum);
        	context.write(key, result);
    	}
	}

	public static void main(String[] args) throws Exception {
    	Configuration conf = new Configuration();
    	Job job = Job.getInstance(conf, "word count");
    	job.setJarByClass(WordCount.class);
    	job.setMapperClass(TokenizerMapper.class);
    	job.setReducerClass(IntSumReducer.class);
    	job.setOutputKeyClass(Text.class);
    	job.setOutputValueClass(IntWritable.class);
    	FileInputFormat.addInputPath(job, new Path("hdfs://localhost/user/cloudera/lenta_articles"));
    	FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost/user/cloudera/lenta_wordcount"));
    	System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

Esta clase hace exactamente lo mismo que nuestro ejemplo de Python. Creamos las clases TokenizerMapper e IntSumReducer derivando de las clases Mapper y Reducer, respectivamente. Las clases pasadas como parámetros de plantilla especifican los tipos de valores de entrada y salida. La API nativa asume que la función de mapa recibe un par clave-valor como entrada. Dado que en nuestro caso la clave está vacía, simplemente definimos Objeto como el tipo de clave.

En el método principal, comenzamos la tarea mapreduce y definimos sus parámetros: nombre, mapeador y reductor, la ruta en HDFS, dónde se encuentran los datos de entrada y dónde colocar el resultado. Para compilar, necesitamos bibliotecas Hadoop. Uso Maven para compilar, para lo cual cloudera tiene un repositorio. Las instrucciones para configurarlo se pueden encontrar aquí. Como resultado, el archivo pom.xmp (que es utilizado por maven para describir el ensamblaje del proyecto) obtuvo lo siguiente):

<?xml version="1.0" encoding="UTF-8"?>  
<project xmlns="http://maven.apache.org/POM/4.0.0"  
     	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
     	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">  
	<modelVersion>4.0.0</modelVersion>  
  
	<repositories>  
    	<repository>  
        	<id>cloudera</id>  
        	<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>  
    	</repository>  
	</repositories>  
  
	<dependencies>  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-common</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-auth</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-hdfs</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-mapreduce-client-app</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
	</dependencies>  
  
	<groupId>org.dca.examples</groupId>  
	<artifactId>wordcount</artifactId>  
	<version>1.0-SNAPSHOT</version>  
 
</project>

Compilemos el proyecto en un paquete jar:

mvn clean package

Después de compilar el proyecto en un archivo jar, el lanzamiento ocurre de manera similar, como en el caso de la interfaz de transmisión:

yarn jar wordcount-1.0-SNAPSHOT.jar  WordCount 

Esperamos la ejecución y comprobamos el resultado:

hadoop fs -text lenta_wordcount/* | sort -n -k2,2 | tail -n5  
from
41
this
43
on
82
and
111
into
194

Como puede suponer, el resultado de ejecutar nuestra aplicación nativa es el mismo que el resultado de la aplicación de transmisión que lanzamos de la forma anterior.