Parallel Programming week two notes
Basic Task Parallel Algorithms
Week two lecture notes about Parallel Programming on Coursera.
Data Operation in Parallel
Processing elements of collections in parallel is one of the main applications of parallelism today.
We will focus on three operations: map, fold, and scan, and we will see how these operations can be implemented in parallel. At the same time, we will focus on two data structures that are efficient to use in parallel (both in terms of splitting and combining): arrays and trees.
Parallelism and collections
To achieve parallelism, collections and operations have to hold the following properties:
- properties of collections: ability to split and combine
- properties of operations: associativity, independence
Our goal is to implement map, reduce, and scan in parallel; let's see how we can achieve this.
Parallel Mapping
Map applies a given function to each list element:
List(a1, a2, ..., an).map(f) == List(f(a1), f(a2), ..., f(an))
and has the following properties:

list.map(x => x) == list

list.map(f.compose(g)) == list.map(g).map(f) where (f.compose(g))(x) = f(g(x))
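These two laws are easy to check on a concrete list; a minimal sketch (the list and the functions f and g are arbitrary choices for illustration):

```scala
val list = List(1, 2, 3)
val f = (x: Int) => x * 2
val g = (x: Int) => x + 1

// identity law: mapping the identity function returns the same list
assert(list.map(x => x) == list)

// composition law: mapping f.compose(g) is the same as mapping g, then f
assert(list.map(f.compose(g)) == list.map(g).map(f)) // both give List(4, 6, 8)
```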
Because lists are inefficient to use in parallel (they are inefficient to split and to combine), we consider how to implement map in parallel on arrays.
Parallel map on arrays
The implementation of parallel map on arrays is as follows:
def mapASegPar[A, B](inp: Array[A], left: Int, right: Int, f: A => B, out: Array[B]): Unit = {
  if (right - left < threshold)
    mapASegSeq(inp, left, right, f, out) // the sequential version of map
  else {
    val mid = left + (right - left) / 2
    // split, and run the two halves in parallel
    parallel(mapASegPar(inp, left, mid, f, out),
             mapASegPar(inp, mid, right, f, out))
  }
}
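The sequential base case mapASegSeq referenced above is not shown in the excerpt; a minimal sketch, assuming its signature mirrors mapASegPar:

```scala
// Sequential base case: apply f to inp(left) .. inp(right - 1),
// writing each result into the corresponding slot of out.
def mapASegSeq[A, B](inp: Array[A], left: Int, right: Int, f: A => B, out: Array[B]): Unit = {
  var i = left
  while (i < right) {
    out(i) = f(inp(i))
    i += 1
  }
}

val inp = Array(1, 2, 3, 4)
val out = new Array[Int](4)
mapASegSeq(inp, 0, inp.length, (x: Int) => x * x, out)
// out is now Array(1, 4, 9, 16)
```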
Recall from week one, given expressions $e_1$ and $e_2$, parallel($e_1$,$e_2$) can compute them in parallel and return the pair of results.
With such an implementation, we have to ensure that:
- writes are disjoint (otherwise: nondeterministic behavior)
- the threshold is large enough (otherwise we lose efficiency)
On the other hand, implementing the parallelism in terms of higher-order functions alone does not pay off.
Parallel map on immutable tree
Map can also be implemented in parallel on immutable trees.
Such an immutable tree has the following properties:
- leaves store array segments
- non-leaf nodes store the number of elements
The definition of such a tree is as follows:
sealed abstract class Tree[A] {
  val size: Int
}
case class Leaf[A](a: Array[A]) extends Tree[A] {
  override val size = a.size
}
case class Node[A](l: Tree[A], r: Tree[A]) extends Tree[A] {
  override val size = l.size + r.size
}
For convenience, we assume that the trees are approximately balanced.
The parallel map on trees:
def mapTreePar[A: Manifest, B: Manifest](t: Tree[A], f: A => B): Tree[B] =
  t match {
    case Leaf(a) =>
      val len = a.length
      val b = new Array[B](len)
      var i = 0
      while (i < len) {
        b(i) = f(a(i))
        i = i + 1
      }
      Leaf(b)
    case Node(l, r) =>
      val (lb, rb) = parallel(mapTreePar(l, f), mapTreePar(r, f))
      Node(lb, rb)
  }
Comparison of arrays and immutable trees
Arrays and immutable trees both have advantages and disadvantages; we list some of them below.
(+) indicates an advantage, and (-) a disadvantage.
Arrays
- (+) random access to elements; on shared memory, tasks can share the array
- (+) good memory locality
- (-) imperative: must ensure parallel tasks write to disjoint parts
- (-) expensive to concatenate
Immutable trees
- (+) purely functional: produce new trees, keep the old ones
- (+) no need to worry about disjointness of writes by parallel tasks
- (+) efficient to combine two trees
- (-) high memory allocation overhead
- (-) bad memory locality
Parallel Fold (Reduce) Operation
fold combines elements with a given operation.
There are four versions of fold; the differences lie in:
- whether they take an initial element or assume a non-empty collection (fold vs. reduce)
- the order in which they combine the elements of the collection (foldLeft vs. foldRight)
Examples are:
List(1,3,8).foldLeft(100)((s,x) => s - x) == ((100 - 1) - 3) - 8 == 88
List(1,3,8).foldRight(100)((s,x) => s - x) == 1 - (3 - (8 - 100)) == -94
List(1,3,8).reduceLeft((s,x) => s - x) == (1 - 3) - 8 == -10
List(1,3,8).reduceRight((s,x) => s - x) == 1 - (3 - 8) == 6
For fold (reduce), associativity of the operation matters. When we process the elements of a collection in parallel, we would like the freedom to choose the order in which elements are combined: not only from left to right, but also in more complex orders. For this to be correct, the operation has to be associative. Examples of associative operations include addition and concatenation of strings.
Associative operation
In general, an operation f: (A,A) => A is associative if and only if for every x, y, z, we have:
f(f(x, y), z) == f(x, f(y, z))
If we write $f(a, b)$ in infix form as $a \oplus b$, associativity becomes:
$(x \oplus y) \oplus z = x \oplus (y \oplus z)$
This law tells us that any two expressions with the same list of operands connected with $\oplus$, but with arbitrarily different parentheses indicating the order in which the operations are applied, evaluate to the same result. For example:
$(x \oplus y) \oplus (z \oplus w) = x \oplus (y \oplus (z \oplus w)) = ((x \oplus y) \oplus z) \oplus w$
Trees for expressions
Each expression built from values connected with $\oplus$ can be represented as a tree, where:
- leaves represent the values
- nodes represent the application of the operation $\oplus$
The shape of the tree then encodes where the parentheses were. For example, $x \oplus (y \oplus z)$ corresponds to a node whose left child is the leaf $x$ and whose right child is the subtree for $y \oplus z$.
Such a tree is defined as follows:
sealed abstract class Tree[A]
case class Leaf[A](value: A) extends Tree[A]
case class Node[A](left: Tree[A], right: Tree[A]) extends Tree[A]
Parallel reduce of a tree
By using the tree definition above, we can now define the parallel version of reduce:
def reduce[A](t: Tree[A], f: (A, A) => A): A = t match {
  case Leaf(v) => v
  case Node(l, r) =>
    val (lV, rV) = parallel(reduce[A](l, f), reduce[A](r, f))
    f(lV, rV)
}
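To see this in action, here is a self-contained sketch; parallel is replaced by a sequential stand-in that merely returns the pair of results (the course's library would evaluate the two branches concurrently):

```scala
sealed abstract class Tree[A]
case class Leaf[A](value: A) extends Tree[A]
case class Node[A](left: Tree[A], right: Tree[A]) extends Tree[A]

// sequential stand-in for the course's parallel construct
def parallel[A, B](a: => A, b: => B): (A, B) = (a, b)

def reduce[A](t: Tree[A], f: (A, A) => A): A = t match {
  case Leaf(v) => v
  case Node(l, r) =>
    val (lV, rV) = parallel(reduce(l, f), reduce(r, f))
    f(lV, rV)
}

// two trees with the same operand list (1, 3, 8) but different shapes
val t1 = Node(Leaf(1), Node(Leaf(3), Leaf(8)))
val t2 = Node(Node(Leaf(1), Leaf(3)), Leaf(8))
assert(reduce(t1, (x: Int, y: Int) => x + y) == 12)
assert(reduce(t2, (x: Int, y: Int) => x + y) == 12) // same result, since + is associative
```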
Consequence stated as tree reduction
We have said that if an operation $\oplus$ is associative, then two expressions with the same list of operands connected by $\oplus$, but with different parentheses, evaluate to the same result. This is the consequence of associativity.
We can express this consequence in Scala:
if f: (A,A) => A is associative, t1: Tree[A], t2: Tree[A], and toList(t1) == toList(t2), then we have:
reduce(t1, f) == reduce(t2, f)
Explanation of the consequence
The intuition is that, given a tree, we can apply tree rotations until it becomes list-like, and by the associativity law such tree rotations preserve the result:
Applying a rotation to a tree preserves toList as well as the value of reduce. Since toList(t1) == toList(t2), rotations can bring t1 and t2 to the same tree.
Parallel array reduce
The parallel array reduce is as follows:
def reduceSeg[A](inp: Array[A], left: Int, right: Int, f: (A, A) => A): A = {
  if (right - left < threshold) {
    var res = inp(left)
    var i = left + 1
    while (i < right) {
      res = f(res, inp(i))
      i = i + 1
    }
    res
  } else {
    val mid = left + (right - left) / 2
    val (a1, a2) = parallel(reduceSeg(inp, left, mid, f),
                            reduceSeg(inp, mid, right, f))
    f(a1, a2)
  }
}
def reduce[A](inp: Array[A], f: (A, A) => A): A =
  reduceSeg(inp, 0, inp.length, f)
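A quick sanity check: with an associative f, this reduce should agree with a sequential fold. A self-contained sketch (the threshold value and the sequential stand-in for parallel are assumptions for illustration):

```scala
val threshold = 2 // assumed cutoff for the base case

// sequential stand-in for the course's parallel construct
def parallel[A, B](a: => A, b: => B): (A, B) = (a, b)

def reduceSeg[A](inp: Array[A], left: Int, right: Int, f: (A, A) => A): A =
  if (right - left < threshold) {
    var res = inp(left); var i = left + 1
    while (i < right) { res = f(res, inp(i)); i += 1 }
    res
  } else {
    val mid = left + (right - left) / 2
    val (a1, a2) = parallel(reduceSeg(inp, left, mid, f), reduceSeg(inp, mid, right, f))
    f(a1, a2)
  }

def reduce[A](inp: Array[A], f: (A, A) => A): A = reduceSeg(inp, 0, inp.length, f)

val xs = Array(1, 3, 8, 2, 5)
assert(reduce(xs, (x: Int, y: Int) => x + y) == xs.foldLeft(0)(_ + _)) // both 19
```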
Associativity
In this section, we will examine more associative operations.
Recall that an operation f: (A,A) => A is associative if and only if for every x, y, z, we have:
f(f(x, y), z) == f(x, f(y, z))
and for an associative operation:
- two expressions with the same list of operands connected with $\oplus$, but different parentheses, evaluate to the same result
- reduce on any tree with this list of operands gives the same result
However, there is another property that should not be confused with associativity, namely commutativity.
Commutativity
Operation f: (A,A) => A is commutative if and only if for every x, y, we have:
f(x, y) == f(y, x)
It is important to know that, for the correctness of reduce, we need associativity; commutativity alone is not sufficient.
Associativity is not preserved by mapping
In general, if f(x,y) is commutative and $h_1(z)$, $h_2(z)$ are arbitrary functions, then any function defined by $g(x, y) = h_2(f(h_1(x), h_1(y)))$ is equal to $h_2(f(h_1(y), h_1(x))) = g(y, x)$, so it is commutative, but it often loses associativity even if $f$ was associative to start with.
When combining and optimizing reduce and map invocations, we need to be careful that operations given to reduce remain associative.
Making an operation commutative is easy
It is easy to make an operation commutative.
Suppose we have a binary operation g and a strict total ordering less (e.g. lexicographical ordering of bit representations). Then the following operation f is commutative:
def f(x:A, y:A) = if (less(y,x)) g(y,x) else g(x,y)
Indeed, f(x,y) == f(y,x) because:
- if x == y, then both sides equal g(x,x)
- if less(y,x), then the left side is g(y,x), and since less(x,y) does not hold, the right side is also g(y,x)
- if less(x,y), then less(y,x) does not hold, so the left side is g(x,y) and the right side is also g(x,y)
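A minimal sketch of this trick, using string concatenation as a hypothetical g (concatenation is associative but not commutative) and lexicographic order as less:

```scala
def g(x: String, y: String): String = x + y      // not commutative: "ab" != "ba"
def less(x: String, y: String): Boolean = x < y  // lexicographic total order

// normalize the argument order before applying g
def f(x: String, y: String): String = if (less(y, x)) g(y, x) else g(x, y)

assert(f("b", "a") == f("a", "b")) // both normalize to g("a", "b") == "ab"
```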
However, we know of no such efficient trick for associativity.
Associative operations on tuples
We can construct an associative operation on tuples out of two associative operations.
Suppose f1: (A1, A1) => A1 and f2: (A2, A2) => A2 are associative. Then f: ((A1, A2), (A1, A2)) => (A1, A2) defined by
f((x1, x2), (y1, y2)) = (f1(x1, y1), f2(x2, y2))
is also associative.
I leave out the proof; for more information, please take the course Parallel Programming.
Similarly, we can construct associative operations on n-tuples.
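As an illustration, we can compute an average with a single reduce by pairing each element with a count; both components are combined with integer addition, which is associative, so the pairwise operation is associative too:

```scala
// componentwise combination of two associative operations (both are +)
val f = (p1: (Int, Int), p2: (Int, Int)) => (p1._1 + p2._1, p1._2 + p2._2)

val xs = List(1, 3, 8)
// pair each element with a count of 1, then reduce the pairs
val (sum, count) = xs.map(x => (x, 1)).reduce(f)
assert(sum == 12 && count == 3)
assert(sum / count == 4) // the average, computed once at the end
```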
Associativity through symmetry and commutativity
Although commutativity of $f$ alone does not imply associativity, it does imply it together with an additional property. Define:
E(x, y, z) = f(f(x, y), z)
We say arguments of $E$ can rotate if E(x, y, z) = E(y, z, x), that is
f(f(x, y), z) = f(f(y, z), x)
Then, we claim that:
if $f$ is commutative and arguments of $E$ can rotate then $f$ is also associative.
Parallel Scan (Prefix Sum) Operation
scan combines the folds of all list prefixes.
scanLeft: meaning and properties
Given a list List(a1, a2, a3), an initial element a0, and a function f, scanLeft produces the following result:
List(a1, a2, a3).scanLeft(a0)(f) == List(b0, b1, b2, b3)
where
- b0 = a0
- b1 = f(b0, a1)
- b2 = f(b1, a2)
- b3 = f(b2, a3)
Throughout this section, we assume that f is associative.
Note that the result of scanRight is different from that of scanLeft, even if f is associative; but the two behave similarly, so we only consider scanLeft.
Sequential scanLeft on array
Sequential scanLeft can be implemented as follows:
def scanLeft[A](inp: Array[A],
                a0: A, f: (A, A) => A,
                out: Array[A]): Unit = {
  out(0) = a0
  var a = a0
  var i = 0
  while (i < inp.length) {
    a = f(a, inp(i))
    i = i + 1
    out(i) = a
  }
}
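Note that out needs inp.length + 1 slots, since the initial element a0 is part of the output. A minimal check with integer addition:

```scala
def scanLeft[A](inp: Array[A], a0: A, f: (A, A) => A, out: Array[A]): Unit = {
  out(0) = a0
  var a = a0
  var i = 0
  while (i < inp.length) {
    a = f(a, inp(i))
    i = i + 1
    out(i) = a
  }
}

val out = new Array[Int](4) // one slot longer than the input
scanLeft(Array(1, 3, 8), 100, (x: Int, y: Int) => x + y, out)
assert(out.toList == List(100, 101, 104, 112))
```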
Making scan parallel
Next, we consider how to make scanLeft parallel. At first, this seems impossible, because:
- the value of the last element of the sequence depends on all previous ones
- we need to wait for all previous partial results to be computed first
But if we perform the computation in a different order, and give up on reusing all intermediate results, we can implement scanLeft in parallel:
- we do more work, that is, more applications of f, than the simple sequential version
- we improve parallelism, and the resulting parallel running time compensates for the fact that we apply f a few more times than the sequential algorithm
High-level approach: express scan using map and reduce
scan can be implemented by using map and reduce:
def scanLeft[A](inp: Array[A], a0: A, f: (A, A) => A, out: Array[A]) = {
  // fi reduces the array elements from 0 to i
  val fi = { (i: Int, v: A) => reduceSeg1(inp, 0, i, a0, f) }
  mapSeg(inp, 0, inp.length, fi, out)
  val last = inp.length - 1
  out(last + 1) = f(out(last), inp(last))
}
In the above implementation, we do not reuse any computation across different elements of the output array.
However, it is possible to reuse some of the computation if we save the intermediate results of the parallel computation in a tree.
For simplicity, we assume that the input collection is also a tree, but of a different kind than the tree that saves the intermediate results.
Tree definitions
The tree storing the input collection has values only in its leaves:
sealed abstract class Tree[A]
case class Leaf[A](a: A) extends Tree[A]
case class Node[A](l: Tree[A], r: Tree[A]) extends Tree[A]
In contrast, the tree storing intermediate values also has res values in its nodes:
sealed abstract class TreeRes[A] {
  val res: A
}
case class LeafRes[A](override val res: A) extends TreeRes[A]
case class NodeRes[A](l: TreeRes[A],
                      override val res: A,
                      r: TreeRes[A]) extends TreeRes[A]
Parallel reduce that preserves the computation tree
upsweep transforms the input Tree values into TreeRes values in parallel:
def upsweep[A](t: Tree[A], f: (A, A) => A): TreeRes[A] = t match {
  case Leaf(v) => LeafRes(v)
  case Node(l, r) =>
    val (tL, tR) = parallel(upsweep(l, f), upsweep(r, f))
    NodeRes(tL, f(tL.res, tR.res), tR)
}
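As a small self-contained check (with parallel replaced by a sequential stand-in that just returns the pair of results):

```scala
sealed abstract class Tree[A]
case class Leaf[A](value: A) extends Tree[A]
case class Node[A](left: Tree[A], right: Tree[A]) extends Tree[A]

sealed abstract class TreeRes[A] { val res: A }
case class LeafRes[A](override val res: A) extends TreeRes[A]
case class NodeRes[A](l: TreeRes[A], override val res: A, r: TreeRes[A]) extends TreeRes[A]

def parallel[A, B](a: => A, b: => B): (A, B) = (a, b) // sequential stand-in

def upsweep[A](t: Tree[A], f: (A, A) => A): TreeRes[A] = t match {
  case Leaf(v) => LeafRes(v)
  case Node(l, r) =>
    val (tL, tR) = parallel(upsweep(l, f), upsweep(r, f))
    NodeRes(tL, f(tL.res, tR.res), tR)
}

val t = Node(Leaf(1), Node(Leaf(3), Leaf(8)))
val tRes = upsweep(t, (x: Int, y: Int) => x + y)
assert(tRes.res == 12) // the root stores the reduce of the whole tree
```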
Given an initial tree of values, upsweep transforms it into a tree in which every node stores the reduce of the elements below it. The function name suggests the bottom-up computation that we use to obtain the tree of results.
Next, let's use the result tree to create the final collection; this step is called downsweep.
Using tree with results to create the final collection
Given the result tree, downsweep can compute the result of scanLeft.
def downsweep[A](t: TreeRes[A], a0: A, f: (A, A) => A): Tree[A] = t match {
  case LeafRes(a) => Leaf(f(a0, a))
  case NodeRes(l, _, r) =>
    val (tL, tR) = parallel(downsweep[A](l, a0, f),
                            downsweep[A](r, f(a0, l.res), f))
    Node(tL, tR)
}
To understand how downsweep works, the key fact to remember is that a0 denotes the reduce of all elements to the left of the current tree t. That is:
- When we reach a Leaf, we simply apply the operation f to the given element a0 and the element stored in the Leaf.
- When we reach a Node, we recursively run downsweep on the left and right subtrees, which gives us two new trees, and then combine them.
- An important thing to remember: for the left subtree we pass the same element a0, while for the right subtree we must take into account both the given a0 and the reduce of the left subtree; that is, we pass the combined result f(a0, l.res) to the right subtree.
scanLeft on trees
Now that we have upsweep and downsweep, we can use both to implement scanLeft:
def scanLeft[A](t: Tree[A], a0: A, f: (A, A) => A): Tree[A] = {
  val tRes = upsweep(t, f)           // build the intermediate tree
  val scan1 = downsweep(tRes, a0, f) // use the intermediate tree to obtain the final result
  prepend(a0, scan1)
}
prepend simply prepends the initial element to the result:
def prepend[A](x: A, t: Tree[A]): Tree[A] = t match {
  case Leaf(v) => Node(Leaf(x), Leaf(v))
  case Node(l, r) => Node(prepend(x, l), r) // always prepend x to the leftmost subtree
}
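Putting upsweep, downsweep, and prepend together, a self-contained sketch (again with a sequential stand-in for parallel, and a small toList helper added only for checking the output):

```scala
sealed abstract class Tree[A]
case class Leaf[A](value: A) extends Tree[A]
case class Node[A](left: Tree[A], right: Tree[A]) extends Tree[A]

sealed abstract class TreeRes[A] { val res: A }
case class LeafRes[A](override val res: A) extends TreeRes[A]
case class NodeRes[A](l: TreeRes[A], override val res: A, r: TreeRes[A]) extends TreeRes[A]

def parallel[A, B](a: => A, b: => B): (A, B) = (a, b) // sequential stand-in

def upsweep[A](t: Tree[A], f: (A, A) => A): TreeRes[A] = t match {
  case Leaf(v) => LeafRes(v)
  case Node(l, r) =>
    val (tL, tR) = parallel(upsweep(l, f), upsweep(r, f))
    NodeRes(tL, f(tL.res, tR.res), tR)
}

def downsweep[A](t: TreeRes[A], a0: A, f: (A, A) => A): Tree[A] = t match {
  case LeafRes(a) => Leaf(f(a0, a))
  case NodeRes(l, _, r) =>
    val (tL, tR) = parallel(downsweep(l, a0, f), downsweep(r, f(a0, l.res), f))
    Node(tL, tR)
}

def prepend[A](x: A, t: Tree[A]): Tree[A] = t match {
  case Leaf(v) => Node(Leaf(x), Leaf(v))
  case Node(l, r) => Node(prepend(x, l), r)
}

def scanLeft[A](t: Tree[A], a0: A, f: (A, A) => A): Tree[A] =
  prepend(a0, downsweep(upsweep(t, f), a0, f))

// helper (not in the lecture) to read the result off the tree
def toList[A](t: Tree[A]): List[A] = t match {
  case Leaf(v) => List(v)
  case Node(l, r) => toList(l) ++ toList(r)
}

val t = Node(Leaf(1), Node(Leaf(3), Leaf(8)))
val result = scanLeft(t, 100, (x: Int, y: Int) => x + y)
assert(toList(result) == List(100, 101, 104, 112)) // matches List(1,3,8).scanLeft(100)(_ + _)
```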
Parallel scanLeft on arrays
The previous definition implements scanLeft using trees, but it would be more efficient to implement scanLeft using arrays.
Let's see how we can implement scanLeft using arrays.
First, we again make use of an intermediate result tree, but its definition is slightly different from the previous one:
sealed abstract class TreeResA[A] { val res: A }
case class Leaf[A](from: Int, to: Int,
                   override val res: A) extends TreeResA[A]
case class Node[A](l: TreeResA[A],
                   override val res: A,
                   r: TreeResA[A]) extends TreeResA[A]
The difference compared to the previous TreeRes is that each Leaf now keeps track of the array segment range (from, to) from which res is computed.
Upsweep on array
Similarly, we define upsweep on arrays:
def upsweep[A](inp: Array[A], from: Int, to: Int,
               f: (A, A) => A): TreeResA[A] = {
  if (to - from < threshold)
    Leaf(from, to, reduceSeg1(inp, from + 1, to, inp(from), f))
  else {
    val mid = from + (to - from) / 2
    val (tL, tR) = parallel(upsweep(inp, from, mid, f),
                            upsweep(inp, mid, to, f))
    Node(tL, f(tL.res, tR.res), tR)
  }
}
Sequential reduce for segment
def reduceSeg1[A](inp: Array[A], left: Int, right: Int,
                  a0: A, f: (A, A) => A): A = {
  var a = a0
  var i = left
  while (i < right) {
    a = f(a, inp(i))
    i = i + 1
  }
  a
}
Downsweep on array
def downsweep[A](inp: Array[A],
                 a0: A, f: (A, A) => A,
                 t: TreeResA[A],
                 out: Array[A]): Unit = t match {
  case Leaf(from, to, res) =>
    scanLeftSeg(inp, from, to, a0, f, out)
  case Node(l, _, r) =>
    val (_, _) = parallel(
      downsweep(inp, a0, f, l, out),
      downsweep(inp, f(a0, l.res), f, r, out))
}
Sequential scanLeft on segment
def scanLeftSeg[A](inp: Array[A],
                   left: Int, right: Int,
                   a0: A, f: (A, A) => A,
                   out: Array[A]) = {
  if (left < right) {
    var i = left
    var a = a0
    while (i < right) {
      a = f(a, inp(i))
      i = i + 1
      out(i) = a
    }
  }
}
Parallel scanLeft on the array
def scanLeft[A](inp: Array[A],
                a0: A, f: (A, A) => A,
                out: Array[A]) = {
  val t = upsweep(inp, 0, inp.length, f)
  downsweep(inp, a0, f, t, out)
  out(0) = a0 // prepend a0
}
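The whole array pipeline can be checked end to end with a small example; the threshold value and the sequential stand-in for parallel are assumptions for illustration:

```scala
val threshold = 2 // assumed cutoff; a real value would be tuned

def parallel[A, B](a: => A, b: => B): (A, B) = (a, b) // sequential stand-in

sealed abstract class TreeResA[A] { val res: A }
case class Leaf[A](from: Int, to: Int, override val res: A) extends TreeResA[A]
case class Node[A](l: TreeResA[A], override val res: A, r: TreeResA[A]) extends TreeResA[A]

def reduceSeg1[A](inp: Array[A], left: Int, right: Int, a0: A, f: (A, A) => A): A = {
  var a = a0; var i = left
  while (i < right) { a = f(a, inp(i)); i += 1 }
  a
}

def upsweep[A](inp: Array[A], from: Int, to: Int, f: (A, A) => A): TreeResA[A] =
  if (to - from < threshold)
    Leaf(from, to, reduceSeg1(inp, from + 1, to, inp(from), f))
  else {
    val mid = from + (to - from) / 2
    val (tL, tR) = parallel(upsweep(inp, from, mid, f), upsweep(inp, mid, to, f))
    Node(tL, f(tL.res, tR.res), tR)
  }

def scanLeftSeg[A](inp: Array[A], left: Int, right: Int, a0: A, f: (A, A) => A, out: Array[A]): Unit =
  if (left < right) {
    var i = left; var a = a0
    while (i < right) { a = f(a, inp(i)); i += 1; out(i) = a }
  }

def downsweep[A](inp: Array[A], a0: A, f: (A, A) => A, t: TreeResA[A], out: Array[A]): Unit = t match {
  case Leaf(from, to, _) => scanLeftSeg(inp, from, to, a0, f, out)
  case Node(l, _, r) =>
    val (_, _) = parallel(downsweep(inp, a0, f, l, out),
                          downsweep(inp, f(a0, l.res), f, r, out))
}

def scanLeft[A](inp: Array[A], a0: A, f: (A, A) => A, out: Array[A]): Unit = {
  val t = upsweep(inp, 0, inp.length, f)
  downsweep(inp, a0, f, t, out)
  out(0) = a0 // prepend a0
}

val out = new Array[Int](4) // inp.length + 1 slots
scanLeft(Array(1, 3, 8), 100, (x: Int, y: Int) => x + y, out)
assert(out.toList == List(100, 101, 104, 112))
```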
Summary
This week we learned about three important data operations, namely map, reduce, and scan, and saw how to implement these operations in parallel on arrays and trees.