5.1 మ్యాప్ మాత్రమే ఉద్యోగం

ఆచరణాత్మక సమస్యలను పరిష్కరించడానికి MapReduceని సమర్థవంతంగా ఉపయోగించడానికి మిమ్మల్ని అనుమతించే వివిధ సాంకేతికతలను వివరించడానికి ఇది సమయం, అలాగే అభివృద్ధిని సులభతరం చేసే లేదా క్లస్టర్‌లో MapReduce టాస్క్ అమలును గణనీయంగా వేగవంతం చేసే Hadoop యొక్క కొన్ని లక్షణాలను చూపుతుంది.

మనకు గుర్తున్నట్లుగా, MapReduce మ్యాప్, షఫుల్ మరియు తగ్గించు దశలను కలిగి ఉంటుంది. నియమం ప్రకారం, షఫుల్ దశ ఆచరణాత్మక పనులలో చాలా కష్టంగా మారుతుంది, ఎందుకంటే ఈ దశలో డేటా క్రమబద్ధీకరించబడుతుంది. వాస్తవానికి, మ్యాప్ దశ మాత్రమే పంపిణీ చేయగల అనేక పనులు ఉన్నాయి. అటువంటి పనుల ఉదాహరణలు ఇక్కడ ఉన్నాయి:

  • డేటా ఫిల్టరింగ్ (ఉదాహరణకు, వెబ్ సర్వర్ లాగ్‌లలో "IP చిరునామా 123.123.123.123 నుండి అన్ని రికార్డులను కనుగొనండి");
  • డేటా పరివర్తన (“csv-లాగ్‌లలో కాలమ్‌ని తొలగించు”);
  • బాహ్య మూలం నుండి డేటాను లోడ్ చేయడం మరియు అన్‌లోడ్ చేయడం ("లాగ్ నుండి అన్ని రికార్డులను డేటాబేస్‌లోకి చొప్పించండి").

ఇటువంటి పనులు మ్యాప్-మాత్రమే ఉపయోగించి పరిష్కరించబడతాయి. హడూప్‌లో మ్యాప్-మాత్రమే టాస్క్‌ని క్రియేట్ చేస్తున్నప్పుడు, మీరు సున్నా సంఖ్య తగ్గింపులను పేర్కొనాలి:

హడూప్‌లో మ్యాప్-మాత్రమే టాస్క్ కాన్ఫిగరేషన్‌కు ఉదాహరణ:

స్థానిక ఇంటర్ఫేస్ హడూప్ స్ట్రీమింగ్ ఇంటర్‌ఫేస్

job'aని కాన్ఫిగర్ చేస్తున్నప్పుడు తగ్గించేవారి సున్నా సంఖ్యను పేర్కొనండి:

job.setNumReduceTasks(0); 

మేము తగ్గింపుదారుని పేర్కొనము మరియు తగ్గించేవారి సంఖ్యను సున్నాని పేర్కొనము. ఉదాహరణ:

hadoop jar hadoop-streaming.jar \ 
 -D mapred.reduce.tasks=0\ 
-input input_dir\ 
-output output_dir\ 
-mapper "python mapper.py"\ 
-file "mapper.py"

మ్యాప్ మాత్రమే ఉద్యోగాలు నిజానికి చాలా ఉపయోగకరంగా ఉంటాయి. ఉదాహరణకు, Facetz.DCA ప్లాట్‌ఫారమ్‌లో, వారి ప్రవర్తన ద్వారా వినియోగదారుల లక్షణాలను గుర్తించడానికి, ఇది ఖచ్చితంగా ఒక పెద్ద మ్యాప్ మాత్రమే ఉపయోగించబడుతుంది, వీటిలో ప్రతి మ్యాపర్ వినియోగదారుని ఇన్‌పుట్‌గా తీసుకుంటుంది మరియు అతని లక్షణాలను అవుట్‌పుట్‌గా అందిస్తుంది.

5.2 కలపండి

నేను ఇప్పటికే వ్రాసినట్లుగా, సాధారణంగా మ్యాప్-రిడ్యూస్ టాస్క్ చేసేటప్పుడు అత్యంత కష్టతరమైన దశ షఫుల్ దశ. ఇంటర్మీడియట్ ఫలితాలు (మ్యాపర్ యొక్క అవుట్‌పుట్) డిస్క్‌కి వ్రాయబడి, క్రమబద్ధీకరించబడి మరియు నెట్‌వర్క్ ద్వారా ప్రసారం చేయబడినందున ఇది జరుగుతుంది. అయినప్పటికీ, అటువంటి ప్రవర్తన చాలా సహేతుకమైనదిగా అనిపించని పనులు ఉన్నాయి. ఉదాహరణకు, పత్రాలలో పదాలను లెక్కించే అదే పనిలో, మీరు టాస్క్ యొక్క ఒక మ్యాప్-రిడ్యూస్ నోడ్‌లో అనేక మ్యాపర్‌ల అవుట్‌పుట్‌ల ఫలితాలను ముందే సమగ్రపరచవచ్చు మరియు ప్రతి మెషీన్‌కు ఇప్పటికే సంగ్రహించిన విలువలను తగ్గించేవారికి పంపవచ్చు. .

హడూప్‌లో, దీని కోసం, మీరు మ్యాపర్‌ల భాగం యొక్క అవుట్‌పుట్‌ను ప్రాసెస్ చేసే కలయిక ఫంక్షన్‌ను నిర్వచించవచ్చు. కలపడం ఫంక్షన్ తగ్గించడానికి చాలా పోలి ఉంటుంది - ఇది కొన్ని మ్యాపర్‌ల అవుట్‌పుట్‌ను ఇన్‌పుట్‌గా తీసుకుంటుంది మరియు ఈ మ్యాపర్‌ల కోసం సమగ్ర ఫలితాన్ని ఉత్పత్తి చేస్తుంది, కాబట్టి రీడ్యూసర్ తరచుగా కాంబినర్‌గా కూడా ఉపయోగించబడుతుంది. తగ్గింపు నుండి ముఖ్యమైన వ్యత్యాసం ఏమిటంటే , ఒక కీకి సంబంధించిన అన్ని విలువలు కలపడం ఫంక్షన్‌కు చేరుకోలేవు .

అంతేకాకుండా, మ్యాపర్ యొక్క అవుట్‌పుట్ కోసం కంబైన్ ఫంక్షన్ అస్సలు అమలు చేయబడుతుందని హడూప్ హామీ ఇవ్వదు. అందువల్ల, కలపడం ఫంక్షన్ ఎల్లప్పుడూ వర్తించదు, ఉదాహరణకు, కీ ద్వారా మధ్యస్థ విలువ కోసం శోధించే సందర్భంలో. అయినప్పటికీ, కలపడం ఫంక్షన్ వర్తించే పనులలో, దాని ఉపయోగం MapReduce టాస్క్ వేగంలో గణనీయమైన పెరుగుదలను సాధించడానికి అనుమతిస్తుంది.

హడూప్‌లో కంబైనర్‌ని ఉపయోగించడం:

స్థానిక ఇంటర్ఫేస్ హడూప్ స్ట్రీమింగ్

job-aని కాన్ఫిగర్ చేస్తున్నప్పుడు, class-Combinerని పేర్కొనండి. నియమం ప్రకారం, ఇది రిడ్యూసర్ వలె ఉంటుంది:

job.setMapperClass(TokenizerMapper.class); 
job.setCombinerClass(IntSumReducer.class); 
job.setReducerClass(IntSumReducer.class); 

కమాండ్ లైన్ ఎంపికలలో -combiner ఆదేశాన్ని పేర్కొనండి. సాధారణంగా, ఈ ఆదేశం రీడ్యూసర్ కమాండ్ వలె ఉంటుంది. ఉదాహరణ:

hadoop jar hadoop-streaming.jar \ 
-input input_dir\ 
-output output_dir\ 
-mapper "python mapper.py"\ 
-reducer "python reducer.py"\ 
-combiner "python reducer.py"\ 
-file "mapper.py"\ 
-file "reducer.py"\

5.3 మ్యాప్‌రెడ్యూస్ టాస్క్ చెయిన్‌లు

సమస్యను పరిష్కరించడానికి ఒక MapReduce సరిపోనప్పుడు పరిస్థితులు ఉన్నాయి. ఉదాహరణకు, కొద్దిగా సవరించిన WordCount టాస్క్‌ను పరిగణించండి: టెక్స్ట్ డాక్యుమెంట్‌ల సమితి ఉంది, సెట్‌లో 1 నుండి 1000 సార్లు ఎన్ని పదాలు సంభవించాయో మీరు లెక్కించాలి, 1001 నుండి 2000 వరకు ఎన్ని పదాలు, 2001 నుండి 3000 వరకు ఎన్ని, మరియు అందువలన న. పరిష్కారం కోసం, మాకు 2 MapReduce ఉద్యోగాలు అవసరం:

  • సవరించిన పదాల గణన, ప్రతి పదానికి అది ఏ విరామాలలో పడిపోయిందో గణిస్తుంది;
  • మొదటి MapReduce యొక్క అవుట్‌పుట్‌లో ప్రతి విరామం ఎన్నిసార్లు ఎదుర్కుందో లెక్కించే MapReduce.

సూడో కోడ్ పరిష్కారం:

#map1 
def map(doc): 
for word in doc: 
yield word, 1
#reduce1 
def reduce(word, values): 
yield int(sum(values)/1000), 1 
#map2 
def map(doc): 
interval, cnt = doc.split() 
yield interval, cnt 
#reduce2 
def reduce(interval, values): 
yield interval*1000, sum(values) 

హడూప్‌లో మ్యాప్‌రెడ్యూస్ టాస్క్‌ల క్రమాన్ని అమలు చేయడానికి, మొదటి దాని కోసం అవుట్‌పుట్‌గా పేర్కొన్న ఫోల్డర్‌ను రెండవ టాస్క్‌కు ఇన్‌పుట్‌గా పేర్కొని, వాటిని వరుసగా రన్ చేస్తే సరిపోతుంది.

ఆచరణలో, MapReduce టాస్క్‌ల గొలుసులు చాలా క్లిష్టమైన సీక్వెన్స్‌లుగా ఉంటాయి, దీనిలో MapReduce టాస్క్‌లు వరుసగా మరియు ఒకదానికొకటి సమాంతరంగా కనెక్ట్ చేయబడతాయి. అటువంటి టాస్క్ ఎగ్జిక్యూషన్ ప్లాన్‌ల నిర్వహణను సులభతరం చేయడానికి, ఓజీ మరియు లుయిగి వంటి ప్రత్యేక సాధనాలు ఉన్నాయి, ఈ సిరీస్‌లో ప్రత్యేక కథనంలో చర్చించబడతాయి.

5.4 పంపిణీ చేయబడిన కాష్

హడూప్‌లో ఒక ముఖ్యమైన మెకానిజం డిస్ట్రిబ్యూటెడ్ కాష్. MapReduce టాస్క్ అమలులో ఉన్న వాతావరణంలో ఫైల్‌లను (ఉదా. టెక్స్ట్ ఫైల్‌లు, ఆర్కైవ్‌లు, జార్ ఫైల్‌లు) జోడించడానికి డిస్ట్రిబ్యూటెడ్ కాష్ మిమ్మల్ని అనుమతిస్తుంది.

మీరు HDFS, స్థానిక ఫైల్‌లలో నిల్వ చేసిన ఫైల్‌లను జోడించవచ్చు (టాస్క్ ప్రారంభించబడిన మెషీన్‌కు స్థానికంగా). -file ఎంపిక ద్వారా mapper.py మరియు reducer.py ఫైల్‌లను జోడించడం ద్వారా హడూప్ స్ట్రీమింగ్‌తో డిస్ట్రిబ్యూటెడ్ కాష్‌ని ఎలా ఉపయోగించాలో నేను ఇప్పటికే పరోక్షంగా చూపించాను. వాస్తవానికి, మీరు mapper.py మరియు reducer.py మాత్రమే కాకుండా, సాధారణంగా ఏకపక్ష ఫైల్‌లను జోడించవచ్చు, ఆపై వాటిని స్థానిక ఫోల్డర్‌లో ఉన్నట్లుగా ఉపయోగించవచ్చు.

పంపిణీ చేయబడిన కాష్‌ని ఉపయోగించడం:

స్థానిక API
//Job configuration
JobConf job = new JobConf();
DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"),  job);
DistributedCache.addCacheArchive(new URI("/myapp/map.zip", job);
DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job);
DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar", job);
DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz", job);

//example of usage in mapper-e:
public static class MapClass extends MapReduceBase
implements Mapper<K, V, K, V> {

 private Path[] localArchives;
 private Path[] localFiles;

 public void configure(JobConf job) {
   // get cached data from archives
   File f = new File("./map.zip/some/file/in/zip.txt");
 }

 public void map(K key, V value,
             	OutputCollector<K, V> output, Reporter reporter)
 throws IOException {
   // use data here
   // ...
   // ...
   output.collect(k, v);
 }
}
హడూప్ స్ట్రీమింగ్

#ఫైల్స్ పారామీటర్‌లో పంపిణీ చేయబడిన కాష్‌కి జోడించాల్సిన ఫైల్‌లను మేము జాబితా చేస్తాము. ఇతర ఎంపికల కంటే ముందు --files ఎంపిక తప్పక వస్తుంది.

yarn  hadoop-streaming.jar\ 
-files mapper.py,reducer.py,some_cached_data.txt\ 
-input '/some/input/path' \ 
-output '/some/output/path' \  
-mapper 'python mapper.py' \ 
-reducer 'python reducer.py' \

వినియోగ ఉదాహరణ:

import sys 
#just read file from local folder 
data = open('some_cached_data.txt').read() 
 
for line in sys.stdin() 
#processing input 
#use data here

5.5 చేరడాన్ని తగ్గించండి

రిలేషనల్ డేటాబేస్‌లతో పనిచేయడానికి అలవాటు పడిన వారు తరచుగా చాలా అనుకూలమైన జాయిన్ ఆపరేషన్‌ను ఉపయోగిస్తారు, ఇది కొన్ని పట్టికల కంటెంట్‌లను కొన్ని కీ ప్రకారం చేరడం ద్వారా సంయుక్తంగా ప్రాసెస్ చేయడానికి వీలు కల్పిస్తుంది. పెద్ద డేటాతో పని చేస్తున్నప్పుడు, ఈ సమస్య కొన్నిసార్లు తలెత్తుతుంది. కింది ఉదాహరణను పరిగణించండి:

రెండు వెబ్ సర్వర్‌ల లాగ్‌లు ఉన్నాయి, ప్రతి లాగ్ ఇలా కనిపిస్తుంది:

t\t

లాగ్ స్నిప్పెట్ ఉదాహరణ:

1446792139	
178.78.82.1	
/sphingosine/unhurrying.css 
1446792139	
126.31.163.222	
/accentually.js 
1446792139	
154.164.149.83	
/pyroacid/unkemptly.jpg 
1446792139	
202.27.13.181	
/Chawia.js 
1446792139	
67.123.248.174	
/morphographical/dismain.css 
1446792139	
226.74.123.135	
/phanerite.php 
1446792139	
157.109.106.104	
/bisonant.css

ప్రతి IP చిరునామా కోసం 2 సర్వర్‌లలో ఏది తరచుగా సందర్శించిందో లెక్కించడం అవసరం. ఫలితం రూపంలో ఉండాలి:

\t

ఫలితం యొక్క భాగానికి ఉదాహరణ:

178.78.82.1	
first 
126.31.163.222	
second 
154.164.149.83	
second 
226.74.123.135	
first

దురదృష్టవశాత్తూ, రిలేషనల్ డేటాబేస్‌ల మాదిరిగా కాకుండా, సాధారణంగా, రెండు లాగ్‌లను కీ ద్వారా (ఈ సందర్భంలో, IP చిరునామా ద్వారా) చేరడం చాలా భారీ ఆపరేషన్ మరియు 3 MapReduce మరియు Reduce Join నమూనాను ఉపయోగించి పరిష్కరించబడుతుంది:

ReduceJoin ఇలా పనిచేస్తుంది:

1) ప్రతి ఇన్‌పుట్ లాగ్‌ల కోసం, ఒక ప్రత్యేక MapReduce (మ్యాప్ మాత్రమే) ప్రారంభించబడింది, ఇన్‌పుట్ డేటాను క్రింది ఫారమ్‌కి మారుస్తుంది:

key -> (type, value

పట్టికలలో చేరడానికి కీ కీ అయితే, టైప్ అనేది పట్టిక రకం (మా విషయంలో మొదటి లేదా రెండవది), మరియు విలువ అనేది కీకి కట్టుబడి ఉన్న ఏదైనా అదనపు డేటా.

2) MapReduces రెండింటి యొక్క అవుట్‌పుట్‌లు 3వ MapReduce యొక్క ఇన్‌పుట్‌కు అందించబడతాయి, ఇది వాస్తవానికి యూనియన్‌ను నిర్వహిస్తుంది. ఈ MapReduce కేవలం ఇన్‌పుట్‌ను కాపీ చేసే ఖాళీ మ్యాపర్‌ని కలిగి ఉంది. తర్వాత, షఫుల్ డేటాను కీలుగా విడదీస్తుంది మరియు ఇన్‌పుట్‌గా రీడ్యూసర్‌కి ఫీడ్ చేస్తుంది:

key -> [(type, value)]

ఈ సమయంలో తగ్గింపుదారు రెండు లాగ్‌ల నుండి రికార్డులను స్వీకరించడం చాలా ముఖ్యం మరియు అదే సమయంలో, రెండు లాగ్‌లలో నిర్దిష్ట విలువ వచ్చిన టైప్ ఫీల్డ్ ద్వారా గుర్తించడం సాధ్యమవుతుంది. కాబట్టి అసలు సమస్యను పరిష్కరించడానికి తగినంత డేటా ఉంది. మా విషయంలో, రీడ్యూసర్ ప్రతి రికార్డ్ కీకి ఏ రకం ఎక్కువగా ఎదురైందో మరియు ఈ రకాన్ని అవుట్‌పుట్ చేసిందో లెక్కించాలి.

5.6 MapJoins

ReduceJoin నమూనా కీ ద్వారా రెండు లాగ్‌లను కలిపే సాధారణ సందర్భాన్ని వివరిస్తుంది. ఏదేమైనా, పనిని గణనీయంగా సరళీకృతం చేయడం మరియు వేగవంతం చేయడం వంటి ప్రత్యేక సందర్భం ఉంది. లాగ్‌లలో ఒకటి మరొకదాని కంటే గణనీయంగా తక్కువగా ఉండే సందర్భం ఇది. కింది సమస్యను పరిగణించండి:

2 లాగ్‌లు ఉన్నాయి. మొదటి లాగ్ వెబ్ సర్వర్ లాగ్‌ను కలిగి ఉంది (మునుపటి టాస్క్‌లో అదే), రెండవ ఫైల్ (100kb పరిమాణం) URL-> థీమ్ సరిపోలికను కలిగి ఉంటుంది. ఉదాహరణ 2వ ఫైల్:

/toyota.php 	
auto 
/football/spartak.html 	
sport 
/cars 	
auto 
/finances/money 	
business

ప్రతి IP చిరునామా కోసం, ఈ IP చిరునామా నుండి ఏ వర్గం యొక్క పేజీలు ఎక్కువగా లోడ్ చేయబడతాయో లెక్కించడం అవసరం.

ఈ సందర్భంలో, మేము URL ద్వారా 2 లాగ్‌లను కూడా చేరాలి. అయితే, ఈ సందర్భంలో, మేము 3 MapReduceని అమలు చేయవలసిన అవసరం లేదు, ఎందుకంటే రెండవ లాగ్ మెమరీకి పూర్తిగా సరిపోతుంది. 1వ MapReduceని ఉపయోగించి సమస్యను పరిష్కరించడానికి, మేము రెండవ లాగ్‌ను డిస్ట్రిబ్యూటెడ్ కాష్‌లోకి లోడ్ చేయవచ్చు మరియు మ్యాపర్ ప్రారంభించబడినప్పుడు, దాన్ని మెమరీలోకి చదవండి, దాన్ని -> టాపిక్ డిక్షనరీలో ఉంచండి.

ఇంకా, సమస్య ఈ క్రింది విధంగా పరిష్కరించబడుతుంది:

పటం:

# find the subject of each of the pages of the first log 
input_line -> [ip,  topic] 

తగ్గించండి:


Ip -> [topics] -> [ip, most_popular_topic]

తగ్గించు ఒక ip మరియు అన్ని అంశాల జాబితాను ఇన్‌పుట్‌గా అందుకుంటుంది, ఇది ఏ టాపిక్‌లను ఎక్కువగా ఎదుర్కొందో గణిస్తుంది. ఈ విధంగా, 1వ MapReduceని ఉపయోగించి పని పరిష్కరించబడుతుంది మరియు అసలు చేరడం సాధారణంగా మ్యాప్‌లో జరుగుతుంది (అందుచేత, కీ ద్వారా అదనపు సమీకరణ అవసరం లేకుంటే, MapOnly జాబ్‌ని దీనితో పంపిణీ చేయవచ్చు):