5.1 Map-only jobs
It's time to describe a number of techniques that let you use MapReduce effectively to solve practical problems, and to show some features of Hadoop that can simplify development or significantly speed up the execution of MapReduce jobs on a cluster.
As we remember, MapReduce consists of the Map, Shuffle and Reduce stages. In practical tasks the Shuffle stage usually turns out to be the heaviest, because the data is sorted at this stage. There is, however, a whole class of tasks that can be solved with the Map stage alone. Here are some examples of such tasks:
- Data filtering (for example, "find all records from the IP address 123.123.123.123" in the web server logs);
- Data transformation ("delete a column in CSV logs");
- Loading and unloading data to or from an external source ("insert all records from the log into a database").
Such tasks are solved with a Map-Only job. When creating a Map-Only job in Hadoop, you need to set the number of reducers to zero:
An example of a Map-Only job configuration in Hadoop:

Native interface | Hadoop Streaming interface |
---|---|
Set the number of reducers to zero when configuring the job (job.setNumReduceTasks(0)). | Do not specify a reducer and set the number of reducers to zero (for example, -D mapreduce.job.reduces=0). |
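As an illustration, here is a minimal sketch of a map-only filter for Hadoop Streaming, matching the "find all records from one IP address" example above. The script name and the tab-separated log format are assumptions; the job would be launched with zero reducers and without a -reducer option.

```python
#!/usr/bin/env python
# filter_mapper.py -- a map-only filter that keeps only the records
# coming from one IP address. Launched with Hadoop Streaming, e.g.:
#   hadoop jar hadoop-streaming.jar \
#       -D mapreduce.job.reduces=0 \
#       -input access_logs -output filtered_logs \
#       -mapper filter_mapper.py -files filter_mapper.py
import sys

TARGET_IP = "123.123.123.123"

for line in sys.stdin:
    # Assumed record format: <timestamp>\t<ip_address>\t<url>
    fields = line.rstrip("\n").split("\t")
    if len(fields) >= 2 and fields[1] == TARGET_IP:
        sys.stdout.write(line)
```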
Map-Only jobs can actually be very useful. For example, the Facetz.DCA platform identifies user characteristics from user behaviour with one large map-only job, in which each mapper takes a user as input and returns the user's characteristics as output.
5.2 Combine
As I already wrote, the shuffle stage is usually the most expensive part of a MapReduce job, because the intermediate results (the mappers' output) are written to disk, sorted and transmitted over the network. However, there are tasks in which such behavior does not seem very reasonable. For example, in the same task of counting words in documents, the outputs of several mappers running on one node can be pre-aggregated, so that the reducer receives values that are already summed per machine.
For this purpose, Hadoop lets you define a combining function that processes the output of some of the mappers. The combining function is very similar to reduce: it takes the output of some mappers as input and produces an aggregated result for them, which is why the reducer is often used as the combiner as well. An important difference from reduce is that the combining function does not receive all the values corresponding to one key.
Moreover, Hadoop does not guarantee that the combining function will be executed for a mapper's output at all. Therefore, the combining function is not always applicable; searching for the median value by key is one example where it is not. Nevertheless, in tasks where a combining function is applicable, using it can significantly speed up a MapReduce job.
Using a combiner in Hadoop:

Native interface | Hadoop Streaming |
---|---|
When configuring the job, specify the combiner class (job.setCombinerClass). As a rule, it is the same class as the Reducer. | Specify the combiner command with the -combiner option. Typically this command is the same as the reducer command. |
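As a sketch, here is a word-count reducer for Hadoop Streaming that can also be passed as the combiner (the script name sum_reducer.py is made up for illustration). Because it only sums the counts for each key, applying it to partial mapper output before the shuffle does not change the final result.

```python
#!/usr/bin/env python
# sum_reducer.py -- sums the counts for each key of the sorted input.
# It can be used both as the reducer and as the combiner, because
# summing partial sums gives the same result as summing raw counts.
import sys

current_key = None
current_sum = 0

for line in sys.stdin:
    key, value = line.rstrip("\n").split("\t", 1)
    if key != current_key:
        if current_key is not None:
            print("%s\t%d" % (current_key, current_sum))
        current_key = key
        current_sum = 0
    current_sum += int(value)

if current_key is not None:
    print("%s\t%d" % (current_key, current_sum))
```

With Hadoop Streaming such a script would be passed both as -reducer sum_reducer.py and as -combiner sum_reducer.py (and shipped to the nodes with -files).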
5.3 MapReduce task chains
There are situations when a single MapReduce is not enough to solve a problem. For example, consider a slightly modified WordCount task: there is a set of text documents, and you need to count how many words occurred in the set from 1 to 1000 times, how many from 1001 to 2000, how many from 2001 to 3000, and so on. The solution requires two MapReduce jobs:
- A modified WordCount that, for each word, determines which of the intervals its count falls into;
- A MapReduce that counts how many times each interval occurs in the output of the first MapReduce.
Pseudocode of the solution:
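Below is a minimal Python-style sketch of the two jobs; the function names and the exact interval arithmetic are illustrative.

```python
# Job 1: modified WordCount -- for each word, emit the interval its count falls into.
def map1(document):
    for word in document.split():
        yield word, 1

def reduce1(word, counts):
    total = sum(counts)
    interval = (total - 1) // 1000      # 1..1000 -> 0, 1001..2000 -> 1, ...
    yield interval, 1

# Job 2: count how many words fell into each interval.
def map2(interval, one):
    yield interval, one

def reduce2(interval, ones):
    yield interval, sum(ones)
```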
To execute a sequence of MapReduce jobs on Hadoop, it is enough to specify the folder that was used as the output of the first job as the input of the second job and to run them one after another.
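As an illustration, the two jobs above could be chained with Hadoop Streaming roughly as follows; the directory names, script names and the location of the streaming jar are assumptions.

```python
# Run two Hadoop Streaming jobs in sequence: the output directory of
# job 1 becomes the input directory of job 2.
import subprocess

STREAMING_JAR = "hadoop-streaming.jar"  # the actual path depends on the installation

def run_streaming(input_dir, output_dir, mapper, reducer):
    subprocess.check_call([
        "hadoop", "jar", STREAMING_JAR,
        "-input", input_dir,
        "-output", output_dir,
        "-mapper", mapper,
        "-reducer", reducer,
        "-files", "%s,%s" % (mapper, reducer),
    ])

run_streaming("docs", "word_intervals", "map1.py", "reduce1.py")
run_streaming("word_intervals", "interval_counts", "map2.py", "reduce2.py")
```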
In practice, chains of MapReduce tasks can form quite complex sequences, in which the jobs are connected both sequentially and in parallel with one another. To simplify the management of such execution plans, there are separate tools like Oozie and Luigi, which will be discussed in a separate article in this series.
5.4 Distributed cache
An important mechanism in Hadoop is the Distributed Cache. The Distributed Cache allows you to add files (for example, text files, archives, jar files) to the environment in which the MapReduce task is running.
You can add files stored on HDFS as well as local files (local to the machine from which the task is launched). I have already implicitly shown how to use the Distributed Cache with Hadoop Streaming by adding the mapper.py and reducer.py files via the -file option. In fact, you can add not only mapper.py and reducer.py but arbitrary files in general, and then use them as if they were in a local folder.
Using the Distributed Cache:

Native API | Hadoop Streaming |
---|---|
Add the files to the cache when configuring the job (for example, with job.addCacheFile in the Java API). | List the files that need to be added to the distributed cache in the -files option. The -files option must come before the other options. |
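For instance, here is a sketch of a streaming mapper that uses a file shipped through the distributed cache; the file name ip_blacklist.txt and the filtering scenario are made up for illustration. The file would be passed with -files ip_blacklist.txt and then opened by its plain name, as if it were in the local working directory.

```python
#!/usr/bin/env python
# A streaming mapper that drops requests coming from blacklisted IP addresses.
# The blacklist file is shipped to every node with:  -files ip_blacklist.txt
import sys

# The cached file appears in the task's working directory under its own name.
with open("ip_blacklist.txt") as f:
    blacklist = set(line.strip() for line in f)

for line in sys.stdin:
    # Assumed record format: <timestamp>\t<ip_address>\t<url>
    fields = line.rstrip("\n").split("\t")
    if len(fields) >= 2 and fields[1] not in blacklist:
        sys.stdout.write(line)
```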
5.5 Reduce Join
Those who are accustomed to working with relational databases often use the very convenient Join operation, which allows them to jointly process the contents of some tables by joining them according to some key. When working with big data, this problem also sometimes arises. Consider the following example:
There are logs from two web servers; each log looks like this:

<timestamp>\t<ip_address>\t<url>
Log snippet example:
1446792139	178.78.82.1	/sphingosine/unhurrying.css
1446792139	126.31.163.222	/accentually.js
1446792139	154.164.149.83	/pyroacid/unkemptly.jpg
1446792139	202.27.13.181	/Chawia.js
1446792139	67.123.248.174	/morphographical/dismain.css
1446792139	226.74.123.135	/phanerite.php
1446792139	157.109.106.104	/bisonant.css
It is necessary to determine for each IP address which of the two servers it visited more often. The result should be in the form:

<ip_address>\t<first or second>
An example of a part of the result:
178.78.82.1	first
126.31.163.222	second
154.164.149.83	second
226.74.123.135	first
Unfortunately, unlike relational databases, joining two logs by key (in this case, by IP address) is in general a rather heavy operation; it is solved with three MapReduce jobs and the Reduce Join pattern.
Reduce Join works like this:
1) For each of the input logs, a separate map-only MapReduce job is launched that converts the input data into the following form:
key -> (type, value)
where key is the key to join the tables on, type is the type of the table (first or second in our case), and value is any additional data bound to the key.
2) The outputs of both MapReduces are fed to the input of a third MapReduce, which actually performs the join. This MapReduce contains an empty mapper that simply copies the input. Then shuffle groups the data by key and feeds it to the reducer in the form:
key -> [(type, value)]
It is important that at this moment the reducer receives records from both logs, and the type field makes it possible to tell which of the two logs a particular value came from. So there is enough data to solve the original problem. In our case, the reducer simply has to calculate, for each key, which type was encountered more often and output that type.
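A minimal Python-style sketch of the pattern for this task; the function names are illustrative, and the first/second labels follow the example above.

```python
# Jobs 1 and 2 (map-only): tag every record of each log with its source.
def map_first_log(line):
    timestamp, ip, url = line.split("\t")
    yield ip, ("first", 1)

def map_second_log(line):
    timestamp, ip, url = line.split("\t")
    yield ip, ("second", 1)

# Job 3: an identity mapper plus a reducer that performs the actual join.
def identity_map(key, value):
    yield key, value

def join_reduce(ip, tagged_values):
    # tagged_values contains records from both logs for this ip.
    counts = {"first": 0, "second": 0}
    for source, one in tagged_values:
        counts[source] += one
    yield ip, "first" if counts["first"] > counts["second"] else "second"
```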
5.6 MapJoins
The Reduce Join pattern describes the general case of joining two logs by key. However, there is a special case in which the task can be significantly simplified and accelerated: the case in which one of the logs is much smaller than the other. Consider the following problem:
There are two logs. The first is the web server log (the same as in the previous task); the second file (about 100 KB in size) contains a URL -> topic mapping. An example of the second file:
/toyota.php	auto
/football/spartak.html	sport
/cars	auto
/finances/money	business
For each IP address, it is necessary to determine which category of pages was loaded most often from that address.
In this case, we also need to join the two logs by URL. However, here we do not have to run three MapReduce jobs, because the second log fits entirely into memory. To solve the problem with a single MapReduce, we can put the second log into the Distributed Cache and, when the mapper is initialized, simply read it into memory as a URL -> topic dictionary.
The problem is then solved as follows:
map:
# find the topic of each page in the first log
input_line -> [ip, topic]
reduce:
ip -> [topics] -> [ip, most_popular_topic]
Reduce receives an ip and a list of all topics as input and simply calculates which topic was encountered most often. Thus the task is solved with a single MapReduce, and the join itself actually happens inside the map (so, if additional aggregation by key were not needed, a map-only job would have been enough).
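A sketch of such a mapper for Hadoop Streaming is given below; the file name url_topics.txt is an assumption, and the file would be shipped to the nodes with -files url_topics.txt.

```python
#!/usr/bin/env python
# Map Join mapper: the small URL -> topic table is loaded into memory
# from the distributed cache, and every log record is joined in the map.
import sys

# Read the small table once, when the mapper starts.
url_to_topic = {}
with open("url_topics.txt") as f:
    for line in f:
        url, topic = line.rstrip("\n").split("\t")
        url_to_topic[url] = topic

for line in sys.stdin:
    # Log record format: <timestamp>\t<ip_address>\t<url>
    timestamp, ip, url = line.rstrip("\n").split("\t")
    print("%s\t%s" % (ip, url_to_topic.get(url, "unknown")))
```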