5.1 Mapear apenas o trabalho

É hora de descrever várias técnicas que permitem usar o MapReduce de forma eficaz para resolver problemas práticos, bem como mostrar alguns dos recursos do Hadoop que podem simplificar o desenvolvimento ou acelerar significativamente a execução de uma tarefa MapReduce em um cluster.

Como lembramos, o MapReduce consiste nos estágios Map, Shuffle e Reduce. Via de regra, a etapa Shuffle acaba sendo a mais difícil em tarefas práticas, pois os dados são ordenados nesta etapa. De fato, há uma série de tarefas nas quais o estágio Mapa sozinho pode ser dispensado. Aqui estão exemplos de tais tarefas:

  • Filtragem de dados (por exemplo, "Encontrar todos os registros do endereço IP 123.123.123.123" nos logs do servidor web);
  • Transformação de dados (“Excluir coluna em csv-logs”);
  • Carregando e descarregando dados de uma fonte externa (“Inserir todos os registros do log no banco de dados”).

Essas tarefas são resolvidas usando Map-Only. Ao criar uma tarefa Map-Only no Hadoop, você precisa especificar o número zero de redutores:

Um exemplo de configuração de tarefa apenas de mapa no hadoop:

interface nativa Interface de Transmissão do Hadoop

Especifique o número zero de redutores ao configurar job'a:

job.setNumReduceTasks(0); 

Não especificamos um redutor e especificamos um número zero de redutores. Exemplo:

hadoop jar hadoop-streaming.jar \ 
 -D mapred.reduce.tasks=0\ 
-input input_dir\ 
-output output_dir\ 
-mapper "python mapper.py"\ 
-file "mapper.py"

Os trabalhos Map Only podem realmente ser muito úteis. Por exemplo, na plataforma Facetz.DCA, para identificar as características dos usuários por seu comportamento, é utilizado precisamente um grande mapa-somente, cada mapeador recebe um usuário como entrada e retorna suas características como saída.

5.2 Combinar

Como já escrevi, geralmente o estágio mais difícil ao executar uma tarefa Map-Reduce é o estágio shuffle. Isso acontece porque os resultados intermediários (saída do mapeador) são gravados no disco, classificados e transmitidos pela rede. No entanto, existem tarefas em que tal comportamento não parece muito razoável. Por exemplo, na mesma tarefa de contagem de palavras em documentos, você pode pré-agregar os resultados das saídas de vários mapeadores em um nó map-reduce da tarefa e passar os valores já somados de cada máquina para o redutor .

No hadoop, para isso, você pode definir uma função de combinação que processará a saída de parte dos mapeadores. A função de combinação é muito parecida com a redução - ela pega a saída de alguns mapeadores como entrada e produz um resultado agregado para esses mapeadores, portanto, o redutor também costuma ser usado como um combinador. Uma diferença importante de reduzir é que nem todos os valores correspondentes a uma chave chegam à função de combinação .

Além disso, o hadoop não garante que a função combine seja executada para a saída do mapeador. Portanto, nem sempre a função de combinação é aplicável, por exemplo, no caso de busca do valor da mediana por chave. No entanto, naquelas tarefas onde a função de combinação é aplicável, seu uso permite alcançar um aumento significativo na velocidade da tarefa MapReduce.

Usando o combinador no hadoop:

interface nativa Transmissão do Hadoop

Ao configurar o job-a, especifique a classe-Combiner. Via de regra, é o mesmo que Redutor:

job.setMapperClass(TokenizerMapper.class); 
job.setCombinerClass(IntSumReducer.class); 
job.setReducerClass(IntSumReducer.class); 

Especifique o comando -combiner nas opções de linha de comando. Normalmente, esse comando é igual ao comando redutor. Exemplo:

hadoop jar hadoop-streaming.jar \ 
-input input_dir\ 
-output output_dir\ 
-mapper "python mapper.py"\ 
-reducer "python reducer.py"\ 
-combiner "python reducer.py"\ 
-file "mapper.py"\ 
-file "reducer.py"\

5.3 Cadeias de tarefas MapReduce

Existem situações em que um MapReduce não é suficiente para resolver um problema. Por exemplo, considere uma tarefa WordCount ligeiramente modificada: há um conjunto de documentos de texto, você precisa contar quantas palavras ocorreram de 1 a 1000 vezes no conjunto, quantas palavras de 1001 a 2000, quantas de 2001 a 3000, e assim por diante. Para a solução, precisamos de 2 jobs MapReduce:

  • Contagem de palavras modificada, que para cada palavra calculará em qual dos intervalos ela caiu;
  • Um MapReduce que conta quantas vezes cada intervalo foi encontrado na saída do primeiro MapReduce.

Solução de pseudocódigo:

#map1 
def map(doc): 
for word in doc: 
yield word, 1
#reduce1 
def reduce(word, values): 
yield int(sum(values)/1000), 1 
#map2 
def map(doc): 
interval, cnt = doc.split() 
yield interval, cnt 
#reduce2 
def reduce(interval, values): 
yield interval*1000, sum(values) 

Para executar uma sequência de tarefas MapReduce no hadoop, basta especificar a pasta que foi especificada como saída para a primeira como entrada para a segunda tarefa e executá-las sucessivamente.

Na prática, as cadeias de tarefas MapReduce podem ser sequências bastante complexas nas quais as tarefas MapReduce podem ser conectadas sequencialmente e em paralelo entre si. Para simplificar o gerenciamento de tais planos de execução de tarefas, existem ferramentas separadas como oozie e luigi, que serão discutidas em um artigo separado desta série.

5.4 Cache distribuído

Um mecanismo importante no Hadoop é o Cache Distribuído. O cache distribuído permite adicionar arquivos (por exemplo, arquivos de texto, arquivos, arquivos jar) ao ambiente onde a tarefa MapReduce está sendo executada.

Você pode adicionar arquivos armazenados no HDFS, arquivos locais (local para a máquina da qual a tarefa é iniciada). Já mostrei implicitamente como usar o Cache distribuído com streaming de hadoop adicionando os arquivos mapper.py e reducer.py por meio da opção -file. Na verdade, você pode adicionar não apenas mapper.py e reducer.py, mas arquivos arbitrários em geral e usá-los como se estivessem em uma pasta local.

Usando Cache Distribuído:

API nativa
//Job configuration
JobConf job = new JobConf();
DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"),  job);
DistributedCache.addCacheArchive(new URI("/myapp/map.zip", job);
DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job);
DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar", job);
DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz", job);

//example of usage in mapper-e:
public static class MapClass extends MapReduceBase
implements Mapper<K, V, K, V> {

 private Path[] localArchives;
 private Path[] localFiles;

 public void configure(JobConf job) {
   // get cached data from archives
   File f = new File("./map.zip/some/file/in/zip.txt");
 }

 public void map(K key, V value,
             	OutputCollector<K, V> output, Reporter reporter)
 throws IOException {
   // use data here
   // ...
   // ...
   output.collect(k, v);
 }
}
Transmissão Hadoop

#listamos os arquivos que precisam ser adicionados ao cache distribuído no parâmetro –files. A opção --files deve vir antes das outras opções.

yarn  hadoop-streaming.jar\ 
-files mapper.py,reducer.py,some_cached_data.txt\ 
-input '/some/input/path' \ 
-output '/some/output/path' \  
-mapper 'python mapper.py' \ 
-reducer 'python reducer.py' \

exemplo de uso:

import sys 
#just read file from local folder 
data = open('some_cached_data.txt').read() 
 
for line in sys.stdin() 
#processing input 
#use data here

5.5 Reduzir junção

Aqueles que estão acostumados a trabalhar com bancos de dados relacionais costumam usar a operação Join muito conveniente, que permite processar conjuntamente o conteúdo de algumas tabelas, unindo-as de acordo com alguma chave. Ao trabalhar com big data, às vezes esse problema também surge. Considere o seguinte exemplo:

Existem logs de dois servidores web, cada log se parece com isto:

t\t

Exemplo de trecho de log:

1446792139	
178.78.82.1	
/sphingosine/unhurrying.css 
1446792139	
126.31.163.222	
/accentually.js 
1446792139	
154.164.149.83	
/pyroacid/unkemptly.jpg 
1446792139	
202.27.13.181	
/Chawia.js 
1446792139	
67.123.248.174	
/morphographical/dismain.css 
1446792139	
226.74.123.135	
/phanerite.php 
1446792139	
157.109.106.104	
/bisonant.css

É necessário calcular para cada endereço IP qual dos 2 servidores ele visitou com mais frequência. O resultado deve estar na forma:

\t

Um exemplo de uma parte do resultado:

178.78.82.1	
first 
126.31.163.222	
second 
154.164.149.83	
second 
226.74.123.135	
first

Infelizmente, ao contrário dos bancos de dados relacionais, em geral, unir dois logs por chave (neste caso, por endereço IP) é uma operação bastante pesada e é resolvida usando 3 MapReduce e o padrão Reduce Join:

O ReduceJoin funciona assim:

1) Para cada um dos logs de entrada, um MapReduce separado (somente mapa) é iniciado, convertendo os dados de entrada no seguinte formato:

key -> (type, value

Onde chave é a chave para unir as tabelas, Tipo é o tipo da tabela (primeiro ou segundo em nosso caso) e Valor são quaisquer dados adicionais vinculados à chave.

2) As saídas de ambos os MapReduces são alimentadas na entrada do 3º MapReduce, que, de fato, realiza a união. Este MapReduce contém um Mapper vazio que simplesmente copia a entrada. Em seguida, o shuffle decompõe os dados em chaves e os alimenta no redutor como entrada:

key -> [(type, value)]

É importante que neste momento o redutor receba os registros dos dois logs, e ao mesmo tempo, seja possível identificar pelo campo type de qual dos dois logs veio determinado valor. Portanto, há dados suficientes para resolver o problema original. Em nosso caso, o redutor simplesmente precisa calcular para cada chave de registro qual tipo encontrou mais e gerar esse tipo.

5.6 MapJoins

O padrão ReduceJoin descreve o caso geral de unir dois logs por chave. No entanto, há um caso especial em que a tarefa pode ser significativamente simplificada e acelerada. É o caso em que uma das toras é significativamente menor que a outra. Considere o seguinte problema:

Existem 2 registros. O primeiro log contém o log do servidor web (igual ao da tarefa anterior), o segundo arquivo (100kb de tamanho) contém a URL-> correspondência de tema. Exemplo 2º arquivo:

/toyota.php 	
auto 
/football/spartak.html 	
sport 
/cars 	
auto 
/finances/money 	
business

Para cada endereço IP, é necessário calcular as páginas de qual categoria desse endereço IP foram carregadas com mais frequência.

Neste caso também precisamos juntar 2 logs por URL. No entanto, neste caso, não precisamos executar 3 MapReduce, pois o segundo log caberá completamente na memória. Para resolver o problema usando o 1º MapReduce, podemos carregar o segundo log no Cache Distribuído, e quando o Mapper for inicializado, basta ler na memória, colocando no -> dicionário de tópicos.

Além disso, o problema é resolvido da seguinte forma:

mapa:

# find the subject of each of the pages of the first log 
input_line -> [ip,  topic] 

reduzir:


Ip -> [topics] -> [ip, most_popular_topic]

Reduzir recebe um ip e uma lista de todos os tópicos como entrada, ele simplesmente calcula qual dos tópicos foi encontrado com mais frequência. Assim, a tarefa é resolvida usando o 1º MapReduce, e o Join real geralmente ocorre dentro do mapa (portanto, se não fosse necessária agregação adicional por chave, o trabalho MapOnly poderia ser dispensado):