5.1 Mapear apenas o trabalho
É hora de descrever várias técnicas que permitem usar o MapReduce de forma eficaz para resolver problemas práticos, bem como mostrar alguns dos recursos do Hadoop que podem simplificar o desenvolvimento ou acelerar significativamente a execução de uma tarefa MapReduce em um cluster.
Como lembramos, o MapReduce consiste nos estágios Map, Shuffle e Reduce. Via de regra, a etapa Shuffle acaba sendo a mais difícil em tarefas práticas, pois os dados são ordenados nesta etapa. De fato, há uma série de tarefas nas quais o estágio Mapa sozinho pode ser dispensado. Aqui estão exemplos de tais tarefas:
- Filtragem de dados (por exemplo, "Encontrar todos os registros do endereço IP 123.123.123.123" nos logs do servidor web);
- Transformação de dados (“Excluir coluna em csv-logs”);
- Carregando e descarregando dados de uma fonte externa (“Inserir todos os registros do log no banco de dados”).
Essas tarefas são resolvidas usando Map-Only. Ao criar uma tarefa Map-Only no Hadoop, você precisa especificar o número zero de redutores:

Um exemplo de configuração de tarefa apenas de mapa no hadoop:
interface nativa | Interface de Transmissão do Hadoop |
---|---|
Especifique o número zero de redutores ao configurar job'a:
|
Não especificamos um redutor e especificamos um número zero de redutores. Exemplo:
|
Os trabalhos Map Only podem realmente ser muito úteis. Por exemplo, na plataforma Facetz.DCA, para identificar as características dos usuários por seu comportamento, é utilizado precisamente um grande mapa-somente, cada mapeador recebe um usuário como entrada e retorna suas características como saída.
5.2 Combinar
Como já escrevi, geralmente o estágio mais difícil ao executar uma tarefa Map-Reduce é o estágio shuffle. Isso acontece porque os resultados intermediários (saída do mapeador) são gravados no disco, classificados e transmitidos pela rede. No entanto, existem tarefas em que tal comportamento não parece muito razoável. Por exemplo, na mesma tarefa de contagem de palavras em documentos, você pode pré-agregar os resultados das saídas de vários mapeadores em um nó map-reduce da tarefa e passar os valores já somados de cada máquina para o redutor .

No hadoop, para isso, você pode definir uma função de combinação que processará a saída de parte dos mapeadores. A função de combinação é muito parecida com a redução - ela pega a saída de alguns mapeadores como entrada e produz um resultado agregado para esses mapeadores, portanto, o redutor também costuma ser usado como um combinador. Uma diferença importante de reduzir é que nem todos os valores correspondentes a uma chave chegam à função de combinação .
Além disso, o hadoop não garante que a função combine seja executada para a saída do mapeador. Portanto, nem sempre a função de combinação é aplicável, por exemplo, no caso de busca do valor da mediana por chave. No entanto, naquelas tarefas onde a função de combinação é aplicável, seu uso permite alcançar um aumento significativo na velocidade da tarefa MapReduce.
Usando o combinador no hadoop:
interface nativa | Transmissão do Hadoop |
---|---|
Ao configurar o job-a, especifique a classe-Combiner. Via de regra, é o mesmo que Redutor:
|
Especifique o comando -combiner nas opções de linha de comando. Normalmente, esse comando é igual ao comando redutor. Exemplo:
|
5.3 Cadeias de tarefas MapReduce
Existem situações em que um MapReduce não é suficiente para resolver um problema. Por exemplo, considere uma tarefa WordCount ligeiramente modificada: há um conjunto de documentos de texto, você precisa contar quantas palavras ocorreram de 1 a 1000 vezes no conjunto, quantas palavras de 1001 a 2000, quantas de 2001 a 3000, e assim por diante. Para a solução, precisamos de 2 jobs MapReduce:
- Contagem de palavras modificada, que para cada palavra calculará em qual dos intervalos ela caiu;
- Um MapReduce que conta quantas vezes cada intervalo foi encontrado na saída do primeiro MapReduce.
Solução de pseudocódigo:
|
|
|
|
Para executar uma sequência de tarefas MapReduce no hadoop, basta especificar a pasta que foi especificada como saída para a primeira como entrada para a segunda tarefa e executá-las sucessivamente.
Na prática, as cadeias de tarefas MapReduce podem ser sequências bastante complexas nas quais as tarefas MapReduce podem ser conectadas sequencialmente e em paralelo entre si. Para simplificar o gerenciamento de tais planos de execução de tarefas, existem ferramentas separadas como oozie e luigi, que serão discutidas em um artigo separado desta série.

5.4 Cache distribuído
Um mecanismo importante no Hadoop é o Cache Distribuído. O cache distribuído permite adicionar arquivos (por exemplo, arquivos de texto, arquivos, arquivos jar) ao ambiente onde a tarefa MapReduce está sendo executada.
Você pode adicionar arquivos armazenados no HDFS, arquivos locais (local para a máquina da qual a tarefa é iniciada). Já mostrei implicitamente como usar o Cache distribuído com streaming de hadoop adicionando os arquivos mapper.py e reducer.py por meio da opção -file. Na verdade, você pode adicionar não apenas mapper.py e reducer.py, mas arquivos arbitrários em geral e usá-los como se estivessem em uma pasta local.
Usando Cache Distribuído:
API nativa |
---|
|
Transmissão Hadoop |
---|
#listamos os arquivos que precisam ser adicionados ao cache distribuído no parâmetro –files. A opção --files deve vir antes das outras opções. exemplo de uso:
|
5.5 Reduzir junção
Aqueles que estão acostumados a trabalhar com bancos de dados relacionais costumam usar a operação Join muito conveniente, que permite processar conjuntamente o conteúdo de algumas tabelas, unindo-as de acordo com alguma chave. Ao trabalhar com big data, às vezes esse problema também surge. Considere o seguinte exemplo:
Existem logs de dois servidores web, cada log se parece com isto:
t\t
Exemplo de trecho de log:
1446792139
178.78.82.1
/sphingosine/unhurrying.css
1446792139
126.31.163.222
/accentually.js
1446792139
154.164.149.83
/pyroacid/unkemptly.jpg
1446792139
202.27.13.181
/Chawia.js
1446792139
67.123.248.174
/morphographical/dismain.css
1446792139
226.74.123.135
/phanerite.php
1446792139
157.109.106.104
/bisonant.css
É necessário calcular para cada endereço IP qual dos 2 servidores ele visitou com mais frequência. O resultado deve estar na forma:
\t
Um exemplo de uma parte do resultado:
178.78.82.1
first
126.31.163.222
second
154.164.149.83
second
226.74.123.135
first
Infelizmente, ao contrário dos bancos de dados relacionais, em geral, unir dois logs por chave (neste caso, por endereço IP) é uma operação bastante pesada e é resolvida usando 3 MapReduce e o padrão Reduce Join:

O ReduceJoin funciona assim:
1) Para cada um dos logs de entrada, um MapReduce separado (somente mapa) é iniciado, convertendo os dados de entrada no seguinte formato:
key -> (type, value
Onde chave é a chave para unir as tabelas, Tipo é o tipo da tabela (primeiro ou segundo em nosso caso) e Valor são quaisquer dados adicionais vinculados à chave.
2) As saídas de ambos os MapReduces são alimentadas na entrada do 3º MapReduce, que, de fato, realiza a união. Este MapReduce contém um Mapper vazio que simplesmente copia a entrada. Em seguida, o shuffle decompõe os dados em chaves e os alimenta no redutor como entrada:
key -> [(type, value)]
É importante que neste momento o redutor receba os registros dos dois logs, e ao mesmo tempo, seja possível identificar pelo campo type de qual dos dois logs veio determinado valor. Portanto, há dados suficientes para resolver o problema original. Em nosso caso, o redutor simplesmente precisa calcular para cada chave de registro qual tipo encontrou mais e gerar esse tipo.
5.6 MapJoins
O padrão ReduceJoin descreve o caso geral de unir dois logs por chave. No entanto, há um caso especial em que a tarefa pode ser significativamente simplificada e acelerada. É o caso em que uma das toras é significativamente menor que a outra. Considere o seguinte problema:
Existem 2 registros. O primeiro log contém o log do servidor web (igual ao da tarefa anterior), o segundo arquivo (100kb de tamanho) contém a URL-> correspondência de tema. Exemplo 2º arquivo:
/toyota.php
auto
/football/spartak.html
sport
/cars
auto
/finances/money
business
Para cada endereço IP, é necessário calcular as páginas de qual categoria desse endereço IP foram carregadas com mais frequência.
Neste caso também precisamos juntar 2 logs por URL. No entanto, neste caso, não precisamos executar 3 MapReduce, pois o segundo log caberá completamente na memória. Para resolver o problema usando o 1º MapReduce, podemos carregar o segundo log no Cache Distribuído, e quando o Mapper for inicializado, basta ler na memória, colocando no -> dicionário de tópicos.
Além disso, o problema é resolvido da seguinte forma:
mapa:
# find the subject of each of the pages of the first log
input_line -> [ip, topic]
reduzir:
Ip -> [topics] -> [ip, most_popular_topic]
Reduzir recebe um ip e uma lista de todos os tópicos como entrada, ele simplesmente calcula qual dos tópicos foi encontrado com mais frequência. Assim, a tarefa é resolvida usando o 1º MapReduce, e o Join real geralmente ocorre dentro do mapa (portanto, se não fosse necessária agregação adicional por chave, o trabalho MapOnly poderia ser dispensado):

GO TO FULL VERSION