4.1 Hadoop に関する一般情報

MapReduce パラダイムは、2004 年に Google によって「MapReduce: 大規模クラスターでのデータ処理の簡素化」という記事で提案されました。提案された記事にはパラダイムの説明が含まれていましたが、実装が欠落していたため、Yahoo の数名のプログラマーが、Nutch Web クローラーの作業の一部としてその実装を提案しました。Hadoop の歴史の詳細については、記事「Hadoop の歴史: 4 ノードからデータの未来まで」を参照してください。

当初、Hadoop は主にデータを保存し、MapReduce タスクを実行するためのツールでしたが、現在では、Hadoop は、(MapReduce だけでなく) ビッグ データの処理に何らかの形で関連するテクノロジーの大きなスタックとなっています。

Hadoop の主要 (コア) コンポーネントは次のとおりです。

Hadoop に直接関連しているものの、Hadoop コアには含まれていないプロジェクトも多数あります。

  • Hive - ビッグ データに対する SQL のようなクエリのためのツール (SQL クエリを一連の MapReduce タスクに変換します)。
  • Pig は、高度なデータ分析のためのプログラミング言語です。この言語の 1 行のコードは、一連の MapReduce タスクに変わる可能性があります。
  • Hbaseは、BigTable パラダイムを実装する列指向データベースです。
  • Cassandraは、高性能の分散型キー/値データベースです。
  • ZooKeeper は、分散構成ストレージと構成変更の同期のためのサービスです。
  • Mahout は、ビッグデータ機械学習ライブラリおよびエンジンです。

これとは別に、分散データ処理エンジンであるApache Sparkプロジェクトについても触れておきたいと思います。Apache Spark は通常、その動作に HDFS や YARN などの Hadoop コンポーネントを使用しますが、最近では Apache Spark 自体が Hadoop よりも人気が高まっています。

これらのコンポーネントの一部については、この一連の資料の別の記事で説明しますが、ここでは、Hadoop の使用を開始して実践する方法を見てみましょう。

4.2 Hadoop での MapReduce プログラムの実行

次に、Hadoop で MapReduce タスクを実行する方法を見てみましょう。タスクとして、前のレッスンで説明した古典的なWordCount の例を使用します。

問題の定式化を思い出させてください。一連の文書があります。文書セット内で出現する各単語について、その単語がセット内で何回出現するかを数える必要があります。

解決:

Map はドキュメントを単語に分割し、ペアのセット (単語、1) を返します。

Reduce は各単語の出現を合計します。

def map(doc):  
for word in doc.split():  
	yield word, 1 
def reduce(word, values):  
	yield word, sum(values)

ここでのタスクは、このソリューションを Hadoop 上で実行できるコードの形式でプログラムして実行することです。

4.3 方法その 1。Hadoopストリーミング

Hadoop で MapReduce プログラムを実行する最も簡単な方法は、Hadoop ストリーミング インターフェイスを使用することです。ストリーミング インターフェイスは、mapreduce がstdin からデータを取得してstdoutに出力するプログラムとして実装されていることを前提としています。

マップ機能を実行するプログラムをマッパーと呼びます。Reduceを実行するプログラムは、それぞれ、Reducer と呼ばれます。

ストリーミング インターフェイスは、デフォルトで、マッパーまたはリデューサーの 1 つの入力行が、mapの 1 つの入力エントリに対応すると想定します。

マッパーの出力はペア (キー、値) の形式でリデューサーの入力に渡されますが、すべてのペアは同じキーに対応します。

  • リデューサーの 1 回の起動によって処理されることが保証されます。
  • 入力に連続して送信されます (つまり、1 つのリデューサーが複数の異なるキーを処理する場合、入力はキーごとにグループ化されます)。

それでは、Python でマッパーとリデューサーを実装しましょう。

#mapper.py  
import sys  
  
def do_map(doc):  
for word in doc.split():  
	yield word.lower(), 1  
  
for line in sys.stdin:  
	for key, value in do_map(line):  
    	print(key + "\t" + str(value))  
 
#reducer.py  
import sys  
  
def do_reduce(word, values):  
	return word, sum(values)  
  
prev_key = None  
values = []  
  
for line in sys.stdin:  
	key, value = line.split("\t")  
	if key != prev_key and prev_key is not None:  
    	result_key, result_value = do_reduce(prev_key, values)  
    	print(result_key + "\t" + str(result_value))  
    	values = []  
	prev_key = key  
	values.append(int(value))  
  
if prev_key is not None:  
	result_key, result_value = do_reduce(prev_key, values)  
	print(result_key + "\t" + str(result_value))

Hadoop が処理するデータは HDFS に保存する必要があります。記事をアップロードして HDFS に置きましょう。これを行うには、hadoop fsコマンドを使用します。

wget https://www.dropbox.com/s/opp5psid1x3jt41/lenta_articles.tar.gz  
tar xzvf lenta_articles.tar.gz  
hadoop fs -put lenta_articles 

hadoop fs ユーティリティは、ファイル システムを操作するための多数のメソッドをサポートしており、その多くは標準の Linux ユーティリティと同じです。

次に、ストリーミング タスクを開始しましょう。

yarn jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar\  
 -input lenta_articles\  
 -output lenta_wordcount\  
 -file mapper.py\  
 -file reducer.py\  
 -mapper "python mapper.py"\  
 -reducer "python reducer.py" 

Yarn ユーティリティは、クラスター上でさまざまなアプリケーション (map-reduce ベースを含む) を起動および管理するために使用されます。Hadoop-streaming.jar は、そのような糸アプリケーションの一例にすぎません。

次に起動オプションがあります。

  • input - HDF 上のソース データを含むフォルダー。
  • Output - 結果を配置する HDF 上のフォルダー。
  • file - マップリデュースタスクの操作中に必要なファイル。
  • Mapper は、マップ段階で使用されるコンソール コマンドです。
  • reduce は、reduce ステージで使用されるコンソール コマンドです。

起動後、コンソールにタスクの進行状況が表示され、タスクに関する詳細情報を表示するための URL が表示されます。

この URL で利用可能なインターフェイスでは、より詳細なタスクの実行ステータスを確認したり、各マッパーとリデューサーのログを表示したりできます (タスクが失敗した場合に非常に役立ちます)。

実行が成功した後の作業結果は、出力フィールドで指定したフォルダー内の HDFS に追加されます。「hadoop fs -ls lenta_wordcount」コマンドを使用して、その内容を表示できます。

結果自体は次のようにして取得できます。

hadoop fs -text lenta_wordcount/* | sort -n -k2,2 | tail -n5  
from
41  
this
43  
on
82  
and
111  
into
194 

「hadoop fs -text」コマンドは、フォルダーの内容をテキスト形式で表示します。結果を単語の出現数で並べ替えました。予想通り、この言語で最も一般的な単語は前置詞です。

4.4 方法 2: Java を使用する

Hadoop 自体は Java で書かれており、Hadoop のネイティブ インターフェイスも Java ベースです。wordcount 用のネイティブ Java アプリケーションがどのようなものかを示してみましょう。

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

	public static class TokenizerMapper
        	extends Mapper<Object, Text, Text, IntWritable>{

    	private final static IntWritable one = new IntWritable(1);
    	private Text word = new Text();

    	public void map(Object key, Text value, Context context
    	) throws IOException, InterruptedException {
        	StringTokenizer itr = new StringTokenizer(value.toString());
        	while (itr.hasMoreTokens()) {
            	word.set(itr.nextToken());
            	context.write(word, one);
        	}
    	}
	}

	public static class IntSumReducer
        	extends Reducer<Text,IntWritable,Text,IntWritable> {
    	private IntWritable result = new IntWritable();

    	public void reduce(Text key, Iterable values,
                       	Context context
    	) throws IOException, InterruptedException {
        	int sum = 0;
        	for (IntWritable val : values) {
            	sum += val.get();
        	}
        	result.set(sum);
        	context.write(key, result);
    	}
	}

	public static void main(String[] args) throws Exception {
    	Configuration conf = new Configuration();
    	Job job = Job.getInstance(conf, "word count");
    	job.setJarByClass(WordCount.class);
    	job.setMapperClass(TokenizerMapper.class);
    	job.setReducerClass(IntSumReducer.class);
    	job.setOutputKeyClass(Text.class);
    	job.setOutputValueClass(IntWritable.class);
    	FileInputFormat.addInputPath(job, new Path("hdfs://localhost/user/cloudera/lenta_articles"));
    	FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost/user/cloudera/lenta_wordcount"));
    	System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

このクラスは、Python の例とまったく同じことを行います。Mapper クラスと Reducer クラスからそれぞれ派生して、TokenizerMapper クラスと IntSumReducer クラスを作成します。テンプレート パラメーターとして渡されるクラスは、入力値と出力値のタイプを指定します。ネイティブ API は、マップ関数に入力としてキーと値のペアが与えられることを前提としています。この例ではキーが空であるため、単に Object をキーのタイプとして定義します。

Main メソッドでは、mapreduce タスクを開始し、そのパラメーター (名前、マッパーとリデューサー、HDFS 内のパス、入力データの配置場所、および結果の配置場所) を定義します。コンパイルするには、Hadoop ライブラリが必要です。Cloudera にリポジトリがある Maven を使用してビルドします。設定手順については、こちらをご覧ください。その結果、pom.xmp ファイル (プロジェクトのアセンブリを記述するために Maven によって使用される) は次のようになります)。

<?xml version="1.0" encoding="UTF-8"?>  
<project xmlns="http://maven.apache.org/POM/4.0.0"  
     	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
     	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">  
	<modelVersion>4.0.0</modelVersion>  
  
	<repositories>  
    	<repository>  
        	<id>cloudera</id>  
        	<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>  
    	</repository>  
	</repositories>  
  
	<dependencies>  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-common</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-auth</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-hdfs</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-mapreduce-client-app</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
	</dependencies>  
  
	<groupId>org.dca.examples</groupId>  
	<artifactId>wordcount</artifactId>  
	<version>1.0-SNAPSHOT</version>  
 
</project>

プロジェクトを jar パッケージにコンパイルしましょう。

mvn clean package

プロジェクトを jar ファイルにビルドした後、ストリーミング インターフェイスの場合と同様の方法で起動が行われます。

yarn jar wordcount-1.0-SNAPSHOT.jar  WordCount 

実行を待って結果を確認します。

hadoop fs -text lenta_wordcount/* | sort -n -k2,2 | tail -n5  
from
41
this
43
on
82
and
111
into
194

ご想像のとおり、ネイティブ アプリケーションを実行した結果は、前の方法で起動したストリーミング アプリケーションの結果と同じになります。