5.1 แผนที่เฉพาะงาน

ได้เวลาอธิบายเทคนิคต่างๆ ที่ช่วยให้คุณสามารถใช้ MapReduce เพื่อแก้ปัญหาในทางปฏิบัติได้อย่างมีประสิทธิภาพ รวมถึงแสดงคุณลักษณะบางอย่างของ Hadoop ที่สามารถทำให้การพัฒนาง่ายขึ้นหรือเพิ่มความเร็วในการดำเนินการงาน MapReduce บนคลัสเตอร์ได้อย่างมาก

ดังที่เราจำได้ MapReduce ประกอบด้วยขั้นตอนแผนที่ สับเปลี่ยน และย่อ ตามกฎแล้ว ขั้นตอนการสุ่มกลายเป็นงานที่ยากที่สุดในงานจริง เนื่องจากข้อมูลจะถูกจัดเรียงในขั้นตอนนี้ ในความเป็นจริง มีงานจำนวนมากที่สามารถจัดการด่านแผนที่เพียงอย่างเดียวได้ นี่คือตัวอย่างของงานดังกล่าว:

  • การกรองข้อมูล (เช่น "ค้นหาบันทึกทั้งหมดจากที่อยู่ IP 123.123.123.123" ในบันทึกของเว็บเซิร์ฟเวอร์)
  • การแปลงข้อมูล (“ลบคอลัมน์ในไฟล์บันทึก csv”);
  • การโหลดและยกเลิกการโหลดข้อมูลจากแหล่งภายนอก (“แทรกบันทึกทั้งหมดจากการเข้าสู่ระบบลงในฐานข้อมูล”)

งานดังกล่าวได้รับการแก้ไขโดยใช้แผนที่เท่านั้น เมื่อสร้างงาน Map-Only ใน Hadoop คุณต้องระบุตัวลดจำนวนเป็นศูนย์:

ตัวอย่างของการกำหนดค่างานแมปเท่านั้นบน hadoop:

อินเทอร์เฟซดั้งเดิม อินเทอร์เฟซการสตรีม Hadoop

ระบุตัวลดจำนวนเป็นศูนย์เมื่อกำหนดค่า job'a:

job.setNumReduceTasks(0); 

เราไม่ระบุตัวลดและระบุตัวลดจำนวนเป็นศูนย์ ตัวอย่าง:

hadoop jar hadoop-streaming.jar \ 
 -D mapred.reduce.tasks=0\ 
-input input_dir\ 
-output output_dir\ 
-mapper "python mapper.py"\ 
-file "mapper.py"

งาน Map Only จริงๆ แล้วมีประโยชน์มาก ตัวอย่างเช่น ในแพลตฟอร์ม Facetz.DCA เพื่อระบุลักษณะเฉพาะของผู้ใช้ตามพฤติกรรมของผู้ใช้ จะใช้แผนที่ขนาดใหญ่เพียงแผนที่เดียวเท่านั้น ซึ่งผู้ทำแผนที่แต่ละคนจะใช้ผู้ใช้เป็นอินพุตและส่งกลับลักษณะเฉพาะของตนเป็นเอาต์พุต

5.2 รวมกัน

อย่างที่ฉันเขียนไปแล้ว โดยปกติขั้นตอนที่ยากที่สุดเมื่อทำภารกิจลดขนาดแผนที่คือขั้นตอนการสับเปลี่ยน สิ่งนี้เกิดขึ้นเนื่องจากผลลัพธ์ระหว่างกลาง (เอาต์พุตของผู้ทำแผนที่) ถูกเขียนลงดิสก์ จัดเรียง และส่งผ่านเครือข่าย อย่างไรก็ตาม มีงานบางอย่างที่พฤติกรรมดังกล่าวดูไม่สมเหตุสมผล ตัวอย่างเช่น ในงานเดียวกันของการนับคำในเอกสาร คุณสามารถรวมผลลัพธ์ของเอาต์พุตของผู้ทำแผนที่หลายรายการไว้ล่วงหน้าบนโหนดการลดแผนที่เดียวของงาน และส่งค่าที่รวมแล้วสำหรับแต่ละเครื่องไปยังตัวลด .

สำหรับสิ่งนี้ใน hadoop คุณสามารถกำหนดฟังก์ชันการรวมที่จะประมวลผลเอาต์พุตของส่วนหนึ่งของตัวแมป ฟังก์ชันการรวมจะคล้ายกันมากกับการลด - จะใช้เอาต์พุตของตัวทำแผนที่บางตัวเป็นอินพุตและสร้างผลลัพธ์รวมสำหรับตัวทำแผนที่เหล่านี้ ดังนั้นตัวลดจึงมักใช้เป็นตัวรวมเช่นกัน ความแตกต่างที่สำคัญจากการลดคือ ค่าทั้งหมดที่ไม่ ตรงกับคีย์เดียวจะเข้าสู่ฟังก์ชันการรวม

นอกจากนี้ hadoop ไม่รับประกันว่าฟังก์ชันการรวมจะถูกดำเนินการเลยสำหรับเอาต์พุตของตัวทำแผนที่ ดังนั้น ฟังก์ชันการรวมจึงใช้ไม่ได้เสมอไป เช่น ในกรณีของการค้นหาค่ามัธยฐานด้วยคีย์ อย่างไรก็ตาม ในงานเหล่านั้นที่สามารถใช้ฟังก์ชันการรวมได้ การใช้งานจะช่วยให้บรรลุความเร็วที่เพิ่มขึ้นอย่างมากของงาน MapReduce

การใช้ Combiner บน hadoop:

อินเทอร์เฟซดั้งเดิม สตรีมมิ่ง Hadoop

เมื่อกำหนดค่า job-a ให้ระบุ class-Combiner ตามกฎแล้วจะเหมือนกับตัวลด:

job.setMapperClass(TokenizerMapper.class); 
job.setCombinerClass(IntSumReducer.class); 
job.setReducerClass(IntSumReducer.class); 

ระบุคำสั่ง -combiner ในตัวเลือกบรรทัดคำสั่ง โดยทั่วไปแล้ว คำสั่งนี้จะเหมือนกับคำสั่งตัวลดขนาด ตัวอย่าง:

hadoop jar hadoop-streaming.jar \ 
-input input_dir\ 
-output output_dir\ 
-mapper "python mapper.py"\ 
-reducer "python reducer.py"\ 
-combiner "python reducer.py"\ 
-file "mapper.py"\ 
-file "reducer.py"\

5.3 MapReduce สายงาน

มีบางสถานการณ์ที่ MapReduce หนึ่งรายการไม่เพียงพอที่จะแก้ปัญหา ตัวอย่างเช่น พิจารณางาน WordCount ที่ปรับเปลี่ยนเล็กน้อย: มีเอกสารข้อความชุดหนึ่ง คุณต้องนับจำนวนคำที่เกิดขึ้นตั้งแต่ 1 ถึง 1,000 ครั้งในชุด จำนวนคำตั้งแต่ 1,001 ถึง 2000, 2001 ถึง 3,000 กี่คำ และอื่น ๆ สำหรับวิธีแก้ปัญหา เราต้องการงาน MapReduce 2 งาน:

  • จำนวนคำที่แก้ไขซึ่งสำหรับแต่ละคำจะคำนวณว่าช่วงใดที่ลดลง
  • MapReduce ที่นับจำนวนครั้งที่พบแต่ละช่วงเวลาในเอาต์พุตของ MapReduce แรก

วิธีแก้ปัญหารหัสหลอก:

#map1 
def map(doc): 
for word in doc: 
yield word, 1
#reduce1 
def reduce(word, values): 
yield int(sum(values)/1000), 1 
#map2 
def map(doc): 
interval, cnt = doc.split() 
yield interval, cnt 
#reduce2 
def reduce(interval, values): 
yield interval*1000, sum(values) 

ในการดำเนินการตามลำดับของงาน MapReduce บน hadoop ก็เพียงพอแล้วที่จะระบุโฟลเดอร์ที่ระบุเป็นเอาต์พุตสำหรับอันแรกเป็นอินพุตสำหรับงานที่สองและเรียกใช้ตามลำดับ

ในทางปฏิบัติ สายโซ่ของงาน MapReduce สามารถเป็นลำดับที่ค่อนข้างซับซ้อน ซึ่งงาน MapReduce สามารถเชื่อมต่อได้ทั้งแบบต่อเนื่องและแบบคู่ขนานกัน เพื่อลดความซับซ้อนในการจัดการแผนการดำเนินงานดังกล่าว มีเครื่องมือแยกต่างหาก เช่น oozie และ luigi ซึ่งจะกล่าวถึงในบทความแยกต่างหากในชุดนี้

5.4 แคชแบบกระจาย

กลไกสำคัญใน Hadoop คือแคชแบบกระจาย Distributed Cache ให้คุณเพิ่มไฟล์ (เช่น ไฟล์ข้อความ, ไฟล์เก็บถาวร, ไฟล์ jar) ไปยังสภาพแวดล้อมที่งาน MapReduce กำลังทำงานอยู่

คุณสามารถเพิ่มไฟล์ที่จัดเก็บไว้ใน HDFS ไฟล์ในเครื่อง (ในเครื่องที่เปิดใช้งานงาน) ฉันได้แสดงวิธีใช้ Distributed Cache กับการสตรีม Hadoop โดยการเพิ่มไฟล์ mapper.py และ reducer.py ผ่านตัวเลือก -file ในความเป็นจริง คุณสามารถเพิ่มได้ไม่เพียงแค่ mapper.py และ reducer.py เท่านั้น แต่ยังเพิ่มไฟล์ทั่วไปได้ตามอำเภอใจ จากนั้นใช้ไฟล์เหล่านั้นราวกับว่าไฟล์เหล่านั้นอยู่ในโฟลเดอร์ในเครื่อง

การใช้แคชแบบกระจาย:

API เนทีฟ
//Job configuration
JobConf job = new JobConf();
DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"),  job);
DistributedCache.addCacheArchive(new URI("/myapp/map.zip", job);
DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job);
DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar", job);
DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz", job);

//example of usage in mapper-e:
public static class MapClass extends MapReduceBase
implements Mapper<K, V, K, V> {

 private Path[] localArchives;
 private Path[] localFiles;

 public void configure(JobConf job) {
   // get cached data from archives
   File f = new File("./map.zip/some/file/in/zip.txt");
 }

 public void map(K key, V value,
             	OutputCollector<K, V> output, Reporter reporter)
 throws IOException {
   // use data here
   // ...
   // ...
   output.collect(k, v);
 }
}
Hadoop สตรีมมิ่ง

# เราแสดงรายการไฟล์ที่ต้องเพิ่มไปยังแคชแบบกระจายในพารามิเตอร์ –files ตัวเลือก --files ต้องมาก่อนตัวเลือกอื่นๆ

yarn  hadoop-streaming.jar\ 
-files mapper.py,reducer.py,some_cached_data.txt\ 
-input '/some/input/path' \ 
-output '/some/output/path' \  
-mapper 'python mapper.py' \ 
-reducer 'python reducer.py' \

ตัวอย่างการใช้งาน:

import sys 
#just read file from local folder 
data = open('some_cached_data.txt').read() 
 
for line in sys.stdin() 
#processing input 
#use data here

5.5 ลดการเข้าร่วม

ผู้ที่คุ้นเคยกับการทำงานกับฐานข้อมูลเชิงสัมพันธ์มักจะใช้การดำเนินการเข้าร่วมที่สะดวกมาก ซึ่งช่วยให้พวกเขาสามารถประมวลผลเนื้อหาของตารางบางตารางร่วมกันโดยเข้าร่วมตามคีย์บางคีย์ เมื่อทำงานกับข้อมูลขนาดใหญ่ บางครั้งปัญหานี้ก็เกิดขึ้นเช่นกัน พิจารณาตัวอย่างต่อไปนี้:

มีบันทึกของเว็บเซิร์ฟเวอร์สองรายการ แต่ละบันทึกมีลักษณะดังนี้:

t\t

ตัวอย่างส่วนย่อยของบันทึก:

1446792139	
178.78.82.1	
/sphingosine/unhurrying.css 
1446792139	
126.31.163.222	
/accentually.js 
1446792139	
154.164.149.83	
/pyroacid/unkemptly.jpg 
1446792139	
202.27.13.181	
/Chawia.js 
1446792139	
67.123.248.174	
/morphographical/dismain.css 
1446792139	
226.74.123.135	
/phanerite.php 
1446792139	
157.109.106.104	
/bisonant.css

จำเป็นต้องคำนวณสำหรับแต่ละที่อยู่ IP ว่าเซิร์ฟเวอร์ใดใน 2 เซิร์ฟเวอร์ที่เยี่ยมชมบ่อยกว่ากัน ผลลัพธ์ควรอยู่ในรูปแบบ:

\t

ตัวอย่างส่วนหนึ่งของผลลัพธ์:

178.78.82.1	
first 
126.31.163.222	
second 
154.164.149.83	
second 
226.74.123.135	
first

น่าเสียดายที่โดยทั่วไปแล้วไม่เหมือนฐานข้อมูลเชิงสัมพันธ์ การรวมสองบันทึกด้วยคีย์ (ในกรณีนี้คือตามที่อยู่ IP) เป็นการดำเนินการที่ค่อนข้างหนักและแก้ไขได้โดยใช้ 3 MapReduce และรูปแบบการลดการเข้าร่วม:

ReduceJoin ทำงานดังนี้:

1)สำหรับบันทึกอินพุตแต่ละรายการ จะมีการเปิดตัว MapReduce (แผนที่เท่านั้น) แยกต่างหาก โดยแปลงข้อมูลอินพุตเป็นรูปแบบต่อไปนี้:

key -> (type, value

โดยที่คีย์คือคีย์ในการรวมตาราง Type คือประเภทของตาราง (ตัวแรกหรือตัวที่สองในกรณีของเรา) และ Value คือข้อมูลเพิ่มเติมที่ผูกไว้กับคีย์

2)เอาต์พุตของ MapReduces ทั้งสองจะถูกป้อนเข้ากับอินพุตของ MapReduce ตัวที่ 3 ซึ่งอันที่จริงแล้วทำการรวมเข้าด้วยกัน MapReduce นี้มี Mapper เปล่าที่คัดลอกอินพุต จากนั้น shuffle จะแยกย่อยข้อมูลออกเป็นคีย์และป้อนไปยังตัวลดขนาดเป็นอินพุต:

key -> [(type, value)]

สิ่งสำคัญคือในขณะนี้ตัวลดขนาดจะได้รับบันทึกจากบันทึกทั้งสอง และในขณะเดียวกันก็เป็นไปได้ที่จะระบุโดยฟิลด์ประเภทว่าค่าใดมาจากบันทึกทั้งสองรายการ ดังนั้นจึงมีข้อมูลเพียงพอในการแก้ปัญหาเดิม ในกรณีของเรา ตัวลดเพียงแค่ต้องคำนวณสำหรับแต่ละคีย์บันทึกว่าประเภทใดที่พบมากกว่าและส่งออกประเภทนี้

5.6 แผนที่เข้าร่วม

รูปแบบ ReduceJoin อธิบายถึงกรณีทั่วไปของการรวมบันทึกสองรายการด้วยคีย์ อย่างไรก็ตาม มีกรณีพิเศษที่งานสามารถลดความซับซ้อนและเร่งความเร็วได้อย่างมาก นี่เป็นกรณีที่บันทึกหนึ่งมีขนาดเล็กกว่าบันทึกอื่นอย่างมาก พิจารณาปัญหาต่อไปนี้:

มี 2 ​​ล็อก บันทึกแรกมีบันทึกเว็บเซิร์ฟเวอร์ (เหมือนกับงานก่อนหน้า) ไฟล์ที่สอง (ขนาด 100kb) มี URL-> ธีมตรงกัน ตัวอย่างไฟล์ที่ 2:

/toyota.php 	
auto 
/football/spartak.html 	
sport 
/cars 	
auto 
/finances/money 	
business

สำหรับที่อยู่ IP แต่ละรายการ จำเป็นต้องคำนวณหน้าของหมวดหมู่จากที่อยู่ IP นี้ที่โหลดบ่อยที่สุด

ในกรณีนี้ เราจำเป็นต้องรวม 2 บันทึกด้วย URL อย่างไรก็ตาม ในกรณีนี้ เราไม่จำเป็นต้องเรียกใช้ 3 MapReduce เนื่องจากบันทึกที่สองจะพอดีกับหน่วยความจำอย่างสมบูรณ์ ในการแก้ปัญหาโดยใช้ MapReduce ลำดับที่ 1 เราสามารถโหลดบันทึกที่สองลงใน Distributed Cache และเมื่อ Mapper ถูกเตรียมใช้งาน เพียงแค่อ่านมันลงในหน่วยความจำ วางไว้ใน -> พจนานุกรมหัวข้อ

นอกจากนี้ ปัญหาได้รับการแก้ไขดังนี้:

แผนที่:

# find the subject of each of the pages of the first log 
input_line -> [ip,  topic] 

ลด:


Ip -> [topics] -> [ip, most_popular_topic]

รีดิวซ์ได้รับ ip และรายการของหัวข้อทั้งหมดเป็นอินพุต โดยจะคำนวณว่าหัวข้อใดที่พบบ่อยที่สุด ดังนั้น งานจะได้รับการแก้ไขโดยใช้ MapReduce ที่ 1 และโดยทั่วไปการเข้าร่วมจริงจะเกิดขึ้นภายในแผนที่ (ดังนั้น หากไม่ต้องการการรวมเพิ่มเติมโดยคีย์ งาน MapOnly ก็สามารถจ่ายได้):