4.1 Általános információk a Hadoopról

A MapReduce paradigmát a Google javasolta 2004-ben a MapReduce: Simplified Data Processing on Large Clusters című cikkében . Mivel a javasolt cikk tartalmazta a paradigma leírását, de a megvalósítás hiányzott, a Yahoo több programozója javasolta a megvalósításukat a nutch webbejáróval kapcsolatos munka részeként. A Hadoop történetéről bővebben a The history of Hadoop: From 4 node to the future of data cikkben olvashat .

Kezdetben a Hadoop elsősorban az adatok tárolására és a MapReduce-feladatok futtatására szolgált, de mára a Hadoop egy nagy halom technológia, amely valamilyen módon kapcsolódik a nagy adatok feldolgozásához (nem csak a MapReduce-hoz).

A Hadoop fő (alap) összetevői a következők:

  • A Hadoop Distributed File System (HDFS) egy elosztott fájlrendszer, amely lehetővé teszi szinte korlátlan méretű információk tárolását.
  • A Hadoop YARN egy keretrendszer a fürterőforrás-kezeléshez és a feladatkezeléshez, beleértve a MapReduce keretrendszert is.
  • Hadoop gyakori

Számos projekt is kapcsolódik közvetlenül a Hadoophoz, de nem szerepelnek a Hadoop magjában:

  • Hive – eszköz az SQL-szerű lekérdezésekhez big data felett (az SQL lekérdezéseket MapReduce feladatok sorozatává alakítja);
  • A Pig egy programozási nyelv magas szintű adatelemzésre. Egy kódsor ezen a nyelven MapReduce feladatok sorozatává alakulhat;
  • A Hbase egy oszlopos adatbázis, amely megvalósítja a BigTable paradigmát;
  • A Cassandra egy nagy teljesítményű elosztott kulcsérték-adatbázis;
  • A ZooKeeper egy szolgáltatás az elosztott konfiguráció tárolására és a konfigurációs változások szinkronizálására;
  • A Mahout egy nagyméretű gépi tanulási könyvtár és motor.

Külön szeretném megjegyezni az Apache Spark projektet , amely az elosztott adatfeldolgozás motorja. Az Apache Spark jellemzően Hadoop komponenseket, például HDFS-t és YARN-t használ a munkájához, miközben maga az utóbbi időben népszerűbb lett, mint a Hadoop:

Ezen összetevők némelyikével ebben az anyagsorozatban külön cikkek foglalkoznak, de most nézzük meg, hogyan kezdhet el dolgozni a Hadooppal, és hogyan alkalmazhatja azt a gyakorlatban.

4.2 MapReduce programok futtatása a Hadoopon

Most nézzük meg, hogyan lehet MapReduce-feladatot futtatni a Hadoopon. Feladatként a klasszikus WordCount példát használjuk , amelyről az előző leckében volt szó.

Hadd emlékeztessem önöket a probléma megfogalmazására: van egy sor dokumentum. A dokumentumhalmazban előforduló minden egyes szónál meg kell számolni, hogy a szó hányszor fordul elő a halmazban.

Megoldás:

A Map szavakra bontja a dokumentumot, és párokat ad vissza (szó, 1).

Csökkentse az egyes szavak előfordulási gyakoriságát:

def map(doc):  
for word in doc.split():  
	yield word, 1 
def reduce(word, values):  
	yield word, sum(values)

Most az a feladat, hogy programozzuk ezt a megoldást kód formájában, amely végrehajtható Hadoop-on és futtatható.

4.3 1. módszer. Hadoop Streaming

A MapReduce program Hadoopon való futtatásának legegyszerűbb módja a Hadoop streaming felület használata. A streaming felület feltételezi, hogy a leképezés és a redukció olyan programokként valósul meg, amelyek adatokat visznek át az stdin-ből és a kimenetből az stdout- ba .

A térkép funkciót végrehajtó programot leképezőnek nevezzük. A redukciót végrehajtó programot reduktornak nevezzük .

A Streaming interfész alapértelmezés szerint azt feltételezi, hogy egy leképezőben vagy reduktorban egy bejövő sor megfelel a térkép egy bejövő bejegyzésének .

A leképező kimenete párok (kulcs, érték) formájában jut a reduktor bemenetére, míg az összes azonos kulcshoz tartozó pár:

  • Garantáltan feldolgozható a reduktor egyszeri elindításával;
  • Sorban kerül a bemenetre (vagyis ha egy reduktor több különböző kulcsot dolgoz fel, akkor a bemenet kulcs szerint csoportosodik).

Tehát implementáljuk a leképezőt és a reduktort a pythonban:

#mapper.py  
import sys  
  
def do_map(doc):  
for word in doc.split():  
	yield word.lower(), 1  
  
for line in sys.stdin:  
	for key, value in do_map(line):  
    	print(key + "\t" + str(value))  
 
#reducer.py  
import sys  
  
def do_reduce(word, values):  
	return word, sum(values)  
  
prev_key = None  
values = []  
  
for line in sys.stdin:  
	key, value = line.split("\t")  
	if key != prev_key and prev_key is not None:  
    	result_key, result_value = do_reduce(prev_key, values)  
    	print(result_key + "\t" + str(result_value))  
    	values = []  
	prev_key = key  
	values.append(int(value))  
  
if prev_key is not None:  
	result_key, result_value = do_reduce(prev_key, values)  
	print(result_key + "\t" + str(result_value))

A Hadoop által feldolgozott adatokat HDFS-en kell tárolni. Töltsük fel cikkeinket és tegyük HDFS-re. Ehhez használja a hadoop fs parancsot :

wget https://www.dropbox.com/s/opp5psid1x3jt41/lenta_articles.tar.gz  
tar xzvf lenta_articles.tar.gz  
hadoop fs -put lenta_articles 

A hadoop fs segédprogram számos módszert támogat a fájlrendszer manipulálására, amelyek közül sok megegyezik a szabványos linux segédprogramokkal.

Most kezdjük a streaming feladatot:

yarn jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar\  
 -input lenta_articles\  
 -output lenta_wordcount\  
 -file mapper.py\  
 -file reducer.py\  
 -mapper "python mapper.py"\  
 -reducer "python reducer.py" 

A fonal segédprogram különféle alkalmazások indítására és kezelésére szolgál (beleértve a térképkicsinyítést is) egy fürtön. A Hadoop-streaming.jar csak egy példa egy ilyen fonal alkalmazásra.

Következő az indítási lehetőségek:

  • bemenet - mappa forrásadatokkal a hdfs-en;
  • output - mappa a hdfs-en, ahová az eredményt el szeretné helyezni;
  • fájl - azok a fájlok, amelyekre szükség van a térképcsökkentési feladat működése során;
  • A mapper a konzolparancs, amelyet a térképi szakaszhoz használunk;
  • A redukció a konzol parancsa, amelyet a redukciós szakaszhoz használunk.

Az indítás után láthatja a feladat előrehaladását a konzolon, valamint egy URL-t a feladattal kapcsolatos részletesebb információk megtekintéséhez.

Az ezen az URL-en elérhető felületen megtudhatja a feladat végrehajtásának részletesebb állapotát, megtekintheti az egyes leképezők és reduktorok naplóit (ami nagyon hasznos sikertelen feladatok esetén).

A munka eredménye a sikeres végrehajtás után hozzáadódik a HDFS-hez abban a mappában, amelyet a kimeneti mezőben megadtunk. A tartalmát a "hadoop fs -ls lenta_wordcount" paranccsal tekintheti meg.

Maga az eredmény a következőképpen érhető el:

hadoop fs -text lenta_wordcount/* | sort -n -k2,2 | tail -n5  
from
41  
this
43  
on
82  
and
111  
into
194 

A "hadoop fs -text" parancs szöveges formában jeleníti meg a mappa tartalmát. Az eredményt a szavak előfordulási száma szerint rendeztem. Ahogy az várható volt, a nyelv leggyakoribb szavai az elöljárószavak.

4.4 2. módszer: Java használata

Maga a Hadoop java nyelven íródott, és a Hadoop natív felülete is java alapú. Mutassuk meg, hogyan néz ki egy natív java alkalmazás a wordcount számára:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

	public static class TokenizerMapper
        	extends Mapper<Object, Text, Text, IntWritable>{

    	private final static IntWritable one = new IntWritable(1);
    	private Text word = new Text();

    	public void map(Object key, Text value, Context context
    	) throws IOException, InterruptedException {
        	StringTokenizer itr = new StringTokenizer(value.toString());
        	while (itr.hasMoreTokens()) {
            	word.set(itr.nextToken());
            	context.write(word, one);
        	}
    	}
	}

	public static class IntSumReducer
        	extends Reducer<Text,IntWritable,Text,IntWritable> {
    	private IntWritable result = new IntWritable();

    	public void reduce(Text key, Iterable values,
                       	Context context
    	) throws IOException, InterruptedException {
        	int sum = 0;
        	for (IntWritable val : values) {
            	sum += val.get();
        	}
        	result.set(sum);
        	context.write(key, result);
    	}
	}

	public static void main(String[] args) throws Exception {
    	Configuration conf = new Configuration();
    	Job job = Job.getInstance(conf, "word count");
    	job.setJarByClass(WordCount.class);
    	job.setMapperClass(TokenizerMapper.class);
    	job.setReducerClass(IntSumReducer.class);
    	job.setOutputKeyClass(Text.class);
    	job.setOutputValueClass(IntWritable.class);
    	FileInputFormat.addInputPath(job, new Path("hdfs://localhost/user/cloudera/lenta_articles"));
    	FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost/user/cloudera/lenta_wordcount"));
    	System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

Ez az osztály pontosan ugyanazt teszi, mint a Python-példánk. A TokenizerMapper és IntSumReducer osztályokat a Mapper és Reducer osztályokból származtatva hozzuk létre. A sablonparaméterként átadott osztályok meghatározzák a bemeneti és kimeneti értékek típusát. A natív API feltételezi, hogy a leképezési függvény bemenetként egy kulcs-érték párt kap. Mivel esetünkben a kulcs üres, egyszerűen az Object-et definiáljuk kulcstípusként.

A Main metódusban elindítjuk a mapreduce feladatot és meghatározzuk a paramétereit - név, leképező és reduktor, HDFS-ben az elérési út, ahol a bemeneti adatok találhatók, és hova kell tenni az eredményt. A fordításhoz hadoop könyvtárakra van szükségünk. Építéshez a Maven-t használom, amihez a clouderának van tárolója. A beállítási útmutató itt található. Ennek eredményeként a pom.xmp fájl (amelyet a maven a projekt összeállításának leírására használ) a következőket kaptam:

<?xml version="1.0" encoding="UTF-8"?>  
<project xmlns="http://maven.apache.org/POM/4.0.0"  
     	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
     	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">  
	<modelVersion>4.0.0</modelVersion>  
  
	<repositories>  
    	<repository>  
        	<id>cloudera</id>  
        	<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>  
    	</repository>  
	</repositories>  
  
	<dependencies>  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-common</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-auth</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-hdfs</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-mapreduce-client-app</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
	</dependencies>  
  
	<groupId>org.dca.examples</groupId>  
	<artifactId>wordcount</artifactId>  
	<version>1.0-SNAPSHOT</version>  
 
</project>

Állítsuk össze a projektet egy jar csomagba:

mvn clean package

A projekt jar fájlba építése után az indítás hasonló módon történik, mint a streaming felület esetében:

yarn jar wordcount-1.0-SNAPSHOT.jar  WordCount 

Várjuk a végrehajtást és ellenőrizzük az eredményt:

hadoop fs -text lenta_wordcount/* | sort -n -k2,2 | tail -n5  
from
41
this
43
on
82
and
111
into
194

Ahogy sejtheti, a natív alkalmazásunk futtatásának eredménye ugyanaz, mint az előző módon elindított streaming alkalmazás eredménye.