Parallel Programming week two notes

2016-10-18

Week two lecture notes about Parallel Programming on Coursera.

Data Operation in Parallel

Processing elements of collections in parallel is one of the main applications of parallelism today.

We will focus on three operations, map, fold, and scan, and see how they can be implemented in parallel. At the same time, we will focus on two data structures that are efficient to use in parallel (both in terms of splitting and combining): arrays and trees.

Parallelism and collections

To achieve parallelism, collections and operations have to hold the following properties:

• properties of collections: ability to split, combine
• properties of operations: associativity, independence

Our goal is to implement map, reduce, and scan in parallel. Let’s see how we can achieve this.

Parallel Mapping

Map applies a given function to each list element:

List(a1, a2, ..., an).map(f) == List(f(a1), f(a2), ..., f(an))


and has the following properties:

• list.map(x => x) == list

• list.map(f.compose(g)) == list.map(g).map(f) where (f.compose(g))(x) = f(g(x))
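The two properties can be checked on a concrete list. This is a minimal sketch; the list and the functions f and g are arbitrary choices made for illustration.

```scala
object MapLaws {
  val xs = List(1, 2, 3, 4)
  val f: Int => Int = _ + 1
  val g: Int => Int = _ * 2

  // Identity law: mapping the identity function changes nothing.
  val identityHolds: Boolean = xs.map(x => x) == xs

  // Composition law: one pass with f.compose(g) equals two passes, g first, then f.
  val fused: List[Int] = xs.map(f.compose(g))
  val twoPasses: List[Int] = xs.map(g).map(f)
}
```

The composition law is what allows two consecutive map calls to be fused into a single traversal.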

Because lists are inefficient to split and combine, they are a poor fit for parallelism, so we consider how to implement map in parallel on arrays.

Parallel map on arrays

The implementation of parallel map on arrays is as follows:

def mapASegPar[A, B](inp: Array[A], left: Int, right: Int, f: A => B, out: Array[B]): Unit = {
  if (right - left < threshold)
    // this is the sequential version of map
    mapASegSeq(inp, left, right, f, out)
  else {
    val mid = left + (right - left) / 2
    // do a split, and run the two halves in parallel
    parallel(mapASegPar(inp, left, mid, f, out),
             mapASegPar(inp, mid, right, f, out))
  }
}


Recall from week one, given expressions $e_1$ and $e_2$, parallel($e_1$,$e_2$) can compute them in parallel and return the pair of results.

With such an implementation, we have to ensure that:

• writes are disjoint (otherwise: non-deterministic behavior)
• threshold is large enough (otherwise we lose efficiency)

On the other hand, passing f as a higher-order function has a cost of its own: a version written by hand for one specific f, with the function call removed, typically runs faster than the generic one.
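The array map above can be made runnable as follows. This is a sketch under stated assumptions: the threshold value, the body of mapASegSeq, the fork/join based stand-in for the course's parallel, and the doubled helper are all supplied here for illustration and are not taken from the lecture.

```scala
import java.util.concurrent.RecursiveTask

object ParMapArray {
  // Assumed cutoff: segments shorter than this are mapped sequentially.
  val threshold = 1000

  // Assumed stand-in for the course's `parallel`: forks `b` as a task
  // while the current thread evaluates `a`, then waits for both.
  def parallel[A, B](a: => A, b: => B): (A, B) = {
    val right = new RecursiveTask[B] { def compute(): B = b }
    right.fork()
    val left = a
    (left, right.join())
  }

  def mapASegSeq[A, B](inp: Array[A], left: Int, right: Int, f: A => B, out: Array[B]): Unit = {
    var i = left
    while (i < right) { out(i) = f(inp(i)); i += 1 }
  }

  def mapASegPar[A, B](inp: Array[A], left: Int, right: Int, f: A => B, out: Array[B]): Unit =
    if (right - left < threshold) mapASegSeq(inp, left, right, f, out)
    else {
      val mid = left + (right - left) / 2
      // The halves write to out[left, mid) and out[mid, right): disjoint ranges.
      parallel(mapASegPar(inp, left, mid, f, out), mapASegPar(inp, mid, right, f, out))
      ()
    }

  // Hypothetical usage: doubles the numbers 0 until n.
  def doubled(n: Int): Array[Int] = {
    val inp = Array.tabulate(n)(i => i)
    val out = new Array[Int](n)
    mapASegPar(inp, 0, n, (x: Int) => x * 2, out)
    out
  }
}
```

Because each recursive call receives its own index range, no two tasks ever write to the same slot of out, which is exactly the disjointness condition listed above.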

Parallel map on immutable tree

Map can also be implemented in parallel on immutable trees.

Such an immutable tree has the following properties:

• leaves store array segments
• non-leaf nodes store the number of elements

The definition of such a tree is as follows:

sealed abstract class Tree[A] {
  val size: Int
}
case class Leaf[A](a: Array[A]) extends Tree[A] {
  override val size = a.size
}
case class Node[A](l: Tree[A], r: Tree[A]) extends Tree[A] {
  override val size = l.size + r.size
}


For convenience, we assume that the trees are approximately balanced.

The parallel map on trees:

def mapTreePar[A: Manifest, B: Manifest](t: Tree[A], f: A => B): Tree[B] =
  t match {
    case Leaf(a) => {
      val len = a.length
      val b = new Array[B](len)
      var i = 0
      while (i < len) {
        b(i) = f(a(i))
        i = i + 1
      }
      // return the newly filled array, not the input array
      Leaf(b)
    }
    case Node(l, r) => {
      val (lb, rb) = parallel(mapTreePar(l, f), mapTreePar(r, f))
      Node(lb, rb)
    }
  }
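A self-contained variant of the tree map can look like the sketch below. It makes a few assumptions that are not part of the notes: ClassTag is used in place of Manifest to allocate the output array, parallel is a fork/join based stand-in, and toList and example are helpers added for illustration.

```scala
import java.util.concurrent.RecursiveTask
import scala.reflect.ClassTag

object ParMapTree {
  // Assumed stand-in for the course's `parallel`.
  def parallel[A, B](a: => A, b: => B): (A, B) = {
    val right = new RecursiveTask[B] { def compute(): B = b }
    right.fork()
    val left = a
    (left, right.join())
  }

  sealed abstract class Tree[A] { val size: Int }
  case class Leaf[A](a: Array[A]) extends Tree[A] { override val size = a.length }
  case class Node[A](l: Tree[A], r: Tree[A]) extends Tree[A] { override val size = l.size + r.size }

  // ClassTag (used here in place of Manifest) lets us allocate an Array[B].
  def mapTreePar[A, B: ClassTag](t: Tree[A], f: A => B): Tree[B] = t match {
    case Leaf(a) =>
      val b = new Array[B](a.length)
      var i = 0
      while (i < a.length) { b(i) = f(a(i)); i += 1 }
      Leaf(b)
    case Node(l, r) =>
      val (lb, rb) = parallel(mapTreePar(l, f), mapTreePar(r, f))
      Node(lb, rb)
  }

  // Helper for inspecting results: the elements of the tree from left to right.
  def toList[A](t: Tree[A]): List[A] = t match {
    case Leaf(a)    => a.toList
    case Node(l, r) => toList(l) ::: toList(r)
  }

  // Hypothetical usage: a small tree holding 1 to 5, every element multiplied by 10.
  def example: Tree[Int] = {
    val t: Tree[Int] = Node(Leaf(Array(1, 2)), Node(Leaf(Array(3)), Leaf(Array(4, 5))))
    mapTreePar(t, (x: Int) => x * 10)
  }
}
```

Each task allocates and fills its own leaf array, so there is no shared output to coordinate, at the price of extra allocation.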


Comparison of arrays and immutable trees

Arrays and immutable trees both have advantages and disadvantages. Some of them are listed below.

Arrays

• (+) random access to elements, on shared memory can share array
• (+) good memory locality
• (-) imperative: must ensure parallel tasks write to disjoint parts
• (-) expensive to concatenate

Immutable trees

• (+) purely functional, produce new trees, keep old ones
• (+) no need to worry about disjointness of writes by parallel tasks
• (+) efficient to combine two trees
• (-) high memory allocation overhead

Parallel Fold (Reduce) Operation

fold combines elements with a given operation.

There are four versions of fold. The differences lie in:

• whether they take an initial element or assume a non-empty list (fold or reduce)
• in which order they combine the elements of the collection (foldLeft or foldRight)

Examples are:

List(1,3,8).foldLeft(100)((s,x) => s - x) == ((100 - 1) -3) - 8 == 88
List(1,3,8).foldRight(100)((s,x) => s - x) == 1 - (3 - (8- 100)) == -94
List(1,3,8).reduceLeft((s,x) => s - x) == (1 - 3) - 8 == -10
List(1,3,8).reduceRight((s,x) => s - x) == 1 - (3 - 8) == 6


For fold (reduce), associativity of the operation matters. When we process the elements of a collection in parallel, we would like the freedom to choose the order in which we combine them: not only from left to right, but also in more complex groupings. For this to be correct, the operation has to be associative. Examples of associative operations include addition and concatenation of strings.

Associative operation

In general, an operation f: (A,A) => A is associative if and only if for every x, y, z we have:

$$f(x, f(y, z)) = f(f(x, y), z)$$

If we write $f(a, b)$ in infix form as $a \oplus b$, associativity becomes:

$$x \oplus (y \oplus z) = (x \oplus y) \oplus z$$

This law tells us that two expressions with the same list of operands connected by $\oplus$ evaluate to the same result, however their parentheses are placed. For example:

$$(x \oplus y) \oplus (z \oplus w) = (x \oplus (y \oplus z)) \oplus w = ((x \oplus y) \oplus z) \oplus w$$
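The law can be tested on sample values. The helper associativeOn below is a hypothetical name introduced for this sketch; passing on a sample does not prove associativity, but a single failing triple disproves it.

```scala
object AssocCheck {
  // Checks f(x, f(y, z)) == f(f(x, y), z) for every triple drawn from `samples`.
  def associativeOn[A](samples: List[A])(f: (A, A) => A): Boolean =
    samples.forall(x => samples.forall(y => samples.forall(z =>
      f(x, f(y, z)) == f(f(x, y), z))))

  val ints = List(-2, 0, 1, 5)
  val plusOk: Boolean   = associativeOn(ints)(_ + _)                    // holds on the sample
  val minusOk: Boolean  = associativeOn(ints)(_ - _)                    // fails, e.g. for z = 1
  val concatOk: Boolean = associativeOn(List("a", "bc", ""))(_ + _)     // holds on the sample
}
```

Subtraction fails because x - (y - z) equals (x - y) + z, which differs from (x - y) - z whenever z is not zero.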

Trees for expressions

Each expression built from values connected with $\oplus$ can be represented as a tree where:

• leaves represent the values
• the nodes $\oplus$ represent the application of the operation

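The shape of the tree then encodes where the parentheses were. For example, $(x \oplus y) \oplus z$ is a tree whose root has the subtree for $x \oplus y$ on the left and the leaf $z$ on the right, while $x \oplus (y \oplus z)$ has the leaf $x$ on the left and the subtree for $y \oplus z$ on the right.

Such a tree is defined as follows: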

sealed abstract class Tree[A]
case class Leaf[A](value: A) extends Tree[A]
case class Node[A](left: Tree[A],right: Tree[A]) extends Tree[A]


Parallel reduce of a tree

Using the tree definition above, we can now define a parallel version of reduce:

def reduce[A](t: Tree[A], f: (A,A) => A): A = t match {
  // a leaf reduces to the value it stores
  case Leaf(v) => v
  case Node(l,r) => {
    val (lV,rV) = parallel(reduce[A](l,f), reduce[A](r,f))
    f(lV,rV)
  }
}


Consequence stated as tree reduction

We have said that if an operation $\oplus$ is associative, then two expressions with the same list of operands connected with $\oplus$, but with different parentheses, evaluate to the same result. This is the consequence of associativity.

We can express this consequence in Scala:

If f: (A,A) => A is associative, t1: Tree[A] and t2: Tree[A] and toList(t1) == toList(t2), then we have:

reduce(t1, f) == reduce(t2, f)

Explanation of the consequence

The intuition is that, given a tree, we can apply tree rotations until it becomes list-like, and by the associativity law each rotation preserves the result: rotating Node(Node(x, y), z) into Node(x, Node(y, z)) replaces $(x \oplus y) \oplus z$ with $x \oplus (y \oplus z)$.

Applying a rotation to a tree preserves toList as well as the value of reduce. If toList(t1) == toList(t2), then rotations can bring t1 and t2 to the same tree.
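The statement can be tried out on two trees that hold the same operands with different parenthesizations. In this sketch the toList function, the fork/join based parallel, and the example trees t1 and t2 are assumptions added for illustration.

```scala
import java.util.concurrent.RecursiveTask

object TreeReduceDemo {
  // Assumed stand-in for the course's `parallel`.
  def parallel[A, B](a: => A, b: => B): (A, B) = {
    val right = new RecursiveTask[B] { def compute(): B = b }
    right.fork()
    val left = a
    (left, right.join())
  }

  sealed abstract class Tree[A]
  case class Leaf[A](value: A) extends Tree[A]
  case class Node[A](left: Tree[A], right: Tree[A]) extends Tree[A]

  def toList[A](t: Tree[A]): List[A] = t match {
    case Leaf(v)    => List(v)
    case Node(l, r) => toList(l) ::: toList(r)
  }

  def reduce[A](t: Tree[A], f: (A, A) => A): A = t match {
    case Leaf(v) => v
    case Node(l, r) =>
      val (lV, rV) = parallel(reduce[A](l, f), reduce[A](r, f))
      f(lV, rV)
  }

  // Same operands 1, 3, 8; t2 is t1 after one rotation.
  val t1: Tree[Int] = Node(Node(Leaf(1), Leaf(3)), Leaf(8))   // (1 op 3) op 8
  val t2: Tree[Int] = Node(Leaf(1), Node(Leaf(3), Leaf(8)))   // 1 op (3 op 8)

  def sums: (Int, Int)        = (reduce[Int](t1, _ + _), reduce[Int](t2, _ + _))
  def differences: (Int, Int) = (reduce[Int](t1, _ - _), reduce[Int](t2, _ - _))
}
```

With addition both trees reduce to 12. With subtraction, which is not associative, the two shapes give -10 and 6, the same values as reduceLeft and reduceRight on List(1,3,8).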

Parallel array reduce

The parallel array reduce is as follows:

def reduceSeg[A](inp: Array[A], left: Int, right: Int, f: (A,A) => A): A = {
  if (right - left < threshold) {
    var res = inp(left)
    var i = left + 1
    while (i < right) {
      res = f(res, inp(i))
      i = i + 1
    }
    res
  } else {
    val mid = left + (right - left) / 2
    val (a1, a2) = parallel(reduceSeg(inp, left, mid, f),
                            reduceSeg(inp, mid, right, f))
    f(a1, a2)
  }
}
def reduce[A](inp: Array[A], f: (A,A) => A): A =
  reduceSeg(inp, 0, inp.length, f)
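A runnable version of the array reduce is sketched below. The threshold value, the fork/join based parallel, and the sum and max helpers are assumptions made for this example. Note that reduceSeg reads inp(left) first, so it expects a non-empty segment.

```scala
import java.util.concurrent.RecursiveTask

object ParReduceArray {
  // Assumed cutoff for switching to the sequential loop.
  val threshold = 1000

  // Assumed stand-in for the course's `parallel`.
  def parallel[A, B](a: => A, b: => B): (A, B) = {
    val right = new RecursiveTask[B] { def compute(): B = b }
    right.fork()
    val left = a
    (left, right.join())
  }

  // Precondition: left < right, so every segment has at least one element.
  def reduceSeg[A](inp: Array[A], left: Int, right: Int, f: (A, A) => A): A =
    if (right - left < threshold) {
      var res = inp(left)
      var i = left + 1
      while (i < right) { res = f(res, inp(i)); i += 1 }
      res
    } else {
      val mid = left + (right - left) / 2
      val (a1, a2) = parallel(reduceSeg(inp, left, mid, f), reduceSeg(inp, mid, right, f))
      f(a1, a2)
    }

  def reduce[A](inp: Array[A], f: (A, A) => A): A = reduceSeg(inp, 0, inp.length, f)

  // Hypothetical usage: sum and maximum of 1, 2, ..., n.
  def sumUpTo(n: Int): Int = reduce[Int](Array.tabulate(n)(i => i + 1), _ + _)
  def maxUpTo(n: Int): Int = reduce[Int](Array.tabulate(n)(i => i + 1), (x, y) => if (x > y) x else y)
}
```

Both addition and maximum are associative, so the result does not depend on where the array happens to be split.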


Associativity

In this section, we will examine more associative operations.

Recall that an operation f: (A,A) => A is associative if and only if for every x, y, z we have:

$$f(x, f(y, z)) = f(f(x, y), z)$$

and for an associative operation:

• two expressions with same list of operands connected with $\oplus$, but different parentheses evaluate to the same result
• reduce on any tree with this list of operands gives the same result

However, there is another property that is different from associativity, namely commutativity.

Commutativity

An operation f: (A,A) => A is commutative if and only if for every x, y we have:

$$f(x, y) = f(y, x)$$

It’s important to know that, for correctness of reduce, the property we need is associativity; commutativity is neither required nor sufficient.

Associativity is not preserved by mapping

In general, if f(x,y) is commutative and $h_1(z)$, $h_2(z)$ are arbitrary functions, then any function defined by $g(x,y) = h_2(f(h_1(x),h_1(y)))$ is equal to $h_2(f(h_1(y),h_1(x))) = g(y,x)$, so it is commutative, but it often loses associativity even if $f$ was associative to start with.

When combining and optimizing reduce and map invocations, we need to be careful that operations given to reduce remain associative.
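A small instance of this effect, with choices made for illustration: take f to be integer addition, $h_1$ to be squaring, and $h_2$ to be the identity, so that g(x, y) = x*x + y*y.

```scala
object MappingBreaksAssoc {
  val h1: Int => Int = x => x * x        // applied to each argument
  val f: (Int, Int) => Int = _ + _       // associative and commutative
  val h2: Int => Int = x => x            // applied to the result

  def g(x: Int, y: Int): Int = h2(f(h1(x), h1(y)))   // g(x, y) = x*x + y*y

  val commutes: Boolean  = g(1, 2) == g(2, 1)
  val leftGrouping: Int  = g(g(1, 2), 3)   // (1 + 4)^2 + 9  = 34
  val rightGrouping: Int = g(1, g(2, 3))   // 1 + (4 + 9)^2  = 170
}
```

g stays commutative, but the two groupings disagree, so passing g to a parallel reduce would make the result depend on how the collection is split.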

Making an operation commutative is easy

It is easy to make an operation commutative.

Suppose we have a binary operation g and a strict total ordering less (e.g. lexicographical ordering of bit representations). Then this operation is commutative:

def f(x:A, y:A) = if (less(y,x)) g(y,x) else g(x,y)


Indeed f(x,y) == f(y,x) because:

• if x==y then both sides equal g(x,x)
• if less(y,x) then the left side is g(y,x), and it is not less(x,y), so the right side is also g(y,x)
• if less(x,y) then it is not less(y,x), so the left side is g(x,y) and the right side is also g(x,y)

However, we know of no such efficient trick for associativity.
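The trick can be sketched with string concatenation as g and lexicographic comparison as less. The names commutative, concat, and lexLess are hypothetical and introduced only for this example.

```scala
object MakeCommutative {
  // Wraps g so that the smaller argument (according to `less`) always goes first.
  def commutative[A](g: (A, A) => A, less: (A, A) => Boolean): (A, A) => A =
    (x, y) => if (less(y, x)) g(y, x) else g(x, y)

  val concat: (String, String) => String = (a, b) => a + b               // not commutative
  val lexLess: (String, String) => Boolean = (a, b) => a.compareTo(b) < 0
  val f: (String, String) => String = commutative(concat, lexLess)

  // The wrapper guarantees commutativity only; here associativity is lost.
  val leftGrouping: String  = f(f("b", "a"), "c")   // f("ab", "c")
  val rightGrouping: String = f("b", f("a", "c"))   // f("b", "ac")
}
```

In this example concatenation was associative to begin with, and the commutative wrapper breaks that: the two groupings produce "abc" and "acb".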

Associative operations on tuples

We can construct an associative operation on tuples from two associative operations.

Suppose f1: (A1, A1) => A1 and f2: (A2, A2) => A2 are associative. Then f: ((A1, A2), (A1, A2)) => (A1, A2) defined by

f((x1, x2), (y1, y2)) = (f1(x1, y1), f2(x2, y2))


is also associative.

Similarly, we can construct associative operations on n-tuples.
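One use of the tuple construction is computing an average with a single reduce: the first component accumulates the sum and the second the count. The sketch below uses integer addition for both components; sumAndCount and average are hypothetical names, and the sequential reduce of the standard library stands in for a parallel one.

```scala
object TupleReduce {
  // Pairwise combination of two associative operations (integer + in both slots).
  def sumAndCount(x: (Int, Int), y: (Int, Int)): (Int, Int) =
    (x._1 + y._1, x._2 + y._2)

  // Precondition: xs is non-empty.
  def average(xs: Seq[Int]): Double = {
    val (sum, count) = xs.map(x => (x, 1)).reduce(sumAndCount)
    sum.toDouble / count
  }
}
```

Since sumAndCount is associative, the same function could be handed to any of the parallel reduce implementations in these notes without affecting the result.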

Associativity through symmetry and commutativity

Although commutativity of $f$ alone does not imply associativity, it does once we add one more property. Define:

E(x, y, z) = f(f(x, y), z)


We say arguments of $E$ can rotate if E(x, y, z) = E(y, z, x), that is

f(f(x, y), z) = f(f(y, z), x)


Then, we claim that:

if $f$ is commutative and arguments of $E$ can rotate then $f$ is also associative.
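A short derivation of this claim, using only the two assumptions: the first step rotates the arguments of $E$, and the second applies commutativity to the outer call of $f$.

$$f(f(x, y), z) = f(f(y, z), x) = f(x, f(y, z))$$

The two ends of this chain are exactly the two sides of the associativity law.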

Parallel Scan (Prefix Sum) Operation

scan computes the folds of all list prefixes.

scanLeft: meaning and properties

Given a list List(a1, a2, a3), an initial element a0, and a function f, scanLeft produces the following result:

List(a1, a2, a3).scanLeft(a0)(f) == List(b0, b1, b2, b3)


where

• b0 = a0
• b1 = f(b0, a1)
• b2 = f(b1, a2)
• b3 = f(b2, a3)

We assume that f is associative throughout this section.

Note that the result of scanRight is different from that of scanLeft, even if f is associative. The two behave similarly, though, so we only consider scanLeft.
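A concrete instance with the standard library, using addition and an initial element of 100 (values chosen for illustration):

```scala
object ScanExamples {
  // Prefix sums from the left: 100, 100+1, 100+1+3, 100+1+3+8.
  val left: List[Int] = List(1, 3, 8).scanLeft(100)(_ + _)

  // Suffix sums from the right: the initial element ends up last.
  val right: List[Int] = List(1, 3, 8).scanRight(100)(_ + _)
}
```

Here left is List(100, 101, 104, 112) and right is List(112, 111, 108, 100), which shows that the two results differ even though addition is associative.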

Sequential scanLeft on array

Sequential scanLeft can be implemented as follows:

def scanLeft[A](inp: Array[A],
                a0: A, f: (A, A) => A,
                out: Array[A]): Unit = {
  out(0) = a0
  var a = a0
  var i = 0
  while (i < inp.length) {
    a = f(a, inp(i))
    i = i + 1
    out(i) = a
  }
}


Making scan parallel

Next, we consider how to make scanLeft parallel. At first, it seems impossible, because:

• the value of the last element in the sequence depends on all previous ones
• we need to wait for all previous partial results to be computed first

But if we perform the computation in a different order and give up on reusing all intermediate results, we are able to implement scanLeft in parallel:

• we do more work, with more applications of f than the simple sequential version needs
• we gain parallelism, and the improvement in parallel running time compensates for the fact that we apply f a few more times than the sequential algorithm

High-level approach: express scan using map and reduce

scan can be implemented by using map and reduce:

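def scanLeft[A](inp: Array[A], a0: A, f: (A,A) => A, out: Array[A]) = {
  // fi ignores the element v: out(i) is the reduce of a0 with elements 0 until i
  val fi = { (i: Int, v: A) => reduceSeg1(inp, 0, i, a0, f) }
  // mapSeg is assumed to be a parallel map that also passes the index to fi
  mapSeg(inp, 0, inp.length, fi, out)
  val last = inp.length - 1
  // the map fills out(0) to out(last); the final element is computed separately
  out(last + 1) = f(out(last), inp(last))
}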


In the above implementation, we do not reuse any computation across different elements of the output array.

However, it’s possible to reuse some of the computation if we save the intermediate results of the parallel computation in a tree.

For simplicity, we assume that the input collection is also a tree, but of a different kind than the tree that saves the intermediate results.

Tree definitions

Trees storing the input collection have values only in their leaves:

sealed abstract class Tree[A]

case class Leaf[A](a: A) extends Tree[A]

case class Node[A](l: Tree[A], r: Tree[A]) extends Tree[A]


In contrast, trees storing intermediate values also have res values in their nodes:

sealed abstract class TreeRes[A] {
  val res: A
}

case class LeafRes[A](override val res: A) extends TreeRes[A]

case class NodeRes[A](l: TreeRes[A],
                      override val res: A,
                      r: TreeRes[A]) extends TreeRes[A]


Parallel reduce that preserves the computation tree

upsweep transforms input Tree values into TreeRes values in parallel:

def upsweep[A](t: Tree[A], f: (A, A) => A): TreeRes[A] = t match {
  case Leaf(v) => LeafRes(v)
  case Node(l, r) => {
    val (tL, tR) = parallel(upsweep(l, f), upsweep(r, f))
    NodeRes(tL, f(tL.res, tR.res), tR)
  }
}


Given an initial tree, upsweep transforms it into a tree of the same shape in which every node additionally stores the reduction of all the leaves below it.

The function name suggests the bottom up computation that we use in order to obtain the tree of results.
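As a worked example of upsweep, consider a tree with leaves 1, 3, 8, 50 and addition as the operation. These values, the fork/join based parallel, and the helper sums are assumptions chosen for this sketch.

```scala
import java.util.concurrent.RecursiveTask

object UpsweepExample {
  // Assumed stand-in for the course's `parallel`.
  def parallel[A, B](a: => A, b: => B): (A, B) = {
    val right = new RecursiveTask[B] { def compute(): B = b }
    right.fork()
    val left = a
    (left, right.join())
  }

  sealed abstract class Tree[A]
  case class Leaf[A](a: A) extends Tree[A]
  case class Node[A](l: Tree[A], r: Tree[A]) extends Tree[A]

  sealed abstract class TreeRes[A] { val res: A }
  case class LeafRes[A](override val res: A) extends TreeRes[A]
  case class NodeRes[A](l: TreeRes[A], override val res: A, r: TreeRes[A]) extends TreeRes[A]

  def upsweep[A](t: Tree[A], f: (A, A) => A): TreeRes[A] = t match {
    case Leaf(v) => LeafRes(v)
    case Node(l, r) =>
      val (tL, tR) = parallel(upsweep(l, f), upsweep(r, f))
      NodeRes(tL, f(tL.res, tR.res), tR)
  }

  // Illustrative input: ((1, 3), (8, 50)).
  val input: Tree[Int] = Node(Node(Leaf(1), Leaf(3)), Node(Leaf(8), Leaf(50)))

  // The left inner node stores 1 + 3 = 4, the right inner node stores
  // 8 + 50 = 58, and the root stores 4 + 58 = 62.
  def sums: TreeRes[Int] = upsweep[Int](input, _ + _)
}
```

The value at the root is the reduce of the whole collection, and every inner node keeps the partial result for its own subtree.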

Next, let’s use the result tree to create the final collection. This step is called downsweep.

Using tree with results to create the final collection

Given the result tree, downsweep can compute the result of scanLeft.

def downsweep[A](t: TreeRes[A], a0: A, f: (A, A) => A): Tree[A] = t match {
  case LeafRes(a) => Leaf(f(a0, a))
  case NodeRes(l, _, r) => {
    val (tL, tR) = parallel(downsweep[A](l, a0, f),
                            downsweep[A](r, f(a0, l.res), f))
    Node(tL, tR)
  }
}


To understand how downsweep works, the key fact to remember is that a0 denotes the reduce of all elements that come to the left of the current tree t. That is:

• When we have a Leaf, we simply apply the operation f to the given element a0 and the element in the Leaf.
• When we have a Node, we recursively do a downsweep on the left and right subtrees, which gives us two new trees, and then combine them into a Node.
• An important thing to remember: for the left subtree, we pass the same element a0; for the right subtree, we need to take into account both a0 and the result of the left subtree, so we pass f(a0, l.res).

scanLeft on trees

Now that we have upsweep and downsweep, we can then use both to implement scanLeft. Here is the implementation:

def scanLeft[A](t: Tree[A], a0: A, f: (A,A) => A): Tree[A] = {
  val tRes = upsweep(t, f) // build up an intermediate tree
  val scan1 = downsweep(tRes, a0, f) // use the intermediate tree to obtain the final result
  prepend(a0, scan1)
}


prepend simply adds the initial element to the front of the result:

def prepend[A](x: A, t: Tree[A]): Tree[A] = t match {
  case Leaf(v) => Node(Leaf(x), Leaf(v))
  // always prepend x to the leftmost subtree
  case Node(l, r) => Node(prepend(x, l), r)
}
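Putting upsweep, downsweep, and prepend together gives a complete scan on trees. The sketch below repeats the definitions so that it runs on its own; the fork/join based parallel, the toList helper, and the example input (leaves 1, 3, 8, 50 with initial element 100) are assumptions added for illustration.

```scala
import java.util.concurrent.RecursiveTask

object TreeScan {
  // Assumed stand-in for the course's `parallel`.
  def parallel[A, B](a: => A, b: => B): (A, B) = {
    val right = new RecursiveTask[B] { def compute(): B = b }
    right.fork()
    val left = a
    (left, right.join())
  }

  sealed abstract class Tree[A]
  case class Leaf[A](a: A) extends Tree[A]
  case class Node[A](l: Tree[A], r: Tree[A]) extends Tree[A]

  sealed abstract class TreeRes[A] { val res: A }
  case class LeafRes[A](override val res: A) extends TreeRes[A]
  case class NodeRes[A](l: TreeRes[A], override val res: A, r: TreeRes[A]) extends TreeRes[A]

  def upsweep[A](t: Tree[A], f: (A, A) => A): TreeRes[A] = t match {
    case Leaf(v) => LeafRes(v)
    case Node(l, r) =>
      val (tL, tR) = parallel(upsweep(l, f), upsweep(r, f))
      NodeRes(tL, f(tL.res, tR.res), tR)
  }

  // a0 is the reduce of everything to the left of the current subtree.
  def downsweep[A](t: TreeRes[A], a0: A, f: (A, A) => A): Tree[A] = t match {
    case LeafRes(a) => Leaf(f(a0, a))
    case NodeRes(l, _, r) =>
      val (tL, tR) = parallel(downsweep[A](l, a0, f), downsweep[A](r, f(a0, l.res), f))
      Node(tL, tR)
  }

  def prepend[A](x: A, t: Tree[A]): Tree[A] = t match {
    case Leaf(v)    => Node(Leaf(x), Leaf(v))
    case Node(l, r) => Node(prepend(x, l), r)
  }

  def scanLeft[A](t: Tree[A], a0: A, f: (A, A) => A): Tree[A] =
    prepend(a0, downsweep(upsweep(t, f), a0, f))

  def toList[A](t: Tree[A]): List[A] = t match {
    case Leaf(v)    => List(v)
    case Node(l, r) => toList(l) ::: toList(r)
  }

  // Illustrative run: leaves 1, 3, 8, 50 scanned with + starting from 100.
  def example: List[Int] =
    toList(scanLeft[Int](Node(Node(Leaf(1), Leaf(3)), Node(Leaf(8), Leaf(50))), 100, _ + _))
}
```

In this run the right subtree is swept with 100 + 4 = 104 as its starting value, so its leaves become 112 and 162 without waiting for the left leaves to be rewritten.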


Parallel scanLeft on arrays

The previous definition implements scanLeft using trees, but it would be more efficient to implement scanLeft using arrays.

Let’s see how we can implement scanLeft using arrays.

We again make use of an intermediate result tree, but its definition is slightly different from the previous one:

sealed abstract class TreeResA[A] { val res: A }
case class Leaf[A](from: Int, to: Int,
                   override val res: A) extends TreeResA[A]

case class Node[A](l: TreeResA[A],
                   override val res: A,
                   r: TreeResA[A]) extends TreeResA[A]


The difference from the previous TreeRes is that each Leaf now keeps track of the array segment range (from, to) from which res is computed.

Upsweep on array

Similarly, we define upsweep:

def upsweep[A](inp: Array[A], from: Int, to: Int,
               f: (A, A) => A): TreeResA[A] = {
  if (to - from < threshold)
    Leaf(from, to, reduceSeg1(inp, from + 1, to, inp(from), f))
  else {
    val mid = from + (to - from) / 2
    val (tL, tR) = parallel(upsweep(inp, from, mid, f),
                            upsweep(inp, mid, to, f))
    Node(tL, f(tL.res, tR.res), tR)
  }
}


Sequential reduce for segment

def reduceSeg1[A](inp: Array[A], left: Int, right: Int,
                  a0: A, f: (A, A) => A): A = {
  var a = a0
  var i = left
  while (i < right) {
    a = f(a, inp(i))
    i = i + 1
  }
  a
}


Downsweep on array

def downsweep[A](inp: Array[A],
                 a0: A, f: (A, A) => A,
                 t: TreeResA[A],
                 out: Array[A]): Unit = t match {
  case Leaf(from, to, res) =>
    scanLeftSeg(inp, from, to, a0, f, out)
  case Node(l, _, r) => {
    val (_, _) = parallel(
      downsweep(inp, a0, f, l, out),
      downsweep(inp, f(a0, l.res), f, r, out))
  }
}


Sequential scanLeft on segment

def scanLeftSeg[A](inp: Array[A],
                   left: Int,
                   right: Int,
                   a0: A, f: (A, A) => A,
                   out: Array[A]) = {
  if (left < right) {
    var i = left
    var a = a0
    while (i < right) {
      a = f(a, inp(i))
      i = i + 1
      out(i) = a
    }
  }
}


Parallel scan on the array

def scanLeft[A](inp: Array[A],
                a0: A, f: (A, A) => A,
                out: Array[A]) = {
  val t = upsweep(inp, 0, inp.length, f)
  downsweep(inp, a0, f, t, out)
  out(0) = a0 // prepends a0
}
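The pieces of the array version can be assembled into one runnable sketch. Several details are assumptions made for this example and not part of the notes: the small threshold of 4 (chosen so that a ten-element input actually splits; it must be at least 2 so that no segment is empty), the fork/join based parallel, the guard for an empty input, and the example helper.

```scala
import java.util.concurrent.RecursiveTask

object ParScanArray {
  val threshold = 4   // assumed cutoff, must be >= 2

  // Assumed stand-in for the course's `parallel`.
  def parallel[A, B](a: => A, b: => B): (A, B) = {
    val right = new RecursiveTask[B] { def compute(): B = b }
    right.fork()
    val left = a
    (left, right.join())
  }

  sealed abstract class TreeResA[A] { val res: A }
  case class Leaf[A](from: Int, to: Int, override val res: A) extends TreeResA[A]
  case class Node[A](l: TreeResA[A], override val res: A, r: TreeResA[A]) extends TreeResA[A]

  def reduceSeg1[A](inp: Array[A], left: Int, right: Int, a0: A, f: (A, A) => A): A = {
    var a = a0
    var i = left
    while (i < right) { a = f(a, inp(i)); i += 1 }
    a
  }

  def upsweep[A](inp: Array[A], from: Int, to: Int, f: (A, A) => A): TreeResA[A] =
    if (to - from < threshold) Leaf(from, to, reduceSeg1(inp, from + 1, to, inp(from), f))
    else {
      val mid = from + (to - from) / 2
      val (tL, tR) = parallel(upsweep(inp, from, mid, f), upsweep(inp, mid, to, f))
      Node(tL, f(tL.res, tR.res), tR)
    }

  // Writes out(left + 1) to out(right); out is one element longer than inp.
  def scanLeftSeg[A](inp: Array[A], left: Int, right: Int, a0: A, f: (A, A) => A, out: Array[A]): Unit = {
    var a = a0
    var i = left
    while (i < right) { a = f(a, inp(i)); i += 1; out(i) = a }
  }

  def downsweep[A](inp: Array[A], a0: A, f: (A, A) => A, t: TreeResA[A], out: Array[A]): Unit = t match {
    case Leaf(from, to, _) => scanLeftSeg(inp, from, to, a0, f, out)
    case Node(l, _, r) =>
      parallel(downsweep(inp, a0, f, l, out), downsweep(inp, f(a0, l.res), f, r, out))
      ()
  }

  def scanLeft[A](inp: Array[A], a0: A, f: (A, A) => A, out: Array[A]): Unit = {
    if (inp.length > 0) {   // guard added in this sketch: upsweep reads inp(from)
      val t = upsweep(inp, 0, inp.length, f)
      downsweep(inp, a0, f, t, out)
    }
    out(0) = a0
  }

  // Hypothetical usage: prefix sums of `inp` starting from 100.
  def example(inp: Array[Int]): Array[Int] = {
    val out = new Array[Int](inp.length + 1)
    scanLeft[Int](inp, 100, _ + _, out)
    out
  }
}
```

Each leaf of the result tree covers its own index range, so the downsweep tasks write to disjoint parts of out, just as in the parallel map on arrays.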


Summary

This week, we learned about three important data operations, namely map, reduce, and scan. We also learned how to implement these operations in parallel on arrays and trees.