4.1 Informasi umum tentang Hadoop

Paradigma MapReduce diusulkan oleh Google pada tahun 2004 dalam artikelnya MapReduce: Pemrosesan Data Sederhana pada Cluster Besar . Karena artikel yang diusulkan berisi deskripsi paradigma, tetapi implementasinya hilang, beberapa programmer dari Yahoo mengusulkan penerapannya sebagai bagian dari pekerjaan perayap web nutch. Anda dapat membaca lebih lanjut tentang sejarah Hadoop di artikel Sejarah Hadoop: Dari 4 node hingga data masa depan .

Awalnya, Hadoop terutama merupakan alat untuk menyimpan data dan menjalankan tugas MapReduce, tetapi sekarang Hadoop adalah tumpukan besar teknologi yang terkait dengan satu atau lain cara untuk memproses data besar (tidak hanya dengan MapReduce).

Komponen (inti) utama Hadoop adalah:

  • Sistem File Terdistribusi Hadoop (HDFS) adalah sistem file terdistribusi yang memungkinkan Anda menyimpan informasi dengan ukuran hampir tidak terbatas.
  • Hadoop YARN adalah framework untuk manajemen sumber daya cluster dan manajemen tugas, termasuk framework MapReduce.
  • Hadoop umum

Ada juga banyak proyek yang terkait langsung dengan Hadoop, tetapi tidak termasuk dalam inti Hadoop:

  • Hive - alat untuk kueri mirip SQL melalui data besar (mengubah kueri SQL menjadi serangkaian tugas MapReduce);
  • Pig adalah bahasa pemrograman untuk analisis data tingkat tinggi. Satu baris kode dalam bahasa ini dapat berubah menjadi rangkaian tugas MapReduce;
  • Hbase adalah database berbentuk kolom yang mengimplementasikan paradigma BigTable;
  • Cassandra adalah database nilai kunci terdistribusi berkinerja tinggi;
  • ZooKeeper adalah layanan untuk penyimpanan konfigurasi terdistribusi dan sinkronisasi perubahan konfigurasi;
  • Mahout adalah perpustakaan dan mesin pembelajaran mesin data besar.

Secara terpisah, saya ingin mencatat proyek Apache Spark , yang merupakan mesin untuk pemrosesan data terdistribusi. Apache Spark biasanya menggunakan komponen Hadoop seperti HDFS dan YARN untuk pekerjaannya, sementara itu sendiri baru-baru ini menjadi lebih populer daripada Hadoop:

Beberapa komponen ini akan dibahas dalam artikel terpisah dalam rangkaian materi ini, tetapi untuk saat ini, mari kita lihat bagaimana Anda dapat mulai bekerja dengan Hadoop dan mempraktikkannya.

4.2 Menjalankan program MapReduce di Hadoop

Sekarang mari kita lihat cara menjalankan tugas MapReduce di Hadoop. Sebagai tugas, kami akan menggunakan contoh WordCount klasik , yang telah dibahas di pelajaran sebelumnya.

Izinkan saya mengingatkan Anda rumusan masalahnya: ada satu set dokumen. Setiap kata yang muncul dalam kumpulan dokumen perlu dihitung berapa kali kata tersebut muncul dalam kumpulan tersebut.

Larutan:

Peta membagi dokumen menjadi kata-kata dan mengembalikan satu set pasangan (kata, 1).

Kurangi jumlah kemunculan setiap kata:

def map(doc):  
for word in doc.split():  
	yield word, 1 
def reduce(word, values):  
	yield word, sum(values)

Sekarang tugasnya adalah memprogram solusi ini dalam bentuk kode yang dapat dijalankan di Hadoop dan dijalankan.

4.3 Metode nomor 1. Hadoop Streaming

Cara termudah untuk menjalankan program MapReduce di Hadoop adalah dengan menggunakan antarmuka streaming Hadoop. Antarmuka streaming mengasumsikan bahwa peta dan pengurangan diimplementasikan sebagai program yang mengambil data dari stdin dan keluaran ke stdout .

Program yang menjalankan fungsi peta disebut mapper. Program yang menjalankan pengurangan disebut, masing-masing, peredam .

Antarmuka Streaming mengasumsikan secara default bahwa satu baris masuk dalam mapper atau peredam sesuai dengan satu entri masuk untuk map .

Output dari mapper sampai ke input peredam dalam bentuk pasangan (kunci, nilai), sedangkan semua pasangan yang sesuai dengan kunci yang sama:

  • Dijamin akan diproses dengan satu peluncuran peredam;
  • Akan diserahkan ke input berturut-turut (yaitu, jika satu peredam memproses beberapa kunci yang berbeda, input akan dikelompokkan berdasarkan kunci).

Jadi mari terapkan mapper dan reducer dengan python:

#mapper.py  
import sys  
  
def do_map(doc):  
for word in doc.split():  
	yield word.lower(), 1  
  
for line in sys.stdin:  
	for key, value in do_map(line):  
    	print(key + "\t" + str(value))  
 
#reducer.py  
import sys  
  
def do_reduce(word, values):  
	return word, sum(values)  
  
prev_key = None  
values = []  
  
for line in sys.stdin:  
	key, value = line.split("\t")  
	if key != prev_key and prev_key is not None:  
    	result_key, result_value = do_reduce(prev_key, values)  
    	print(result_key + "\t" + str(result_value))  
    	values = []  
	prev_key = key  
	values.append(int(value))  
  
if prev_key is not None:  
	result_key, result_value = do_reduce(prev_key, values)  
	print(result_key + "\t" + str(result_value))

Data yang akan diproses Hadoop harus disimpan di HDFS. Mari unggah artikel kita dan taruh di HDFS. Untuk melakukan ini, gunakan perintah hadoop fs :

wget https://www.dropbox.com/s/opp5psid1x3jt41/lenta_articles.tar.gz  
tar xzvf lenta_articles.tar.gz  
hadoop fs -put lenta_articles 

Utilitas hadoop fs mendukung sejumlah besar metode untuk memanipulasi sistem file, banyak di antaranya identik dengan utilitas linux standar.

Sekarang mari kita mulai tugas streaming:

yarn jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar\  
 -input lenta_articles\  
 -output lenta_wordcount\  
 -file mapper.py\  
 -file reducer.py\  
 -mapper "python mapper.py"\  
 -reducer "python reducer.py" 

Utilitas benang digunakan untuk meluncurkan dan mengelola berbagai aplikasi (termasuk berbasis pengurangan peta) pada sebuah cluster. Hadoop-streaming.jar hanyalah salah satu contoh aplikasi benang semacam itu.

Berikutnya adalah opsi peluncuran:

  • input - folder dengan data sumber di hdfs;
  • keluaran - folder di hdfs tempat Anda ingin meletakkan hasilnya;
  • file - file yang diperlukan selama pengoperasian tugas pengurangan peta;
  • mapper adalah perintah konsol yang akan digunakan untuk tahap peta;
  • pengurangan adalah perintah konsol yang akan digunakan untuk tahap pengurangan.

Setelah diluncurkan, Anda dapat melihat progres tugas di konsol dan URL untuk melihat informasi lebih rinci tentang tugas tersebut.

Di antarmuka yang tersedia di URL ini, Anda dapat mengetahui status eksekusi tugas yang lebih detail, melihat log dari setiap mapper dan reducer (yang sangat berguna jika ada tugas yang gagal).

Hasil pekerjaan setelah eksekusi yang berhasil ditambahkan ke HDFS di folder yang kami tentukan di bidang keluaran. Anda dapat melihat isinya menggunakan perintah "hadoop fs -ls lenta_wordcount".

Hasilnya sendiri dapat diperoleh sebagai berikut:

hadoop fs -text lenta_wordcount/* | sort -n -k2,2 | tail -n5  
from
41  
this
43  
on
82  
and
111  
into
194 

Perintah "hadoop fs -text" menampilkan isi folder dalam bentuk teks. Saya mengurutkan hasilnya berdasarkan jumlah kemunculan kata-kata. Seperti yang diharapkan, kata yang paling umum dalam bahasa ini adalah preposisi.

4.4 Metode nomor 2: gunakan Java

Hadoop sendiri ditulis dalam java, dan antarmuka asli Hadoop juga berbasis java. Mari kita tunjukkan seperti apa aplikasi java asli untuk wordcount:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

	public static class TokenizerMapper
        	extends Mapper<Object, Text, Text, IntWritable>{

    	private final static IntWritable one = new IntWritable(1);
    	private Text word = new Text();

    	public void map(Object key, Text value, Context context
    	) throws IOException, InterruptedException {
        	StringTokenizer itr = new StringTokenizer(value.toString());
        	while (itr.hasMoreTokens()) {
            	word.set(itr.nextToken());
            	context.write(word, one);
        	}
    	}
	}

	public static class IntSumReducer
        	extends Reducer<Text,IntWritable,Text,IntWritable> {
    	private IntWritable result = new IntWritable();

    	public void reduce(Text key, Iterable values,
                       	Context context
    	) throws IOException, InterruptedException {
        	int sum = 0;
        	for (IntWritable val : values) {
            	sum += val.get();
        	}
        	result.set(sum);
        	context.write(key, result);
    	}
	}

	public static void main(String[] args) throws Exception {
    	Configuration conf = new Configuration();
    	Job job = Job.getInstance(conf, "word count");
    	job.setJarByClass(WordCount.class);
    	job.setMapperClass(TokenizerMapper.class);
    	job.setReducerClass(IntSumReducer.class);
    	job.setOutputKeyClass(Text.class);
    	job.setOutputValueClass(IntWritable.class);
    	FileInputFormat.addInputPath(job, new Path("hdfs://localhost/user/cloudera/lenta_articles"));
    	FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost/user/cloudera/lenta_wordcount"));
    	System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

Kelas ini persis sama dengan contoh Python kita. Kami membuat kelas TokenizerMapper dan IntSumReducer dengan menurunkan masing-masing dari kelas Mapper dan Reducer. Kelas yang diteruskan sebagai parameter template menentukan jenis nilai input dan output. API asli mengasumsikan bahwa fungsi peta diberi pasangan kunci-nilai sebagai masukan. Karena dalam kasus kita kuncinya kosong, kita cukup mendefinisikan Object sebagai tipe kuncinya.

Dalam metode Utama, kami memulai tugas mapreduce dan menentukan parameternya - nama, mapper dan peredam, jalur di HDFS, tempat data input berada dan di mana meletakkan hasilnya. Untuk mengkompilasi, kita membutuhkan pustaka hadoop. Saya menggunakan Maven untuk membangun, yang cloudera memiliki repositori. Petunjuk untuk menyiapkannya dapat ditemukan di sini. Hasilnya, file pom.xmp (yang digunakan oleh maven untuk mendeskripsikan rakitan proyek) saya mendapatkan yang berikut ini):

<?xml version="1.0" encoding="UTF-8"?>  
<project xmlns="http://maven.apache.org/POM/4.0.0"  
     	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
     	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">  
	<modelVersion>4.0.0</modelVersion>  
  
	<repositories>  
    	<repository>  
        	<id>cloudera</id>  
        	<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>  
    	</repository>  
	</repositories>  
  
	<dependencies>  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-common</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-auth</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-hdfs</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-mapreduce-client-app</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
	</dependencies>  
  
	<groupId>org.dca.examples</groupId>  
	<artifactId>wordcount</artifactId>  
	<version>1.0-SNAPSHOT</version>  
 
</project>

Mari kompilasi proyek menjadi paket jar:

mvn clean package

Setelah membuat proyek menjadi file jar, peluncuran terjadi dengan cara yang sama, seperti dalam kasus antarmuka streaming:

yarn jar wordcount-1.0-SNAPSHOT.jar  WordCount 

Kami menunggu eksekusi dan memeriksa hasilnya:

hadoop fs -text lenta_wordcount/* | sort -n -k2,2 | tail -n5  
from
41
this
43
on
82
and
111
into
194

Seperti yang Anda duga, hasil menjalankan aplikasi asli kami sama dengan hasil aplikasi streaming yang kami luncurkan dengan cara sebelumnya.