4.1 Ogólne informacje o Hadoopie
Paradygmat MapReduce został zaproponowany przez Google w 2004 roku w artykule MapReduce: Simplified Data Processing on Large Clusters . Ponieważ proponowany artykuł zawierał opis paradygmatu, ale brakowało implementacji, kilku programistów z Yahoo zaproponowało ich implementację w ramach prac nad robotem nutch web crawler. Więcej o historii Hadoop można przeczytać w artykule Historia Hadoop: od 4 węzłów do przyszłości danych .
Początkowo Hadoop był przede wszystkim narzędziem do przechowywania danych i uruchamiania zadań MapReduce, ale teraz Hadoop to duży stos technologii związanych w taki czy inny sposób z przetwarzaniem dużych zbiorów danych (nie tylko z MapReduce).
Główne (rdzeniowe) komponenty Hadoop to:
- Hadoop Distributed File System (HDFS) to rozproszony system plików, który umożliwia przechowywanie informacji o niemal nieograniczonej wielkości.
- Hadoop YARN to platforma do zarządzania zasobami klastra i zarządzania zadaniami, w tym struktura MapReduce.
- Wspólne dla Hadoopa
Istnieje również duża liczba projektów bezpośrednio związanych z Hadoop, ale nieuwzględnionych w rdzeniu Hadoop:
- Hive - narzędzie do zapytań typu SQL na big data (zamienia zapytania SQL w serię zadań MapReduce);
- Pig to język programowania do analizy danych wysokiego poziomu. Jedna linia kodu w tym języku może zamienić się w sekwencję zadań MapReduce;
- Hbase to kolumnowa baza danych, która implementuje paradygmat BigTable;
- Cassandra to wydajna rozproszona baza danych klucz-wartość;
- ZooKeeper to usługa służąca do rozproszonego przechowywania konfiguracji i synchronizacji zmian w konfiguracji;
- Mahout to biblioteka i silnik do uczenia maszynowego dużych zbiorów danych.
Osobno chciałbym zwrócić uwagę na projekt Apache Spark , który jest silnikiem do rozproszonego przetwarzania danych. Apache Spark zazwyczaj wykorzystuje do swojej pracy komponenty Hadoop, takie jak HDFS i YARN, podczas gdy sam ostatnio stał się bardziej popularny niż Hadoop:
Niektóre z tych komponentów zostaną omówione w oddzielnych artykułach z tej serii materiałów, ale na razie przyjrzyjmy się, jak możesz rozpocząć pracę z Hadoopem i zastosować go w praktyce.
4.2 Uruchamianie programów MapReduce na platformie Hadoop
Teraz przyjrzyjmy się, jak uruchomić zadanie MapReduce w Hadoop. Jako zadanie użyjemy klasycznego przykładu WordCount , który został omówiony na poprzedniej lekcji.
Przypomnę sformułowanie problemu: jest komplet dokumentów. Konieczne jest, aby każde słowo występujące w zbiorze dokumentów policzyło, ile razy to słowo występuje w zbiorze.
Rozwiązanie:
Mapa dzieli dokument na słowa i zwraca zestaw par (słowo, 1).
Zmniejsz sumy wystąpień każdego słowa:
|
|
Teraz zadaniem jest zaprogramowanie tego rozwiązania w postaci kodu, który można wykonać na Hadoopie i uruchomić.
4.3 Metoda numer 1. Transmisja strumieniowa Hadoop
Najłatwiejszym sposobem uruchomienia programu MapReduce w Hadoop jest użycie interfejsu przesyłania strumieniowego Hadoop. Interfejs przesyłania strumieniowego zakłada, że map i reduce są zaimplementowane jako programy, które pobierają dane ze stdin i wysyłają je do stdout .
Program wykonujący funkcję map nazywa się mapper. Program wykonujący reduce nazywa się odpowiednio reducer .
Interfejs przesyłania strumieniowego domyślnie zakłada, że jedna linia przychodząca w mapperze lub reduktorze odpowiada jednemu wpisowi wejściowemu dla map .
Wyjście mappera trafia na wejście reduktora w postaci par (klucz, wartość), przy czym wszystkie pary odpowiadają temu samemu kluczowi:
- Gwarantowane przetworzenie przez jedno uruchomienie reduktora;
- Zostanie przesłane do wejścia z rzędu (to znaczy, jeśli jeden reduktor przetwarza kilka różnych kluczy, wejście zostanie pogrupowane według klucza).
Zaimplementujmy więc mappera i reduktora w pythonie:
#mapper.py
import sys
def do_map(doc):
for word in doc.split():
yield word.lower(), 1
for line in sys.stdin:
for key, value in do_map(line):
print(key + "\t" + str(value))
#reducer.py
import sys
def do_reduce(word, values):
return word, sum(values)
prev_key = None
values = []
for line in sys.stdin:
key, value = line.split("\t")
if key != prev_key and prev_key is not None:
result_key, result_value = do_reduce(prev_key, values)
print(result_key + "\t" + str(result_value))
values = []
prev_key = key
values.append(int(value))
if prev_key is not None:
result_key, result_value = do_reduce(prev_key, values)
print(result_key + "\t" + str(result_value))
Dane przetwarzane przez Hadoop muszą być przechowywane w systemie plików HDFS. Prześlijmy nasze artykuły i umieśćmy je na HDFS. Aby to zrobić, użyj polecenia hadoop fs :
wget https://www.dropbox.com/s/opp5psid1x3jt41/lenta_articles.tar.gz
tar xzvf lenta_articles.tar.gz
hadoop fs -put lenta_articles
Narzędzie hadoop fs obsługuje wiele metod manipulowania systemem plików, z których wiele jest identycznych ze standardowymi narzędziami linuksowymi.
Teraz zacznijmy zadanie przesyłania strumieniowego:
yarn jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar\
-input lenta_articles\
-output lenta_wordcount\
-file mapper.py\
-file reducer.py\
-mapper "python mapper.py"\
-reducer "python reducer.py"
Narzędzie przędzy służy do uruchamiania i zarządzania różnymi aplikacjami (w tym opartymi na zmniejszaniu map) w klastrze. Hadoop-streaming.jar to tylko jeden przykład takiej aplikacji przędzy.
Dalej są opcje uruchamiania:
- input - folder z danymi źródłowymi na hdfs;
- output - folder na hdfs, w którym chcesz umieścić wynik;
- plik - pliki potrzebne podczas działania zadania map-reduce;
- mapper to polecenie konsoli, które będzie używane na etapie mapy;
- reduce to polecenie konsoli, które zostanie użyte na etapie redukcji.
Po uruchomieniu możesz zobaczyć postęp zadania w konsoli i adres URL, aby wyświetlić bardziej szczegółowe informacje o zadaniu.
W interfejsie dostępnym pod tym adresem URL można dowiedzieć się bardziej szczegółowego statusu wykonania zadania, przejrzeć logi każdego mappera i reduktora (co jest bardzo przydatne w przypadku nieudanych zadań).
Wynik pracy po pomyślnym wykonaniu jest dodawany do HDFS w folderze, który określiliśmy w polu wyjściowym. Możesz wyświetlić jego zawartość za pomocą polecenia „hadoop fs -ls lenta_wordcount”.
Sam wynik można uzyskać w następujący sposób:
hadoop fs -text lenta_wordcount/* | sort -n -k2,2 | tail -n5
from
41
this
43
on
82
and
111
into
194
Polecenie „hadoop fs -text” wyświetla zawartość folderu w formie tekstowej. Posortowałem wynik według liczby wystąpień słów. Zgodnie z oczekiwaniami, najczęstszymi słowami w języku są przyimki.
4.4 Metoda numer 2: użyj Javy
Sam Hadoop jest napisany w Javie, a natywny interfejs Hadoop jest również oparty na Javie. Pokażmy, jak wygląda natywna aplikacja Java dla wordcount:
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount {
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path("hdfs://localhost/user/cloudera/lenta_articles"));
FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost/user/cloudera/lenta_wordcount"));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
Ta klasa robi dokładnie to samo, co nasz przykład w Pythonie. Tworzymy klasy TokenizerMapper i IntSumReducer, wywodząc się odpowiednio z klas Mapper i Reducer. Klasy przekazywane jako parametry szablonu określają typy wartości wejściowych i wyjściowych. Natywny interfejs API zakłada, że funkcja map otrzymuje jako dane wejściowe parę klucz-wartość. Ponieważ w naszym przypadku klucz jest pusty, po prostu definiujemy Object jako typ klucza.
W metodzie Main uruchamiamy zadanie mapreduce i definiujemy jego parametry - nazwę, mapper i reducer, ścieżkę w HDFS, gdzie znajdują się dane wejściowe i gdzie umieścić wynik. Do kompilacji potrzebujemy bibliotek hadoop. Do budowania używam Mavena, dla którego cloudera ma repozytorium. Instrukcje dotyczące jego konfiguracji można znaleźć tutaj. W rezultacie w pliku pom.xmp (który jest używany przez maven do opisu montażu projektu) otrzymałem:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<repositories>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.6.0-cdh5.4.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
<version>2.6.0-cdh5.4.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.6.0-cdh5.4.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-app</artifactId>
<version>2.6.0-cdh5.4.2</version>
</dependency>
</dependencies>
<groupId>org.dca.examples</groupId>
<artifactId>wordcount</artifactId>
<version>1.0-SNAPSHOT</version>
</project>
Skompilujmy projekt do pakietu jar:
mvn clean package
Po zbudowaniu projektu do pliku jar, uruchomienie następuje w podobny sposób, jak w przypadku interfejsu streamingowego:
yarn jar wordcount-1.0-SNAPSHOT.jar WordCount
Czekamy na wykonanie i sprawdzamy wynik:
hadoop fs -text lenta_wordcount/* | sort -n -k2,2 | tail -n5
from
41
this
43
on
82
and
111
into
194
Jak można się domyślić, wynik uruchomienia naszej natywnej aplikacji jest taki sam, jak wynik uruchomienia aplikacji streamingowej, którą uruchomiliśmy w poprzedni sposób.
GO TO FULL VERSION