4.1 General information about Hadoop

The MapReduce paradigm was proposed by Google in 2004 in the paper MapReduce: Simplified Data Processing on Large Clusters. Since the paper described the paradigm but did not include an implementation, several programmers from Yahoo built their own implementation as part of their work on the Nutch web crawler. You can read more about the history of Hadoop in the article The history of Hadoop: From 4 nodes to the future of data.

Initially, Hadoop was primarily a tool for storing data and running MapReduce tasks, but it has since grown into a large stack of technologies related in one way or another to big data processing (not only with MapReduce).

The main (core) components of Hadoop are:

  • Hadoop Distributed File System (HDFS) is a distributed file system that can store data of practically unlimited size;
  • Hadoop YARN is a framework for cluster resource management and job scheduling, on top of which the MapReduce framework runs;
  • Hadoop Common is a set of shared libraries and utilities used by the other Hadoop modules.

There are also a large number of projects directly related to Hadoop, but not included in the Hadoop core:

  • Hive is a tool for SQL-like queries over big data (it turns SQL queries into a series of MapReduce tasks);
  • Pig is a high-level language for data analysis; one line of code in this language can turn into a sequence of MapReduce tasks;
  • HBase is a column-oriented database that implements the BigTable paradigm;
  • Cassandra is a high-performance distributed key-value database;
  • ZooKeeper is a service for distributed configuration storage and synchronization of configuration changes;
  • Mahout is a library and engine for machine learning on big data.

Separately, I would like to mention the Apache Spark project, an engine for distributed data processing. Spark typically relies on Hadoop components such as HDFS and YARN, while Spark itself has recently become more popular than classic Hadoop MapReduce.

Some of these components will be covered in separate articles in this series of materials, but for now, let's look at how you can start working with Hadoop and put it into practice.

4.2 Running MapReduce programs on Hadoop

Now let's look at how to run a MapReduce task on Hadoop. As the task, we will use the classic WordCount example, which was discussed in the previous lesson.

Let me remind you of the problem statement: there is a set of documents, and for each word occurring in these documents we need to count how many times it occurs in the whole set.

Solution:

Map splits the document into words and returns a set of pairs (word, 1).

Reduce sums the occurrences of each word:

def map(doc):
    for word in doc.split():
        yield word, 1

def reduce(word, values):
    yield word, sum(values)
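
Before turning this into a Hadoop job, it can help to watch the whole pipeline work on a toy input. Below is a minimal local simulation of the three phases (map, shuffle/group, reduce) in plain Python; the sample documents and the map_doc/reduce_word names are made up for illustration.

from collections import defaultdict

def map_doc(doc):
    for word in doc.split():
        yield word, 1

def reduce_word(word, values):
    yield word, sum(values)

docs = ["the cat sat on the mat", "the dog"]  # made-up sample input

# Map phase: emit (word, 1) for every word in every document
pairs = [pair for doc in docs for pair in map_doc(doc)]

# Shuffle phase: group the values by key (Hadoop does this between map and reduce)
grouped = defaultdict(list)
for word, count in pairs:
    grouped[word].append(count)

# Reduce phase: sum the values of each key
for word, values in grouped.items():
    for key, total in reduce_word(word, values):
        print(key, total)  # e.g. "the 3", "cat 1", ...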

Now the task is to express this solution as code that can be run on Hadoop.

4.3 Method number 1. Hadoop Streaming

The easiest way to run a MapReduce program on Hadoop is to use the Hadoop Streaming interface. The Streaming interface assumes that map and reduce are implemented as programs that read data from stdin and write results to stdout.

The program that executes the map function is called the mapper; the program that executes reduce is, accordingly, called the reducer.

By default, the Streaming interface assumes that one input line of a mapper or reducer corresponds to one input record for map (or, respectively, to one (key, value) pair for reduce).

The output of the mapper reaches the input of the reducer in the form of (key, value) pairs, and all pairs with the same key:

  • are guaranteed to be processed by a single launch of the reducer;
  • arrive contiguously, that is, if one reducer processes several different keys, its input is grouped by key (see the sketch after this list).
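
To make these guarantees concrete, here is a tiny sketch of what the reducer's input stream looks like after Hadoop's sort/shuffle phase (the mapper output lines are made up for illustration):

# A made-up illustration of the reducer's input after the shuffle phase.
# The mapper emits tab-separated "key<TAB>value" lines; Hadoop sorts them
# by key, so each reducer sees all values of a given key contiguously.
mapper_output = ["the\t1", "cat\t1", "the\t1", "dog\t1"]

shuffled = sorted(mapper_output, key=lambda line: line.split("\t")[0])
print(shuffled)  # ['cat\t1', 'dog\t1', 'the\t1', 'the\t1']

# A reducer reading this stream can sum consecutive values per key,
# which is exactly what reducer.py below does.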

So let's implement the mapper and reducer in Python:

# mapper.py
import sys

# Emit a tab-separated (word, 1) pair for every word on every input line
def do_map(doc):
    for word in doc.split():
        yield word.lower(), 1

for line in sys.stdin:
    for key, value in do_map(line):
        print(key + "\t" + str(value))
 
# reducer.py
import sys

def do_reduce(word, values):
    return word, sum(values)

prev_key = None
values = []

# The input is sorted by key, so all values of a key arrive consecutively
for line in sys.stdin:
    key, value = line.split("\t")
    if key != prev_key and prev_key is not None:
        result_key, result_value = do_reduce(prev_key, values)
        print(result_key + "\t" + str(result_value))
        values = []
    prev_key = key
    values.append(int(value))

if prev_key is not None:
    result_key, result_value = do_reduce(prev_key, values)
    print(result_key + "\t" + str(result_value))
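
Before submitting anything to the cluster, it can be handy to sanity-check the two scripts locally. Below is a minimal sketch that emulates Hadoop's shuffle with a plain sort; it assumes mapper.py, reducer.py and a non-empty sample file sample.txt (a hypothetical name) sit in the current directory.

# local_test.py - run mapper.py and reducer.py locally, without a cluster
import subprocess
import sys

with open("sample.txt", "rb") as f:
    mapped = subprocess.run(
        [sys.executable, "mapper.py"], stdin=f, capture_output=True, check=True
    ).stdout

# Hadoop sorts the mapper output by key before feeding it to the reducer;
# sorting the raw lines locally gives the same per-key grouping.
shuffled = b"\n".join(sorted(mapped.splitlines())) + b"\n"

reduced = subprocess.run(
    [sys.executable, "reducer.py"], input=shuffled, capture_output=True, check=True
).stdout
print(reduced.decode())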

The data that Hadoop will process must be stored on HDFS. Let's download our articles and upload them to HDFS using the hadoop fs command:

wget https://www.dropbox.com/s/opp5psid1x3jt41/lenta_articles.tar.gz  
tar xzvf lenta_articles.tar.gz  
hadoop fs -put lenta_articles 

The hadoop fs utility supports a large number of methods for manipulating the file system, many of which mirror the standard Linux utilities (ls, cat, rm, and so on).

Now let's start the streaming task:

yarn jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
 -input lenta_articles \
 -output lenta_wordcount \
 -file mapper.py \
 -file reducer.py \
 -mapper "python mapper.py" \
 -reducer "python reducer.py"

The yarn utility is used to launch and manage various applications (including MapReduce-based ones) on a cluster; hadoop-streaming.jar is just one example of such a YARN application.

The launch options are:

  • input - the folder with the source data on HDFS;
  • output - the folder on HDFS where the result should be placed;
  • file - local files that are needed while the MapReduce task runs (they are shipped to the cluster along with the job);
  • mapper - the console command that will be used for the map stage;
  • reducer - the console command that will be used for the reduce stage.

After launching, you can follow the progress of the task in the console, which also prints a URL for viewing more detailed information about the task.

In the web interface available at this URL, you can see a more detailed task execution status and view the logs of each mapper and reducer (which is very useful when a task fails).

After successful execution, the result is written to HDFS in the folder that we specified in the output option. You can view its contents with the "hadoop fs -ls lenta_wordcount" command.

The result itself can be obtained as follows:

hadoop fs -text lenta_wordcount/* | sort -n -k2,2 | tail -n5
from	41
this	43
on	82
and	111
into	194

The "hadoop fs -text" command displays the contents of the folder in text form. I sorted the result by the number of occurrences of the words. As expected, the most common words in the language are prepositions.

4.4 Method number 2: use Java

Hadoop itself is written in Java, and Hadoop's native interface is also Java-based. Let's look at what a native Java application for WordCount looks like:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

	public static class TokenizerMapper
        	extends Mapper<Object, Text, Text, IntWritable>{

    	private final static IntWritable one = new IntWritable(1);
    	private Text word = new Text();

    	public void map(Object key, Text value, Context context
    	) throws IOException, InterruptedException {
        	StringTokenizer itr = new StringTokenizer(value.toString());
        	while (itr.hasMoreTokens()) {
            	word.set(itr.nextToken());
            	context.write(word, one);
        	}
    	}
	}

	public static class IntSumReducer
        	extends Reducer<Text,IntWritable,Text,IntWritable> {
    	private IntWritable result = new IntWritable();

    	public void reduce(Text key, Iterable<IntWritable> values,
                       	Context context
    	) throws IOException, InterruptedException {
        	int sum = 0;
        	for (IntWritable val : values) {
            	sum += val.get();
        	}
        	result.set(sum);
        	context.write(key, result);
    	}
	}

	public static void main(String[] args) throws Exception {
    	Configuration conf = new Configuration();
    	Job job = Job.getInstance(conf, "word count");
    	job.setJarByClass(WordCount.class);
    	job.setMapperClass(TokenizerMapper.class);
    	job.setReducerClass(IntSumReducer.class);
    	job.setOutputKeyClass(Text.class);
    	job.setOutputValueClass(IntWritable.class);
    	FileInputFormat.addInputPath(job, new Path("hdfs://localhost/user/cloudera/lenta_articles"));
    	FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost/user/cloudera/lenta_wordcount"));
    	System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

This class does exactly the same thing as our Python example. We create the TokenizerMapper and IntSumReducer classes by deriving from the Mapper and Reducer classes, respectively. The classes passed as generic type parameters specify the types of the input and output values. The native API assumes that the map function receives a key-value pair as input; since in our case we do not need the key, we simply declare Object as its type.

In the main method, we set up the MapReduce job and define its parameters: the name, the mapper and reducer, and the paths in HDFS where the input data is located and where to put the result. To compile, we need the Hadoop libraries. I use Maven for the build, and Cloudera provides a repository for it; instructions for setting it up can be found here. As a result, my pom.xml file (which Maven uses to describe the project build) looks like this:

<?xml version="1.0" encoding="UTF-8"?>  
<project xmlns="http://maven.apache.org/POM/4.0.0"  
     	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
     	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">  
	<modelVersion>4.0.0</modelVersion>  
  
	<repositories>  
    	<repository>  
        	<id>cloudera</id>  
        	<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>  
    	</repository>  
	</repositories>  
  
	<dependencies>  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-common</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-auth</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-hdfs</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
    	<dependency>  
        	<groupId>org.apache.hadoop</groupId>  
        	<artifactId>hadoop-mapreduce-client-app</artifactId>  
        	<version>2.6.0-cdh5.4.2</version>  
    	</dependency>  
  
	</dependencies>  
  
	<groupId>org.dca.examples</groupId>  
	<artifactId>wordcount</artifactId>  
	<version>1.0-SNAPSHOT</version>  
 
</project>

Let's compile the project into a jar package:

mvn clean package

After the project is built into a jar file, the job is launched in a similar way to the streaming interface:

yarn jar wordcount-1.0-SNAPSHOT.jar WordCount

We wait for execution and check the result:

hadoop fs -text lenta_wordcount/* | sort -n -k2,2 | tail -n5
from	41
this	43
on	82
and	111
into	194

As you might guess, the result of running our native application is the same as the result of the streaming application we launched earlier.