4.1 Hadoop 概況

MapReduce 範式由 Google 於 2004 年在其文章MapReduce: Simplified Data Processing on Large Clusters中提出。由於提議的文章包含範例的描述,但缺少實現,雅虎的幾位程序員提議將他們的實現作為 nutch 網絡爬蟲工作的一部分。您可以在 Hadoop 的歷史:從 4 個節點到數據的未來一文中閱讀有關 Hadoop 歷史的更多信息。

最初,Hadoop 主要是一種用於存儲數據和運行 MapReduce 任務的工具,但現在 Hadoop 是一大堆以某種方式與處理大數據相關的技術(不僅與 MapReduce 相關)。

Hadoop 的主要(核心)組件是:

還有大量與Hadoop直接相關,但不屬於Hadoop核心的項目:

  • Hive - 一種對大數據進行類 SQL 查詢的工具(將 SQL 查詢轉換為一系列 MapReduce 任務);
  • Pig是一種用於高級數據分析的編程語言。這種語言的一行代碼可以變成一系列 MapReduce 任務;
  • Hbase是一個實現了BigTable範式的列式數據庫;
  • Cassandra是一個高性能的分佈式鍵值數據庫;
  • ZooKeeper是一個分佈式配置存儲和同步配置變更的服務;
  • Mahout是一個大數據機器學習庫和引擎。

另外,我想提一下Apache Spark項目,它是一個用於分佈式數據處理的引擎。Apache Spark 通常使用 Hadoop 組件(如 HDFS 和 YARN)來工作,而它本身最近變得比 Hadoop 更受歡迎:

其中一些組件將在本系列材料的單獨文章中介紹,但現在,讓我們看看如何開始使用 Hadoop 並將其付諸實踐。

4.2 在 Hadoop 上運行 MapReduce 程序

現在讓我們看看如何在 Hadoop 上運行 MapReduce 任務。作為一項任務,我們將使用在上一課中討論過的經典WordCount示例。

讓我提醒你問題的提法:有一套文件。需要為文檔集中出現的每個詞統計該詞在該集中出現的次數。

解決方案:

Map 將文檔拆分為單詞並返回一組對 (word, 1)。

Reduce 對每個詞的出現次數求和:

def map(doc):  
for word in doc.split():  
	yield word, 1 
def reduce(word, values):  
	yield word, sum(values)

現在的任務是以可以在 Hadoop 上執行和運行的代碼形式編寫此解決方案。

4.3 方法編號 1。Hadoop 流媒體

在 Hadoop 上運行 MapReduce 程序的最簡單方法是使用 Hadoop 流接口。流接口假定mapreduce被實現為從 stdin 獲取數據並輸出到stdout 的程序。

執行map函數的程序稱為mapper。執行reduce的程序分別稱為reducer

Streaming 接口默認假定映射器縮減器中的一個傳入行對應於map的一個傳入條目。

mapper 的輸出以 pairs (key, value) 的形式到達 reducer 的輸入,而所有 pairs 都對應同一個 key:

  • 保證由reducer單次啟動處理;
  • 將連續提交給輸入(也就是說,如果一個reducer處理幾個不同的鍵,輸入將按鍵分組)。

那麼讓我們在python中實現mapper和reducer:

#mapper.py  
import sys  
  
def do_map(doc):  
for word in doc.split():  
	yield word.lower(), 1  
  
for line in sys.stdin:  
	for key, value in do_map(line):  
    	print(key + "\t" + str(value))  
 
#reducer.py  
import sys  
  
def do_reduce(word, values):  
	return word, sum(values)  
  
prev_key = None  
values = []  
  
for line in sys.stdin:  
	key, value = line.split("\t")  
	if key != prev_key and prev_key is not None:  
    	result_key, result_value = do_reduce(prev_key, values)  
    	print(result_key + "\t" + str(result_value))  
    	values = []  
	prev_key = key  
	values.append(int(value))  
  
if prev_key is not None:  
	result_key, result_value = do_reduce(prev_key, values)  
	print(result_key + "\t" + str(result_value))

Hadoop要處理的數據必須存儲在HDFS上。讓我們上傳我們的文章並將它們放在 HDFS 上。為此,請使用hadoop fs命令:

wget https://www.dropbox.com/s/opp5psid1x3jt41/lenta_articles.tar.gz  
tar xzvf lenta_articles.tar.gz  
hadoop fs -put lenta_articles 

hadoop fs 實用程序支持大量用於操作文件系統的方法,其中許多方法與標準 linux 實用程序相同。

現在讓我們開始流式傳輸任務:

yarn jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar\  
 -input lenta_articles\  
 -output lenta_wordcount\  
 -file mapper.py\  
 -file reducer.py\  
 -mapper "python mapper.py"\  
 -reducer "python reducer.py" 

yarn 實用程序用於在集群上啟動和管理各種應用程序(包括基於 map-reduce 的應用程序)。Hadoop-streaming.jar 只是此類 yarn 應用程序的一個示例。

接下來是啟動選項:

  • 輸入 - 在 hdfs 上包含源數據的文件夾;
  • output - hdfs 上要放置結果的文件夾;
  • file——map-reduce任務運行過程中需要的文件;
  • mapper 是將用於地圖階段的控制台命令;
  • reduce 是將用於 reduce 階段的控制台命令。

啟動後,您可以在控制台中看到任務的進度以及用於查看有關任務的更詳細信息的 URL。

在此 URL 提供的界面中,您可以找到更詳細的任務執行狀態,查看每個 mapper 和 reducer 的日誌(這在任務失敗的情況下非常有用)。

成功執行後的工作結果將添加到我們在輸出字段中指定的文件夾中的 HDFS 中。您可以使用“hadoop fs -ls lenta_wordcount”命令查看其內容。

結果本身可以通過以下方式獲得:

hadoop fs -text lenta_wordcount/* | sort -n -k2,2 | tail -n5  
from
41  
this
43  
on
82  
and
111  
into
194 

“hadoop fs -text”命令以文本形式顯示文件夾的內容。我按單詞出現的次數對結果進行排序。不出所料,該語言中最常用的詞是介詞。

4.4 方法 2:使用 Java

Hadoop本身是用java編寫的,Hadoop的原生接口也是基於java的。讓我們展示一個用於 wordcount 的原生 Java 應用程序是什麼樣的:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

	public static class TokenizerMapper
        	extends Mapper<Object, Text, Text, IntWritable>{

    	private final static IntWritable one = new IntWritable(1);
    	private Text word = new Text();

    	public void map(Object key, Text value, Context context
    	) throws IOException, InterruptedException {
        	StringTokenizer itr = new StringTokenizer(value.toString());
        	while (itr.hasMoreTokens()) {
            	word.set(itr.nextToken());
            	context.write(word, one);
        	}
    	}
	}

	public static class IntSumReducer
        	extends Reducer<Text,IntWritable,Text,IntWritable> {
    	private IntWritable result = new IntWritable();

    	public void reduce(Text key, Iterable values,
                       	Context context
    	) throws IOException, InterruptedException {
        	int sum = 0;
        	for (IntWritable val : values) {
            	sum += val.get();
        	}
        	result.set(sum);
        	context.write(key, result);
    	}
	}

	public static void main(String[] args) throws Exception {
    	Configuration conf = new Configuration();
    	Job job = Job.getInstance(conf, "word count");
    	job.setJarByClass(WordCount.class);
    	job.setMapperClass(TokenizerMapper.class);
    	job.setReducerClass(IntSumReducer.class);
    	job.setOutputKeyClass(Text.class);
    	job.setOutputValueClass(IntWritable.class);
    	FileInputFormat.addInputPath(job, new Path("hdfs://localhost/user/cloudera/lenta_articles"));
    	FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost/user/cloudera/lenta_wordcount"));
    	System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

此類與我們的 Python 示例完全相同。我們通過分別派生自 Mapper 和 Reducer 類來創建 TokenizerMapper 和 IntSumReducer 類。作為模板參數傳遞的類指定輸入和輸出值的類型。本機 API 假定 map 函數被賦予一個鍵值對作為輸入。因為在我們的例子中鍵是空的,所以我們簡單地將 Object 定義為鍵類型。

在 Main 方法中,我們啟動 mapreduce 任務並定義它的參數——名稱、mapper 和 reducer、HDFS 中的路徑、輸入數據所在的位置以及結果的放置位置。要編譯,我們需要 hadoop 庫。我使用 Maven 來構建,cloudera 有一個存儲庫。可以在此處找到設置說明。結果,我得到的 pom.xmp 文件(maven 用來描述項目的程序集)如下:

<?xml version="1.0" encoding="UTF-8"?>  
<project xmlns="http://maven.apache.org/POM/4.0.0"  
     	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
     	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">  
	<modelVersion>4.0.0</modelVersion>  
  
	<repositories>  
    	<repository>  
        	<id>cloudera</id>  
        	<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>  
    	</repository>  
	</repositories>  
  
	<dependencies>  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-common</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-auth</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-hdfs</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-mapreduce-client-app</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
	</dependencies>  
  
	<groupId>org.dca.examples</groupId>  
	<artifactId>wordcount</artifactId>  
	<version>1.0-SNAPSHOT</version>  
 
</project>

我們把工程編譯成jar包:

mvn clean package

將項目構建成 jar 文件後,啟動以類似的方式進行,就像流接口的情況一樣:

yarn jar wordcount-1.0-SNAPSHOT.jar  WordCount 

我們等待執行並檢查結果:

hadoop fs -text lenta_wordcount/* | sort -n -k2,2 | tail -n5  
from
41
this
43
on
82
and
111
into
194

您可能會猜到,運行我們的本機應用程序的結果與我們以先前方式啟動的流式應用程序的結果相同。