5.1 Praca tylko na mapie

Pora opisać różne techniki, które pozwalają efektywnie wykorzystać MapReduce do rozwiązywania praktycznych problemów, a także pokazać niektóre cechy Hadoop, które mogą uprościć programowanie lub znacznie przyspieszyć wykonanie zadania MapReduce na klastrze.

Jak pamiętamy, MapReduce składa się z etapów Map, Shuffle i Reduce. Z reguły w zadaniach praktycznych etap Shuffle okazuje się najtrudniejszy, ponieważ na tym etapie dane są sortowane. W rzeczywistości istnieje wiele zadań, w przypadku których można zrezygnować z samego etapu mapy. Oto przykłady takich zadań:

  • Filtrowanie danych (np. „Znajdź wszystkie wpisy z adresu IP 123.123.123.123” w logach serwera WWW);
  • Transformacja danych („Usuń kolumnę w dziennikach csv”);
  • Wczytywanie i wyładowywanie danych z zewnętrznego źródła („Wstaw wszystkie rekordy z logu do bazy danych”).

Takie zadania są rozwiązywane za pomocą Map-Only. Tworząc zadanie Map-Only w Hadoop, musisz określić zerową liczbę reduktorów:

Przykład konfiguracji zadania opartego tylko na mapie na platformie hadoop:

natywny interfejs Interfejs przesyłania strumieniowego Hadoop

Określ zerową liczbę reduktorów podczas konfigurowania job'a:

job.setNumReduceTasks(0); 

Nie określamy reduktora i określamy zerową liczbę reduktorów. Przykład:

hadoop jar hadoop-streaming.jar \ 
 -D mapred.reduce.tasks=0\ 
-input input_dir\ 
-output output_dir\ 
-mapper "python mapper.py"\ 
-file "mapper.py"

Zadania tylko na mapie mogą być bardzo przydatne. Na przykład na platformie Facetz.DCA, aby zidentyfikować cechy użytkowników na podstawie ich zachowania, używana jest dokładnie jedna duża mapa, z której każdy mapper przyjmuje użytkownika jako dane wejściowe i zwraca jego cechy jako dane wyjściowe.

5.2 Połącz

Jak już pisałem, zwykle najtrudniejszym etapem podczas wykonywania zadania Map-Reduce jest etap przetasowania. Dzieje się tak, ponieważ wyniki pośrednie (wyjście mapera) są zapisywane na dysku, sortowane i przesyłane przez sieć. Są jednak zadania, w których takie zachowanie nie wydaje się zbyt rozsądne. Na przykład w tym samym zadaniu liczenia słów w dokumentach możesz wstępnie zagregować wyniki wyjść kilku maperów na jednym węźle mapowania zadania i przekazać już zsumowane wartości dla każdej maszyny do reduktora .

W tym celu w hadoop można zdefiniować funkcję łączącą, która będzie przetwarzać dane wyjściowe części maperów. Funkcja łączenia jest bardzo podobna do funkcji reduce — pobiera dane wyjściowe niektórych maperów jako dane wejściowe i tworzy zagregowany wynik dla tych maperów, więc reduktor jest często używany również jako sumator. Istotną różnicą w stosunku do reduce jest to, że nie wszystkie wartości odpowiadające jednemu kluczowi trafiają do funkcji łączącej .

Co więcej, hadoop nie gwarantuje, że funkcja łączenia zostanie w ogóle wykonana dla danych wyjściowych mappera. Dlatego funkcja łączenia nie zawsze ma zastosowanie, na przykład w przypadku wyszukiwania wartości mediany po kluczu. Niemniej jednak w tych zadaniach, w których ma zastosowanie funkcja łączenia, jej użycie pozwala na osiągnięcie znacznego wzrostu szybkości zadania MapReduce.

Używanie Combinera na hadoop:

natywny interfejs Transmisja strumieniowa Hadoop

Podczas konfigurowania zadania-a określ klasę-Combiner. Z reguły jest to to samo co Reducer:

job.setMapperClass(TokenizerMapper.class); 
job.setCombinerClass(IntSumReducer.class); 
job.setReducerClass(IntSumReducer.class); 

Określ komendę -combiner w opcjach wiersza komend. Zwykle to polecenie jest takie samo, jak polecenie zmniejszania. Przykład:

hadoop jar hadoop-streaming.jar \ 
-input input_dir\ 
-output output_dir\ 
-mapper "python mapper.py"\ 
-reducer "python reducer.py"\ 
-combiner "python reducer.py"\ 
-file "mapper.py"\ 
-file "reducer.py"\

5.3 Łańcuchy zadań MapReduce

Są sytuacje, w których jeden MapReduce to za mało, aby rozwiązać problem. Weźmy na przykład nieco zmodyfikowane zadanie WordCount: jest zestaw dokumentów tekstowych, trzeba policzyć ile słów wystąpiło od 1 do 1000 razy w zestawie, ile słów od 1001 do 2000, ile od 2001 do 3000, i tak dalej. Do rozwiązania potrzebujemy 2 zadań MapReduce:

  • Zmodyfikowana liczba słów, która dla każdego słowa obliczy, w którym z przedziałów się znalazła;
  • MapReduce, który zlicza, ile razy każdy interwał został napotkany w danych wyjściowych pierwszego MapReduce.

Rozwiązanie pseudokodowe:

#map1 
def map(doc): 
for word in doc: 
yield word, 1
#reduce1 
def reduce(word, values): 
yield int(sum(values)/1000), 1 
#map2 
def map(doc): 
interval, cnt = doc.split() 
yield interval, cnt 
#reduce2 
def reduce(interval, values): 
yield interval*1000, sum(values) 

Aby wykonać sekwencję zadań MapReduce na hadoop, wystarczy wskazać folder, który został określony jako wyjście dla pierwszego zadania jako dane wejściowe dla drugiego zadania i uruchomić je po kolei.

W praktyce łańcuchy zadań MapReduce mogą być dość złożonymi sekwencjami, w których zadania MapReduce można łączyć zarówno sekwencyjnie, jak i równolegle do siebie. Aby uprościć zarządzanie takimi planami wykonania zadań, istnieją osobne narzędzia, takie jak oozie i luigi, które zostaną omówione w osobnym artykule z tej serii.

5.4 Rozproszona pamięć podręczna

Ważnym mechanizmem w Hadoop jest rozproszona pamięć podręczna. Rozproszona pamięć podręczna umożliwia dodawanie plików (np. plików tekstowych, archiwów, plików jar) do środowiska, w którym uruchomione jest zadanie MapReduce.

Możesz dodać pliki przechowywane na HDFS, pliki lokalne (lokalne dla maszyny, z której uruchamiane jest zadanie). Już pośrednio pokazałem, jak używać rozproszonej pamięci podręcznej ze strumieniowaniem hadoop, dodając pliki mapper.py i reducer.py za pomocą opcji -file. W rzeczywistości możesz dodać nie tylko mapper.py i reducer.py, ale ogólnie dowolne pliki, a następnie używać ich tak, jakby znajdowały się w folderze lokalnym.

Korzystanie z rozproszonej pamięci podręcznej:

Natywne API
//Job configuration
JobConf job = new JobConf();
DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"),  job);
DistributedCache.addCacheArchive(new URI("/myapp/map.zip", job);
DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job);
DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar", job);
DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz", job);

//example of usage in mapper-e:
public static class MapClass extends MapReduceBase
implements Mapper<K, V, K, V> {

 private Path[] localArchives;
 private Path[] localFiles;

 public void configure(JobConf job) {
   // get cached data from archives
   File f = new File("./map.zip/some/file/in/zip.txt");
 }

 public void map(K key, V value,
             	OutputCollector<K, V> output, Reporter reporter)
 throws IOException {
   // use data here
   // ...
   // ...
   output.collect(k, v);
 }
}
Transmisja strumieniowa Hadoop

#wymieniamy pliki, które należy dodać do rozproszonej pamięci podręcznej w parametrze –files. Opcja --files musi znajdować się przed innymi opcjami.

yarn  hadoop-streaming.jar\ 
-files mapper.py,reducer.py,some_cached_data.txt\ 
-input '/some/input/path' \ 
-output '/some/output/path' \  
-mapper 'python mapper.py' \ 
-reducer 'python reducer.py' \

przykład użycia:

import sys 
#just read file from local folder 
data = open('some_cached_data.txt').read() 
 
for line in sys.stdin() 
#processing input 
#use data here

5.5 Zmniejsz łączność

Ci, którzy są przyzwyczajeni do pracy z relacyjnymi bazami danych, często korzystają z bardzo wygodnej operacji Join, która pozwala im wspólnie przetwarzać zawartość niektórych tabel, łącząc je według jakiegoś klucza. Podczas pracy z dużymi zbiorami danych czasami pojawia się również ten problem. Rozważ następujący przykład:

Istnieją dzienniki dwóch serwerów WWW, każdy dziennik wygląda tak:

t\t

Przykład fragmentu dziennika:

1446792139	
178.78.82.1	
/sphingosine/unhurrying.css 
1446792139	
126.31.163.222	
/accentually.js 
1446792139	
154.164.149.83	
/pyroacid/unkemptly.jpg 
1446792139	
202.27.13.181	
/Chawia.js 
1446792139	
67.123.248.174	
/morphographical/dismain.css 
1446792139	
226.74.123.135	
/phanerite.php 
1446792139	
157.109.106.104	
/bisonant.css

Konieczne jest obliczenie dla każdego adresu IP, który z 2 serwerów odwiedzał częściej. Wynik powinien być w postaci:

\t

Przykład części wyniku:

178.78.82.1	
first 
126.31.163.222	
second 
154.164.149.83	
second 
226.74.123.135	
first

Niestety, w przeciwieństwie do relacyjnych baz danych, generalnie łączenie dwóch dzienników za pomocą klucza (w tym przypadku adresu IP) jest dość ciężką operacją i można je rozwiązać za pomocą 3 MapReduce i wzorca Reduce Join:

ReduceJoin działa w ten sposób:

1) Dla każdego z logów wejściowych uruchamiany jest osobny MapReduce (tylko Map), konwertujący dane wejściowe do następującej postaci:

key -> (type, value

Gdzie klucz to klucz do łączenia tabel, Typ to typ tabeli (w naszym przypadku pierwsza lub druga), a Wartość to wszelkie dodatkowe dane powiązane z kluczem.

2) Wyjścia obu MapReduce są podawane na wejście trzeciego MapReduce, który w rzeczywistości wykonuje połączenie. Ten MapReduce zawiera pusty Mapper, który po prostu kopiuje dane wejściowe. Następnie shuffle rozkłada dane na klucze i podaje je do reduktora jako dane wejściowe:

key -> [(type, value)]

Ważne jest, aby w tym momencie reduktor otrzymał zapisy z obu logów, a jednocześnie po polu typu można było rozpoznać, z którego z dwóch logów pochodzi dana wartość. Jest więc wystarczająco dużo danych, aby rozwiązać pierwotny problem. W naszym przypadku reduktor musi po prostu obliczyć dla każdego klucza rekordu, który typ napotkał więcej i wypisać ten typ.

5.6 MapJoin

Wzorzec ReduceJoin opisuje ogólny przypadek łączenia dwóch dzienników za pomocą klucza. Istnieje jednak szczególny przypadek, w którym zadanie można znacznie uprościć i przyspieszyć. Jest to przypadek, w którym jeden z kłód jest znacznie mniejszy od drugiego. Rozważ następujący problem:

Są 2 logi. Pierwszy log zawiera log serwera WWW (taki sam jak w poprzednim zadaniu), drugi plik (100kb) zawiera adres URL-> Dopasowanie motywu. Przykład 2. plik:

/toyota.php 	
auto 
/football/spartak.html 	
sport 
/cars 	
auto 
/finances/money 	
business

Dla każdego adresu IP należy obliczyć, które strony z kategorii z tego adresu IP były ładowane najczęściej.

W tym przypadku musimy również połączyć 2 logi przez adres URL. Jednak w tym przypadku nie musimy uruchamiać 3 MapReduce, ponieważ drugi log całkowicie zmieści się w pamięci. Aby rozwiązać problem za pomocą 1. MapReduce, możemy załadować drugi log do Distributed Cache, a po zainicjowaniu Mappera po prostu wczytać go do pamięci, umieszczając w słowniku ->topic.

Ponadto problem został rozwiązany w następujący sposób:

mapa:

# find the subject of each of the pages of the first log 
input_line -> [ip,  topic] 

zmniejszyć:


Ip -> [topics] -> [ip, most_popular_topic]

Reduce otrzymuje adres IP i listę wszystkich tematów jako dane wejściowe, po prostu oblicza, który z tematów był najczęściej napotykany. W ten sposób zadanie jest rozwiązywane za pomocą 1. MapReduce, a samo Join odbywa się generalnie wewnątrz mapy (dlatego, gdyby nie była potrzebna dodatkowa agregacja według klucza, można by zrezygnować z zadania MapOnly):