4.1 Ogólne informacje o Hadoopie

Paradygmat MapReduce został zaproponowany przez Google w 2004 roku w artykule MapReduce: Simplified Data Processing on Large Clusters . Ponieważ proponowany artykuł zawierał opis paradygmatu, ale brakowało implementacji, kilku programistów z Yahoo zaproponowało ich implementację w ramach prac nad robotem nutch web crawler. Więcej o historii Hadoop można przeczytać w artykule Historia Hadoop: od 4 węzłów do przyszłości danych .

Początkowo Hadoop był przede wszystkim narzędziem do przechowywania danych i uruchamiania zadań MapReduce, ale teraz Hadoop to duży stos technologii związanych w taki czy inny sposób z przetwarzaniem dużych zbiorów danych (nie tylko z MapReduce).

Główne (rdzeniowe) komponenty Hadoop to:

  • Hadoop Distributed File System (HDFS) to rozproszony system plików, który umożliwia przechowywanie informacji o niemal nieograniczonej wielkości.
  • Hadoop YARN to platforma do zarządzania zasobami klastra i zarządzania zadaniami, w tym struktura MapReduce.
  • Wspólne dla Hadoopa

Istnieje również duża liczba projektów bezpośrednio związanych z Hadoop, ale nieuwzględnionych w rdzeniu Hadoop:

  • Hive - narzędzie do zapytań typu SQL na big data (zamienia zapytania SQL w serię zadań MapReduce);
  • Pig to język programowania do analizy danych wysokiego poziomu. Jedna linia kodu w tym języku może zamienić się w sekwencję zadań MapReduce;
  • Hbase to kolumnowa baza danych, która implementuje paradygmat BigTable;
  • Cassandra to wydajna rozproszona baza danych klucz-wartość;
  • ZooKeeper to usługa służąca do rozproszonego przechowywania konfiguracji i synchronizacji zmian w konfiguracji;
  • Mahout to biblioteka i silnik do uczenia maszynowego dużych zbiorów danych.

Osobno chciałbym zwrócić uwagę na projekt Apache Spark , który jest silnikiem do rozproszonego przetwarzania danych. Apache Spark zazwyczaj wykorzystuje do swojej pracy komponenty Hadoop, takie jak HDFS i YARN, podczas gdy sam ostatnio stał się bardziej popularny niż Hadoop:

Niektóre z tych komponentów zostaną omówione w oddzielnych artykułach z tej serii materiałów, ale na razie przyjrzyjmy się, jak możesz rozpocząć pracę z Hadoopem i zastosować go w praktyce.

4.2 Uruchamianie programów MapReduce na platformie Hadoop

Teraz przyjrzyjmy się, jak uruchomić zadanie MapReduce w Hadoop. Jako zadanie użyjemy klasycznego przykładu WordCount , który został omówiony na poprzedniej lekcji.

Przypomnę sformułowanie problemu: jest komplet dokumentów. Konieczne jest, aby każde słowo występujące w zbiorze dokumentów policzyło, ile razy to słowo występuje w zbiorze.

Rozwiązanie:

Mapa dzieli dokument na słowa i zwraca zestaw par (słowo, 1).

Zmniejsz sumy wystąpień każdego słowa:

def map(doc):  
for word in doc.split():  
	yield word, 1 
def reduce(word, values):  
	yield word, sum(values)

Teraz zadaniem jest zaprogramowanie tego rozwiązania w postaci kodu, który można wykonać na Hadoopie i uruchomić.

4.3 Metoda numer 1. Transmisja strumieniowa Hadoop

Najłatwiejszym sposobem uruchomienia programu MapReduce w Hadoop jest użycie interfejsu przesyłania strumieniowego Hadoop. Interfejs przesyłania strumieniowego zakłada, że ​​map i reduce są zaimplementowane jako programy, które pobierają dane ze stdin i wysyłają je do stdout .

Program wykonujący funkcję map nazywa się mapper. Program wykonujący reduce nazywa się odpowiednio reducer .

Interfejs przesyłania strumieniowego domyślnie zakłada, że ​​jedna linia przychodząca w mapperze lub reduktorze odpowiada jednemu wpisowi wejściowemu dla map .

Wyjście mappera trafia na wejście reduktora w postaci par (klucz, wartość), przy czym wszystkie pary odpowiadają temu samemu kluczowi:

  • Gwarantowane przetworzenie przez jedno uruchomienie reduktora;
  • Zostanie przesłane do wejścia z rzędu (to znaczy, jeśli jeden reduktor przetwarza kilka różnych kluczy, wejście zostanie pogrupowane według klucza).

Zaimplementujmy więc mappera i reduktora w pythonie:

#mapper.py  
import sys  
  
def do_map(doc):  
for word in doc.split():  
	yield word.lower(), 1  
  
for line in sys.stdin:  
	for key, value in do_map(line):  
    	print(key + "\t" + str(value))  
 
#reducer.py  
import sys  
  
def do_reduce(word, values):  
	return word, sum(values)  
  
prev_key = None  
values = []  
  
for line in sys.stdin:  
	key, value = line.split("\t")  
	if key != prev_key and prev_key is not None:  
    	result_key, result_value = do_reduce(prev_key, values)  
    	print(result_key + "\t" + str(result_value))  
    	values = []  
	prev_key = key  
	values.append(int(value))  
  
if prev_key is not None:  
	result_key, result_value = do_reduce(prev_key, values)  
	print(result_key + "\t" + str(result_value))

Dane przetwarzane przez Hadoop muszą być przechowywane w systemie plików HDFS. Prześlijmy nasze artykuły i umieśćmy je na HDFS. Aby to zrobić, użyj polecenia hadoop fs :

wget https://www.dropbox.com/s/opp5psid1x3jt41/lenta_articles.tar.gz  
tar xzvf lenta_articles.tar.gz  
hadoop fs -put lenta_articles 

Narzędzie hadoop fs obsługuje wiele metod manipulowania systemem plików, z których wiele jest identycznych ze standardowymi narzędziami linuksowymi.

Teraz zacznijmy zadanie przesyłania strumieniowego:

yarn jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar\  
 -input lenta_articles\  
 -output lenta_wordcount\  
 -file mapper.py\  
 -file reducer.py\  
 -mapper "python mapper.py"\  
 -reducer "python reducer.py" 

Narzędzie przędzy służy do uruchamiania i zarządzania różnymi aplikacjami (w tym opartymi na zmniejszaniu map) w klastrze. Hadoop-streaming.jar to tylko jeden przykład takiej aplikacji przędzy.

Dalej są opcje uruchamiania:

  • input - folder z danymi źródłowymi na hdfs;
  • output - folder na hdfs, w którym chcesz umieścić wynik;
  • plik - pliki potrzebne podczas działania zadania map-reduce;
  • mapper to polecenie konsoli, które będzie używane na etapie mapy;
  • reduce to polecenie konsoli, które zostanie użyte na etapie redukcji.

Po uruchomieniu możesz zobaczyć postęp zadania w konsoli i adres URL, aby wyświetlić bardziej szczegółowe informacje o zadaniu.

W interfejsie dostępnym pod tym adresem URL można dowiedzieć się bardziej szczegółowego statusu wykonania zadania, przejrzeć logi każdego mappera i reduktora (co jest bardzo przydatne w przypadku nieudanych zadań).

Wynik pracy po pomyślnym wykonaniu jest dodawany do HDFS w folderze, który określiliśmy w polu wyjściowym. Możesz wyświetlić jego zawartość za pomocą polecenia „hadoop fs -ls lenta_wordcount”.

Sam wynik można uzyskać w następujący sposób:

hadoop fs -text lenta_wordcount/* | sort -n -k2,2 | tail -n5  
from
41  
this
43  
on
82  
and
111  
into
194 

Polecenie „hadoop fs -text” wyświetla zawartość folderu w formie tekstowej. Posortowałem wynik według liczby wystąpień słów. Zgodnie z oczekiwaniami, najczęstszymi słowami w języku są przyimki.

4.4 Metoda numer 2: użyj Javy

Sam Hadoop jest napisany w Javie, a natywny interfejs Hadoop jest również oparty na Javie. Pokażmy, jak wygląda natywna aplikacja Java dla wordcount:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

	public static class TokenizerMapper
        	extends Mapper<Object, Text, Text, IntWritable>{

    	private final static IntWritable one = new IntWritable(1);
    	private Text word = new Text();

    	public void map(Object key, Text value, Context context
    	) throws IOException, InterruptedException {
        	StringTokenizer itr = new StringTokenizer(value.toString());
        	while (itr.hasMoreTokens()) {
            	word.set(itr.nextToken());
            	context.write(word, one);
        	}
    	}
	}

	public static class IntSumReducer
        	extends Reducer<Text,IntWritable,Text,IntWritable> {
    	private IntWritable result = new IntWritable();

    	public void reduce(Text key, Iterable values,
                       	Context context
    	) throws IOException, InterruptedException {
        	int sum = 0;
        	for (IntWritable val : values) {
            	sum += val.get();
        	}
        	result.set(sum);
        	context.write(key, result);
    	}
	}

	public static void main(String[] args) throws Exception {
    	Configuration conf = new Configuration();
    	Job job = Job.getInstance(conf, "word count");
    	job.setJarByClass(WordCount.class);
    	job.setMapperClass(TokenizerMapper.class);
    	job.setReducerClass(IntSumReducer.class);
    	job.setOutputKeyClass(Text.class);
    	job.setOutputValueClass(IntWritable.class);
    	FileInputFormat.addInputPath(job, new Path("hdfs://localhost/user/cloudera/lenta_articles"));
    	FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost/user/cloudera/lenta_wordcount"));
    	System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

Ta klasa robi dokładnie to samo, co nasz przykład w Pythonie. Tworzymy klasy TokenizerMapper i IntSumReducer, wywodząc się odpowiednio z klas Mapper i Reducer. Klasy przekazywane jako parametry szablonu określają typy wartości wejściowych i wyjściowych. Natywny interfejs API zakłada, że ​​funkcja map otrzymuje jako dane wejściowe parę klucz-wartość. Ponieważ w naszym przypadku klucz jest pusty, po prostu definiujemy Object jako typ klucza.

W metodzie Main uruchamiamy zadanie mapreduce i definiujemy jego parametry - nazwę, mapper i reducer, ścieżkę w HDFS, gdzie znajdują się dane wejściowe i gdzie umieścić wynik. Do kompilacji potrzebujemy bibliotek hadoop. Do budowania używam Mavena, dla którego cloudera ma repozytorium. Instrukcje dotyczące jego konfiguracji można znaleźć tutaj. W rezultacie w pliku pom.xmp (który jest używany przez maven do opisu montażu projektu) otrzymałem:

<?xml version="1.0" encoding="UTF-8"?>  
<project xmlns="http://maven.apache.org/POM/4.0.0"  
     	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
     	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">  
	<modelVersion>4.0.0</modelVersion>  
  
	<repositories>  
    	<repository>  
        	<id>cloudera</id>  
        	<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>  
    	</repository>  
	</repositories>  
  
	<dependencies>  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-common</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-auth</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-hdfs</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-mapreduce-client-app</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
	</dependencies>  
  
	<groupId>org.dca.examples</groupId>  
	<artifactId>wordcount</artifactId>  
	<version>1.0-SNAPSHOT</version>  
 
</project>

Skompilujmy projekt do pakietu jar:

mvn clean package

Po zbudowaniu projektu do pliku jar, uruchomienie następuje w podobny sposób, jak w przypadku interfejsu streamingowego:

yarn jar wordcount-1.0-SNAPSHOT.jar  WordCount 

Czekamy na wykonanie i sprawdzamy wynik:

hadoop fs -text lenta_wordcount/* | sort -n -k2,2 | tail -n5  
from
41
this
43
on
82
and
111
into
194

Jak można się domyślić, wynik uruchomienia naszej natywnej aplikacji jest taki sam, jak wynik uruchomienia aplikacji streamingowej, którą uruchomiliśmy w poprzedni sposób.