5.1 Работа само с карта

Време е да опишем различни техники, които ви позволяват ефективно да използвате MapReduce за решаване на практически проблеми, Howто и да покажете някои от функциите на Hadoop, които могат да опростят разработката or значително да ускорят изпълнението на MapReduce задача на клъстер.

Както си спомняме, MapReduce се състои от етапи Map, Shuffle и Reduce. По правило етапът на разбъркване се оказва най-трудният в практическите задачи, тъй като данните се сортират на този етап. Всъщност има редица задачи, при които само етапът Карта може да бъде изоставен. Ето примери за такива задачи:

  • Филтриране на данни (например „Намиране на всички записи от IP address 123.123.123.123“ в регистрационните файлове на уеб сървъра);
  • Трансформация на данни („Изтриване на колона в csv-дневници“);
  • Зареждане и разтоварване на данни от външен източник („Вмъкване на всички записи от дневника в базата данни“).

Такива задачи се решават с помощта на Map-Only. Когато създавате задача само за карта в Hadoop, трябва да посочите нулев брой редуктори:

Пример за конфигурация на задача само за карта в hadoop:

собствен интерфейс Hadoop стрийминг интерфейс

Посочете нулев брой редуктори, когато конфигурирате job'a:

job.setNumReduceTasks(0); 

Ние не посочваме редуктор и посочваме нулев брой редуктори. Пример:

hadoop jar hadoop-streaming.jar \ 
 -D mapred.reduce.tasks=0\ 
-input input_dir\ 
-output output_dir\ 
-mapper "python mapper.py"\ 
-file "mapper.py"

Работата само с карта всъщност може да бъде много полезна. Например, в платформата Facetz.DCA, за идентифициране на характеристиките на потребителите чрез тяхното поведение, се използва точно една голяма карта, като всеки картограф приема потребител като вход и връща неговите характеристики като изход.

5.2 Комбинирайте

Както вече писах, обикновено най-трудният етап при изпълнение на задача Map-Reduce е етапът на разбъркване. Това се случва, защото междинните резултати (изхода на картографа) се записват на диск, сортират се и се предават по мрежата. Има обаче задачи, при които подобно поведение не изглежда много разумно. Например, в една и съща задача за преброяване на думи в documentи, можете предварително да обобщите резултатите от изходите на няколко съпоставители на един възел за редуциране на карта на задачата и да предадете вече сумираните стойности за всяка машина към редуктора .

В hadoop за това можете да дефинирате комбинираща функция, която ще обработва изхода на част от съпоставителите. Комбиниращата функция е много подобна на намаляване - тя приема изхода на някои картографи като вход и произвежда обобщен резултат за тези картографи, така че редукторът често се използва и като комбинирач. Важна разлика от намалението е , че не всички стойности, съответстващи на един ключ, достигат до функцията за комбиниране .

Освен това hadoop не гарантира, че функцията за комбиниране изобщо ще бъде изпълнена за изхода на картографа. Следователно функцията за комбиниране не винаги е приложима, например в случай на търсене на средната стойност по ключ. Независимо от това, в тези задачи, където е приложима функцията за комбиниране, нейното използване позволява да се постигне значително увеличение на скоростта на задачата MapReduce.

Използване на Combiner на hadoop:

собствен интерфейс Hadoop стрийминг

Когато конфигурирате job-a, посочете class-Combiner. По правило е същото като редуктора:

job.setMapperClass(TokenizerMapper.class); 
job.setCombinerClass(IntSumReducer.class); 
job.setReducerClass(IntSumReducer.class); 

Посочете командата -combiner в опциите на командния ред. Обикновено тази команда е същата като командата за редуктор. Пример:

hadoop jar hadoop-streaming.jar \ 
-input input_dir\ 
-output output_dir\ 
-mapper "python mapper.py"\ 
-reducer "python reducer.py"\ 
-combiner "python reducer.py"\ 
-file "mapper.py"\ 
-file "reducer.py"\

5.3 Вериги от задачи на MapReduce

Има ситуации, когато един MapReduce не е достатъчен за решаване на проблем. Например, помислете за леко модифицирана задача WordCount: има набор от текстови documentи, трябва да преброите колко думи са се появor от 1 до 1000 пъти в набора, колко думи от 1001 до 2000, колко от 2001 до 3000, и така нататък. За решението ни трябват 2 работни места MapReduce:

  • Модифициран wordcount, който за всяка дума ще изчислява в кой от интервалите е попаднала;
  • MapReduce, който брои колко пъти всеки интервал е бил срещнат в изхода на първия MapReduce.

Решение на псевдоcode:

#map1 
def map(doc): 
for word in doc: 
yield word, 1
#reduce1 
def reduce(word, values): 
yield int(sum(values)/1000), 1 
#map2 
def map(doc): 
interval, cnt = doc.split() 
yield interval, cnt 
#reduce2 
def reduce(interval, values): 
yield interval*1000, sum(values) 

За да изпълните последователност от MapReduce задачи на hadoop, достатъчно е просто да посочите папката, която е посочена като изход за първата, като вход за втората задача и да ги изпълните на свой ред.

На практика веригите от MapReduce задачи могат да бъдат доста сложни поредици, в които MapReduce задачите могат да бъдат свързани Howто последователно, така и паралелно една към друга. За да се опрости управлението на такива планове за изпълнение на задачи, има отделни инструменти като oozie и luigi, които ще бъдат обсъдени в отделна статия от тази серия.

5.4 Разпределен кеш

Важен механизъм в Hadoop е разпределеният кеш. Разпределеният кеш ви позволява да добавяте файлове (напр. текстови файлове, архиви, jar файлове) към средата, в която се изпълнява задачата MapReduce.

Можете да добавяте файлове, съхранявани на HDFS, локални файлове (локални за машината, от която се стартира задачата). Вече имплицитно показах How да използвам Distributed Cache с hadoop стрийминг, като добавих файловете mapper.py и reducer.py чрез опцията -file. Всъщност можете да добавите не само mapper.py и reducer.py, но произволни файлове като цяло и след това да ги използвате, сякаш са в локална папка.

Използване на разпределен кеш:

Роден API
//Job configuration
JobConf job = new JobConf();
DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"),  job);
DistributedCache.addCacheArchive(new URI("/myapp/map.zip", job);
DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job);
DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar", job);
DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz", job);

//example of usage in mapper-e:
public static class MapClass extends MapReduceBase
implements Mapper<K, V, K, V> {

 private Path[] localArchives;
 private Path[] localFiles;

 public void configure(JobConf job) {
   // get cached data from archives
   File f = new File("./map.zip/some/file/in/zip.txt");
 }

 public void map(K key, V value,
             	OutputCollector<K, V> output, Reporter reporter)
 throws IOException {
   // use data here
   // ...
   // ...
   output.collect(k, v);
 }
}
Hadoop поточно предаване

#изброяваме файловете, които трябва да се добавят към разпределения кеш в параметъра –files. Опцията --files трябва да е преди другите опции.

yarn  hadoop-streaming.jar\ 
-files mapper.py,reducer.py,some_cached_data.txt\ 
-input '/some/input/path' \ 
-output '/some/output/path' \  
-mapper 'python mapper.py' \ 
-reducer 'python reducer.py' \

пример за използване:

import sys 
#just read file from local folder 
data = open('some_cached_data.txt').read() 
 
for line in sys.stdin() 
#processing input 
#use data here

5.5 Намаляване на присъединяването

Тези, които са свикнали да работят с релационни бази данни, често използват много удобната операция Join, която им позволява съвместно да обработват съдържанието на някои таблици, като ги обединяват според няHowъв ключ. Когато работите с големи данни, този проблем също понякога възниква. Разгледайте следния пример:

Има регистрационни файлове на два уеб сървъра, всеки регистрационен файл изглежда така:

t\t

Пример за фрагмент от регистрационен файл:

1446792139	
178.78.82.1	
/sphingosine/unhurrying.css 
1446792139	
126.31.163.222	
/accentually.js 
1446792139	
154.164.149.83	
/pyroacid/unkemptly.jpg 
1446792139	
202.27.13.181	
/Chawia.js 
1446792139	
67.123.248.174	
/morphographical/dismain.css 
1446792139	
226.74.123.135	
/phanerite.php 
1446792139	
157.109.106.104	
/bisonant.css

Необходимо е да се изчисли за всеки IP address кой от 2 сървъра е посещавал по-често. Резултатът трябва да бъде във формата:

\t

Пример за част от резултата:

178.78.82.1	
first 
126.31.163.222	
second 
154.164.149.83	
second 
226.74.123.135	
first

За съжаление, за разлика от релационните бази данни, като цяло свързването на два журнала по ключ (в този случай по IP address) е доста тежка операция и се решава с помощта на 3 MapReduce и модела Reduce Join:

ReduceJoin работи по следния начин:

1) За всеки от входните регистрационни файлове се стартира отделен MapReduce (само за карта), преобразуващ входните данни в следната форма:

key -> (type, value

Където key е ключът за свързване на таблици, Type е типът на tableта (първа or втора в нашия случай), а Value е всички допълнителни данни, свързани с ключа.

2) Резултатите от двете MapReduce се подават към входа на 3-тата MapReduce, която всъщност извършва обединението. Този MapReduce съдържа празен Mapper, който просто копира входа. След това shuffle разлага данните на ключове и ги подава към редуктора като вход:

key -> [(type, value)]

Важно е, че в този момент редукторът получава записи от двата лога и в същото време е възможно да се идентифицира чрез полето за тип от кой от двата лога идва определена стойност. Така че има достатъчно данни за решаване на първоначалния проблем. В нашия случай редукторът просто трябва да изчисли за всеки ключ за запис кой тип е срещнал повече и да изведе този тип.

5.6 MapJoins

Моделът ReduceJoin описва общия случай на свързване на два журнала чрез ключ. Има обаче специален случай, в който задачата може значително да се опрости и ускори. Това е случаят, в който един от трупите е значително по-малък от другия. Помислете за следния проблем:

Има 2 трупи. Първият дневник съдържа дневника на уеб сървъра (същия като в предишната задача), вторият файл (с размер 100 kb) съдържа URL-> съвпадение на тема. Пример 2-ри файл:

/toyota.php 	
auto 
/football/spartak.html 	
sport 
/cars 	
auto 
/finances/money 	
business

За всеки IP address е необходимо да се изчисли страниците от коя категория от този IP address са се зареждали най-често.

В този случай също трябва да обединим 2 журнала чрез URL. В този случай обаче не е нужно да стартираме 3 MapReduce, тъй като вторият дневник ще се побере напълно в паметта. За да разрешим проблема с помощта на 1st MapReduce, можем да заредим втория журнал в Distributed Cache и когато Mapper се инициализира, просто да го прочетем в паметта, като го поставим в -> речника на темата.

Освен това проблемът се решава, Howто следва:

карта:

# find the subject of each of the pages of the first log 
input_line -> [ip,  topic] 

намали:


Ip -> [topics] -> [ip, most_popular_topic]

Reduce получава ip и списък с всички теми като вход, той просто изчислява коя от темите е срещана най-често. По този начин задачата се решава с помощта на 1st MapReduce и действителното присъединяване обикновено се извършва вътре в картата (следователно, ако не е необходимо допълнително агрегиране по ключ, задачата MapOnly може да бъде изоставена):