4.1 Hadoop 概况

MapReduce 范式由 Google 于 2004 年在其文章MapReduce: Simplified Data Processing on Large Clusters中提出。由于提议的文章包含范例的描述,但缺少实现,雅虎的几位程序员提议将他们的实现作为 nutch 网络爬虫工作的一部分。您可以在 Hadoop 的历史:从 4 个节点到数据的未来一文中阅读有关 Hadoop 历史的更多信息。

最初,Hadoop 主要是一种用于存储数据和运行 MapReduce 任务的工具,但现在 Hadoop 是一大堆以某种方式与处理大数据相关的技术(不仅与 MapReduce 相关)。

Hadoop 的主要(核心)组件是:

还有大量与Hadoop直接相关,但不属于Hadoop核心的项目:

  • Hive - 一种对大数据进行类 SQL 查询的工具(将 SQL 查询转换为一系列 MapReduce 任务);
  • Pig是一种用于高级数据分析的编程语言。这种语言的一行代码可以变成一系列 MapReduce 任务;
  • Hbase是一个实现了BigTable范式的列式数据库;
  • Cassandra是一个高性能的分布式键值数据库;
  • ZooKeeper是一个分布式配置存储和同步配置变更的服务;
  • Mahout是一个大数据机器学习库和引擎。

另外,我想提一下Apache Spark项目,它是一个用于分布式数据处理的引擎。Apache Spark 通常使用 Hadoop 组件(如 HDFS 和 YARN)来工作,而它本身最近变得比 Hadoop 更受欢迎:

其中一些组件将在本系列材料的单独文章中介绍,但现在,让我们看看如何开始使用 Hadoop 并将其付诸实践。

4.2 在 Hadoop 上运行 MapReduce 程序

现在让我们看看如何在 Hadoop 上运行 MapReduce 任务。作为一项任务,我们将使用在上一课中讨论过的经典WordCount示例。

让我提醒你问题的提法:有一套文件。需要为文档集中出现的每个词统计该词在该集中出现的次数。

解决方案:

Map 将文档拆分为单词并返回一组对 (word, 1)。

Reduce 对每个词的出现次数求和:

def map(doc):  
for word in doc.split():  
	yield word, 1 
def reduce(word, values):  
	yield word, sum(values)

现在的任务是以可以在 Hadoop 上执行和运行的代码形式编写此解决方案。

4.3 方法编号 1。Hadoop 流媒体

在 Hadoop 上运行 MapReduce 程序的最简单方法是使用 Hadoop 流接口。流接口假定mapreduce被实现为从 stdin 获取数据并输出到stdout 的程序。

执行map函数的程序称为mapper。执行reduce的程序分别称为reducer

Streaming 接口默认假定映射器缩减器中的一个传入行对应于map的一个传入条目。

mapper 的输出以 pairs (key, value) 的形式到达 reducer 的输入,同时所有 pairs 对应同一个 key:

  • 保证由reducer单次启动处理;
  • 将连续提交给输入(也就是说,如果一个reducer处理几个不同的键,输入将按键分组)。

那么让我们在python中实现mapper和reducer:

#mapper.py  
import sys  
  
def do_map(doc):  
for word in doc.split():  
	yield word.lower(), 1  
  
for line in sys.stdin:  
	for key, value in do_map(line):  
    	print(key + "\t" + str(value))  
 
#reducer.py  
import sys  
  
def do_reduce(word, values):  
	return word, sum(values)  
  
prev_key = None  
values = []  
  
for line in sys.stdin:  
	key, value = line.split("\t")  
	if key != prev_key and prev_key is not None:  
    	result_key, result_value = do_reduce(prev_key, values)  
    	print(result_key + "\t" + str(result_value))  
    	values = []  
	prev_key = key  
	values.append(int(value))  
  
if prev_key is not None:  
	result_key, result_value = do_reduce(prev_key, values)  
	print(result_key + "\t" + str(result_value))

Hadoop要处理的数据必须存储在HDFS上。让我们上传我们的文章并将它们放在 HDFS 上。为此,请使用hadoop fs命令:

wget https://www.dropbox.com/s/opp5psid1x3jt41/lenta_articles.tar.gz  
tar xzvf lenta_articles.tar.gz  
hadoop fs -put lenta_articles 

hadoop fs 实用程序支持大量用于操作文件系统的方法,其中许多方法与标准 linux 实用程序相同。

现在让我们开始流式传输任务:

yarn jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar\  
 -input lenta_articles\  
 -output lenta_wordcount\  
 -file mapper.py\  
 -file reducer.py\  
 -mapper "python mapper.py"\  
 -reducer "python reducer.py" 

yarn 实用程序用于在集群上启动和管理各种应用程序(包括基于 map-reduce 的应用程序)。Hadoop-streaming.jar 只是此类 yarn 应用程序的一个示例。

接下来是启动选项:

  • 输入 - 在 hdfs 上包含源数据的文件夹;
  • output - hdfs 上要放置结果的文件夹;
  • file——map-reduce任务运行过程中需要的文件;
  • mapper 是将用于地图阶段的控制台命令;
  • reduce 是将用于 reduce 阶段的控制台命令。

启动后,您可以在控制台中看到任务的进度以及用于查看有关任务的更详细信息的 URL。

在此 URL 提供的界面中,您可以找到更详细的任务执行状态,查看每个 mapper 和 reducer 的日志(这在任务失败的情况下非常有用)。

成功执行后的工作结果将添加到我们在输出字段中指定的文件夹中的 HDFS 中。您可以使用“hadoop fs -ls lenta_wordcount”命令查看其内容。

结果本身可以通过以下方式获得:

hadoop fs -text lenta_wordcount/* | sort -n -k2,2 | tail -n5  
from
41  
this
43  
on
82  
and
111  
into
194 

“hadoop fs -text”命令以文本形式显示文件夹的内容。我按单词出现的次数对结果进行排序。不出所料,该语言中最常用的词是介词。

4.4 方法 2:使用 Java

Hadoop本身是用java编写的,Hadoop的原生接口也是基于java的。让我们展示一个用于 wordcount 的原生 Java 应用程序是什么样的:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

	public static class TokenizerMapper
        	extends Mapper<Object, Text, Text, IntWritable>{

    	private final static IntWritable one = new IntWritable(1);
    	private Text word = new Text();

    	public void map(Object key, Text value, Context context
    	) throws IOException, InterruptedException {
        	StringTokenizer itr = new StringTokenizer(value.toString());
        	while (itr.hasMoreTokens()) {
            	word.set(itr.nextToken());
            	context.write(word, one);
        	}
    	}
	}

	public static class IntSumReducer
        	extends Reducer<Text,IntWritable,Text,IntWritable> {
    	private IntWritable result = new IntWritable();

    	public void reduce(Text key, Iterable values,
                       	Context context
    	) throws IOException, InterruptedException {
        	int sum = 0;
        	for (IntWritable val : values) {
            	sum += val.get();
        	}
        	result.set(sum);
        	context.write(key, result);
    	}
	}

	public static void main(String[] args) throws Exception {
    	Configuration conf = new Configuration();
    	Job job = Job.getInstance(conf, "word count");
    	job.setJarByClass(WordCount.class);
    	job.setMapperClass(TokenizerMapper.class);
    	job.setReducerClass(IntSumReducer.class);
    	job.setOutputKeyClass(Text.class);
    	job.setOutputValueClass(IntWritable.class);
    	FileInputFormat.addInputPath(job, new Path("hdfs://localhost/user/cloudera/lenta_articles"));
    	FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost/user/cloudera/lenta_wordcount"));
    	System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

此类与我们的 Python 示例完全相同。我们通过分别派生自 Mapper 和 Reducer 类来创建 TokenizerMapper 和 IntSumReducer 类。作为模板参数传递的类指定输入和输出值的类型。本机 API 假定 map 函数被赋予一个键值对作为输入。因为在我们的例子中键是空的,所以我们简单地将 Object 定义为键类型。

在 Main 方法中,我们启动 mapreduce 任务并定义它的参数——名称、mapper 和 reducer、HDFS 中的路径、输入数据所在的位置以及结果的放置位置。要编译,我们需要 hadoop 库。我使用 Maven 来构建,cloudera 有一个存储库。可以在此处找到设置说明。结果,我得到的 pom.xmp 文件(maven 用来描述项目的程序集)如下:

<?xml version="1.0" encoding="UTF-8"?>  
<project xmlns="http://maven.apache.org/POM/4.0.0"  
     	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
     	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">  
	<modelVersion>4.0.0</modelVersion>  
  
	<repositories>  
    	<repository>  
        	<id>cloudera</id>  
        	<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>  
    	</repository>  
	</repositories>  
  
	<dependencies>  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-common</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-auth</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-hdfs</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-mapreduce-client-app</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
	</dependencies>  
  
	<groupId>org.dca.examples</groupId>  
	<artifactId>wordcount</artifactId>  
	<version>1.0-SNAPSHOT</version>  
 
</project>

我们把工程编译成jar包:

mvn clean package

将项目构建成 jar 文件后,启动以类似的方式进行,就像流接口的情况一样:

yarn jar wordcount-1.0-SNAPSHOT.jar  WordCount 

我们等待执行并检查结果:

hadoop fs -text lenta_wordcount/* | sort -n -k2,2 | tail -n5  
from
41
this
43
on
82
and
111
into
194

您可能会猜到,运行我们的本机应用程序的结果与我们以先前方式启动的流式应用程序的结果相同。