Spark week three notes

2017-06-13

Partitioning and Shuffling

Shuffling

Shuffling is very important in Spark and it can happen quite a bit. Consider operations like groupBy or groupByKey. When we make a pair RDD and call groupByKey on it:

val pairs = sc.parallelize(List((1,"one"), (2, "two"), (3,"three")))
pairs.groupByKey()
// res0: org.apache.spark.rdd.RDD[(Int, Iterable[String])] = ShuffledRDD[1] at groupByKey at <console>:28

The result type is something called ShuffledRDD. To understand what ShuffledRDD is, let’s first look at how groupByKey works. To do a distributed groupByKey, we typically have to move data between nodes so that all of the values for a given key can be collected together on one machine, in a regular single-machine Scala collection. groupByKey collects all of the values associated with a given key and stores them in that single collection. That means data has moved around the network. An operation that does this is called a shuffle.

Shuffling often happens transparently as part of operations like groupByKey. A lot of shuffling during execution can degrade performance significantly, because it means Spark has to move a lot of its data around the network, and remember that network latency is extremely costly.

So, we may ask: is there any way to reduce shuffling? The answer is yes; in most situations we can use reduceByKey instead. reduceByKey reduces the dataset on each worker node first, which shrinks the amount of data that is sent over the network during the shuffle.
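
For example, here is a minimal word-count sketch that pre-aggregates per partition before shuffling (the sample data and output below are made up for illustration):

val words = sc.parallelize(List("spark", "rdd", "spark", "shuffle", "rdd", "spark"))
val counts = words.map(w => (w, 1))
                  .reduceByKey(_ + _)   // partial sums are computed per partition before the shuffle
counts.collect()
// res1: Array[(String, Int)] = Array((rdd,2), (shuffle,1), (spark,3))   (order may vary)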

Partitioning

Partitioning has a lot to do with shuffling. Recall that operations like groupByKey require Spark to move the values to the same machine as their corresponding keys. How does Spark know which key to put on which machine? The answer is partitioning.

Simply speaking, the data within an RDD is split into many partitions, and some properties of partitions are:

  • Partitions never span multiple machines, i.e., tuples in the same partition are guaranteed to be on the same machine.
  • Each machine in the cluster contains one or more partitions.
  • The number of partitions to use is configurable. By default, it equals the total number of cores on all executor nodes (see the snippet after this list).
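
For instance, we can inspect or override the number of partitions when creating an RDD (the values shown in the comments are illustrative):

val rdd = sc.parallelize(1 to 100)         // uses sc.defaultParallelism partitions by default
rdd.getNumPartitions                       // e.g. 8 on a cluster with 8 executor cores in total
val rdd16 = sc.parallelize(1 to 100, 16)   // explicitly request 16 partitions
rdd16.partitions.length                    // 16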

Spark comes with two kinds of partitioning out of the box:

  • Hash partitioning
  • Range partitioning

Spark also allows us to customize partitioning, but this is only possible on pair RDDs. The reason is that partitioning is done based on keys.

Hash partitioning

Hash partitioning tries to spread the data around the cluster as evenly as possible. First, it determines the partition p for every tuple in the pair RDD, which is calculated as follows:

p = key.hashCode() % numPartitions

When hash partitioning actually begins, tuples are sent to different machines based on their partition.
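
To make the formula concrete, here is a tiny sketch that applies it to a few arbitrary example keys (note that Spark’s actual HashPartitioner additionally maps negative hash codes to a non-negative partition):

val numPartitions = 4
val keys = List("cat", "dog", "spark", 42)
keys.foreach { k =>
  val p = k.hashCode() % numPartitions   // simplified version of the formula above
  println(s"key $k -> partition $p")
}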

Range partitioning

The other kind of partitioning is called range partitioning. This is important when some kind of order is defined on the keys. Examples are Int, Char, and String keys.

For such RDDs, using range partitioning, keys are partitioned according to:

  1. an ordering for keys
  2. a set of sorted ranges of keys

After partitioning, tuples with keys in the same range appear on the same machine.

Creating a RangePartitioner requires:

  1. Specifying the desired number of partitions
  2. Providing a Pair RDD with ordered keys. This RDD is sampled to create a suitable set of sorted ranges.
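
Here is a sketch of creating and applying a RangePartitioner; the pair RDD and partition count are just examples:

import org.apache.spark.RangePartitioner

val pairs = sc.parallelize(List((8, "eight"), (1, "one"), (5, "five"), (3, "three")))
// Samples the keys of pairs to build 4 sorted key ranges.
val rangePartitioner = new RangePartitioner(4, pairs)
val partitioned = pairs.partitionBy(rangePartitioner).persist()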

Partitioning data

It’s also possible to customize and set partitioners for Spark jobs. There are two ways to create RDDs with specific partitionings:

  1. Call partitionBy on an RDD, providing an explicit Partitioner.
  2. Using transformations that return RDDs with specific partitioners.

partitionBy

The partitionBy method creates an RDD with a specified partitioner. It’s important to remember that the result of partitionBy should be persisted. Otherwise, the partitioning is repeatedly applied (involving shuffling) each time the partitioned RDD is used.
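
For example, a minimal sketch using a HashPartitioner with an arbitrary number of partitions:

import org.apache.spark.HashPartitioner

val pairs = sc.parallelize(List((1, "one"), (2, "two"), (3, "three")))
// Persist so the shuffle caused by partitionBy happens only once,
// instead of every time partitioned is reused.
val partitioned = pairs.partitionBy(new HashPartitioner(8)).persist()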

Partitioning data using transformations

Partitioners can also be set by using transformations. There are two ways partitioners can be passed around with transformations:

Partitioner from parent RDD:

Most commonly, partitioners tend to be passed around via parent RDDs. That is, a pair RDD that is the result of a transformation on an already-partitioned pair RDD is typically configured to use the same hash partitioner that was used to construct its parent.

Automatically-set partitioners:

On the other hand, some transformations automatically result in an RDD with a different partitioner; in that case the transformation itself provides the new partitioner.

For example, by default, when using sortByKey, a RangePartitioner is used. Further, the default partitioner when using groupByKey is a HashPartitioner.
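
We can verify this by inspecting the partitioner field of the resulting RDDs (output abbreviated and illustrative):

val pairs = sc.parallelize(List((3, "c"), (1, "a"), (2, "b")))
pairs.sortByKey().partitioner
// Some(org.apache.spark.RangePartitioner@...)
pairs.groupByKey().partitioner
// Some(org.apache.spark.HashPartitioner@...)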

Here is a list of transformations that either hold on to or propagate some kind of partitioner:

[Figure: Transformation Partitioning]

Any other transformation not shown in the figure above will produce a result that doesn’t have a partitioner. Some examples are map and flatMap: if you use map or flatMap on a partitioned RDD, the resulting RDD loses its partitioner. The reason is that map/flatMap may change the keys, and partitioning is only possible on pair RDDs.

When working with pair RDDs, it’s preferable to use mapValues rather than map/flatMap. This operation lets us do map transformations without changing the keys, thereby preserving the partitioner.
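
A quick sketch of the difference; the data and partition count are arbitrary:

import org.apache.spark.HashPartitioner

val partitioned = sc.parallelize(List((1, "one"), (2, "two")))
                    .partitionBy(new HashPartitioner(4))
partitioned.map { case (k, v) => (k, v.length) }.partitioner
// None -- map may change the keys, so the partitioner is dropped
partitioned.mapValues(_.length).partitioner
// Some(org.apache.spark.HashPartitioner@...) -- keys are untouched, partitioner preserved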

Optimizing with partitioners

A shuffle can occur when the resulting RDD depends on elements from the same RDD or from another RDD. There are a couple of ways to figure out whether a shuffle has been planned or executed:

  1. The return type of certain transformations, e.g.,
    org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[1]
    
  2. Using the toDebugString method to see the execution plan:
    partitioned.reduceByKey((v1, v2) => (v1._1 + v2._1, v1._2 + v2._2))
               .toDebugString
    // res9: String =
    // (8) MapPartitionsRDD[622] at reduceByKey at <console>:49 []
    //  |  ShuffledRDD[615] at partitionBy at <console>:48 []
    //  |      CachedPartitions: 8; MemorySize: 1754.8 MB; DiskSize: 0.0 B
    

But perhaps the best thing to do is simply keep in mind which operations might cause a shuffle:

  • cogroup
  • groupWith
  • join
  • leftOuterJoin
  • rightOuterJoin
  • groupByKey
  • reduceByKey
  • combineByKey
  • distinct
  • intersection
  • repartition
  • coalesce

Avoiding a network shuffle by partitioning

There are a few ways to use operations that might cause a shuffle while still avoiding much or all of the network shuffling.

Two examples are:

  1. reduceByKey running on a pre-partitioned RDD causes the values to be computed locally, requiring only the final reduced value to be sent from each worker to the driver.
  2. join called on two RDDs that are pre-partitioned with the same partitioner and cached on the same machines causes the join to be computed locally, with no shuffling across the network (see the sketch after this list).
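
Here is a sketch of the second case; the data and partition count are made up:

import org.apache.spark.HashPartitioner

// Both RDDs are partitioned up front with the same partitioner and persisted,
// so join can match keys locally on each node without a network shuffle.
val partitioner = new HashPartitioner(8)
val users  = sc.parallelize(List((1, "alice"), (2, "bob")))
               .partitionBy(partitioner).persist()
val events = sc.parallelize(List((1, "click"), (2, "view"), (1, "view")))
               .partitionBy(partitioner).persist()
val joined = users.join(events)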

Wide vs Narrow Dependencies

Wide versus narrow dependencies describe the relationships between RDDs in graphs of computation, which has a lot to do with shuffling.

Lineages

The chain of computations performed on an RDD is called its lineage graph. When we do operations on an RDD, the operations can be organized into a Directed Acyclic Graph (DAG) representing the computations that were done on that RDD. In fact, lineage graphs/DAGs are what Spark analyzes to do optimizations.

RDD representation

RDDs are represented by:

  • Partitions. Atomic pieces of the dataset, one or many per compute node.
  • Dependencies. Model the relationship between this RDD’s partitions and the partitions of the RDD(s) it was derived from.
  • A function for computing the dataset based on its parent RDDs.
  • Metadata about its partitioning scheme and data placement.

RDD Dependencies and Shuffles

In fact, RDD dependencies encode when data must move across the network, so the dependency information can tell us when a shuffle might occur. To differentiate, we can define two kinds of dependencies:

  1. Narrow Dependencies
  2. Wide Dependencies

Narrow Dependencies

A transformation has narrow dependencies when each partition of the parent RDD is used by at most one partition of the child RDD. In other words, each parent partition contributes to at most one child partition.

Transformations with narrow dependencies are typically quite fast. They require no shuffles, and optimizations like pipelining are possible, which is when we can group together many transformations into one pass. Some examples are map, filter, union and join with co-partitioned inputs.

Wide Dependencies

A transformation has wide dependencies when each partition of the parent RDD may be depended on by multiple child partitions.

Transformations with wide dependencies are slow because they require some or all of the data to be shuffled over the network. Some examples are groupByKey and join without co-partitioned inputs.

Narrow dependencies vs Wide dependencies, visually

Having seen the definition of narrow dependencies and wide dependencies, let’s visualize an example program and its dependencies.

Assuming we have the following DAG:

We can visualize its dependencies:

As we can see, groupBy and join between F and G are wide transformations, and map, union and join between B and G are narrow transformations.

You may be wondering why the join between B and G is a narrow transformation. The reason is that after the groupBy, which already partitions the data, B is cached in memory. Therefore we don’t need to re-partition it, which is why that part of the join is a narrow transformation.

Find Out Dependencies

Spark provides some useful methods to find out dependency information on RDDs.

dependencies

The method dependencies returns a sequence of Dependency objects, which are actually the dependencies used by Spark’s scheduler to know how this RDD depends on other RDDs.

The sorts of dependency objects the dependencies method may return include:

[Figure: Dependencies]

Here is a quick example of what the output of dependencies looks like:

val wordsRdd = sc.parallelize(largeList)
val pairs = wordsRdd.map(c => (c, 1))
                    .groupByKey()
                    .dependencies
// pairs: Seq[org.apache.spark.Dependency[_]] =
// List(org.apache.spark.ShuffleDependency@4294a23d)

Calling groupByKey on a pair RDD produces another pair RDD with a shuffle dependency (a wide dependency) in it.
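
For contrast, a narrow transformation such as map only introduces a one-to-one dependency on its parent (output abbreviated):

wordsRdd.map(c => (c, 1)).dependencies
// res: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@...)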

toDebugString

Spark also comes with another method called toDebugString. toDebugString prints out a visualization of the RDD’s lineage, along with other information pertinent to scheduling.

We use the same example:

val wordsRdd = sc.parallelize(largeList)
val pairs = wordsRdd.map(c => (c, 1))
                    .groupByKey()
                    .toDebugString
//pairs: String =
//(8) ShuffledRDD[219] at groupByKey at <console>:38 []
// +-(8) MapPartitionsRDD[218] at map at <console>:37 []
//    | ParallelCollectionRDD[217] at parallelize at <console>:36 []                   

The resulting RDD from groupByKey is a ShuffledRDD, which came from a MapPartitionsRDD, which itself came from a ParallelCollectionRDD. The ParallelCollectionRDD corresponds to wordsRdd, the MapPartitionsRDD corresponds to the result of map on wordsRdd, and finally the ShuffledRDD corresponds to the result of groupByKey. The indentations here show how Spark groups these operations together, and these groupings are typically separated by shuffles.

We can print out a lineage using toDebugString, and will be able to see how our job will be broken up into different groupings of operations, separated by shuffles. Actually, these groupings are called stages. So a Spark job is broken into stages by its scheduler.

Lineages and fault tolerance

Lineage graphs are the key to fault tolerance in Spark. The following ideas from functional programming enable fault tolerance in Spark:

  • RDDs are immutable.
  • Higher-order functions like map, flatMap, and filter are used to do functional transformations on this immutable data.
  • A function for computing the dataset based on its parent RDDs also is part of an RDD’s representation.

Along with keeping track of dependency information between partitions, this allows us to recover from failures by recomputing lost partitions from the lineage graph.

Lineages and fault tolerance, visually

It’s easy to understand lineages and fault tolerance visually. Let’s assume one of the partitions from the previous example fails:

Without having to checkpoint and write all data to disk, Spark can just re-derive the lost partition using this graph of dependencies. All it needs to do is go back to the dependency information, along with the functions stored with those dependencies, and recompute the data.

And finally Spark restores the failed partitions:

Note that recomputing missing partitions is fast for narrow dependencies, but slow for wide dependencies.

For example, if we lose one partition in G, we have to trace its dependencies backward.

And because that child partition depends on all of the parent partitions in the union, we have to recompute all of the intermediate RDDs in order to recompute this one piece of data. So losing partitions that were derived from a transformation with wide dependencies can be much slower.