4.1 Hadoop에 대한 일반 정보

MapReduce 패러다임은 2004년 Google에서 MapReduce: Simplified Data Processing on Large Clusters 기사에서 제안했습니다 . 제안된 기사에는 패러다임에 대한 설명이 포함되어 있지만 구현이 누락되었으므로 Yahoo의 여러 프로그래머가 nutch 웹 크롤러 작업의 일부로 구현을 제안했습니다. Hadoop의 역사: From 4 nodes to the future of data 문서에서 Hadoop의 역사에 대해 자세히 알아볼 수 있습니다 .

처음에 Hadoop은 주로 데이터를 저장하고 MapReduce 작업을 실행하기 위한 도구였지만 이제 Hadoop은 어떤 식으로든 빅 데이터 처리(MapReduce뿐만 아니라)와 관련된 대규모 기술 스택입니다.

Hadoop의 주요(핵심) 구성 요소는 다음과 같습니다.

  • HDFS(Hadoop Distributed File System)는 거의 무제한 크기의 정보를 저장할 수 있는 분산 파일 시스템입니다.
  • Hadoop YARN은 MapReduce 프레임워크를 포함하여 클러스터 리소스 관리 및 작업 관리를 위한 프레임워크입니다.
  • 하둡 공통

또한 Hadoop과 직접적으로 관련이 있지만 Hadoop 코어에는 포함되지 않은 많은 프로젝트가 있습니다.

  • Hive - 빅 데이터에 대한 SQL과 유사한 쿼리를 위한 도구(SQL 쿼리를 일련의 MapReduce 작업으로 변환)
  • Pig 는 높은 수준의 데이터 분석을 위한 프로그래밍 언어입니다. 이 언어로 된 한 줄의 코드는 일련의 MapReduce 작업으로 바뀔 수 있습니다.
  • Hbase 는 BigTable 패러다임을 구현하는 컬럼형 데이터베이스입니다.
  • Cassandra 는 고성능 분산 키-값 데이터베이스입니다.
  • ZooKeeper 는 분산 구성 저장 및 구성 변경 동기화를 위한 서비스입니다.
  • Mahout 은 빅 데이터 기계 학습 라이브러리 및 엔진입니다.

이와 별도로 분산 데이터 처리를 위한 엔진인 Apache Spark 프로젝트 에 주목하고 싶습니다 . Apache Spark는 일반적으로 작업을 위해 HDFS 및 YARN과 같은 Hadoop 구성 요소를 사용하지만 최근에는 자체가 Hadoop보다 대중화되었습니다.

이러한 구성 요소 중 일부는 이 자료 시리즈의 별도 기사에서 다루겠지만 지금은 Hadoop 작업을 시작하고 실제로 적용하는 방법을 살펴보겠습니다.

4.2 Hadoop에서 MapReduce 프로그램 실행

이제 Hadoop에서 MapReduce 작업을 실행하는 방법을 살펴보겠습니다. 작업으로 이전 수업에서 논의한 고전적인 WordCount 예제를 사용합니다.

문제의 정식화를 상기시켜 드리겠습니다. 일련의 문서가 있습니다. 문서 세트에서 발생하는 각 단어는 해당 단어가 세트에서 발생하는 횟수를 세는 데 필요합니다.

해결책:

맵은 문서를 단어로 분할하고 쌍 세트(단어, 1)를 반환합니다.

감소는 각 단어의 발생을 합산합니다.

def map(doc):  
for word in doc.split():  
	yield word, 1 
def reduce(word, values):  
	yield word, sum(values)

이제 과제는 이 솔루션을 Hadoop에서 실행할 수 있는 코드 형태로 프로그래밍하고 실행하는 것입니다.

4.3 방법 번호 1. 하둡 스트리밍

Hadoop에서 MapReduce 프로그램을 실행하는 가장 쉬운 방법은 Hadoop 스트리밍 인터페이스를 사용하는 것입니다. 스트리밍 인터페이스는 mapreduce가 stdin에서 데이터를 가져와 stdout 으로 출력하는 프로그램으로 구현된다고 가정합니다 .

지도 기능을 실행하는 프로그램을 매퍼라고 합니다. reduce를 실행하는 프로그램을 각각 reducer 라고 합니다 .

Streaming 인터페이스는 기본적으로 mapper 또는 reducer 의 들어오는 라인 하나가 map 의 들어오는 항목 하나에 해당한다고 가정합니다 .

매퍼의 출력은 쌍(키, 값)의 형태로 리듀서의 입력으로 전달되는 반면 모든 쌍은 동일한 키에 해당합니다.

  • 감속기의 단일 실행으로 처리가 보장됩니다.
  • 연속으로 입력에 제출됩니다(즉, 하나의 리듀서가 여러 다른 키를 처리하는 경우 입력이 키별로 그룹화됨).

이제 파이썬에서 매퍼와 감속기를 구현해 봅시다.

#mapper.py  
import sys  
  
def do_map(doc):  
for word in doc.split():  
	yield word.lower(), 1  
  
for line in sys.stdin:  
	for key, value in do_map(line):  
    	print(key + "\t" + str(value))  
 
#reducer.py  
import sys  
  
def do_reduce(word, values):  
	return word, sum(values)  
  
prev_key = None  
values = []  
  
for line in sys.stdin:  
	key, value = line.split("\t")  
	if key != prev_key and prev_key is not None:  
    	result_key, result_value = do_reduce(prev_key, values)  
    	print(result_key + "\t" + str(result_value))  
    	values = []  
	prev_key = key  
	values.append(int(value))  
  
if prev_key is not None:  
	result_key, result_value = do_reduce(prev_key, values)  
	print(result_key + "\t" + str(result_value))

Hadoop이 처리할 데이터는 HDFS에 저장되어야 합니다. 기사를 업로드하고 HDFS에 올려봅시다. 이렇게 하려면 hadoop fs 명령을 사용합니다 .

wget https://www.dropbox.com/s/opp5psid1x3jt41/lenta_articles.tar.gz  
tar xzvf lenta_articles.tar.gz  
hadoop fs -put lenta_articles 

hadoop fs 유틸리티는 파일 시스템을 조작하기 위한 많은 방법을 지원하며, 그 중 다수는 표준 Linux 유틸리티와 동일합니다.

이제 스트리밍 작업을 시작하겠습니다.

yarn jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar\  
 -input lenta_articles\  
 -output lenta_wordcount\  
 -file mapper.py\  
 -file reducer.py\  
 -mapper "python mapper.py"\  
 -reducer "python reducer.py" 

yarn 유틸리티는 클러스터에서 다양한 애플리케이션(map-reduce 기반 포함)을 시작하고 관리하는 데 사용됩니다. Hadoop-streaming.jar은 그러한 원사 애플리케이션의 한 예일 뿐입니다.

다음은 실행 옵션입니다.

  • 입력 - hdfs에 소스 데이터가 있는 폴더;
  • 출력 - 결과를 저장하려는 hdfs의 폴더
  • file - map-reduce 작업을 수행하는 동안 필요한 파일입니다.
  • mapper는 맵 단계에 사용될 콘솔 명령입니다.
  • reduce는 축소 단계에 사용될 콘솔 명령입니다.

실행 후 콘솔에서 작업의 진행 상황과 작업에 대한 자세한 정보를 볼 수 있는 URL을 볼 수 있습니다.

이 URL에서 사용할 수 있는 인터페이스에서 보다 자세한 작업 실행 상태를 확인하고 각 매퍼 및 감속기의 로그를 볼 수 있습니다(작업이 실패한 경우 매우 유용함).

성공적인 실행 후 작업 결과는 출력 필드에 지정한 폴더의 HDFS에 추가됩니다. "hadoop fs -ls lenta_wordcount" 명령을 사용하여 내용을 볼 수 있습니다.

결과 자체는 다음과 같이 얻을 수 있습니다.

hadoop fs -text lenta_wordcount/* | sort -n -k2,2 | tail -n5  
from
41  
this
43  
on
82  
and
111  
into
194 

"hadoop fs -text" 명령은 폴더의 내용을 텍스트 형식으로 표시합니다. 나는 단어의 발생 횟수에 따라 결과를 정렬했습니다. 예상대로 언어에서 가장 흔한 단어는 전치사입니다.

4.4 방법 번호 2: Java 사용

Hadoop 자체는 Java로 작성되었으며 Hadoop의 기본 인터페이스도 Java 기반입니다. 단어 수를 위한 기본 자바 애플리케이션이 어떻게 생겼는지 보여드리겠습니다.

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

	public static class TokenizerMapper
        	extends Mapper<Object, Text, Text, IntWritable>{

    	private final static IntWritable one = new IntWritable(1);
    	private Text word = new Text();

    	public void map(Object key, Text value, Context context
    	) throws IOException, InterruptedException {
        	StringTokenizer itr = new StringTokenizer(value.toString());
        	while (itr.hasMoreTokens()) {
            	word.set(itr.nextToken());
            	context.write(word, one);
        	}
    	}
	}

	public static class IntSumReducer
        	extends Reducer<Text,IntWritable,Text,IntWritable> {
    	private IntWritable result = new IntWritable();

    	public void reduce(Text key, Iterable values,
                       	Context context
    	) throws IOException, InterruptedException {
        	int sum = 0;
        	for (IntWritable val : values) {
            	sum += val.get();
        	}
        	result.set(sum);
        	context.write(key, result);
    	}
	}

	public static void main(String[] args) throws Exception {
    	Configuration conf = new Configuration();
    	Job job = Job.getInstance(conf, "word count");
    	job.setJarByClass(WordCount.class);
    	job.setMapperClass(TokenizerMapper.class);
    	job.setReducerClass(IntSumReducer.class);
    	job.setOutputKeyClass(Text.class);
    	job.setOutputValueClass(IntWritable.class);
    	FileInputFormat.addInputPath(job, new Path("hdfs://localhost/user/cloudera/lenta_articles"));
    	FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost/user/cloudera/lenta_wordcount"));
    	System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

이 클래스는 Python 예제와 정확히 동일합니다. Mapper 및 Reducer 클래스에서 각각 파생하여 TokenizerMapper 및 IntSumReducer 클래스를 만듭니다. 템플릿 매개변수로 전달된 클래스는 입력 및 출력 값의 유형을 지정합니다. 기본 API는 맵 함수에 키-값 쌍이 입력으로 제공된다고 가정합니다. 우리의 경우에는 키가 비어 있으므로 Object를 키 유형으로 정의하기만 하면 됩니다.

Main 메서드에서 mapreduce 작업을 시작하고 이름, 매퍼 및 감속기, HDFS의 경로, 입력 데이터가 있는 위치 및 결과를 넣을 위치와 같은 매개 변수를 정의합니다. 컴파일하려면 hadoop 라이브러리가 필요합니다. Cloudera에 리포지토리가 있는 Maven을 사용하여 빌드합니다. 설정 방법은 여기에서 확인할 수 있습니다. 결과적으로 pom.xmp 파일(maven에서 프로젝트 어셈블리를 설명하는 데 사용됨)에서 다음을 얻었습니다.

<?xml version="1.0" encoding="UTF-8"?>  
<project xmlns="http://maven.apache.org/POM/4.0.0"  
     	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
     	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">  
	<modelVersion>4.0.0</modelVersion>  
  
	<repositories>  
    	<repository>  
        	<id>cloudera</id>  
        	<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>  
    	</repository>  
	</repositories>  
  
	<dependencies>  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-common</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-auth</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-hdfs</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-mapreduce-client-app</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
	</dependencies>  
  
	<groupId>org.dca.examples</groupId>  
	<artifactId>wordcount</artifactId>  
	<version>1.0-SNAPSHOT</version>  
 
</project>

프로젝트를 jar 패키지로 컴파일해 보겠습니다.

mvn clean package

프로젝트를 jar 파일로 빌드한 후 스트리밍 인터페이스의 경우와 유사한 방식으로 시작됩니다.

yarn jar wordcount-1.0-SNAPSHOT.jar  WordCount 

실행을 기다리고 결과를 확인합니다.

hadoop fs -text lenta_wordcount/* | sort -n -k2,2 | tail -n5  
from
41
this
43
on
82
and
111
into
194

짐작할 수 있듯이 네이티브 애플리케이션을 실행한 결과는 이전 방식으로 시작한 스트리밍 애플리케이션의 결과와 동일합니다.