5.1 僅映射作業

現在是時候描述各種技術,使您能夠有效地使用 MapReduce 解決實際問題,並展示 Hadoop 的一些功能,這些功能可以簡化開發或顯著加快 MapReduce 任務在集群上的執行速度。

我們記得,MapReduce 由 Map、Shuffle 和 Reduce 階段組成。通常,Shuffle 階段在實際任務中是最困難的,因為數據是在這個階段排序的。事實上,有許多任務可以單獨免除 Map 階段。以下是此類任務的示例:

  • 數據過濾(例如Web服務器日誌中的“從IP地址123.123.123.123查找所有記錄”);
  • 數據轉換(“刪除 csv 日誌中的列”);
  • 從外部源加載和卸載數據(“將日誌中的所有記錄插入數據庫”)。

此類任務使用 Map-Only 解決。在 Hadoop 中創建 Map-Only 任務時,您需要指定零個 reducer:

hadoop 上的 map-only 任務配置示例:

本機接口 Hadoop 流接口

配置 job'a 時指定 reducer 數量為零:

job.setNumReduceTasks(0); 

我們不指定減速器並指定零數量的減速器。例子:

hadoop jar hadoop-streaming.jar \ 
 -D mapred.reduce.tasks=0\ 
-input input_dir\ 
-output output_dir\ 
-mapper "python mapper.py"\ 
-file "mapper.py"

Map Only 作業實際上非常有用。例如,在Facetz.DCA平台中,通過用戶的行為來識別用戶的特徵,正是使用了一個大的map-only,每個mapper以一個用戶作為輸入,返回他的特徵作為輸出。

5.2 合併

正如我已經寫過的,執行 Map-Reduce 任務時最困難的階段通常是洗牌階段。發生這種情況是因為中間結果(映射器的輸出)被寫入磁盤、排序並通過網絡傳輸。但是,在某些任務中,這種行為似乎不太合理。比如同一個統計文檔中單詞的任務,可以在任務的一個map-reduce節點上預先聚合好幾個mapper的輸出結果,將每台機器已經求和的值傳遞給reducer .

為此,在 hadoop 中,您可以定義一個組合函數來處理部分映射器的輸出。combining 函數與 reduce 非常相似——它將一些 mappers 的輸出作為輸入,並為這些 mappers 產生一個聚合結果,因此 reducer 也經常被用作 combiner。與reduce的一個重要區別是不是一個key對應的所有值都到達combining函數

此外,hadoop 不保證 combine 函數會針對 mapper 的輸出執行。因此,組合函數並不總是適用的,例如,在按鍵搜索中值的情況下。然而,在那些適用組合功能的任務中,它的使用可以顯著提高 MapReduce 任務的速度。

在 hadoop 上使用 Combiner:

本機接口 Hadoop 流式處理

配置job-a時,指定class-Combiner。通常,它與 Reducer 相同:

job.setMapperClass(TokenizerMapper.class); 
job.setCombinerClass(IntSumReducer.class); 
job.setReducerClass(IntSumReducer.class); 

在命令行選項中指定 -combiner 命令。通常,此命令與 reducer 命令相同。例子:

hadoop jar hadoop-streaming.jar \ 
-input input_dir\ 
-output output_dir\ 
-mapper "python mapper.py"\ 
-reducer "python reducer.py"\ 
-combiner "python reducer.py"\ 
-file "mapper.py"\ 
-file "reducer.py"\

5.3 MapReduce 任務鏈

有時一個 MapReduce 不足以解決問題。例如,考慮一個稍微修改過的 WordCount 任務:有一組文本文檔,你需要統計該集合中從 1 到 1000 次出現了多少個單詞,從 1001 到 2000 次出現了多少個單詞,從 2001 到 3000 次出現了多少個單詞,等等。對於解決方案,我們需要 2 個 MapReduce 作業:

  • 修改了wordcount,對於每個單詞都會計算它屬於哪個區間;
  • 一個 MapReduce,計算在第一個 MapReduce 的輸出中遇到每個間隔的次數。

偽代碼解決方案:

#map1 
def map(doc): 
for word in doc: 
yield word, 1
#reduce1 
def reduce(word, values): 
yield int(sum(values)/1000), 1 
#map2 
def map(doc): 
interval, cnt = doc.split() 
yield interval, cnt 
#reduce2 
def reduce(interval, values): 
yield interval*1000, sum(values) 

為了在 hadoop 上執行一系列 MapReduce 任務,只需將指定為第一個任務的輸出的文件夾指定為第二個任務的輸入並依次運行它們就足夠了。

在實踐中,MapReduce 任務鏈可能是非常複雜的序列,其中 MapReduce 任務可以按順序或併行地相互連接。為了簡化此類任務執行計劃的管理,有 oozie 和 luigi 等單獨的工具,將在本系列的另一篇文章中進行討論。

5.4 分佈式緩存

Hadoop 中的一個重要機制是分佈式緩存。分佈式緩存允許您將文件(例如文本文件、存檔、jar 文件)添加到 MapReduce 任務運行的環境中。

您可以添加存儲在 HDFS 上的文件,本地文件(本地到啟動任務的機器)。我已經通過 -file 選項添加 mapper.py 和 reducer.py 文件,隱含地展示瞭如何將分佈式緩存與 hadoop 流一起使用。事實上,您不僅可以添加 mapper.py 和 reducer.py,還可以添加一般的任意文件,然後像在本地文件夾中一樣使用它們。

使用分佈式緩存:

原生API
//Job configuration
JobConf job = new JobConf();
DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"),  job);
DistributedCache.addCacheArchive(new URI("/myapp/map.zip", job);
DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job);
DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar", job);
DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz", job);

//example of usage in mapper-e:
public static class MapClass extends MapReduceBase
implements Mapper<K, V, K, V> {

 private Path[] localArchives;
 private Path[] localFiles;

 public void configure(JobConf job) {
   // get cached data from archives
   File f = new File("./map.zip/some/file/in/zip.txt");
 }

 public void map(K key, V value,
             	OutputCollector<K, V> output, Reporter reporter)
 throws IOException {
   // use data here
   // ...
   // ...
   output.collect(k, v);
 }
}
Hadoop 流媒體

#we在–files參數中列出了需要添加到分佈式緩存中的文件。--files 選項必須位於其他選項之前。

yarn  hadoop-streaming.jar\ 
-files mapper.py,reducer.py,some_cached_data.txt\ 
-input '/some/input/path' \ 
-output '/some/output/path' \  
-mapper 'python mapper.py' \ 
-reducer 'python reducer.py' \

使用示例:

import sys 
#just read file from local folder 
data = open('some_cached_data.txt').read() 
 
for line in sys.stdin() 
#processing input 
#use data here

5.5 減少連接

習慣於使用關係型數據庫的人通常會使用非常方便的 Join 操作,它可以讓他們根據某個鍵將某些表的內容連接起來,從而共同處理這些表的內容。在處理大數據時,有時也會出現這個問題。考慮以下示例:

有兩個web服務器的日誌,每個日誌長這樣:

t\t

日誌片段示例:

1446792139	
178.78.82.1	
/sphingosine/unhurrying.css 
1446792139	
126.31.163.222	
/accentually.js 
1446792139	
154.164.149.83	
/pyroacid/unkemptly.jpg 
1446792139	
202.27.13.181	
/Chawia.js 
1446792139	
67.123.248.174	
/morphographical/dismain.css 
1446792139	
226.74.123.135	
/phanerite.php 
1446792139	
157.109.106.104	
/bisonant.css

有必要為每個 IP 地址計算它更頻繁地訪問 2 個服務器中的哪一個。結果應採用以下形式:

\t

部分結果示例:

178.78.82.1	
first 
126.31.163.222	
second 
154.164.149.83	
second 
226.74.123.135	
first

不幸的是,與關係數據庫不同,一般來說,通過鍵(在這種情況下,通過 IP 地址)連接兩個日誌是一項相當繁重的操作,可以使用 3 MapReduce 和 Reduce Join 模式解決:

ReduceJoin 的工作方式如下:

1)對於每個輸入日誌,啟動一個單獨的 MapReduce(僅限 Map),將輸入數據轉換為以下形式:

key -> (type, value

其中 key 是連接表的鍵,Type 是表的類型(在我們的例子中是 first 或 second),Value 是綁定到鍵的任何附加數據。

2)兩個 MapReduce 的輸出被饋送到第三個 MapReduce 的輸入,實際上,第三個 MapReduce 執行並集。這個 MapReduce 包含一個空的 Mapper,它只是複制輸入。接下來,shuffle 將數據分解為鍵並將其作為輸入提供給 reducer:

key -> [(type, value)]

重要的是,此時 reducer 從兩個日誌中接收記錄,同時,可以通過類型字段識別特定值來自兩個日誌中的哪一個。所以有足夠的數據來解決原來的問題。在我們的例子中,reducer 只需為每個記錄鍵計算哪種類型遇到的次數更多,並輸出該類型。

5.6 MapJoin

ReduceJoin 模式描述了通過鍵連接兩個日誌的一般情況。但是,有一種特殊情況可以顯著簡化和加速任務。這是其中一個日誌明顯小於另一個日誌的情況。考慮以下問題:

有2個日誌。第一個日誌包含 Web 服務器日誌(與上一個任務相同),第二個文件(大小為 100kb)包含 URL-> 主題匹配。示例 2 文件:

/toyota.php 	
auto 
/football/spartak.html 	
sport 
/cars 	
auto 
/finances/money 	
business

對於每個 IP 地址,有必要計算最常加載來自該 IP 地址的哪個類別的頁面。

在這種情況下,我們還需要通過 URL 連接 2 個日誌。然而,在這種情況下,我們不必運行 3 個 MapReduce,因為第二個日誌將完全適合內存。為了解決使用第一個MapReduce的問題,我們可以將第二個log加載到Distributed Cache中,在Mapper初始化的時候,直接讀入內存,放到->topic字典中。

進一步地,問題解決如下:

地圖:

# find the subject of each of the pages of the first log 
input_line -> [ip,  topic] 

減少:


Ip -> [topics] -> [ip, most_popular_topic]

Reduce 接收一個 ip 和所有主題的列表作為輸入,它簡單地計算最常遇到的主題。因此,任務是使用第一個 MapReduce 解決的,實際的 Join 通常發生在地圖內部(因此,如果不需要額外的鍵聚合,則可以省去 MapOnly 作業):