5.1 Công việc chỉ bản đồ

Đã đến lúc mô tả các kỹ thuật khác nhau cho phép bạn sử dụng MapReduce một cách hiệu quả để giải quyết các vấn đề thực tế, cũng như chỉ ra một số tính năng của Hadoop có thể đơn giản hóa quá trình phát triển hoặc tăng tốc đáng kể việc thực thi tác vụ MapReduce trên một cụm.

Như chúng ta đã nhớ, MapReduce bao gồm các giai đoạn Bản đồ, Xáo trộn và Giảm. Theo quy định, giai đoạn Xáo trộn hóa ra là khó khăn nhất trong các nhiệm vụ thực tế, vì dữ liệu được sắp xếp ở giai đoạn này. Trên thực tế, có một số nhiệm vụ mà chỉ riêng giai đoạn Bản đồ có thể được phân phối. Dưới đây là ví dụ về các nhiệm vụ như vậy:

  • Lọc dữ liệu (ví dụ: "Tìm tất cả bản ghi từ địa chỉ IP 123.123.123.123" trong nhật ký máy chủ web);
  • Chuyển đổi dữ liệu (“Xóa cột trong nhật ký csv”);
  • Tải và dỡ dữ liệu từ một nguồn bên ngoài (“Chèn tất cả các bản ghi từ nhật ký vào cơ sở dữ liệu”).

Những nhiệm vụ như vậy được giải quyết bằng Map-Only. Khi tạo tác vụ Chỉ dành cho bản đồ trong Hadoop, bạn cần chỉ định số lượng bộ giảm tốc bằng 0:

Một ví dụ về cấu hình tác vụ chỉ có bản đồ trên hadoop:

giao diện gốc Giao diện truyền phát Hadoop

Chỉ định số lượng bộ giảm tốc bằng 0 khi định cấu hình công việc'a:

job.setNumReduceTasks(0); 

Chúng tôi không chỉ định bộ giảm tốc và chỉ định số lượng bộ giảm tốc bằng không. Ví dụ:

hadoop jar hadoop-streaming.jar \ 
 -D mapred.reduce.tasks=0\ 
-input input_dir\ 
-output output_dir\ 
-mapper "python mapper.py"\ 
-file "mapper.py"

Bản đồ Chỉ các công việc thực sự có thể rất hữu ích. Ví dụ: trong nền tảng Facetz.DCA, để xác định các đặc điểm của người dùng theo hành vi của họ, chính xác là một bản đồ lớn duy nhất được sử dụng, mỗi trình ánh xạ lấy người dùng làm đầu vào và trả về các đặc điểm của anh ta làm đầu ra.

5.2 Kết hợp

Như tôi đã viết, thường thì giai đoạn khó khăn nhất khi thực hiện tác vụ Thu nhỏ bản đồ là giai đoạn xáo trộn. Điều này xảy ra vì các kết quả trung gian (đầu ra của trình ánh xạ) được ghi vào đĩa, được sắp xếp và truyền qua mạng. Tuy nhiên, có những nhiệm vụ mà hành vi đó có vẻ không hợp lý cho lắm. Ví dụ: trong cùng một tác vụ đếm từ trong tài liệu, bạn có thể tổng hợp trước kết quả đầu ra của một số trình ánh xạ trên một nút thu nhỏ bản đồ của tác vụ và chuyển các giá trị đã được tổng hợp cho từng máy sang bộ giảm tốc .

Trong hadoop, đối với điều này, bạn có thể xác định một hàm kết hợp sẽ xử lý đầu ra của một phần của trình ánh xạ. Hàm kết hợp rất giống với hàm giảm - nó lấy đầu ra của một số trình ánh xạ làm đầu vào và tạo ra kết quả tổng hợp cho các trình ánh xạ này, do đó, trình giảm tốc cũng thường được sử dụng làm trình kết hợp. Một sự khác biệt quan trọng so với giảm là không phải tất cả các giá trị tương ứng với một khóa đều đến hàm kết hợp .

Ngoài ra, hadoop không đảm bảo rằng chức năng kết hợp sẽ được thực thi hoàn toàn cho đầu ra của trình ánh xạ. Do đó, chức năng kết hợp không phải lúc nào cũng được áp dụng, ví dụ, trong trường hợp tìm kiếm giá trị trung bình theo khóa. Tuy nhiên, trong những tác vụ áp dụng chức năng kết hợp, việc sử dụng nó cho phép đạt được sự gia tăng đáng kể về tốc độ của tác vụ MapReduce.

Sử dụng Bộ kết hợp trên hadoop:

giao diện gốc Truyền trực tuyến Hadoop

Khi định cấu hình công việc-a, hãy chỉ định lớp-Combiner. Theo quy định, nó giống như Reducer:

job.setMapperClass(TokenizerMapper.class); 
job.setCombinerClass(IntSumReducer.class); 
job.setReducerClass(IntSumReducer.class); 

Chỉ định lệnh -combiner trong tùy chọn dòng lệnh. Thông thường, lệnh này giống như lệnh giảm tốc. Ví dụ:

hadoop jar hadoop-streaming.jar \ 
-input input_dir\ 
-output output_dir\ 
-mapper "python mapper.py"\ 
-reducer "python reducer.py"\ 
-combiner "python reducer.py"\ 
-file "mapper.py"\ 
-file "reducer.py"\

5.3 Chuỗi tác vụ MapReduce

Có những tình huống khi một MapReduce không đủ để giải quyết vấn đề. Ví dụ: hãy xem xét tác vụ WordCount được sửa đổi một chút: có một bộ tài liệu văn bản, bạn cần đếm xem có bao nhiêu từ xuất hiện từ 1 đến 1000 lần trong tập hợp, bao nhiêu từ từ 1001 đến 2000, bao nhiêu từ 2001 đến 3000, và như thế. Đối với giải pháp, chúng tôi cần 2 công việc MapReduce:

  • Số lượng từ đã sửa đổi, đối với mỗi từ sẽ tính toán khoảng thời gian mà nó rơi vào;
  • Một MapReduce đếm số lần gặp phải mỗi khoảng thời gian trong đầu ra của MapReduce đầu tiên.

Giải pháp mã giả:

#map1 
def map(doc): 
for word in doc: 
yield word, 1
#reduce1 
def reduce(word, values): 
yield int(sum(values)/1000), 1 
#map2 
def map(doc): 
interval, cnt = doc.split() 
yield interval, cnt 
#reduce2 
def reduce(interval, values): 
yield interval*1000, sum(values) 

Để thực hiện một chuỗi các tác vụ MapReduce trên hadoop, chỉ cần chỉ định thư mục được chỉ định làm đầu ra cho tác vụ đầu tiên làm đầu vào cho tác vụ thứ hai và lần lượt chạy chúng.

Trong thực tế, các chuỗi tác vụ MapReduce có thể là các chuỗi khá phức tạp trong đó các tác vụ MapReduce có thể được kết nối tuần tự và song song với nhau. Để đơn giản hóa việc quản lý các kế hoạch thực hiện nhiệm vụ như vậy, có các công cụ riêng biệt như oozie và luigi, sẽ được thảo luận trong một bài viết riêng của loạt bài này.

5.4 Bộ đệm phân tán

Một cơ chế quan trọng trong Hadoop là Distributed Cache. Bộ đệm ẩn được phân phối cho phép bạn thêm các tệp (ví dụ: tệp văn bản, tệp lưu trữ, tệp jar) vào môi trường nơi tác vụ MapReduce đang chạy.

Bạn có thể thêm các tệp được lưu trữ trên HDFS, tệp cục bộ (cục bộ vào máy mà tác vụ được khởi chạy). Tôi đã hoàn toàn trình bày cách sử dụng Distributed Cache với hadoop streaming bằng cách thêm các tệp mapper.py và reducer.py thông qua tùy chọn -file. Trên thực tế, bạn không chỉ có thể thêm mapper.py và reducer.py mà còn có thể thêm các tệp tùy ý nói chung, sau đó sử dụng chúng như thể chúng nằm trong một thư mục cục bộ.

Sử dụng bộ đệm phân tán:

API gốc
//Job configuration
JobConf job = new JobConf();
DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"),  job);
DistributedCache.addCacheArchive(new URI("/myapp/map.zip", job);
DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job);
DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar", job);
DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz", job);

//example of usage in mapper-e:
public static class MapClass extends MapReduceBase
implements Mapper<K, V, K, V> {

 private Path[] localArchives;
 private Path[] localFiles;

 public void configure(JobConf job) {
   // get cached data from archives
   File f = new File("./map.zip/some/file/in/zip.txt");
 }

 public void map(K key, V value,
             	OutputCollector<K, V> output, Reporter reporter)
 throws IOException {
   // use data here
   // ...
   // ...
   output.collect(k, v);
 }
}
Truyền phát Hadoop

#chúng tôi liệt kê các tệp cần được thêm vào bộ đệm được phân phối trong tham số –files. Tùy chọn --files phải xuất hiện trước các tùy chọn khác.

yarn  hadoop-streaming.jar\ 
-files mapper.py,reducer.py,some_cached_data.txt\ 
-input '/some/input/path' \ 
-output '/some/output/path' \  
-mapper 'python mapper.py' \ 
-reducer 'python reducer.py' \

ví dụ sử dụng:

import sys 
#just read file from local folder 
data = open('some_cached_data.txt').read() 
 
for line in sys.stdin() 
#processing input 
#use data here

5.5 Giảm tham gia

Những người đã quen làm việc với cơ sở dữ liệu quan hệ thường sử dụng thao tác Nối rất tiện lợi, thao tác này cho phép họ cùng xử lý nội dung của một số bảng bằng cách nối chúng theo một khóa nào đó. Khi làm việc với dữ liệu lớn, vấn đề này đôi khi cũng phát sinh. Hãy xem xét ví dụ sau:

Có nhật ký của hai máy chủ web, mỗi nhật ký trông như thế này:

t\t

Ví dụ về đoạn nhật ký:

1446792139	
178.78.82.1	
/sphingosine/unhurrying.css 
1446792139	
126.31.163.222	
/accentually.js 
1446792139	
154.164.149.83	
/pyroacid/unkemptly.jpg 
1446792139	
202.27.13.181	
/Chawia.js 
1446792139	
67.123.248.174	
/morphographical/dismain.css 
1446792139	
226.74.123.135	
/phanerite.php 
1446792139	
157.109.106.104	
/bisonant.css

Cần phải tính toán đối với từng địa chỉ IP, máy chủ nào trong số 2 máy chủ mà nó truy cập thường xuyên hơn. Kết quả phải ở dạng:

\t

Một ví dụ về một phần của kết quả:

178.78.82.1	
first 
126.31.163.222	
second 
154.164.149.83	
second 
226.74.123.135	
first

Thật không may, không giống như cơ sở dữ liệu quan hệ, nói chung, nối hai nhật ký theo khóa (trong trường hợp này là theo địa chỉ IP) là một thao tác khá nặng và được giải quyết bằng cách sử dụng 3 MapReduce và mẫu Giảm Tham gia:

ReduceJoin hoạt động như thế này:

1) Đối với mỗi nhật ký đầu vào, một MapReduce riêng (chỉ Bản đồ) được khởi chạy, chuyển đổi dữ liệu đầu vào sang dạng sau:

key -> (type, value

Trong đó khóa là khóa để nối các bảng, Loại là loại bảng (thứ nhất hoặc thứ hai trong trường hợp của chúng tôi) và Giá trị là bất kỳ dữ liệu bổ sung nào được liên kết với khóa.

2) Đầu ra của cả hai MapReduce được đưa vào đầu vào của MapReduce thứ 3, trên thực tế, thực hiện phép hợp. MapReduce này chứa một Mapper trống chỉ sao chép đầu vào. Tiếp theo, xáo trộn phân tách dữ liệu thành các khóa và cung cấp dữ liệu đó cho bộ giảm tốc làm đầu vào:

key -> [(type, value)]

Điều quan trọng là tại thời điểm này, bộ rút gọn nhận được các bản ghi từ cả hai nhật ký và đồng thời, có thể xác định bằng trường loại mà một giá trị cụ thể đến từ hai nhật ký. Vì vậy, có đủ dữ liệu để giải quyết vấn đề ban đầu. Trong trường hợp của chúng tôi, bộ giảm tốc chỉ cần tính toán cho từng khóa bản ghi, loại nào đã gặp nhiều hơn và xuất loại này.

5.6 Tham gia bản đồ

Mẫu ReduceJoin mô tả trường hợp chung của việc nối hai nhật ký bằng khóa. Tuy nhiên, có một trường hợp đặc biệt trong đó nhiệm vụ có thể được đơn giản hóa và tăng tốc đáng kể. Đây là trường hợp một trong các bản ghi nhỏ hơn đáng kể so với bản ghi kia. Hãy xem xét vấn đề sau:

Có 2 nhật ký. Nhật ký đầu tiên chứa nhật ký máy chủ web (giống như trong tác vụ trước), tệp thứ hai (kích thước 100kb) chứa URL-> Kết hợp chủ đề. Ví dụ tệp thứ 2:

/toyota.php 	
auto 
/football/spartak.html 	
sport 
/cars 	
auto 
/finances/money 	
business

Đối với mỗi địa chỉ IP, cần tính toán các trang thuộc danh mục nào từ địa chỉ IP này được tải thường xuyên nhất.

Trong trường hợp này, chúng tôi cũng cần nối 2 nhật ký bằng URL. Tuy nhiên, trong trường hợp này, chúng tôi không phải chạy 3 MapReduce, vì nhật ký thứ hai sẽ hoàn toàn phù hợp với bộ nhớ. Để giải quyết vấn đề bằng cách sử dụng MapReduce đầu tiên, chúng ta có thể tải nhật ký thứ hai vào Bộ đệm ẩn được phân phối và khi Trình ánh xạ được khởi tạo, chỉ cần đọc nó vào bộ nhớ, đưa nó vào -> từ điển chủ đề.

Hơn nữa, vấn đề được giải quyết như sau:

bản đồ:

# find the subject of each of the pages of the first log 
input_line -> [ip,  topic] 

giảm bớt:


Ip -> [topics] -> [ip, most_popular_topic]

Giảm nhận được một ip và danh sách tất cả các chủ đề làm đầu vào, nó chỉ đơn giản là tính toán chủ đề nào được gặp thường xuyên nhất. Do đó, nhiệm vụ được giải quyết bằng cách sử dụng MapReduce đầu tiên và Tham gia thực tế thường diễn ra bên trong bản đồ (do đó, nếu không cần tổng hợp bổ sung theo khóa, công việc MapOnly có thể được phân phối):