5.1 맵 전용 작업

이제 실제 문제를 해결하기 위해 MapReduce를 효과적으로 사용할 수 있는 다양한 기술을 설명하고 개발을 단순화하거나 클러스터에서 MapReduce 작업 실행 속도를 크게 높일 수 있는 Hadoop의 일부 기능을 보여줄 시간입니다.

우리가 기억하는 것처럼 MapReduce는 Map, Shuffle 및 Reduce 단계로 구성됩니다. 일반적으로 셔플 단계는 데이터가 이 단계에서 정렬되기 때문에 실제 작업에서 가장 어려운 것으로 판명되었습니다. 실제로 지도 단계만 생략할 수 있는 작업이 많이 있습니다. 다음은 이러한 작업의 예입니다.

  • 데이터 필터링(예: 웹 서버 로그에서 "IP 주소 123.123.123.123에서 모든 레코드 찾기")
  • 데이터 변환("csv-logs에서 열 삭제")
  • 외부 소스에서 데이터 로드 및 언로드("로그의 모든 레코드를 데이터베이스에 삽입")

이러한 작업은 Map-Only를 사용하여 해결됩니다. Hadoop에서 Map-Only 작업을 생성할 때 리듀서의 수를 0으로 지정해야 합니다.

hadoop에서 맵 전용 태스크 구성의 예:

기본 인터페이스 Hadoop 스트리밍 인터페이스

job'a를 구성할 때 감속기의 수를 0으로 지정하십시오.

job.setNumReduceTasks(0); 

감속기를 지정하지 않고 감속기 수를 0으로 지정합니다. 예:

hadoop jar hadoop-streaming.jar \ 
 -D mapred.reduce.tasks=0\ 
-input input_dir\ 
-output output_dir\ 
-mapper "python mapper.py"\ 
-file "mapper.py"

지도 전용 작업은 실제로 매우 유용할 수 있습니다. 예를 들어, Facetz.DCA 플랫폼에서 사용자의 행동을 통해 사용자의 특성을 식별하기 위해 사용되는 것은 정확히 하나의 큰 맵 전용이며, 각 매퍼는 사용자를 입력으로 취하고 그의 특성을 출력으로 반환합니다.

5.2 결합

이미 쓴 것처럼 일반적으로 Map-Reduce 작업을 수행할 때 가장 어려운 단계는 셔플 단계입니다. 이는 중간 결과(매퍼의 출력)가 디스크에 기록되고 정렬되어 네트워크를 통해 전송되기 때문에 발생합니다. 그러나 그러한 행동이 그다지 합리적이지 않은 작업이 있습니다. 예를 들어, 문서에서 단어를 세는 동일한 작업에서 작업의 하나의 map-reduce 노드에서 여러 매퍼의 출력 결과를 미리 집계하고 각 시스템에 대해 이미 합산된 값을 리듀서에 전달할 수 있습니다. .

이를 위해 Hadoop에서는 일부 매퍼의 출력을 처리할 결합 함수를 정의할 수 있습니다. 결합 기능은 reduce와 매우 유사합니다. 일부 매퍼의 출력을 입력으로 사용하고 이러한 매퍼에 대한 집계 결과를 생성하므로 리듀서는 종종 결합기로도 사용됩니다. reduce와의 중요한 차이점은 하나의 키에 해당하는 모든 값이 결합 함수에 도달하지 않는다는 것 입니다 .

게다가 hadoop은 결합 함수가 매퍼의 출력에 대해 전혀 실행될 것이라고 보장하지 않습니다. 따라서 키로 중앙값을 찾는 경우와 같이 결합 기능이 항상 적용되는 것은 아닙니다. 그럼에도 불구하고 결합 기능을 적용할 수 있는 작업에서 이 기능을 사용하면 MapReduce 작업의 속도를 크게 높일 수 있습니다.

hadoop에서 컴바이너 사용:

기본 인터페이스 하둡 스트리밍

job-a를 구성할 때 class-Combiner를 지정합니다. 일반적으로 Reducer와 동일합니다.

job.setMapperClass(TokenizerMapper.class); 
job.setCombinerClass(IntSumReducer.class); 
job.setReducerClass(IntSumReducer.class); 

명령줄 옵션에서 -combiner 명령을 지정합니다. 일반적으로 이 명령은 reducer 명령과 동일합니다. 예:

hadoop jar hadoop-streaming.jar \ 
-input input_dir\ 
-output output_dir\ 
-mapper "python mapper.py"\ 
-reducer "python reducer.py"\ 
-combiner "python reducer.py"\ 
-file "mapper.py"\ 
-file "reducer.py"\

5.3 맵리듀스 태스크 체인

하나의 MapReduce가 문제를 해결하기에 충분하지 않은 상황이 있습니다. 예를 들어, 약간 수정된 WordCount 작업을 생각해 보십시오. 텍스트 문서 세트가 있고, 세트에서 1에서 1000번까지 발생한 단어 수, 1001에서 2000까지 몇 단어, 2001에서 3000까지 몇 단어인지 계산해야 합니다. 등등. 솔루션을 위해 2개의 MapReduce 작업이 필요합니다.

  • 수정된 단어 수: 각 단어에 대해 어느 간격에 속하는지 계산합니다.
  • 첫 번째 MapReduce의 출력에서 ​​각 간격이 발생한 횟수를 세는 MapReduce입니다.

의사 코드 솔루션:

#map1 
def map(doc): 
for word in doc: 
yield word, 1
#reduce1 
def reduce(word, values): 
yield int(sum(values)/1000), 1 
#map2 
def map(doc): 
interval, cnt = doc.split() 
yield interval, cnt 
#reduce2 
def reduce(interval, values): 
yield interval*1000, sum(values) 

Hadoop에서 일련의 MapReduce 작업을 실행하려면 첫 번째 작업의 출력으로 지정된 폴더를 두 번째 작업의 입력으로 지정하고 차례로 실행하면 됩니다.

실제로 MapReduce 작업 체인은 MapReduce 작업이 서로 순차적으로 또는 병렬로 연결될 수 있는 매우 복잡한 시퀀스일 수 있습니다. 이러한 작업 실행 계획의 관리를 단순화하기 위해 oozie 및 luigi와 같은 별도의 도구가 있으며 이 시리즈의 별도 문서에서 설명합니다.

5.4 분산 캐시

Hadoop의 중요한 메커니즘은 분산 캐시입니다. 분산 캐시를 사용하면 MapReduce 작업이 실행되는 환경에 파일(예: 텍스트 파일, 아카이브, jar 파일)을 추가할 수 있습니다.

HDFS에 저장된 파일, 로컬 파일(작업이 시작된 시스템에 대한 로컬)을 추가할 수 있습니다. -file 옵션을 통해 mapper.py 및 reducer.py 파일을 추가하여 분산 캐시를 hadoop 스트리밍과 함께 사용하는 방법을 이미 암시적으로 보여 주었습니다. 실제로 mapper.py와 reducer.py뿐만 아니라 일반적으로 임의의 파일을 추가한 다음 로컬 폴더에 있는 것처럼 사용할 수 있습니다.

분산 캐시 사용:

네이티브 API
//Job configuration
JobConf job = new JobConf();
DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"),  job);
DistributedCache.addCacheArchive(new URI("/myapp/map.zip", job);
DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job);
DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar", job);
DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz", job);

//example of usage in mapper-e:
public static class MapClass extends MapReduceBase
implements Mapper<K, V, K, V> {

 private Path[] localArchives;
 private Path[] localFiles;

 public void configure(JobConf job) {
   // get cached data from archives
   File f = new File("./map.zip/some/file/in/zip.txt");
 }

 public void map(K key, V value,
             	OutputCollector<K, V> output, Reporter reporter)
 throws IOException {
   // use data here
   // ...
   // ...
   output.collect(k, v);
 }
}
하둡 스트리밍

# –files 매개변수의 분산 캐시에 추가해야 하는 파일을 나열합니다. --files 옵션은 다른 옵션 앞에 와야 합니다.

yarn  hadoop-streaming.jar\ 
-files mapper.py,reducer.py,some_cached_data.txt\ 
-input '/some/input/path' \ 
-output '/some/output/path' \  
-mapper 'python mapper.py' \ 
-reducer 'python reducer.py' \

사용 예:

import sys 
#just read file from local folder 
data = open('some_cached_data.txt').read() 
 
for line in sys.stdin() 
#processing input 
#use data here

5.5 조인 줄이기

관계형 데이터베이스 작업에 익숙한 사람들은 종종 매우 편리한 조인 작업을 사용합니다. 이 작업을 통해 일부 키에 따라 일부 테이블을 조인하여 일부 테이블의 내용을 공동으로 처리할 수 있습니다. 빅 데이터로 작업할 때 이 문제도 가끔 발생합니다. 다음 예를 고려하십시오.

두 웹 서버의 로그가 있으며 각 로그는 다음과 같습니다.

t\t

로그 스니펫 예:

1446792139	
178.78.82.1	
/sphingosine/unhurrying.css 
1446792139	
126.31.163.222	
/accentually.js 
1446792139	
154.164.149.83	
/pyroacid/unkemptly.jpg 
1446792139	
202.27.13.181	
/Chawia.js 
1446792139	
67.123.248.174	
/morphographical/dismain.css 
1446792139	
226.74.123.135	
/phanerite.php 
1446792139	
157.109.106.104	
/bisonant.css

각 IP 주소에 대해 2개의 서버 중 더 자주 방문한 서버를 계산해야 합니다. 결과는 다음 형식이어야 합니다.

\t

결과 일부의 예:

178.78.82.1	
first 
126.31.163.222	
second 
154.164.149.83	
second 
226.74.123.135	
first

안타깝게도 관계형 데이터베이스와 달리 일반적으로 키(이 경우 IP 주소)로 두 개의 로그를 조인하는 것은 다소 무거운 작업이며 3 MapReduce 및 Reduce Join 패턴을 사용하여 해결됩니다.

ReduceJoin은 다음과 같이 작동합니다.

1) 각 입력 로그에 대해 별도의 MapReduce(Map만 해당)가 실행되어 입력 데이터를 다음 형식으로 변환합니다.

key -> (type, value

여기서 key는 조인 테이블의 키이고 Type은 테이블의 유형(이 경우 첫 번째 또는 두 번째)이며 Value는 키에 바인딩된 추가 데이터입니다.

2) 두 맵리듀스의 출력은 세 번째 맵리듀스의 입력으로 공급되며 실제로 합집합을 수행합니다. 이 MapReduce에는 단순히 입력을 복사하는 빈 매퍼가 포함되어 있습니다. 다음으로 셔플은 데이터를 키로 분해하고 입력으로 리듀서에 공급합니다.

key -> [(type, value)]

이 시점에서 리듀서가 두 로그에서 레코드를 수신하는 동시에 두 로그 중 특정 값이 나온 유형 필드로 식별할 수 있다는 것이 중요합니다. 따라서 원래 문제를 해결하기에 충분한 데이터가 있습니다. 우리의 경우 리듀서는 어떤 유형이 더 많이 발생했는지 각 레코드 키에 대해 계산하고 이 유형을 출력하기만 하면 됩니다.

5.6 맵조인

ReduceJoin 패턴은 두 개의 로그를 키로 조인하는 일반적인 경우를 설명합니다. 그러나 작업을 상당히 단순화하고 가속화할 수 있는 특별한 경우가 있습니다. 이는 로그 중 하나가 다른 로그보다 훨씬 작은 경우입니다. 다음 문제를 고려하십시오.

2개의 로그가 있습니다. 첫 번째 로그에는 웹 서버 로그(이전 작업과 동일)가 포함되고 두 번째 파일(크기 100kb)에는 URL-> 테마 일치 항목이 포함됩니다. 두 번째 파일 예:

/toyota.php 	
auto 
/football/spartak.html 	
sport 
/cars 	
auto 
/finances/money 	
business

각 IP 주소에 대해 해당 IP 주소에서 가장 자주 로드된 범주의 페이지를 계산해야 합니다.

이 경우에도 URL로 2개의 로그를 조인해야 합니다. 그러나이 경우 두 번째 로그가 메모리에 완전히 맞기 때문에 3 개의 MapReduce를 실행할 필요가 없습니다. 1st MapReduce를 사용하여 문제를 해결하기 위해 Distributed Cache에 2차 로그를 로드하고 Mapper가 초기화되면 간단히 메모리로 읽어들여 -> topic 사전에 넣습니다.

또한 다음과 같이 문제가 해결됩니다.

지도:

# find the subject of each of the pages of the first log 
input_line -> [ip,  topic] 

줄이다:


Ip -> [topics] -> [ip, most_popular_topic]

Reduce는 IP와 모든 주제 목록을 입력으로 받고 가장 자주 발생한 주제를 계산합니다. 따라서 작업은 첫 번째 MapReduce를 사용하여 해결되며 실제 Join은 일반적으로 맵 내부에서 발생합니다(따라서 키에 의한 추가 집계가 필요하지 않은 경우 MapOnly 작업이 생략될 수 있음).