2017-06-13

# Reduction Operations & Distributed Key-Value Pairs

## Reduction Operations

So far, we have only focus on distributing transformations such as map, flatMap, filter, etc. In this week, we will turn our attention to actions, we will see how common reduce-like actions are distributed in Spark.

### General Reduction Operations

Recall operations such as fold, reduce, and aggregate from Scala sequential collections. All of these operations and their variants (such as foldLeft, reduceRight, etc) have something in common. That is:

They walk through a collection and combine neigboring elements of the collection together to produce a single combine result(rather than another collection).

Depending on how they combine neigboring elements, some of actions is parallelizable but some of them is not. Let’s see a few example.

Recall from course Parallel Programming, we learn two operations: foldLeft and fold.

#### foldLeft

foldLeft is not parallelizable. Considering the following signature of foldLeft:

def foldLeft[B](z: B)(f: (B, A) => B): B


The foldLeft takes an initial value z of type B, and a combine function f. The combine function f takes an neigboring element of type A and produce a result of type B. The reason why we cannot parallelize foldLeft is that if we split the collection into multiple parts and run foldLeft on each part in parallel, we get back multiple middle results of type B. However, we cannot combine multiple middle results since we don’t have a function that takes two argument of type B and return a result of type B.

#### fold

On the other hand, fold is parallelizable. Consider the following signature of fold:

def fold(z: A)(f: (A, A) => A): A


The fold function takes an initial value z of type A and a combine function f. The combine function f takes two argument of type A and produce a result of type A. Say we split a collection into multiple parts and run fold on each part. We get back some middle results of type A. But here comes the interesting part. Since all middle results are of type A and we have combine function f, we can easily use f to combine these middle results and produce final results.

Yeah! That is reason why fold is parallelizable and foldLeft is not. It depends on whether you have proper combine function to combine elements.

However, although fold enables parallelization, it restricts us to always returning the same type. What if i want to combine elements and return differen type. How can i do that?

#### aggregate

The answer is to use aggregate function, whose signature is shown below:

def aggregate[B](z: => B)(seqop: (B, A) => B), combop: (B, B) => B): B


The aggregate function takes an initial value z of type B and two combine function seqop and combop. The seqop stands for sequential operator, and the combop stands for combination operator. When running in parallel, aggregate use seqop to combine signle element on each small part. Then, it uses combop to combine middle result from each small part to produce final result.

#### Reduction operations on RDDs

Because foldLeft/foldRight(reduceLeft/reduceRight) is not parallelizable, Spark only provide fold, reduce and aggregate parallelizable reduction operation.

On the other hand, it’s irreasonable for Spark to provide a serial foldLeft/foldRight operation. Because it turns out that trying to do anything serially across a cluster is actually very difficult. Enforcing ordering in a distributed system is very hard and sometimes impossible. Any in any case, it requires a lot of communication and synchronization between nodes, which is extremely costly. So it simply doesn’t make sense to try and make sure that something happens before another thing in a distibuted system. Which means it typically doesn’t make a lot of sense to try and make available serial operations, like foldLeft on a cluster.

## Distributed Key-Value Pairs (Pair RDDs)

Pair RDDs are RDDs that containing key-value pairs, and they have additional, specialized methods for working with data associated with keys. Pair RDDs are useful because they allow us to act on each key in parallel or regroup data across the network.

### Definition

Pair RDDs are RDDs parameterized by a pair:

RDD[(K,V)]


They are most often created from already-existing non-pair RDDs, such as by using the map operations. When an RDD is created with a pair as its elements type, Spark automatically adds a number of extra usefull additional methods (extension methods) for such pairs.

## Transformations and Actions on Pair RDDs

We summarize some of important operations defined on Pair RDDs but not available on regular RDDs. Similarly, these operations can be broken into two categories, transformations and actions.

### Transformations

• groupByKey
• reduceByKey
• mapValues
• keys
• join
• leftOuterJoin/rightOuterJoin

• countByKey

### groupBy

Recall groupBy from Scala collections:

def groupBy[K](f: A => K): Map[K, Traversable[A]]


groupBy will partition this traversable collection into a map of traversable collections according to some discriminator function.

In English, groupBy takes a function f that takes an element and returning a key of type K. When applying f to every element in the collection, different elements returning the same key will be put together. As a result, groupBy returns a Map mapping computed keys to collections of corresponding values.

The following example groups a list of ages into “child”, “adult”, and “senior” categories:

val ages = List(2, 51, 44, 23, 17, 14, 12, 82, 51, 64)
val grouped = ages.groupBy {
age =>
if (age >= 18 && age < 65) "adult"
else if (age < 18) "child"
else "senior"
}
//grouped: scala.collection.immutable.Map[String,List[Int]] =
//Map(senior -> List(82),
//   adult -> List(51, 44, 23, 51, 64),
//   child -> List(2, 17, 14, 12))


### Pair RDD Transformation: groupByKey

As Pair RDDs already have key/value pairs, in Spark, we have groupByKey instead. groupByKey can be thought of as a groupBy on Pair RDDs that is specialized on grouping all values that have the same key. As a result, it takes no argument.

def groupByKey(): RDD[(K, Iterable[V])]


### Pair RDD Transformation: reduceByKey

reduceByKey can be thought of as a combination of groupByKey and reduce-ing on all the values per key.

def reduceByKey(func: (V, V) => V): RDD[(K, V)]


reduceByKey takes a function which only cares about the values of the paired RDD. So we’re using V to represent the values. This is because we conceptually assume that somehow the values are already grouped by key and now we apply this function to reduce over those values that in a collection.

It’s important to note that reduceByKey is a lot more efficient than doing groupByKey and then reduce independently.

### Pair RDD Transformations: mapValues and Action: countByKey

mapValues simply skip the keys and applies a function to the values in a Pair RDD.

def mapValues[U](f: V => U): RDD[(K, U)]
//is equivalent to

pairRDD.map{case (x, y): (x, func(y))}


countByKey is an action that simply counts the number of elements per key in a Pair RDD, returning a normal Scala Map mapping from keys to counts:

def countByKey(): Map[K, Long]


### Pair RDD Transformation: keys

keys return an RDD with the keys of each tuple.

def keys: RDD[K]


One thing to notice is that keys is a transformation instead of an action. This is because the number of keys in a pair RDD could be huge, and thus is may not be possible to collect all keys at one node.

For a complete list of all available specialized Pair RDD operations, refer to PairRDDFunctions

## Joins

Joins are another sort of unique transformations on Pair RDDs. They are conceptually similar to join operations in database.

There are two kinds of joins:

• Inner joins (join)
• Outer joins (leftOuterJoin/rightOuterJoin)

### Inner joins

The default join operator is an inner join. It returns a new RDD containing combined pairs whose keys are present in both input RDDs.

def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]


When there are multiple values for the same key in one of the inputs, the resulting pair RDD will have an entry for every possible pair of values with that key from the two input RDDs.

### Outer Joins (leftOuterJoin, rightOuterJoin)

Outer joins return a new RDD containing combined pairs whose keys don’t have to be present in both input RDDs.

Outer joins are useful for customizing how the resulting joined RDD deals with missing keys. With outer joins, we can decide which RDD’s keys are most essential to keep - the left, or the right RDD in the join expression.

def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]


Notice the different position of the Option in leftOuterJoin and rightOuterJoin.