4.1 Informações gerais sobre o Hadoop

O paradigma MapReduce foi proposto pelo Google em 2004 em seu artigo MapReduce: Simplified Data Processing on Large Clusters . Como o artigo proposto continha uma descrição do paradigma, mas faltava a implementação, vários programadores do Yahoo propuseram sua implementação como parte do trabalho no rastreador da web nutch. Você pode ler mais sobre a história do Hadoop no artigo The history of Hadoop: From 4 nodes to the future of data .

Inicialmente, o Hadoop era principalmente uma ferramenta para armazenar dados e executar tarefas MapReduce, mas agora o Hadoop é uma grande pilha de tecnologias relacionadas de uma forma ou de outra ao processamento de big data (não apenas com MapReduce).

Os principais (principais) componentes do Hadoop são:

  • O Hadoop Distributed File System (HDFS) é um sistema de arquivos distribuído que permite armazenar informações de tamanho quase ilimitado.
  • Hadoop YARN é uma estrutura para gerenciamento de recursos de cluster e gerenciamento de tarefas, incluindo a estrutura MapReduce.
  • Hadoop comum

Há também um grande número de projetos diretamente relacionados ao Hadoop, mas não incluídos no núcleo do Hadoop:

  • Hive - uma ferramenta para consultas semelhantes a SQL sobre big data (transforma consultas SQL em uma série de tarefas MapReduce);
  • Pig é uma linguagem de programação para análise de dados de alto nível. Uma linha de código nessa linguagem pode se transformar em uma sequência de tarefas MapReduce;
  • Hbase é um banco de dados colunar que implementa o paradigma BigTable;
  • Cassandra é um banco de dados de valor-chave distribuído de alto desempenho;
  • ZooKeeper é um serviço para armazenamento de configuração distribuído e sincronização de alterações de configuração;
  • Mahout é uma biblioteca e mecanismo de aprendizado de máquina de big data.

Separadamente, gostaria de observar o projeto Apache Spark , que é um mecanismo para processamento de dados distribuídos. O Apache Spark normalmente usa componentes Hadoop como HDFS e YARN para seu trabalho, embora recentemente tenha se tornado mais popular que o Hadoop:

Alguns desses componentes serão abordados em artigos separados nesta série de materiais, mas, por enquanto, vamos ver como você pode começar a trabalhar com o Hadoop e colocá-lo em prática.

4.2 Executando programas MapReduce no Hadoop

Agora vamos ver como executar uma tarefa MapReduce no Hadoop. Como tarefa, usaremos o clássico exemplo WordCount , discutido na lição anterior.

Deixe-me lembrá-lo da formulação do problema: existe um conjunto de documentos. É necessário para cada palavra que ocorre no conjunto de documentos contar quantas vezes a palavra ocorre no conjunto.

Solução:

Map divide o documento em palavras e retorna um conjunto de pares (palavra, 1).

Reduzir soma as ocorrências de cada palavra:

def map(doc):  
for word in doc.split():  
	yield word, 1 
def reduce(word, values):  
	yield word, sum(values)

Agora a tarefa é programar esta solução na forma de código que possa ser executado no Hadoop e rodar.

4.3 Método número 1. Transmissão Hadoop

A maneira mais fácil de executar um programa MapReduce no Hadoop é usar a interface de streaming do Hadoop. A interface de streaming assume que map e reduce são implementados como programas que pegam dados de stdin e enviam para stdout .

O programa que executa a função map é chamado mapper. O programa que executa a redução é chamado, respectivamente, de redutor .

A interface Streaming assume por padrão que uma linha de entrada em um mapeador ou redutor corresponde a uma entrada de entrada para map .

A saída do mapeador chega à entrada do redutor na forma de pares (chave, valor), enquanto todos os pares correspondem à mesma chave:

  • Garantido para ser processado por um único lançamento do redutor;
  • Será submetido à entrada em uma linha (ou seja, se um redutor processar várias chaves diferentes, a entrada será agrupada por chave).

Então vamos implementar o mapper e o redutor em python:

#mapper.py  
import sys  
  
def do_map(doc):  
for word in doc.split():  
	yield word.lower(), 1  
  
for line in sys.stdin:  
	for key, value in do_map(line):  
    	print(key + "\t" + str(value))  
 
#reducer.py  
import sys  
  
def do_reduce(word, values):  
	return word, sum(values)  
  
prev_key = None  
values = []  
  
for line in sys.stdin:  
	key, value = line.split("\t")  
	if key != prev_key and prev_key is not None:  
    	result_key, result_value = do_reduce(prev_key, values)  
    	print(result_key + "\t" + str(result_value))  
    	values = []  
	prev_key = key  
	values.append(int(value))  
  
if prev_key is not None:  
	result_key, result_value = do_reduce(prev_key, values)  
	print(result_key + "\t" + str(result_value))

Os dados que o Hadoop processará devem ser armazenados no HDFS. Vamos enviar nossos artigos e colocá-los no HDFS. Para fazer isso, use o comando hadoop fs :

wget https://www.dropbox.com/s/opp5psid1x3jt41/lenta_articles.tar.gz  
tar xzvf lenta_articles.tar.gz  
hadoop fs -put lenta_articles 

O utilitário hadoop fs oferece suporte a um grande número de métodos para manipular o sistema de arquivos, muitos dos quais são idênticos aos utilitários Linux padrão.

Agora vamos iniciar a tarefa de streaming:

yarn jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar\  
 -input lenta_articles\  
 -output lenta_wordcount\  
 -file mapper.py\  
 -file reducer.py\  
 -mapper "python mapper.py"\  
 -reducer "python reducer.py" 

O utilitário yarn é usado para iniciar e gerenciar vários aplicativos (incluindo os baseados em redução de mapa) em um cluster. Hadoop-streaming.jar é apenas um exemplo desse tipo de aplicativo yarn.

A seguir estão as opções de inicialização:

  • entrada - pasta com dados de origem em hdfs;
  • output - pasta em hdfs onde deseja colocar o resultado;
  • arquivo - arquivos necessários durante a operação da tarefa map-reduce;
  • mapper é o comando do console que será usado para o estágio do mapa;
  • reduzir é o comando do console que será usado para o estágio de redução.

Após o lançamento, você pode ver o progresso da tarefa no console e um URL para visualizar informações mais detalhadas sobre a tarefa.

Na interface disponível nesta URL, você pode saber mais detalhadamente o status de execução da tarefa, visualizar os logs de cada mapeador e redutor (o que é muito útil no caso de falhas nas tarefas).

O resultado do trabalho após a execução bem-sucedida é adicionado ao HDFS na pasta que especificamos no campo de saída. Você pode visualizar seu conteúdo usando o comando "hadoop fs -ls lenta_wordcount".

O resultado em si pode ser obtido da seguinte forma:

hadoop fs -text lenta_wordcount/* | sort -n -k2,2 | tail -n5  
from
41  
this
43  
on
82  
and
111  
into
194 

O comando "hadoop fs -text" exibe o conteúdo da pasta em forma de texto. Classifiquei o resultado pelo número de ocorrências das palavras. Como esperado, as palavras mais comuns no idioma são preposições.

4.4 Método número 2: usar Java

O próprio Hadoop é escrito em java, e a interface nativa do Hadoop também é baseada em java. Vamos mostrar como é um aplicativo java nativo para contagem de palavras:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

	public static class TokenizerMapper
        	extends Mapper<Object, Text, Text, IntWritable>{

    	private final static IntWritable one = new IntWritable(1);
    	private Text word = new Text();

    	public void map(Object key, Text value, Context context
    	) throws IOException, InterruptedException {
        	StringTokenizer itr = new StringTokenizer(value.toString());
        	while (itr.hasMoreTokens()) {
            	word.set(itr.nextToken());
            	context.write(word, one);
        	}
    	}
	}

	public static class IntSumReducer
        	extends Reducer<Text,IntWritable,Text,IntWritable> {
    	private IntWritable result = new IntWritable();

    	public void reduce(Text key, Iterable values,
                       	Context context
    	) throws IOException, InterruptedException {
        	int sum = 0;
        	for (IntWritable val : values) {
            	sum += val.get();
        	}
        	result.set(sum);
        	context.write(key, result);
    	}
	}

	public static void main(String[] args) throws Exception {
    	Configuration conf = new Configuration();
    	Job job = Job.getInstance(conf, "word count");
    	job.setJarByClass(WordCount.class);
    	job.setMapperClass(TokenizerMapper.class);
    	job.setReducerClass(IntSumReducer.class);
    	job.setOutputKeyClass(Text.class);
    	job.setOutputValueClass(IntWritable.class);
    	FileInputFormat.addInputPath(job, new Path("hdfs://localhost/user/cloudera/lenta_articles"));
    	FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost/user/cloudera/lenta_wordcount"));
    	System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

Essa classe faz exatamente o mesmo que nosso exemplo Python. Criamos as classes TokenizerMapper e IntSumReducer derivando das classes Mapper e Reducer, respectivamente. As classes passadas como parâmetros de modelo especificam os tipos de valores de entrada e saída. A API nativa assume que a função map recebe um par chave-valor como entrada. Como em nosso caso a chave está vazia, simplesmente definimos Object como o tipo de chave.

No método Main, iniciamos a tarefa mapreduce e definimos seus parâmetros - nome, mapeador e redutor, o caminho no HDFS, onde estão os dados de entrada e onde colocar o resultado. Para compilar, precisamos de bibliotecas hadoop. Eu uso o Maven para construir, para o qual a cloudera tem um repositório. As instruções para configurá-lo podem ser encontradas aqui. Como resultado, o arquivo pom.xmp (que é usado pelo maven para descrever a montagem do projeto) obtive o seguinte):

<?xml version="1.0" encoding="UTF-8"?>  
<project xmlns="http://maven.apache.org/POM/4.0.0"  
     	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
     	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">  
	<modelVersion>4.0.0</modelVersion>  
  
	<repositories>  
    	<repository>  
        	<id>cloudera</id>  
        	<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>  
    	</repository>  
	</repositories>  
  
	<dependencies>  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-common</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-auth</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-hdfs</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-mapreduce-client-app</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
	</dependencies>  
  
	<groupId>org.dca.examples</groupId>  
	<artifactId>wordcount</artifactId>  
	<version>1.0-SNAPSHOT</version>  
 
</project>

Vamos compilar o projeto em um pacote jar:

mvn clean package

Após compilar o projeto em um arquivo jar, o lançamento ocorre de forma semelhante, como no caso da interface de streaming:

yarn jar wordcount-1.0-SNAPSHOT.jar  WordCount 

Aguardamos a execução e verificamos o resultado:

hadoop fs -text lenta_wordcount/* | sort -n -k2,2 | tail -n5  
from
41
this
43
on
82
and
111
into
194

Como você pode imaginar, o resultado da execução de nosso aplicativo nativo é o mesmo que o resultado do aplicativo de streaming que iniciamos da maneira anterior.