5.1 Yalnızca harita işi

Pratik sorunları çözmek için MapReduce'u etkin bir şekilde kullanmanıza izin veren çeşitli teknikleri açıklamanın ve Hadoop'un geliştirmeyi basitleştirebilen veya bir MapReduce görevinin bir kümede yürütülmesini önemli ölçüde hızlandırabilen bazı özelliklerini göstermenin zamanı geldi.

Hatırlayacağımız üzere MapReduce, Map, Shuffle ve Reduce aşamalarından oluşuyor. Kural olarak, Karıştırma aşaması, veriler bu aşamada sıralandığından, pratik görevler arasında en zor olanıdır. Aslında, Harita aşamasının tek başına vazgeçilebileceği bir dizi görev vardır. İşte bu tür görevlere örnekler:

  • Veri filtreleme (örneğin, web sunucusu günlüklerinde "123.123.123.123 IP adresindeki tüm kayıtları bul");
  • Veri dönüştürme ("csv günlüklerindeki sütunu sil");
  • Harici bir kaynaktan veri yükleme ve boşaltma ("Günlükteki tüm kayıtları veritabanına ekle").

Bu tür görevler Yalnızca Harita kullanılarak çözülür. Hadoop'ta Yalnızca Harita görevi oluştururken, sıfır azaltıcı sayısı belirtmeniz gerekir:

Hadoop'ta yalnızca harita görev yapılandırmasına bir örnek:

yerel arayüz Hadoop Akış Arayüzü

Job'a yapılandırırken sıfır düşürücü sayısını belirtin:

job.setNumReduceTasks(0); 

Bir redüktör belirtmiyoruz ve sıfır sayıda redüktör belirtiyoruz. Örnek:

hadoop jar hadoop-streaming.jar \ 
 -D mapred.reduce.tasks=0\ 
-input input_dir\ 
-output output_dir\ 
-mapper "python mapper.py"\ 
-file "mapper.py"

Yalnızca Harita işleri aslında çok yararlı olabilir. Örneğin, Facetz.DCA platformunda, kullanıcıların özelliklerini davranışlarına göre belirlemek için, her bir haritalayıcının bir kullanıcıyı girdi olarak aldığı ve onun özelliklerini bir çıktı olarak döndürdüğü, yalnızca büyük bir harita kullanılır.

5.2 Birleştir

Daha önce yazdığım gibi, bir Harita Küçültme görevi gerçekleştirirken genellikle en zor aşama karıştırma aşamasıdır. Bunun nedeni, ara sonuçların (eşleştiricinin çıktısı) diske yazılması, sıralanması ve ağ üzerinden iletilmesidir. Ancak, bu tür davranışların çok makul görünmediği görevler vardır. Örneğin, belgelerdeki kelimeleri sayma görevinde, birkaç haritalayıcının çıktılarının sonuçlarını görevin bir harita azaltma düğümünde önceden toplayabilir ve her makine için önceden toplanmış değerleri indirgeyiciye iletebilirsiniz. .

Hadoop'ta bunun için, eşleyicilerin bir kısmının çıktısını işleyecek bir birleştirme işlevi tanımlayabilirsiniz. Birleştirme işlevi, azaltma işlevine çok benzer - bazı eşleyicilerin çıktısını girdi olarak alır ve bu eşleyiciler için toplu bir sonuç üretir, dolayısıyla indirgeyici genellikle bir birleştirici olarak da kullanılır. Azaltmaktan önemli bir fark, bir tuşa karşılık gelen tüm değerlerin birleştirme işlevine ulaşmamasıdır .

Ayrıca hadoop, birleştirme işlevinin eşleştiricinin çıktısı için yürütüleceğini garanti etmez. Bu nedenle, birleştirme işlevi, örneğin orta değerin anahtarla aranması durumunda her zaman uygulanabilir değildir. Bununla birlikte, birleştirme işlevinin uygulanabilir olduğu görevlerde, kullanımı, MapReduce görevinin hızında önemli bir artış elde edilmesini sağlar.

Birleştiriciyi hadoop üzerinde kullanma:

yerel arayüz Hadoop akışı

job-a'yı yapılandırırken, birleştirici sınıfını belirtin. Kural olarak, Redüktör ile aynıdır:

job.setMapperClass(TokenizerMapper.class); 
job.setCombinerClass(IntSumReducer.class); 
job.setReducerClass(IntSumReducer.class); 

Komut satırı seçeneklerinde -combiner komutunu belirtin. Tipik olarak, bu komut, redüktör komutuyla aynıdır. Örnek:

hadoop jar hadoop-streaming.jar \ 
-input input_dir\ 
-output output_dir\ 
-mapper "python mapper.py"\ 
-reducer "python reducer.py"\ 
-combiner "python reducer.py"\ 
-file "mapper.py"\ 
-file "reducer.py"\

5.3 MapReduce görev zincirleri

Bir MapReduce'un bir sorunu çözmek için yeterli olmadığı durumlar vardır. Örneğin, biraz değiştirilmiş bir WordCount görevini düşünün: bir dizi metin belgesi var, kümede 1'den 1000'e kadar kaç kelimenin, 1001'den 2000'e kadar, 2001'den 3000'e kadar kaç kelime olduğunu saymanız gerekiyor, ve benzeri. Çözüm için 2 MapReduce işine ihtiyacımız var:

  • Her kelime için hangi aralıklara düştüğünü hesaplayacak değiştirilmiş kelime sayısı;
  • İlk MapReduce'un çıktısında her bir aralığın kaç kez karşılaşıldığını sayan bir MapReduce.

Sözde kod çözümü:

#map1 
def map(doc): 
for word in doc: 
yield word, 1
#reduce1 
def reduce(word, values): 
yield int(sum(values)/1000), 1 
#map2 
def map(doc): 
interval, cnt = doc.split() 
yield interval, cnt 
#reduce2 
def reduce(interval, values): 
yield interval*1000, sum(values) 

Hadoop üzerinde bir dizi MapReduce görevi yürütmek için, ilk görev için çıktı olarak belirtilen klasörü ikinci görev için girdi olarak belirtmek ve sırayla çalıştırmak yeterlidir.

Uygulamada, MapReduce görevlerinin zincirleri, MapReduce görevlerinin hem sıralı hem de birbirine paralel olarak bağlanabildiği oldukça karmaşık diziler olabilir. Bu tür görev yürütme planlarının yönetimini basitleştirmek için, bu dizide ayrı bir makalede ele alınacak olan oozie ve luigi gibi ayrı araçlar vardır.

5.4 Dağıtılmış önbellek

Hadoop'taki önemli bir mekanizma Dağıtılmış Önbellektir. Dağıtılmış Önbellek, MapReduce görevinin çalıştığı ortama dosya (örn. metin dosyaları, arşivler, jar dosyaları) eklemenizi sağlar.

HDFS'de saklanan dosyaları, yerel dosyaları (görevin başlatıldığı makinede yerel) ekleyebilirsiniz. -file seçeneği aracılığıyla mapper.py ve reducer.py dosyalarını ekleyerek, hadoop akışıyla Dağıtılmış Önbelleğin nasıl kullanılacağını dolaylı olarak zaten gösterdim. Aslında, yalnızca mapper.py ve reducer.py'yi değil, genel olarak rasgele dosyaları ekleyebilir ve ardından bunları yerel bir klasördeymiş gibi kullanabilirsiniz.

Dağıtılmış Önbelleği Kullanma:

Yerel API
//Job configuration
JobConf job = new JobConf();
DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"),  job);
DistributedCache.addCacheArchive(new URI("/myapp/map.zip", job);
DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job);
DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar", job);
DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz", job);

//example of usage in mapper-e:
public static class MapClass extends MapReduceBase
implements Mapper<K, V, K, V> {

 private Path[] localArchives;
 private Path[] localFiles;

 public void configure(JobConf job) {
   // get cached data from archives
   File f = new File("./map.zip/some/file/in/zip.txt");
 }

 public void map(K key, V value,
             	OutputCollector<K, V> output, Reporter reporter)
 throws IOException {
   // use data here
   // ...
   // ...
   output.collect(k, v);
 }
}
Hadoop Akışı

# –files parametresinde dağıtılmış önbelleğe eklenmesi gereken dosyaları listeliyoruz. --files seçeneği diğer seçeneklerden önce gelmelidir.

yarn  hadoop-streaming.jar\ 
-files mapper.py,reducer.py,some_cached_data.txt\ 
-input '/some/input/path' \ 
-output '/some/output/path' \  
-mapper 'python mapper.py' \ 
-reducer 'python reducer.py' \

kullanım örneği:

import sys 
#just read file from local folder 
data = open('some_cached_data.txt').read() 
 
for line in sys.stdin() 
#processing input 
#use data here

5.5 Birleştirmeyi Azaltın

İlişkisel veritabanlarıyla çalışmaya alışkın olanlar, genellikle bazı tabloların içeriğini bir anahtara göre birleştirerek birlikte işlemelerine olanak tanıyan çok uygun Join işlemini kullanırlar. Büyük verilerle çalışırken bazen bu sorun da ortaya çıkıyor. Aşağıdaki örneği göz önünde bulundurun:

İki web sunucusunun günlükleri vardır, her günlük şuna benzer:

t\t

Günlük snippet örneği:

1446792139	
178.78.82.1	
/sphingosine/unhurrying.css 
1446792139	
126.31.163.222	
/accentually.js 
1446792139	
154.164.149.83	
/pyroacid/unkemptly.jpg 
1446792139	
202.27.13.181	
/Chawia.js 
1446792139	
67.123.248.174	
/morphographical/dismain.css 
1446792139	
226.74.123.135	
/phanerite.php 
1446792139	
157.109.106.104	
/bisonant.css

Her bir IP adresi için 2 sunucudan hangisini daha sık ziyaret ettiğini hesaplamak gerekir. Sonuç şu şekilde olmalıdır:

\t

Sonucun bir kısmına örnek:

178.78.82.1	
first 
126.31.163.222	
second 
154.164.149.83	
second 
226.74.123.135	
first

Ne yazık ki, ilişkisel veritabanlarının aksine, genel olarak, iki günlüğü anahtarla (bu durumda, IP adresine göre) birleştirmek oldukça ağır bir işlemdir ve 3 MapReduce ve Birleştirmeyi Azalt modeli kullanılarak çözülür:

ReduceJoin şu şekilde çalışır:

1) Giriş günlüklerinin her biri için, giriş verilerini aşağıdaki forma dönüştüren ayrı bir MapReduce (yalnızca Harita) başlatılır:

key -> (type, value

Anahtar, tabloları birleştirmenin anahtarı olduğunda, Tür tablonun türüdür (bizim durumumuzda birinci veya ikinci) ve Değer, anahtara bağlı herhangi bir ek veridir.

2) Her iki MapReduce'un çıktıları, aslında birleşimi gerçekleştiren 3. MapReduce'un girişine beslenir. Bu MapReduce, girişi basitçe kopyalayan boş bir Eşleyici içerir. Ardından, karıştırma, verileri anahtarlara ayırır ve indirgeyiciye girdi olarak besler:

key -> [(type, value)]

Şu anda indirgeyicinin her iki günlükten de kayıt alması önemlidir ve aynı zamanda, belirli bir değerin iki günlükten hangisinden geldiğini tür alanı ile belirlemek mümkündür. Yani orijinal sorunu çözmek için yeterli veri var. Bizim durumumuzda, indirgeyicinin her bir kayıt anahtarı için hangi türün daha fazla karşılaştığını hesaplaması ve bu türü çıktılaması gerekir.

5.6 Harita Birleştirmeleri

ReduceJoin modeli, iki günlüğü anahtarla birleştirmenin genel durumunu açıklar. Ancak, görevin önemli ölçüde basitleştirilebileceği ve hızlandırılabileceği özel bir durum vardır. Bu, günlüklerden birinin diğerinden önemli ölçüde daha küçük olduğu durumdur. Aşağıdaki sorunu göz önünde bulundurun:

2 günlük var. İlk günlük, web sunucusu günlüğünü (önceki görevdekiyle aynı), ikinci dosya (100 kb boyutunda) URL-> Tema eşleşmesini içerir. Örnek 2. dosya:

/toyota.php 	
auto 
/football/spartak.html 	
sport 
/cars 	
auto 
/finances/money 	
business

Her bir IP adresi için, bu IP adresinden hangi kategorideki sayfaların en sık yüklendiğini hesaplamak gerekir.

Bu durumda ayrıca 2 logu URL ile birleştirmemiz gerekiyor. Ancak bu durumda ikinci log tamamen belleğe sığacağı için 3 MapReduce çalıştırmamız gerekmiyor. 1. MapReduce'u kullanarak sorunu çözmek için, ikinci günlüğü Dağıtılmış Önbelleğe yükleyebiliriz ve Eşleyici başlatıldığında, onu -> konu sözlüğüne koyarak belleğe okumanız yeterlidir.

Ayrıca, sorun şu şekilde çözülür:

harita:

# find the subject of each of the pages of the first log 
input_line -> [ip,  topic] 

azaltmak:


Ip -> [topics] -> [ip, most_popular_topic]

Azaltmak girdi olarak bir ip ve tüm konuların bir listesini alır, basitçe hangi konularda en sık karşılaşıldığını hesaplar. Böylece, görev 1. MapReduce kullanılarak çözülür ve asıl Birleştirme genellikle haritanın içinde gerçekleşir (bu nedenle, anahtarla ek toplama gerekli değilse, MapOnly işinden vazgeçilebilir):