4.1 Allmän information om Hadoop

MapReduce-paradigmet föreslogs av Google 2004 i sin artikel MapReduce: Simplified Data Processing on Large Clusters . Eftersom den föreslagna artikeln innehöll en beskrivning av paradigmet, men implementeringen saknades, föreslog flera programmerare från Yahoo deras implementering som en del av arbetet med nutch webcrawler. Du kan läsa mer om Hadoops historia i artikeln The history of Hadoop: From 4 nodes to the future of data .

Hadoop var från början i första hand ett verktyg för att lagra data och köra MapReduce-uppgifter, men nu är Hadoop en stor hög med teknologier relaterade på ett eller annat sätt till bearbetning av big data (inte bara med MapReduce).

De viktigaste (kärn)komponenterna i Hadoop är:

  • Hadoop Distributed File System (HDFS) är ett distribuerat filsystem som låter dig lagra information av nästan obegränsad storlek.
  • Hadoop YARN är ett ramverk för klusterresurshantering och uppgiftshantering, inklusive MapReduce-ramverket.
  • Hadoop vanligt

Det finns också ett stort antal projekt direkt relaterade till Hadoop, men som inte ingår i Hadoop-kärnan:

  • Hive - ett verktyg för SQL-liknande frågor över big data (förvandlar SQL-frågor till en serie MapReduce-uppgifter);
  • Pig är ett programmeringsspråk för dataanalys på hög nivå. En rad kod på detta språk kan förvandlas till en sekvens av MapReduce-uppgifter;
  • Hbase är en kolumnär databas som implementerar BigTable-paradigmet;
  • Cassandra är en högpresterande distribuerad nyckel-värdedatabas;
  • ZooKeeper är en tjänst för distribuerad konfigurationslagring och synkronisering av konfigurationsändringar;
  • Mahout är ett maskininlärningsbibliotek och motor för stora data.

Separat skulle jag vilja notera Apache Spark -projektet , som är en motor för distribuerad databehandling. Apache Spark använder vanligtvis Hadoop-komponenter som HDFS och YARN för sitt arbete, medan det nyligen har blivit mer populärt än Hadoop:

Några av dessa komponenter kommer att behandlas i separata artiklar i denna materialserie, men nu ska vi titta på hur du kan börja arbeta med Hadoop och omsätta det i praktiken.

4.2 Köra MapReduce-program på Hadoop

Låt oss nu titta på hur man kör en MapReduce-uppgift på Hadoop. Som en uppgift kommer vi att använda det klassiska WordCount- exemplet , som diskuterades i föregående lektion.

Låt mig påminna er om formuleringen av problemet: det finns en uppsättning dokument. Det är nödvändigt för varje ord som förekommer i uppsättningen av dokument att räkna hur många gånger ordet förekommer i uppsättningen.

Lösning:

Map delar upp dokumentet i ord och returnerar en uppsättning par (ord, 1).

Minska summan av förekomsten av varje ord:

def map(doc):  
for word in doc.split():  
	yield word, 1 
def reduce(word, values):  
	yield word, sum(values)

Nu är uppgiften att programmera denna lösning i form av kod som kan exekveras på Hadoop och köras.

4.3 Metod nummer 1. Hadoop Streaming

Det enklaste sättet att köra ett MapReduce-program på Hadoop är att använda Hadoop-strömningsgränssnittet. Strömningsgränssnittet förutsätter att map and reduce implementeras som program som tar data från stdin och output till stdout .

Programmet som kör kartfunktionen kallas mapper. Programmet som kör reducering kallas respektive reducer .

Strömningsgränssnittet antar som standard att en inkommande rad i en mappare eller reducerare motsvarar en inkommande post för kartan .

Utdata från mapparen kommer till ingången av reduceraren i form av par (nyckel, värde), medan alla par som motsvarar samma nyckel:

  • Garanterat att bearbetas av en enda lansering av reduceraren;
  • Kommer att skickas till ingången i rad (det vill säga om en reducerare bearbetar flera olika nycklar, kommer ingången att grupperas efter nyckel).

Så låt oss implementera mapper och reducerare i python:

#mapper.py  
import sys  
  
def do_map(doc):  
for word in doc.split():  
	yield word.lower(), 1  
  
for line in sys.stdin:  
	for key, value in do_map(line):  
    	print(key + "\t" + str(value))  
 
#reducer.py  
import sys  
  
def do_reduce(word, values):  
	return word, sum(values)  
  
prev_key = None  
values = []  
  
for line in sys.stdin:  
	key, value = line.split("\t")  
	if key != prev_key and prev_key is not None:  
    	result_key, result_value = do_reduce(prev_key, values)  
    	print(result_key + "\t" + str(result_value))  
    	values = []  
	prev_key = key  
	values.append(int(value))  
  
if prev_key is not None:  
	result_key, result_value = do_reduce(prev_key, values)  
	print(result_key + "\t" + str(result_value))

Den data som Hadoop kommer att behandla måste lagras på HDFS. Låt oss ladda upp våra artiklar och lägga dem på HDFS. För att göra detta, använd kommandot hadoop fs :

wget https://www.dropbox.com/s/opp5psid1x3jt41/lenta_articles.tar.gz  
tar xzvf lenta_articles.tar.gz  
hadoop fs -put lenta_articles 

Verktyget hadoop fs stöder ett stort antal metoder för att manipulera filsystemet, varav många är identiska med standardlinux-verktygen.

Låt oss nu börja streama uppgiften:

yarn jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar\  
 -input lenta_articles\  
 -output lenta_wordcount\  
 -file mapper.py\  
 -file reducer.py\  
 -mapper "python mapper.py"\  
 -reducer "python reducer.py" 

Garnverktyget används för att starta och hantera olika applikationer (inklusive map-reduce-baserade) på ett kluster. Hadoop-streaming.jar är bara ett exempel på en sådan garnapplikation.

Nästa är startalternativen:

  • input - mapp med källdata på hdfs;
  • output - mapp på hdfs där du vill lägga resultatet;
  • fil - filer som behövs under driften av kartreduceringsuppgiften;
  • mapper är konsolkommandot som kommer att användas för kartstadiet;
  • reducera är konsolkommandot som kommer att användas för reduceringssteget.

Efter att ha startat kan du se aktivitetens förlopp i konsolen och en URL för att se mer detaljerad information om uppgiften.

I gränssnittet som är tillgängligt på denna URL kan du ta reda på en mer detaljerad aktivitetskörningsstatus, se loggarna för varje mappare och reducerare (vilket är mycket användbart vid misslyckade uppgifter).

Resultatet av arbetet efter framgångsrik exekvering läggs till HDFS i mappen som vi angav i utdatafältet. Du kan se dess innehåll med kommandot "hadoop fs -ls lenta_wordcount".

Själva resultatet kan erhållas enligt följande:

hadoop fs -text lenta_wordcount/* | sort -n -k2,2 | tail -n5  
from
41  
this
43  
on
82  
and
111  
into
194 

Kommandot "hadoop fs -text" visar innehållet i mappen i textform. Jag sorterade resultatet efter antalet förekomster av orden. Som väntat är de vanligaste orden i språket prepositioner.

4.4 Metod nummer 2: använd Java

Hadoop i sig är skrivet i java, och Hadoops inbyggda gränssnitt är också javabaserat. Låt oss visa hur en inbyggd java-applikation för wordcount ser ut:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

	public static class TokenizerMapper
        	extends Mapper<Object, Text, Text, IntWritable>{

    	private final static IntWritable one = new IntWritable(1);
    	private Text word = new Text();

    	public void map(Object key, Text value, Context context
    	) throws IOException, InterruptedException {
        	StringTokenizer itr = new StringTokenizer(value.toString());
        	while (itr.hasMoreTokens()) {
            	word.set(itr.nextToken());
            	context.write(word, one);
        	}
    	}
	}

	public static class IntSumReducer
        	extends Reducer<Text,IntWritable,Text,IntWritable> {
    	private IntWritable result = new IntWritable();

    	public void reduce(Text key, Iterable values,
                       	Context context
    	) throws IOException, InterruptedException {
        	int sum = 0;
        	for (IntWritable val : values) {
            	sum += val.get();
        	}
        	result.set(sum);
        	context.write(key, result);
    	}
	}

	public static void main(String[] args) throws Exception {
    	Configuration conf = new Configuration();
    	Job job = Job.getInstance(conf, "word count");
    	job.setJarByClass(WordCount.class);
    	job.setMapperClass(TokenizerMapper.class);
    	job.setReducerClass(IntSumReducer.class);
    	job.setOutputKeyClass(Text.class);
    	job.setOutputValueClass(IntWritable.class);
    	FileInputFormat.addInputPath(job, new Path("hdfs://localhost/user/cloudera/lenta_articles"));
    	FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost/user/cloudera/lenta_wordcount"));
    	System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

Den här klassen gör exakt samma sak som vårt Python-exempel. Vi skapar klasserna TokenizerMapper och IntSumReducer genom att härleda från klasserna Mapper och Reducer. Klasserna som skickas som mallparametrar anger typerna av ingångs- och utdatavärden. Det inbyggda API:et antar att kartfunktionen ges ett nyckel-värdepar som indata. Eftersom i vårt fall nyckeln är tom, definierar vi helt enkelt Object som nyckeltypen.

I Main-metoden startar vi mapreduce-uppgiften och definierar dess parametrar - namn, mapper och reducerare, sökvägen i HDFS, var indata finns och var resultatet ska placeras. För att kompilera behöver vi hadoop-bibliotek. Jag använder Maven för att bygga, vilket cloudera har ett förråd för. Instruktioner för att ställa in den finns här. Som ett resultat, filen pom.xmp (som används av maven för att beskriva sammansättningen av projektet) fick jag följande):

<?xml version="1.0" encoding="UTF-8"?>  
<project xmlns="http://maven.apache.org/POM/4.0.0"  
     	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
     	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">  
	<modelVersion>4.0.0</modelVersion>  
  
	<repositories>  
    	<repository>  
        	<id>cloudera</id>  
        	<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>  
    	</repository>  
	</repositories>  
  
	<dependencies>  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-common</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-auth</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-hdfs</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-mapreduce-client-app</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
	</dependencies>  
  
	<groupId>org.dca.examples</groupId>  
	<artifactId>wordcount</artifactId>  
	<version>1.0-SNAPSHOT</version>  
 
</project>

Låt oss sammanställa projektet till ett burkpaket:

mvn clean package

Efter att ha byggt projektet till en jar-fil, sker lanseringen på ett liknande sätt, som i fallet med streaming-gränssnittet:

yarn jar wordcount-1.0-SNAPSHOT.jar  WordCount 

Vi väntar på utförande och kontrollerar resultatet:

hadoop fs -text lenta_wordcount/* | sort -n -k2,2 | tail -n5  
from
41
this
43
on
82
and
111
into
194

Som du kanske gissar är resultatet av att köra vår inbyggda applikation detsamma som resultatet av streamingapplikationen som vi lanserade på det tidigare sättet.