4.1 Informações gerais sobre o Hadoop
![](https://cdn.codegym.cc/images/article/c546b694-2382-4fae-8483-aa03dd05b7c0/1024.jpeg)
O paradigma MapReduce foi proposto pelo Google em 2004 em seu artigo MapReduce: Simplified Data Processing on Large Clusters . Como o artigo proposto continha uma descrição do paradigma, mas faltava a implementação, vários programadores do Yahoo propuseram sua implementação como parte do trabalho no rastreador da web nutch. Você pode ler mais sobre a história do Hadoop no artigo The history of Hadoop: From 4 nodes to the future of data .
Inicialmente, o Hadoop era principalmente uma ferramenta para armazenar dados e executar tarefas MapReduce, mas agora o Hadoop é uma grande pilha de tecnologias relacionadas de uma forma ou de outra ao processamento de big data (não apenas com MapReduce).
Os principais (principais) componentes do Hadoop são:
- O Hadoop Distributed File System (HDFS) é um sistema de arquivos distribuído que permite armazenar informações de tamanho quase ilimitado.
- Hadoop YARN é uma estrutura para gerenciamento de recursos de cluster e gerenciamento de tarefas, incluindo a estrutura MapReduce.
- Hadoop comum
Há também um grande número de projetos diretamente relacionados ao Hadoop, mas não incluídos no núcleo do Hadoop:
- Hive - uma ferramenta para consultas semelhantes a SQL sobre big data (transforma consultas SQL em uma série de tarefas MapReduce);
- Pig é uma linguagem de programação para análise de dados de alto nível. Uma linha de código nessa linguagem pode se transformar em uma sequência de tarefas MapReduce;
- Hbase é um banco de dados colunar que implementa o paradigma BigTable;
- Cassandra é um banco de dados de valor-chave distribuído de alto desempenho;
- ZooKeeper é um serviço para armazenamento de configuração distribuído e sincronização de alterações de configuração;
- Mahout é uma biblioteca e mecanismo de aprendizado de máquina de big data.
Separadamente, gostaria de observar o projeto Apache Spark , que é um mecanismo para processamento de dados distribuídos. O Apache Spark normalmente usa componentes Hadoop como HDFS e YARN para seu trabalho, embora recentemente tenha se tornado mais popular que o Hadoop:
![](https://cdn.codegym.cc/images/article/10e9459a-0ec1-48a7-9f52-7cd7905cc93c/1024.jpeg)
Alguns desses componentes serão abordados em artigos separados nesta série de materiais, mas, por enquanto, vamos ver como você pode começar a trabalhar com o Hadoop e colocá-lo em prática.
4.2 Executando programas MapReduce no Hadoop
Agora vamos ver como executar uma tarefa MapReduce no Hadoop. Como tarefa, usaremos o clássico exemplo WordCount , discutido na lição anterior.
Deixe-me lembrá-lo da formulação do problema: existe um conjunto de documentos. É necessário para cada palavra que ocorre no conjunto de documentos contar quantas vezes a palavra ocorre no conjunto.
Solução:
Map divide o documento em palavras e retorna um conjunto de pares (palavra, 1).
Reduzir soma as ocorrências de cada palavra:
|
|
Agora a tarefa é programar esta solução na forma de código que possa ser executado no Hadoop e rodar.
4.3 Método número 1. Transmissão Hadoop
A maneira mais fácil de executar um programa MapReduce no Hadoop é usar a interface de streaming do Hadoop. A interface de streaming assume que map e reduce são implementados como programas que pegam dados de stdin e enviam para stdout .
O programa que executa a função map é chamado mapper. O programa que executa a redução é chamado, respectivamente, de redutor .
A interface Streaming assume por padrão que uma linha de entrada em um mapeador ou redutor corresponde a uma entrada de entrada para map .
A saída do mapeador chega à entrada do redutor na forma de pares (chave, valor), enquanto todos os pares correspondem à mesma chave:
- Garantido para ser processado por um único lançamento do redutor;
- Será submetido à entrada em uma linha (ou seja, se um redutor processar várias chaves diferentes, a entrada será agrupada por chave).
Então vamos implementar o mapper e o redutor em python:
#mapper.py
import sys
def do_map(doc):
for word in doc.split():
yield word.lower(), 1
for line in sys.stdin:
for key, value in do_map(line):
print(key + "\t" + str(value))
#reducer.py
import sys
def do_reduce(word, values):
return word, sum(values)
prev_key = None
values = []
for line in sys.stdin:
key, value = line.split("\t")
if key != prev_key and prev_key is not None:
result_key, result_value = do_reduce(prev_key, values)
print(result_key + "\t" + str(result_value))
values = []
prev_key = key
values.append(int(value))
if prev_key is not None:
result_key, result_value = do_reduce(prev_key, values)
print(result_key + "\t" + str(result_value))
Os dados que o Hadoop processará devem ser armazenados no HDFS. Vamos enviar nossos artigos e colocá-los no HDFS. Para fazer isso, use o comando hadoop fs :
wget https://www.dropbox.com/s/opp5psid1x3jt41/lenta_articles.tar.gz
tar xzvf lenta_articles.tar.gz
hadoop fs -put lenta_articles
O utilitário hadoop fs oferece suporte a um grande número de métodos para manipular o sistema de arquivos, muitos dos quais são idênticos aos utilitários Linux padrão.
Agora vamos iniciar a tarefa de streaming:
yarn jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar\
-input lenta_articles\
-output lenta_wordcount\
-file mapper.py\
-file reducer.py\
-mapper "python mapper.py"\
-reducer "python reducer.py"
O utilitário yarn é usado para iniciar e gerenciar vários aplicativos (incluindo os baseados em redução de mapa) em um cluster. Hadoop-streaming.jar é apenas um exemplo desse tipo de aplicativo yarn.
A seguir estão as opções de inicialização:
- entrada - pasta com dados de origem em hdfs;
- output - pasta em hdfs onde deseja colocar o resultado;
- arquivo - arquivos necessários durante a operação da tarefa map-reduce;
- mapper é o comando do console que será usado para o estágio do mapa;
- reduzir é o comando do console que será usado para o estágio de redução.
Após o lançamento, você pode ver o progresso da tarefa no console e um URL para visualizar informações mais detalhadas sobre a tarefa.
![](https://cdn.codegym.cc/images/article/844a189e-99f2-44f9-83cf-5e2e3cc25ca7/1024.jpeg)
Na interface disponível nesta URL, você pode saber mais detalhadamente o status de execução da tarefa, visualizar os logs de cada mapeador e redutor (o que é muito útil no caso de falhas nas tarefas).
![](https://cdn.codegym.cc/images/article/aa5f115f-3496-4551-800f-6bf1777d566f/1024.jpeg)
O resultado do trabalho após a execução bem-sucedida é adicionado ao HDFS na pasta que especificamos no campo de saída. Você pode visualizar seu conteúdo usando o comando "hadoop fs -ls lenta_wordcount".
O resultado em si pode ser obtido da seguinte forma:
hadoop fs -text lenta_wordcount/* | sort -n -k2,2 | tail -n5
from
41
this
43
on
82
and
111
into
194
O comando "hadoop fs -text" exibe o conteúdo da pasta em forma de texto. Classifiquei o resultado pelo número de ocorrências das palavras. Como esperado, as palavras mais comuns no idioma são preposições.
4.4 Método número 2: usar Java
O próprio Hadoop é escrito em java, e a interface nativa do Hadoop também é baseada em java. Vamos mostrar como é um aplicativo java nativo para contagem de palavras:
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount {
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path("hdfs://localhost/user/cloudera/lenta_articles"));
FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost/user/cloudera/lenta_wordcount"));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
Essa classe faz exatamente o mesmo que nosso exemplo Python. Criamos as classes TokenizerMapper e IntSumReducer derivando das classes Mapper e Reducer, respectivamente. As classes passadas como parâmetros de modelo especificam os tipos de valores de entrada e saída. A API nativa assume que a função map recebe um par chave-valor como entrada. Como em nosso caso a chave está vazia, simplesmente definimos Object como o tipo de chave.
No método Main, iniciamos a tarefa mapreduce e definimos seus parâmetros - nome, mapeador e redutor, o caminho no HDFS, onde estão os dados de entrada e onde colocar o resultado. Para compilar, precisamos de bibliotecas hadoop. Eu uso o Maven para construir, para o qual a cloudera tem um repositório. As instruções para configurá-lo podem ser encontradas aqui. Como resultado, o arquivo pom.xmp (que é usado pelo maven para descrever a montagem do projeto) obtive o seguinte):
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<repositories>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.6.0-cdh5.4.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
<version>2.6.0-cdh5.4.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.6.0-cdh5.4.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-app</artifactId>
<version>2.6.0-cdh5.4.2</version>
</dependency>
</dependencies>
<groupId>org.dca.examples</groupId>
<artifactId>wordcount</artifactId>
<version>1.0-SNAPSHOT</version>
</project>
Vamos compilar o projeto em um pacote jar:
mvn clean package
Após compilar o projeto em um arquivo jar, o lançamento ocorre de forma semelhante, como no caso da interface de streaming:
yarn jar wordcount-1.0-SNAPSHOT.jar WordCount
Aguardamos a execução e verificamos o resultado:
hadoop fs -text lenta_wordcount/* | sort -n -k2,2 | tail -n5
from
41
this
43
on
82
and
111
into
194
Como você pode imaginar, o resultado da execução de nosso aplicativo nativo é o mesmo que o resultado do aplicativo de streaming que iniciamos da maneira anterior.
GO TO FULL VERSION