5.1 Mapa lamang na trabaho

Panahon na upang ilarawan ang iba't ibang mga diskarte na nagbibigay-daan sa iyong epektibong gumamit ng MapReduce upang malutas ang mga praktikal na problema, pati na rin ipakita ang ilan sa mga tampok ng Hadoop na maaaring gawing simple ang pag-unlad o makabuluhang mapabilis ang pagpapatupad ng isang MapReduce na gawain sa isang cluster.

Tulad ng naaalala namin, ang MapReduce ay binubuo ng mga yugto ng Map, Shuffle at Reduce. Bilang isang patakaran, ang yugto ng Shuffle ay lumalabas na ang pinakamahirap sa mga praktikal na gawain, dahil ang data ay pinagsunod-sunod sa yugtong ito. Sa katunayan, mayroong ilang mga gawain kung saan ang yugto ng Map lamang ang maaaring ibigay. Narito ang mga halimbawa ng mga naturang gawain:

  • Pag-filter ng data (halimbawa, "Hanapin ang lahat ng mga tala mula sa IP address 123.123.123.123" sa mga log ng web server);
  • Pagbabago ng data ("Tanggalin ang column sa csv-logs");
  • Naglo-load at nag-unload ng data mula sa isang panlabas na mapagkukunan ("Ipasok ang lahat ng mga tala mula sa log sa database").

Ang mga ganitong gawain ay nireresolba gamit ang Map-Only. Kapag gumagawa ng isang Map-Only na gawain sa Hadoop, kailangan mong tukuyin ang zero na bilang ng mga reducer:

Isang halimbawa ng configuration ng gawaing mapa lamang sa hadoop:

katutubong interface Hadoop Streaming Interface

Tukuyin ang zero na bilang ng mga reducer kapag kino-configure ang job'a:

job.setNumReduceTasks(0); 

Hindi namin tinukoy ang isang reducer at tinukoy ang isang zero na bilang ng mga reducer. Halimbawa:

hadoop jar hadoop-streaming.jar \ 
 -D mapred.reduce.tasks=0\ 
-input input_dir\ 
-output output_dir\ 
-mapper "python mapper.py"\ 
-file "mapper.py"

Ang mga trabaho sa Map Only ay maaaring maging lubhang kapaki-pakinabang. Halimbawa, sa Facetz.DCA platform, upang matukoy ang mga katangian ng mga user sa pamamagitan ng kanilang pag-uugali, ito ay tiyak na isang malaking mapa-lamang na ginagamit, ang bawat mapper ay kumukuha ng isang user bilang input at ibinabalik ang kanyang mga katangian bilang isang output.

5.2 Pagsamahin

Tulad ng naisulat ko na, kadalasan ang pinakamahirap na yugto kapag nagsasagawa ng isang Map-Reduce na gawain ay ang shuffle stage. Nangyayari ito dahil ang mga intermediate na resulta (mapper's output) ay nakasulat sa disk, pinagsunod-sunod at ipinadala sa network. Gayunpaman, may mga gawain kung saan ang gayong pag-uugali ay tila hindi masyadong makatwiran. Halimbawa, sa parehong gawain ng pagbibilang ng mga salita sa mga dokumento, maaari mong paunang pagsama-samahin ang mga resulta ng mga output ng ilang mga mapper sa isang mapa-bawasan ang node ng gawain, at ipasa ang na-summed na mga halaga para sa bawat makina sa reducer .

Sa hadoop, para dito, maaari mong tukuyin ang isang pinagsama-samang function na magpoproseso ng output ng bahagi ng mga mappers. Ang pagsasama-sama ng function ay halos kapareho sa pagbabawas - ito ay tumatagal ng output ng ilang mappers bilang input at gumagawa ng pinagsama-samang resulta para sa mga mapper na ito, kaya ang reducer ay kadalasang ginagamit din bilang isang combiner. Ang isang mahalagang pagkakaiba mula sa bawasan ay hindi lahat ng mga halaga na tumutugma sa isang susi ay napupunta sa pinagsamang function .

Bukod dito, hindi ginagarantiyahan ng hadoop na ang pinagsamang function ay isasagawa sa lahat para sa output ng mapper. Samakatuwid, ang pagsasama-sama ng function ay hindi palaging naaangkop, halimbawa, sa kaso ng paghahanap para sa median na halaga sa pamamagitan ng key. Gayunpaman, sa mga gawaing iyon kung saan naaangkop ang pagsasama-sama, ang paggamit nito ay nagbibigay-daan upang makamit ang isang makabuluhang pagtaas sa bilis ng gawain ng MapReduce.

Gamit ang Combiner sa hadoop:

katutubong interface Pag-stream ng Hadoop

Kapag nag-configure ng trabaho-a, tukuyin ang class-Combiner. Bilang isang patakaran, ito ay kapareho ng Reducer:

job.setMapperClass(TokenizerMapper.class); 
job.setCombinerClass(IntSumReducer.class); 
job.setReducerClass(IntSumReducer.class); 

Tukuyin ang -combiner command sa mga opsyon sa command line. Karaniwan, ang utos na ito ay kapareho ng utos ng reducer. Halimbawa:

hadoop jar hadoop-streaming.jar \ 
-input input_dir\ 
-output output_dir\ 
-mapper "python mapper.py"\ 
-reducer "python reducer.py"\ 
-combiner "python reducer.py"\ 
-file "mapper.py"\ 
-file "reducer.py"\

5.3 MapReduce task chains

May mga sitwasyon kapag ang isang MapReduce ay hindi sapat upang malutas ang isang problema. Halimbawa, isaalang-alang ang isang bahagyang binagong gawain ng WordCount: mayroong isang hanay ng mga tekstong dokumento, kailangan mong bilangin kung gaano karaming mga salita ang naganap mula 1 hanggang 1000 beses sa hanay, gaano karaming mga salita mula 1001 hanggang 2000, ilan mula 2001 hanggang 3000, at iba pa. Para sa solusyon, kailangan namin ng 2 trabaho sa MapReduce:

  • Binagong bilang ng salita, na para sa bawat salita ay kakalkulahin kung alin sa mga pagitan ito nahulog;
  • Isang MapReduce na binibilang kung ilang beses na-encounter ang bawat interval sa output ng unang MapReduce.

Solusyon sa pseudo code:

#map1 
def map(doc): 
for word in doc: 
yield word, 1
#reduce1 
def reduce(word, values): 
yield int(sum(values)/1000), 1 
#map2 
def map(doc): 
interval, cnt = doc.split() 
yield interval, cnt 
#reduce2 
def reduce(interval, values): 
yield interval*1000, sum(values) 

Upang maisagawa ang isang pagkakasunud-sunod ng mga gawain ng MapReduce sa hadoop, sapat lamang na tukuyin ang folder na tinukoy bilang output para sa una bilang input para sa pangalawang gawain at patakbuhin ang mga ito sa turn.

Sa pagsasagawa, ang mga kadena ng mga gawain sa MapReduce ay maaaring medyo kumplikadong mga pagkakasunud-sunod kung saan ang mga gawain ng MapReduce ay maaaring konektado sa parehong sunud-sunod at parallel sa bawat isa. Upang pasimplehin ang pamamahala ng naturang mga plano sa pagpapatupad ng gawain, mayroong magkahiwalay na tool tulad ng oozie at luigi, na tatalakayin sa isang hiwalay na artikulo sa seryeng ito.

5.4 Ibinahagi ang cache

Ang isang mahalagang mekanismo sa Hadoop ay ang Distributed Cache. Binibigyang-daan ka ng Distributed Cache na magdagdag ng mga file (hal. text file, archive, jar file) sa kapaligiran kung saan tumatakbo ang MapReduce task.

Maaari kang magdagdag ng mga file na nakaimbak sa HDFS, mga lokal na file (lokal sa makina kung saan inilunsad ang gawain). Naipakita ko na kung paano gamitin ang Distributed Cache sa hadoop streaming sa pamamagitan ng pagdaragdag ng mapper.py at reducer.py na mga file sa pamamagitan ng -file na opsyon. Sa katunayan, maaari kang magdagdag ng hindi lamang mapper.py at reducer.py, ngunit arbitrary na mga file sa pangkalahatan, at pagkatapos ay gamitin ang mga ito na parang nasa isang lokal na folder.

Paggamit ng Distributed Cache:

Native API
//Job configuration
JobConf job = new JobConf();
DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"),  job);
DistributedCache.addCacheArchive(new URI("/myapp/map.zip", job);
DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job);
DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar", job);
DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz", job);

//example of usage in mapper-e:
public static class MapClass extends MapReduceBase
implements Mapper<K, V, K, V> {

 private Path[] localArchives;
 private Path[] localFiles;

 public void configure(JobConf job) {
   // get cached data from archives
   File f = new File("./map.zip/some/file/in/zip.txt");
 }

 public void map(K key, V value,
             	OutputCollector<K, V> output, Reporter reporter)
 throws IOException {
   // use data here
   // ...
   // ...
   output.collect(k, v);
 }
}
Pag-stream ng Hadoop

#inilista namin ang mga file na kailangang idagdag sa ipinamahagi na cache sa parameter na –files. Ang --files na opsyon ay dapat mauna bago ang iba pang mga opsyon.

yarn  hadoop-streaming.jar\ 
-files mapper.py,reducer.py,some_cached_data.txt\ 
-input '/some/input/path' \ 
-output '/some/output/path' \  
-mapper 'python mapper.py' \ 
-reducer 'python reducer.py' \

halimbawa ng paggamit:

import sys 
#just read file from local folder 
data = open('some_cached_data.txt').read() 
 
for line in sys.stdin() 
#processing input 
#use data here

5.5 Bawasan ang Sumali

Ang mga nakasanayan na magtrabaho sa mga relational database ay kadalasang gumagamit ng napaka-maginhawang Join operation, na nagpapahintulot sa kanila na magkasamang iproseso ang mga nilalaman ng ilang mga talahanayan sa pamamagitan ng pagsali sa kanila ayon sa ilang key. Kapag nagtatrabaho sa malaking data, ang problemang ito ay minsan din lumitaw. Isaalang-alang ang sumusunod na halimbawa:

Mayroong mga log ng dalawang web server, ang bawat log ay ganito ang hitsura:

t\t

Halimbawa ng log snippet:

1446792139	
178.78.82.1	
/sphingosine/unhurrying.css 
1446792139	
126.31.163.222	
/accentually.js 
1446792139	
154.164.149.83	
/pyroacid/unkemptly.jpg 
1446792139	
202.27.13.181	
/Chawia.js 
1446792139	
67.123.248.174	
/morphographical/dismain.css 
1446792139	
226.74.123.135	
/phanerite.php 
1446792139	
157.109.106.104	
/bisonant.css

Kinakailangang kalkulahin para sa bawat IP address kung alin sa 2 server ang mas madalas nitong binisita. Ang resulta ay dapat nasa anyo:

\t

Isang halimbawa ng bahagi ng resulta:

178.78.82.1	
first 
126.31.163.222	
second 
154.164.149.83	
second 
226.74.123.135	
first

Sa kasamaang palad, hindi tulad ng mga relational database, sa pangkalahatan, ang pagsali sa dalawang log sa pamamagitan ng key (sa kasong ito, sa pamamagitan ng IP address) ay medyo mabigat na operasyon at nalutas gamit ang 3 MapReduce at ang Reduce Join pattern:

Ang ReduceJoin ay gumagana tulad nito:

1) Para sa bawat isa sa mga input log, isang hiwalay na MapReduce (Map lang) ang inilulunsad, na kino-convert ang input data sa sumusunod na form:

key -> (type, value

Kung saan ang susi ay ang susi sa pagsali sa mga talahanayan, ang Uri ay ang uri ng talahanayan (una o pangalawa sa aming kaso), at ang Value ay anumang karagdagang data na nakatali sa susi.

2) Ang mga output ng parehong MapReduces ay pinapakain sa input ng 3rd MapReduce, na, sa katunayan, ay gumaganap ng unyon. Ang MapReduce na ito ay naglalaman ng isang walang laman na Mapper na kinokopya lang ang input. Susunod, ang shuffle ay nagde-decompose ng data sa mga key at pinapakain ito sa reducer bilang input:

key -> [(type, value)]

Mahalaga na sa sandaling ito ang reducer ay tumatanggap ng mga tala mula sa parehong mga log, at sa parehong oras, posible na matukoy sa pamamagitan ng uri ng field kung alin sa dalawang log ang nagmula sa isang partikular na halaga. Kaya mayroong sapat na data upang malutas ang orihinal na problema. Sa aming kaso, kailangan lang kalkulahin ng reducer para sa bawat record key kung aling uri ang nakatagpo ng higit pa at output ang ganitong uri.

5.6 MapJoins

Inilalarawan ng pattern ng ReduceJoin ang pangkalahatang kaso ng pagsali sa dalawang log sa pamamagitan ng key. Gayunpaman, mayroong isang espesyal na kaso kung saan ang gawain ay maaaring makabuluhang pinasimple at pinabilis. Ito ang kaso kung saan ang isa sa mga log ay makabuluhang mas maliit kaysa sa isa. Isaalang-alang ang sumusunod na problema:

Mayroong 2 log. Ang unang log ay naglalaman ng web server log (katulad ng sa nakaraang gawain), ang pangalawang file (100kb ang laki) ay naglalaman ng URL-> Theme match. Halimbawa 2nd file:

/toyota.php 	
auto 
/football/spartak.html 	
sport 
/cars 	
auto 
/finances/money 	
business

Para sa bawat IP address, kinakailangang kalkulahin ang mga pahina kung aling kategorya mula sa IP address na ito ang pinakamadalas na na-load.

Sa kasong ito, kailangan din naming sumali sa 2 log sa pamamagitan ng URL. Gayunpaman, sa kasong ito, hindi namin kailangang magpatakbo ng 3 MapReduce, dahil ang pangalawang log ay ganap na magkakasya sa memorya. Upang malutas ang problema gamit ang 1st MapReduce, maaari naming i-load ang pangalawang log sa Distributed Cache, at kapag nasimulan ang Mapper, basahin lamang ito sa memorya, ilagay ito sa -> diksyonaryo ng paksa.

Dagdag pa, ang problema ay nalutas tulad ng sumusunod:

mapa:

# find the subject of each of the pages of the first log 
input_line -> [ip,  topic] 

bawasan:


Ip -> [topics] -> [ip, most_popular_topic]

Ang Reduce ay tumatanggap ng isang ip at isang listahan ng lahat ng mga paksa bilang input, kinakalkula lamang nito kung alin sa mga paksa ang pinakamadalas na nakatagpo. Kaya, ang gawain ay nalutas gamit ang 1st MapReduce, at ang aktwal na Join ay karaniwang nagaganap sa loob ng mapa (samakatuwid, kung ang karagdagang pagsasama-sama sa pamamagitan ng susi ay hindi kailangan, ang MapOnly na trabaho ay maaaring ibigay):