4.1 Algemene informatie over Hadoop
Het MapReduce-paradigma werd in 2004 door Google voorgesteld in het artikel MapReduce: Simplified Data Processing on Large Clusters . Omdat het voorgestelde artikel een beschrijving van het paradigma bevatte, maar de implementatie ontbrak, stelden verschillende programmeurs van Yahoo hun implementatie voor als onderdeel van het werk aan de nutch webcrawler. Meer over de geschiedenis van Hadoop lees je in het artikel De geschiedenis van Hadoop: van 4 nodes naar de toekomst van data .
Aanvankelijk was Hadoop vooral een hulpmiddel voor het opslaan van gegevens en het uitvoeren van MapReduce-taken, maar nu is Hadoop een grote stapel technologieën die op de een of andere manier verband houden met het verwerken van big data (niet alleen met MapReduce).
De belangrijkste (kern)componenten van Hadoop zijn:
- Hadoop Distributed File System (HDFS) is een gedistribueerd bestandssysteem waarmee u informatie van bijna onbeperkte grootte kunt opslaan.
- Hadoop YARN is een raamwerk voor clusterresourcebeheer en taakbeheer, inclusief het MapReduce-raamwerk.
- Hadoop gebruikelijk
Er zijn ook een groot aantal projecten die direct verband houden met Hadoop, maar niet zijn opgenomen in de Hadoop-kern:
- Hive - een tool voor SQL-achtige query's over big data (maakt van SQL-query's een reeks MapReduce-taken);
- Pig is een programmeertaal voor data-analyse op hoog niveau. Eén regel code in deze taal kan veranderen in een reeks MapReduce-taken;
- Hbase is een kolomvormige database die het BigTable-paradigma implementeert;
- Cassandra is een hoogwaardige gedistribueerde sleutel-waardedatabase;
- ZooKeeper is een service voor gedistribueerde configuratieopslag en synchronisatie van configuratiewijzigingen;
- Mahout is een machine learning-bibliotheek en engine voor big data.
Afzonderlijk zou ik het Apache Spark- project willen noemen , een motor voor gedistribueerde gegevensverwerking. Apache Spark gebruikt doorgaans Hadoop-componenten zoals HDFS en YARN voor zijn werk, terwijl het zelf recentelijk populairder is geworden dan Hadoop:
Sommige van deze componenten zullen in afzonderlijke artikelen in deze reeks materialen worden behandeld, maar laten we nu eens kijken hoe u met Hadoop kunt gaan werken en het in de praktijk kunt brengen.
4.2 MapReduce-programma's uitvoeren op Hadoop
Laten we nu eens kijken hoe we een MapReduce-taak op Hadoop kunnen uitvoeren. Als taak zullen we het klassieke WordCount- voorbeeld gebruiken , dat in de vorige les is besproken.
Laat me u herinneren aan de formulering van het probleem: er is een reeks documenten. Voor elk woord dat in de set documenten voorkomt, moet worden geteld hoe vaak het woord in de set voorkomt.
Oplossing:
Kaart splitst het document op in woorden en retourneert een reeks paren (woord, 1).
Verminder sommen de voorkomens van elk woord:
|
|
Nu is het de taak om deze oplossing te programmeren in de vorm van code die op Hadoop kan worden uitgevoerd en uitgevoerd.
4.3 Methode nummer 1. Hadoop-streaming
De eenvoudigste manier om een MapReduce-programma op Hadoop uit te voeren, is door de Hadoop-streaminginterface te gebruiken. De streaming-interface gaat ervan uit dat map en reduce zijn geïmplementeerd als programma's die gegevens van stdin en uitvoer naar stdout halen .
Het programma dat de kaartfunctie uitvoert, wordt mapper genoemd. Het programma dat reduce uitvoert, wordt respectievelijk reducer genoemd .
De Streaming-interface gaat er standaard van uit dat één inkomende regel in een mapper of reducer overeenkomt met één inkomende invoer voor map .
De output van de mapper komt bij de input van de reducer in de vorm van paren (sleutel, waarde), terwijl alle paren overeenkomen met dezelfde sleutel:
- Gegarandeerd verwerkt door een enkele lancering van het verloopstuk;
- Wordt achtereenvolgens aan de invoer onderworpen (dat wil zeggen, als een verkleiner meerdere verschillende sleutels verwerkt, wordt de invoer gegroepeerd op sleutel).
Dus laten we mapper en reducer in python implementeren:
#mapper.py
import sys
def do_map(doc):
for word in doc.split():
yield word.lower(), 1
for line in sys.stdin:
for key, value in do_map(line):
print(key + "\t" + str(value))
#reducer.py
import sys
def do_reduce(word, values):
return word, sum(values)
prev_key = None
values = []
for line in sys.stdin:
key, value = line.split("\t")
if key != prev_key and prev_key is not None:
result_key, result_value = do_reduce(prev_key, values)
print(result_key + "\t" + str(result_value))
values = []
prev_key = key
values.append(int(value))
if prev_key is not None:
result_key, result_value = do_reduce(prev_key, values)
print(result_key + "\t" + str(result_value))
De gegevens die Hadoop gaat verwerken, moeten worden opgeslagen op HDFS. Laten we onze artikelen uploaden en op HDFS zetten. Gebruik hiervoor het hadoop fs -commando :
wget https://www.dropbox.com/s/opp5psid1x3jt41/lenta_articles.tar.gz
tar xzvf lenta_articles.tar.gz
hadoop fs -put lenta_articles
Het hadoop fs-hulpprogramma ondersteunt een groot aantal methoden voor het manipuleren van het bestandssysteem, waarvan vele identiek zijn aan de standaard Linux-hulpprogramma's.
Laten we nu beginnen met de streamingtaak:
yarn jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar\
-input lenta_articles\
-output lenta_wordcount\
-file mapper.py\
-file reducer.py\
-mapper "python mapper.py"\
-reducer "python reducer.py"
Het garenhulpprogramma wordt gebruikt om verschillende applicaties (waaronder op basis van map-reduce) op een cluster te starten en te beheren. Hadoop-streaming.jar is slechts één voorbeeld van zo'n garentoepassing.
Hierna volgen de opstartopties:
- invoer - map met brongegevens op hdfs;
- uitvoer - map op hdfs waar u het resultaat wilt plaatsen;
- bestand - bestanden die nodig zijn tijdens de uitvoering van de kaartvermindertaak;
- mapper is het consolecommando dat zal worden gebruikt voor de kaartfase;
- reduce is het consolecommando dat zal worden gebruikt voor de reduce-fase.
Na het starten kunt u de voortgang van de taak in de console zien en een URL voor meer gedetailleerde informatie over de taak.
In de interface die beschikbaar is op deze URL, kunt u een meer gedetailleerde taakuitvoeringsstatus vinden, de logboeken bekijken van elke mapper en reducer (wat erg handig is in het geval van mislukte taken).
Het resultaat van het werk na succesvolle uitvoering wordt toegevoegd aan HDFS in de map die we hebben opgegeven in het uitvoerveld. U kunt de inhoud bekijken met de opdracht "hadoop fs -ls lenta_wordcount".
Het resultaat zelf kan als volgt worden verkregen:
hadoop fs -text lenta_wordcount/* | sort -n -k2,2 | tail -n5
from
41
this
43
on
82
and
111
into
194
De opdracht "hadoop fs -text" geeft de inhoud van de map in tekstvorm weer. Ik heb het resultaat gesorteerd op het aantal keren dat de woorden voorkomen. Zoals verwacht zijn de meest voorkomende woorden in de taal voorzetsels.
4.4 Methode nummer 2: gebruik Java
Hadoop zelf is geschreven in Java en de native interface van Hadoop is ook op Java gebaseerd. Laten we laten zien hoe een native Java-toepassing voor wordcount eruit ziet:
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount {
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path("hdfs://localhost/user/cloudera/lenta_articles"));
FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost/user/cloudera/lenta_wordcount"));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
Deze klasse doet precies hetzelfde als ons Python-voorbeeld. We maken de klassen TokenizerMapper en IntSumReducer door af te leiden van respectievelijk de klassen Mapper en Reducer. De klassen die als sjabloonparameters worden doorgegeven, specificeren de typen invoer- en uitvoerwaarden. De native API gaat ervan uit dat de kaartfunctie een sleutel-waardepaar als invoer krijgt. Omdat in ons geval de sleutel leeg is, definiëren we gewoon Object als het sleuteltype.
In de Main-methode starten we de mapreduce-taak en definiëren we de parameters - naam, mapper en reducer, het pad in HDFS, waar de invoergegevens zich bevinden en waar het resultaat moet worden geplaatst. Om te compileren hebben we hadoop-bibliotheken nodig. Ik gebruik Maven om te bouwen, waarvoor cloudera een repository heeft. Instructies voor het instellen vindt u hier. Als resultaat kreeg ik het pom.xmp-bestand (dat door maven wordt gebruikt om de assemblage van het project te beschrijven) het volgende):
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<repositories>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.6.0-cdh5.4.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
<version>2.6.0-cdh5.4.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.6.0-cdh5.4.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-app</artifactId>
<version>2.6.0-cdh5.4.2</version>
</dependency>
</dependencies>
<groupId>org.dca.examples</groupId>
<artifactId>wordcount</artifactId>
<version>1.0-SNAPSHOT</version>
</project>
Laten we het project in een jar-pakket compileren:
mvn clean package
Nadat het project in een jar-bestand is gebouwd, vindt de lancering op een vergelijkbare manier plaats, zoals in het geval van de streaming-interface:
yarn jar wordcount-1.0-SNAPSHOT.jar WordCount
We wachten op uitvoering en controleren het resultaat:
hadoop fs -text lenta_wordcount/* | sort -n -k2,2 | tail -n5
from
41
this
43
on
82
and
111
into
194
Zoals je misschien wel kunt raden, is het resultaat van het uitvoeren van onze native applicatie hetzelfde als het resultaat van de streaming-applicatie die we op de vorige manier hebben gelanceerd.
GO TO FULL VERSION