5.1 Map only job

It's time to describe the techniques that let you use MapReduce effectively to solve practical problems, and to show some Hadoop features that can simplify development or significantly speed up the execution of a MapReduce job on a cluster.

As we remember, MapReduce consists of the Map, Shuffle and Reduce stages. In practical tasks the Shuffle stage usually turns out to be the heaviest, since the data is sorted at this stage. In fact, there is a whole class of tasks that can be handled with the Map stage alone. Here are examples of such tasks:

  • Data filtering (for example, "Find all records from the IP address 123.123.123.123" in the web server logs);
  • Data transformation ("Delete a column in CSV logs");
  • Loading and unloading data to and from an external source ("Insert all records from the log into the database").

Such tasks are solved with Map-Only jobs. When creating a Map-Only job in Hadoop, you need to set the number of reducers to zero:

An example of a Map-Only job configuration in Hadoop:

Native interface — specify zero reducers when configuring the job:

job.setNumReduceTasks(0);

Hadoop Streaming interface — do not specify a reducer and set the number of reducers to zero. Example:

hadoop jar hadoop-streaming.jar \
 -D mapred.reduce.tasks=0 \
 -input input_dir \
 -output output_dir \
 -mapper "python mapper.py" \
 -file "mapper.py"

Map-Only jobs can be very useful in practice. For example, in the Facetz.DCA platform, user characteristics are derived from user behaviour by exactly one large Map-Only job: each mapper takes a user as input and returns that user's characteristics as output.

5.2 Combine

As I already wrote, the shuffle stage is usually the heaviest part of a MapReduce job, because the intermediate results (the mappers' output) are written to disk, sorted and transmitted over the network. However, there are tasks in which such behaviour does not seem very reasonable. For example, in the same task of counting words in documents, you can pre-aggregate the outputs of several mappers on one node of the MapReduce job and pass the already summed values for each machine to the reducer.

For this, Hadoop lets you define a combiner function that will process the output of part of the mappers. The combiner is very similar to reduce: it takes the output of some of the mappers as input and produces an aggregated result for those mappers, so the reducer is often used as the combiner as well. An important difference from reduce is that not all values corresponding to one key reach the combiner.

Moreover, Hadoop does not guarantee that the combiner will be executed at all for a given mapper's output. Therefore the combiner is not always applicable, for example when searching for the median value by key. Nevertheless, in tasks where the combiner is applicable, its use can significantly speed up the MapReduce job.

Using the combiner in Hadoop:

Native interface — when configuring the job, specify the combiner class. As a rule, it is the same as the reducer:

job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);

Hadoop Streaming — specify the -combiner option on the command line. Typically this command is the same as the reducer command. Example:

hadoop jar hadoop-streaming.jar \
 -input input_dir \
 -output output_dir \
 -mapper "python mapper.py" \
 -reducer "python reducer.py" \
 -combiner "python reducer.py" \
 -file "mapper.py" \
 -file "reducer.py"

5.3 MapReduce task chains

There are situations when a single MapReduce job is not enough to solve a problem. For example, consider a slightly modified WordCount task: there is a set of text documents, and you need to count how many words occurred from 1 to 1000 times in the set, how many from 1001 to 2000, how many from 2001 to 3000, and so on. The solution requires two MapReduce jobs:

  • A modified WordCount, which for each word calculates which of the intervals it falls into;
  • A MapReduce that counts how many times each interval was encountered in the output of the first MapReduce.

Pseudocode of the solution:

# map1
def map(doc):
    for word in doc:
        yield word, 1

# reduce1
def reduce(word, values):
    # bucket index: words seen 1..999 times go to bucket 0, 1000..1999 to bucket 1, etc.
    yield sum(values) // 1000, 1

# map2
def map(line):
    interval, cnt = line.split()
    yield int(interval), int(cnt)

# reduce2
def reduce(interval, values):
    yield interval * 1000, sum(values)

To execute a sequence of MapReduce jobs on Hadoop, it is enough to specify the folder that served as output for the first job as the input of the second job and run them one after another.
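
For instance, a driver that chains the two jobs above via Hadoop Streaming might look like the following sketch (the script names map1.py, reduce1.py, map2.py, reduce2.py, the directory names and the jar path are assumptions for illustration):

#!/usr/bin/env python
# run_chain.py - a sketch of chaining two streaming jobs: the output
# directory of the first job is passed as the input of the second.
import subprocess

STREAMING_JAR = "hadoop-streaming.jar"   # path to the jar is an assumption

def run_job(input_dir, output_dir, mapper, reducer, files):
    cmd = ["hadoop", "jar", STREAMING_JAR,
           "-input", input_dir, "-output", output_dir,
           "-mapper", mapper, "-reducer", reducer]
    for f in files:
        cmd += ["-file", f]
    subprocess.check_call(cmd)

# job 1: modified WordCount -> interval buckets
run_job("input_docs", "word_intervals",
        "python map1.py", "python reduce1.py", ["map1.py", "reduce1.py"])
# job 2: count how many words fell into each bucket
run_job("word_intervals", "interval_counts",
        "python map2.py", "python reduce2.py", ["map2.py", "reduce2.py"])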

In practice, chains of MapReduce jobs can be quite complex sequences in which the jobs are connected both sequentially and in parallel. To simplify the management of such execution plans, there are dedicated tools such as Oozie and Luigi, which will be discussed in a separate article in this series.

5.4 Distributed cache

An important mechanism in Hadoop is the Distributed Cache. The Distributed Cache allows you to add files (e.g. text files, archives, jar files) to the environment in which the MapReduce task is running.

You can add files stored on HDFS as well as local files (local to the machine from which the job is launched). I've already implicitly shown how to use the Distributed Cache with Hadoop Streaming by adding the mapper.py and reducer.py files via the -file option. In fact, you can add not only mapper.py and reducer.py but arbitrary files in general, and then use them as if they were in a local folder.

Using the Distributed Cache:

Native interface:

//Job configuration
JobConf job = new JobConf();
DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"), job);
DistributedCache.addCacheArchive(new URI("/myapp/map.zip"), job);
DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job);
DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar"), job);
DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz"), job);

//example of usage in the mapper:
public static class MapClass extends MapReduceBase
    implements Mapper<K, V, K, V> {

  private Path[] localArchives;
  private Path[] localFiles;

  public void configure(JobConf job) {
    // get cached data from the archives
    File f = new File("./map.zip/some/file/in/zip.txt");
  }

  public void map(K key, V value,
                  OutputCollector<K, V> output, Reporter reporter)
      throws IOException {
    // use the cached data here
    // ...
    output.collect(k, v);
  }
}
Hadoop Streaming:

# list the files to be added to the distributed cache in the -files option;
# the -files option must come before the other options

yarn jar hadoop-streaming.jar \
 -files mapper.py,reducer.py,some_cached_data.txt \
 -input '/some/input/path' \
 -output '/some/output/path' \
 -mapper 'python mapper.py' \
 -reducer 'python reducer.py'

Usage example:

import sys

# just read the file from the local folder
data = open('some_cached_data.txt').read()

for line in sys.stdin:
    # process the input line, using data here
    pass

5.5 Reduce Join

Those who are accustomed to working with relational databases often use the very convenient Join operation, which allows the contents of several tables to be processed jointly by joining them on some key. When working with big data, this problem sometimes arises as well. Consider the following example:

There are logs of two web servers, each log looks like this:

<timestamp>\t<ip_address>\t<url>

Log snippet example:

1446792139	178.78.82.1	/sphingosine/unhurrying.css
1446792139	126.31.163.222	/accentually.js
1446792139	154.164.149.83	/pyroacid/unkemptly.jpg
1446792139	202.27.13.181	/Chawia.js
1446792139	67.123.248.174	/morphographical/dismain.css
1446792139	226.74.123.135	/phanerite.php
1446792139	157.109.106.104	/bisonant.css

It is necessary to calculate, for each IP address, which of the two servers it visited more often. The result should be in the form:

<ip_address>\t<first or second>

An example of a part of the result:

178.78.82.1	first
126.31.163.222	second
154.164.149.83	second
226.74.123.135	first

Unfortunately, unlike in relational databases, joining two logs by key (here, by IP address) is in general a rather heavy operation; it is solved with three MapReduce jobs and the ReduceJoin pattern:

ReduceJoin works like this:

1) For each of the input logs, a separate Map-Only MapReduce job is launched that converts the input data to the following form:

key -> (type, value)

Here key is the key to join the tables on, type is a label of the table (first or second in our case), and value is any additional data bound to the key.

2) The outputs of both of these jobs are fed to the input of a third MapReduce job, which actually performs the join. This job contains an empty mapper that simply copies the input. Then the shuffle groups the data by key and feeds it to the reducer in the form:

key -> [(type, value)]

It is important that at this moment the reducer receives records from both logs, and the type field makes it possible to identify which of the two logs a particular value came from, so there is enough data to solve the original problem. In our case, the reducer simply has to calculate, for each key, which type occurred more often and output that type.
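
Below is a minimal pseudocode sketch of this ReduceJoin for the two-server task; the function names and the "first"/"second" tags are illustrative:

# Jobs 1 and 2 (Map-Only): tag every record with the log it came from.
def map_first(line):               # log of the first server
    timestamp, ip, url = line.split("\t")
    yield ip, ("first", 1)

def map_second(line):              # log of the second server
    timestamp, ip, url = line.split("\t")
    yield ip, ("second", 1)

# Job 3: an identity mapper copies the input; after the shuffle the reducer
# sees all (type, count) pairs for one ip from both logs at once.
def reduce_join(ip, values):
    counts = {"first": 0, "second": 0}
    for log_type, cnt in values:
        counts[log_type] += cnt
    yield ip, "first" if counts["first"] >= counts["second"] else "second"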

5.6 MapJoins

The ReduceJoin pattern describes the general case of joining two logs by key. However, there is a special case in which the task can be significantly simplified and accelerated: the case in which one of the logs is much smaller than the other. Consider the following problem:

There are two logs. The first is the web server log (the same as in the previous task); the second file (about 100 KB in size) contains a URL -> topic mapping. An example of the second file:

/toyota.php	auto
/football/spartak.html	sport
/cars	auto
/finances/money	business

For each IP address, it is necessary to determine which category of pages was loaded from that address most often.

In this case we also need to join the two logs, this time by URL. However, we do not have to run three MapReduce jobs, since the second log fits entirely into memory. To solve the problem with a single MapReduce job, we can load the second log into the Distributed Cache and, when the mapper is initialized, simply read it into memory, putting it into a url -> topic dictionary.

Further, the problem is solved as follows:

map:

# determine the topic of each page in the first log
input_line -> (ip, topic)

reduce:

ip -> [topics] -> (ip, most_popular_topic)

The reduce receives an ip and a list of all topics as input and simply calculates which of the topics was encountered most often. Thus, the task is solved with a single MapReduce job, and the join itself essentially happens inside the map (therefore, if no additional aggregation by key were needed, a Map-Only job would have sufficed).
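
A minimal sketch of such a MapJoin mapper for Hadoop Streaming follows; it assumes the small file was shipped as url_topics.txt via -files, and the file name and field layout are assumptions for illustration:

#!/usr/bin/env python
# mapjoin_mapper.py - a MapJoin sketch: the small url -> topic file is
# shipped via the Distributed Cache (-files url_topics.txt) and read into
# memory, so the join itself happens right here in the mapper.
import sys

# build the url -> topic dictionary once, when the mapper starts
url_to_topic = {}
with open("url_topics.txt") as f:
    for line in f:
        url, topic = line.rstrip("\n").split("\t")
        url_to_topic[url] = topic

for line in sys.stdin:
    timestamp, ip, url = line.rstrip("\n").split("\t")
    topic = url_to_topic.get(url)
    if topic is not None:
        # key = ip, value = topic; the reducer then only has to pick the
        # most frequent topic for each ip
        print("%s\t%s" % (ip, topic))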