5.1 仅映射作业

现在是时候描述各种技术,让您可以有效地使用 MapReduce 解决实际问题,并展示 Hadoop 的一些功能,这些功能可以简化开发或显着加快 MapReduce 任务在集群上的执行速度。

我们记得,MapReduce 由 Map、Shuffle 和 Reduce 阶段组成。通常,Shuffle 阶段在实际任务中是最困难的,因为数据是在这个阶段排序的。事实上,有许多任务可以单独免除 Map 阶段。以下是此类任务的示例:

  • 数据过滤(例如Web服务器日志中的“从IP地址123.123.123.123查找所有记录”);
  • 数据转换(“删除 csv 日志中的列”);
  • 从外部源加载和卸载数据(“将日志中的所有记录插入数据库”)。

此类任务使用 Map-Only 解决。在 Hadoop 中创建 Map-Only 任务时,您需要指定零个 reducer:

hadoop 上的 map-only 任务配置示例:

本机界面 Hadoop 流接口

配置 job'a 时指定 reducer 数量为零:

job.setNumReduceTasks(0); 

我们不指定减速器并指定零数量的减速器。例子:

hadoop jar hadoop-streaming.jar \ 
 -D mapred.reduce.tasks=0\ 
-input input_dir\ 
-output output_dir\ 
-mapper "python mapper.py"\ 
-file "mapper.py"

Map Only 作业实际上非常有用。例如,在Facetz.DCA平台中,通过用户的行为来识别用户的特征,正是使用了一个大的map-only,每个mapper以一个用户作为输入,返回他的特征作为输出。

5.2 合并

正如我已经写过的,执行 Map-Reduce 任务时最困难的阶段通常是洗牌阶段。发生这种情况是因为中间结果(映射器的输出)被写入磁盘、排序并通过网络传输。但是,在某些任务中,这种行为似乎不太合理。比如在同一个统计文档单词的任务中,可以在任务的一个map-reduce节点上预先聚合好几个mapper的输出结果,将每台机器已经求和的值传递给reducer .

为此,在 hadoop 中,您可以定义一个组合函数来处理部分映射器的输出。combining 函数与 reduce 非常相似——它将一些 mappers 的输出作为输入,并为这些 mappers 产生一个聚合结果,因此 reducer 也经常被用作 combiner。与reduce的一个重要区别是不是一个key对应的所有值都到达combining函数

此外,hadoop 不保证 combine 函数会针对 mapper 的输出执行。因此,组合函数并不总是适用的,例如,在按键搜索中值的情况下。然而,在那些适用组合功能的任务中,它的使用可以显着提高 MapReduce 任务的速度。

在 hadoop 上使用 Combiner:

本机界面 Hadoop 流式处理

配置job-a时,指定class-Combiner。通常,它与 Reducer 相同:

job.setMapperClass(TokenizerMapper.class); 
job.setCombinerClass(IntSumReducer.class); 
job.setReducerClass(IntSumReducer.class); 

在命令行选项中指定 -combiner 命令。通常,此命令与 reducer 命令相同。例子:

hadoop jar hadoop-streaming.jar \ 
-input input_dir\ 
-output output_dir\ 
-mapper "python mapper.py"\ 
-reducer "python reducer.py"\ 
-combiner "python reducer.py"\ 
-file "mapper.py"\ 
-file "reducer.py"\

5.3 MapReduce 任务链

有时一个 MapReduce 不足以解决问题。例如,考虑一个稍微修改过的 WordCount 任务:有一组文本文档,你需要统计集合中从 1 到 1000 次出现了多少个单词,从 1001 到 2000 有多少个单词,从 2001 到 3000 有多少个单词,等等。对于解决方案,我们需要 2 个 MapReduce 作业:

  • 修改了wordcount,对于每个单词都会计算它属于哪个区间;
  • 一个 MapReduce,计算在第一个 MapReduce 的输出中遇到每个间隔的次数。

伪代码解决方案:

#map1 
def map(doc): 
for word in doc: 
yield word, 1
#reduce1 
def reduce(word, values): 
yield int(sum(values)/1000), 1 
#map2 
def map(doc): 
interval, cnt = doc.split() 
yield interval, cnt 
#reduce2 
def reduce(interval, values): 
yield interval*1000, sum(values) 

为了在 hadoop 上执行一系列 MapReduce 任务,只需将指定为第一个任务的输出的文件夹指定为第二个任务的输入并依次运行它们就足够了。

在实践中,MapReduce 任务链可能是非常复杂的序列,其中 MapReduce 任务可以按顺序或并行地相互连接。为了简化此类任务执行计划的管理,有 oozie 和 luigi 等单独的工具,将在本系列的另一篇文章中进行讨论。

5.4 分布式缓存

Hadoop 中的一个重要机制是分布式缓存。分布式缓存允许您将文件(例如文本文件、存档、jar 文件)添加到 MapReduce 任务运行的环境中。

您可以添加存储在 HDFS 上的文件,本地文件(本地到启动任务的机器)。我已经通过 -file 选项添加 mapper.py 和 reducer.py 文件,隐含地展示了如何将分布式缓存与 hadoop 流结合使用。事实上,您不仅可以添加 mapper.py 和 reducer.py,还可以添加一般的任意文件,然后像在本地文件夹中一样使用它们。

使用分布式缓存:

本机 API
//Job configuration
JobConf job = new JobConf();
DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"),  job);
DistributedCache.addCacheArchive(new URI("/myapp/map.zip", job);
DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job);
DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar", job);
DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz", job);

//example of usage in mapper-e:
public static class MapClass extends MapReduceBase
implements Mapper<K, V, K, V> {

 private Path[] localArchives;
 private Path[] localFiles;

 public void configure(JobConf job) {
   // get cached data from archives
   File f = new File("./map.zip/some/file/in/zip.txt");
 }

 public void map(K key, V value,
             	OutputCollector<K, V> output, Reporter reporter)
 throws IOException {
   // use data here
   // ...
   // ...
   output.collect(k, v);
 }
}
Hadoop 流媒体

#we在–files参数中列出需要添加到分布式缓存的文件。--files 选项必须位于其他选项之前。

yarn  hadoop-streaming.jar\ 
-files mapper.py,reducer.py,some_cached_data.txt\ 
-input '/some/input/path' \ 
-output '/some/output/path' \  
-mapper 'python mapper.py' \ 
-reducer 'python reducer.py' \

使用示例:

import sys 
#just read file from local folder 
data = open('some_cached_data.txt').read() 
 
for line in sys.stdin() 
#processing input 
#use data here

5.5 减少连接

习惯于使用关系型数据库的人通常会使用非常方便的 Join 操作,它可以让他们根据某个键将某些表的内容连接起来,从而共同处理这些表的内容。在处理大数据时,有时也会出现这个问题。考虑以下示例:

有两个web服务器的日志,每个日志长这样:

t\t

日志片段示例:

1446792139	
178.78.82.1	
/sphingosine/unhurrying.css 
1446792139	
126.31.163.222	
/accentually.js 
1446792139	
154.164.149.83	
/pyroacid/unkemptly.jpg 
1446792139	
202.27.13.181	
/Chawia.js 
1446792139	
67.123.248.174	
/morphographical/dismain.css 
1446792139	
226.74.123.135	
/phanerite.php 
1446792139	
157.109.106.104	
/bisonant.css

有必要为每个 IP 地址计算它更频繁地访问 2 个服务器中的哪一个。结果应采用以下形式:

\t

部分结果示例:

178.78.82.1	
first 
126.31.163.222	
second 
154.164.149.83	
second 
226.74.123.135	
first

不幸的是,与关系数据库不同,一般来说,通过键(在这种情况下,通过 IP 地址)连接两个日志是一项相当繁重的操作,可以使用 3 MapReduce 和 Reduce Join 模式解决:

ReduceJoin 的工作方式如下:

1)对于每个输入日志,启动一个单独的 MapReduce(仅限 Map),将输入数据转换为以下形式:

key -> (type, value

其中 key 是连接表的键,Type 是表的类型(在我们的例子中是 first 或 second),Value 是绑定到键的任何附加数据。

2)两个 MapReduce 的输出被馈送到第三个 MapReduce 的输入,实际上,第三个 MapReduce 执行并集。这个 MapReduce 包含一个空的 Mapper,它只是复制输入。接下来,shuffle 将数据分解为键并将其作为输入提供给 reducer:

key -> [(type, value)]

重要的是,此时 reducer 从两个日志中接收记录,同时,可以通过类型字段识别特定值来自两个日志中的哪一个。所以有足够的数据来解决原来的问题。在我们的例子中,reducer 只需为每个记录键计算遇到更多类型的记录键并输出该类型。

5.6 MapJoin

ReduceJoin 模式描述了通过键连接两个日志的一般情况。但是,有一种特殊情况可以显着简化和加速任务。这是其中一个日志明显小于另一个日志的情况。考虑以下问题:

有2个日志。第一个日志包含 Web 服务器日志(与上一个任务相同),第二个文件(大小为 100kb)包含 URL-> 主题匹配。示例 2 文件:

/toyota.php 	
auto 
/football/spartak.html 	
sport 
/cars 	
auto 
/finances/money 	
business

对于每个 IP 地址,有必要计算最常加载来自该 IP 地址的哪个类别的页面。

在这种情况下,我们还需要通过 URL 连接 2 个日志。然而,在这种情况下,我们不必运行 3 个 MapReduce,因为第二个日志将完全适合内存。为了解决使用第一个MapReduce的问题,我们可以将第二个log加载到Distributed Cache中,在Mapper初始化的时候,直接读入内存,放到->topic字典中。

进一步地,问题解决如下:

地图:

# find the subject of each of the pages of the first log 
input_line -> [ip,  topic] 

减少:


Ip -> [topics] -> [ip, most_popular_topic]

Reduce 接收一个 ip 和所有主题的列表作为输入,它简单地计算最常遇到的主题。因此,任务是使用第一个 MapReduce 解决的,实际的 Join 通常发生在地图内部(因此,如果不需要额外的键聚合,则可以省去 MapOnly 作业):