4.1 Hadoop hakkında genel bilgiler
MapReduce paradigması, Google tarafından 2004 yılında MapReduce: Büyük Kümelerde Basitleştirilmiş Veri İşleme makalesinde önerildi . Önerilen makale paradigmanın bir tanımını içerdiğinden, ancak uygulama eksik olduğundan, Yahoo'dan birkaç programcı, fındık web gezgini üzerindeki çalışmanın bir parçası olarak bunların uygulanmasını önerdi. Hadoop'un tarihi hakkında daha fazla bilgiyi Hadoop'un tarihi: 4 düğümden verilerin geleceğine kadar makalesinde okuyabilirsiniz .
Başlangıçta, Hadoop öncelikle verileri depolamak ve MapReduce görevlerini çalıştırmak için bir araçtı, ancak şimdi Hadoop şu veya bu şekilde büyük verileri işlemekle ilgili büyük bir teknolojiler yığınıdır (yalnızca MapReduce ile değil).
Hadoop'un ana (temel) bileşenleri şunlardır:
- Hadoop Dağıtılmış Dosya Sistemi (HDFS), neredeyse sınırsız boyutta bilgi depolamanıza izin veren dağıtılmış bir dosya sistemidir.
- Hadoop YARN, MapReduce çerçevesi de dahil olmak üzere küme kaynak yönetimi ve görev yönetimi için bir çerçevedir.
- Hadoop ortak
Doğrudan Hadoop ile ilgili olan ancak Hadoop çekirdeğine dahil olmayan çok sayıda proje de vardır:
- Hive - büyük veriler üzerinde SQL benzeri sorgular için bir araç (SQL sorgularını bir dizi MapReduce görevine dönüştürür);
- Pig, üst düzey veri analizi için bir programlama dilidir. Bu dildeki bir kod satırı, bir dizi MapReduce görevine dönüşebilir;
- Hbase , BigTable paradigmasını uygulayan sütunlu bir veritabanıdır;
- Cassandra, yüksek performanslı, dağıtılmış bir anahtar/değer veritabanıdır;
- ZooKeeper , dağıtılmış yapılandırma depolaması ve yapılandırma değişikliklerinin senkronizasyonu için bir hizmettir;
- Mahout , bir büyük veri makine öğrenimi kitaplığı ve motorudur.
Ayrı olarak, dağıtılmış veri işleme için bir motor olan Apache Spark projesini de not etmek isterim . Apache Spark, çalışmaları için tipik olarak HDFS ve YARN gibi Hadoop bileşenlerini kullanırken kendisi son zamanlarda Hadoop'tan daha popüler hale geldi:
Bu bileşenlerden bazıları, bu malzeme serisinde ayrı makalelerde ele alınacaktır, ancak şimdilik, Hadoop ile nasıl çalışmaya başlayıp onu uygulamaya koyabileceğinize bir göz atalım.
4.2 MapReduce programlarının Hadoop üzerinde çalıştırılması
Şimdi Hadoop'ta bir MapReduce görevinin nasıl çalıştırılacağına bakalım. Görev olarak, önceki derste ele alınan klasik WordCount örneğini kullanacağız .
Size sorunun formülasyonunu hatırlatayım: bir dizi belge var. Belge setinde geçen her kelimenin, kelimenin sette kaç kez geçtiğini saymak gerekir.
Çözüm:
Harita, belgeyi sözcüklere böler ve bir dizi çift (kelime, 1) döndürür.
Azalt, her kelimenin tekrarını toplar:
|
|
Şimdi görev, bu çözümü Hadoop'ta yürütülebilecek ve çalıştırılabilecek kod biçiminde programlamaktır.
4.3 Yöntem numarası 1. Hadoop Akışı
Hadoop'ta bir MapReduce programını çalıştırmanın en kolay yolu, Hadoop akış arayüzünü kullanmaktır. Akış arabirimi, haritalama ve azaltmanın stdin'den veri alan ve stdout'a çıktı veren programlar olarak uygulandığını varsayar .
Harita işlevini yürüten programa haritalayıcı denir. azaltmayı yürüten programa sırasıyla, redüktör denir .
Akış arabirimi, varsayılan olarak, bir eşleyici veya indirgeyicideki bir gelen satırın, map için gelen bir girişe karşılık geldiğini varsayar .
Eşleştiricinin çıktısı, indirgeyicinin girişine çiftler (anahtar, değer) şeklinde gelirken, tüm çiftler aynı anahtara karşılık gelir:
- Redüktörün tek bir çalıştırılmasıyla işlenmesi garanti edilir;
- Girişe arka arkaya gönderilecektir (yani, bir düşürücü birkaç farklı anahtarı işlerse, giriş anahtara göre gruplandırılacaktır).
Öyleyse eşleyici ve indirgeyiciyi python'da uygulayalım:
#mapper.py
import sys
def do_map(doc):
for word in doc.split():
yield word.lower(), 1
for line in sys.stdin:
for key, value in do_map(line):
print(key + "\t" + str(value))
#reducer.py
import sys
def do_reduce(word, values):
return word, sum(values)
prev_key = None
values = []
for line in sys.stdin:
key, value = line.split("\t")
if key != prev_key and prev_key is not None:
result_key, result_value = do_reduce(prev_key, values)
print(result_key + "\t" + str(result_value))
values = []
prev_key = key
values.append(int(value))
if prev_key is not None:
result_key, result_value = do_reduce(prev_key, values)
print(result_key + "\t" + str(result_value))
Hadoop'un işleyeceği veriler HDFS'de depolanmalıdır. Yazılarımızı yükleyip HDFS'ye koyalım. Bunu yapmak için hadoop fs komutunu kullanın :
wget https://www.dropbox.com/s/opp5psid1x3jt41/lenta_articles.tar.gz
tar xzvf lenta_articles.tar.gz
hadoop fs -put lenta_articles
Hadoop fs yardımcı programı, dosya sistemini işlemek için, çoğu standart linux yardımcı programları ile aynı olan çok sayıda yöntemi destekler.
Şimdi akış görevini başlatalım:
yarn jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar\
-input lenta_articles\
-output lenta_wordcount\
-file mapper.py\
-file reducer.py\
-mapper "python mapper.py"\
-reducer "python reducer.py"
İplik yardımcı programı, bir küme üzerinde çeşitli uygulamaları (harita azaltma tabanlı dahil) başlatmak ve yönetmek için kullanılır. Hadoop-streaming.jar, böyle bir iplik uygulamasının yalnızca bir örneğidir.
Sırada başlatma seçenekleri var:
- giriş - hdf'lerde kaynak verileri olan klasör;
- çıktı - hdfs'de sonucu koymak istediğiniz klasör;
- dosya - harita küçültme görevinin çalışması sırasında gerekli olan dosyalar;
- mapper, harita aşaması için kullanılacak konsol komutudur;
- azalt, azaltma aşaması için kullanılacak konsol komutudur.
Başlattıktan sonra, konsolda görevin ilerleyişini ve görevle ilgili daha ayrıntılı bilgileri görüntülemek için bir URL'yi görebilirsiniz.
Bu URL'de bulunan arayüzde, daha ayrıntılı bir görev yürütme durumunu öğrenebilir, her eşleyicinin ve indirgeyicinin günlüklerini görüntüleyebilirsiniz (başarısız görevler durumunda çok kullanışlıdır).
Çalışmanın başarılı bir şekilde yürütülmesinden sonraki sonucu, çıktı alanında belirttiğimiz klasörde HDFS'ye eklenir. İçeriğini "hadoop fs -ls lenta_wordcount" komutunu kullanarak görüntüleyebilirsiniz.
Sonucun kendisi şu şekilde elde edilebilir:
hadoop fs -text lenta_wordcount/* | sort -n -k2,2 | tail -n5
from
41
this
43
on
82
and
111
into
194
"hadoop fs -text" komutu, klasörün içeriğini metin biçiminde görüntüler. Sonucu kelimelerin tekrar sayısına göre sıraladım. Beklendiği gibi, dildeki en yaygın kelimeler edatlardır.
4.4 Yöntem numarası 2: Java kullanın
Hadoop'un kendisi java ile yazılmıştır ve Hadoop'un yerel arayüzü de java tabanlıdır. Wordcount için yerel bir java uygulamasının nasıl göründüğünü gösterelim:
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount {
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path("hdfs://localhost/user/cloudera/lenta_articles"));
FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost/user/cloudera/lenta_wordcount"));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
Bu sınıf, Python örneğimizle tamamen aynı şeyi yapar. Mapper ve Reducer sınıflarından sırasıyla TokenizerMapper ve IntSumReducer sınıflarını türeterek oluşturuyoruz. Şablon parametreleri olarak iletilen sınıflar, girdi ve çıktı değerlerinin türlerini belirtir. Yerel API, harita işlevine girdi olarak bir anahtar/değer çifti verildiğini varsayar. Bizim durumumuzda anahtar boş olduğundan, nesneyi anahtar türü olarak tanımlarız.
Ana yöntemde, mapreduce görevini başlatır ve parametrelerini tanımlarız - ad, eşleyici ve indirgeyici, HDFS'deki yol, giriş verilerinin bulunduğu ve sonucun nereye yerleştirileceği. Derlemek için hadoop kitaplıklarına ihtiyacımız var. Maven'i inşa etmek için kullanıyorum, bunun için cloudera'nın bir deposu var. Ayarlama talimatları burada bulunabilir. Sonuç olarak, pom.xmp dosyası (maven tarafından projenin montajını tanımlamak için kullanılır) aşağıdakileri aldım:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<repositories>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.6.0-cdh5.4.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
<version>2.6.0-cdh5.4.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.6.0-cdh5.4.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-app</artifactId>
<version>2.6.0-cdh5.4.2</version>
</dependency>
</dependencies>
<groupId>org.dca.examples</groupId>
<artifactId>wordcount</artifactId>
<version>1.0-SNAPSHOT</version>
</project>
Projeyi bir jar paketinde derleyelim:
mvn clean package
Projeyi bir jar dosyasında oluşturduktan sonra, başlatma, akış arabiriminde olduğu gibi benzer bir şekilde gerçekleşir:
yarn jar wordcount-1.0-SNAPSHOT.jar WordCount
Yürütmeyi bekliyoruz ve sonucu kontrol ediyoruz:
hadoop fs -text lenta_wordcount/* | sort -n -k2,2 | tail -n5
from
41
this
43
on
82
and
111
into
194
Tahmin edebileceğiniz gibi, yerel uygulamamızı çalıştırmanın sonucu, önceki şekilde başlattığımız akış uygulamasının sonucuyla aynı.
GO TO FULL VERSION