Joining Data Sets with Hadoop Streaming MapReduce and Python

    by • October 11, 2014

    Sample code for this post: https://github.com/bomot113/Joining_MapReduce

    Problem statement:

    The MapReduce programming model is well known for its ability to process big data. The input and output of the computation are sets of key/value pairs, usually stored in one big text file or in file chunks split from an original data set. We often need to join different data sets to take advantage of the relationships between them. The question then is how to join these data sets into a single key/value input for another MapReduce job. This post shows one good way to solve that problem.

    Let's start by defining a concrete usage scenario. A university monitors the power consumption of all its facilities. Each facility has many devices, and a control system reads the electricity consumption of each device every minute. The data sets of facilities, devices, and work logs are described below:


    Figure 1: Entity Relationship Diagram


    Sample of data set 1: facility.txt

    1206		BAGLEY HALL
    1228		MAG H.S.C./H

    Sample of data set 2: device.txt

    131	1206	Bagley
    348	1228	HSC-G3 WC1
    349	1228	HSC-G3 PCD-H-2-N03
    352	1228	HSC-G3 PCD-H02-N01
    284	1117	WRS-WA1 HSC & South Campus
    285	1117	WRS-WA2_ERS_EA Main Tie
    286	1117	WRS-WA3 Mid Campus

    Sample of data set 3: worklog.txt

    131	2007-05-02 23:45:02-07	0
    131	2007-05-03 00:00:02-07	4
    131	2007-05-03 00:15:02-07	3
    131	2007-05-03 00:30:02-07	3
    131	2007-05-03 00:45:02-07	3

    The requirement: management wants the system to report the power consumption of each building across the university for every hour of every day. We could solve this with a single SQL query:

      SELECT  facility.fid,
              date_trunc('hour', work_log.timelogged) AS hour_logged,
              SUM(work_log.value) AS consumption
      FROM a02.facility facility
        INNER JOIN a02.device device
          ON facility.fid = device.fid
        INNER JOIN a02.work_log work_log
          ON device.did = work_log.did
      GROUP BY  facility.fid,
                date_trunc('hour', work_log.timelogged)
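    Before writing any MapReduce code, it helps to have a small in-memory equivalent of this query for checking results on sample data. The sketch below is plain Python and is not part of the original code. The function name hourly_consumption is hypothetical, and the rows are copied from the samples above. It assumes the value column is an integer, and it ignores the UTC offset at the end of time_logged.

```python
from collections import defaultdict

# Rows copied from the samples above (only the columns the query needs).
facility = [("1206", "BAGLEY HALL"), ("1228", "MAG H.S.C./H")]
device = [("131", "1206", "Bagley"), ("348", "1228", "HSC-G3 WC1")]
work_log = [
    ("131", "2007-05-02 23:45:02-07", 0),
    ("131", "2007-05-03 00:00:02-07", 4),
    ("131", "2007-05-03 00:15:02-07", 3),
    ("131", "2007-05-03 00:30:02-07", 3),
    ("131", "2007-05-03 00:45:02-07", 3),
]


def hourly_consumption(facility, device, work_log):
    """Hypothetical in-memory version of the SQL query: sum of values
    per (fid, hour), with inner-join semantics."""
    fids = {fid for fid, _ in facility}
    did_to_fid = {did: fid for did, fid, _ in device if fid in fids}
    totals = defaultdict(int)
    for did, time_logged, value in work_log:
        if did in did_to_fid:
            # like date_trunc('hour', ...): keep "YYYY-MM-DD HH", zero the rest
            hour = time_logged[:13] + ":00:00"
            totals[(did_to_fid[did], hour)] += value
    return dict(totals)


print(hourly_consumption(facility, device, work_log))
```

    On the samples, all readings of device 131 belong to BAGLEY HALL (fid 1206). They total 0 for the 23:00 hour and 4 + 3 + 3 + 3 = 13 for the 00:00 hour.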

    Steps to solve the problem:

    1) Join the data sets:
       (a) Use a 1st MapReduce job to join the facility and device data sets into output1, with key: fid, fName | value: did
       (b) Use a 2nd MapReduce job to join output1 and work_log into output2, with key: fid, fName, time_logged | value: value

    2) Use a 3rd MapReduce job to compute the final result.

    The first step alone accomplishes the goal of this post: joining the data sets with MapReduce. The second step may seem redundant, but it completes the business requirement and makes the joined data meaningful.


    1. Joining the datasets:

      The DBMS approach:

      Conceptually, a join in a DBMS such as MySQL or PostgreSQL is a Cartesian product, which concatenates every row of one relation with every row of the other. A Selection then keeps only the rows that satisfy the join condition.

      To join the relations Facility and Device:

      Facility
      fid  fName
      1    F1
      2    F2

      Device
      did  fid  dName
      1    1    D1
      2    1    D2
      3    2    D3

      Facility X Device
      fid  fName  did  fid  dName
      1    F1     1    1    D1
      1    F1     2    1    D2
      1    F1     3    2    D3
      2    F2     1    1    D1
      2    F2     2    1    D2
      2    F2     3    2    D3

      Selection: Facility.fid = Device.fid
      fid  fName  did  fid  dName
      1    F1     1    1    D1
      1    F1     2    1    D2
      2    F2     3    2    D3
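      The Cartesian-product-then-Selection definition can be written almost literally in Python. This sketch reproduces the tables above and only illustrates the semantics. Real query planners normally avoid materializing the full product, for example by using hash or merge joins.

```python
from itertools import product

# The toy relations from the tables above.
facility = [(1, "F1"), (2, "F2")]                     # (fid, fName)
device = [(1, 1, "D1"), (2, 1, "D2"), (3, 2, "D3")]   # (did, fid, dName)

# Cartesian product: every facility row concatenated with every device row.
cartesian = [f + d for f, d in product(facility, device)]

# Selection: keep rows where Facility.fid (column 0) = Device.fid (column 3).
joined = [row for row in cartesian if row[0] == row[3]]

for row in joined:
    print(row)
```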

      The MapReduce approach:

      MapReduce takes a different approach. The two relations Facility and Device are stacked one after the other into a single relation with the Union operation. To make them union-compatible, each relation is first padded with empty fields for the columns that only the other relation has. The Map function carries out all these steps. It emits the common key fid as the key, and fName, did, and dName as the value.

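      #!/usr/bin/env python
      import sys

      for line in sys.stdin:
          # parse the tab-separated line
          values = line.strip().split('\t')
          # We can only tell the relations apart by their number of columns.
          # "-1" fills the fields a relation lacks, making both union-compatible.
          if len(values) == 3:
              # a row from device.txt
              did, fid, dName = values
              fName = "-1"
          else:
              # a row from facility.txt
              fid, fName = values
              did, dName = "-1", "-1"
          # skip rows whose fid is NULL (exported as \N)
          if fid != "\\N":
              print("%s\t%s\t%s\t%s" % (fid, fName, did, dName))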

      Union Relation
      fid  fName  did  dName
      1    F1
      2    F2
      1           1    D1
      1           2    D2
      2           3    D3

      MapReduce frameworks such as Hadoop automatically sort the output of the Map function by key (fid here) before it reaches the Reduce function as input.

      Sorted by fid
      fid  fName  did  dName
      1    F1
      1           1    D1
      1           2    D2
      2    F2
      2           3    D3
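      Two properties of this sort matter for the reducer.

      First, by default Hadoop compares the text keys byte by byte, so a fid of 10 sorts before 2. That is harmless, because the join only needs rows with equal keys to be adjacent.

      Second, Hadoop does not guarantee the order of rows that share a key, so the facility row need not come first.

      The sketch below models the sort with Python's sorted on the key string. This is a simplification, not Hadoop's implementation. It also adds a hypothetical fid 10 to show the ordering.

```python
# Map output lines from the join mapper: fid, fName, did, dName.
# The key is the text before the first tab; "-1" marks an empty field.
lines = [
    "2\tF2\t-1\t-1",
    "10\t-1\t7\tD7",      # hypothetical extra device row
    "1\t-1\t1\tD1",
    "10\tF10\t-1\t-1",    # hypothetical extra facility row
    "1\tF1\t-1\t-1",
]


def key_of(line):
    """The Streaming key: everything before the first tab."""
    return line.split("\t", 1)[0]


# Compare keys as strings, like the default byte-wise ordering of text keys.
shuffled = sorted(lines, key=key_of)
for line in shuffled:
    print(line.replace("\t", " "))
```

      Python's sort is stable, so the device row for fid 1 stays ahead of its facility row here. This is exactly the situation a reducer must tolerate.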

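      At the Reduce step, we walk through the input and group consecutive rows that share the same fid. Within each group, the facility row supplies fName and the device rows supply the dids, so we can tell which fid and fName each did belongs to. Hadoop only guarantees that rows with the same key arrive together, not that the facility row comes first. The reducer should therefore collect a group's dids and emit them only when the key changes, plus once more at the end of the input. The reducer and its output are: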

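      #!/usr/bin/env python
      # did_reducer.py
      import sys

      def emit(fid, f_sName, dids):
          # inner join: a group without a facility row produces no output
          if f_sName:
              for did in dids:
                  print("%s\t%s\t%s" % (fid, f_sName, did))

      fid, f_sName, dids = None, "", []
      for line in sys.stdin:
          # line = fid, f_sName, did, dName ("-1" marks an empty field)
          values = line.strip().split('\t')
          # This only works because Hadoop has sorted the rows by fid:
          # when fid changes, the previous group is complete.
          if fid is not None and fid != values[0]:
              emit(fid, f_sName, dids)
              f_sName, dids = "", []
          fid = values[0]
          if values[2] == "-1":
              # this row is of the facility relation as did has no value
              f_sName = values[1]
          elif values[1] == "-1":
              # this row is of the device relation as f_sName has no value
              dids.append(values[2])
      # emit the last group
      if fid is not None:
          emit(fid, f_sName, dids)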

      Joining Facility and Device by fid
      fid  fName  did
      1    F1     1
      1    F1     2
      2    F2     3
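      To try the whole join without a cluster, the three phases can be simulated in one process:

      1. Map each line to a padded tuple.
      2. Sort by fid, as a stand-in for Hadoop's shuffle.
      3. Group consecutive rows with itertools.groupby.

      This is a local sketch, not the post's Streaming scripts. The names map_row and reduce_group are hypothetical, and the input rows are the toy tables above.

```python
from itertools import groupby

# Toy input files from the tables above.
facility_txt = ["1\tF1", "2\tF2"]                    # fid, fName
device_txt = ["1\t1\tD1", "2\t1\tD2", "3\t2\tD3"]    # did, fid, dName


def map_row(line):
    """Pad a row to (fid, fName, did, dName); "-1" marks an empty field."""
    values = line.split("\t")
    if len(values) == 3:                 # device row
        did, fid, d_name = values
        return (fid, "-1", did, d_name)
    fid, f_name = values                 # facility row
    return (fid, f_name, "-1", "-1")


def reduce_group(fid, rows):
    """Join one fid group, whatever the order of the rows inside it."""
    rows = list(rows)
    names = [r[1] for r in rows if r[2] == "-1"]
    dids = [r[2] for r in rows if r[1] == "-1"]
    for f_name in names:                 # no facility row: no output
        for did in dids:
            yield (fid, f_name, did)


mapped = [map_row(line) for line in facility_txt + device_txt]
mapped.sort(key=lambda r: r[0])          # stands in for Hadoop's sort by key
joined = [out
          for fid, group in groupby(mapped, key=lambda r: r[0])
          for out in reduce_group(fid, group)]
print(joined)
```

      It prints [('1', 'F1', '1'), ('1', 'F1', '2'), ('2', 'F2', '3')], the same rows as the table above.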

      Ta-da! That's all it takes to join relations with the MapReduce programming model.

      The same steps join the output of the previous job (Facility merged with Device) with work_log, this time on their common key did:

      Work_log relation
      did  time_logged      value
      1    1/1/2013 12:05   3
      2    1/1/2013 12:05   2
      1    1/1/2013 14:30   5

      I won't walk through these steps again, since the idea is the same. Just keep the recipe in mind:

      1. Identify the common key of both relations.
      2. Add empty fields to make them union-compatible.
      3. Let the framework sort them.
      4. Process the rows that share a key to produce the expected result.
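      The recipe can be written once and reused for both joins. The sketch below is a generic reduce-side join. Instead of padding rows with empty fields, it tags each row with a number identifying its relation: 0 for the left input, 1 for the right. Tagging is a common variant of the same idea, swapped in here for brevity; it is not the author's code. The function name reduce_side_join is hypothetical. Joining the Facility/Device result with the Work_log sample on did yields the same three rows as the Final Results table, in a different order.

```python
from itertools import groupby


def reduce_side_join(left, right, left_key, right_key):
    """Inner join of two lists of tuples on the given column indexes.

    Map: emit (key, tag, row) with tag 0 for left rows, 1 for right rows.
    Shuffle: sort by key only. Reduce: pair left and right rows per key.
    """
    tagged = [(row[left_key], 0, row) for row in left]
    tagged += [(row[right_key], 1, row) for row in right]
    tagged.sort(key=lambda t: t[0])
    for _, group in groupby(tagged, key=lambda t: t[0]):
        group = list(group)
        lefts = [row for _, tag, row in group if tag == 0]
        rights = [row for _, tag, row in group if tag == 1]
        for l_row in lefts:
            for r_row in rights:
                yield l_row + r_row


# Output of the first join: (fid, fName, did)
fac_dev = [(1, "F1", 1), (1, "F1", 2), (2, "F2", 3)]
# Work_log sample: (did, time_logged, value)
work_log = [(1, "1/1/2013 12:05", 3), (2, "1/1/2013 12:05", 2),
            (1, "1/1/2013 14:30", 5)]

for fid, f_name, _, _, time_logged, value in reduce_side_join(
        fac_dev, work_log, left_key=2, right_key=0):
    print(fid, f_name, time_logged, value)
```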

      Final Results
      fid  fName  time_logged      value
      1    F1     1/1/2013 12:05   3
      1    F1     1/1/2013 12:05   2
      1    F1     1/1/2013 14:30   5

    2. Compute the final results:

      As mentioned before, this step turns the joined rows into numbers that are meaningful to readers. That is also why big data is thriving these days, isn't it?

      In the Map function, we truncate time_logged to the hour and emit key/value pairs. The key is fid plus the truncated time, and the value is fName plus the consumption value e.

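          #!/usr/bin/env python
          import sys
          import datetime

          for line in sys.stdin:
              # line = did, fid, f_sname, time_logged, e
              did, fid, f_sname, time_logged, e = line.strip().split('\t')
              # time_logged looks like "2007-05-03 00:15:02-07": local date and
              # time in the first 19 characters, then the UTC offset
              dt = datetime.datetime.strptime(time_logged[:19], '%Y-%m-%d %H:%M:%S')
              # truncate to the hour, like date_trunc('hour', ...) in the SQL
              hour = dt.strftime('%Y-%m-%d %H:00:00') + time_logged[19:]
              # key = fid, hour; value = f_sname, e
              print('%s\t%s\t%s\t%s' % (fid, hour, f_sname, e))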

      The tricky part here is that the MapReduce framework has to sort on a composite key made of fid and the truncated time. The previous MapReduce jobs needed no extra configuration. By default, Hadoop Streaming treats the text up to the first tab (the first column) as the key and sorts on it. A key made of two or more fields has to be configured explicitly:

      - stream.num.map.output.key.fields=2 makes the first two fields the key used for sorting.
      - KeyFieldBasedPartitioner with -k1,2 sends all records with the same fid and hour to the same reducer.

        $HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming-1.1.2.jar \
        -input did_wl_outputs/part-00004 \
        -output pwr_outputs \
        -file ~/hadoop/15mins_mapper.py \
        -mapper 15mins_mapper.py \
        -file ~/hadoop/15mins_op.py \
        -reducer 15mins_op.py \
        -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
        -jobconf stream.num.map.output.key.fields=2 \
        -jobconf mapred.text.key.partitioner.options=-k1,2
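      The effect of the composite key can be emulated locally. In the sketch below, split_key is a hypothetical helper that models what stream.num.map.output.key.fields does. The first n tab-separated fields form the key, and the rest form the value. Sorting three sample map outputs with n = 1 and n = 2 shows why the second field must be part of the key. Python's sort is stable, so with a one-field key the rows simply stay in arrival order. That is just one possible outcome, since Hadoop makes no promise about the order of rows within a key.

```python
def split_key(line, num_fields):
    """Split a map output line into (key, value): the first num_fields
    tab-separated fields form the key, the remaining fields the value."""
    fields = line.rstrip("\n").split("\t")
    return "\t".join(fields[:num_fields]), "\t".join(fields[num_fields:])


# Hypothetical map output: fid, hour, fName, e (in arrival order)
map_output = [
    "1206\t2007-05-03 00:00:00-07\tBAGLEY HALL\t4",
    "1206\t2007-05-02 23:00:00-07\tBAGLEY HALL\t0",
    "1206\t2007-05-03 00:00:00-07\tBAGLEY HALL\t3",
]


def ordered_hours(num_fields):
    """Hours in the order a reducer would see them after sorting by key."""
    ordered = sorted(map_output, key=lambda line: split_key(line, num_fields)[0])
    return [line.split("\t")[1] for line in ordered]


for n in (1, 2):
    print(n, ordered_hours(n))
```

      With n = 1, the 00:00 rows are separated by the 23:00 row, so a reducer that emits whenever the key changes would report the 00:00 hour twice. With n = 2, each hour forms one contiguous run.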

      The Reduce function groups consecutive pairs that share the same key (fid and the truncated time) and sums their values. fName is carried along in the value.

          #!/usr/bin/env python
          import sys

          current_fid = current_time = current_name = None
          total_e = 0
          for line in sys.stdin:
              # parse the input we got from the mapper
              fid, time, f_name, e_str = line.strip().split('\t', 3)
              try:
                  # convert e (currently a string) to int
                  e = int(e_str)
              except ValueError:
                  # e was not a number, so silently discard this line
                  continue
              # this IF-switch only works because Hadoop sorts map output
              # by key (here: fid, time) before it is passed to the reducer
              if (fid, time) != (current_fid, current_time):
                  if current_fid is not None:
                      print('%s\t%s\t%s\t%.2f' % (current_fid, current_name,
                                                  current_time, total_e))
                  current_fid, current_time, current_name = fid, time, f_name
                  total_e = 0
              # aggregate: sum e within the current (fid, time) group
              total_e += e
          # emit the last group
          if current_fid is not None:
              print('%s\t%s\t%s\t%.2f' % (current_fid, current_name,
                                          current_time, total_e))
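      The same reducer can be written more compactly with itertools.groupby. It yields one group per run of consecutive equal keys, which are exactly the runs Hadoop's sort produces. This is an alternative formulation, not the post's original script. The names parse and reduce_lines are hypothetical. reduce_lines takes any iterable of lines, so it can be tested without Hadoop.

```python
from itertools import groupby


def parse(line):
    """Return (fid, time, f_name, e), or None if e is not an integer."""
    fid, time, f_name, e_str = line.rstrip("\n").split("\t", 3)
    try:
        return fid, time, f_name, int(e_str)
    except ValueError:
        return None


def reduce_lines(lines):
    """Sum e for each run of consecutive rows sharing the key (fid, time)."""
    rows = (r for r in map(parse, lines) if r is not None)
    for (fid, time), group in groupby(rows, key=lambda r: (r[0], r[1])):
        group = list(group)
        total = sum(r[3] for r in group)
        yield "%s\t%s\t%s\t%.2f" % (fid, group[0][2], time, total)


sample = [
    "1206\t2007-05-03 00:00:00-07\tBAGLEY HALL\t4\n",
    "1206\t2007-05-03 00:00:00-07\tBAGLEY HALL\t3\n",
    "1206\t2007-05-03 01:00:00-07\tBAGLEY HALL\tn/a\n",
    "1206\t2007-05-03 01:00:00-07\tBAGLEY HALL\t5\n",
]
for out in reduce_lines(sample):
    print(out)
```

      In the real job, reduce_lines would be fed sys.stdin instead of the sample list.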

      So the MapReduce programming model can join relations just as RDBMSs do. If you have any comments or questions, feel free to leave them in the comment box below, and I will get back to you as soon as possible.

