Sector 0

November 15, 2008

Data Parallelism in Functional Programming

In this short article I will show you how to use the power of functional programming to write data structures for parallel processing that scale well on any number of cores.

What is one of the primary concerns for programmers right now? Writing high-performance code that scales well regardless of whether the software is running on a CPU with one, two or 1000 cores.

Much of today's software does not scale well, and one of the major obstacles is making it run efficiently on multiple cores without running into the problems (race conditions, deadlocks and the like) that are bound to arise when programming with multiple threads. When dealing with parallel processing, data parallelism (applying the same operation to many data elements at once) is a great tool for writing highly scalable software.

In functional programming the list is one of the most fundamental data structures you can find. It is well suited to operations such as iteration, and it is fairly easy to make parallel. I will show you a few examples written in F#, a functional programming language for the .NET platform with roots in ML and OCaml. I will demonstrate the principles with a simple data type (PList) that can be used in a wide variety of ways for processing data, and which makes use of all the cores in your computer without you having to worry about it.

Finally I will indulge in a bit of theory about extending this data structure to span multiple machines connected in a computing cluster.

Constructing the Data Structure

In F#, a list is a singly linked list. Each node contains an element e of some generic type 'a, and a pointer to the next node in the list.

(Figure: a singly linked list)
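To make the node structure concrete, here is a hand-rolled list type that mirrors what F#'s built-in list looks like. The type name Node and the helper nodeLength are hypothetical, chosen for illustration only; in real code you would just use the built-in list.

```fsharp
// A hand-rolled singly linked list, structurally like F#'s built-in list:
// each Cons node holds an element of type 'a and a reference to the next node.
type Node<'a> =
    | Nil
    | Cons of 'a * Node<'a>

// Walk the chain of nodes one by one to count them.
let rec nodeLength (n: Node<'a>) : int =
    match n with
    | Nil -> 0
    | Cons (_, next) -> 1 + nodeLength next

// The list 1 -> 2 -> 3, built by hand and with the built-in cons operator.
let handRolled = Cons (1, Cons (2, Cons (3, Nil)))
let builtIn = 1 :: 2 :: 3 :: []

printfn "%d %d" (nodeLength handRolled) (List.length builtIn)  // prints "3 3"
```

Note that reaching the middle of such a list means following the pointers from the head, which is why splitting an existing list is not free.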

In order to parallelize this data structure we need to break it up into two or more parts, depending on the number of cores. There is another aspect to consider here: whether the work done on each element might involve I/O or other operations that block the thread. A common rule of thumb is to allocate twice as many threads as there are cores in the CPU(s), but for this example we will just use the number of cores in the machine.

So how do we decide on the scheme for the structure of the parallel list? Consider this: we have a list of the integers from 1 to 10 and we want to construct a new list by applying the function x*x to each element. In F# it might look like this:

let sqrlist = List.map (fun x -> x*x) [1..10]

The List.map function iterates through the entire list, applies the (here anonymous) function to each element, and adds the result to the new list.
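Conceptually, the sequential traversal looks like the recursion below: one call per element, each waiting for the previous one. This is a sketch of the idea only; the name seqMap is hypothetical, and FSharp.Core's List.map is implemented differently internally, though it produces the same result.

```fsharp
// A sequential map written out by hand: one recursive call per element,
// so the elements are processed strictly one after another.
// (Not tail-recursive; fine for short lists such as [1..10].)
let rec seqMap (f: 'a -> 'b) (lst: 'a list) : 'b list =
    match lst with
    | [] -> []
    | h :: t -> f h :: seqMap f t

let sqrlist = seqMap (fun x -> x * x) [1..10]
printfn "%A" sqrlist  // [1; 4; 9; 16; 25; 36; 49; 64; 81; 100]
```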

This is all very well, but it is strictly sequential. If we split the list into two separate lists, each containing half the elements of the original list, we can apply List.map to the two smaller lists in parallel. But splitting an existing linked list is not attractive, since it means traversing the list first just to build the sublists, so the list needs to be partitioned as it is built. To do this I use a queue that holds each of the sequential sublists. Each time I want to add an element to the parallel list I simply dequeue the first element (which is a sequential list), add the element to that list, and enqueue the result. This ensures that the list is always balanced, and adding the values 1..10 to a parallel list with two sublists gives the following:

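After the first 3 insertions, the two sublists hold [3; 1] and [2]. Each new element is consed onto the front of its sublist, so a sublist holds its elements in reverse order of insertion.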

Continuing with all 10 insertions leaves [9; 7; 5; 3; 1] in one sublist and [10; 8; 6; 4; 2] in the other.
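A quick way to check the rotation is to simulate it. This sketch does not use the Queue class: as a simplifying assumption it represents the queue as a plain immutable F# list of sublists, with the head of the list as the front of the queue. The helper name insert is hypothetical.

```fsharp
// Dequeue the front sublist, cons the new element onto it,
// and enqueue the result at the back of the queue.
let insert (queue: 'a list list) (e: 'a) : 'a list list =
    match queue with
    | [] -> failwith "need at least one sublist"
    | front :: rest -> rest @ [e :: front]

// Two empty sublists, as on a two-core machine.
let twoSublists : int list list = [ []; [] ]

let afterThree = List.fold insert twoSublists [1..3]
let afterTen = List.fold insert twoSublists [1..10]

printfn "%A" afterThree  // [[2]; [3; 1]]
printfn "%A" afterTen    // [[9; 7; 5; 3; 1]; [10; 8; 6; 4; 2]]
```

The `rest @ [...]` step costs time proportional to the number of sublists, which is negligible here because there are only as many sublists as cores.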

So the first order of business is to define the queue. The .NET generic collections library has an implementation, but it is imperative and I wanted to use a functional one. So I implemented one following the two-list queue design from the book Purely Functional Data Structures by Chris Okasaki.

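type Queue<'a>() =
    // Elements ready to be dequeued, front of the queue first
    let mutable outlist:'a list = []
    // Newly enqueued elements, most recently added first
    let mutable inlist:'a list = []

    // O(1): cons the new element onto the in-list
    member q.Enqueue v = inlist <- v::inlist
    // O(1) amortized: reverse the in-list only when the out-list runs dry
    member q.Dequeue() =
        match outlist with
        | h::t  ->  outlist <- t
                    h
        | []    ->  match List.rev inlist with
                    | []    -> failwith "Empty queue"
                    | h::t  ->  outlist <- t
                                inlist <- []
                                h
    // All elements in dequeue order, without modifying the queue
    member q.to_list() = List.rev inlist |> List.append outlist
    member q.Clear() =
        outlist <- []
        inlist <- []
    member q.Length with get() = (List.length outlist)+(List.length inlist)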

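This data structure does not perform quite as well as the imperative one from the .NET library, but it is built from nothing more than two immutable lists, so you can implement it in your language of choice. Strictly speaking, the class above keeps those lists in mutable fields; a fully persistent version would return a new queue from Enqueue and Dequeue instead of updating itself.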
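For comparison, here is a minimal sketch of the fully persistent variant, in which every operation returns a new queue and old versions stay valid. The record type PQueue and the module functions empty, enqueue, dequeue and toList are hypothetical names for this illustration; the two-list layout is the same as in the Queue class above.

```fsharp
// A persistent two-list queue. Front holds elements in dequeue order;
// Back holds newly enqueued elements in reverse order.
type PQueue<'a> = { Front: 'a list; Back: 'a list }

module PQueue =
    let empty<'a> : PQueue<'a> = { Front = []; Back = [] }

    // O(1): cons onto the back list; the old queue is untouched.
    let enqueue (v: 'a) (q: PQueue<'a>) : PQueue<'a> = { q with Back = v :: q.Back }

    // O(1) amortized: when the front is empty, reverse the back list once.
    // Returns the dequeued element together with the remaining queue.
    let dequeue (q: PQueue<'a>) : 'a * PQueue<'a> =
        match q.Front with
        | h :: t -> h, { q with Front = t }
        | [] ->
            match List.rev q.Back with
            | [] -> failwith "Empty queue"
            | h :: t -> h, { Front = t; Back = [] }

    // All elements in dequeue order.
    let toList (q: PQueue<'a>) : 'a list = q.Front @ List.rev q.Back

let q1 = PQueue.empty |> PQueue.enqueue 1 |> PQueue.enqueue 2 |> PQueue.enqueue 3
let first, q2 = PQueue.dequeue q1

printfn "%d %A %A" first (PQueue.toList q1) (PQueue.toList q2)
// prints "1 [1; 2; 3] [2; 3]"
```

Dequeuing from q1 does not change q1, which is exactly the property the mutable class gives up.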

The next line of code defines 'a plist as a type abbreviation for a queue of 'a lists.

type 'a plist = Queue<'a list>

Next comes the module that holds the interesting functions. For standard lists this module is called List, so I will call this module PList. It contains a few functions that are not part of the original List module, but remember that this is just for playing around and showing you how it works.

The first of these functions is create_empty, which, as the name implies, creates an empty plist. The signature is

create_empty : unit -> 'a plist

and the implementation is

open System

/// Create an empty plist with one (empty) sequential list per core
let create_empty<'a>() =
    let plist = new Queue<'a list>()
    [1..Environment.ProcessorCount] |> List.iter (fun _ -> plist.Enqueue [])
    plist

The next function is add, which adds a single element to the list. It is similar to the cons ('::') operator on regular lists. As described earlier, it dequeues the first list in the queue, adds the element to that list and enqueues it again. Rotating the sublists like this keeps the parallel list balanced and spreads consecutive elements evenly across the sublists. The signature is

add : 'a plist -> 'a -> 'a plist

and the implementation is

/// Create a new list (in order to preserve immutable state) by adding
/// an element to the original. Rotate the sequential lists in order
/// to balance the data.
let add (plist:'a plist) e =
    let newList = new Queue<'a list>()
    // Copy the sublists without dequeuing from the original,
    // so that plist itself is left unchanged
    plist.to_list() |> List.iter (fun lst -> newList.Enqueue lst)
    let lst = newList.Dequeue()
    newList.Enqueue (e::lst)
    newList

Then there is the function from_list, which serves no other purpose than to let you add a whole list of elements in one go. The signature is

from_list : 'a list -> 'a plist

and the implementation looks like this

/// Create a new plist from the sequential list lst
let from_list (lst:'a list) =
    let newList = create_empty<'a>()
    List.iter (fun e -> newList.Enqueue (e::newList.Dequeue())) lst
    newList

Finally there are a few interesting functions that do exist in the original List module. The first one is map. As you may or may not know, this function takes a function and a list of elements and returns a new list. The elements of the new list are the results of applying the function to each element of the first list, and it has the signature

map : ('a -> 'b) -> 'a plist -> 'b plist

What this parallel version does is simply use one task for each sequential list in the queue, here using the async monad (async workflows) from the F# library. An async computation does not spawn a thread per se, but puts a job on the .NET thread pool. The thread pool holds a number of idle threads that you can use for any asynchronous work. This way the CLR and the operating system don't have to go through the hard work of creating and destroying threads all the time.

Let's get back on track. Each asynchronous task uses the standard List.map, and the results from all the tasks are collected into a new parallel list by enqueueing the new sequential lists in a new queue. The implementation is

/// Asynchronously map the function to all elements and return a new
/// parallel list with the result.
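let map (f:'a -> 'b) (plist:'a plist) : 'b plist =
    let map_async f lst = async { return List.map f lst }
    let tasks = plist.to_list() |> List.map (fun lst -> map_async f lst)
    // The result holds 'b lists, not 'a lists
    let newList = new Queue<'b list>()
    // Run all tasks on the thread pool and wait; results keep the sublist order
    Async.RunSynchronously(Async.Parallel tasks) |> Array.iter (fun seqList -> newList.Enqueue seqList)
    newList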
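The same one-task-per-sublist pattern can be tried out without the Queue class. In this self-contained sketch the parallel list is assumed to be a plain 'a list list (one inner list per partition), and the name pmap is hypothetical; Async.Parallel and Async.RunSynchronously are the standard FSharp.Core functions.

```fsharp
// Parallel map over a partitioned list: one async job per sublist.
let pmap (f: 'a -> 'b) (parts: 'a list list) : 'b list list =
    parts
    |> List.map (fun part -> async { return List.map f part })
    |> Async.Parallel            // Async<'b list []>: runs the jobs on the thread pool
    |> Async.RunSynchronously    // block until every partition is done
    |> Array.toList              // results come back in partition order

let parts = [ [9; 7; 5; 3; 1]; [10; 8; 6; 4; 2] ]
let squared = pmap (fun x -> x * x) parts
printfn "%A" squared  // [[81; 49; 25; 9; 1]; [100; 64; 36; 16; 4]]
```

Because Async.Parallel returns its results in the same order as its inputs, the partition structure of the input survives the map.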

The next function is length which just returns the number of elements in the parallel list. It has the signature

length : 'a plist -> int

and is implemented like this

/// Calculate the length of the parallel list by adding the lengths of the sequential lists
let length (plist:'a plist) =
    plist.to_list() |> List.map (fun lst -> List.length lst) |> List.sum

The last function is iter. It does the same as map, except that it just iterates through the list without creating a new one. It is roughly the equivalent of running a for-loop over the list [n..m], except that the sublists are processed concurrently, so the elements are not visited in any guaranteed order. It has the signature

iter : ('a -> unit) -> 'a plist -> unit

and the implementation of iter is

/// Iterate through all elements by doing it asynchronously for all
/// sequential lists.
let iter f (plist:'a plist) =
    let iter_async f lst = async {  List.iter f lst }
    let tasks = plist.to_list() |> List.map (fun lst -> iter_async f lst)
    Async.RunSynchronously(Async.Parallel tasks) |> ignore
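Since the function passed to iter runs on several threads at once, any shared state it touches must be updated safely. This sketch again assumes a plain 'a list list instead of the Queue class, with the hypothetical name piter, and uses the standard .NET Interlocked.Add to accumulate a sum atomically.

```fsharp
open System.Threading

// Parallel iter over partitions: one async job per sublist, results ignored.
let piter (f: 'a -> unit) (parts: 'a list list) : unit =
    parts
    |> List.map (fun part -> async { List.iter f part })
    |> Async.Parallel
    |> Async.RunSynchronously
    |> ignore

// f may run concurrently on different threads, so the shared counter
// is updated with Interlocked.Add rather than a plain read-modify-write.
let total = ref 0
piter (fun x -> Interlocked.Add(&total.contents, x) |> ignore) [ [1; 3; 5]; [2; 4; 6] ]
printfn "%d" total.Value  // 21
```

The sum is always 21, but the order in which the six additions happen can differ from run to run.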

Load Balancing

I wrote earlier that there might be a problem if the work done on each element can block the thread. How can that be? Well, suppose you have 2 cores in your CPU. You create a parallel list using two sequential lists. This parallel list contains URLs that you want to fetch with the map function, in effect creating a new list of strings containing the HTML. So when you pass the function fetch_url to PList.map, the PList will create two asynchronous tasks, one for each sequential list in the parallel list. What happens if one of the threads tries to fetch a URL from a really slow server? That thread blocks until all the data is fetched, leaving just the other thread to continue while the first one waits. So here you might want to spawn, say, 2 asynchronous tasks for each core. This gives the CPU more threads to work with in case one sits idle.

On the other hand, if the list consists of something that does not make the threads wait, for example a list of strings that need to be parsed, too many threads will only slow things down, since the CPU now has to switch between too many threads.

I invite you to experiment with this. You could, for example, add a “loadfactor” parameter to the create_empty function.
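One possible shape for that experiment is sketched below. The function createEmptyWithLoad and its loadFactor parameter are hypothetical, and the sketch returns a plain 'a list list rather than a Queue; with the article's Queue you would enqueue the same number of empty lists instead.

```fsharp
open System

// Hypothetical variant of create_empty: loadFactor sublists per core.
// loadFactor = 1 suits CPU-bound work; a higher value gives more, smaller
// partitions, so a partition stuck waiting on I/O holds up less of the work.
let createEmptyWithLoad<'a> (loadFactor: int) : 'a list list =
    if loadFactor < 1 then invalidArg "loadFactor" "must be at least 1"
    List.replicate (loadFactor * Environment.ProcessorCount) []

let cpuBound : int list list = createEmptyWithLoad 1
let ioBound : string list list = createEmptyWithLoad 2
printfn "%d cores: %d and %d sublists" Environment.ProcessorCount cpuBound.Length ioBound.Length
```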

Extending PList to Span Multiple Machines

If we introduce the notion of “location” into the code, we can begin to experiment in another way. Consider this: a normal computer can be viewed as a graph, with the computer at the top, its CPUs below it, and each CPU's cores below that.

A computer can have a number of CPUs that in turn can have a number of cores. A computing cluster can be viewed similarly, by extending the graph with a cluster node at the top whose children are the individual computers, connected by some sort of network.

What if we replace the nodes above with “location” objects and use such a graph when we create and use our parallel data structures? I could use a “local CPU location” object to create a parallel list running in local process space, with as many sequential lists inside as there are cores in that particular CPU. Or I could use the top node of my location graph, the “cluster location” object, to create a parallel list that spans the entire cluster of computers, in effect creating a distributed parallel data structure for processing and storing data. This is just something I am experimenting with.
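As a rough illustration of the idea, the location graph can be modelled as a tree whose core count tells a parallel list how many sequential sublists to create. Everything here is hypothetical: the Location type, its cases, coreCount and the machine names are made up for the sketch, and actually placing sublists on remote machines would need networking code that is not shown.

```fsharp
// Hypothetical model of the location graph: a cluster contains machines,
// a machine contains CPUs, and a CPU contains cores.
type Location =
    | Core of index: int
    | Cpu of cores: Location list
    | Machine of name: string * cpus: Location list
    | Cluster of machines: Location list

// Number of cores below a location: the number of sequential sublists
// a parallel list created "at" that location would get.
let rec coreCount (loc: Location) : int =
    match loc with
    | Core _ -> 1
    | Cpu children | Cluster children -> children |> List.sumBy coreCount
    | Machine (_, cpus) -> cpus |> List.sumBy coreCount

let dualCore = Cpu [ Core 0; Core 1 ]
let quadCore = Cpu [ Core 0; Core 1; Core 2; Core 3 ]
let cluster =
    Cluster [ Machine ("node-a", [ dualCore ]); Machine ("node-b", [ quadCore; quadCore ]) ]

printfn "%d %d" (coreCount dualCore) (coreCount cluster)  // 2 10
```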

6 Comments

  1. Hey,

    I was thinking, wouldn’t it be simpler to extend the existing List module with some function, let’s call it ‘pmap’, which would asynchronously call the given ‘mapping’ function on each element using ThreadPool tasks, or better, Task Parallel Library Tasks?

    Comment by scypior — November 15, 2008 @ 11:35

  2. @scypior

    It would not be simpler to just add a ‘pmap’ function to the existing List module, for one reason: parallelizing a task on that list would first require you to split the list in two (if you had two cores). This would require you to iterate through the list first in order to generate the sublists. This would be okay, though, for small lists with tasks that are ‘heavy’ and take proportionally long to run.

    Iterating through the list and putting a task for each element on the thread pool is not feasible; constructing the async task is usually more expensive than the task itself, and if the list is large the thread pool would just get congested.

    I hope this answers your questions.

    Comment by frank — November 15, 2008 @ 13:09

  3. Thanks Frank.
    If this is the general case, is the GUI “thread” a special case?

    Comment by Art Scott — November 16, 2008 @ 4:30

  4. @Art Scott

    I’m not sure what you mean. The GUI thread is always a special case, but the threads spawned in this example have nothing to do with that particular thread. Could you please elaborate?

    Comment by frank — November 16, 2008 @ 11:42

  5. Yeah, spawning tasks using TPL has got its overhead. Delegate calling has overhead too.

    F# functions on the other hand, are kinda light weighted from what I’ve known, and differs from delegates. Wouldn’t it be great if there was a way to call such functions asynchronously not using underlying .NET delegates? Or maybe I’ve got something totally wrong:)

    Comment by scypior — November 16, 2008 @ 12:01

  6. [...] Data Parallelism in Functional Programming – Frank Thomsen looks at one of the often touted major benefits of functional programming, parallelism, looking at data structures that allow parallelism, and some methods of dividing the work up. [...]

    Pingback by Reflective Perspective - Chris Alcock » The Morning Brew #226 — November 19, 2008 @ 9:10


