4.1 Generel information om Hadoop

MapReduce-paradigmet blev foreslået af Google i 2004 i sin artikel MapReduce: Simplified Data Processing on Large Clusters . Da den foreslåede artikel indeholdt en beskrivelse af paradigmet, men implementeringen manglede, foreslog flere programmører fra Yahoo deres implementering som en del af arbejdet med nutch webcrawleren. Du kan læse mere om Hadoops historie i artiklen The history of Hadoop: From 4 nodes to the future of data .

I starten var Hadoop primært et værktøj til at gemme data og køre MapReduce-opgaver, men nu er Hadoop en stor stak af teknologier, der på den ene eller anden måde er relateret til behandling af big data (ikke kun med MapReduce).

De vigtigste (kerne) komponenter i Hadoop er:

  • Hadoop Distributed File System (HDFS) er et distribueret filsystem, der giver dig mulighed for at gemme information af næsten ubegrænset størrelse.
  • Hadoop YARN er en ramme for cluster ressource management og opgavestyring, herunder MapReduce rammen.
  • Hadoop almindelig

Der er også et stort antal projekter direkte relateret til Hadoop, men ikke inkluderet i Hadoop-kernen:

  • Hive - et værktøj til SQL-lignende forespørgsler over big data (gør SQL-forespørgsler til en række MapReduce-opgaver);
  • Pig er et programmeringssprog til dataanalyse på højt niveau. En linje kode på dette sprog kan blive til en sekvens af MapReduce-opgaver;
  • Hbase er en kolonneformet database, der implementerer BigTable-paradigmet;
  • Cassandra er en højtydende distribueret nøgleværdidatabase;
  • ZooKeeper er en service til distribueret konfigurationslagring og synkronisering af konfigurationsændringer;
  • Mahout er et maskinlæringsbibliotek og -motor med store data.

Separat vil jeg bemærke Apache Spark- projektet , som er en motor til distribueret databehandling. Apache Spark bruger typisk Hadoop-komponenter som HDFS og YARN til sit arbejde, mens selv for nylig er blevet mere populær end Hadoop:

Nogle af disse komponenter vil blive dækket i separate artikler i denne materialeserie, men lad os lige nu se på, hvordan du kan begynde at arbejde med Hadoop og omsætte det i praksis.

4.2 Kørsel af MapReduce-programmer på Hadoop

Lad os nu se på, hvordan man kører en MapReduce-opgave på Hadoop. Som opgave vil vi bruge det klassiske WordCount- eksempel , som blev diskuteret i forrige lektion.

Lad mig minde dig om formuleringen af ​​problemet: Der er et sæt dokumenter. Det er nødvendigt for hvert ord, der forekommer i sættet af dokumenter, at tælle, hvor mange gange ordet forekommer i sættet.

Løsning:

Kort opdeler dokumentet i ord og returnerer et sæt par (ord, 1).

Reducer summen af ​​forekomsten af ​​hvert ord:

def map(doc):  
for word in doc.split():  
	yield word, 1 
def reduce(word, values):  
	yield word, sum(values)

Nu er opgaven at programmere denne løsning i form af kode, der kan eksekveres på Hadoop og køre.

4.3 Metode nummer 1. Hadoop streaming

Den nemmeste måde at køre et MapReduce-program på Hadoop er at bruge Hadoop-streaminggrænsefladen. Streaminggrænsefladen antager, at map og reduce implementeres som programmer, der tager data fra stdin og output til stdout .

Det program, der udfører kortfunktionen, kaldes mapper. Programmet, der udfører reducering , kaldes henholdsvis reducer .

Streaming-grænsefladen antager som standard, at én indgående linje i en mapper eller reducering svarer til én indgående post for kort .

Outputtet fra mapperen kommer til input fra reducereren i form af par (nøgle, værdi), mens alle par svarer til den samme nøgle:

  • Garanteret at blive behandlet af en enkelt lancering af reduceringen;
  • Sendes til input i en række (det vil sige, hvis en reducering behandler flere forskellige nøgler, vil inputtet blive grupperet efter nøgle).

Så lad os implementere mapper og reducer i python:

#mapper.py  
import sys  
  
def do_map(doc):  
for word in doc.split():  
	yield word.lower(), 1  
  
for line in sys.stdin:  
	for key, value in do_map(line):  
    	print(key + "\t" + str(value))  
 
#reducer.py  
import sys  
  
def do_reduce(word, values):  
	return word, sum(values)  
  
prev_key = None  
values = []  
  
for line in sys.stdin:  
	key, value = line.split("\t")  
	if key != prev_key and prev_key is not None:  
    	result_key, result_value = do_reduce(prev_key, values)  
    	print(result_key + "\t" + str(result_value))  
    	values = []  
	prev_key = key  
	values.append(int(value))  
  
if prev_key is not None:  
	result_key, result_value = do_reduce(prev_key, values)  
	print(result_key + "\t" + str(result_value))

De data, som Hadoop vil behandle, skal gemmes på HDFS. Lad os uploade vores artikler og lægge dem på HDFS. For at gøre dette skal du bruge kommandoen hadoop fs :

wget https://www.dropbox.com/s/opp5psid1x3jt41/lenta_articles.tar.gz  
tar xzvf lenta_articles.tar.gz  
hadoop fs -put lenta_articles 

Hadoop fs-værktøjet understøtter et stort antal metoder til at manipulere filsystemet, hvoraf mange er identiske med standard linux-værktøjer.

Lad os nu starte streamingopgaven:

yarn jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar\  
 -input lenta_articles\  
 -output lenta_wordcount\  
 -file mapper.py\  
 -file reducer.py\  
 -mapper "python mapper.py"\  
 -reducer "python reducer.py" 

Garnværktøjet bruges til at starte og administrere forskellige applikationer (inklusive kort-reducer baseret) på en klynge. Hadoop-streaming.jar er blot et eksempel på sådan en garnapplikation.

Dernæst er lanceringsmulighederne:

  • input - mappe med kildedata på hdfs;
  • output - mappe på hdfs, hvor du vil placere resultatet;
  • fil - filer, der er nødvendige under driften af ​​kortreducerende opgave;
  • mapper er konsolkommandoen, der vil blive brugt til kortfasen;
  • reduce er konsolkommandoen, der vil blive brugt til reduktionstrinnet.

Efter lancering kan du se status for opgaven i konsollen og en URL til at se mere detaljerede oplysninger om opgaven.

I grænsefladen, der er tilgængelig på denne URL, kan du finde ud af en mere detaljeret opgaveudførelsesstatus, se logfilerne for hver mapper og reducer (hvilket er meget nyttigt i tilfælde af mislykkede opgaver).

Resultatet af arbejdet efter vellykket udførelse føjes til HDFS i den mappe, som vi har angivet i outputfeltet. Du kan se dens indhold ved at bruge kommandoen "hadoop fs -ls lenta_wordcount".

Selve resultatet kan opnås som følger:

hadoop fs -text lenta_wordcount/* | sort -n -k2,2 | tail -n5  
from
41  
this
43  
on
82  
and
111  
into
194 

Kommandoen "hadoop fs -text" viser indholdet af mappen i tekstform. Jeg sorterede resultatet efter antallet af forekomster af ordene. Som forventet er de mest almindelige ord i sproget præpositioner.

4.4 Metode nummer 2: brug Java

Hadoop selv er skrevet i java, og Hadoops native grænseflade er også java-baseret. Lad os vise, hvordan en native java-applikation til wordcount ser ud:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

	public static class TokenizerMapper
        	extends Mapper<Object, Text, Text, IntWritable>{

    	private final static IntWritable one = new IntWritable(1);
    	private Text word = new Text();

    	public void map(Object key, Text value, Context context
    	) throws IOException, InterruptedException {
        	StringTokenizer itr = new StringTokenizer(value.toString());
        	while (itr.hasMoreTokens()) {
            	word.set(itr.nextToken());
            	context.write(word, one);
        	}
    	}
	}

	public static class IntSumReducer
        	extends Reducer<Text,IntWritable,Text,IntWritable> {
    	private IntWritable result = new IntWritable();

    	public void reduce(Text key, Iterable values,
                       	Context context
    	) throws IOException, InterruptedException {
        	int sum = 0;
        	for (IntWritable val : values) {
            	sum += val.get();
        	}
        	result.set(sum);
        	context.write(key, result);
    	}
	}

	public static void main(String[] args) throws Exception {
    	Configuration conf = new Configuration();
    	Job job = Job.getInstance(conf, "word count");
    	job.setJarByClass(WordCount.class);
    	job.setMapperClass(TokenizerMapper.class);
    	job.setReducerClass(IntSumReducer.class);
    	job.setOutputKeyClass(Text.class);
    	job.setOutputValueClass(IntWritable.class);
    	FileInputFormat.addInputPath(job, new Path("hdfs://localhost/user/cloudera/lenta_articles"));
    	FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost/user/cloudera/lenta_wordcount"));
    	System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

Denne klasse gør nøjagtig det samme som vores Python-eksempel. Vi opretter TokenizerMapper- og IntSumReducer-klasserne ved at udlede fra henholdsvis Mapper- og Reducer-klasserne. Klasserne, der sendes som skabelonparametre, specificerer typerne af input- og outputværdier. Den native API antager, at kortfunktionen får et nøgle-værdi-par som input. Da nøglen i vores tilfælde er tom, definerer vi blot Objekt som nøgletypen.

I Main-metoden starter vi mapreduce-opgaven og definerer dens parametre - navn, mapper og reducer, stien i HDFS, hvor inputdataene er placeret og hvor resultatet skal placeres. For at kompilere har vi brug for hadoop-biblioteker. Jeg bruger Maven til at bygge, som cloudera har et depot til. Vejledning til opsætning kan findes her. Som et resultat, pom.xmp-filen (som bruges af maven til at beskrive samlingen af ​​projektet) fik jeg følgende):

<?xml version="1.0" encoding="UTF-8"?>  
<project xmlns="http://maven.apache.org/POM/4.0.0"  
     	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
     	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">  
	<modelVersion>4.0.0</modelVersion>  
  
	<repositories>  
    	<repository>  
        	<id>cloudera</id>  
        	<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>  
    	</repository>  
	</repositories>  
  
	<dependencies>  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-common</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-auth</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-hdfs</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-mapreduce-client-app</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
	</dependencies>  
  
	<groupId>org.dca.examples</groupId>  
	<artifactId>wordcount</artifactId>  
	<version>1.0-SNAPSHOT</version>  
 
</project>

Lad os kompilere projektet i en krukkepakke:

mvn clean package

Efter at have bygget projektet ind i en jar-fil, sker lanceringen på samme måde, som i tilfældet med streaminggrænsefladen:

yarn jar wordcount-1.0-SNAPSHOT.jar  WordCount 

Vi venter på udførelse og kontrollerer resultatet:

hadoop fs -text lenta_wordcount/* | sort -n -k2,2 | tail -n5  
from
41
this
43
on
82
and
111
into
194

Som du måske gætter, er resultatet af at køre vores oprindelige applikation det samme som resultatet af streamingapplikationen, som vi lancerede på den tidligere måde.