4.1 ข้อมูลทั่วไปเกี่ยวกับ Hadoop

กระบวนทัศน์ MapReduce ถูกเสนอโดย Google ในปี 2547 ในบทความของMapReduce: การประมวลผลข้อมูลอย่างง่ายบนคลัสเตอร์ขนาดใหญ่ เนื่องจากบทความที่เสนอมีคำอธิบายของกระบวนทัศน์ แต่การนำไปใช้ขาดหายไป โปรแกรมเมอร์หลายคนจาก Yahoo จึงเสนอการนำไปใช้เป็นส่วนหนึ่งของการทำงานบนโปรแกรมรวบรวมข้อมูลเว็บแบบ nutch คุณสามารถอ่านเพิ่มเติมเกี่ยวกับประวัติของ Hadoop ได้ในบทความThe history of Hadoop: From 4 nodes to the future of data

เริ่มแรก Hadoop เป็นเครื่องมือสำหรับการจัดเก็บข้อมูลและเรียกใช้งาน MapReduce เป็นหลัก แต่ตอนนี้ Hadoop เป็นเทคโนโลยีจำนวนมากที่เกี่ยวข้องกับการประมวลผลข้อมูลขนาดใหญ่ไม่ทางใดก็ทางหนึ่ง (ไม่ใช่เฉพาะกับ MapReduce)

ส่วนประกอบหลัก (หลัก) ของ Hadoop คือ:

  • Hadoop Distributed File System (HDFS)เป็นระบบไฟล์แบบกระจายที่ช่วยให้คุณสามารถจัดเก็บข้อมูลได้ไม่จำกัดขนาด
  • Hadoop YARNเป็นเฟรมเวิร์กสำหรับการจัดการทรัพยากรคลัสเตอร์และการจัดการงาน รวมถึงเฟรมเวิร์ก MapReduce
  • Hadoop ทั่วไป

นอกจากนี้ยังมีโครงการจำนวนมากที่เกี่ยวข้องโดยตรงกับ Hadoop แต่ไม่รวมอยู่ในแกนหลักของ Hadoop:

  • Hive - เครื่องมือสำหรับการสืบค้นแบบ SQL บนข้อมูลขนาดใหญ่ (เปลี่ยนการสืบค้น SQL ให้เป็นชุดของงาน MapReduce)
  • Pigเป็นภาษาโปรแกรมสำหรับการวิเคราะห์ข้อมูลระดับสูง โค้ดหนึ่งบรรทัดในภาษานี้สามารถเปลี่ยนเป็นลำดับของงาน MapReduce ได้
  • Hbaseเป็นฐานข้อมูลแบบคอลัมน์ที่ใช้กระบวนทัศน์ BigTable;
  • Cassandraเป็นฐานข้อมูลคีย์-ค่าแบบกระจายที่มีประสิทธิภาพสูง
  • ZooKeeperเป็นบริการสำหรับการจัดเก็บการกำหนดค่าแบบกระจายและการซิงโครไนซ์การเปลี่ยนแปลงการกำหนดค่า
  • Mahoutเป็นห้องสมุดและกลไกการเรียนรู้ของเครื่องบิ๊กดาต้า

แยกกัน ฉันต้องการบันทึก โครงการ Apache Sparkซึ่งเป็นเครื่องมือสำหรับการประมวลผลข้อมูลแบบกระจาย โดยทั่วไปแล้ว Apache Spark จะใช้ส่วนประกอบ Hadoop เช่น HDFS และ YARN ในการทำงาน ในขณะที่ตัวมันเองได้รับความนิยมมากกว่า Hadoop เมื่อเร็วๆ นี้:

ส่วนประกอบเหล่านี้บางส่วนจะกล่าวถึงในบทความแยกต่างหากในชุดข้อมูลนี้ แต่สำหรับตอนนี้ มาดูกันว่าคุณจะเริ่มต้นทำงานกับ Hadoop และนำมันไปใช้จริงได้อย่างไร

4.2 เรียกใช้โปรแกรม MapReduce บน Hadoop

ทีนี้มาดูวิธีเรียกใช้งาน MapReduce บน Hadoop เราจะใช้ ตัวอย่าง WordCount แบบคลาสสิก ซึ่งกล่าวถึงในบทเรียนก่อนหน้านี้

ฉันขอเตือนคุณถึงการกำหนดปัญหา:มีเอกสารชุดหนึ่ง จำเป็นสำหรับแต่ละคำที่เกิดขึ้นในชุดของเอกสารที่จะนับจำนวนครั้งที่เกิดขึ้นในชุด

สารละลาย:

แผนที่แบ่งเอกสารเป็นคำและส่งกลับชุดของคู่ (คำ, 1)

ลดผลรวมที่เกิดขึ้นของแต่ละคำ:

def map(doc):  
for word in doc.split():  
	yield word, 1 
def reduce(word, values):  
	yield word, sum(values)

ตอนนี้งานคือตั้งโปรแกรมโซลูชันนี้ในรูปแบบของโค้ดที่สามารถดำเนินการบน Hadoop และเรียกใช้ได้

4.3 วิธีที่ 1 Hadoop สตรีมมิ่ง

วิธีที่ง่ายที่สุดในการเรียกใช้โปรแกรม MapReduce บน Hadoop คือการใช้อินเทอร์เฟซการสตรีม Hadoop อินเทอร์เฟซการสตรีมจะถือว่าแมปและรีดิวซ์นั้นเป็นโปรแกรมที่รับข้อมูลจาก stdin และเอาต์พุตไปยังstdout

โปรแกรมที่เรียกใช้ฟังก์ชันแผนที่เรียกว่า mapper โปรแกรมที่ดำเนินการลดเรียกว่า ตามลำดับลด

อินเทอร์เฟซการสตรีมถือว่าโดยค่าเริ่มต้นว่าบรรทัดขาเข้าหนึ่งบรรทัดในตัวแมปหรือตัวลดขนาดสอดคล้องกับหนึ่งรายการขาเข้าสำหรับแมป

เอาต์พุตของ mapper ไปที่อินพุตของ reducer ในรูปแบบของคู่ (คีย์, ค่า) ในขณะที่คู่ทั้งหมดที่ตรงกับคีย์เดียวกัน:

  • รับประกันว่าจะดำเนินการโดยการเปิดตัวตัวลดเพียงครั้งเดียว
  • จะถูกส่งไปยังอินพุตในแถว (นั่นคือ ถ้าตัวลดหนึ่งประมวลผลคีย์ที่แตกต่างกันหลายคีย์ อินพุตจะถูกจัดกลุ่มตามคีย์)

ลองใช้ mapper และ reducer ใน python:

#mapper.py  
import sys  
  
def do_map(doc):  
for word in doc.split():  
	yield word.lower(), 1  
  
for line in sys.stdin:  
	for key, value in do_map(line):  
    	print(key + "\t" + str(value))  
 
#reducer.py  
import sys  
  
def do_reduce(word, values):  
	return word, sum(values)  
  
prev_key = None  
values = []  
  
for line in sys.stdin:  
	key, value = line.split("\t")  
	if key != prev_key and prev_key is not None:  
    	result_key, result_value = do_reduce(prev_key, values)  
    	print(result_key + "\t" + str(result_value))  
    	values = []  
	prev_key = key  
	values.append(int(value))  
  
if prev_key is not None:  
	result_key, result_value = do_reduce(prev_key, values)  
	print(result_key + "\t" + str(result_value))

ข้อมูลที่ Hadoop จะประมวลผลจะต้องจัดเก็บไว้ใน HDFS มาอัพโหลดบทความของเราและวางไว้บน HDFS ในการทำเช่นนี้ ใช้ คำสั่ง hadoop fs :

wget https://www.dropbox.com/s/opp5psid1x3jt41/lenta_articles.tar.gz  
tar xzvf lenta_articles.tar.gz  
hadoop fs -put lenta_articles 

ยูทิลิตี hadoop fs รองรับเมธอดจำนวนมากในการจัดการระบบไฟล์ ซึ่งหลายเมธอดจะเหมือนกันกับยูทิลีตีมาตรฐานของลินุกซ์

ตอนนี้มาเริ่มงานการสตรีมกัน:

yarn jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar\  
 -input lenta_articles\  
 -output lenta_wordcount\  
 -file mapper.py\  
 -file reducer.py\  
 -mapper "python mapper.py"\  
 -reducer "python reducer.py" 

ยูทิลิตี Yarn ใช้เพื่อเรียกใช้และจัดการแอปพลิเคชันต่างๆ (รวมถึงการลดขนาดแผนที่) บนคลัสเตอร์ Hadoop-streaming.jar เป็นเพียงตัวอย่างหนึ่งของแอปพลิเคชั่นเส้นด้าย

ถัดไปคือตัวเลือกการเปิดตัว:

  • อินพุต - โฟลเดอร์ที่มีแหล่งข้อมูลบน hdfs;
  • เอาต์พุต - โฟลเดอร์บน hdfs ที่คุณต้องการใส่ผลลัพธ์
  • ไฟล์ - ไฟล์ที่จำเป็นระหว่างการทำงานของงานลดแผนที่
  • mapper เป็นคำสั่งคอนโซลที่จะใช้สำหรับขั้นตอนการแม็พ
  • reduce คือคำสั่งคอนโซลที่จะใช้สำหรับขั้นตอนการลด

หลังจากเปิดตัว คุณสามารถดูความคืบหน้าของงานในคอนโซลและ URL สำหรับดูรายละเอียดเพิ่มเติมเกี่ยวกับงาน

ในอินเทอร์เฟซที่มีให้ที่ URL นี้ คุณสามารถค้นหาสถานะการดำเนินการของงานที่ละเอียดยิ่งขึ้น ดูบันทึกของ mapper แต่ละตัวและตัวลดขนาด (ซึ่งมีประโยชน์มากในกรณีที่งานที่ล้มเหลว)

ผลลัพธ์ของงานหลังจากดำเนินการสำเร็จจะถูกเพิ่มไปยัง HDFS ในโฟลเดอร์ที่เราระบุในช่องเอาต์พุต คุณสามารถดูเนื้อหาได้โดยใช้คำสั่ง "hadoop fs -ls lenta_wordcount"

สามารถรับผลลัพธ์ได้ดังนี้:

hadoop fs -text lenta_wordcount/* | sort -n -k2,2 | tail -n5  
from
41  
this
43  
on
82  
and
111  
into
194 

คำสั่ง "hadoop fs -text" แสดงเนื้อหาของโฟลเดอร์ในรูปแบบข้อความ ฉันจัดเรียงผลลัพธ์ตามจำนวนคำที่เกิดขึ้น ตามที่คาดไว้ คำที่ใช้บ่อยที่สุดในภาษานี้คือคำบุพบท

4.4 วิธีที่ 2: ใช้ Java

Hadoop นั้นเขียนด้วยภาษาจาวา และส่วนต่อประสานดั้งเดิมของ Hadoop ก็เป็นแบบจาวาเช่นกัน มาดูกันว่าแอปพลิเคชั่นจาวาดั้งเดิมสำหรับการนับคำมีลักษณะอย่างไร:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

	public static class TokenizerMapper
        	extends Mapper<Object, Text, Text, IntWritable>{

    	private final static IntWritable one = new IntWritable(1);
    	private Text word = new Text();

    	public void map(Object key, Text value, Context context
    	) throws IOException, InterruptedException {
        	StringTokenizer itr = new StringTokenizer(value.toString());
        	while (itr.hasMoreTokens()) {
            	word.set(itr.nextToken());
            	context.write(word, one);
        	}
    	}
	}

	public static class IntSumReducer
        	extends Reducer<Text,IntWritable,Text,IntWritable> {
    	private IntWritable result = new IntWritable();

    	public void reduce(Text key, Iterable values,
                       	Context context
    	) throws IOException, InterruptedException {
        	int sum = 0;
        	for (IntWritable val : values) {
            	sum += val.get();
        	}
        	result.set(sum);
        	context.write(key, result);
    	}
	}

	public static void main(String[] args) throws Exception {
    	Configuration conf = new Configuration();
    	Job job = Job.getInstance(conf, "word count");
    	job.setJarByClass(WordCount.class);
    	job.setMapperClass(TokenizerMapper.class);
    	job.setReducerClass(IntSumReducer.class);
    	job.setOutputKeyClass(Text.class);
    	job.setOutputValueClass(IntWritable.class);
    	FileInputFormat.addInputPath(job, new Path("hdfs://localhost/user/cloudera/lenta_articles"));
    	FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost/user/cloudera/lenta_wordcount"));
    	System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

คลาสนี้ทำเหมือนกับตัวอย่าง Python ของเราทุกประการ เราสร้างคลาส TokenizerMapper และ IntSumReducer โดยได้รับมาจากคลาส Mapper และ Reducer ตามลำดับ คลาสที่ผ่านเป็นพารามิเตอร์เทมเพลตจะระบุประเภทของค่าอินพุตและเอาต์พุต API ดั้งเดิมถือว่าฟังก์ชันแผนที่ได้รับคู่คีย์-ค่าเป็นอินพุต เนื่องจากในกรณีของเราคีย์ว่างเปล่า เราจึงกำหนด Object เป็นประเภทคีย์

ในเมธอดหลัก เราเริ่มงาน mapreduce และกำหนดพารามิเตอร์ - ชื่อ, mapper และ reducer, เส้นทางใน HDFS, ตำแหน่งข้อมูลอินพุตและตำแหน่งที่จะใส่ผลลัพธ์ ในการคอมไพล์ เราจำเป็นต้องมีไลบรารี่ของ hadoop ฉันใช้ Maven ในการสร้าง ซึ่ง cloudera มีที่เก็บ ดูคำแนะนำในการตั้งค่าได้ที่นี่ เป็นผลให้ไฟล์ pom.xmp (ซึ่ง maven ใช้เพื่ออธิบายชุดประกอบของโครงการ) ฉันได้รับสิ่งต่อไปนี้):

<?xml version="1.0" encoding="UTF-8"?>  
<project xmlns="http://maven.apache.org/POM/4.0.0"  
     	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
     	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">  
	<modelVersion>4.0.0</modelVersion>  
  
	<repositories>  
    	<repository>  
        	<id>cloudera</id>  
        	<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>  
    	</repository>  
	</repositories>  
  
	<dependencies>  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-common</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-auth</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-hdfs</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-mapreduce-client-app</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
	</dependencies>  
  
	<groupId>org.dca.examples</groupId>  
	<artifactId>wordcount</artifactId>  
	<version>1.0-SNAPSHOT</version>  
 
</project>

มารวบรวมโครงการเป็นแพ็คเกจ jar:

mvn clean package

หลังจากสร้างโปรเจ็กต์เป็นไฟล์ jar แล้ว การเรียกใช้จะเกิดขึ้นในลักษณะเดียวกัน เช่นในกรณีของอินเทอร์เฟซการสตรีม:

yarn jar wordcount-1.0-SNAPSHOT.jar  WordCount 

เรารอการดำเนินการและตรวจสอบผลลัพธ์:

hadoop fs -text lenta_wordcount/* | sort -n -k2,2 | tail -n5  
from
41
this
43
on
82
and
111
into
194

อย่างที่คุณอาจเดาได้ ผลลัพธ์ของการเรียกใช้แอปพลิเคชันแบบเนทีฟของเรานั้นเหมือนกับผลลัพธ์ของแอปพลิเคชันสตรีมมิ่งที่เราเปิดตัวในวิธีก่อนหน้า