5.1 Kerja peta sahaja

Sudah tiba masanya untuk menerangkan pelbagai teknik yang membolehkan anda menggunakan MapReduce dengan berkesan untuk menyelesaikan masalah praktikal, serta menunjukkan beberapa ciri Hadoop yang boleh memudahkan pembangunan atau mempercepatkan dengan ketara pelaksanaan tugas MapReduce pada kelompok.

Seperti yang kita ingat, MapReduce terdiri daripada peringkat Map, Shuffle dan Reduce. Sebagai peraturan, peringkat Kocok ternyata menjadi yang paling sukar dalam tugas praktikal, kerana data diisih pada peringkat ini. Malah, terdapat beberapa tugas di mana peringkat Peta sahaja boleh diketepikan. Berikut adalah contoh tugas tersebut:

  • Penapisan data (contohnya, "Cari semua rekod daripada alamat IP 123.123.123.123" dalam log pelayan web);
  • Transformasi data ("Padam lajur dalam csv-logs");
  • Memuat dan memunggah data daripada sumber luaran ("Masukkan semua rekod daripada log ke dalam pangkalan data").

Tugasan sedemikian diselesaikan menggunakan Map-Only. Apabila membuat tugasan Peta Sahaja dalam Hadoop, anda perlu menentukan bilangan sifar pengurang:

Contoh konfigurasi tugas peta sahaja pada hadoop:

antara muka asli Antara Muka Penstriman Hadoop

Tentukan bilangan sifar pengurang semasa mengkonfigurasi job'a:

job.setNumReduceTasks(0); 

Kami tidak menentukan pengurang dan menentukan bilangan sifar pengurang. Contoh:

hadoop jar hadoop-streaming.jar \ 
 -D mapred.reduce.tasks=0\ 
-input input_dir\ 
-output output_dir\ 
-mapper "python mapper.py"\ 
-file "mapper.py"

Kerja Peta Sahaja sebenarnya boleh menjadi sangat berguna. Sebagai contoh, dalam platform Facetz.DCA, untuk mengenal pasti ciri pengguna melalui tingkah laku mereka, ia adalah tepat satu peta besar sahaja yang digunakan, setiap pemeta yang mengambil pengguna sebagai input dan mengembalikan cirinya sebagai output.

5.2 Gabungkan

Seperti yang telah saya tulis, biasanya peringkat paling sukar semasa melaksanakan tugasan Map-Reduce ialah peringkat shuffle. Ini berlaku kerana hasil perantaraan (output pemeta) ditulis pada cakera, diisih dan dihantar melalui rangkaian. Walau bagaimanapun, terdapat tugas di mana tingkah laku sedemikian kelihatan tidak begitu munasabah. Sebagai contoh, dalam tugas yang sama mengira perkataan dalam dokumen, anda boleh pra-agregat hasil output beberapa pemeta pada satu peta-mengurangkan nod tugas, dan menghantar nilai yang telah dijumlahkan untuk setiap mesin kepada pengurang .

Dalam hadoop, untuk ini, anda boleh menentukan fungsi penggabungan yang akan memproses output sebahagian daripada pemeta. Fungsi penggabungan sangat serupa dengan pengurangan - ia mengambil keluaran beberapa pemeta sebagai input dan menghasilkan hasil agregat untuk pemeta ini, jadi pengurang sering digunakan sebagai penggabung juga. Perbezaan penting daripada pengurangan ialah tidak semua nilai yang sepadan dengan satu kunci sampai ke fungsi penggabungan .

Selain itu, hadoop tidak menjamin bahawa fungsi gabungan akan dilaksanakan sama sekali untuk output pemeta. Oleh itu, fungsi penggabungan tidak selalu terpakai, contohnya, dalam kes mencari nilai median dengan kunci. Walau bagaimanapun, dalam tugas-tugas di mana fungsi penggabungan boleh digunakan, penggunaannya membolehkan untuk mencapai peningkatan ketara dalam kelajuan tugasan MapReduce.

Menggunakan Combiner pada hadoop:

antara muka asli Penstriman Hadoop

Apabila mengkonfigurasi kerja-a, nyatakan kelas-Combiner. Sebagai peraturan, ia adalah sama seperti Reducer:

job.setMapperClass(TokenizerMapper.class); 
job.setCombinerClass(IntSumReducer.class); 
job.setReducerClass(IntSumReducer.class); 

Tentukan arahan -combiner dalam pilihan baris arahan. Lazimnya, arahan ini adalah sama dengan arahan pengurang. Contoh:

hadoop jar hadoop-streaming.jar \ 
-input input_dir\ 
-output output_dir\ 
-mapper "python mapper.py"\ 
-reducer "python reducer.py"\ 
-combiner "python reducer.py"\ 
-file "mapper.py"\ 
-file "reducer.py"\

5.3 Rantaian tugas MapReduce

Terdapat situasi apabila satu MapReduce tidak mencukupi untuk menyelesaikan masalah. Sebagai contoh, pertimbangkan tugas WordCount yang diubah suai sedikit: terdapat satu set dokumen teks, anda perlu mengira berapa banyak perkataan berlaku dari 1 hingga 1000 kali dalam set, berapa banyak perkataan dari 1001 hingga 2000, berapa banyak dari 2001 hingga 3000, dan sebagainya. Untuk penyelesaiannya, kami memerlukan 2 kerja MapReduce:

  • Kiraan perkataan yang diubah suai, yang bagi setiap perkataan akan mengira selang mana ia jatuh;
  • MapReduce yang mengira berapa kali setiap selang ditemui dalam output MapReduce pertama.

Penyelesaian kod pseudo:

#map1 
def map(doc): 
for word in doc: 
yield word, 1
#reduce1 
def reduce(word, values): 
yield int(sum(values)/1000), 1 
#map2 
def map(doc): 
interval, cnt = doc.split() 
yield interval, cnt 
#reduce2 
def reduce(interval, values): 
yield interval*1000, sum(values) 

Untuk melaksanakan jujukan tugasan MapReduce pada hadoop, cukup untuk menentukan folder yang ditentukan sebagai output untuk yang pertama sebagai input untuk tugasan kedua dan menjalankannya secara bergilir.

Dalam amalan, rantaian tugasan MapReduce boleh menjadi urutan yang agak kompleks di mana tugasan MapReduce boleh disambungkan secara berurutan dan selari antara satu sama lain. Untuk memudahkan pengurusan pelan pelaksanaan tugas tersebut, terdapat alatan yang berasingan seperti oozie dan luigi, yang akan dibincangkan dalam artikel berasingan dalam siri ini.

5.4 Cache yang diedarkan

Mekanisme penting dalam Hadoop ialah Cache Teragih. Cache Teragih membolehkan anda menambah fail (cth fail teks, arkib, fail balang) ke persekitaran tempat tugas MapReduce dijalankan.

Anda boleh menambah fail yang disimpan pada HDFS, fail tempatan (tempatan ke mesin dari mana tugas itu dilancarkan). Saya telah secara tersirat menunjukkan cara menggunakan Cache Teragih dengan penstriman hadoop dengan menambahkan fail mapper.py dan reducer.py melalui pilihan -file. Malah, anda boleh menambah bukan sahaja mapper.py dan reducer.py, tetapi fail sewenang-wenangnya secara umum, dan kemudian menggunakannya seolah-olah ia berada dalam folder setempat.

Menggunakan Cache Teragih:

API asli
//Job configuration
JobConf job = new JobConf();
DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"),  job);
DistributedCache.addCacheArchive(new URI("/myapp/map.zip", job);
DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job);
DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar", job);
DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz", job);

//example of usage in mapper-e:
public static class MapClass extends MapReduceBase
implements Mapper<K, V, K, V> {

 private Path[] localArchives;
 private Path[] localFiles;

 public void configure(JobConf job) {
   // get cached data from archives
   File f = new File("./map.zip/some/file/in/zip.txt");
 }

 public void map(K key, V value,
             	OutputCollector<K, V> output, Reporter reporter)
 throws IOException {
   // use data here
   // ...
   // ...
   output.collect(k, v);
 }
}
Penstriman Hadoop

#kami menyenaraikan fail yang perlu ditambah pada cache yang diedarkan dalam parameter –files. Pilihan --files mesti datang sebelum pilihan lain.

yarn  hadoop-streaming.jar\ 
-files mapper.py,reducer.py,some_cached_data.txt\ 
-input '/some/input/path' \ 
-output '/some/output/path' \  
-mapper 'python mapper.py' \ 
-reducer 'python reducer.py' \

contoh penggunaan:

import sys 
#just read file from local folder 
data = open('some_cached_data.txt').read() 
 
for line in sys.stdin() 
#processing input 
#use data here

5.5 Kurangkan Sertai

Mereka yang biasa bekerja dengan pangkalan data hubungan sering menggunakan operasi Sertai yang sangat mudah, yang membolehkan mereka memproses kandungan beberapa jadual secara bersama dengan menggabungkannya mengikut beberapa kunci. Apabila bekerja dengan data besar, masalah ini juga kadang-kadang timbul. Pertimbangkan contoh berikut:

Terdapat log dua pelayan web, setiap log kelihatan seperti ini:

t\t

Contoh coretan log:

1446792139	
178.78.82.1	
/sphingosine/unhurrying.css 
1446792139	
126.31.163.222	
/accentually.js 
1446792139	
154.164.149.83	
/pyroacid/unkemptly.jpg 
1446792139	
202.27.13.181	
/Chawia.js 
1446792139	
67.123.248.174	
/morphographical/dismain.css 
1446792139	
226.74.123.135	
/phanerite.php 
1446792139	
157.109.106.104	
/bisonant.css

Ia adalah perlu untuk mengira untuk setiap alamat IP yang mana antara 2 pelayan yang dilawati lebih kerap. Hasilnya hendaklah dalam bentuk:

\t

Contoh sebahagian daripada hasil:

178.78.82.1	
first 
126.31.163.222	
second 
154.164.149.83	
second 
226.74.123.135	
first

Malangnya, tidak seperti pangkalan data hubungan, secara amnya, menyertai dua log dengan kunci (dalam kes ini, melalui alamat IP) adalah operasi yang agak berat dan diselesaikan menggunakan 3 MapReduce dan corak Reduce Join:

ReduceJoin berfungsi seperti ini:

1) Untuk setiap log input, MapReduce yang berasingan (Peta sahaja) dilancarkan, menukar data input kepada bentuk berikut:

key -> (type, value

Di mana kunci ialah kunci untuk menyertai jadual, Jenis ialah jenis jadual (pertama atau kedua dalam kes kami), dan Nilai ialah sebarang data tambahan yang terikat pada kunci.

2) Output kedua-dua MapReduces disalurkan kepada input MapReduce ke-3, yang, sebenarnya, melaksanakan kesatuan. MapReduce ini mengandungi Mapper kosong yang hanya menyalin input. Seterusnya, shuffle menguraikan data menjadi kekunci dan menyuapkannya ke pengurang sebagai input:

key -> [(type, value)]

Adalah penting bahawa pada masa ini pengurang menerima rekod daripada kedua-dua log, dan pada masa yang sama, adalah mungkin untuk mengenal pasti mengikut medan jenis yang mana daripada dua log nilai tertentu berasal. Jadi terdapat data yang mencukupi untuk menyelesaikan masalah asal. Dalam kes kami, pengurang hanya perlu mengira untuk setiap kunci rekod jenis yang ditemui lebih banyak dan mengeluarkan jenis ini.

5.6 MapJoins

Corak ReduceJoin menerangkan kes umum untuk menyertai dua log dengan kunci. Walau bagaimanapun, terdapat kes khas di mana tugas itu boleh dipermudahkan dan dipercepatkan dengan ketara. Ini adalah kes di mana satu daripada log adalah jauh lebih kecil daripada yang lain. Pertimbangkan masalah berikut:

Terdapat 2 log. Log pertama mengandungi log pelayan web (sama seperti dalam tugas sebelumnya), fail kedua (bersaiz 100kb) mengandungi URL-> Padanan tema. Contoh fail ke-2:

/toyota.php 	
auto 
/football/spartak.html 	
sport 
/cars 	
auto 
/finances/money 	
business

Untuk setiap alamat IP, adalah perlu untuk mengira halaman kategori mana daripada alamat IP ini paling kerap dimuatkan.

Dalam kes ini, kita juga perlu menyertai 2 log mengikut URL. Walau bagaimanapun, dalam kes ini, kita tidak perlu menjalankan 3 MapReduce, kerana log kedua akan dimuatkan sepenuhnya ke dalam ingatan. Untuk menyelesaikan masalah menggunakan 1st MapReduce, kita boleh memuatkan log kedua ke dalam Cache Teragih, dan apabila Mapper dimulakan, hanya membacanya ke dalam ingatan, meletakkannya dalam -> kamus topik.

Selanjutnya, masalah diselesaikan seperti berikut:

peta:

# find the subject of each of the pages of the first log 
input_line -> [ip,  topic] 

mengurangkan:


Ip -> [topics] -> [ip, most_popular_topic]

Reduce menerima ip dan senarai semua topik sebagai input, ia hanya mengira topik mana yang paling kerap ditemui. Oleh itu, tugas itu diselesaikan menggunakan MapReduce Pertama, dan Sertai sebenar biasanya berlaku di dalam peta (oleh itu, jika pengagregatan tambahan mengikut kunci tidak diperlukan, tugas MapOnly boleh diketepikan):