BigData: Hadoop

На разположение

4.1 Обща информация за Hadoop

Парадигмата MapReduce беше предложена от Google през 2004 г. в неговата статия MapReduce: Опростена обработка на данни в големи клъстери . Тъй като предложената статия съдържаше описание на парадигмата, но внедряването липсваше, няколко програмисти от Yahoo предложиха внедряването им като част от работата по слабия уеб робот. Можете да прочетете повече за историята на Hadoop в статията Историята на Hadoop: От 4 възела до бъдещето на данните .

Първоначално Hadoop беше предимно инструмент за съхраняване на данни и изпълнение на задачи на MapReduce, но сега Hadoop е голям набор от технологии, свързани по един or друг начин с обработката на големи данни (не само с MapReduce).

Основните (основни) компоненти на Hadoop са:

  • Hadoop Distributed File System (HDFS) е разпределена файлова система, която ви позволява да съхранявате информация с почти неограничен размер.
  • Hadoop YARN е рамка за управление на клъстерни ресурси и управление на задачи, включително рамката MapReduce.
  • Hadoop общ

Има и голям брой проекти, пряко свързани с Hadoop, но не включени в ядрото на Hadoop:

  • Hive - инструмент за SQL-подобни заявки върху големи данни (превръща SQL заявките в поредица от MapReduce задачи);
  • Pig е език за програмиране за анализ на данни от високо ниво. Един ред code на този език може да се превърне в поредица от задачи на MapReduce;
  • Hbase е колонна база данни, която прилага парадигмата BigTable;
  • Cassandra е високопроизводителна разпределена база данни с ключ-стойност;
  • ZooKeeper е услуга за разпределено съхранение на конфигурация и синхронизиране на промените в конфигурацията;
  • Mahout е библиотека и двигател за машинно обучение с големи данни.

Отделно бих искал да отбележа проекта Apache Spark , който е двигател за разпределена обработка на данни. Apache Spark обикновено използва Hadoop компоненти като HDFS и YARN за своята работа, докато самият той наскоро стана по-популярен от Hadoop:

Някои от тези компоненти ще бъдат разгледани в отделни статии в тази поредица от материали, но засега нека да разгледаме How можете да започнете да работите с Hadoop и да го приложите на практика.

4.2 Изпълнение на програми MapReduce на Hadoop

Сега нека да разгледаме How да изпълним задача MapReduce на Hadoop. Като задача ще използваме класическия пример за WordCount , който беше разгледан в предишния урок.

Нека ви напомня формулировката на проблема: има набор от documentи. Необходимо е за всяка дума, срещаща се в набора от documentи, да се преброи колко пъти думата се среща в набора.

Решение:

Map разделя documentа на думи и връща набор от двойки (word, 1).

Намалете сумите на срещанията на всяка дума:

def map(doc):  
for word in doc.split():  
	yield word, 1 
def reduce(word, values):  
	yield word, sum(values)

Сега задачата е да програмираме това решение под формата на code, който може да се изпълни на Hadoop и да се стартира.

4.3 Метод номер 1. Hadoop поточно предаване

Най-лесният начин да стартирате програма MapReduce на Hadoop е да използвате интерфейса за поточно предаване на Hadoop. Интерфейсът за поточно предаване предполага, че map и reduce се изпълняват като програми, които вземат данни от stdin и извеждат към stdout .

Програмата, която изпълнява функцията map се нарича mapper. Програмата, която изпълнява reduce, се нарича, съответно, reducer .

Интерфейсът за поточно предаване приема по подразбиране, че един входящ ред в картограф or редуктор съответства на един входящ запис за карта .

Изходът на картографа достига до входа на редуктора под формата на двойки (ключ, стойност), докато всички двойки съответстват на един и същ ключ:

  • Гарантирана обработка чрез еднократно пускане на редуктора;
  • Ще бъде подаден към входа в ред (тоест, ако един редуктор обработва няколко различни ключа, входът ще бъде групиран по ключ).

И така, нека внедрим картограф и редуктор в python:

#mapper.py  
import sys  
  
def do_map(doc):  
for word in doc.split():  
	yield word.lower(), 1  
  
for line in sys.stdin:  
	for key, value in do_map(line):  
    	print(key + "\t" + str(value))  
 
#reducer.py  
import sys  
  
def do_reduce(word, values):  
	return word, sum(values)  
  
prev_key = None  
values = []  
  
for line in sys.stdin:  
	key, value = line.split("\t")  
	if key != prev_key and prev_key is not None:  
    	result_key, result_value = do_reduce(prev_key, values)  
    	print(result_key + "\t" + str(result_value))  
    	values = []  
	prev_key = key  
	values.append(int(value))  
  
if prev_key is not None:  
	result_key, result_value = do_reduce(prev_key, values)  
	print(result_key + "\t" + str(result_value))

Данните, които Hadoop ще обработва, трябва да се съхраняват на HDFS. Нека да качим нашите статии и да ги поставим на HDFS. За да направите това, използвайте командата hadoop fs :

wget https://www.dropbox.com/s/opp5psid1x3jt41/lenta_articles.tar.gz  
tar xzvf lenta_articles.tar.gz  
hadoop fs -put lenta_articles 

Помощната програма hadoop fs поддържа голям брой методи за манипулиране на файловата система, много от които са идентични със стандартните помощни програми на Linux.

Сега нека започнем задачата за стрийминг:

yarn jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar\  
 -input lenta_articles\  
 -output lenta_wordcount\  
 -file mapper.py\  
 -file reducer.py\  
 -mapper "python mapper.py"\  
 -reducer "python reducer.py" 

Помощната програма yarn се използва за стартиране и управление на различни applications (включително базирани на картографиране) в клъстер. Hadoop-streaming.jar е само един пример за такова приложение на прежда.

Следват опциите за стартиране:

  • вход - папка с изходни данни на hdfs;
  • изход - папка на hdfs, където искате да поставите резултата;
  • файл - файлове, които са необходими по време на работата на задачата за намаляване на картата;
  • mapper е конзолната команда, която ще се използва за етапа на карта;
  • намали е конзолната команда, която ще се използва за етапа на намаляване.

След стартиране можете да видите напредъка на задачата в конзолата и URL за преглед на по-подробна информация за задачата.

В интерфейса, достъпен на този URL, можете да намерите по-подробен статус на изпълнение на задачата, да видите регистрационните файлове на всеки съпоставител и редуктор (което е много полезно в случай на неуспешни задачи).

Резултатът от работата след успешно изпълнение се добавя към HDFS в папката, която сме посочor в полето за изход. Можете да видите съдържанието му с помощта на командата "hadoop fs -ls lenta_wordcount".

Самият резултат може да се получи по следния начин:

hadoop fs -text lenta_wordcount/* | sort -n -k2,2 | tail -n5  
from
41  
this
43  
on
82  
and
111  
into
194 

Командата "hadoop fs -text" показва съдържанието на папката в текстова форма. Сортирах резултата по броя на срещанията на думите. Очаквано, най-често срещаните думи в езика са предлозите.

4.4 Метод номер 2: използвайте Java

Самият Hadoop е написан на java, а собственият интерфейс на Hadoop също е базиран на java. Нека да покажем How изглежда естествено java приложение за wordcount:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

	public static class TokenizerMapper
        	extends Mapper<Object, Text, Text, IntWritable>{

    	private final static IntWritable one = new IntWritable(1);
    	private Text word = new Text();

    	public void map(Object key, Text value, Context context
    	) throws IOException, InterruptedException {
        	StringTokenizer itr = new StringTokenizer(value.toString());
        	while (itr.hasMoreTokens()) {
            	word.set(itr.nextToken());
            	context.write(word, one);
        	}
    	}
	}

	public static class IntSumReducer
        	extends Reducer<Text,IntWritable,Text,IntWritable> {
    	private IntWritable result = new IntWritable();

    	public void reduce(Text key, Iterable values,
                       	Context context
    	) throws IOException, InterruptedException {
        	int sum = 0;
        	for (IntWritable val : values) {
            	sum += val.get();
        	}
        	result.set(sum);
        	context.write(key, result);
    	}
	}

	public static void main(String[] args) throws Exception {
    	Configuration conf = new Configuration();
    	Job job = Job.getInstance(conf, "word count");
    	job.setJarByClass(WordCount.class);
    	job.setMapperClass(TokenizerMapper.class);
    	job.setReducerClass(IntSumReducer.class);
    	job.setOutputKeyClass(Text.class);
    	job.setOutputValueClass(IntWritable.class);
    	FileInputFormat.addInputPath(job, new Path("hdfs://localhost/user/cloudera/lenta_articles"));
    	FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost/user/cloudera/lenta_wordcount"));
    	System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

Този клас прави точно същото като нашия пример на Python. Създаваме класовете TokenizerMapper и IntSumReducer, като произлизаме съответно от класовете Mapper и Reducer. Класовете, предавани като параметри на шаблона, указват типовете входни и изходни стойности. Естественият API приема, че на функцията map е дадена двойка ключ-стойност като вход. Тъй като в нашия случай ключът е празен, ние просто дефинираме Object като тип ключ.

В метода Main стартираме задачата mapreduce и дефинираме нейните параметри - име, мапър и редуктор, пътя в HDFS, къде се намират входните данни и къде да поставим резултата. За да компorраме, имаме нужда от hadoop библиотеки. Използвам Maven за изграждане, за което cloudera има хранorще. Инструкции за настройването му можете да намерите тук. В резултат на file pom.xmp (който се използва от maven за описание на сглобяването на проекта) получих следното):

<?xml version="1.0" encoding="UTF-8"?>  
<project xmlns="http://maven.apache.org/POM/4.0.0"  
     	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
     	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">  
	<modelVersion>4.0.0</modelVersion>  
  
	<repositories>  
    	<repository>  
        	<id>cloudera</id>  
        	<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>  
    	</repository>  
	</repositories>  
  
	<dependencies>  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-common</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-auth</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-hdfs</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-mapreduce-client-app</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
	</dependencies>  
  
	<groupId>org.dca.examples</groupId>  
	<artifactId>wordcount</artifactId>  
	<version>1.0-SNAPSHOT</version>  
 
</project>

Нека компorраме проекта в jar пакет:

mvn clean package

След изграждането на проекта в jar файл, стартирането става по подобен начин, Howто в случая с интерфейса за поточно предаване:

yarn jar wordcount-1.0-SNAPSHOT.jar  WordCount 

Изчакваме изпълнението и проверяваме резултата:

hadoop fs -text lenta_wordcount/* | sort -n -k2,2 | tail -n5  
from
41
this
43
on
82
and
111
into
194

Както може би се досещате, резултатът от стартирането на нашето собствено приложение е същият като резултата от приложението за стрийминг, което стартирахме по предишния начин.

Коментари
  • Популярен
  • Нов
  • Стар
Трябва да сте влезли, за да оставите коментар
Тази страница все още няма коментари