4.1 Hadoop সম্পর্কে সাধারণ তথ্য

MapReduce দৃষ্টান্তটি 2004 সালে Google তার নিবন্ধ MapReduce: সরলীকৃত ডেটা প্রসেসিং অন লার্জ ক্লাস্টারে প্রস্তাব করেছিল । যেহেতু প্রস্তাবিত নিবন্ধে দৃষ্টান্তের একটি বিবরণ রয়েছে, কিন্তু বাস্তবায়ন অনুপস্থিত ছিল, ইয়াহুর বেশ কয়েকজন প্রোগ্রামার নচ ওয়েব ক্রলারের কাজের অংশ হিসাবে তাদের বাস্তবায়নের প্রস্তাব করেছিলেন। আপনি Hadoop এর ইতিহাস সম্পর্কে নিবন্ধে আরও পড়তে পারেন Hadoop এর ইতিহাস: 4 নোড থেকে ডেটার ভবিষ্যত পর্যন্ত

প্রাথমিকভাবে, Hadoop প্রাথমিকভাবে ডেটা সংরক্ষণ এবং MapReduce কার্যগুলি চালানোর জন্য একটি সরঞ্জাম ছিল, কিন্তু এখন Hadoop হল বড় ডেটা প্রক্রিয়াকরণের (শুধুমাত্র MapReduce এর সাথে নয়) সম্পর্কিত প্রযুক্তির একটি বড় স্ট্যাক।

Hadoop এর প্রধান (মূল) উপাদান হল:

  • Hadoop ডিস্ট্রিবিউটেড ফাইল সিস্টেম (HDFS) একটি বিতরণ করা ফাইল সিস্টেম যা আপনাকে প্রায় সীমাহীন আকারের তথ্য সংরক্ষণ করতে দেয়।
  • Hadoop YARN হল MapReduce ফ্রেমওয়ার্ক সহ ক্লাস্টার রিসোর্স ম্যানেজমেন্ট এবং টাস্ক ম্যানেজমেন্টের জন্য একটি ফ্রেমওয়ার্ক।
  • হাডুপ সাধারণ

Hadoop এর সাথে সরাসরি সম্পর্কিত প্রচুর সংখ্যক প্রকল্প রয়েছে, কিন্তু Hadoop কোরে অন্তর্ভুক্ত নয়:

  • হাইভ - বড় ডেটার উপর এসকিউএল-এর মতো প্রশ্নের জন্য একটি টুল (এসকিউএল প্রশ্নগুলিকে MapReduce কার্যগুলির একটি সিরিজে পরিণত করে);
  • পিগ উচ্চ-স্তরের ডেটা বিশ্লেষণের জন্য একটি প্রোগ্রামিং ভাষা। এই ভাষায় কোডের একটি লাইন MapReduce টাস্কের একটি সিকোয়েন্সে পরিণত হতে পারে;
  • Hbase হল একটি কলামার ডাটাবেস যা BigTable প্যারাডাইম প্রয়োগ করে;
  • ক্যাসান্দ্রা একটি উচ্চ-কর্মক্ষমতা বিতরণ করা কী-মানের ডাটাবেস;
  • ZooKeeper হল বিতরণকৃত কনফিগারেশন স্টোরেজ এবং কনফিগারেশন পরিবর্তনের সিঙ্ক্রোনাইজেশনের জন্য একটি পরিষেবা;
  • মাহুত একটি বড় ডেটা মেশিন লার্নিং লাইব্রেরি এবং ইঞ্জিন।

আলাদাভাবে, আমি Apache Spark প্রকল্পটি নোট করতে চাই , যা বিতরণ করা ডেটা প্রক্রিয়াকরণের জন্য একটি ইঞ্জিন। অ্যাপাচি স্পার্ক সাধারণত তার কাজের জন্য HDFS এবং YARN এর মতো Hadoop উপাদান ব্যবহার করে, যখন নিজেই সম্প্রতি Hadoop এর চেয়ে বেশি জনপ্রিয় হয়ে উঠেছে:

এই উপাদানগুলির মধ্যে কয়েকটি উপাদানের এই সিরিজে পৃথক নিবন্ধে কভার করা হবে, কিন্তু আপাতত, আসুন দেখুন কিভাবে আপনি Hadoop এর সাথে কাজ শুরু করতে পারেন এবং এটিকে অনুশীলনে আনতে পারেন।

4.2 Hadoop-এ MapReduce প্রোগ্রাম চালানো

এখন দেখা যাক কিভাবে Hadoop এ MapReduce টাস্ক চালানো যায়। একটি কাজ হিসাবে, আমরা ক্লাসিক WordCount উদাহরণ ব্যবহার করব , যা পূর্ববর্তী পাঠে আলোচনা করা হয়েছিল।

আমি আপনাকে সমস্যার সূত্র মনে করিয়ে দিই: নথির একটি সেট আছে। নথির সেটে সংঘটিত প্রতিটি শব্দের জন্য সেটটিতে কতবার শব্দটি আসে তা গণনা করা প্রয়োজন।

সমাধান:

মানচিত্র নথিটিকে শব্দে বিভক্ত করে এবং জোড়ার একটি সেট প্রদান করে (শব্দ, 1)।

প্রতিটি শব্দের যোগফল কমিয়ে দিন:

def map(doc):  
for word in doc.split():  
	yield word, 1 
def reduce(word, values):  
	yield word, sum(values)

এখন কাজটি হল এই সমাধানটিকে কোড আকারে প্রোগ্রাম করা যা Hadoop এ চালানো যায়।

4.3 পদ্ধতি নম্বর 1। Hadoop স্ট্রিমিং

Hadoop এ MapReduce প্রোগ্রাম চালানোর সবচেয়ে সহজ উপায় হল Hadoop স্ট্রিমিং ইন্টারফেস ব্যবহার করা। স্ট্রিমিং ইন্টারফেস অনুমান করে যে মানচিত্র এবং হ্রাস এমন প্রোগ্রাম হিসাবে প্রয়োগ করা হয় যা stdin এবং আউটপুট থেকে stdout- এ ডেটা নেয় ।

যে প্রোগ্রামটি ম্যাপ ফাংশন চালায় তাকে ম্যাপার বলা হয়। যে প্রোগ্রামটি কম চালায় তাকে যথাক্রমে রিডুসার বলা হয় ।

স্ট্রিমিং ইন্টারফেস ডিফল্টরূপে অনুমান করে যে একটি ম্যাপার বা রিডুসারে একটি ইনকামিং লাইন মানচিত্রের জন্য একটি ইনকামিং এন্ট্রির সাথে মিলে যায় ।

ম্যাপারের আউটপুট জোড়া (কী, মান) আকারে রিডুসারের ইনপুটে যায়, যখন একই কী এর সাথে সম্পর্কিত সমস্ত জোড়া:

  • রিডুসারের একটি একক লঞ্চ দ্বারা প্রক্রিয়াকরণের নিশ্চয়তা;
  • একটি সারিতে ইনপুটে জমা দেওয়া হবে (অর্থাৎ, যদি একটি রিডুসার বিভিন্ন কী প্রক্রিয়া করে, ইনপুটটি কী দ্বারা গোষ্ঠীবদ্ধ হবে)।

তাই আসুন পাইথনে ম্যাপার এবং রিডুসার প্রয়োগ করি:

#mapper.py  
import sys  
  
def do_map(doc):  
for word in doc.split():  
	yield word.lower(), 1  
  
for line in sys.stdin:  
	for key, value in do_map(line):  
    	print(key + "\t" + str(value))  
 
#reducer.py  
import sys  
  
def do_reduce(word, values):  
	return word, sum(values)  
  
prev_key = None  
values = []  
  
for line in sys.stdin:  
	key, value = line.split("\t")  
	if key != prev_key and prev_key is not None:  
    	result_key, result_value = do_reduce(prev_key, values)  
    	print(result_key + "\t" + str(result_value))  
    	values = []  
	prev_key = key  
	values.append(int(value))  
  
if prev_key is not None:  
	result_key, result_value = do_reduce(prev_key, values)  
	print(result_key + "\t" + str(result_value))

Hadoop যে ডেটা প্রক্রিয়া করবে তা অবশ্যই HDFS-এ সংরক্ষণ করতে হবে। আসুন আমাদের নিবন্ধগুলি আপলোড করি এবং সেগুলিকে HDFS-এ রাখি৷ এটি করতে, hadoop fs কমান্ড ব্যবহার করুন :

wget https://www.dropbox.com/s/opp5psid1x3jt41/lenta_articles.tar.gz  
tar xzvf lenta_articles.tar.gz  
hadoop fs -put lenta_articles 

hadoop fs ইউটিলিটি ফাইল সিস্টেমকে ম্যানিপুলেট করার জন্য প্রচুর সংখ্যক পদ্ধতি সমর্থন করে, যার মধ্যে অনেকগুলি স্ট্যান্ডার্ড linux ইউটিলিটিগুলির সাথে অভিন্ন।

এখন স্ট্রিমিং টাস্ক শুরু করা যাক:

yarn jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar\  
 -input lenta_articles\  
 -output lenta_wordcount\  
 -file mapper.py\  
 -file reducer.py\  
 -mapper "python mapper.py"\  
 -reducer "python reducer.py" 

সুতা ইউটিলিটি একটি ক্লাস্টারে বিভিন্ন অ্যাপ্লিকেশন (ম্যাপ-রিডুস ভিত্তিক সহ) চালু এবং পরিচালনা করতে ব্যবহৃত হয়। Hadoop-streaming.jar এই ধরনের একটি সুতা প্রয়োগের একটি উদাহরণ মাত্র।

পরবর্তী লঞ্চ বিকল্প আছে:

  • ইনপুট - hdfs-এ উৎস ডেটা সহ ফোল্ডার;
  • আউটপুট - hdfs-এ ফোল্ডার যেখানে আপনি ফলাফল রাখতে চান;
  • ফাইল - ম্যাপ-রিডুস টাস্ক পরিচালনার সময় প্রয়োজনীয় ফাইলগুলি;
  • ম্যাপার হল কনসোল কমান্ড যা মানচিত্র পর্যায়ে ব্যবহার করা হবে;
  • reduce হল কনসোল কমান্ড যা হ্রাস পর্যায়ে ব্যবহার করা হবে।

লঞ্চ করার পরে, আপনি কনসোলে টাস্কের অগ্রগতি এবং টাস্ক সম্পর্কে আরও বিশদ তথ্য দেখার জন্য একটি URL দেখতে পারেন।

এই URL-এ উপলব্ধ ইন্টারফেসে, আপনি আরও বিস্তারিত টাস্ক এক্সিকিউশন স্ট্যাটাস খুঁজে পেতে পারেন, প্রতিটি ম্যাপার এবং রিডুসারের লগ দেখতে পারেন (যা ব্যর্থ কাজের ক্ষেত্রে খুবই কার্যকর)।

সফলভাবে সঞ্চালনের পরে কাজের ফলাফল আমরা আউটপুট ক্ষেত্রে নির্দিষ্ট করা ফোল্ডারে HDFS-এ যোগ করা হয়। আপনি "hadoop fs -ls lenta_wordcount" কমান্ড ব্যবহার করে এর বিষয়বস্তু দেখতে পারেন।

ফলাফল নিজেই নিম্নলিখিত হিসাবে প্রাপ্ত করা যেতে পারে:

hadoop fs -text lenta_wordcount/* | sort -n -k2,2 | tail -n5  
from
41  
this
43  
on
82  
and
111  
into
194 

"hadoop fs -text" কমান্ডটি পাঠ্য আকারে ফোল্ডারের বিষয়বস্তু প্রদর্শন করে। আমি শব্দের সংঘটন সংখ্যা দ্বারা ফলাফল বাছাই. প্রত্যাশিত হিসাবে, ভাষার সবচেয়ে সাধারণ শব্দগুলি হল অব্যয়।

4.4 পদ্ধতি নম্বর 2: Java ব্যবহার করুন

Hadoop নিজেই জাভাতে লেখা, এবং Hadoop এর নেটিভ ইন্টারফেসও জাভা-ভিত্তিক। ওয়ার্ডকাউন্টের জন্য একটি নেটিভ জাভা অ্যাপ্লিকেশন দেখতে কেমন তা দেখাই:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

	public static class TokenizerMapper
        	extends Mapper<Object, Text, Text, IntWritable>{

    	private final static IntWritable one = new IntWritable(1);
    	private Text word = new Text();

    	public void map(Object key, Text value, Context context
    	) throws IOException, InterruptedException {
        	StringTokenizer itr = new StringTokenizer(value.toString());
        	while (itr.hasMoreTokens()) {
            	word.set(itr.nextToken());
            	context.write(word, one);
        	}
    	}
	}

	public static class IntSumReducer
        	extends Reducer<Text,IntWritable,Text,IntWritable> {
    	private IntWritable result = new IntWritable();

    	public void reduce(Text key, Iterable values,
                       	Context context
    	) throws IOException, InterruptedException {
        	int sum = 0;
        	for (IntWritable val : values) {
            	sum += val.get();
        	}
        	result.set(sum);
        	context.write(key, result);
    	}
	}

	public static void main(String[] args) throws Exception {
    	Configuration conf = new Configuration();
    	Job job = Job.getInstance(conf, "word count");
    	job.setJarByClass(WordCount.class);
    	job.setMapperClass(TokenizerMapper.class);
    	job.setReducerClass(IntSumReducer.class);
    	job.setOutputKeyClass(Text.class);
    	job.setOutputValueClass(IntWritable.class);
    	FileInputFormat.addInputPath(job, new Path("hdfs://localhost/user/cloudera/lenta_articles"));
    	FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost/user/cloudera/lenta_wordcount"));
    	System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

এই ক্লাসটি আমাদের পাইথন উদাহরণের মতোই করে। আমরা যথাক্রমে ম্যাপার এবং রিডুসার ক্লাস থেকে প্রাপ্ত করে TokenizerMapper এবং IntSumReducer ক্লাস তৈরি করি। টেমপ্লেট প্যারামিটার হিসাবে পাস করা ক্লাসগুলি ইনপুট এবং আউটপুট মানগুলির প্রকারগুলি নির্দিষ্ট করে৷ নেটিভ এপিআই অনুমান করে যে মানচিত্র ফাংশনটি ইনপুট হিসাবে একটি কী-মান জোড়া দেওয়া হয়েছে। যেহেতু আমাদের ক্ষেত্রে কীটি খালি, আমরা কেবলমাত্র অবজেক্টকে কী প্রকার হিসাবে সংজ্ঞায়িত করি।

প্রধান পদ্ধতিতে, আমরা ম্যাপ্রেডুস টাস্ক শুরু করি এবং এর পরামিতিগুলি সংজ্ঞায়িত করি - নাম, ম্যাপার এবং রিডিউসার, HDFS-এর পথ, যেখানে ইনপুট ডেটা অবস্থিত এবং ফলাফল কোথায় রাখতে হবে। কম্পাইল করার জন্য, আমাদের হাডুপ লাইব্রেরি দরকার। আমি নির্মাণ করতে মাভেন ব্যবহার করি, যার জন্য ক্লাউডারের একটি সংগ্রহস্থল রয়েছে। এটি সেট আপ করার জন্য নির্দেশাবলী এখানে পাওয়া যাবে। ফলস্বরূপ, pom.xmp ফাইল (যা প্রকল্পের সমাবেশ বর্ণনা করতে ম্যাভেন ব্যবহার করে) আমি নিম্নলিখিত পেয়েছি):

<?xml version="1.0" encoding="UTF-8"?>  
<project xmlns="http://maven.apache.org/POM/4.0.0"  
     	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
     	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">  
	<modelVersion>4.0.0</modelVersion>  
  
	<repositories>  
    	<repository>  
        	<id>cloudera</id>  
        	<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>  
    	</repository>  
	</repositories>  
  
	<dependencies>  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-common</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-auth</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-hdfs</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-mapreduce-client-app</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
	</dependencies>  
  
	<groupId>org.dca.examples</groupId>  
	<artifactId>wordcount</artifactId>  
	<version>1.0-SNAPSHOT</version>  
 
</project>

আসুন একটি জার প্যাকেজে প্রকল্পটি কম্পাইল করি:

mvn clean package

একটি জার ফাইলে প্রকল্পটি তৈরি করার পরে, লঞ্চটি একইভাবে ঘটে, যেমন স্ট্রিমিং ইন্টারফেসের ক্ষেত্রে:

yarn jar wordcount-1.0-SNAPSHOT.jar  WordCount 

আমরা মৃত্যুদন্ডের জন্য অপেক্ষা করি এবং ফলাফলটি পরীক্ষা করি:

hadoop fs -text lenta_wordcount/* | sort -n -k2,2 | tail -n5  
from
41
this
43
on
82
and
111
into
194

আপনি অনুমান করতে পারেন, আমাদের নেটিভ অ্যাপ্লিকেশন চালানোর ফলাফল আমরা আগের পদ্ধতিতে চালু করা স্ট্রিমিং অ্যাপ্লিকেশনটির ফলাফলের মতোই।