2016-10-24

# Data-Parallelism

Week three lecture notes about Parallel Programming on Coursera.

## Data-Parallel Programming

Last week, we learned about task-parallel programming. We now know how to express parallel programs with the task and parallel constructs.

This week, we will learn about data-parallel programming. Data parallelism takes a different approach: the computation is expressed once, and the same computation is expected to execute in parallel on different parts of the data.

We can write down the difference between task parallelism and data parallelism:

| Parallelism model | Definition |
| --- | --- |
| Task parallelism | A form of parallelization that distributes execution processes across computing nodes. |
| Data parallelism | A form of parallelization that distributes data across computing nodes. |

### Data-parallel programming model

The simplest form of data-parallel programming is the parallel for loop. We declare a method initializeArray, which given an integer array xs and an integer value v, writes the value v to every array entry in parallel:

    /**
     * Given an array, assign v to every element of the array.
     *
     * Note: on Scala 2.13 and later, .par requires the separate
     * scala-parallel-collections module and
     * import scala.collection.parallel.CollectionConverters._
     */
    def initializeArray(xs: Array[Int])(v: Int): Unit = {
      // This is a parallel for loop: adding .par to the range converts it
      // into a parallel range, so iterations of the loop may be executed
      // on different processors concurrently with each other.
      for (i <- (0 until xs.length).par) {
        // Each iteration writes v to its own array entry i,
        // so no two iterations touch the same memory location.
        xs(i) = v
      }
    }


The parallel for loop is not functional: it can only affect the program through side effects. It is therefore important to ensure that the iterations of a parallel for loop write to separate memory locations or use some kind of synchronization.

We use workload to measure the parallelism of the data-parallel program. Workload is a function that maps each input element to the amount of work required to process it. It turns out that different data-parallel programs have different workloads.

The initializeArray method defined previously performs a constant amount of work for each element. We call such a workload a uniform workload and write it as $w(i) = \text{const}$.

Uniform workload can be depicted using the following graph, on which the x axis is the loop iteration axis and the y axis is the running time:

Since each loop iteration is assigned the same amount of time, uniform workload is particularly easy to parallelize.

An irregular workload, on the other hand, is defined by an arbitrary function: $w(i) = f(i)$.

Shown graphically, each loop iteration can be assigned a different amount of work:

With an irregular workload, we usually rely on a data-parallel scheduler that efficiently balances the workload across processors without any knowledge of $w(i)$.
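
To make the difference between the two kinds of workload concrete, here is a minimal sketch that splits the iterations into equal contiguous chunks and sums the work in each chunk. The names `WorkloadSketch`, `uniform`, `irregular`, and `workPerChunk` are hypothetical, and the workloads are abstract units of work rather than measured running times.

```scala
object WorkloadSketch {
  // Hypothetical workload functions: units of work needed by iteration i.
  def uniform(i: Int): Int = 1   // w(i) = const
  def irregular(i: Int): Int = i // w(i) = f(i), here f(i) = i

  // Total work per chunk when n > 0 iterations are split into
  // p > 0 contiguous chunks of (almost) equal length.
  def workPerChunk(n: Int, p: Int)(w: Int => Int): Vector[Int] = {
    val size = math.ceil(n.toDouble / p).toInt
    (0 until n).grouped(size).map(chunk => chunk.map(w).sum).toVector
  }
}
```

With 8 iterations and 4 chunks, the uniform workload gives every chunk the same amount of work, while the irregular one leaves the last chunk with far more work than the first. This is why a static split is not enough for irregular workloads.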

## Data-Parallel Operations

In the previous section, we learned about the basic form of data parallelism, namely the parallel for loop. In this section, we will see more data-parallel operations.

### Parallel collections

In Scala, most collection operations can become data-parallel. The .par method call converts a sequential collection into a parallel collection. For example, suppose we want to find out how many numbers between 1 and 999 are both divisible by 3 and palindromes. We can do this in parallel:

    (1 until 1000).par
      .filter(n => n % 3 == 0)
      .count(n => n.toString == n.toString.reverse)


We convert the range collection into a parallel range by calling par, after which the subsequent operations run in parallel.

However, some operations are not parallelizable. Let’s look at these operations in more detail.

### Non-parallelizable operations

Consider implementing the method sum using the foldLeft method. An example implementation is as follows:

    def sum(xs: Array[Int]): Int = {
      xs.par.foldLeft(0)(_ + _)
    }


We try to parallelize the operation by converting the array into a parallel array. However, this implementation doesn’t execute in parallel. Why?

Let’s examine the foldLeft signature more closely.

    def foldLeft[B](z: B)(f: (B, A) => B): B


The foldLeft method takes the type parameter B which is the type of accumulation, a neutral element z of type B, and the function f that combines the accumulation and the elements of the collection into another accumulation.

Given a collection with n elements of type A, any number of neutral elements, and any number of instances of the function f, foldLeft has only one way to connect the elements of the collection together using the function f, and that way is sequential.

To understand why, we can explain foldLeft using Lego bricks.

We can view the function f we pass to foldLeft as a Lego brick in the above figure. It takes two inputs, of types B and A, and produces one output of type B. Connecting all the Lego bricks corresponds to executing a foldLeft operation. In order for the accumulation value of type B to become available at any position, it must first be computed for all the previous elements. In other words, these functions must be invoked sequentially, one after another.

Similarly, operations like foldRight, reduceLeft, reduceRight, scanLeft, and scanRight must process the elements sequentially.
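
The fixed order of evaluation becomes visible when the operator is not associative. The following small sketch (the object name `FoldOrderSketch` is just for illustration) folds the same list with subtraction in both directions and gets two different results, because each method prescribes its own nesting of the calls to f.

```scala
object FoldOrderSketch {
  val xs = List(1, 2, 3)

  // foldLeft nests to the left: ((0 - 1) - 2) - 3
  val left: Int = xs.foldLeft(0)(_ - _)

  // foldRight nests to the right: 1 - (2 - (3 - 0))
  val right: Int = xs.foldRight(0)(_ - _)
}
```

Here `left` evaluates to -6 and `right` to 2. Since the grouping is part of the definition of these methods, an implementation is not free to regroup the calls in order to run them in parallel.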

### The fold operation

To enable folding elements in parallel, we introduce a new method called fold:

    def fold(z: A)(f: (A, A) => A): A


This method has a slightly different signature. Instead of having another type parameter for the accumulation type, as foldLeft does, the accumulation has the same type as the elements of the collection. The function f passed to fold takes two input parameters of type A and returns a result of type A.

It can be represented with the following Lego brick:

The fold operation can process the elements in a reduction tree. For this reason, it can be implemented to execute in parallel.

The fold operation is one of the basic abstractions. Many other data-parallel operations can be implemented in terms of fold.
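
A reduction tree can be sketched with plain futures from the standard library. This is only an illustration of the idea, not how the parallel collections library implements fold; the names `TreeFoldSketch`, `foldTree`, and the `threshold` value are assumptions made for the example.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object TreeFoldSketch {
  // Hypothetical cut-off below which a subrange is folded sequentially.
  private val threshold = 1000

  // Folds the two halves independently and combines their results with f.
  // Because f has type (A, A) => A, it can merge two partial results.
  def foldTree[A](xs: Vector[A], z: A)(f: (A, A) => A)(
      implicit ec: ExecutionContext): Future[A] =
    if (xs.length <= threshold) Future(xs.foldLeft(z)(f))
    else {
      val (left, right) = xs.splitAt(xs.length / 2)
      foldTree(left, z)(f).zipWith(foldTree(right, z)(f))(f)
    }

  // Blocking convenience wrapper around foldTree.
  def fold[A](xs: Vector[A], z: A)(f: (A, A) => A): A =
    Await.result(foldTree(xs, z)(f)(ExecutionContext.global), 1.minute)
}
```

For example, `TreeFoldSketch.fold((1 to 10000).toVector, 0)(_ + _)` sums the numbers by combining partial sums pairwise. Note that z is used once per leaf of the tree, which is only harmless if z is a neutral element for f.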

#### Use-cases of the fold operation

One example of using fold is implementing a parallel sum:

    def sum(xs: Array[Int]): Int = {
      xs.par.fold(0)(_ + _)
    }


Another example is implementing the parallel max method:

    def max(xs: Array[Int]): Int = {
      xs.par.fold(Int.MinValue)(math.max)
    }


Let’s consider these two fold calls and what their neutral elements and functions have in common.

• First, applying the function $f$ to the neutral element $z$ and some other element $x$ always returns the same element $x$. That is, $f(x, z) = f(z, x) = x$.
• Second, both of these functions are associative. That is, $f(x_1, f(x_2, x_3)) = f(f(x_1, x_2), x_3)$.

#### Preconditions of the fold operation

In order for the fold operation to work correctly, the following properties must hold:

• Associativity: $f(a, f(b, c)) = f(f(a, b), c)$

• Identity: $f(z, a) = f(a, z) = a$

We say that the neutral element z and the binary operator f must form a monoid.
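
The two laws can be spot-checked on sample values. The helper below is a hypothetical sketch (`MonoidCheckSketch.looksLikeMonoid` is not a library function); passing the check on a finite sample is only evidence, not a proof, but a single failure is enough to rule out using the pair with fold.

```scala
object MonoidCheckSketch {
  // Tests the identity and associativity laws for z and f
  // on every combination of the given sample values.
  def looksLikeMonoid[A](z: A, samples: Seq[A])(f: (A, A) => A): Boolean = {
    val identityOk =
      samples.forall(a => f(z, a) == a && f(a, z) == a)
    val associativityOk =
      samples.forall { a =>
        samples.forall { b =>
          samples.forall(c => f(a, f(b, c)) == f(f(a, b), c))
        }
      }
    identityOk && associativityOk
  }
}
```

Addition with 0 and max with Int.MinValue pass this check, while subtraction with 0 fails it, since $0 - a \neq a$ in general. String concatenation with the empty string also passes: the operator does not have to be commutative to form a monoid.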

#### Limitations of the fold operation

The fold operation has its own limitation: it can only produce values of the same type as the elements of the collection it is called on.

To lift this restriction, we introduce another method called aggregate, which allows the neutral element and the result to have a different type.

#### The aggregate operation

aggregate is a more powerful operation that is both expressive and parallelizable:

    def aggregate[B](z: B)(f: (B, A) => B, g: (B, B) => B): B


It takes:

• a neutral element of type B
• a sequential folding operator f
• a parallel folding operator g

Its accumulation type can be any type B, just as with foldLeft.

We use the following figure to illustrate how aggregate works:

Given a collection of elements of type A,

• First, aggregate divides them into subsets that are assigned to different processors.
• Then, the processors concurrently use the folding operator f to fold their subsets, as if they were executing a foldLeft.
• After that, each processor returns an accumulation of type B. These values are then combined in a parallel reduction tree using the second folding operator g.
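
The steps above can be sketched with standard-library futures. This is a simplified illustration under stated assumptions: `AggregateSketch.aggregate` and its `parts` parameter are hypothetical, the collection is split into contiguous chunks, and the partial results are combined with a plain left-to-right fold rather than a reduction tree.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object AggregateSketch {
  def aggregate[A, B](xs: Vector[A], parts: Int)(z: B)(
      f: (B, A) => B, g: (B, B) => B): B = {
    // Step 1: divide the elements into at most `parts` contiguous chunks.
    val chunkSize = math.max(1, math.ceil(xs.length.toDouble / parts).toInt)
    // Step 2: fold every chunk concurrently with f, as foldLeft would.
    val partials: Vector[Future[B]] =
      xs.grouped(chunkSize).toVector.map(chunk => Future(chunk.foldLeft(z)(f)))
    // Step 3: combine the accumulations of type B with g.
    Await.result(Future.sequence(partials), 1.minute).foldLeft(z)(g)
  }
}
```

For instance, counting vowels needs an Int accumulator over Char elements, which fold cannot express:

```scala
val vowels = Set('a', 'e', 'i', 'o', 'u')
AggregateSketch.aggregate("banana".toVector, 3)(0)(
  (count, c) => if (vowels(c)) count + 1 else count, _ + _)
```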

### The transformer operations

So far, all the operations we have seen are accessor combinators. These operations return a single value after execution; examples are sum, max, fold, and count.

On the other hand, transformer combinators, such as map, filter, flatMap, and groupBy, do not return a single value but instead return new collections as results. However, their semantics are similar to those of the accessor operations, so we do not dive into them further.

## Scala Parallel Collections

In this section, we will take a closer look at Scala’s different parallel collection types.

### Scala collections hierarchy

• Traversable[T] is the root trait of the collection hierarchy, with operations implemented using foreach. Each collection that implements this type must also implement the method foreach.
• Iterable[T] is a subtype of Traversable[T], with operations implemented using iterator. Iterable collections must implement the iterator method.
• Seq[T] is a subtype of Iterable[T] which defines sequences of elements of type T. A sequence is an ordered collection in which every element is assigned to an index.
• Set[T] is an iterable subtype that describes set collections, which cannot contain duplicate elements.
• Map[K, V] describes collections that map keys of type K to values of type V (no duplicate keys).

Besides sequential collections, Scala also has parallel collections and generic collections.

#### Parallel Collection Hierarchy

The traits ParIterable[T], ParSeq[T], ParSet[T], and ParMap[K, V] are the parallel counterparts of the corresponding sequential traits. Collection operations on objects that implement these traits generally execute in parallel.

For code that is agnostic about parallelism, there exists a separate hierarchy of generic collection traits GenIterable[T], GenSeq[T], GenSet[T] and GenMap[K, V]. Methods in these generic collection types may or may not execute in parallel.

The following figure shows Scala’s collection hierarchy:

On one axis, different collection types are subtypes of each other. For example, the Seq type is a subtype of the Iterable type, and the ParSeq type is a subtype of the ParIterable type. On the other axis, sequential and parallel collection types are subtypes of the corresponding generic collection types. For example, both Seq and ParSeq are subtypes of the GenSeq type.

#### Non-parallelizable collections

We already know that a sequential collection can be converted into a parallel one by calling par.

However, the running time required to convert a sequential collection into a parallel one can vary significantly between collection types. Converting a vector to its parallel counterpart is very efficient, whereas converting a list is very inefficient.

It’s important to choose data structures that can be converted to a parallel collection efficiently.

#### Rules for parallel collection

Although parallel collections are easy to obtain, there are several rules to follow when using them:

• Avoid mutations to the same memory locations without proper synchronization.
• Never modify a parallel collection on which a data-parallel operation is in progress.
• Never write to a collection that is concurrently traversed.
• Never read from a collection that is concurrently modified.
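
As an illustration of the first rule, the sketch below computes the intersection of two sets from several concurrent tasks. It is a minimal example under stated assumptions: the tasks are plain standard-library futures rather than a parallel collection, the names `SafeIntersection` and `intersection` are hypothetical, and the shared result is a `ConcurrentSkipListSet` from `java.util.concurrent`, which is safe to modify from several threads.

```scala
import java.util.concurrent.ConcurrentSkipListSet
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object SafeIntersection {
  def intersection(a: Set[Int], b: Set[Int]): ConcurrentSkipListSet[Int] = {
    // All tasks write to the same set, so it must be a concurrent collection.
    // A plain mutable.Set here would break the rule about unsynchronized mutation.
    val result = new ConcurrentSkipListSet[Int]()
    val tasks = a.toVector.grouped(1024).toVector.map { chunk =>
      Future {
        // a and b are immutable, so reading them concurrently is safe.
        for (x <- chunk if b.contains(x)) result.add(x)
      }
    }
    Await.result(Future.sequence(tasks), 1.minute)
    result
  }
}
```

An alternative that avoids shared mutable state altogether is to express the computation with a transformer such as filter, which returns a new collection instead of writing to an existing one.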

## Data-Parallel Abstractions

In this section, we study the basic abstractions required to implement data-parallel operations in a generic fashion. We will examine the following abstractions:

• iterators
• splitters
• builders
• combiners

### Iterator

Each iterable collection can create its own iterator object. The simplified Iterator trait is as follows:

    trait Iterator[A] {
      def next(): A
      def hasNext: Boolean
    }

    // on every collection
    def iterator: Iterator[A]


Every iterator comes with two methods, next and hasNext. The iterator maintains some internal state that describes what the current element is. Calling next moves the iterator to the next element and returns it, while calling hasNext returns true only if there are more elements to traverse.

The iterator contract states that:

• next can be called only if hasNext returns true
• after hasNext returns false, it will always return false
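
To see how a collection operation can be written against this contract alone, here is a minimal sketch of foldLeft over an iterator. It uses the standard `scala.collection.Iterator`, which offers the same two methods; the object name `IteratorSketch` is only for illustration.

```scala
object IteratorSketch {
  // Calls next only after hasNext has returned true, as the contract requires.
  def foldLeft[A, B](it: Iterator[A])(z: B)(f: (B, A) => B): B = {
    var acc = z
    while (it.hasNext) acc = f(acc, it.next())
    acc
  }
}
```

For example, `IteratorSketch.foldLeft(Iterator(1, 2, 3))(0)(_ + _)` walks the three elements in order and returns their sum.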

### Splitter

The splitter is the counterpart of the iterator used for parallel programming, and it extends the Iterator trait. The simplified Splitter trait is as follows:

    trait Splitter[A] extends Iterator[A] {
      def split: Seq[Splitter[A]]
      def remaining: Int
    }

    // on every parallel collection
    def splitter: Splitter[A]


A splitter has the method split, which returns a sequence of splitters that traverse subsets of the current splitter. It also comes with a remaining method, which returns an estimate of the number of remaining elements. Every parallel collection has its own splitter implementation and must define a method splitter that returns its own splitter object.

The splitter contract states that:

• after calling split, the original splitter is left in an undefined state; methods like next and hasNext cannot be called on it again.
• the resulting splitters traverse disjoint subsets of the original splitter.
• remaining is an estimate of the number of remaining elements.
• split is an efficient method: $O(\log n)$ or better.
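
A parallel fold can be built on top of this contract. The sketch below is an illustration under several assumptions: `SimpleSplitter` and `ArraySplitter` are hypothetical stand-ins for the library types, the `threshold` value is arbitrary, and the subtasks are standard-library futures.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object SplitterSketch {
  // Hypothetical simplified trait mirroring the one in these notes.
  trait SimpleSplitter[A] extends Iterator[A] {
    def split: Seq[SimpleSplitter[A]]
    def remaining: Int
  }

  // Traverses the index range [from, until) of an array.
  final class ArraySplitter[A](xs: Array[A], private var from: Int, until: Int)
      extends SimpleSplitter[A] {
    def hasNext: Boolean = from < until
    def next(): A = { val x = xs(from); from += 1; x }
    def remaining: Int = until - from
    // Two splitters over disjoint halves; no elements are copied.
    def split: Seq[SimpleSplitter[A]] = {
      val mid = from + remaining / 2
      Seq(new ArraySplitter(xs, from, mid), new ArraySplitter(xs, mid, until))
    }
  }

  private val threshold = 4 // hypothetical sequential cut-off

  // Small splitters are folded sequentially; larger ones are split, the
  // children are folded concurrently, and their results are combined with f.
  def fold[A](s: SimpleSplitter[A])(z: A)(f: (A, A) => A)(
      implicit ec: ExecutionContext): Future[A] =
    if (s.remaining < threshold) Future(s.foldLeft(z)(f))
    else
      Future.sequence(s.split.map(child => fold(child)(z)(f)))
        .map(results => results.foldLeft(z)(f))

  // Blocking convenience wrapper around fold.
  def foldBlocking[A](s: SimpleSplitter[A])(z: A)(f: (A, A) => A): A =
    Await.result(fold(s)(z)(f)(ExecutionContext.global), 1.minute)
}
```

Note that the sketch never touches a splitter again after calling split on it, in line with the first rule of the contract.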

### Builder

Builders are abstractions used for creating new collections. The simplified Builder trait is as follows:

    trait Builder[A, Repr] {
      def +=(elem: A): Builder[A, Repr]
      def result: Repr
    }

    // on every collection
    def newBuilder: Builder[A, Repr]


Roughly speaking, the builder:

• has two type parameters, A and Repr, where A is the type of the elements that we can add to the builder and Repr denotes the type of the collection that the builder creates.
• has a += method used to add an element to the builder, and a result method that returns the collection.
• is created by the newBuilder method, which each collection must implement and which is used to build a new collection of the same type.

The builder contract specifies that:

• calling result returns a collection of type Repr, containing the elements that were previously added with +=.
• calling result leaves the Builder in an undefined state.
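
Transformer operations can be written in terms of a builder. The following sketch implements a sequential filter with the standard library's `Vector.newBuilder`; the object name `BuilderSketch` and the choice of Vector as the result type are assumptions made to keep the example short.

```scala
object BuilderSketch {
  def filter[A](xs: Iterable[A])(p: A => Boolean): Vector[A] = {
    val b = Vector.newBuilder[A]
    // Add every element that satisfies the predicate.
    for (x <- xs if p(x)) b += x
    // result is called exactly once; the builder is not used afterwards.
    b.result()
  }
}
```

For example, `BuilderSketch.filter(List(1, 2, 3, 4))(_ % 2 == 0)` returns `Vector(2, 4)`.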

### Combiner

A combiner is a parallel version of a builder. In addition to the builder methods, combiners have a method combine, which merges two combiner objects together. The simplified Combiner trait is as follows:

    trait Combiner[A, Repr] extends Builder[A, Repr] {
      def combine(that: Combiner[A, Repr]): Combiner[A, Repr]
    }

    // on every parallel collection
    def newCombiner: Combiner[A, Repr]


Every parallel collection must implement the newCombiner method, which creates a new combiner object specific to that collection. The combine method returns a new combiner that contains the elements of both input combiners.

The combiner contract:

• calling combine returns a new combiner that contains the elements of both input combiners
• calling combine leaves both original combiners in an undefined state
• combine is an efficient method: $O(\log n)$ or better
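
One way to keep combine cheap is to store the elements in a list of chunks, so that combining only links chunks together instead of copying elements. The class below is a hypothetical sketch of that idea (`ChunkCombiner` is not the library's implementation, and it produces a Vector for simplicity); its combine cost grows with the number of chunks rather than with the number of elements.

```scala
import scala.collection.mutable.ArrayBuffer

object CombinerSketch {
  final class ChunkCombiner[A] {
    // Elements are appended to the last chunk.
    private val chunks = ArrayBuffer(ArrayBuffer.empty[A])

    def +=(elem: A): this.type = { chunks.last += elem; this }

    // Links the chunks of both combiners without copying any element.
    // Both inputs share their chunks with the result, so they must not
    // be used afterwards, as the contract states.
    def combine(that: ChunkCombiner[A]): ChunkCombiner[A] = {
      val merged = new ChunkCombiner[A]
      merged.chunks.clear()
      merged.chunks ++= this.chunks
      merged.chunks ++= that.chunks
      merged
    }

    // Copies the elements into the final collection, in insertion order.
    def result: Vector[A] = chunks.flatten.toVector
  }
}
```

In a parallel transformer, each processor would fill its own combiner, and the combiners would then be merged pairwise in a reduction tree before result is called once at the root.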

## Summary

This week, we learned about data parallelism. First, we looked at the data-parallel programming model and studied two operations that can be implemented in parallel: fold and aggregate. We then examined the Scala parallel collections hierarchy. Finally, we studied four abstractions for implementing data-parallel operations: iterators, splitters, builders, and combiners.