4.1 Generell informasjon om Hadoop

MapReduce-paradigmet ble foreslått av Google i 2004 i artikkelen MapReduce: Simplified Data Processing on Large Clusters . Siden den foreslåtte artikkelen inneholdt en beskrivelse av paradigmet, men implementeringen manglet, foreslo flere programmerere fra Yahoo implementeringen av dem som en del av arbeidet med nutch webcrawler. Du kan lese mer om historien til Hadoop i artikkelen The history of Hadoop: From 4 nodes to the future of data .

I utgangspunktet var Hadoop først og fremst et verktøy for å lagre data og kjøre MapReduce-oppgaver, men nå er Hadoop en stor stabel med teknologier knyttet på en eller annen måte til behandling av store data (ikke bare med MapReduce).

Hovedkomponentene (kjerne) i Hadoop er:

  • Hadoop Distributed File System (HDFS) er et distribuert filsystem som lar deg lagre informasjon av nesten ubegrenset størrelse.
  • Hadoop YARN er et rammeverk for cluster resource management og oppgavestyring, inkludert MapReduce-rammeverket.
  • Hadoop vanlig

Det er også et stort antall prosjekter direkte relatert til Hadoop, men ikke inkludert i Hadoop-kjernen:

  • Hive - et verktøy for SQL-lignende spørringer over store data (gjør SQL-spørringer til en rekke MapReduce-oppgaver);
  • Pig er et programmeringsspråk for dataanalyse på høyt nivå. Én linje med kode på dette språket kan bli til en sekvens av MapReduce-oppgaver;
  • Hbase er en kolonneformet database som implementerer BigTable-paradigmet;
  • Cassandra er en høyytelses distribuert nøkkelverdi-database;
  • ZooKeeper er en tjeneste for distribuert konfigurasjonslagring og synkronisering av konfigurasjonsendringer;
  • Mahout er et maskinlæringsbibliotek og -motor for store data.

Separat vil jeg merke meg Apache Spark- prosjektet , som er en motor for distribuert databehandling. Apache Spark bruker vanligvis Hadoop-komponenter som HDFS og YARN for arbeidet sitt, mens selv nylig har blitt mer populært enn Hadoop:

Noen av disse komponentene vil bli dekket i separate artikler i denne serien av materialer, men foreløpig, la oss se på hvordan du kan begynne å jobbe med Hadoop og sette det i praksis.

4.2 Kjøre MapReduce-programmer på Hadoop

La oss nå se på hvordan du kjører en MapReduce-oppgave på Hadoop. Som en oppgave vil vi bruke det klassiske WordCount- eksemplet , som ble diskutert i forrige leksjon.

La meg minne deg på formuleringen av problemet: det er et sett med dokumenter. Det er nødvendig for hvert ord som forekommer i settet med dokumenter å telle hvor mange ganger ordet forekommer i settet.

Løsning:

Kart deler opp dokumentet i ord og returnerer et sett med par (ord, 1).

Reduser summen av forekomsten av hvert ord:

def map(doc):  
for word in doc.split():  
	yield word, 1 
def reduce(word, values):  
	yield word, sum(values)

Nå er oppgaven å programmere denne løsningen i form av kode som kan kjøres på Hadoop og kjøres.

4.3 Metode nummer 1. Hadoop Streaming

Den enkleste måten å kjøre et MapReduce-program på Hadoop er å bruke Hadoop-strømmegrensesnittet. Streaming-grensesnittet forutsetter at kart og redusering er implementert som programmer som tar data fra stdin og output til stdout .

Programmet som utfører kartfunksjonen kalles mapper. Programmet som kjører redusering kalles henholdsvis redusering .

Streaming-grensesnittet antar som standard at én innkommende linje i en mapper eller redusering tilsvarer én innkommende oppføring for kart .

Utgangen fra kartleggeren kommer til inngangen til reduseringen i form av par (nøkkel, verdi), mens alle parene tilsvarer den samme nøkkelen:

  • Garantert å bli behandlet av en enkelt lansering av reduksjonsanordningen;
  • Sendes til inngangen på rad (det vil si at hvis en redusering behandler flere forskjellige nøkler, vil inndata bli gruppert etter nøkkel).

Så la oss implementere mapper og redusering i python:

#mapper.py  
import sys  
  
def do_map(doc):  
for word in doc.split():  
	yield word.lower(), 1  
  
for line in sys.stdin:  
	for key, value in do_map(line):  
    	print(key + "\t" + str(value))  
 
#reducer.py  
import sys  
  
def do_reduce(word, values):  
	return word, sum(values)  
  
prev_key = None  
values = []  
  
for line in sys.stdin:  
	key, value = line.split("\t")  
	if key != prev_key and prev_key is not None:  
    	result_key, result_value = do_reduce(prev_key, values)  
    	print(result_key + "\t" + str(result_value))  
    	values = []  
	prev_key = key  
	values.append(int(value))  
  
if prev_key is not None:  
	result_key, result_value = do_reduce(prev_key, values)  
	print(result_key + "\t" + str(result_value))

Dataene som Hadoop skal behandle må lagres på HDFS. La oss laste opp artiklene våre og legge dem på HDFS. For å gjøre dette, bruk hadoop fs- kommandoen :

wget https://www.dropbox.com/s/opp5psid1x3jt41/lenta_articles.tar.gz  
tar xzvf lenta_articles.tar.gz  
hadoop fs -put lenta_articles 

Hadoop fs-verktøyet støtter et stort antall metoder for å manipulere filsystemet, hvorav mange er identiske med standard linux-verktøy.

La oss nå starte strømmeoppgaven:

yarn jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar\  
 -input lenta_articles\  
 -output lenta_wordcount\  
 -file mapper.py\  
 -file reducer.py\  
 -mapper "python mapper.py"\  
 -reducer "python reducer.py" 

Garnverktøyet brukes til å starte og administrere ulike applikasjoner (inkludert kartreduseringsbasert) på en klynge. Hadoop-streaming.jar er bare ett eksempel på en slik garnapplikasjon.

Neste er lanseringsalternativene:

  • input - mappe med kildedata på hdfs;
  • output - mappe på hdfs der du vil legge resultatet;
  • fil - filer som trengs under driften av kartreduksjonsoppgaven;
  • mapper er konsollkommandoen som skal brukes for kartstadiet;
  • reduser er konsollkommandoen som skal brukes for reduseringstrinnet.

Etter oppstart kan du se fremdriften til oppgaven i konsollen og en URL for å se mer detaljert informasjon om oppgaven.

I grensesnittet som er tilgjengelig på denne URL-adressen, kan du finne ut en mer detaljert oppgaveutførelsesstatus, se loggene til hver kartlegger og redusering (noe som er veldig nyttig i tilfelle mislykkede oppgaver).

Resultatet av arbeidet etter vellykket utførelse legges til HDFS i mappen som vi spesifiserte i utdatafeltet. Du kan se innholdet ved å bruke kommandoen "hadoop fs -ls lenta_wordcount".

Selve resultatet kan oppnås som følger:

hadoop fs -text lenta_wordcount/* | sort -n -k2,2 | tail -n5  
from
41  
this
43  
on
82  
and
111  
into
194 

Kommandoen "hadoop fs -text" viser innholdet i mappen i tekstform. Jeg sorterte resultatet etter antall forekomster av ordene. Som forventet er de vanligste ordene i språket preposisjoner.

4.4 Metode nummer 2: bruk Java

Hadoop i seg selv er skrevet i java, og Hadoops opprinnelige grensesnitt er også java-basert. La oss vise hvordan en innebygd java-applikasjon for ordtelling ser ut:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

	public static class TokenizerMapper
        	extends Mapper<Object, Text, Text, IntWritable>{

    	private final static IntWritable one = new IntWritable(1);
    	private Text word = new Text();

    	public void map(Object key, Text value, Context context
    	) throws IOException, InterruptedException {
        	StringTokenizer itr = new StringTokenizer(value.toString());
        	while (itr.hasMoreTokens()) {
            	word.set(itr.nextToken());
            	context.write(word, one);
        	}
    	}
	}

	public static class IntSumReducer
        	extends Reducer<Text,IntWritable,Text,IntWritable> {
    	private IntWritable result = new IntWritable();

    	public void reduce(Text key, Iterable values,
                       	Context context
    	) throws IOException, InterruptedException {
        	int sum = 0;
        	for (IntWritable val : values) {
            	sum += val.get();
        	}
        	result.set(sum);
        	context.write(key, result);
    	}
	}

	public static void main(String[] args) throws Exception {
    	Configuration conf = new Configuration();
    	Job job = Job.getInstance(conf, "word count");
    	job.setJarByClass(WordCount.class);
    	job.setMapperClass(TokenizerMapper.class);
    	job.setReducerClass(IntSumReducer.class);
    	job.setOutputKeyClass(Text.class);
    	job.setOutputValueClass(IntWritable.class);
    	FileInputFormat.addInputPath(job, new Path("hdfs://localhost/user/cloudera/lenta_articles"));
    	FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost/user/cloudera/lenta_wordcount"));
    	System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

Denne klassen gjør akkurat det samme som vårt Python-eksempel. Vi lager TokenizerMapper- og IntSumReducer-klassene ved å utlede fra henholdsvis Mapper- og Reducer-klassene. Klassene som sendes som malparametere spesifiserer typene inngangs- og utdataverdier. Det opprinnelige API-et antar at kartfunksjonen er gitt et nøkkelverdi-par som input. Siden nøkkelen i vårt tilfelle er tom, definerer vi ganske enkelt Objekt som nøkkeltypen.

I hovedmetoden starter vi mapreduce-oppgaven og definerer dens parametere - navn, mapper og redusering, banen i HDFS, hvor inndataene er plassert og hvor resultatet skal plasseres. For å kompilere trenger vi hadoop-biblioteker. Jeg bruker Maven til å bygge, som cloudera har et depot for. Instruksjoner for oppsett finner du her. Som et resultat, pom.xmp-filen (som brukes av maven for å beskrive sammenstillingen av prosjektet) fikk jeg følgende):

<?xml version="1.0" encoding="UTF-8"?>  
<project xmlns="http://maven.apache.org/POM/4.0.0"  
     	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
     	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">  
	<modelVersion>4.0.0</modelVersion>  
  
	<repositories>  
    	<repository>  
        	<id>cloudera</id>  
        	<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>  
    	</repository>  
	</repositories>  
  
	<dependencies>  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-common</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-auth</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-hdfs</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-mapreduce-client-app</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
	</dependencies>  
  
	<groupId>org.dca.examples</groupId>  
	<artifactId>wordcount</artifactId>  
	<version>1.0-SNAPSHOT</version>  
 
</project>

La oss kompilere prosjektet til en krukkepakke:

mvn clean package

Etter å ha bygget prosjektet inn i en jar-fil, skjer lanseringen på en lignende måte, som i tilfellet med strømmegrensesnittet:

yarn jar wordcount-1.0-SNAPSHOT.jar  WordCount 

Vi venter på utførelse og sjekker resultatet:

hadoop fs -text lenta_wordcount/* | sort -n -k2,2 | tail -n5  
from
41
this
43
on
82
and
111
into
194

Som du kanskje gjetter, er resultatet av å kjøre vår opprinnelige applikasjon det samme som resultatet av strømmeapplikasjonen som vi lanserte på forrige måte.