4.1 Általános információk a Hadoopról

A MapReduce paradigmát a Google javasolta 2004-ben a MapReduce: Simplified Data Processing on Large Clusters című cikkében . Mivel a javasolt cikk tartalmazta a paradigma leírását, de a megvalósítás hiányzott, a Yahoo több programozója javasolta a megvalósításukat a nutch webbejáróval kapcsolatos munka részeként. A Hadoop történetéről bővebben a The history of Hadoop: From 4 node to the future of data cikkben olvashat .
Kezdetben a Hadoop elsősorban az adatok tárolására és a MapReduce-feladatok futtatására szolgált, de mára a Hadoop egy nagy halom technológia, amely valamilyen módon kapcsolódik a nagy adatok feldolgozásához (nem csak a MapReduce-hoz).
A Hadoop fő (alap) összetevői a következők:
- A Hadoop Distributed File System (HDFS) egy elosztott fájlrendszer, amely lehetővé teszi szinte korlátlan méretű információk tárolását.
- A Hadoop YARN egy keretrendszer a fürterőforrás-kezeléshez és a feladatkezeléshez, beleértve a MapReduce keretrendszert is.
- Hadoop gyakori
Számos projekt is kapcsolódik közvetlenül a Hadoophoz, de nem szerepelnek a Hadoop magjában:
- Hive – eszköz az SQL-szerű lekérdezésekhez big data felett (az SQL lekérdezéseket MapReduce feladatok sorozatává alakítja);
- A Pig egy programozási nyelv magas szintű adatelemzésre. Egy kódsor ezen a nyelven MapReduce feladatok sorozatává alakulhat;
- A Hbase egy oszlopos adatbázis, amely megvalósítja a BigTable paradigmát;
- A Cassandra egy nagy teljesítményű elosztott kulcsérték-adatbázis;
- A ZooKeeper egy szolgáltatás az elosztott konfiguráció tárolására és a konfigurációs változások szinkronizálására;
- A Mahout egy nagyméretű gépi tanulási könyvtár és motor.
Külön szeretném megjegyezni az Apache Spark projektet , amely az elosztott adatfeldolgozás motorja. Az Apache Spark jellemzően Hadoop komponenseket, például HDFS-t és YARN-t használ a munkájához, miközben maga az utóbbi időben népszerűbb lett, mint a Hadoop:

Ezen összetevők némelyikével ebben az anyagsorozatban külön cikkek foglalkoznak, de most nézzük meg, hogyan kezdhet el dolgozni a Hadooppal, és hogyan alkalmazhatja azt a gyakorlatban.
4.2 MapReduce programok futtatása a Hadoopon
Most nézzük meg, hogyan lehet MapReduce-feladatot futtatni a Hadoopon. Feladatként a klasszikus WordCount példát használjuk , amelyről az előző leckében volt szó.
Hadd emlékeztessem önöket a probléma megfogalmazására: van egy sor dokumentum. A dokumentumhalmazban előforduló minden egyes szónál meg kell számolni, hogy a szó hányszor fordul elő a halmazban.
Megoldás:
A Map szavakra bontja a dokumentumot, és párokat ad vissza (szó, 1).
Csökkentse az egyes szavak előfordulási gyakoriságát:
|
|
Most az a feladat, hogy programozzuk ezt a megoldást kód formájában, amely végrehajtható Hadoop-on és futtatható.
4.3 1. módszer. Hadoop Streaming
A MapReduce program Hadoopon való futtatásának legegyszerűbb módja a Hadoop streaming felület használata. A streaming felület feltételezi, hogy a leképezés és a redukció olyan programokként valósul meg, amelyek adatokat visznek át az stdin-ből és a kimenetből az stdout- ba .
A térkép funkciót végrehajtó programot leképezőnek nevezzük. A redukciót végrehajtó programot reduktornak nevezzük .
A Streaming interfész alapértelmezés szerint azt feltételezi, hogy egy leképezőben vagy reduktorban egy bejövő sor megfelel a térkép egy bejövő bejegyzésének .
A leképező kimenete párok (kulcs, érték) formájában jut a reduktor bemenetére, míg az összes azonos kulcshoz tartozó pár:
- Garantáltan feldolgozható a reduktor egyszeri elindításával;
- Sorban kerül a bemenetre (vagyis ha egy reduktor több különböző kulcsot dolgoz fel, akkor a bemenet kulcs szerint csoportosodik).
Tehát implementáljuk a leképezőt és a reduktort a pythonban:
#mapper.py
import sys
def do_map(doc):
for word in doc.split():
yield word.lower(), 1
for line in sys.stdin:
for key, value in do_map(line):
print(key + "\t" + str(value))
#reducer.py
import sys
def do_reduce(word, values):
return word, sum(values)
prev_key = None
values = []
for line in sys.stdin:
key, value = line.split("\t")
if key != prev_key and prev_key is not None:
result_key, result_value = do_reduce(prev_key, values)
print(result_key + "\t" + str(result_value))
values = []
prev_key = key
values.append(int(value))
if prev_key is not None:
result_key, result_value = do_reduce(prev_key, values)
print(result_key + "\t" + str(result_value))
A Hadoop által feldolgozott adatokat HDFS-en kell tárolni. Töltsük fel cikkeinket és tegyük HDFS-re. Ehhez használja a hadoop fs parancsot :
wget https://www.dropbox.com/s/opp5psid1x3jt41/lenta_articles.tar.gz
tar xzvf lenta_articles.tar.gz
hadoop fs -put lenta_articles
A hadoop fs segédprogram számos módszert támogat a fájlrendszer manipulálására, amelyek közül sok megegyezik a szabványos linux segédprogramokkal.
Most kezdjük a streaming feladatot:
yarn jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar\
-input lenta_articles\
-output lenta_wordcount\
-file mapper.py\
-file reducer.py\
-mapper "python mapper.py"\
-reducer "python reducer.py"
A fonal segédprogram különféle alkalmazások indítására és kezelésére szolgál (beleértve a térképkicsinyítést is) egy fürtön. A Hadoop-streaming.jar csak egy példa egy ilyen fonal alkalmazásra.
Következő az indítási lehetőségek:
- bemenet - mappa forrásadatokkal a hdfs-en;
- output - mappa a hdfs-en, ahová az eredményt el szeretné helyezni;
- fájl - azok a fájlok, amelyekre szükség van a térképcsökkentési feladat működése során;
- A mapper a konzolparancs, amelyet a térképi szakaszhoz használunk;
- A redukció a konzol parancsa, amelyet a redukciós szakaszhoz használunk.
Az indítás után láthatja a feladat előrehaladását a konzolon, valamint egy URL-t a feladattal kapcsolatos részletesebb információk megtekintéséhez.

Az ezen az URL-en elérhető felületen megtudhatja a feladat végrehajtásának részletesebb állapotát, megtekintheti az egyes leképezők és reduktorok naplóit (ami nagyon hasznos sikertelen feladatok esetén).

A munka eredménye a sikeres végrehajtás után hozzáadódik a HDFS-hez abban a mappában, amelyet a kimeneti mezőben megadtunk. A tartalmát a "hadoop fs -ls lenta_wordcount" paranccsal tekintheti meg.
Maga az eredmény a következőképpen érhető el:
hadoop fs -text lenta_wordcount/* | sort -n -k2,2 | tail -n5
from
41
this
43
on
82
and
111
into
194
A "hadoop fs -text" parancs szöveges formában jeleníti meg a mappa tartalmát. Az eredményt a szavak előfordulási száma szerint rendeztem. Ahogy az várható volt, a nyelv leggyakoribb szavai az elöljárószavak.
4.4 2. módszer: Java használata
Maga a Hadoop java nyelven íródott, és a Hadoop natív felülete is java alapú. Mutassuk meg, hogyan néz ki egy natív java alkalmazás a wordcount számára:
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount {
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path("hdfs://localhost/user/cloudera/lenta_articles"));
FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost/user/cloudera/lenta_wordcount"));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
Ez az osztály pontosan ugyanazt teszi, mint a Python-példánk. A TokenizerMapper és IntSumReducer osztályokat a Mapper és Reducer osztályokból származtatva hozzuk létre. A sablonparaméterként átadott osztályok meghatározzák a bemeneti és kimeneti értékek típusát. A natív API feltételezi, hogy a leképezési függvény bemenetként egy kulcs-érték párt kap. Mivel esetünkben a kulcs üres, egyszerűen az Object-et definiáljuk kulcstípusként.
A Main metódusban elindítjuk a mapreduce feladatot és meghatározzuk a paramétereit - név, leképező és reduktor, HDFS-ben az elérési út, ahol a bemeneti adatok találhatók, és hova kell tenni az eredményt. A fordításhoz hadoop könyvtárakra van szükségünk. Építéshez a Maven-t használom, amihez a clouderának van tárolója. A beállítási útmutató itt található. Ennek eredményeként a pom.xmp fájl (amelyet a maven a projekt összeállításának leírására használ) a következőket kaptam:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<repositories>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.6.0-cdh5.4.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
<version>2.6.0-cdh5.4.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.6.0-cdh5.4.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-app</artifactId>
<version>2.6.0-cdh5.4.2</version>
</dependency>
</dependencies>
<groupId>org.dca.examples</groupId>
<artifactId>wordcount</artifactId>
<version>1.0-SNAPSHOT</version>
</project>
Állítsuk össze a projektet egy jar csomagba:
mvn clean package
A projekt jar fájlba építése után az indítás hasonló módon történik, mint a streaming felület esetében:
yarn jar wordcount-1.0-SNAPSHOT.jar WordCount
Várjuk a végrehajtást és ellenőrizzük az eredményt:
hadoop fs -text lenta_wordcount/* | sort -n -k2,2 | tail -n5
from
41
this
43
on
82
and
111
into
194
Ahogy sejtheti, a natív alkalmazásunk futtatásának eredménye ugyanaz, mint az előző módon elindított streaming alkalmazás eredménye.
GO TO FULL VERSION