COMP 130 Elements of Algorithms and Computation
Spring 2012

Parallel Distributed Processing with MapReduce

MapReduce is a software framework created and patented by Google to handle the massive computational requirements for their search engine and associated analysis processes.   To handle this huge load, processing is distributed over hundreds of thousands of servers spread across a dozen data centers ("server farms") around the globe.  Google's goal was to create a system where all that computational horsepower could be easily utilized for a wide variety of tasks that needed to be distributed across many, many computers.   Many similar systems have since cropped up including the open-source Hadoop system.  While the exact details of the entire MapReduce system are beyond the scope of this course, we will explore the basic algorithmic notions behind it and see if how programs are decomposed in order for them to be scaled up across thousands of processors.

It's important to understand that distributed, parallel processing techniques such as MapReduce won't automatically make every computation run faster.    For small problems, using MapReduce may even make the calculation slower!   The strength of these distributed computation techniques is in their scalability, their ability to handle extremely large and growing computations fast and easily.   Google has used MapReduce to turn their hundreds of thousands of separate servers into what is effectively a single commodity computing resource that anyone in Google can use to run global sized applications.

 The Basics of MapReduce

The name, "MapReduce", is an homage to some fundamental computational processes in computer science, "map" and "reduce", though MapReduce process doesn't exactly mimic these "higher order functions"  (functions that use other functions as inputs or outputs).    The essential flavor of these processes remains however.  

Map is the process of applying the same operation on every element of a data set.    In Python, this is basically a for loop like such:

    # "Map"-like for-loop
    result = []
    for x in aDataSet:
        result.append(aFunction(x))
    return result

The result of a map operation is usually another data set of the same size or a mutated original data set, where each new element corresponds to a processed original element.   Notice that since each element is processed separately, map operations conveniently lend themselves to parallel processing!   Mapping is such an important operation that it is built into Python as the map() function.

Reduce, on the other hand is the process of traversing a data set and accumulating a single result value in the process.   This is one of the more common techniques used in recursive algorithms.   In Python, a reduce process might look like this loop:

    # "Reduce"-like for-loop
    result = anInitialValue
    for x in aDataSet:
        result = aFunction(x, result)   # not that the new result depends on both the x value and the old result.
    return result

What we see here is a reduction of the data set into a single result value.    Reduce can be parallelized if we think in terms of reducing a very large set of data into a smaller set of data, where reduction is taking place in parallel on subsets of the original data.   Likewise, reducing is such an important function that it is built into Python as the reduce() function.

MapReduce can be broken down into two separate processes, the "Map" stage and the "Reduce" stage.    Initially, the data is presented to MapReduce as a dictionary of key-value pairs.

Map Stage:

Every key-value pair in the input data dictionary is sent to a different instance of the same map function.   Those map function instances could be spread over multiple machines or processors.   In essence, the role of the key in the input dictionary is to identify the data (the associated value) to be processed by a single map instance.

In MapReduce, the map function is defined as outputting zero or more key-value pairs where the output key does not necessarily (usually does not) correspond to the input key.    Different map instances may output the same key values.  

It is important to recognize that outputting values when processes are spread over a multitude of computers scattered across the globe is not as simple as "returning"  a value as one normally does in a Python function.   A special output function must be handed to the map function for it to use for outputting values.   This handily also solves the problem of outputting multiple key-value results while in the middle of the map process. 

The output key-value pairs are stored in an intermediate data dictionary storage for use in the reduce stage.    Since multiple map function instances may be outputting values for the same key value in the intermediate data dictionary, those multiple values are stored as a list of values for each key.

Reduce Stage:

Interestingly, the reduce stage process is abstractly identical to the map stage process.   Instead of using the original input data dictionary, the reduce stage uses the intermediate data dictionary produced by the map stage.    As in the map stage, multiple identical reduce function instances are spread across multiple machines and each reduce function receives one of the key-value pairs from the intermediate data dictionary.

The reduce function, just as the map function, outputs key-value pairs.   In general, however, the role of the reduce function is to reduce the possibly very large list of values associated with the key down to a single or just a few resultant values.   Normally, the output key from the reduce function is same as its input key.

MapReduce process

There are no restrictions on what the map and reduce functions can do.    This means that map and reduce can themselves, call a MapReduce process.    Thus problems can be decomposed into layers of nested MapReduce processes to make it easier to create massively large computations.

It should also be re-emphasized that this MapReduce discussion is a great simplification of the actual MapReduce system employed by Google and other availble implementations.    The actual system is far more sophisticated and optimized with additional functionality, particularly in the areas of process management and shared data access.

A Simple MapReduce Implementation

At the highest level, a MapReduce system is a black box that takes in 3 inputs:  a data dictionary, a map function and a reduce function.    The system does all the work of divvying up the data across multiple instances of the map and reduce functions and then returns a dictionary of the results.

In the following extremely simplistic MapReduce implementation, having a server farm of thousands of computers is simulated by running the map and reduce functions on multiple execution threads on a single computer.   The system breaks down into 3 pieces:

Threading is NOT required for Comp130!   

import threading
from collections import defaultdict

class FuncThread(threading.Thread):
    """A Thread subclass that is specialized to run a given function 
    with particular, given input parameters.
    """
    
    def __init__(self, name, procFunc, key, value, outputFunc):
        """ Constructor for the class.  Creates a new instance of FuncThread.
        name - a string name for the thread, used to identify the thread if necessary
        procFunc - the name of a function to be used to process the data on multiple threads 
        that has the following signature:  func(key, value, outputFunc)
        Note that any returned values from func() are discarded.  outputFunc is used to output results from func().
        key - an identifier for the given value.   Is passed to func().
        value - a value associated with the given key.  Is passed to func().
        outputFunc - the name of a function used to output the returned values from func().  Is passed to func().
        Usage:  myFuncThreadObject = FuncThread(aName, aProcessingFunction, aKey, aValue, anOutputFunction)
        """
        
        threading.Thread.__init__(self)   # initalize the superclass.
        # save the constructor's input parameters as attributes of the object, to be used later:
        self.setName(name)
        self.procFunc= procFunc
        self.key = key
        self.value = value
        self.outputFunc = outputFunc
    
    def run(self):
        """ This method is run on the new thread when the FuncThread's start() method is called.  
        The given func(key, value, outputFunc) is run.
        """
#        print self.getName()+" is running.\n"
        self.procFunc(self.key, self.value, self.outputFunc)

        
def runThreads(inputDict, aFunc):
    """
    Execute the given aFunc function on as many threads as elements in the given inputDict dictionary,
    where the input to each function is one of the key-value pairs.  aFunc must have the following 
    signature:
    func(key, value, outputFunc) 
    where key is an identifier for the value and value is anything for mapFunc but
    must be a list for reduceFunc (the aggregated list of values for that key as output
    by all calls to mapFunc).  outputFunc is an output function that 
    aFunc calls to output its result(s), which has the following signature:
    outputFunc(key, value)
    where key is the identifier for the given value.
    runThreads returns a defaultdict(list) where the keys are those output by aFunc and the values are
    a list of the values output by the multiple aFunc instances.
    """
    outputDict = defaultdict(list)
    fts = []  # intialize the list of FuncThreads
    for key, value in inputDict.items():   # Create FuncThreads for every entry in the mapFunc result dictionary
        fts.append(FuncThread("FuncThread("+str(key)+", "+ str(value)+")", aFunc, key, value, lambda k, v : outputDict[k].append(v)))
   
    print "runThreads: "+str(len(fts))+" "+aFunc.__name__+" threads."
    for ft in fts:  # Run all the FuncThreads
        ft.start()
    
    for ft in fts:  # wait for all the FuncThreads to finish.
        ft.join()
 #       print ft.getName() + " is done."        
    return outputDict
   

def mapReduce(aDict, mapFunc, reduceFunc):
    """Perform a MapReduce process on the given dictionary, aDict, using the given 
    map function, mapFunc, and reduce function, reduceFunc.   Both mapFunc and reduceFunc 
    have the following function signatures:   
    func(key, value, outputFunc) 
    where key is an identifier for the value and value is anything for mapFunc but
    must be a list for reduceFunc (the aggregated list of values for that key as output
    by all calls to mapFunc).  outputFunc is an output function that 
    mapFunc and reduceFunc call, which has the following signature:
    outputFunc(key, value)
    where key is the identifier for the given value.
    A dictionary of results, {key:[values]}, is returned where the keys and values are those
    outputted by reduceFunc.
    """
    
    # ----- Map Process: -------------    
    mapResultDict = runThreads(aDict, mapFunc)

    # ----- Reduce Process: -------------
    reduceResultDict = runThreads(mapResultDict, reduceFunc)

    return reduceResultDict    # return the results	
	

Unfortunately, while the above code does serve its purpose of correctly executing map and reduce functions on a given data set, a quirk in the way that Python handles threads means that the map and reduce function instances do not execute in parallel as much as one would like them to.    A better MapReduce simulation would use the multiprocessing module  instead of the threading module to more evenly distribute the work over all the cores in a multi-core processor machine.   Look here for a more sophisiticated MapReduce in Python example.      It should also be noted that because of the high overhead of creating threads, and the fact that if the number of required parallel processes is far beyond the number of available processors in the computer, that the above simulation will probably run slower than a non-MapReduce algorithm.    the benefits of MapReduce really can't be seen until a very large number of computers are involved in solving a very large problem.

Counting Words Example

The following code has utilizes these imports:

import MapReduce as mr
import text_analysis as ta  # This is the word and tuple counting text analysis code from lecture.   Your filename may differ. 
	

Let's recast our counting single words code (please review that material here and here) in terms of MapReduce.   Essentially, this means writing a appropriate map and reduce functions.   However, we need to write a couple of utility functions first to get our data in the right form.

First, we'll create a bunch of text data by "chunkifying" a large text file into smaller pieces.   To reiterate: The chunking process is just a convenience for us to easily create large numbers of data sets.   We could have achieved the same result by having tons of data files and reading each one in and associating each data file's contents with a key in the dictionary.     All the chunkifying does for us is to allow us the convenience of a single large data file and the freedom to easily change the size of each data set.    Do NOT take the chunkifying process as somehow fundamental to anything and something that has to be done!

Let's modify our chunkifying code so that it makes a dictionary instead of a list:

def makeChunkDict(aList, nElts):
    """ Break a given list into multiple smaller lists each of the given number of elements.
    Any leftover elements are ignored.
    A dictionary of the resultant lists is returned where the key for each text chunk
    is just a counter indicating where in the original text the chunk originated.
    """
    result = {}
    for i in range(0, len(aList), nElts):
        result[i] = aList[i:i+nElts]
    return result         
	

The resultant dictionary of text chunks will be used as the input data dictionary to MapReduce.    A parallel map and reduce processes will be created each text chunk, thus the larger the chunk size, the fewer processing instances (threads here) will be necessary and the smaller the chunk size, the larger the number of map/reduce processes needed.

The map function is very simple:  every word encountered is output as a key-value pair where the key is the word is the key and the number one (1) is the value.    Essentially, all the map function does is go through every word in its text chunk and output's "Got one!".     To mimic the output of the original word counting code from lecture, the output key will actually be a one element tuple.

def mapCountWords(k, v, outputFunc):
    """  Outputs every word in the given list of words with a count of one.
    k - the key that identifies the given value, v.    Not used.
    v - a list of words
    outputFunc - a function whose signature is:
    outputFunc(aKey, aValue)
    where mapCountWords will supply aKey = (aWord,) and aValue = 1.
    """
    for w in v:     # Go through all the words in the chunk of text
        outputFunc((w,), 1)    # Output each word as a one-element tuple with an associated count value of 1

After the mapCountWords runs on all the text chunks, the intermediate data dictionary will be of the form:   {aWord: [1, 1, 1, ...]}    The effect of the map process is essentially to rearrange the words such that all the same words are together, in the form of a list of ones corresponding to each word.

The job of the reduce function is thus simply to sum all the counts together for each word:

def reduceCountWords(k, v, outputFunc):
    """  Outputs the sum of the given value, v, as associated with the given key, k.
    k - a word
    v - a list of counts (1's) for that word
    outputFunc - a function whose signature is:
    outputFunc(aKey, aValue)
    where mapCountWords will supply aKey = k and aValue = sum(v).
    """
    outputFunc(k, sum(v))

The result is a dictionary of one-element tuples, holding the individual words, as the keys and the count for that word as the value.

To actually run the MapReduce process we simply create our dictionary of text chunks and call mapReduce with that dictionary , and the mapCountWords and reduceCountWords functions:

chunkDict = makeChunkDict(ta.inputText("texts/Green Eggs and Ham.txt"), 100)   # break the text into 100 word chunks

wordCount = mr.mapReduce(chunkDict, mapCountWords, reduceCountWords)  # Process using MapReduce

print wordCount  # Print the word counts

The result is exactly the same as our monolithic word counting code from earlier lectures.

So what have we gained by all of this?  Perhaps the most important take-away here is that a process such as counting words can be viewed as being composed of two seemingly separate parts: one which simply acknowledges the existiance of each word and in doing so, rearranges the words, and one which sums up all the occurances of any given word.   One could say that our original, monolithic word counting algorithm counted the words by keeping a running tally of each word's count as it traversed the text.    On the other hand, our MapReduce-based algorithm counts the words by first rearranging the words so that all the same words are together, meaning that each word's count could be tallied in isolation.   Do you see how the map process decoupled the words, thus enabling the summing of the counts to be done in parallel?

Class Exercise:  Modify mapCountWords() so that MapReduce will count tuples of a particular length.    How can you make the map function so that the order of the tuple (# of words in the tuple) is neither hardcoded in nor set as a global variable?    (Hint)

 

Something to remember:  MapReduce is so important to Google that it is common for interviewers to ask job and internship applicants to write an algorithm in terms of map and reduce.   And the first project that almost every new hire must complete is a MapReduce application of some sort.