Sample code for this post https://github.com/bomot113/Joining_MapReduce
The MapReduce programming model is well known for its ability to process big data. The input and output throughout the computation are sets of key/value pairs, usually stored in one big text file or in chunks split from an original data set. It is often the case that we need to join different data sets in order to take advantage of their relationships, and then we wonder how to join these data sets into a single key/value input for another MapReduce job. This post shows one good way to solve that problem.
Let's start by defining a specific usage scenario: a university is monitoring the power consumption of all its facilities. Each facility has many devices, and a controlling system reads the electricity consumption of each device every single minute. The data sets for the facilities, devices, and work logs are described below:
Figure 1: Entity Relationship Diagram
Sample of data set 1: facility.txt
```
1206	BAGLEY HALL
1228	MAG H.S.C./H
1117	WEST RECEIVING STA
```
Sample of data set 2: device.txt
```
131	1206	Bagley
348	1228	HSC-G3 WC1
349	1228	HSC-G3 PCD-H-2-N03
352	1228	HSC-G3 PCD-H02-N01
284	1117	WRS-WA1 HSC & South Campus
285	1117	WRS-WA2_ERS_EA Main Tie
286	1117	WRS-WA3 Mid Campus
```
Sample of data set 3: worklog.txt
```
131	2007-05-02 23:45:02-07	0
131	2007-05-03 00:00:02-07	4
131	2007-05-03 00:15:02-07	3
131	2007-05-03 00:30:02-07	3
131	2007-05-03 00:45:02-07	3
```
The requirements: management wants the system to report the power consumption of each building across the university for every hour of each day. In a relational database, we could solve this with a single SQL query:
```sql
SELECT facility.fid, facility.fName,
       date_trunc('hour', work_log.time_logged),
       SUM(work_log.value)
FROM a02.facility facility
INNER JOIN a02.device device ON facility.fid = device.fid
INNER JOIN a02.work_log work_log ON device.did = work_log.did
GROUP BY facility.fid, facility.fName, date_trunc('hour', work_log.time_logged);
```
Steps to solve the problem:

1) Join the data sets:
   (a) Use a first MapReduce job to join the facility and device data sets into output1, with key: fid, fName and value: did.
   (b) Use a second MapReduce job to join output1 and work_log into output2, with key: fid, fName, time_logged and value: value.
2) Use a third MapReduce job to compute the final result.
With the first step done, we have already finished the main mission: joining the data sets using MapReduce. The second step may seem redundant, but it completes the business requirements and makes our technical work meaningful.
Joining the datasets:
The DBMS approach:
The idea behind joining relations in a DBMS such as MySQL or PostgreSQL is to take the Cartesian product of the relations and then apply a selection that keeps only the rows satisfying the join condition as the output.
To join relation facility and device:
```
Facility            Device
fid  fname          did  fid  dName
1    F1             1    1    D1
2    F2             2    1    D2
                    3    2    D3

Facility × Device
fid  fname  did  fid  dname
1    F1     1    1    D1
1    F1     2    1    D2
1    F1     3    2    D3
2    F2     1    1    D1
2    F2     2    1    D2
2    F2     3    2    D3

Selection: Facility.fid = Device.fid
fid  fname  did  fid  dname
1    F1     1    1    D1
1    F1     2    1    D2
2    F2     3    2    D3
```
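The Cartesian-product-plus-selection idea can be sketched in a few lines of Python. This is a toy nested-loop illustration using the relation literals from the tables above, not how a production DBMS actually executes joins:

```python
# Toy DBMS-style join: form the Cartesian product of the two relations,
# then keep only the rows satisfying the selection Facility.fid = Device.fid.
facility = [(1, 'F1'), (2, 'F2')]                    # (fid, fname)
device = [(1, 1, 'D1'), (2, 1, 'D2'), (3, 2, 'D3')]  # (did, fid, dname)

def nested_loop_join(facility, device):
    result = []
    for fid, fname in facility:            # every (facility, device) pair
        for did, d_fid, dname in device:   # ... forms the Cartesian product
            if fid == d_fid:               # selection condition
                result.append((fid, fname, did, dname))
    return result

print(nested_loop_join(facility, device))
```

Real engines avoid materializing the full product by using hash or sort-merge joins; the MapReduce approach is essentially a sort-merge join at scale.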
The MapReduce approach:
Taking a different approach, the two relations Facility and Device are placed one after another into a single relation using the Union operation. An extra step pads each relation with empty fields for the columns it lacks, making the two union-compatible. All of this is carried out by the Map function, whose output has key: the common key fid, and values: fName, did, and dName.
```python
#!/usr/bin/env python
# did_mapper.py
import sys

for line in sys.stdin:
    # parse the line
    line = line.strip()
    values = line.split('\t')
    # We can only tell the relations apart by their number of columns
    if len(values) == 3:
        # this is a row from device.txt
        did, fid, dName = values
        fName = "-1"             # pad the missing facility column
    else:
        # this must be a row from facility.txt
        fid, fName = values
        did, dName = "-1", "-1"  # pad the missing device columns
    if fid != "\N":              # skip NULL fids from the database dump
        print "%s\t%s\t%s\t%s" % (fid, fName, did, dName)
```
```
Union Relation
fid  fname  did  dName
1    F1
2    F2
1           1    D1
1           2    D2
2           3    D3
```
MapReduce frameworks such as Hadoop automatically sort the output of the Map function by the key (fid here) before it arrives at the Reduce function as input.
```
Sorted by fid
fid  fName  did  dName
1    F1
1           1    D1
1           2    D2
2    F2
2           3    D3
```
At the Reduce step, by going through all the rows of the input and grouping consecutive rows that share the same key fid, we can determine which fid and fName each (did, dName) pair belongs to. The output of this process is:
```python
#!/usr/bin/env python
# did_reducer.py
import sys

fid = "-1"
f_sName = ""
dids = []

for line in sys.stdin:
    # line = (fid, f_sName, did, dName)
    line = line.strip()
    values = line.split('\t')
    if fid == "-1":
        fid = values[0]
    # This only works because Hadoop has already sorted the rows by fid
    if fid != values[0]:
        # a new fid group begins: emit the previous group
        for did in dids:
            print "%s\t%s\t%s" % (fid, f_sName, did)
        dids = []
        fid = values[0]
    # group by fid
    if values[2] == "-1":
        # this row is from the facility relation, as did has no value
        f_sName = values[1]
    else:
        # this row is from the device relation, as f_sName has no value
        dids.append(values[2])

# emit the last group
for did in dids:
    print "%s\t%s\t%s" % (fid, f_sName, did)
```
```
Joining Facility and Device by fid
fid  fName  did
1    F1     1
1    F1     2
2    F2     3
```
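The whole map → sort → reduce join can also be simulated locally in plain Python, which is handy for checking the logic before running it on Hadoop. This is a sketch with my own function names, not code from the repository; here None plays the role of the padding value:

```python
# Local simulation of the reduce-side join: pad both relations into a
# union-compatible shape, sort by the common key fid (Hadoop's shuffle/sort),
# then group consecutive rows that share the same fid.
def map_phase(facility_rows, device_rows):
    records = []
    for fid, fname in facility_rows:
        records.append((fid, fname, None))  # pad the missing did column
    for did, fid, dname in device_rows:
        records.append((fid, None, did))    # pad the missing fname column
    return records

def reduce_phase(records):
    joined, current_fid, fname, dids = [], None, None, []
    for fid, name, did in sorted(records, key=lambda r: r[0]):
        if fid != current_fid:              # a new fid group begins
            joined.extend((current_fid, fname, d) for d in dids)
            current_fid, fname, dids = fid, None, []
        if did is None:
            fname = name                    # facility row carries the name
        else:
            dids.append(did)                # device row carries a did
    joined.extend((current_fid, fname, d) for d in dids)
    return joined

facility = [(1, 'F1'), (2, 'F2')]
device = [(1, 1, 'D1'), (2, 1, 'D2'), (3, 2, 'D3')]
print(reduce_phase(map_phase(facility, device)))
# → [(1, 'F1', 1), (1, 'F1', 2), (2, 'F2', 3)]
```

Note that the grouping buffers the dids and picks up fname whenever it appears, so it works regardless of whether the facility row or the device rows arrive first within a group.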
Ta-da! That's it for joining relations using the MapReduce programming model.
We can follow similar steps to join the output of the previous step (Facility merged with Device) with the work_log relation:
```
Work_log relation
did  time_logged     value
1    1/1/2013 12:05  3
2    1/1/2013 12:05  2
1    1/1/2013 14:30  5
```
I won't demonstrate these steps again, as you get the idea already. Just remember the recipe: identify the common key of the two relations, add empty fields to make them union-compatible, let the framework sort by the key, and then process the rows that share the same key to get the expected result.
```
Result of joining with work_log
fid  fName  time_logged     value
1    F1     1/1/2013 12:05  3
1    F1     1/1/2013 12:05  2
1    F1     1/1/2013 14:30  5
```
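For completeness, the mapper of this second join could look like the sketch below. The name wl_mapper.py and the column-content heuristic are my own assumptions (both inputs now have three columns, so column count alone no longer tells them apart); the common key did is emitted first so Hadoop sorts on it:

```python
# wl_mapper.py -- hypothetical mapper for joining output1 (fid, fName, did)
# with work_log (did, time_logged, value). Both inputs have three columns,
# so we tell them apart by whether column 2 looks like a timestamp.
import sys

def map_line(line):
    values = line.strip().split('\t')
    if ':' in values[1]:
        # work_log row: did, time_logged, value -> key did first, pad fid/fName
        did, time_logged, value = values
        return '\t'.join([did, '', '', time_logged, value])
    else:
        # output1 row: fid, fName, did -> key did first, pad time/value
        fid, f_name, did = values
        return '\t'.join([did, fid, f_name, '', ''])

if __name__ == '__main__':
    for line in sys.stdin:
        print(map_line(line))
```

The reducer then mirrors did_reducer.py: buffer the (time_logged, value) pairs per did, pick up fid and fName when they appear, and emit the joined rows at each group boundary.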
Compute the final results:
As mentioned before, this step is where we make these technical numbers meaningful and visible to readers. That, after all, is why big data is thriving these days, isn't it?
At the Map function, we round the time_logged value up to the next interval boundary and emit key/value pairs with key: fid, fName, and time_logged, and value: value.
```python
#!/usr/bin/env python
# 15mins_mapper.py
import sys
import math
import datetime

for line in sys.stdin:
    # line = did, fid, f_sname, time_logged, e
    line = line.strip()
    did, fid, f_sname, time_logged, e = line.split('\t')
    # note: the trailing '-07' (a timezone offset) is parsed via %f here
    dt = datetime.datetime.strptime(time_logged, '%Y-%m-%d %H:%M:%S-%f')
    # round the minutes up to the next 15-minute boundary
    rounded_time = "%s:%s:00-00" % (dt.strftime('%Y-%m-%d %H'),
                                    str(int(math.ceil(float(dt.minute) / 15)) * 15))
    print '%s\t%s\t%s\t%s' % (fid, rounded_time, f_sname, e)
```
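The rounding expression is worth isolating and checking on its own. Here it is extracted into a small helper (the function name is mine):

```python
import math

def round_up_to_15(minute):
    # ceil(minute / 15) * 15 maps: 0 -> 0, 1..15 -> 15, 16..30 -> 30,
    # 31..45 -> 45, 46..59 -> 60
    return int(math.ceil(minute / 15.0)) * 15

print([round_up_to_15(m) for m in (0, 1, 15, 16, 45, 59)])
# → [0, 15, 15, 30, 45, 60]
```

Notice the edge case: minutes 46-59 round up to 60, so the mapper emits a ':60:00' timestamp instead of rolling over to the next hour, which is something to keep in mind if downstream code parses these strings.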
The tricky thing to pay attention to here is that the MapReduce framework has to sort on a composite key of fid and rounded_time. We didn't need to tell Hadoop to do so in the previous MapReduce jobs because by default it treats the first column as the key and sorts on it. Composite keys containing two or more fields, however, must be configured explicitly:
```
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming-1.1.2.jar \
    -input did_wl_outputs/part-00004 \
    -output pwr_outputs \
    -file ~/hadoop/15mins_mapper.py \
    -mapper 15mins_mapper.py \
    -file ~/hadoop/15mins_op.py \
    -reducer 15mins_op.py \
    -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
    -jobconf mapred.text.key.partitioner.options=-k1,2
```
The Reduce function groups all the pairs that share the same key (fid, fName, and time_logged) and sums their values.
```python
#!/usr/bin/env python
# 15mins_reducer.py
import sys

current_fid = None
current_time = None
current_fname = None
total_e = 0

for line in sys.stdin:
    # parse the input we got from the mapper
    line = line.strip()
    fid, time, f_name, e_str = line.split('\t', 3)
    # convert e (currently a string) to int
    try:
        e = int(e_str)
    except ValueError:
        # e was not a number, so silently ignore/discard this line
        continue
    # first line read
    if current_time is None:
        current_fid, current_time, current_fname = fid, time, f_name
    # this IF-switch only works because Hadoop sorts map output
    # by key (here: fid, time) before it is passed to the reducer
    if fid != current_fid or time != current_time:
        print '%s\t%s\t%s\t%.2f' % (current_fid, current_fname, current_time, total_e)
        current_fid, current_time, current_fname = fid, time, f_name
        total_e = 0
    # aggregate function grouped by (fid, time)
    total_e += e

# emit the last group
if current_fid is not None:
    print '%s\t%s\t%s\t%.2f' % (current_fid, current_fname, current_time, total_e)
```
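The same group-and-sum logic can be sanity-checked locally without a cluster. A minimal simulation (my own helper, fed with the sample rows from the joined table above):

```python
# Simulate the final reduce step: sort rows by the composite key (fid, time),
# then sum the values of consecutive rows that share the same key -- the
# same guarantee Hadoop's shuffle/sort gives the streaming reducer.
def aggregate(rows):
    # rows: (fid, f_name, time_logged, value)
    totals, current_key, current_name, total = [], None, None, 0
    for fid, f_name, time_logged, value in sorted(rows, key=lambda r: (r[0], r[2])):
        if (fid, time_logged) != current_key:
            if current_key is not None:
                totals.append((current_key[0], current_name, current_key[1], total))
            current_key, current_name, total = (fid, time_logged), f_name, 0
        total += value
    if current_key is not None:
        totals.append((current_key[0], current_name, current_key[1], total))
    return totals

rows = [('1', 'F1', '1/1/2013 12:05', 3),
        ('1', 'F1', '1/1/2013 12:05', 2),
        ('1', 'F1', '1/1/2013 14:30', 5)]
print(aggregate(rows))
# → [('1', 'F1', '1/1/2013 12:05', 5), ('1', 'F1', '1/1/2013 14:30', 5)]
```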
So the MapReduce programming model can indeed join relations the way RDBMSs do. If you have any comments or questions, feel free to leave them in the comment box below; I will get back to you as soon as possible.