2016-11-25

# Data Structures for Parallel Computing

Week four lecture notes for the Parallel Programming course on Coursera.

## Implementing Combiners

In the previous week, we learned about transformer operations and parallel collections. This week, we will study how these methods are implemented. We will see how combiners are implemented for various collections, and learn about data structures that are more amenable to parallel computation.

### Combiners

Recall from the previous week that combiners are the abstraction used to implement parallel transformer operations. A combiner is a builder with an additional method called combine, which combines the elements of two input combiners and produces a third, new combiner:

import scala.collection.mutable.Builder

trait Combiner[T, Repr] extends Builder[T, Repr] {
  def combine(that: Combiner[T, Repr]): Combiner[T, Repr]
}


After the combine method returns, the two input combiners can no longer be used; only the new combiner may be used by the parallel program.

Before we start, let’s consider the meaning of combine. It turns out that the meaning of combine depends on the underlying data structure:

• when Repr is a set or a map, combine represents union.
• when Repr is a sequence, combine represents concatenation.

In both cases, the combine operation must be efficient. By efficient, we mean that it must execute in $O(\log n + \log m)$ time, where $n$ and $m$ are the sizes of the two input combiners.

### Array Concatenation

We argue that arrays cannot be efficiently concatenated.

Here is why: computer memory can be visualized as a long tape. Any data structure that we use occupies a subset of locations on this tape. An array of length n is a very simple data structure that occupies a contiguous block of memory. Two arrays, xs and ys, can be at arbitrary positions on the tape, and the resulting array must also be a contiguous block of memory. If xs and ys happen to be adjacent, we could return a resulting array that starts where xs starts, and we would not need to copy anything. In most cases, however, xs and ys can be anywhere, so we are forced to copy their elements to produce a contiguous, uninterrupted block. This copying takes $O(n+m)$ time.
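The copying argument can be made concrete with a minimal sketch. The object and method names below are hypothetical and chosen only for illustration; the point is that both inputs must be copied in full into a newly allocated block.

```scala
object ArrayConcatDemo {
  // Concatenates two arrays by copying both into a freshly allocated block.
  // Every element is copied exactly once, so the work is proportional to n + m.
  def concat(xs: Array[Int], ys: Array[Int]): Array[Int] = {
    val result = new Array[Int](xs.length + ys.length)
    System.arraycopy(xs, 0, result, 0, xs.length)         // copy the n elements of xs
    System.arraycopy(ys, 0, result, xs.length, ys.length) // copy the m elements of ys
    result
  }
}
```

No pointer manipulation can avoid this copy, because the result has to be one contiguous block.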

### Sets

Let’s consider some other data structures to implement sets and maps, and the efficiency of their operations.

Sets support efficient lookup, insertion and deletion, and can be implemented using the following data structures:

• hash tables - expected $O(1)$ time for lookup, insertion and deletion
• balanced trees - $O(\log n)$ time for lookup, insertion and deletion
• linked lists - $O(n)$ time for lookup, insertion and deletion

However, most set implementations do not have an efficient union operation.
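As a small illustration of why union is costly, here is a sketch that merges one hash set into another. The helper name unionInto is an assumption made for this example. The only general strategy is to reinsert every element of one set into the other, which takes time linear in the size of that set.

```scala
import scala.collection.mutable

object SetUnionDemo {
  // Union by reinserting every element of ys into xs:
  // one hash insertion per element of ys, so O(m) expected time overall.
  def unionInto(xs: mutable.HashSet[Int], ys: mutable.HashSet[Int]): mutable.HashSet[Int] = {
    ys.foreach(y => xs += y)
    xs
  }
}
```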

### Sequences

Next, we consider data structures that implement sequences. The complexity of sequence operations varies depending on the underlying data structure:

• mutable linked lists - $O(1)$ for prepend and append, $O(n)$ for insertion
• functional (cons) lists - $O(1)$ for prepend, everything else takes $O(n)$
• array lists - amortized $O(1)$ for append, $O(1)$ for random access, everything else takes $O(n)$

Mutable linked lists can have $O(1)$ concatenation, but for most sequences, concatenation is $O(n)$.

The analysis above should have convinced us that implementing combiners efficiently is not trivial, since most data structures do not have efficient concatenation or union.
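The mutable linked list is the exception mentioned above. The following is a minimal sketch of why its concatenation can be $O(1)$, assuming a hypothetical singly linked list that tracks both its first and last node (the names Node, LinkedSeq, append and concat are made up for this example).

```scala
object LinkedSeqDemo {
  final class Node[T](val value: T, var next: Node[T])

  final class LinkedSeq[T] {
    private var head: Node[T] = null
    private var last: Node[T] = null

    // O(1): link a new node after the current last node.
    def append(x: T): this.type = {
      val n = new Node[T](x, null)
      if (head == null) head = n else last.next = n
      last = n
      this
    }

    // O(1): only two pointers are updated, no element is copied.
    // The argument shares its nodes with this list and must not be used afterwards.
    def concat(that: LinkedSeq[T]): this.type = {
      if (that.head != null) {
        if (head == null) head = that.head else last.next = that.head
        last = that.last
      }
      this
    }

    // O(n) traversal, used here only to inspect the result.
    def toList: List[T] = {
      val b = List.newBuilder[T]
      var cur = head
      while (cur != null) { b += cur.value; cur = cur.next }
      b.result()
    }
  }
}
```

Note that the argument list becomes unusable after concat, which mirrors the rule that old combiners must not be used after combine returns.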

## Parallel Two-phase Construction

In this section, we introduce a technique called parallel two-phase data structure construction to implement combiners efficiently.

### Two-phase construction

In fact, it turns out that most data structures can be constructed in parallel using two-phase construction. Previously, we insisted that a combiner and the resulting collection should have the same underlying data structure. For example, we assumed that a combiner that produces an array must internally contain an array at the point when its combine method is called.

In two-phase construction, the combiner uses an intermediate data structure as its internal representation. This intermediate data structure is different from the resulting data structure, and it has the following properties:

• it has an efficient combine method, whose running time is $O(\log n + \log m)$ or better.
• it has an efficient += method. This ensures that individual processors can efficiently modify the data structure.
• it can be converted to the resulting data structure in $O(\frac{n}{P})$ time, where n is the size of the data structure and P is the number of processors. In other words, the result method is allowed to copy the entire intermediate data structure, but this copying process must be parallelizable.

Together, these properties allow building the resulting data structure in two phases:

• In the first phase, different processors build intermediate data structures in parallel by invoking the += method. These intermediate data structures are then combined in a parallel reduction tree until there is a single intermediate data structure at the root.
• In the second phase, the result method uses the intermediate data structure to create the final data structure in parallel.

#### Example: array combiner

Having seen a high-level overview of how two-phase parallel construction works, we now turn to a concrete example: a two-phase array combiner. To keep things simple, we limit our ArrayCombiner class to reference objects, expressed with an upper bound on the type parameter T. We also add a ClassTag context bound, so that the resulting array can be instantiated, and a parallelism level argument.

import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag

class ArrayCombiner[T <: AnyRef: ClassTag](val parallelism: Int) {
  private var numElems = 0
  private val buffers = new ArrayBuffer[ArrayBuffer[T]]
  buffers += new ArrayBuffer[T]
}


Internally, the ArrayCombiner keeps the field numElems, which stores the number of elements in the combiner, and the nested ArrayBuffer buffers, which stores the elements.

First, let’s implement the += method:

def +=(x: T) = {
  buffers.last += x
  numElems += 1
  this
}


This method finds the last nested array buffer in buffers and appends the element x to it. If the last nested ArrayBuffer ever gets full, it is expanded to accommodate more elements. The amortized running time of += is $O(1)$.

Next, we implement the combine method:

def combine(that: ArrayCombiner[T]) = {
  buffers ++= that.buffers
  numElems += that.numElems
  this
}


Here, the reason for using nested array buffers becomes obvious. The combine method simply copies the references to the argument combiner's buffers into its own buffers field. It does not need to copy the actual contents of those nested buffers, only a pointer to each of them. The number of computational steps is equal to the number of nested array buffers in the argument combiner. Since every array combiner is initially created with only one nested array buffer, and exactly P array combiners are created in the reduction tree, one for each processor, the buffers field will never have more than P entries. For this reason, the running time of this combine method is $O(P)$.

Finally, we implement the result method:

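private def copyTo(array: Array[T], from: Int, end: Int): Unit = {
  // Locate the nested buffer j and the offset i inside it that hold index from.
  var i = from
  var j = 0
  while (i >= buffers(j).length) {
    i -= buffers(j).length
    j += 1
  }
  // Copy the elements with indices from until end into the target array.
  var k = from
  while (k < end) {
    array(k) = buffers(j)(i)
    i += 1
    // Move on to the next nested buffer once the current one is exhausted.
    if (i >= buffers(j).length) {
      i = 0
      j += 1
    }
    k += 1
  }
}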

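def result: Array[T] = {
  val array = new Array[T](numElems)
  val step = math.max(1, numElems / parallelism)
  val starts = (0 until numElems by step) :+ numElems
  val chunks = starts.zip(starts.tail)
  // task { ... } is assumed to be the course's helper that runs its body
  // in parallel and returns a handle with a join() method.
  val tasks = for ((from, end) <- chunks) yield task {
    copyTo(array, from, end)
  }
  tasks.foreach(_.join())
  array
}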


Once we have the root intermediate data structure, we know the required size of the array from the numElems field, so we allocate the resulting array. We then divide the array indices into chunks, that is, pairs of starting and ending indices that the parallel tasks should copy. We start these tasks, wait for their completion, and then return the array.
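To try the pieces together outside the course infrastructure, here is a self-contained sketch of the array combiner. It is an assumption-laden variant, not the course's code: it replaces the task helper with scala.concurrent.Future on the global execution context, and it makes copyTo skip empty nested buffers, which can appear when a combiner that received no elements is combined.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration
import scala.reflect.ClassTag

class ArrayCombiner[T <: AnyRef: ClassTag](val parallelism: Int) {
  private var numElems = 0
  private val buffers = new ArrayBuffer[ArrayBuffer[T]]
  buffers += new ArrayBuffer[T]

  // Amortized O(1): append to the last nested buffer.
  def +=(x: T): this.type = { buffers.last += x; numElems += 1; this }

  // O(P): copies only references to the nested buffers of the argument.
  def combine(that: ArrayCombiner[T]): this.type = {
    buffers ++= that.buffers
    numElems += that.numElems
    this
  }

  private def copyTo(array: Array[T], from: Int, end: Int): Unit = {
    var i = from
    var j = 0
    while (i >= buffers(j).length) { i -= buffers(j).length; j += 1 }
    var k = from
    while (k < end) {
      array(k) = buffers(j)(i)
      i += 1
      // Advance past exhausted (or empty) nested buffers.
      while (j < buffers.length && i >= buffers(j).length) { i = 0; j += 1 }
      k += 1
    }
  }

  // Second phase: every chunk of indices is copied by its own Future.
  def result: Array[T] = {
    import ExecutionContext.Implicits.global
    val array = new Array[T](numElems)
    val step = math.max(1, numElems / parallelism)
    val starts = (0 until numElems by step) :+ numElems
    val chunks = starts.zip(starts.tail)
    val tasks = for ((from, end) <- chunks) yield Future(copyTo(array, from, end))
    tasks.foreach(t => Await.result(t, Duration.Inf))
    array
  }
}
```

A typical use is to fill two combiners independently, for example `a += "x"` and `b += "z"`, and then call `a.combine(b).result`. The chunks write to disjoint index ranges of the target array, so the copying tasks need no synchronization among themselves.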

#### Two-phase construction for arrays

In two-phase construction, the crucial step is picking the right intermediate data structure. This intermediate data structure must partition the element space into buckets. The array combiner partitioned the elements according to their index into distinct subintervals. It turned out that these subintervals can be combined easily.

For other data structures, we need to find alternative ways of partitioning the element space.

#### Two-phase construction for hash tables

The two-phase construction for hash tables is as follows:

1. In the first phase, we partition the hash codes into buckets
2. In the second phase, we allocate the table, and map hash codes from different buckets into different regions
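The two phases can be sketched roughly as follows. This is a simplified model under stated assumptions: the bucket count, the names bucketOf, partition, build and contains are all hypothetical, a plain Set per region stands in for a slice of the real hash table, and the second phase runs sequentially here even though the regions are independent and could be filled by separate tasks.

```scala
import scala.collection.mutable.ArrayBuffer

object HashPartitionDemo {
  val numBuckets = 4 // assumption: a small fixed bucket count for illustration

  // Bucket index derived from the hash code (sign bit masked off).
  def bucketOf(x: Any): Int = (x.hashCode & 0x7fffffff) % numBuckets

  // Phase 1: append each element to the bucket selected by its hash code.
  def partition[T](elems: Seq[T]): Vector[ArrayBuffer[T]] = {
    val buckets = Vector.fill(numBuckets)(new ArrayBuffer[T])
    elems.foreach(x => buckets(bucketOf(x)) += x)
    buckets
  }

  // Phase 2: each bucket fills its own region, independently of the others.
  def build[T](buckets: Vector[ArrayBuffer[T]]): Vector[Set[T]] =
    buckets.map(_.toSet)

  // A lookup only needs to inspect the region that matches the hash code.
  def contains[T](regions: Vector[Set[T]], x: T): Boolean =
    regions(bucketOf(x)).contains(x)
}
```

Because elements from different buckets never land in the same region, the tasks of the second phase do not interfere with each other.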

#### Two-phase construction for Search Trees

The two-phase construction for search trees is as follows:

1. In the first phase, we partition the elements into non-overlapping intervals according to their ordering
2. In the second phase, we construct search trees in parallel, and link non-overlapping trees

#### Two-phase construction for spatial data structures

The two-phase construction for spatial data structures is as follows:

1. In the first phase, we spatially partition the elements
2. In the second phase, we construct non-overlapping subsets and link them

### Implementing combiners

In addition to two-phase construction, we have two more options for implementing combiners:

1. use a data structure with efficient concatenation or union - the preferred way when the resulting data structure allows this.
2. use a concurrent data structure - different combiners share the same underlying data structure and rely on synchronization to correctly update it when += is called.
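The second option can be sketched as follows. This is only an illustration under assumptions: the class SharedSetCombiner and the helper newShared are hypothetical names, and the shared structure is a concurrent set obtained from java.util.concurrent.ConcurrentHashMap.newKeySet (available since Java 8).

```scala
import java.util.concurrent.ConcurrentHashMap

object SharedCombinerDemo {
  // Creates the concurrent set that all combiners will share.
  def newShared[T](): java.util.Set[T] = ConcurrentHashMap.newKeySet[T]()

  // Every combiner instance writes into the same shared set.
  final class SharedSetCombiner[T](shared: java.util.Set[T]) {
    // The concurrent set synchronizes internally, so += is safe from any thread.
    def +=(x: T): this.type = { shared.add(x); this }

    // Nothing to merge: both combiners already point to the same data.
    def combine(that: SharedSetCombiner[T]): SharedSetCombiner[T] = this

    def result: java.util.Set[T] = shared
  }
}
```

Here combine is trivially $O(1)$, and the cost moves into += instead, which now pays for synchronization on every insertion.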

In the following sections, we will introduce a data structure that supports efficient concatenation, called the Conc-tree.

## Conc-Trees

Conc-trees are a counterpart to functional (cons) lists and are used to manipulate data. With Conc-trees, we can implement an efficient concatenation method.

### Conc data type

Because trees are not good for parallelism unless they are balanced, we devise a data type called Conc to represent balanced trees:

sealed trait Conc[+T] {
  def level: Int
  def size: Int
  def left: Conc[T]
  def right: Conc[T]
}


In addition to references to the left and right subtrees, Conc also defines the level and the size of the tree. The level is the length of the longest path from the root to a leaf, in other words the height of the tree, and the size is the number of elements in the tree.

This Conc data type corresponds to a so-called conc list, originally introduced by the Fortress language. The conc list is a neat data abstraction for parallel programming and could have any number of implementations. In what follows, we will study one particularly concise conc list implementation. The Conc data type has the following concrete subclasses:

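case object Empty extends Conc[Nothing] {
  def level = 0
  def size = 0
  // Leaves have no children.
  def left: Conc[Nothing] = sys.error("Empty.left")
  def right: Conc[Nothing] = sys.error("Empty.right")
}

class Single[T](val x: T) extends Conc[T] {
  def level = 0
  def size = 1
  def left: Conc[T] = sys.error("Single.left")
  def right: Conc[T] = sys.error("Single.right")
}

case class <>[T](left: Conc[T], right: Conc[T]) extends Conc[T] {
  val level = 1 + math.max(left.level, right.level)
  val size = left.size + right.size
}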



The Empty object represents empty trees, whose level and size are zero. The Single class represents leaves containing a single element; its level is again zero and its size is one. Lastly, the <> class (pronounced conc) represents inner nodes of the tree and is written as a small diamond, denoting the choice of going either left or right. Here, the level is one plus the level of the higher subtree, and the size is the total number of elements in both subtrees.
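As a quick check of these definitions, the following sketch builds a three-element tree by hand and inspects its level and size. It is a trimmed-down variant for illustration: the left and right members are omitted from the trait, Single is declared as a case class for brevity, and the wrapper object ConcBasics is a hypothetical name.

```scala
object ConcBasics {
  sealed trait Conc[+T] {
    def level: Int
    def size: Int
  }
  case object Empty extends Conc[Nothing] { def level = 0; def size = 0 }
  case class Single[T](x: T) extends Conc[T] { def level = 0; def size = 1 }
  case class <>[T](left: Conc[T], right: Conc[T]) extends Conc[T] {
    val level = 1 + math.max(left.level, right.level)
    val size = left.size + right.size
  }

  // The tree (1 <> 2) <> 3: two inner nodes and three leaves.
  val inner: Conc[Int] = new <>(Single(1), Single(2))
  val tree: Conc[Int] = new <>(inner, Single(3))
}
```

The inner node has level 1 and size 2, so the root has level 1 + max(1, 0) = 2 and size 2 + 1 = 3.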

### Conc data type invariants

In addition, we define the following invariants for Conc-trees:

1. To guard against sparse trees with too many empty subtrees, a <> node can never contain Empty as its subtree.
2. To ensure that the trees remain balanced, the level difference between the left and the right subtree of a <> node is always one or less.

We rely on these invariants to implement concatenation. In the spirit of the cons operator used to build functional lists, we overload the conc constructor with a method of the same name:

// The bound S >: T keeps this member legal for the covariant Conc[+T].
def <>[S >: T](that: Conc[S]): Conc[S] = {
  if (this == Empty) that
  else if (that == Empty) this
  else concat[S](this, that)
}


The conc method is a member method of the Conc data type and takes another Conc tree as its argument. It starts by ensuring the first invariant, eliminating empty trees, and then delegates the real work to another method called concat.

The concat method cannot always directly link two trees together by creating a conc node above them, as that would violate the height invariant. Instead, it should reorganize the tree into a balanced tree.

#### Concatenation with the Conc data type

We need to consider several cases to implement concatenation:

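1. If the height difference between the two trees is 1 or less, we link them directly with a new <> node.
2. Otherwise, we assume the left tree xs is higher than the right tree ys (the opposite case is symmetric):
• 2.1. If xs is left-leaning, that is, its left subtree is at least as deep as its right subtree, we recursively concatenate the right subtree of xs with ys and link the result to the left subtree of xs.
• 2.2. If xs is right-leaning, we recursively concatenate the right subtree of its right subtree (xs.right.right) with ys, and then relink the pieces depending on the level of the result.

def concat[T](xs: Conc[T], ys: Conc[T]): Conc[T] = {
  val diff = ys.level - xs.level
  // handling case 1
  if (diff >= -1 && diff <= 1) new <>(xs, ys)
  else if (diff < -1) {
    // handling case 2.1
    if (xs.left.level >= xs.right.level) {
      val nr = concat(xs.right, ys)
      new <>(xs.left, nr)
    } else { // handling case 2.2
      val nrr = concat(xs.right.right, ys)
      if (nrr.level == xs.level - 3) {
        val nl = xs.left
        val nr = new <>(xs.right.left, nrr)
        new <>(nl, nr)
      } else {
        val nl = new <>(xs.left, xs.right.left)
        val nr = nrr
        new <>(nl, nr)
      }
    }
  } else {
    // symmetric case: ys is higher than xs, mirror image of the branch above
    if (ys.right.level >= ys.left.level) {
      val nl = concat(xs, ys.left)
      new <>(nl, ys.right)
    } else {
      val nll = concat(xs, ys.left.left)
      if (nll.level == ys.level - 3) {
        val nl = new <>(nll, ys.left.right)
        new <>(nl, ys.right)
      } else {
        val nr = new <>(ys.left.right, ys.right)
        new <>(nll, nr)
      }
    }
  }
}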


The running time of the <> method is $O(|h_1 - h_2|)$, where $h_1$ and $h_2$ are the heights of the two trees.
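To experiment with concatenation, here is a self-contained sketch that assembles the data type and concat into one object and adds helpers for checking the result. Several parts are assumptions made for this example: the wrapper object ConcConcat, the function concTrees (standing in for the <> member method), the helpers toList and balanced, the default left and right implementations in the trait, and declaring Single as a case class.

```scala
object ConcConcat {
  sealed trait Conc[+T] {
    def level: Int
    def size: Int
    // Leaves have no children; only <> nodes override these.
    def left: Conc[T] = sys.error("leaf has no left child")
    def right: Conc[T] = sys.error("leaf has no right child")
  }
  case object Empty extends Conc[Nothing] { def level = 0; def size = 0 }
  case class Single[T](x: T) extends Conc[T] { def level = 0; def size = 1 }
  case class <>[T](override val left: Conc[T], override val right: Conc[T]) extends Conc[T] {
    val level = 1 + math.max(left.level, right.level)
    val size = left.size + right.size
  }

  def concat[T](xs: Conc[T], ys: Conc[T]): Conc[T] = {
    val diff = ys.level - xs.level
    if (diff >= -1 && diff <= 1) new <>(xs, ys) // heights are close: link directly
    else if (diff < -1) { // xs is higher
      if (xs.left.level >= xs.right.level) new <>(xs.left, concat(xs.right, ys))
      else {
        val nrr = concat(xs.right.right, ys)
        if (nrr.level == xs.level - 3) new <>(xs.left, new <>(xs.right.left, nrr))
        else new <>(new <>(xs.left, xs.right.left), nrr)
      }
    } else { // ys is higher: mirror image
      if (ys.right.level >= ys.left.level) new <>(concat(xs, ys.left), ys.right)
      else {
        val nll = concat(xs, ys.left.left)
        if (nll.level == ys.level - 3) new <>(new <>(nll, ys.left.right), ys.right)
        else new <>(nll, new <>(ys.left.right, ys.right))
      }
    }
  }

  // Stand-in for the <> method: removes empty trees, then delegates to concat.
  def concTrees[T](xs: Conc[T], ys: Conc[T]): Conc[T] =
    if (xs == Empty) ys else if (ys == Empty) xs else concat(xs, ys)

  // In-order traversal, to check that element order is preserved.
  def toList[T](t: Conc[T]): List[T] = t match {
    case Empty => Nil
    case Single(x) => List(x)
    case l <> r => toList(l) ::: toList(r)
  }

  // Checks the balance invariant at every inner node.
  def balanced[T](t: Conc[T]): Boolean = t match {
    case l <> r => math.abs(l.level - r.level) <= 1 && balanced(l) && balanced(r)
    case _ => true
  }
}
```

For example, folding concTrees over Single(1) to Single(100) exercises the branch where the left tree is higher, while concTrees(Single(0), big) with a large tree big exercises the symmetric branch. In both cases toList should preserve the element order and balanced should hold.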

## Constant Time Appends in Conc-Trees

With the Conc data type introduced in the previous section, we will now use Conc-trees to implement an append method with constant running time. Such a method is crucial for implementing combiners efficiently.

First, let’s consider implementing the += method. If we assume that a Conc-based combiner internally holds a Conc object, the += method could insert an element by creating a single-element tree and concatenating it with the existing tree:

var xs: Conc[T] = Empty
def +=(elem: T): Unit = {
  xs = xs <> new Single(elem)
}


The running time of this += would be $O(\log n)$. However, we would prefer += to be a constant-time method, since it is invoked every time a processor adds a new element to the combiner.

It turns out this is possible. The idea is to represent the result of a += operation differently. To this end, we introduce a new node type called Append:

case class Append[T](left: Conc[T], right: Conc[T]) extends Conc[T] {
  val level = 1 + math.max(left.level, right.level)
  val size = left.size + right.size
}


The Append node has exactly the same structure as the <> node, but we relax the previous balance invariant of the Conc data structure: for an Append node, we allow an arbitrary difference in levels between the left and the right child.

With this new node type, we can implement the append method. We make sure that if the total number of elements in the tree is $n$, then there are never more than $\log n$ Append nodes.

def appendLeaf[T](xs: Conc[T], ys: Single[T]): Conc[T] = xs match {
  case Empty => ys
  case xs: Single[T] => new <>(xs, ys)
  case _ <> _ => new Append(xs, ys)
  case xs: Append[T] => append(xs, ys)
}


The append method is defined as follows:

// requires: import scala.annotation.tailrec
@tailrec
private def append[T](xs: Append[T], ys: Conc[T]): Conc[T] = {
  if (xs.right.level > ys.level) new Append(xs, ys)
  else {
    // Link the rightmost subtree with ys, then continue up the Append spine.
    val zs = new <>(xs.right, ys)
    xs.left match {
      case ws @ Append(_, _) => append(ws, zs)
      case ws if ws.level <= zs.level => ws <> zs
      case ws => new Append(ws, zs)
    }
  }
}
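One way to see why the number of Append nodes stays logarithmic is to model the append spine as a binary counter. The sketch below is a simplified model, not the data structure itself: it tracks only the levels of the balanced subtrees hanging off the spine, assumes that elements are appended one leaf at a time starting from an empty tree, and uses the hypothetical names AppendCounter and appendLeaf. Linking two subtrees of equal level corresponds to a carry when incrementing a binary number.

```scala
object AppendCounter {
  // Each entry is the level of one balanced subtree on the Append spine,
  // rightmost subtree first. In this model a subtree at level l holds 2^l leaves.
  def appendLeaf(levels: List[Int]): List[Int] = {
    @annotation.tailrec
    def carry(level: Int, rest: List[Int]): List[Int] = rest match {
      case l :: tail if l == level => carry(level + 1, tail) // link two equal subtrees
      case _ => level :: rest // no carry: the new subtree joins the spine
    }
    carry(0, levels)
  }
}
```

After $n$ appends, the spine in this model holds one subtree per 1-bit in the binary representation of $n$, so its length never exceeds $\lfloor \log_2 n \rfloor + 1$. A single append may trigger several carries, but, as with incrementing a binary counter, the total work over $n$ appends is $O(n)$.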



To summarize, we have implemented an immutable data structure with:

• $O(1)$ appends
• $O(\log n)$ concatenation

Next, we will see how to implement a more efficient, mutable Conc-tree variant that can be used to implement a Combiner.

## Conc-Tree Combiners

In this section, we will implement combiners based on the Conc-trees introduced in the previous sections. We will call this new combiner implementation a conc buffer.

### Conc Buffers

Internally a conc buffer contains a conc tree and an array of fixed size k. It also contains a field chunkSize, which holds the index of the first empty array entry:

import scala.reflect.ClassTag

class ConcBuffer[T: ClassTag](val k: Int, private var conc: Conc[T]) {
  private var chunk: Array[T] = new Array(k)
  private var chunkSize: Int = 0
}


The += method is implemented using chunk:

final def +=(elem: T): Unit = {
  if (chunkSize >= k) expand()
  chunk(chunkSize) = elem
  chunkSize += 1
}


If the array is full, += calls the expand method to push the array into the Conc-tree.

#### Chunk Nodes

To push the array into the Conc tree, we introduce a new node type called Chunk:

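class Chunk[T](val array: Array[T], val size: Int) extends Conc[T] {
  def level = 0
  // Like every leaf, a chunk has no children.
  def left: Conc[T] = sys.error("Chunk.left")
  def right: Conc[T] = sys.error("Chunk.right")
}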


The Chunk class holds the array and the number of elements in it. The level of a Chunk node is zero.

#### Expanding the conc buffer

The expand method inserts the chunk into the Conc-tree and allocates a new chunk array:

private def expand(): Unit = {
  conc = appendLeaf(conc, new Chunk(chunk, chunkSize))
  chunk = new Array(k)
  chunkSize = 0
}


#### Combine method

Then, the combine method is straightforward:

final def combine(that: ConcBuffer[T]): ConcBuffer[T] = {
  val combinedConc = this.result <> that.result
  new ConcBuffer(k, combinedConc)
}


#### Result method

The result method packs the chunk array into the tree and returns the resulting tree:

def result: Conc[T] = {
  conc = appendLeaf(conc, new Chunk(chunk, chunkSize))
  conc
}


## Summary

This week, we first learned how combiners are implemented for various collections. Then, we learned about parallel two-phase construction as a way to implement combiners efficiently. Finally, we learned about a new data structure called the Conc-tree, which supports efficient concatenation, and used it to implement combiners.