4.1 Maklumat am tentang Hadoop

Paradigma MapReduce telah dicadangkan oleh Google pada tahun 2004 dalam artikelnya MapReduce: Pemprosesan Data Ringkas pada Kluster Besar . Memandangkan artikel yang dicadangkan itu mengandungi penerangan tentang paradigma, tetapi pelaksanaannya tiada, beberapa pengaturcara daripada Yahoo mencadangkan pelaksanaannya sebagai sebahagian daripada kerja pada perangkak web nutch. Anda boleh membaca lebih lanjut tentang sejarah Hadoop dalam artikel Sejarah Hadoop: Daripada 4 nod kepada masa depan data .

Pada mulanya, Hadoop ialah alat untuk menyimpan data dan menjalankan tugas MapReduce, tetapi kini Hadoop ialah timbunan besar teknologi yang berkaitan dalam satu cara atau yang lain untuk memproses data besar (bukan sahaja dengan MapReduce).

Komponen utama (teras) Hadoop ialah:

  • Hadoop Distributed File System (HDFS) ialah sistem fail teragih yang membolehkan anda menyimpan maklumat dengan saiz yang hampir tidak terhad.
  • Hadoop YARN ialah rangka kerja untuk pengurusan sumber kluster dan pengurusan tugas, termasuk rangka kerja MapReduce.
  • Hadoop biasa

Terdapat juga sejumlah besar projek yang berkaitan secara langsung dengan Hadoop, tetapi tidak termasuk dalam teras Hadoop:

  • Hive - alat untuk pertanyaan seperti SQL ke atas data besar (menukar pertanyaan SQL menjadi satu siri tugasan MapReduce);
  • Babi ialah bahasa pengaturcaraan untuk analisis data peringkat tinggi. Satu baris kod dalam bahasa ini boleh bertukar menjadi urutan tugasan MapReduce;
  • Hbase ialah pangkalan data kolumnar yang melaksanakan paradigma BigTable;
  • Cassandra ialah pangkalan data nilai kunci teragih berprestasi tinggi;
  • ZooKeeper ialah perkhidmatan untuk penyimpanan konfigurasi teragih dan penyegerakan perubahan konfigurasi;
  • Mahout ialah perpustakaan dan enjin pembelajaran mesin data besar.

Secara berasingan, saya ingin ambil perhatian projek Apache Spark , yang merupakan enjin untuk pemprosesan data teragih. Apache Spark biasanya menggunakan komponen Hadoop seperti HDFS dan YARN untuk kerjanya, manakala ianya baru-baru ini menjadi lebih popular daripada Hadoop:

Beberapa komponen ini akan diliputi dalam artikel berasingan dalam siri bahan ini, tetapi buat masa ini, mari lihat bagaimana anda boleh mula bekerja dengan Hadoop dan mempraktikkannya.

4.2 Menjalankan program MapReduce pada Hadoop

Sekarang mari kita lihat cara menjalankan tugas MapReduce pada Hadoop. Sebagai tugas, kami akan menggunakan contoh WordCount klasik , yang telah dibincangkan dalam pelajaran sebelumnya.

Biar saya mengingatkan anda rumusan masalah: terdapat satu set dokumen. Ia adalah perlu untuk setiap perkataan yang berlaku dalam set dokumen untuk mengira berapa kali perkataan itu berlaku dalam set.

Penyelesaian:

Peta membahagikan dokumen kepada perkataan dan mengembalikan set pasangan (perkataan, 1).

Kurangkan jumlah kejadian setiap perkataan:

def map(doc):  
for word in doc.split():  
	yield word, 1 
def reduce(word, values):  
	yield word, sum(values)

Sekarang tugasnya adalah untuk memprogramkan penyelesaian ini dalam bentuk kod yang boleh dilaksanakan pada Hadoop dan dijalankan.

4.3 Kaedah nombor 1. Penstriman Hadoop

Cara paling mudah untuk menjalankan program MapReduce pada Hadoop ialah menggunakan antara muka penstriman Hadoop. Antara muka penstriman menganggap bahawa peta dan pengurangan dilaksanakan sebagai program yang mengambil data daripada stdin dan output ke stdout .

Program yang melaksanakan fungsi peta dipanggil pemeta. Program yang melaksanakan reduce dipanggil, masing-masing, reducer .

Antara muka Penstriman menganggap secara lalai bahawa satu baris masuk dalam pemeta atau pengurang sepadan dengan satu masukan masuk untuk peta .

Output pemeta sampai ke input pengurang dalam bentuk pasangan (kunci, nilai), manakala semua pasangan sepadan dengan kunci yang sama:

  • Dijamin akan diproses dengan satu pelancaran pengurang;
  • Akan diserahkan kepada input berturut-turut (iaitu, jika satu pengurang memproses beberapa kekunci yang berbeza, input akan dikumpulkan mengikut kekunci).

Jadi mari kita laksanakan pemeta dan pengurang dalam python:

#mapper.py  
import sys  
  
def do_map(doc):  
for word in doc.split():  
	yield word.lower(), 1  
  
for line in sys.stdin:  
	for key, value in do_map(line):  
    	print(key + "\t" + str(value))  
 
#reducer.py  
import sys  
  
def do_reduce(word, values):  
	return word, sum(values)  
  
prev_key = None  
values = []  
  
for line in sys.stdin:  
	key, value = line.split("\t")  
	if key != prev_key and prev_key is not None:  
    	result_key, result_value = do_reduce(prev_key, values)  
    	print(result_key + "\t" + str(result_value))  
    	values = []  
	prev_key = key  
	values.append(int(value))  
  
if prev_key is not None:  
	result_key, result_value = do_reduce(prev_key, values)  
	print(result_key + "\t" + str(result_value))

Data yang akan diproses oleh Hadoop mesti disimpan pada HDFS. Mari muat naik artikel kami dan letakkannya di HDFS. Untuk melakukan ini, gunakan perintah hadoop fs :

wget https://www.dropbox.com/s/opp5psid1x3jt41/lenta_articles.tar.gz  
tar xzvf lenta_articles.tar.gz  
hadoop fs -put lenta_articles 

Utiliti hadoop fs menyokong sejumlah besar kaedah untuk memanipulasi sistem fail, kebanyakannya adalah sama dengan utiliti linux standard.

Sekarang mari kita mulakan tugas penstriman:

yarn jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar\  
 -input lenta_articles\  
 -output lenta_wordcount\  
 -file mapper.py\  
 -file reducer.py\  
 -mapper "python mapper.py"\  
 -reducer "python reducer.py" 

Utiliti benang digunakan untuk melancarkan dan mengurus pelbagai aplikasi (termasuk berdasarkan pengurangan peta) pada kelompok. Hadoop-streaming.jar hanyalah satu contoh aplikasi benang sedemikian.

Seterusnya ialah pilihan pelancaran:

  • input - folder dengan data sumber pada hdfs;
  • output - folder pada hdfs di mana anda mahu meletakkan hasilnya;
  • fail - fail yang diperlukan semasa operasi tugas mengurangkan peta;
  • mapper ialah arahan konsol yang akan digunakan untuk peringkat peta;
  • reduce ialah arahan konsol yang akan digunakan untuk peringkat reduce.

Selepas pelancaran, anda boleh melihat kemajuan tugasan dalam konsol dan URL untuk melihat maklumat yang lebih terperinci tentang tugasan tersebut.

Dalam antara muka yang tersedia di URL ini, anda boleh mengetahui status pelaksanaan tugas yang lebih terperinci, melihat log setiap pemeta dan pengurang (yang sangat berguna sekiranya tugasan gagal).

Hasil kerja selepas pelaksanaan yang berjaya ditambahkan pada HDFS dalam folder yang kami tentukan dalam medan output. Anda boleh melihat kandungannya menggunakan arahan "hadoop fs -ls lenta_wordcount".

Hasilnya sendiri boleh diperoleh seperti berikut:

hadoop fs -text lenta_wordcount/* | sort -n -k2,2 | tail -n5  
from
41  
this
43  
on
82  
and
111  
into
194 

Perintah "hadoop fs -text" memaparkan kandungan folder dalam bentuk teks. Saya menyusun keputusan mengikut bilangan kemunculan perkataan. Seperti yang dijangkakan, perkataan yang paling biasa dalam bahasa ialah preposisi.

4.4 Kaedah nombor 2: gunakan Java

Hadoop sendiri ditulis dalam java, dan antara muka asli Hadoop juga berasaskan java. Mari tunjukkan rupa aplikasi java asli untuk wordcount:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

	public static class TokenizerMapper
        	extends Mapper<Object, Text, Text, IntWritable>{

    	private final static IntWritable one = new IntWritable(1);
    	private Text word = new Text();

    	public void map(Object key, Text value, Context context
    	) throws IOException, InterruptedException {
        	StringTokenizer itr = new StringTokenizer(value.toString());
        	while (itr.hasMoreTokens()) {
            	word.set(itr.nextToken());
            	context.write(word, one);
        	}
    	}
	}

	public static class IntSumReducer
        	extends Reducer<Text,IntWritable,Text,IntWritable> {
    	private IntWritable result = new IntWritable();

    	public void reduce(Text key, Iterable values,
                       	Context context
    	) throws IOException, InterruptedException {
        	int sum = 0;
        	for (IntWritable val : values) {
            	sum += val.get();
        	}
        	result.set(sum);
        	context.write(key, result);
    	}
	}

	public static void main(String[] args) throws Exception {
    	Configuration conf = new Configuration();
    	Job job = Job.getInstance(conf, "word count");
    	job.setJarByClass(WordCount.class);
    	job.setMapperClass(TokenizerMapper.class);
    	job.setReducerClass(IntSumReducer.class);
    	job.setOutputKeyClass(Text.class);
    	job.setOutputValueClass(IntWritable.class);
    	FileInputFormat.addInputPath(job, new Path("hdfs://localhost/user/cloudera/lenta_articles"));
    	FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost/user/cloudera/lenta_wordcount"));
    	System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

Kelas ini melakukan perkara yang sama seperti contoh Python kami. Kami mencipta kelas TokenizerMapper dan IntSumReducer dengan memperoleh daripada kelas Mapper dan Reducer, masing-masing. Kelas yang diluluskan sebagai parameter templat menentukan jenis nilai input dan output. API asli menganggap bahawa fungsi peta diberikan pasangan nilai kunci sebagai input. Oleh kerana dalam kes kami kunci itu kosong, kami hanya mentakrifkan Objek sebagai jenis kunci.

Dalam kaedah Utama, kami memulakan tugas mapreduce dan menentukan parameternya - nama, pemeta dan pengurang, laluan dalam HDFS, di mana data input terletak dan di mana untuk meletakkan hasilnya. Untuk menyusun, kami memerlukan perpustakaan hadoop. Saya menggunakan Maven untuk membina, yang cloudera mempunyai repositori. Arahan untuk menetapkannya boleh didapati di sini. Akibatnya, fail pom.xmp (yang digunakan oleh maven untuk menerangkan pemasangan projek) saya mendapat yang berikut):

<?xml version="1.0" encoding="UTF-8"?>  
<project xmlns="http://maven.apache.org/POM/4.0.0"  
     	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
     	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">  
	<modelVersion>4.0.0</modelVersion>  
  
	<repositories>  
    	<repository>  
        	<id>cloudera</id>  
        	<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>  
    	</repository>  
	</repositories>  
  
	<dependencies>  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-common</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-auth</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-hdfs</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-mapreduce-client-app</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
	</dependencies>  
  
	<groupId>org.dca.examples</groupId>  
	<artifactId>wordcount</artifactId>  
	<version>1.0-SNAPSHOT</version>  
 
</project>

Mari kita susun projek ke dalam pakej balang:

mvn clean package

Selepas membina projek ke dalam fail balang, pelancaran berlaku dengan cara yang sama, seperti dalam kes antara muka penstriman:

yarn jar wordcount-1.0-SNAPSHOT.jar  WordCount 

Kami menunggu pelaksanaan dan menyemak hasilnya:

hadoop fs -text lenta_wordcount/* | sort -n -k2,2 | tail -n5  
from
41
this
43
on
82
and
111
into
194

Seperti yang anda fikirkan, hasil menjalankan aplikasi asli kami adalah sama dengan hasil aplikasi penstriman yang kami lancarkan dengan cara sebelumnya.