5.1 Csak térképes feladat

Itt az ideje, hogy leírjunk különféle technikákat, amelyek lehetővé teszik a MapReduce hatékony használatát gyakorlati problémák megoldására, valamint bemutatjuk a Hadoop néhány olyan funkcióját, amelyek leegyszerűsíthetik a fejlesztést vagy jelentősen felgyorsíthatják a MapReduce-feladatok végrehajtását egy fürtön.

Emlékszünk rá, hogy a MapReduce Map, Shuffle és Reduce szakaszokból áll. Általában a Shuffle szakasz a legnehezebb a gyakorlati feladatokban, mivel az adatokat ebben a szakaszban rendezik. Valójában számos olyan feladat van, amelyekben a Térkép szakasz önmagában elhagyható. Íme példák az ilyen feladatokra:

  • Adatszűrés (például "Az összes rekord megkeresése a 123.123.123.123 IP-címről" a webszerver naplóiban);
  • Adatátalakítás ("Oszlop törlése a csv-naplókban");
  • Adatok betöltése és eltávolítása külső forrásból („A napló összes rekordjának beillesztése az adatbázisba”).

Az ilyen feladatokat a csak térkép segítségével oldják meg. Ha csak térképes feladatot hoz létre a Hadoopban, nulla számú szűkítőt kell megadnia:

Példa a hadoop csak térképes feladatkonfigurációjára:

natív felület Hadoop streaming felület

Adja meg a szűkítők nulla számát az a feladat konfigurálásakor:

job.setNumReduceTasks(0); 

Nem adunk meg szűkítőt, és nulla számú szűkítőt adunk meg. Példa:

hadoop jar hadoop-streaming.jar \ 
 -D mapred.reduce.tasks=0\ 
-input input_dir\ 
-output output_dir\ 
-mapper "python mapper.py"\ 
-file "mapper.py"

A csak térképes állások valójában nagyon hasznosak lehetnek. Például a Facetz.DCA platformon a felhasználók jellemzőinek viselkedésük alapján történő azonosítására pontosan egy nagy térképet használnak, amelynek minden egyes leképezője egy felhasználót vesz inputként, és az ő jellemzőit adja vissza kimenetként.

5.2 Összevonás

Ahogy már írtam, a Map-Reduce feladat végrehajtásának legnehezebb szakasza általában a keverési szakasz. Ez azért történik, mert a közbenső eredményeket (a leképező kimenetét) a rendszer lemezre írja, rendezi és továbbítja a hálózaton. Vannak azonban olyan feladatok, amelyekben az ilyen viselkedés nem tűnik túl ésszerűnek. Például a dokumentumokban lévő szavak számlálásának ugyanabban a feladatában előre összesítheti több leképező kimenetének eredményeit a feladat egy térképredukáló csomópontján, és átadhatja az egyes gépek már összesített értékeit a reduktornak. .

A hadoopban ehhez definiálhat egy kombináló függvényt, amely feldolgozza a leképezők egy részének kimenetét. A kombináló funkció nagyon hasonlít a redukcióhoz – egyes leképezők kimenetét veszi bemenetként, és összesített eredményt ad ezekre a leképezőkre, így a reduktort gyakran kombinálóként is használják. Fontos különbség a redukcióhoz képest, hogy nem minden, egy kulcsnak megfelelő érték jut el a kombináló funkcióhoz .

Sőt, a hadoop nem garantálja, hogy a leképező kimenetén a kombájn függvény egyáltalán végrehajtásra kerül. Emiatt a kombináló funkció nem mindig alkalmazható, például a mediánérték kulcs szerinti keresése esetén. Mindazonáltal azokban a feladatokban, ahol a kombináló funkció alkalmazható, használata lehetővé teszi a MapReduce feladat gyorsaságának jelentős növelését.

A Combiner használata a hadoop-on:

natív felület Hadoop streamelés

A job-a konfigurálásakor adja meg a Combiner osztályt. Általános szabály, hogy ez ugyanaz, mint a Reducer:

job.setMapperClass(TokenizerMapper.class); 
job.setCombinerClass(IntSumReducer.class); 
job.setReducerClass(IntSumReducer.class); 

Adja meg a -combiner parancsot a parancssori beállításokban. Ez a parancs általában megegyezik a reduktor paranccsal. Példa:

hadoop jar hadoop-streaming.jar \ 
-input input_dir\ 
-output output_dir\ 
-mapper "python mapper.py"\ 
-reducer "python reducer.py"\ 
-combiner "python reducer.py"\ 
-file "mapper.py"\ 
-file "reducer.py"\

5.3 MapReduce feladatláncok

Vannak helyzetek, amikor egy MapReduce nem elég egy probléma megoldásához. Például vegyünk egy kicsit módosított WordCount feladatot: van egy szöveges dokumentumkészlet, meg kell számolni, hogy hány szó fordult elő 1-től 1000-ig a készletben, hány szó 1001-től 2000-ig, hány 2001-től 3000-ig, stb. A megoldáshoz 2 MapReduce munkára van szükségünk:

  • Módosított szószám, amely minden szónál kiszámítja, hogy melyik intervallumba esett;
  • Egy MapReduce, amely megszámolja, hogy az egyes intervallumok hányszor fordultak elő az első MapReduce kimenetében.

Pszeudokód megoldás:

#map1 
def map(doc): 
for word in doc: 
yield word, 1
#reduce1 
def reduce(word, values): 
yield int(sum(values)/1000), 1 
#map2 
def map(doc): 
interval, cnt = doc.split() 
yield interval, cnt 
#reduce2 
def reduce(interval, values): 
yield interval*1000, sum(values) 

A MapReduce feladatok sorozatának végrehajtásához a hadoop-on elég, ha megadja az elsőhöz kimenetként megadott mappát a második feladat bemeneteként, és felváltva futtatja azokat.

A gyakorlatban a MapReduce feladatok láncai meglehetősen összetett sorozatok lehetnek, amelyekben a MapReduce feladatok szekvenciálisan és egymással párhuzamosan is összekapcsolhatók. Az ilyen feladat-végrehajtási tervek kezelésének egyszerűsítésére külön eszközök állnak rendelkezésre, mint például az oozie és a luigi, amelyekről a sorozat egy külön cikkében lesz szó.

5.4 Elosztott gyorsítótár

A Hadoop egyik fontos mechanizmusa az elosztott gyorsítótár. Az elosztott gyorsítótár lehetővé teszi fájlok (pl. szöveges fájlok, archívumok, jar fájlok) hozzáadását ahhoz a környezethez, ahol a MapReduce feladat fut.

Hozzáadhat HDFS-en tárolt fájlokat, helyi fájlokat (annak a gépnek a helyén, amelyről a feladat elindul). Már implicit módon bemutattam, hogyan kell használni az elosztott gyorsítótárat hadoop streameléssel a mapper.py és a reduktor.py fájlok hozzáadásával a -file opcióval. Valójában nemcsak a mapper.py és a reduktor.py, hanem általában tetszőleges fájlokat is hozzáadhatja, majd úgy használhatja őket, mintha egy helyi mappában lennének.

Az elosztott gyorsítótár használata:

Natív API
//Job configuration
JobConf job = new JobConf();
DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"),  job);
DistributedCache.addCacheArchive(new URI("/myapp/map.zip", job);
DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job);
DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar", job);
DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz", job);

//example of usage in mapper-e:
public static class MapClass extends MapReduceBase
implements Mapper<K, V, K, V> {

 private Path[] localArchives;
 private Path[] localFiles;

 public void configure(JobConf job) {
   // get cached data from archives
   File f = new File("./map.zip/some/file/in/zip.txt");
 }

 public void map(K key, V value,
             	OutputCollector<K, V> output, Reporter reporter)
 throws IOException {
   // use data here
   // ...
   // ...
   output.collect(k, v);
 }
}
Hadoop Streaming

#a –files paraméterben felsoroljuk azokat a fájlokat, amelyeket hozzá kell adni az elosztott gyorsítótárhoz. A --files kapcsolónak a többi opció előtt kell lennie.

yarn  hadoop-streaming.jar\ 
-files mapper.py,reducer.py,some_cached_data.txt\ 
-input '/some/input/path' \ 
-output '/some/output/path' \  
-mapper 'python mapper.py' \ 
-reducer 'python reducer.py' \

használati példa:

import sys 
#just read file from local folder 
data = open('some_cached_data.txt').read() 
 
for line in sys.stdin() 
#processing input 
#use data here

5.5 Csatlakozás csökkentése

A relációs adatbázisokkal való munkavégzéshez szokottak gyakran használják a nagyon kényelmes Join műveletet, amely lehetővé teszi, hogy egyes táblák tartalmát közösen dolgozzák fel, valamilyen kulcs szerint összekapcsolva. A nagy adatokkal való munka során néha ez a probléma is felmerül. Tekintsük a következő példát:

Két webszerver naplója van, mindegyik napló így néz ki:

t\t

Példa naplórészletre:

1446792139	
178.78.82.1	
/sphingosine/unhurrying.css 
1446792139	
126.31.163.222	
/accentually.js 
1446792139	
154.164.149.83	
/pyroacid/unkemptly.jpg 
1446792139	
202.27.13.181	
/Chawia.js 
1446792139	
67.123.248.174	
/morphographical/dismain.css 
1446792139	
226.74.123.135	
/phanerite.php 
1446792139	
157.109.106.104	
/bisonant.css

Minden IP-címhez ki kell számítani, hogy a 2 szerver közül melyiket látogatta meg gyakrabban. Az eredménynek a következő formában kell lennie:

\t

Példa az eredmény egy részére:

178.78.82.1	
first 
126.31.163.222	
second 
154.164.149.83	
second 
226.74.123.135	
first

Sajnos a relációs adatbázisokkal ellentétben általában két napló kulcsonkénti összekapcsolása (jelen esetben IP-cím alapján) meglehetősen nehéz művelet, és a 3 MapReduce és a Reduce Join minta segítségével oldható meg:

A ReduceJoin így működik:

1) Mindegyik bemeneti naplóhoz külön MapReduce (csak térkép) indul el, amely a bemeneti adatokat a következő formára konvertálja:

key -> (type, value

Ahol a kulcs a táblák összekapcsolásának kulcsa, a Type a tábla típusa (esetünkben az első vagy a második), az Érték pedig a kulcshoz kötött bármely további adat.

2) Mindkét MapReduce kimenete a 3. MapReduce bemenetére van táplálva, amely valójában végrehajtja az egyesülést. Ez a MapReduce egy üres leképezőt tartalmaz, amely egyszerűen másolja a bemenetet. Ezután a shuffle kulcsokra bontja az adatokat, és bemenetként továbbítja a reduktorba:

key -> [(type, value)]

Fontos, hogy ebben a pillanatban a reduktor mindkét naplóból fogadja a rekordokat, és ezzel egyidejűleg a típusmező alapján azonosítható legyen, hogy a két napló közül melyikből származott egy adott érték. Tehát elegendő adat áll rendelkezésre az eredeti probléma megoldásához. Esetünkben a reduktornak egyszerűen ki kell számítania minden egyes rekordkulcsra, hogy melyik típus találkozott többet, és ezt a típust adja ki.

5.6 MapJoins

A ReduceJoin minta két napló kulcsonkénti összekapcsolásának általános esetét írja le. Van azonban egy speciális eset, amikor a feladat jelentősen leegyszerűsíthető és felgyorsítható. Ez az az eset, amikor az egyik rönk lényegesen kisebb, mint a másik. Vegye figyelembe a következő problémát:

2 napló van. Az első napló a webszerver naplóját tartalmazza (ugyanaz, mint az előző feladatban), a második fájl (100 kb méretű) az URL-> Theme egyezést tartalmazza. Példa 2. fájl:

/toyota.php 	
auto 
/football/spartak.html 	
sport 
/cars 	
auto 
/finances/money 	
business

Minden IP-címhez ki kell számítani, hogy ebből az IP-címből melyik kategóriába tartozó oldalak töltötték be leggyakrabban.

Ebben az esetben 2 naplót is össze kell kapcsolnunk URL alapján. Ebben az esetben azonban nem kell 3 MapReduce-t futtatnunk, mivel a második napló teljesen belefér a memóriába. A probléma megoldása érdekében az 1. MapReduce segítségével a második naplót betölthetjük a Distributed Cache-be, majd a Mapper inicializálása után egyszerűen beolvassuk a memóriába, a -> témaszótárba helyezve.

Továbbá a probléma megoldása a következőképpen történik:

térkép:

# find the subject of each of the pages of the first log 
input_line -> [ip,  topic] 

csökkenteni:


Ip -> [topics] -> [ip, most_popular_topic]

A Reduce bemenetként kap egy IP-címet és egy listát az összes témakörről, egyszerűen kiszámolja, hogy melyik témakörrel találkoztak leggyakrabban. Így a feladat megoldása az 1. MapReduce segítségével történik, és a tényleges csatlakozás általában a térképen belül történik (tehát ha nincs szükség további kulcsonkénti összesítésre, a MapOnly feladattól eltekinthetünk):