5.1 Mappa solo lavoro

È giunto il momento di descrivere varie tecniche che consentono di utilizzare efficacemente MapReduce per risolvere problemi pratici, oltre a mostrare alcune delle funzionalità di Hadoop che possono semplificare lo sviluppo o velocizzare notevolmente l'esecuzione di un'attività MapReduce su un cluster.

Come ricordiamo, MapReduce consiste nelle fasi Map, Shuffle e Reduce. Di norma, la fase Shuffle risulta essere la più difficile nelle attività pratiche, poiché i dati vengono ordinati in questa fase. In effetti, ci sono una serie di compiti in cui è possibile fare a meno della sola fase Mappa. Ecco alcuni esempi di tali attività:

  • Filtraggio dei dati (ad esempio, "Trova tutti i record dall'indirizzo IP 123.123.123.123" nei log del server web);
  • Trasformazione dei dati ("Elimina colonna in csv-logs");
  • Caricamento e scaricamento di dati da una fonte esterna ("Inserisci tutti i record dal registro nel database").

Tali attività vengono risolte utilizzando Map-Only. Quando si crea un'attività Solo mappa in Hadoop, è necessario specificare un numero zero di riduttori:

Un esempio di configurazione di un'attività solo mappa su hadoop:

interfaccia nativa Interfaccia di streaming Hadoop

Specificare il numero zero di riduttori durante la configurazione di job'a:

job.setNumReduceTasks(0); 

Non specifichiamo un riduttore e specifichiamo un numero zero di riduttori. Esempio:

hadoop jar hadoop-streaming.jar \ 
 -D mapred.reduce.tasks=0\ 
-input input_dir\ 
-output output_dir\ 
-mapper "python mapper.py"\ 
-file "mapper.py"

I lavori Map Only possono effettivamente essere molto utili. Ad esempio, nella piattaforma Facetz.DCA, per identificare le caratteristiche degli utenti in base al loro comportamento, viene utilizzata proprio una grande mappa-sola, ogni mappatore della quale prende un utente come input e restituisce le sue caratteristiche come output.

5.2 Combina

Come ho già scritto, di solito la fase più difficile quando si esegue un'attività Map-Reduce è la fase di shuffle. Ciò accade perché i risultati intermedi (l'output del mappatore) vengono scritti su disco, ordinati e trasmessi sulla rete. Tuttavia, ci sono compiti in cui tale comportamento non sembra molto ragionevole. Ad esempio, nella stessa attività di conteggio delle parole nei documenti, è possibile pre-aggregare i risultati degli output di diversi mappatori su un nodo map-reduce dell'attività e passare i valori già sommati per ciascuna macchina al riduttore .

In hadoop, per questo, puoi definire una funzione di combinazione che elaborerà l'output di parte dei mappatori. La funzione di combinazione è molto simile alla riduzione: prende l'output di alcuni mappatori come input e produce un risultato aggregato per questi mappatori, quindi il riduttore viene spesso utilizzato anche come combinatore. Una differenza importante rispetto a reduce è che non tutti i valori corrispondenti a una chiave arrivano alla funzione di combinazione .

Inoltre, hadoop non garantisce affatto che la funzione di combinazione verrà eseguita per l'output del mapper. Pertanto, la funzione di combinazione non è sempre applicabile, ad esempio, nel caso di ricerca del valore mediano per chiave. Tuttavia, in quei compiti in cui è applicabile la funzione di combinazione, il suo utilizzo consente di ottenere un aumento significativo della velocità del compito MapReduce.

Utilizzando il Combiner su hadoop:

interfaccia nativa Streaming Hadoop

Quando si configura job-a, specificare il class-Combiner. Di norma, è uguale a Reducer:

job.setMapperClass(TokenizerMapper.class); 
job.setCombinerClass(IntSumReducer.class); 
job.setReducerClass(IntSumReducer.class); 

Specificare il comando -combiner nelle opzioni della riga di comando. In genere, questo comando è uguale al comando reducer. Esempio:

hadoop jar hadoop-streaming.jar \ 
-input input_dir\ 
-output output_dir\ 
-mapper "python mapper.py"\ 
-reducer "python reducer.py"\ 
-combiner "python reducer.py"\ 
-file "mapper.py"\ 
-file "reducer.py"\

5.3 Catene di attività MapReduce

Ci sono situazioni in cui un MapReduce non è sufficiente per risolvere un problema. Ad esempio, considera un'attività WordCount leggermente modificata: c'è una serie di documenti di testo, devi contare quante parole si sono verificate da 1 a 1000 volte nel set, quante parole da 1001 a 2000, quante da 2001 a 3000, e così via. Per la soluzione, abbiamo bisogno di 2 lavori MapReduce:

  • Wordcount modificato, che per ogni parola calcolerà in quale degli intervalli è caduta;
  • Oggetto MapReduce che conta quante volte ogni intervallo è stato rilevato nell'output del primo MapReduce.

Soluzione con pseudo codice:

#map1 
def map(doc): 
for word in doc: 
yield word, 1
#reduce1 
def reduce(word, values): 
yield int(sum(values)/1000), 1 
#map2 
def map(doc): 
interval, cnt = doc.split() 
yield interval, cnt 
#reduce2 
def reduce(interval, values): 
yield interval*1000, sum(values) 

Per eseguire una sequenza di attività MapReduce su hadoop, è sufficiente specificare la cartella specificata come output per la prima come input per la seconda attività ed eseguirle a turno.

In pratica, le catene di attività MapReduce possono essere sequenze piuttosto complesse in cui le attività MapReduce possono essere collegate sia in sequenza che in parallelo tra loro. Per semplificare la gestione di tali piani di esecuzione delle attività, esistono strumenti separati come oozie e luigi, che verranno discussi in un articolo separato in questa serie.

5.4 Cache distribuita

Un meccanismo importante in Hadoop è la cache distribuita. La cache distribuita consente di aggiungere file (ad esempio file di testo, archivi, file jar) all'ambiente in cui è in esecuzione l'attività MapReduce.

È possibile aggiungere file archiviati su HDFS, file locali (locali sulla macchina da cui viene avviata l'attività). Ho già mostrato implicitamente come utilizzare la cache distribuita con lo streaming hadoop aggiungendo i file mapper.py e reducer.py tramite l'opzione -file. Infatti, puoi aggiungere non solo mapper.py e reducer.py, ma file arbitrari in generale, e poi usarli come se fossero in una cartella locale.

Utilizzo della cache distribuita:

API nativa
//Job configuration
JobConf job = new JobConf();
DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"),  job);
DistributedCache.addCacheArchive(new URI("/myapp/map.zip", job);
DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job);
DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar", job);
DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz", job);

//example of usage in mapper-e:
public static class MapClass extends MapReduceBase
implements Mapper<K, V, K, V> {

 private Path[] localArchives;
 private Path[] localFiles;

 public void configure(JobConf job) {
   // get cached data from archives
   File f = new File("./map.zip/some/file/in/zip.txt");
 }

 public void map(K key, V value,
             	OutputCollector<K, V> output, Reporter reporter)
 throws IOException {
   // use data here
   // ...
   // ...
   output.collect(k, v);
 }
}
Streaming Hadoop

#elenchiamo i file che devono essere aggiunti alla cache distribuita nel parametro –files. L'opzione --files deve precedere le altre opzioni.

yarn  hadoop-streaming.jar\ 
-files mapper.py,reducer.py,some_cached_data.txt\ 
-input '/some/input/path' \ 
-output '/some/output/path' \  
-mapper 'python mapper.py' \ 
-reducer 'python reducer.py' \

esempio di utilizzo:

import sys 
#just read file from local folder 
data = open('some_cached_data.txt').read() 
 
for line in sys.stdin() 
#processing input 
#use data here

5.5 Riduci Unisci

Chi è abituato a lavorare con database relazionali utilizza spesso la comodissima operazione Join, che permette di elaborare congiuntamente il contenuto di alcune tabelle unendole secondo una qualche chiave. Quando si lavora con i big data, a volte si presenta anche questo problema. Considera il seguente esempio:

Ci sono registri di due server web, ogni registro ha questo aspetto:

t\t

Esempio di frammento di registro:

1446792139	
178.78.82.1	
/sphingosine/unhurrying.css 
1446792139	
126.31.163.222	
/accentually.js 
1446792139	
154.164.149.83	
/pyroacid/unkemptly.jpg 
1446792139	
202.27.13.181	
/Chawia.js 
1446792139	
67.123.248.174	
/morphographical/dismain.css 
1446792139	
226.74.123.135	
/phanerite.php 
1446792139	
157.109.106.104	
/bisonant.css

È necessario calcolare per ogni indirizzo IP quale dei 2 server ha visitato più spesso. Il risultato dovrebbe essere nella forma:

\t

Un esempio di una parte del risultato:

178.78.82.1	
first 
126.31.163.222	
second 
154.164.149.83	
second 
226.74.123.135	
first

Purtroppo, a differenza dei database relazionali, in generale unire due log per chiave (in questo caso per indirizzo IP) è un'operazione piuttosto pesante e si risolve utilizzando 3 MapReduce e il pattern Reduce Join:

ReduceJoin funziona così:

1) Per ciascuno dei log di input, viene avviato un MapReduce (solo mappa) separato, convertendo i dati di input nel seguente formato:

key -> (type, value

Dove key è la chiave su cui unire le tabelle, Type è il tipo di tabella (primo o secondo nel nostro caso) e Value è qualsiasi dato aggiuntivo associato alla chiave.

2) Gli output di entrambi i MapReduce vengono inviati all'input del 3° MapReduce, che, di fatto, esegue l'unione. Questo MapReduce contiene un Mapper vuoto che copia semplicemente l'input. Successivamente, shuffle decompone i dati in chiavi e li invia al riduttore come input:

key -> [(type, value)]

È importante che in questo momento il riduttore riceva i record da entrambi i log e, allo stesso tempo, sia possibile identificare tramite il campo type da quale dei due log proviene un particolare valore. Quindi ci sono dati sufficienti per risolvere il problema originale. Nel nostro caso, il riduttore deve semplicemente calcolare per ogni chiave di record quale tipo ha incontrato di più e produrre questo tipo.

5.6 MapJoin

Il pattern ReduceJoin descrive il caso generale dell'unione di due log tramite chiave. Tuttavia, esiste un caso speciale in cui l'attività può essere notevolmente semplificata e accelerata. Questo è il caso in cui uno dei log è significativamente più piccolo dell'altro. Considera il seguente problema:

Ci sono 2 registri. Il primo registro contiene il registro del server Web (come nell'attività precedente), il secondo file (100kb di dimensione) contiene la corrispondenza URL->Tema. Esempio 2° file:

/toyota.php 	
auto 
/football/spartak.html 	
sport 
/cars 	
auto 
/finances/money 	
business

Per ogni indirizzo IP, è necessario calcolare le pagine di quale categoria da questo indirizzo IP sono state caricate più spesso.

In questo caso, dobbiamo anche unire 2 log tramite URL. Tuttavia, in questo caso, non è necessario eseguire 3 MapReduce, poiché il secondo registro entrerà completamente nella memoria. Per risolvere il problema utilizzando il 1° MapReduce, possiamo caricare il secondo log nella Cache Distribuita, e quando il Mapper viene inizializzato, basta leggerlo in memoria, mettendolo nel -> topic dictionary.

Inoltre, il problema è risolto come segue:

carta geografica:

# find the subject of each of the pages of the first log 
input_line -> [ip,  topic] 

ridurre:


Ip -> [topics] -> [ip, most_popular_topic]

Reduce riceve un IP e un elenco di tutti gli argomenti come input, calcola semplicemente quale degli argomenti è stato riscontrato più spesso. Pertanto, il compito viene risolto utilizzando il 1° MapReduce e l'effettivo Join avviene generalmente all'interno della mappa (quindi, se non fosse necessaria un'ulteriore aggregazione per chiave, si potrebbe fare a meno del lavoro MapOnly):