5.1 マップのみのジョブ

ここでは、MapReduce を効果的に使用して実際的な問題を解決するためのさまざまなテクニックを説明し、開発を簡素化したり、クラスター上での MapReduce タスクの実行を大幅に高速化したりできる Hadoop の機能のいくつかを紹介します。

覚えているとおり、MapReduce は Map、Shuffle、Reduce の各ステージで構成されています。一般に、シャッフル段階はデータがこの段階でソートされるため、実際のタスクの中で最も難しいことがわかります。実際、Map ステージのみを省略できるタスクが数多くあります。そのようなタスクの例を次に示します。

  • データ フィルタリング (たとえば、Web サーバー ログ内の「IP アドレス 123.123.123.123 からすべてのレコードを検索」)。
  • データ変換 (「csv-logs の列を削除」);
  • 外部ソースからのデータのロードおよびアンロード (「ログからデータベースへのすべてのレコードの挿入」)。

このようなタスクは、Map-Only を使用して解決されます。Hadoop でマップのみのタスクを作成する場合、リデューサーの数をゼロに指定する必要があります。

Hadoop でのマップのみのタスク構成の例:

ネイティブインターフェース Hadoopストリーミングインターフェース

ジョブ a を構成するときに、リデューサーの数をゼロに指定します。

job.setNumReduceTasks(0); 

レデューサーを指定せず、レデューサーの数をゼロに指定します。例:

hadoop jar hadoop-streaming.jar \ 
 -D mapred.reduce.tasks=0\ 
-input input_dir\ 
-output output_dir\ 
-mapper "python mapper.py"\ 
-file "mapper.py"

マップのみのジョブは実際に非常に役立ちます。たとえば、Facetz.DCA プラットフォームでは、ユーザーの行動によってユーザーの特性を識別するために、まさに 1 つの大きなマップだけが使用され、各マッパーはユーザーを入力として受け取り、その特性を出力として返します。

5.2 結合

すでに書きましたが、通常、Map-Reduce タスクを実行するときに最も難しい段階はシャッフル段階です。これは、中間結果 (マッパーの出力) がディスクに書き込まれ、並べ替えられ、ネットワーク経由で送信されるために発生します。ただし、そのような動作があまり合理的ではないと思われるタスクもあります。たとえば、ドキュメント内の単語を数える同じタスクで、タスクの 1 つの Map-Reduce ノードで複数のマッパーの出力の結果を事前に集計し、各マシンのすでに合計された値をリデューサーに渡すことができます。 。

Hadoop では、このために、マッパーの一部の出力を処理する結合関数を定義できます。結合関数は、reduce と非常に似ています。いくつかのマッパーの出力を入力として受け取り、これらのマッパーの集計結果を生成するため、reducer は結合器としてもよく使用されます。reduce との重要な違いは、1 つのキーに対応するすべての値が結合関数に到達するわけではないことです。

さらに、hadoop は、マッパーの出力に対して結合関数が実行されることをまったく保証しません。したがって、キーで中央値を検索する場合など、結合機能は必ずしも適用できるわけではありません。それにもかかわらず、結合関数が適用できるタスクでは、結合関数を使用することで MapReduce タスクの速度を大幅に向上させることができます。

Hadoop で Combiner を使用する:

ネイティブインターフェース Hadoopストリーミング

job-a を設定するときは、class-Combiner を指定します。原則として、これは Reducer と同じです。

job.setMapperClass(TokenizerMapper.class); 
job.setCombinerClass(IntSumReducer.class); 
job.setReducerClass(IntSumReducer.class); 

コマンドラインオプションに-combinerコマンドを指定します。通常、このコマンドは Reducer コマンドと同じです。例:

hadoop jar hadoop-streaming.jar \ 
-input input_dir\ 
-output output_dir\ 
-mapper "python mapper.py"\ 
-reducer "python reducer.py"\ 
-combiner "python reducer.py"\ 
-file "mapper.py"\ 
-file "reducer.py"\

5.3 MapReduce タスクチェーン

1 つの MapReduce だけでは問題を解決できない場合があります。たとえば、少し変更した WordCount タスクを考えてみましょう。テキスト ドキュメントのセットがあり、セット内で 1 ~ 1000 回出現した単語の数、1001 ~ 2000 の単語の数、2001 ~ 3000 の単語の数、等々。この解決策には、2 つの MapReduce ジョブが必要です。

  • 変更された単語数。単語ごとに、単語がどの区間に該当するかを計算します。
  • 最初の MapReduce の出力で各間隔が何回発生したかをカウントする MapReduce。

疑似コードの解決策:

#map1 
def map(doc): 
for word in doc: 
yield word, 1
#reduce1 
def reduce(word, values): 
yield int(sum(values)/1000), 1 
#map2 
def map(doc): 
interval, cnt = doc.split() 
yield interval, cnt 
#reduce2 
def reduce(interval, values): 
yield interval*1000, sum(values) 

Hadoop 上で一連の MapReduce タスクを実行するには、最初のタスクで出力として指定したフォルダーを 2 番目のタスクの入力として指定し、順番に実行するだけで十分です。

実際には、MapReduce タスクのチェーンは非常に複雑なシーケンスになる可能性があり、MapReduce タスクは相互に順次および並列に接続できます。このようなタスク実行計画の管理を簡素化するために、oozie や luigi などの別のツールがあります。これらについては、このシリーズの別の記事で説明します。

5.4 分散キャッシュ

Hadoop の重要なメカニズムは分散キャッシュです。分散キャッシュを使用すると、MapReduce タスクが実行されている環境にファイル (テキスト ファイル、アーカイブ、jar ファイルなど) を追加できます。

HDFS に保存されているファイル、ローカル ファイル (タスクの起動元のマシンに対してローカル) を追加できます。-file オプションを使用して、mapper.py および Reducer.py ファイルを追加することで、Hadoop ストリーミングで分散キャッシュを使用する方法をすでに暗黙的に示しました。実は、mapper.pyやreducer.pyだけでなく、任意のファイル全般を追加して、あたかもローカルフォルダにあるかのように使用することができます。

分散キャッシュの使用:

ネイティブAPI
//Job configuration
JobConf job = new JobConf();
DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"),  job);
DistributedCache.addCacheArchive(new URI("/myapp/map.zip", job);
DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job);
DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar", job);
DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz", job);

//example of usage in mapper-e:
public static class MapClass extends MapReduceBase
implements Mapper<K, V, K, V> {

 private Path[] localArchives;
 private Path[] localFiles;

 public void configure(JobConf job) {
   // get cached data from archives
   File f = new File("./map.zip/some/file/in/zip.txt");
 }

 public void map(K key, V value,
             	OutputCollector<K, V> output, Reporter reporter)
 throws IOException {
   // use data here
   // ...
   // ...
   output.collect(k, v);
 }
}
Hadoopストリーミング

#分散キャッシュに追加する必要があるファイルを –files パラメーターにリストします。--files オプションは他のオプションより前に指定する必要があります。

yarn  hadoop-streaming.jar\ 
-files mapper.py,reducer.py,some_cached_data.txt\ 
-input '/some/input/path' \ 
-output '/some/output/path' \  
-mapper 'python mapper.py' \ 
-reducer 'python reducer.py' \

使用例:

import sys 
#just read file from local folder 
data = open('some_cached_data.txt').read() 
 
for line in sys.stdin() 
#processing input 
#use data here

5.5 結合の削減

リレーショナル データベースの操作に慣れている人は、非常に便利な結合操作をよく使用します。この操作を使用すると、いくつかのテーブルの内容を、いくつかのキーに従って結合することによって共同で処理できます。ビッグデータを扱う場合にも、この問題が発生することがあります。次の例を考えてみましょう。

2 つの Web サーバーのログがあり、それぞれのログは次のようになります。

t\t

ログ スニペットの例:

1446792139	
178.78.82.1	
/sphingosine/unhurrying.css 
1446792139	
126.31.163.222	
/accentually.js 
1446792139	
154.164.149.83	
/pyroacid/unkemptly.jpg 
1446792139	
202.27.13.181	
/Chawia.js 
1446792139	
67.123.248.174	
/morphographical/dismain.css 
1446792139	
226.74.123.135	
/phanerite.php 
1446792139	
157.109.106.104	
/bisonant.css

IP アドレスごとに、2 つのサーバーのうちどちらをより頻繁にアクセスしたかを計算する必要があります。結果は次の形式になるはずです。

\t

結果の一部の例:

178.78.82.1	
first 
126.31.163.222	
second 
154.164.149.83	
second 
226.74.123.135	
first

残念ながら、リレーショナル データベースとは異なり、一般に 2 つのログをキー (この場合は IP アドレス) で結合するのはかなり重い操作であり、3 つの MapReduce と Reduce Join パターンを使用して解決されます。

ReduceJoin は次のように機能します。

1)入力ログごとに、個別の MapReduce (マップのみ) が起動され、入力データが次の形式に変換されます。

key -> (type, value

key はテーブルを結合するキー、Type はテーブルのタイプ (この場合は 1 番目または 2 番目)、Value はキーにバインドされた追加データです。

2)両方の MapReduce の出力が 3 番目の MapReduce の入力に供給され、実際に結合が実行されます。この MapReduce には、入力をコピーするだけの空のマッパーが含まれています。次に、shuffle はデータをキーに分解し、それを入力としてリデューサーに送ります。

key -> [(type, value)]

この時点で、Reducer が両方のログからレコードを受信すると同時に、特定の値が 2 つのログのどちらから来たものであるかをタイプ フィールドによって識別できることが重要です。したがって、元の問題を解決するのに十分なデータがあります。私たちの場合、リデューサーはレコードキーごとにどのタイプがより多く遭遇したかを計算し、このタイプを出力するだけで済みます。

5.6 マップ結合

ReduceJoin パターンは、キーによって 2 つのログを結合する一般的なケースを記述します。ただし、タスクを大幅に簡素化し、高速化できる特殊なケースがあります。これは、ログの一方が他方よりも大幅に小さい場合です。次の問題を考えてみましょう。

ログが 2 つあります。最初のログには Web サーバーのログ (前のタスクと同じ) が含まれ、2 番目のファイル (サイズは 100 kb) には URL-> テーマの一致が含まれます。2 番目のファイルの例:

/toyota.php 	
auto 
/football/spartak.html 	
sport 
/cars 	
auto 
/finances/money 	
business

IP アドレスごとに、この IP アドレスからどのカテゴリのページが最も頻繁にロードされたかを計算する必要があります。

この場合、2 つのログを URL で結合する必要もあります。ただし、この場合、2 番目のログはメモリに完全に収まるため、3 つの MapReduce を実行する必要はありません。1 番目の MapReduce を使用して問題を解決するには、2 番目のログを分散キャッシュにロードし、マッパーの初期化時に単純にそれをメモリに読み取り、-> トピック ディクショナリに置きます。

さらに、この問題は次のように解決されます。

地図:

# find the subject of each of the pages of the first log 
input_line -> [ip,  topic] 

減らす:


Ip -> [topics] -> [ip, most_popular_topic]

Reduce は IP とすべてのトピックのリストを入力として受け取り、どのトピックが最も頻繁に検出されたかを単純に計算します。したがって、タスクは最初の MapReduce を使用して解決され、実際の結合は通常マップ内で行われます (したがって、キーによる追加の集計が必要ない場合は、MapOnly ジョブを省略できます)。