5.1 Peta mung proyek

Iku wektu kanggo njlèntrèhaké macem-macem Techniques sing ngijini sampeyan kanggo èfèktif nggunakake MapReduce kanggo ngatasi masalah praktis, uga nuduhake sawetara fitur saka Hadoop sing bisa menakake pembangunan utawa Ngartekno nyepetake eksekusi tugas MapReduce ing kluster.

Nalika kita ngelingi, MapReduce kasusun saka Map, Shuffle lan nyuda orane tumrap sekolah. Biasane, tataran Shuffle dadi sing paling angel ing tugas praktis, amarga data diurutake ing tahap iki. Nyatane, ana sawetara tugas ing tataran Peta mung bisa dispensed karo. Ing ngisor iki conto tugas kasebut:

  • Nyaring data (contone, "Golek kabeh cathetan saka alamat IP 123.123.123.123" ing log server web);
  • Transformasi data ("Busak kolom ing csv-logs");
  • Ngunggah lan mbongkar data saka sumber eksternal ("Lebokake kabeh cathetan saka log menyang database").

Tugas kasebut ditanggulangi nggunakake Map-Mung. Nalika nggawe tugas Map-Mung ing Hadoop, sampeyan kudu nemtokake nomer nol pengurangan:

Conto konfigurasi tugas mung peta ing hadoop:

antarmuka asli Antarmuka Streaming Hadoop

Nemtokake nomer nol pengurangan nalika ngonfigurasi job'a:

job.setNumReduceTasks(0); 

Kita ora nemtokake reducer lan nemtokake nomer nol reducer. Tuladha:

hadoop jar hadoop-streaming.jar \ 
 -D mapred.reduce.tasks=0\ 
-input input_dir\ 
-output output_dir\ 
-mapper "python mapper.py"\ 
-file "mapper.py"

Pakaryan Map Only bisa migunani banget. Contone, ing platform Facetz.DCA, kanggo ngenali karakteristik pangguna kanthi prilaku, mung siji peta gedhe sing digunakake, saben mapper njupuk pangguna minangka input lan ngasilake karakteristik minangka output.

5.2 Gabungan

Kaya sing wis dakcritakake, biasane tahap sing paling angel nalika nindakake tugas Map-Reduce yaiku tahap acak. Iki kedadeyan amarga asil penengah (output mapper) ditulis menyang disk, diurutake lan dikirim liwat jaringan. Nanging, ana tugas sing prilaku kasebut ora katon cukup. Contone, ing tugas sing padha kanggo ngetung tembung ing dokumen, sampeyan bisa pre-agregat asil saka sawetara mappers ing siji peta-ngurangi simpul tugas, lan ngirim nilai sing wis dijumlah kanggo saben mesin menyang reducer. .

Ing hadoop, kanggo iki, sampeyan bisa nemtokake fungsi gabungan sing bakal ngolah output bagean saka mappers. Fungsi gabungan meh padha karo ngurangi - njupuk output saka sawetara mappers minangka input lan ngasilake asil agregat kanggo mappers iki, saéngga reducer uga asring digunakake minangka combiner. Bentenane penting saka nyuda yaiku ora kabeh nilai sing cocog karo siji tombol entuk fungsi gabungan .

Menapa malih, hadoop ora njamin yen fungsi gabungan bakal dieksekusi ing kabeh kanggo output mapper. Mulane, fungsi gabungan ora tansah ditrapake, contone, ing kasus nggoleki nilai median dening tombol. Nanging, ing tugas kasebut ing ngendi fungsi gabungan ditrapake, panggunaan kasebut ngidini kanggo nambah kacepetan tugas MapReduce sing signifikan.

Nggunakake Combiner ing hadoop:

antarmuka asli Hadoop streaming

Nalika ngatur proyek-a, nemtokake kelas-Combiner. Minangka aturan, padha karo Reducer:

job.setMapperClass(TokenizerMapper.class); 
job.setCombinerClass(IntSumReducer.class); 
job.setReducerClass(IntSumReducer.class); 

Nemtokake printah -combiner ing pilihan baris printah. Biasane, printah iki padha karo printah reducer. Tuladha:

hadoop jar hadoop-streaming.jar \ 
-input input_dir\ 
-output output_dir\ 
-mapper "python mapper.py"\ 
-reducer "python reducer.py"\ 
-combiner "python reducer.py"\ 
-file "mapper.py"\ 
-file "reducer.py"\

5.3 Rantai tugas MapReduce

Ana kahanan nalika siji MapReduce ora cukup kanggo ngatasi masalah. Contone, nimbang tugas WordCount sing rada diowahi: ana sakumpulan dokumen teks, sampeyan kudu ngetung pirang-pirang tembung sing kedadeyan saka 1 nganti 1000 kaping ing set, pirang-pirang tembung saka 1001 nganti 2000, pira saka 2001 nganti 3000, lan liya-liyane. Kanggo solusi kasebut, kita butuh 2 proyek MapReduce:

  • Jumlah tembung sing diowahi, sing kanggo saben tembung bakal ngetung interval sing ana;
  • A MapReduce sing counts carane kakehan saben interval ditemoni ing output saka MapReduce pisanan.

Solusi kode pseudo:

#map1 
def map(doc): 
for word in doc: 
yield word, 1
#reduce1 
def reduce(word, values): 
yield int(sum(values)/1000), 1 
#map2 
def map(doc): 
interval, cnt = doc.split() 
yield interval, cnt 
#reduce2 
def reduce(interval, values): 
yield interval*1000, sum(values) 

Kanggo nglakokake urutan tugas MapReduce ing hadoop, cukup kanggo nemtokake folder sing ditemtokake minangka output kanggo sing pisanan minangka input kanggo tugas kapindho lan mbukak kanthi siji.

Ing praktik, rantai tugas MapReduce bisa dadi urutan sing cukup rumit ing ngendi tugas MapReduce bisa disambungake kanthi urutan lan sejajar. Kanggo nyederhanakake manajemen rencana eksekusi tugas kasebut, ana alat sing kapisah kaya oozie lan luigi, sing bakal dibahas ing artikel sing kapisah ing seri iki.

5.4 Cache mbagekke

Mekanisme penting ing Hadoop yaiku Cache Distributed. Cache Distributed ngidini sampeyan nambah file (contone, file teks, arsip, file jar) menyang lingkungan ing ngendi tugas MapReduce lagi mlaku.

Sampeyan bisa nambah file sing disimpen ing HDFS, file lokal (lokal kanggo mesin saka kang tugas dibukak). Aku wis implicitly nuduhake carane nggunakake Distributed Cache karo hadoop streaming kanthi nambah file mapper.py lan reducer.py liwat pilihan -file. Ing kasunyatan, sampeyan bisa nambah ora mung mapper.py lan reducer.py, nanging file kasepakatan ing umum, lan banjur digunakake minangka yen padha ing folder lokal.

Nggunakake Cache Distributed:

API asli
//Job configuration
JobConf job = new JobConf();
DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"),  job);
DistributedCache.addCacheArchive(new URI("/myapp/map.zip", job);
DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job);
DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar", job);
DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz", job);

//example of usage in mapper-e:
public static class MapClass extends MapReduceBase
implements Mapper<K, V, K, V> {

 private Path[] localArchives;
 private Path[] localFiles;

 public void configure(JobConf job) {
   // get cached data from archives
   File f = new File("./map.zip/some/file/in/zip.txt");
 }

 public void map(K key, V value,
             	OutputCollector<K, V> output, Reporter reporter)
 throws IOException {
   // use data here
   // ...
   // ...
   output.collect(k, v);
 }
}
Streaming Hadoop

#we dhaptar file sing kudu ditambahake menyang cache sing disebarake ing parameter –files. Opsi --files kudu teka sadurunge opsi liyane.

yarn  hadoop-streaming.jar\ 
-files mapper.py,reducer.py,some_cached_data.txt\ 
-input '/some/input/path' \ 
-output '/some/output/path' \  
-mapper 'python mapper.py' \ 
-reducer 'python reducer.py' \

conto panggunaan:

import sys 
#just read file from local folder 
data = open('some_cached_data.txt').read() 
 
for line in sys.stdin() 
#processing input 
#use data here

5.5 Ngurangi Gabung

Wong-wong sing wis biasa nggarap database relasional kerep nggunakake operasi Gabung sing trep banget, sing ngidini dheweke bisa bebarengan ngolah isi sawetara tabel kanthi nggabungake miturut sawetara tombol. Nalika nggarap data gedhe, masalah iki uga kadhangkala muncul. Coba conto ing ngisor iki:

Ana log saka rong server web, saben log katon kaya iki:

t\t

Tuladha cuplikan log:

1446792139	
178.78.82.1	
/sphingosine/unhurrying.css 
1446792139	
126.31.163.222	
/accentually.js 
1446792139	
154.164.149.83	
/pyroacid/unkemptly.jpg 
1446792139	
202.27.13.181	
/Chawia.js 
1446792139	
67.123.248.174	
/morphographical/dismain.css 
1446792139	
226.74.123.135	
/phanerite.php 
1446792139	
157.109.106.104	
/bisonant.css

Sampeyan kudu ngetung kanggo saben alamat IP sing saka 2 server sing luwih kerep dibukak. Asil kudu ing wangun:

\t

Conto bagéan saka asil:

178.78.82.1	
first 
126.31.163.222	
second 
154.164.149.83	
second 
226.74.123.135	
first

Sayange, ora kaya database relasional, umume, nggabungake rong log kanthi kunci (ing kasus iki, kanthi alamat IP) minangka operasi sing rada abot lan ditanggulangi kanthi nggunakake pola 3 MapReduce lan Reduce Join:

ReduceJoin dianggo kaya iki:

1) Kanggo saben log input, MapReduce sing kapisah (mung Peta) diluncurake, ngowahi data input menyang formulir ing ngisor iki:

key -> (type, value

Ngendi kunci minangka kunci kanggo nggabungake tabel, Tipe minangka jinis tabel (pisanan utawa kaloro ing kasus kita), lan Nilai minangka data tambahan sing ana ing kunci kasebut.

2) Output saka loro MapReduces diwenehake menyang input saka MapReduce kaping 3, sing, nyatane, nindakake kesatuan. MapReduce iki ngemot Mapper kosong sing mung nyalin input. Sabanjure, shuffle decomposes data menyang tombol lan feed menyang reducer minangka input:

key -> [(type, value)]

Iku penting sing ing wayahe reducer nampa cathetan saka loro log, lan ing wektu sing padha, iku bisa kanggo ngenali dening lapangan jinis kang saka loro log nilai tartamtu teka saka. Dadi ana data sing cukup kanggo ngatasi masalah asli. Ing kasus kita, reducer mung kudu ngetung kanggo saben kunci rekaman sing jinis wis ditemoni luwih akeh lan ngasilake jinis iki.

5.6 MapJoins

Pola ReduceJoin nggambarake kasus umum nggabungake rong log kanthi kunci. Nanging, ana kasus khusus sing tugas bisa disederhanakake lan dipercepat. Iki minangka kasus sing siji log luwih cilik tinimbang liyane. Coba masalah ing ngisor iki:

Ana 2 log. Log pisanan ngemot log server web (padha karo tugas sadurunge), file kapindho (ukuran 100kb) ngemot URL-> Pertandhingan tema. Tuladha file 2:

/toyota.php 	
auto 
/football/spartak.html 	
sport 
/cars 	
auto 
/finances/money 	
business

Kanggo saben alamat IP, sampeyan kudu ngetung kaca sing kategori saka alamat IP iki paling kerep dimuat.

Ing kasus iki, kita uga kudu nggabungake 2 log kanthi URL. Nanging, ing kasus iki, kita ora kudu mbukak 3 MapReduce, wiwit log kaloro bakal rampung pas menyang memori. Kanggo ngatasi masalah nggunakake 1st MapReduce, kita bisa mbukak log kapindho menyang Cache mbagekke, lan nalika Mapper wis initialized, mung maca menyang memori, sijine iku ing -> kamus topik.

Salajengipun, masalah ditanggulangi kaya ing ngisor iki:

peta:

# find the subject of each of the pages of the first log 
input_line -> [ip,  topic] 

nyuda:


Ip -> [topics] -> [ip, most_popular_topic]

Ngurangi nampa ip lan dhaptar kabeh topik minangka input, mung ngetung topik sing paling kerep ditemoni. Mangkono, tugas wis ditanggulangi nggunakake 1st MapReduce, lan Gabung nyata umume njupuk Panggonan ing peta (mulane, yen agregasi tambahan dening tombol ora dibutuhake, proyek MapOnly bisa dispensed karo):