4.1 Thông tin chung về Hadoop

Mô hình MapReduce đã được Google đề xuất vào năm 2004 trong bài viết MapReduce: Xử lý dữ liệu đơn giản hóa trên các cụm lớn . Vì bài viết được đề xuất có mô tả về mô hình, nhưng phần triển khai bị thiếu, một số lập trình viên từ Yahoo đã đề xuất cách triển khai của họ như một phần công việc trên trình thu thập dữ liệu web nutch. Bạn có thể đọc thêm về lịch sử của Hadoop trong bài viết Lịch sử của Hadoop: Từ 4 nút đến tương lai của dữ liệu .

Ban đầu, Hadoop chủ yếu là một công cụ để lưu trữ dữ liệu và chạy các tác vụ MapReduce, nhưng giờ đây Hadoop là một tập hợp lớn các công nghệ liên quan theo cách này hay cách khác để xử lý dữ liệu lớn (không chỉ với MapReduce).

Các thành phần chính (cốt lõi) của Hadoop là:

  • Hệ thống tệp phân tán Hadoop (HDFS) là một hệ thống tệp phân tán cho phép bạn lưu trữ thông tin với kích thước gần như không giới hạn.
  • Hadoop YARN là một khung để quản lý tài nguyên cụm và quản lý tác vụ, bao gồm cả khung MapReduce.
  • Hadoop chung

Ngoài ra còn có một số lượng lớn các dự án liên quan trực tiếp đến Hadoop, nhưng không được bao gồm trong lõi Hadoop:

  • Hive - một công cụ cho các truy vấn giống như SQL trên dữ liệu lớn (biến các truy vấn SQL thành một loạt các tác vụ MapReduce);
  • Pig là một ngôn ngữ lập trình để phân tích dữ liệu cấp cao. Một dòng mã trong ngôn ngữ này có thể biến thành một chuỗi các tác vụ MapReduce;
  • Hbase là một cơ sở dữ liệu dạng cột triển khai mô hình BigTable;
  • Cassandra là một cơ sở dữ liệu khóa-giá trị được phân phối hiệu suất cao;
  • ZooKeeper là một dịch vụ lưu trữ cấu hình phân tán và đồng bộ hóa các thay đổi cấu hình;
  • Mahout là một thư viện và công cụ học máy dữ liệu lớn.

Một cách riêng biệt, tôi muốn lưu ý dự án Apache Spark , một công cụ để xử lý dữ liệu phân tán. Apache Spark thường sử dụng các thành phần Hadoop như HDFS và YARN cho công việc của mình, trong khi chính nó gần đây đã trở nên phổ biến hơn Hadoop:

Một số thành phần này sẽ được đề cập trong các bài viết riêng trong loạt tài liệu này, nhưng bây giờ, hãy xem cách bạn có thể bắt đầu làm việc với Hadoop và áp dụng nó vào thực tế.

4.2 Chạy chương trình MapReduce trên Hadoop

Bây giờ hãy xem cách chạy tác vụ MapReduce trên Hadoop. Như một nhiệm vụ, chúng ta sẽ sử dụng ví dụ WordCount cổ điển đã được thảo luận trong bài học trước.

Hãy để tôi nhắc bạn về công thức của vấn đề: có một bộ tài liệu. Đối với mỗi từ xuất hiện trong tập hợp tài liệu, cần phải đếm số lần từ đó xuất hiện trong tập hợp.

Giải pháp:

Bản đồ chia tài liệu thành các từ và trả về một tập hợp các cặp (từ, 1).

Giảm tổng số lần xuất hiện của mỗi từ:

def map(doc):  
for word in doc.split():  
	yield word, 1 
def reduce(word, values):  
	yield word, sum(values)

Bây giờ nhiệm vụ là lập trình giải pháp này dưới dạng mã có thể được thực thi trên Hadoop và chạy.

4.3 Phương pháp số 1. Truyền phát Hadoop

Cách dễ nhất để chạy chương trình MapReduce trên Hadoop là sử dụng giao diện phát trực tuyến Hadoop. Giao diện phát trực tuyến giả định rằng mapreduce được triển khai dưới dạng các chương trình lấy dữ liệu từ stdin và xuất ra thiết bị xuất chuẩn .

Chương trình thực hiện chức năng bản đồ được gọi là mapper. Chương trình thực thi reduce được gọi tương ứng là reducer .

Theo mặc định, giao diện Truyền phát giả định rằng một dòng đến trong trình ánh xạ hoặc bộ giảm tốc tương ứng với một mục nhập đến cho bản đồ .

Đầu ra của trình ánh xạ đến đầu vào của bộ giảm tốc ở dạng cặp (khóa, giá trị), trong khi tất cả các cặp tương ứng với cùng một khóa:

  • Đảm bảo được xử lý bằng một lần khởi động bộ giảm tốc;
  • Sẽ được gửi đến đầu vào liên tiếp (nghĩa là, nếu một bộ giảm tốc xử lý một số khóa khác nhau, thì đầu vào sẽ được nhóm theo khóa).

Vì vậy, hãy triển khai trình ánh xạ và trình giảm tốc trong python:

#mapper.py  
import sys  
  
def do_map(doc):  
for word in doc.split():  
	yield word.lower(), 1  
  
for line in sys.stdin:  
	for key, value in do_map(line):  
    	print(key + "\t" + str(value))  
 
#reducer.py  
import sys  
  
def do_reduce(word, values):  
	return word, sum(values)  
  
prev_key = None  
values = []  
  
for line in sys.stdin:  
	key, value = line.split("\t")  
	if key != prev_key and prev_key is not None:  
    	result_key, result_value = do_reduce(prev_key, values)  
    	print(result_key + "\t" + str(result_value))  
    	values = []  
	prev_key = key  
	values.append(int(value))  
  
if prev_key is not None:  
	result_key, result_value = do_reduce(prev_key, values)  
	print(result_key + "\t" + str(result_value))

Dữ liệu mà Hadoop sẽ xử lý phải được lưu trữ trên HDFS. Hãy tải lên các bài viết của chúng tôi và đưa chúng lên HDFS. Để làm điều này, sử dụng lệnh hadoop fs :

wget https://www.dropbox.com/s/opp5psid1x3jt41/lenta_articles.tar.gz  
tar xzvf lenta_articles.tar.gz  
hadoop fs -put lenta_articles 

Tiện ích hadoop fs hỗ trợ một số lượng lớn các phương thức để thao tác với hệ thống tệp, nhiều phương thức trong số đó giống hệt với các tiện ích linux tiêu chuẩn.

Bây giờ hãy bắt đầu nhiệm vụ phát trực tuyến:

yarn jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar\  
 -input lenta_articles\  
 -output lenta_wordcount\  
 -file mapper.py\  
 -file reducer.py\  
 -mapper "python mapper.py"\  
 -reducer "python reducer.py" 

Tiện ích sợi được sử dụng để khởi chạy và quản lý các ứng dụng khác nhau (bao gồm cả dựa trên bản đồ thu nhỏ) trên một cụm. Hadoop-streaming.jar chỉ là một ví dụ về ứng dụng sợi như vậy.

Tiếp theo là các tùy chọn khởi chạy:

  • đầu vào - thư mục chứa dữ liệu nguồn trên hdfs;
  • đầu ra - thư mục trên hdfs nơi bạn muốn đặt kết quả;
  • tệp - các tệp cần thiết trong quá trình thực hiện tác vụ thu nhỏ bản đồ;
  • mapper là lệnh console sẽ được sử dụng cho giai đoạn bản đồ;
  • giảm là lệnh điều khiển sẽ được sử dụng cho giai đoạn giảm.

Sau khi khởi chạy, bạn có thể xem tiến trình của tác vụ trong bảng điều khiển và một URL để xem thông tin chi tiết hơn về tác vụ.

Trong giao diện có sẵn tại URL này, bạn có thể tìm hiểu trạng thái thực thi tác vụ chi tiết hơn, xem nhật ký của từng trình ánh xạ và trình rút gọn (rất hữu ích trong trường hợp tác vụ không thành công).

Kết quả của công việc sau khi thực hiện thành công được thêm vào HDFS trong thư mục mà chúng tôi đã chỉ định trong trường đầu ra. Bạn có thể xem nội dung của nó bằng lệnh "hadoop fs -ls lenta_wordcount".

Bản thân kết quả có thể thu được như sau:

hadoop fs -text lenta_wordcount/* | sort -n -k2,2 | tail -n5  
from
41  
this
43  
on
82  
and
111  
into
194 

Lệnh "hadoop fs -text" hiển thị nội dung của thư mục ở dạng văn bản. Tôi đã sắp xếp kết quả theo số lần xuất hiện của các từ. Như mong đợi, những từ phổ biến nhất trong ngôn ngữ là giới từ.

4.4 Cách 2: sử dụng Java

Bản thân Hadoop được viết bằng java và giao diện gốc của Hadoop cũng dựa trên java. Hãy cho biết một ứng dụng java gốc cho số từ trông như thế nào:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

	public static class TokenizerMapper
        	extends Mapper<Object, Text, Text, IntWritable>{

    	private final static IntWritable one = new IntWritable(1);
    	private Text word = new Text();

    	public void map(Object key, Text value, Context context
    	) throws IOException, InterruptedException {
        	StringTokenizer itr = new StringTokenizer(value.toString());
        	while (itr.hasMoreTokens()) {
            	word.set(itr.nextToken());
            	context.write(word, one);
        	}
    	}
	}

	public static class IntSumReducer
        	extends Reducer<Text,IntWritable,Text,IntWritable> {
    	private IntWritable result = new IntWritable();

    	public void reduce(Text key, Iterable values,
                       	Context context
    	) throws IOException, InterruptedException {
        	int sum = 0;
        	for (IntWritable val : values) {
            	sum += val.get();
        	}
        	result.set(sum);
        	context.write(key, result);
    	}
	}

	public static void main(String[] args) throws Exception {
    	Configuration conf = new Configuration();
    	Job job = Job.getInstance(conf, "word count");
    	job.setJarByClass(WordCount.class);
    	job.setMapperClass(TokenizerMapper.class);
    	job.setReducerClass(IntSumReducer.class);
    	job.setOutputKeyClass(Text.class);
    	job.setOutputValueClass(IntWritable.class);
    	FileInputFormat.addInputPath(job, new Path("hdfs://localhost/user/cloudera/lenta_articles"));
    	FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost/user/cloudera/lenta_wordcount"));
    	System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

Lớp này thực hiện chính xác như ví dụ Python của chúng tôi. Chúng tôi tạo các lớp TokenizerMapper và IntSumReducer bằng cách lần lượt xuất phát từ các lớp Mapper và Reducer. Các lớp được truyền dưới dạng tham số mẫu chỉ định các loại giá trị đầu vào và đầu ra. API gốc giả định rằng chức năng bản đồ được cung cấp một cặp khóa-giá trị làm đầu vào. Vì trong trường hợp của chúng tôi, khóa trống, chúng tôi chỉ cần xác định Đối tượng là loại khóa.

Trong phương thức Chính, chúng tôi bắt đầu tác vụ mapreduce và xác định các tham số của nó - tên, trình ánh xạ và trình giảm tốc, đường dẫn trong HDFS, vị trí của dữ liệu đầu vào và vị trí đặt kết quả. Để biên dịch, chúng ta cần các thư viện hadoop. Tôi sử dụng Maven để xây dựng, mà cloudera có một kho lưu trữ. Hướng dẫn thiết lập nó có thể được tìm thấy ở đây. Kết quả là tệp pom.xmp (được sử dụng bởi maven để mô tả việc lắp ráp dự án) tôi nhận được như sau):

<?xml version="1.0" encoding="UTF-8"?>  
<project xmlns="http://maven.apache.org/POM/4.0.0"  
     	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
     	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">  
	<modelVersion>4.0.0</modelVersion>  
  
	<repositories>  
    	<repository>  
        	<id>cloudera</id>  
        	<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>  
    	</repository>  
	</repositories>  
  
	<dependencies>  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-common</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-auth</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-hdfs</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-mapreduce-client-app</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
	</dependencies>  
  
	<groupId>org.dca.examples</groupId>  
	<artifactId>wordcount</artifactId>  
	<version>1.0-SNAPSHOT</version>  
 
</project>

Hãy biên dịch dự án thành một gói jar:

mvn clean package

Sau khi xây dựng dự án thành tệp jar, quá trình khởi chạy diễn ra theo cách tương tự, như trong trường hợp giao diện phát trực tuyến:

yarn jar wordcount-1.0-SNAPSHOT.jar  WordCount 

Chúng tôi chờ thực hiện và kiểm tra kết quả:

hadoop fs -text lenta_wordcount/* | sort -n -k2,2 | tail -n5  
from
41
this
43
on
82
and
111
into
194

Như bạn có thể đoán, kết quả của việc chạy ứng dụng gốc của chúng tôi giống như kết quả của ứng dụng phát trực tuyến mà chúng tôi đã khởi chạy theo cách trước đó.