4.1 ข้อมูลทั่วไปเกี่ยวกับ Hadoop

กระบวนทัศน์ MapReduce ถูกเสนอโดย Google ในปี 2547 ในบทความของMapReduce: การประมวลผลข้อมูลอย่างง่ายบนคลัสเตอร์ขนาดใหญ่ เนื่องจากบทความที่เสนอมีคำอธิบายของกระบวนทัศน์ แต่การนำไปใช้ขาดหายไป โปรแกรมเมอร์หลายคนจาก Yahoo จึงเสนอการนำไปใช้เป็นส่วนหนึ่งของการทำงานบนโปรแกรมรวบรวมข้อมูลเว็บแบบ nutch คุณสามารถอ่านเพิ่มเติมเกี่ยวกับประวัติของ Hadoop ได้ในบทความThe history of Hadoop: From 4 nodes to the future of data
เริ่มแรก Hadoop เป็นเครื่องมือสำหรับการจัดเก็บข้อมูลและเรียกใช้งาน MapReduce เป็นหลัก แต่ตอนนี้ Hadoop เป็นเทคโนโลยีจำนวนมากที่เกี่ยวข้องกับการประมวลผลข้อมูลขนาดใหญ่ไม่ทางใดก็ทางหนึ่ง (ไม่ใช่เฉพาะกับ MapReduce)
ส่วนประกอบหลัก (หลัก) ของ Hadoop คือ:
- Hadoop Distributed File System (HDFS)เป็นระบบไฟล์แบบกระจายที่ช่วยให้คุณสามารถจัดเก็บข้อมูลได้ไม่จำกัดขนาด
- Hadoop YARNเป็นเฟรมเวิร์กสำหรับการจัดการทรัพยากรคลัสเตอร์และการจัดการงาน รวมถึงเฟรมเวิร์ก MapReduce
- Hadoop ทั่วไป
นอกจากนี้ยังมีโครงการจำนวนมากที่เกี่ยวข้องโดยตรงกับ Hadoop แต่ไม่รวมอยู่ในแกนหลักของ Hadoop:
- Hive - เครื่องมือสำหรับการสืบค้นแบบ SQL บนข้อมูลขนาดใหญ่ (เปลี่ยนการสืบค้น SQL ให้เป็นชุดของงาน MapReduce)
- Pigเป็นภาษาโปรแกรมสำหรับการวิเคราะห์ข้อมูลระดับสูง โค้ดหนึ่งบรรทัดในภาษานี้สามารถเปลี่ยนเป็นลำดับของงาน MapReduce ได้
- Hbaseเป็นฐานข้อมูลแบบคอลัมน์ที่ใช้กระบวนทัศน์ BigTable;
- Cassandraเป็นฐานข้อมูลคีย์-ค่าแบบกระจายที่มีประสิทธิภาพสูง
- ZooKeeperเป็นบริการสำหรับการจัดเก็บการกำหนดค่าแบบกระจายและการซิงโครไนซ์การเปลี่ยนแปลงการกำหนดค่า
- Mahoutเป็นห้องสมุดและกลไกการเรียนรู้ของเครื่องบิ๊กดาต้า
แยกกัน ฉันต้องการบันทึก โครงการ Apache Sparkซึ่งเป็นเครื่องมือสำหรับการประมวลผลข้อมูลแบบกระจาย โดยทั่วไปแล้ว Apache Spark จะใช้ส่วนประกอบ Hadoop เช่น HDFS และ YARN ในการทำงาน ในขณะที่ตัวมันเองได้รับความนิยมมากกว่า Hadoop เมื่อเร็วๆ นี้:

ส่วนประกอบเหล่านี้บางส่วนจะกล่าวถึงในบทความแยกต่างหากในชุดข้อมูลนี้ แต่สำหรับตอนนี้ มาดูกันว่าคุณจะเริ่มต้นทำงานกับ Hadoop และนำมันไปใช้จริงได้อย่างไร
4.2 เรียกใช้โปรแกรม MapReduce บน Hadoop
ทีนี้มาดูวิธีเรียกใช้งาน MapReduce บน Hadoop เราจะใช้ ตัวอย่าง WordCount แบบคลาสสิก ซึ่งกล่าวถึงในบทเรียนก่อนหน้านี้
ฉันขอเตือนคุณถึงการกำหนดปัญหา:มีเอกสารชุดหนึ่ง จำเป็นสำหรับแต่ละคำที่เกิดขึ้นในชุดของเอกสารที่จะนับจำนวนครั้งที่เกิดขึ้นในชุด
สารละลาย:
แผนที่แบ่งเอกสารเป็นคำและส่งกลับชุดของคู่ (คำ, 1)
ลดผลรวมที่เกิดขึ้นของแต่ละคำ:
|
|
ตอนนี้งานคือตั้งโปรแกรมโซลูชันนี้ในรูปแบบของโค้ดที่สามารถดำเนินการบน Hadoop และเรียกใช้ได้
4.3 วิธีที่ 1 Hadoop สตรีมมิ่ง
วิธีที่ง่ายที่สุดในการเรียกใช้โปรแกรม MapReduce บน Hadoop คือการใช้อินเทอร์เฟซการสตรีม Hadoop อินเทอร์เฟซการสตรีมจะถือว่าแมปและรีดิวซ์นั้นเป็นโปรแกรมที่รับข้อมูลจาก stdin และเอาต์พุตไปยังstdout
โปรแกรมที่เรียกใช้ฟังก์ชันแผนที่เรียกว่า mapper โปรแกรมที่ดำเนินการลดเรียกว่า ตามลำดับลด
อินเทอร์เฟซการสตรีมถือว่าโดยค่าเริ่มต้นว่าบรรทัดขาเข้าหนึ่งบรรทัดในตัวแมปหรือตัวลดขนาดสอดคล้องกับหนึ่งรายการขาเข้าสำหรับแมป
เอาต์พุตของ mapper ไปที่อินพุตของ reducer ในรูปแบบของคู่ (คีย์, ค่า) ในขณะที่คู่ทั้งหมดที่ตรงกับคีย์เดียวกัน:
- รับประกันว่าจะดำเนินการโดยการเปิดตัวตัวลดเพียงครั้งเดียว
- จะถูกส่งไปยังอินพุตในแถว (นั่นคือ ถ้าตัวลดหนึ่งประมวลผลคีย์ที่แตกต่างกันหลายคีย์ อินพุตจะถูกจัดกลุ่มตามคีย์)
ลองใช้ mapper และ reducer ใน python:
#mapper.py
import sys
def do_map(doc):
for word in doc.split():
yield word.lower(), 1
for line in sys.stdin:
for key, value in do_map(line):
print(key + "\t" + str(value))
#reducer.py
import sys
def do_reduce(word, values):
return word, sum(values)
prev_key = None
values = []
for line in sys.stdin:
key, value = line.split("\t")
if key != prev_key and prev_key is not None:
result_key, result_value = do_reduce(prev_key, values)
print(result_key + "\t" + str(result_value))
values = []
prev_key = key
values.append(int(value))
if prev_key is not None:
result_key, result_value = do_reduce(prev_key, values)
print(result_key + "\t" + str(result_value))
ข้อมูลที่ Hadoop จะประมวลผลจะต้องจัดเก็บไว้ใน HDFS มาอัพโหลดบทความของเราและวางไว้บน HDFS ในการทำเช่นนี้ ใช้ คำสั่ง hadoop fs :
wget https://www.dropbox.com/s/opp5psid1x3jt41/lenta_articles.tar.gz
tar xzvf lenta_articles.tar.gz
hadoop fs -put lenta_articles
ยูทิลิตี hadoop fs รองรับเมธอดจำนวนมากในการจัดการระบบไฟล์ ซึ่งหลายเมธอดจะเหมือนกันกับยูทิลีตีมาตรฐานของลินุกซ์
ตอนนี้มาเริ่มงานการสตรีมกัน:
yarn jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar\
-input lenta_articles\
-output lenta_wordcount\
-file mapper.py\
-file reducer.py\
-mapper "python mapper.py"\
-reducer "python reducer.py"
ยูทิลิตี Yarn ใช้เพื่อเรียกใช้และจัดการแอปพลิเคชันต่างๆ (รวมถึงการลดขนาดแผนที่) บนคลัสเตอร์ Hadoop-streaming.jar เป็นเพียงตัวอย่างหนึ่งของแอปพลิเคชั่นเส้นด้าย
ถัดไปคือตัวเลือกการเปิดตัว:
- อินพุต - โฟลเดอร์ที่มีแหล่งข้อมูลบน hdfs;
- เอาต์พุต - โฟลเดอร์บน hdfs ที่คุณต้องการใส่ผลลัพธ์
- ไฟล์ - ไฟล์ที่จำเป็นระหว่างการทำงานของงานลดแผนที่
- mapper เป็นคำสั่งคอนโซลที่จะใช้สำหรับขั้นตอนการแม็พ
- reduce คือคำสั่งคอนโซลที่จะใช้สำหรับขั้นตอนการลด
หลังจากเปิดตัว คุณสามารถดูความคืบหน้าของงานในคอนโซลและ URL สำหรับดูรายละเอียดเพิ่มเติมเกี่ยวกับงาน

ในอินเทอร์เฟซที่มีให้ที่ URL นี้ คุณสามารถค้นหาสถานะการดำเนินการของงานที่ละเอียดยิ่งขึ้น ดูบันทึกของ mapper แต่ละตัวและตัวลดขนาด (ซึ่งมีประโยชน์มากในกรณีที่งานที่ล้มเหลว)

ผลลัพธ์ของงานหลังจากดำเนินการสำเร็จจะถูกเพิ่มไปยัง HDFS ในโฟลเดอร์ที่เราระบุในช่องเอาต์พุต คุณสามารถดูเนื้อหาได้โดยใช้คำสั่ง "hadoop fs -ls lenta_wordcount"
สามารถรับผลลัพธ์ได้ดังนี้:
hadoop fs -text lenta_wordcount/* | sort -n -k2,2 | tail -n5
from
41
this
43
on
82
and
111
into
194
คำสั่ง "hadoop fs -text" แสดงเนื้อหาของโฟลเดอร์ในรูปแบบข้อความ ฉันจัดเรียงผลลัพธ์ตามจำนวนคำที่เกิดขึ้น ตามที่คาดไว้ คำที่ใช้บ่อยที่สุดในภาษานี้คือคำบุพบท
4.4 วิธีที่ 2: ใช้ Java
Hadoop นั้นเขียนด้วยภาษาจาวา และส่วนต่อประสานดั้งเดิมของ Hadoop ก็เป็นแบบจาวาเช่นกัน มาดูกันว่าแอปพลิเคชั่นจาวาดั้งเดิมสำหรับการนับคำมีลักษณะอย่างไร:
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount {
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path("hdfs://localhost/user/cloudera/lenta_articles"));
FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost/user/cloudera/lenta_wordcount"));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
คลาสนี้ทำเหมือนกับตัวอย่าง Python ของเราทุกประการ เราสร้างคลาส TokenizerMapper และ IntSumReducer โดยได้รับมาจากคลาส Mapper และ Reducer ตามลำดับ คลาสที่ผ่านเป็นพารามิเตอร์เทมเพลตจะระบุประเภทของค่าอินพุตและเอาต์พุต API ดั้งเดิมถือว่าฟังก์ชันแผนที่ได้รับคู่คีย์-ค่าเป็นอินพุต เนื่องจากในกรณีของเราคีย์ว่างเปล่า เราจึงกำหนด Object เป็นประเภทคีย์
ในเมธอดหลัก เราเริ่มงาน mapreduce และกำหนดพารามิเตอร์ - ชื่อ, mapper และ reducer, เส้นทางใน HDFS, ตำแหน่งข้อมูลอินพุตและตำแหน่งที่จะใส่ผลลัพธ์ ในการคอมไพล์ เราจำเป็นต้องมีไลบรารี่ของ hadoop ฉันใช้ Maven ในการสร้าง ซึ่ง cloudera มีที่เก็บ ดูคำแนะนำในการตั้งค่าได้ที่นี่ เป็นผลให้ไฟล์ pom.xmp (ซึ่ง maven ใช้เพื่ออธิบายชุดประกอบของโครงการ) ฉันได้รับสิ่งต่อไปนี้):
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<repositories>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.6.0-cdh5.4.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
<version>2.6.0-cdh5.4.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.6.0-cdh5.4.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-app</artifactId>
<version>2.6.0-cdh5.4.2</version>
</dependency>
</dependencies>
<groupId>org.dca.examples</groupId>
<artifactId>wordcount</artifactId>
<version>1.0-SNAPSHOT</version>
</project>
มารวบรวมโครงการเป็นแพ็คเกจ jar:
mvn clean package
หลังจากสร้างโปรเจ็กต์เป็นไฟล์ jar แล้ว การเรียกใช้จะเกิดขึ้นในลักษณะเดียวกัน เช่นในกรณีของอินเทอร์เฟซการสตรีม:
yarn jar wordcount-1.0-SNAPSHOT.jar WordCount
เรารอการดำเนินการและตรวจสอบผลลัพธ์:
hadoop fs -text lenta_wordcount/* | sort -n -k2,2 | tail -n5
from
41
this
43
on
82
and
111
into
194
อย่างที่คุณอาจเดาได้ ผลลัพธ์ของการเรียกใช้แอปพลิเคชันแบบเนทีฟของเรานั้นเหมือนกับผลลัพธ์ของแอปพลิเคชันสตรีมมิ่งที่เราเปิดตัวในวิธีก่อนหน้า
GO TO FULL VERSION