4.1 Informazioni generali su Hadoop

Il paradigma MapReduce è stato proposto da Google nel 2004 nel suo articolo MapReduce: Simplified Data Processing on Large Clusters . Poiché l'articolo proposto conteneva una descrizione del paradigma, ma mancava l'implementazione, diversi programmatori di Yahoo ne hanno proposto l'implementazione come parte del lavoro sul web crawler nutch. Puoi leggere di più sulla storia di Hadoop nell'articolo La storia di Hadoop: dai 4 nodi al futuro dei dati .

Inizialmente, Hadoop era principalmente uno strumento per l'archiviazione dei dati e l'esecuzione di attività MapReduce, ma ora Hadoop è un grande insieme di tecnologie legate in un modo o nell'altro all'elaborazione di big data (non solo con MapReduce).

I componenti principali (core) di Hadoop sono:

  • Hadoop Distributed File System (HDFS) è un file system distribuito che consente di archiviare informazioni di dimensioni quasi illimitate.
  • Hadoop YARN è un framework per la gestione delle risorse del cluster e la gestione delle attività, incluso il framework MapReduce.
  • Hadoop comune

Esiste anche un gran numero di progetti direttamente correlati a Hadoop, ma non inclusi nel nucleo di Hadoop:

  • Hive : uno strumento per query simili a SQL su big data (trasforma le query SQL in una serie di attività MapReduce);
  • Pig è un linguaggio di programmazione per l'analisi dei dati di alto livello. Una riga di codice in questo linguaggio può trasformarsi in una sequenza di attività MapReduce;
  • Hbase è un database colonnare che implementa il paradigma BigTable;
  • Cassandra è un database chiave-valore distribuito ad alte prestazioni;
  • ZooKeeper è un servizio per l'archiviazione della configurazione distribuita e la sincronizzazione delle modifiche alla configurazione;
  • Mahout è una libreria e un motore di machine learning per big data.

Separatamente, vorrei notare il progetto Apache Spark , che è un motore per l'elaborazione distribuita dei dati. Apache Spark utilizza in genere componenti Hadoop come HDFS e YARN per il suo lavoro, mentre recentemente è diventato più popolare di Hadoop:

Alcuni di questi componenti saranno trattati in articoli separati in questa serie di materiali, ma per ora diamo un'occhiata a come puoi iniziare a lavorare con Hadoop e metterlo in pratica.

4.2 Esecuzione di programmi MapReduce su Hadoop

Ora diamo un'occhiata a come eseguire un'attività MapReduce su Hadoop. Come attività, utilizzeremo il classico esempio WordCount , discusso nella lezione precedente.

Permettetemi di ricordarvi la formulazione del problema: esiste una serie di documenti. È necessario per ogni parola che ricorre nell'insieme dei documenti contare quante volte la parola ricorre nell'insieme.

Soluzione:

Map divide il documento in parole e restituisce un insieme di coppie (parola, 1).

Riduci somma le occorrenze di ogni parola:

def map(doc):  
for word in doc.split():  
	yield word, 1 
def reduce(word, values):  
	yield word, sum(values)

Ora il compito è programmare questa soluzione sotto forma di codice che può essere eseguito su Hadoop ed eseguito.

4.3 Metodo numero 1. Streaming Hadoop

Il modo più semplice per eseguire un programma MapReduce su Hadoop è utilizzare l'interfaccia di streaming Hadoop. L'interfaccia di streaming presuppone che map e reduce siano implementati come programmi che prendono i dati da stdin e inviano l'output a stdout .

Il programma che esegue la funzione map si chiama mapper. Il programma che esegue reduce si chiama, rispettivamente, reducer .

L'interfaccia Streaming presuppone per impostazione predefinita che una riga in entrata in un mapper o reducer corrisponda a una voce in entrata per map .

L'output del mapper arriva all'input del riduttore sotto forma di coppie (chiave, valore), mentre tutte le coppie corrispondenti alla stessa chiave:

  • Garantito per essere lavorato da un singolo lancio del riduttore;
  • Verrà inviato all'input in fila (ovvero, se un riduttore elabora più chiavi diverse, l'input verrà raggruppato per chiave).

Quindi implementiamo mapper e reducer in python:

#mapper.py  
import sys  
  
def do_map(doc):  
for word in doc.split():  
	yield word.lower(), 1  
  
for line in sys.stdin:  
	for key, value in do_map(line):  
    	print(key + "\t" + str(value))  
 
#reducer.py  
import sys  
  
def do_reduce(word, values):  
	return word, sum(values)  
  
prev_key = None  
values = []  
  
for line in sys.stdin:  
	key, value = line.split("\t")  
	if key != prev_key and prev_key is not None:  
    	result_key, result_value = do_reduce(prev_key, values)  
    	print(result_key + "\t" + str(result_value))  
    	values = []  
	prev_key = key  
	values.append(int(value))  
  
if prev_key is not None:  
	result_key, result_value = do_reduce(prev_key, values)  
	print(result_key + "\t" + str(result_value))

I dati che Hadoop elaborerà devono essere archiviati su HDFS. Carichiamo i nostri articoli e mettiamoli su HDFS. Per fare ciò, usa il comando hadoop fs :

wget https://www.dropbox.com/s/opp5psid1x3jt41/lenta_articles.tar.gz  
tar xzvf lenta_articles.tar.gz  
hadoop fs -put lenta_articles 

L'utility hadoop fs supporta un gran numero di metodi per manipolare il file system, molti dei quali sono identici alle utility linux standard.

Ora iniziamo l'attività di streaming:

yarn jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar\  
 -input lenta_articles\  
 -output lenta_wordcount\  
 -file mapper.py\  
 -file reducer.py\  
 -mapper "python mapper.py"\  
 -reducer "python reducer.py" 

L'utility yarn viene utilizzata per avviare e gestire varie applicazioni (incluse quelle basate su map-reduce) su un cluster. Hadoop-streaming.jar è solo un esempio di tale applicazione di filati.

Poi ci sono le opzioni di lancio:

  • input - cartella con i dati di origine su hdfs;
  • output - cartella su hdfs dove vuoi mettere il risultato;
  • file - file necessari durante il funzionamento dell'attività map-reduce;
  • mapper è il comando della console che verrà utilizzato per la fase della mappa;
  • reduce è il comando della console che verrà utilizzato per la fase di riduzione.

Dopo l'avvio, puoi vedere lo stato di avanzamento dell'attività nella console e un URL per visualizzare informazioni più dettagliate sull'attività.

Nell'interfaccia disponibile a questo URL, puoi trovare uno stato di esecuzione delle attività più dettagliato, visualizzare i log di ogni mappatore e riduttore (cosa molto utile in caso di attività fallite).

Il risultato del lavoro dopo l'esecuzione corretta viene aggiunto a HDFS nella cartella specificata nel campo di output. Puoi visualizzarne il contenuto usando il comando "hadoop fs -ls lenta_wordcount".

Il risultato stesso può essere ottenuto come segue:

hadoop fs -text lenta_wordcount/* | sort -n -k2,2 | tail -n5  
from
41  
this
43  
on
82  
and
111  
into
194 

Il comando "hadoop fs -text" visualizza il contenuto della cartella in forma di testo. Ho ordinato il risultato in base al numero di occorrenze delle parole. Come previsto, le parole più comuni nella lingua sono le preposizioni.

4.4 Metodo numero 2: utilizzare Java

Lo stesso Hadoop è scritto in java e anche l'interfaccia nativa di Hadoop è basata su java. Mostriamo come appare un'applicazione Java nativa per il conteggio delle parole:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

	public static class TokenizerMapper
        	extends Mapper<Object, Text, Text, IntWritable>{

    	private final static IntWritable one = new IntWritable(1);
    	private Text word = new Text();

    	public void map(Object key, Text value, Context context
    	) throws IOException, InterruptedException {
        	StringTokenizer itr = new StringTokenizer(value.toString());
        	while (itr.hasMoreTokens()) {
            	word.set(itr.nextToken());
            	context.write(word, one);
        	}
    	}
	}

	public static class IntSumReducer
        	extends Reducer<Text,IntWritable,Text,IntWritable> {
    	private IntWritable result = new IntWritable();

    	public void reduce(Text key, Iterable values,
                       	Context context
    	) throws IOException, InterruptedException {
        	int sum = 0;
        	for (IntWritable val : values) {
            	sum += val.get();
        	}
        	result.set(sum);
        	context.write(key, result);
    	}
	}

	public static void main(String[] args) throws Exception {
    	Configuration conf = new Configuration();
    	Job job = Job.getInstance(conf, "word count");
    	job.setJarByClass(WordCount.class);
    	job.setMapperClass(TokenizerMapper.class);
    	job.setReducerClass(IntSumReducer.class);
    	job.setOutputKeyClass(Text.class);
    	job.setOutputValueClass(IntWritable.class);
    	FileInputFormat.addInputPath(job, new Path("hdfs://localhost/user/cloudera/lenta_articles"));
    	FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost/user/cloudera/lenta_wordcount"));
    	System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

Questa classe fa esattamente la stessa cosa del nostro esempio Python. Creiamo le classi TokenizerMapper e IntSumReducer derivando rispettivamente dalle classi Mapper e Reducer. Le classi passate come parametri del modello specificano i tipi di valori di input e output. L'API nativa presuppone che alla funzione map venga fornita una coppia chiave-valore come input. Poiché nel nostro caso la chiave è vuota, definiamo semplicemente Object come tipo di chiave.

Nel metodo Main, avviamo l'attività mapreduce e definiamo i suoi parametri: nome, mappatore e riduttore, il percorso in HDFS, dove si trovano i dati di input e dove inserire il risultato. Per compilare, abbiamo bisogno delle librerie hadoop. Uso Maven per costruire, per il quale cloudera ha un repository. Le istruzioni per configurarlo possono essere trovate qui. Di conseguenza, il file pom.xmp (utilizzato da Maven per descrivere l'assemblaggio del progetto) ho ottenuto quanto segue:

<?xml version="1.0" encoding="UTF-8"?>  
<project xmlns="http://maven.apache.org/POM/4.0.0"  
     	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
     	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">  
	<modelVersion>4.0.0</modelVersion>  
  
	<repositories>  
    	<repository>  
        	<id>cloudera</id>  
        	<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>  
    	</repository>  
	</repositories>  
  
	<dependencies>  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-common</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-auth</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-hdfs</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-mapreduce-client-app</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
	</dependencies>  
  
	<groupId>org.dca.examples</groupId>  
	<artifactId>wordcount</artifactId>  
	<version>1.0-SNAPSHOT</version>  
 
</project>

Compiliamo il progetto in un pacchetto jar:

mvn clean package

Dopo aver compilato il progetto in un file jar, il lancio avviene in modo simile, come nel caso dell'interfaccia di streaming:

yarn jar wordcount-1.0-SNAPSHOT.jar  WordCount 

Aspettiamo l'esecuzione e controlliamo il risultato:

hadoop fs -text lenta_wordcount/* | sort -n -k2,2 | tail -n5  
from
41
this
43
on
82
and
111
into
194

Come puoi immaginare, il risultato dell'esecuzione della nostra applicazione nativa è lo stesso del risultato dell'applicazione di streaming che abbiamo avviato nel modo precedente.