5.1 Petakan pekerjaan saja

Saatnya menjelaskan berbagai teknik yang memungkinkan Anda menggunakan MapReduce secara efektif untuk memecahkan masalah praktis, serta menunjukkan beberapa fitur Hadoop yang dapat menyederhanakan pengembangan atau mempercepat eksekusi tugas MapReduce secara signifikan pada sebuah klaster.

Seperti yang kita ingat, MapReduce terdiri dari tahapan Map, Shuffle dan Reduce. Sebagai aturan, tahap Shuffle ternyata menjadi yang paling sulit dalam tugas-tugas praktis, karena data diurutkan pada tahap ini. Nyatanya, ada sejumlah tugas di mana tahap Peta saja dapat ditiadakan. Berikut adalah contoh tugas tersebut:

  • Pemfilteran data (misalnya, "Temukan semua data dari alamat IP 123.123.123.123" di log server web);
  • Transformasi data (“Hapus kolom di csv-logs”);
  • Memuat dan membongkar data dari sumber eksternal ("Masukkan semua catatan dari log ke dalam database").

Tugas semacam itu diselesaikan menggunakan Map-Only. Saat membuat tugas Hanya-Peta di Hadoop, Anda perlu menentukan nol jumlah reduksi:

Contoh konfigurasi tugas khusus peta di hadoop:

antarmuka asli Antarmuka Streaming Hadoop

Tentukan jumlah reduksi nol saat mengonfigurasi pekerjaan:

job.setNumReduceTasks(0); 

Kami tidak menentukan reduksi dan menentukan jumlah reduksi nol. Contoh:

hadoop jar hadoop-streaming.jar \ 
 -D mapred.reduce.tasks=0\ 
-input input_dir\ 
-output output_dir\ 
-mapper "python mapper.py"\ 
-file "mapper.py"

Pekerjaan Map Only sebenarnya bisa sangat berguna. Misalnya, di platform Facetz.DCA, untuk mengidentifikasi karakteristik pengguna berdasarkan perilakunya, hanya satu peta besar yang digunakan, masing-masing pembuat peta mengambil pengguna sebagai input dan mengembalikan karakteristiknya sebagai output.

5.2 Gabungkan

Seperti yang sudah saya tulis, biasanya tahapan yang paling sulit saat melakukan tugas Map-Reduce adalah tahapan shuffle. Ini terjadi karena hasil antara (keluaran mapper) ditulis ke disk, diurutkan, dan dikirim melalui jaringan. Namun, ada tugas di mana perilaku seperti itu tampaknya tidak masuk akal. Misalnya, dalam tugas menghitung kata yang sama dalam dokumen, Anda dapat melakukan pra-agregat hasil keluaran dari beberapa pembuat peta pada satu simpul pengurangan peta dari tugas tersebut, dan meneruskan nilai yang sudah dijumlahkan untuk setiap mesin ke peredam .

Di hadoop, untuk ini, Anda dapat menentukan fungsi penggabungan yang akan memproses keluaran dari bagian pembuat peta. Fungsi penggabungan sangat mirip dengan pengurangan - dibutuhkan output dari beberapa pembuat peta sebagai masukan dan menghasilkan hasil agregat untuk pembuat peta ini, sehingga peredam juga sering digunakan sebagai penggabung. Perbedaan penting dari pengurangan adalah bahwa tidak semua nilai yang sesuai dengan satu kunci masuk ke fungsi penggabungan .

Selain itu, hadoop tidak menjamin bahwa fungsi combine akan dieksekusi sama sekali untuk output dari mapper. Oleh karena itu, fungsi penggabungan tidak selalu dapat diterapkan, misalnya dalam kasus pencarian nilai median dengan kunci. Namun demikian, dalam tugas-tugas di mana fungsi penggabungan berlaku, penggunaannya memungkinkan untuk mencapai peningkatan yang signifikan dalam kecepatan tugas MapReduce.

Menggunakan Penggabung di hadoop:

antarmuka asli Streaming Hadoop

Saat mengonfigurasi pekerjaan-a, tentukan class-Combiner. Sebagai aturan, ini sama dengan Reducer:

job.setMapperClass(TokenizerMapper.class); 
job.setCombinerClass(IntSumReducer.class); 
job.setReducerClass(IntSumReducer.class); 

Tentukan perintah -combiner di opsi baris perintah. Biasanya, perintah ini sama dengan perintah peredam. Contoh:

hadoop jar hadoop-streaming.jar \ 
-input input_dir\ 
-output output_dir\ 
-mapper "python mapper.py"\ 
-reducer "python reducer.py"\ 
-combiner "python reducer.py"\ 
-file "mapper.py"\ 
-file "reducer.py"\

5.3 Rantai tugas MapReduce

Ada situasi ketika satu MapReduce tidak cukup untuk menyelesaikan masalah. Misalnya, pertimbangkan tugas WordCount yang sedikit dimodifikasi: ada satu set dokumen teks, Anda perlu menghitung berapa banyak kata yang muncul dari 1 hingga 1000 kali dalam set, berapa banyak kata dari 1001 hingga 2000, berapa banyak dari 2001 hingga 3000, dan seterusnya. Untuk solusinya, kita membutuhkan 2 pekerjaan MapReduce:

  • Jumlah kata yang dimodifikasi, yang untuk setiap kata akan menghitung interval mana yang termasuk;
  • MapReduce yang menghitung berapa kali setiap interval ditemukan dalam keluaran MapReduce pertama.

Solusi kode semu:

#map1 
def map(doc): 
for word in doc: 
yield word, 1
#reduce1 
def reduce(word, values): 
yield int(sum(values)/1000), 1 
#map2 
def map(doc): 
interval, cnt = doc.split() 
yield interval, cnt 
#reduce2 
def reduce(interval, values): 
yield interval*1000, sum(values) 

Untuk menjalankan urutan tugas MapReduce pada hadoop, cukup dengan menentukan folder yang ditentukan sebagai output untuk yang pertama sebagai input untuk tugas kedua dan menjalankannya secara bergantian.

Dalam praktiknya, rantai tugas MapReduce bisa menjadi urutan yang cukup kompleks di mana tugas MapReduce dapat dihubungkan baik secara berurutan maupun paralel satu sama lain. Untuk menyederhanakan pengelolaan rencana pelaksanaan tugas tersebut, ada alat terpisah seperti oozie dan luigi, yang akan dibahas dalam artikel terpisah di seri ini.

5.4 Tembolok terdistribusi

Mekanisme penting dalam Hadoop adalah Distributed Cache. Cache Terdistribusi memungkinkan Anda untuk menambahkan file (misalnya file teks, arsip, file jar) ke lingkungan tempat tugas MapReduce berjalan.

Anda dapat menambahkan file yang disimpan di HDFS, file lokal (lokal ke mesin tempat tugas diluncurkan). Saya telah secara implisit menunjukkan cara menggunakan Cache Terdistribusi dengan streaming hadoop dengan menambahkan file mapper.py dan reducer.py melalui opsi -file. Nyatanya, Anda tidak hanya dapat menambahkan mapper.py dan reducer.py, tetapi file arbitrer pada umumnya, lalu menggunakannya seolah-olah berada di folder lokal.

Menggunakan Cache Terdistribusi:

API asli
//Job configuration
JobConf job = new JobConf();
DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"),  job);
DistributedCache.addCacheArchive(new URI("/myapp/map.zip", job);
DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job);
DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar", job);
DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz", job);

//example of usage in mapper-e:
public static class MapClass extends MapReduceBase
implements Mapper<K, V, K, V> {

 private Path[] localArchives;
 private Path[] localFiles;

 public void configure(JobConf job) {
   // get cached data from archives
   File f = new File("./map.zip/some/file/in/zip.txt");
 }

 public void map(K key, V value,
             	OutputCollector<K, V> output, Reporter reporter)
 throws IOException {
   // use data here
   // ...
   // ...
   output.collect(k, v);
 }
}
Hadoop Streaming

#kami mencantumkan file yang perlu ditambahkan ke cache terdistribusi di parameter –files. Opsi --files harus ada sebelum opsi lainnya.

yarn  hadoop-streaming.jar\ 
-files mapper.py,reducer.py,some_cached_data.txt\ 
-input '/some/input/path' \ 
-output '/some/output/path' \  
-mapper 'python mapper.py' \ 
-reducer 'python reducer.py' \

contoh penggunaan:

import sys 
#just read file from local folder 
data = open('some_cached_data.txt').read() 
 
for line in sys.stdin() 
#processing input 
#use data here

5.5 Kurangi Gabung

Mereka yang terbiasa bekerja dengan database relasional sering menggunakan operasi Gabung yang sangat nyaman, yang memungkinkan mereka untuk memproses konten beberapa tabel secara bersama-sama dengan menggabungkannya sesuai dengan beberapa kunci. Saat bekerja dengan big data, masalah ini juga terkadang muncul. Pertimbangkan contoh berikut:

Ada log dari dua server web, setiap log terlihat seperti ini:

t\t

Contoh cuplikan log:

1446792139	
178.78.82.1	
/sphingosine/unhurrying.css 
1446792139	
126.31.163.222	
/accentually.js 
1446792139	
154.164.149.83	
/pyroacid/unkemptly.jpg 
1446792139	
202.27.13.181	
/Chawia.js 
1446792139	
67.123.248.174	
/morphographical/dismain.css 
1446792139	
226.74.123.135	
/phanerite.php 
1446792139	
157.109.106.104	
/bisonant.css

Penting untuk menghitung untuk setiap alamat IP mana dari 2 server yang lebih sering dikunjungi. Hasilnya harus dalam bentuk:

\t

Contoh sebagian hasilnya:

178.78.82.1	
first 
126.31.163.222	
second 
154.164.149.83	
second 
226.74.123.135	
first

Sayangnya, tidak seperti database relasional, pada umumnya, menggabungkan dua log dengan kunci (dalam hal ini, dengan alamat IP) adalah operasi yang agak berat dan diselesaikan dengan menggunakan 3 MapReduce dan pola Reduce Join:

ReduceJoin berfungsi seperti ini:

1) Untuk setiap log masukan, MapReduce terpisah (hanya Peta) diluncurkan, mengonversi data masukan ke formulir berikut:

key -> (type, value

Di mana key adalah kunci untuk menggabungkan tabel, Type adalah tipe tabel (pertama atau kedua dalam kasus kita), dan Value adalah data tambahan apa pun yang terikat ke key.

2) Output dari kedua MapReduce diumpankan ke input dari MapReduce ke-3, yang sebenarnya melakukan penyatuan. MapReduce ini berisi Mapper kosong yang hanya menyalin masukan. Selanjutnya, shuffle menguraikan data menjadi kunci dan mengumpankannya ke peredam sebagai masukan:

key -> [(type, value)]

Penting bahwa pada saat ini peredam menerima catatan dari kedua log, dan pada saat yang sama, dimungkinkan untuk mengidentifikasi dari bidang jenis mana dari dua log tersebut berasal dari nilai tertentu. Jadi ada cukup data untuk menyelesaikan masalah aslinya. Dalam kasus kami, peredam hanya perlu menghitung untuk setiap kunci rekaman yang jenisnya lebih banyak ditemui dan menampilkan jenis ini.

5.6 Peta Bergabung

Pola ReduceJoin menjelaskan kasus umum penggabungan dua log dengan kunci. Namun, ada kasus khusus di mana tugas dapat disederhanakan dan dipercepat secara signifikan. Ini adalah kasus di mana salah satu log secara signifikan lebih kecil dari yang lain. Pertimbangkan masalah berikut:

Ada 2 log. Log pertama berisi log server web (sama seperti pada tugas sebelumnya), file kedua (berukuran 100kb) berisi kecocokan URL-> Tema. Contoh file ke-2:

/toyota.php 	
auto 
/football/spartak.html 	
sport 
/cars 	
auto 
/finances/money 	
business

Untuk setiap alamat IP, perlu untuk menghitung halaman kategori mana dari alamat IP ini yang paling sering dimuat.

Dalam hal ini, kita juga perlu menggabungkan 2 log dengan URL. Namun, dalam hal ini, kita tidak perlu menjalankan 3 MapReduce, karena log kedua akan sepenuhnya masuk ke dalam memori. Untuk mengatasi masalah dengan menggunakan MapReduce ke-1, kita dapat memuat log kedua ke dalam Distributed Cache, dan ketika Mapper diinisialisasi, cukup membacanya ke dalam memori, memasukkannya ke dalam -> kamus topik.

Selanjutnya, masalah diselesaikan sebagai berikut:

peta:

# find the subject of each of the pages of the first log 
input_line -> [ip,  topic] 

mengurangi:


Ip -> [topics] -> [ip, most_popular_topic]

Reduce menerima ip dan daftar semua topik sebagai input, itu hanya menghitung topik mana yang paling sering ditemui. Dengan demikian, tugas diselesaikan menggunakan MapReduce ke-1, dan Gabungan yang sebenarnya umumnya terjadi di dalam peta (oleh karena itu, jika agregasi tambahan dengan kunci tidak diperlukan, pekerjaan MapOnly dapat ditiadakan):