Spark week one notes

2017-06-13

From Parallel to Distributed

Data-Parallel to Distributed Data-Parallel

In this section, we’re going to try to bridge the gap between data parallelism in the shared-memory setting and distributed data parallelism.

Shared memory data parallelism

Consider shared memory data parallelism. To achieve such parallelism, we have to:

  1. Split the data.
  2. Workers/threads independently operate on the data shards in parallel.
  3. Combine when done (if necessary).

We can use a jar of jelly beans to represent a task and illustrate the above operations with the following figure:

Scala parallel collections provide a good abstraction over shared-memory data-parallel execution. We can use them just as we would in the sequential programming model, but under the hood the operations are executed in parallel:

val res = jar.map(jellyBean => doSomething(jellyBean))
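
For instance, a minimal sketch (doSomething here is a hypothetical placeholder for per-element work): calling .par on an ordinary collection gives us a parallel collection with the same API:

// .par converts a sequential collection into a parallel one;
// the map below is then executed by multiple threads.
def doSomething(jellyBean: Int): Int = jellyBean * 2   // placeholder work
val jar = (1 to 1000000).par
val res = jar.map(jellyBean => doSomething(jellyBean))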

Distributed data-parallelism

In fact, distributed data parallelism shares a similar execution model with shared-memory data parallelism: data are divided into smaller pieces, the program operates on those pieces in parallel, and the results are combined if necessary. The differences are:

  1. Data are split over several nodes.
  2. Nodes independently operate on the data shards in parallel.

The following figure illustrates distributed data parallelism:

Like parallel collections, we can keep the collection abstraction over distributed data-parallel execution. In other words, we can reuse the same API to achieve distributed parallelism:

val res = jar.map(jellyBean => doSomething(jellyBean))
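
As a sketch (sc is a SparkContext, which is introduced later in these notes, and doSomething is the same hypothetical placeholder), the only visible change is how the collection is created:

// The same map call, but jar is now an RDD whose partitions live on several nodes.
val jar = sc.parallelize(1 to 1000000)
val res = jar.map(jellyBean => doSomething(jellyBean))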

Distribution

Compared with shared-memory data parallelism, distributed parallelism introduces additional concerns, two of which are:

  • Partial failure: crash failures of a subset of the machines involved in a distributed computation.
  • Latency: certain operations have higher latency than other operations due to network communication.

We will talk about latency a bit more later and see how Spark handles these two concerns.

Summary

In short, in the shared-memory case, data are partitioned in memory and operated upon in parallel, while in the distributed case, data are partitioned between machines, with a network in between, and operated upon in parallel. Even so, most properties of shared-memory data-parallel collections carry over to their distributed counterparts.

Latency

Compared with memory access, disk I/O and network communication have latencies that are several orders of magnitude higher. Some important latency numbers are shown below:

Latency Numbers

To get a more intuitive feel for these numbers, let’s humanize the durations by multiplying each of them by a billion. Each latency number then maps to a human activity measured in days, months, and years.
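
As a rough sketch of this arithmetic in Scala (the latency values below are the commonly cited approximations, not taken from the figures here):

// After multiplying by a billion, a duration of n nanoseconds reads as n seconds.
val latenciesNs = Seq(
  "L1 cache reference"              -> 0.5,      // ns
  "Main memory reference"           -> 100.0,    // ns
  "Disk seek"                       -> 10.0e6,   // 10 ms
  "Packet round trip CA <-> Europe" -> 150.0e6   // 150 ms
)
val secondsPerDay = 60 * 60 * 24
latenciesNs.foreach { case (name, ns) =>
  val humanizedSeconds = ns                      // same number, unit is now seconds
  val days = humanizedSeconds / secondsPerDay
  println(f"$name%-33s ~ $days%.1f days")
}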

Intuitive Latency

Big data processing and latency

You may wonder how these latency numbers relate to big data processing. To answer this, let’s first talk about Spark’s predecessor, Hadoop.

Hadoop is a widely used large-scale batch data processing framework. It provides:

  • a simple map and reduce API
  • fault tolerance

Fault tolerance is what makes it possible for Hadoop/MapReduce to scale to hundreds or even thousands of nodes. It is implemented by shuffling data over the network and writing intermediate results to disk, which allows Hadoop to recover from potential failures. But fault tolerance in Hadoop comes at a cost: reading from and writing to disk is much slower than working in memory, not to mention the network communication. As a result, Hadoop can be unsatisfactorily slow when processing very large datasets.

Spark extends the MapReduce model but uses a different strategy for handling latency. The idea comes from functional programming: keep all data immutable and in memory, and express all operations on data as functional transformations. Fault tolerance is achieved by replaying those functional transformations over the original dataset. Hence, latency is significantly reduced while fault tolerance is retained.

The latency characteristics of both Hadoop and Spark are shown below.

Hadoop Latency

Spark Latency

As we can see, Hadoop’s latency comes mainly from disk and network, while Spark’s comes mainly from memory and network. In fact, Spark aggressively minimizes network latency wherever it can and shifts the work to memory. As a result, Spark is often up to 100x faster than Hadoop.

Apache Spark

Spark is a distributed data-parallel programming framework. Its core abstraction is the Resilient Distributed Dataset (RDD).

Resilient Distributed Datasets (RDDs)

Basics

RDDs represent an immutable collection of items distributed across many compute nodes that can be manipulated in parallel. The ability to always recompute an RDD is why RDDs are called “resilient”: when a machine holding RDD data fails, Spark uses this ability to recompute the missing partitions, transparently to the user. On the surface, RDDs look a lot like immutable sequential or parallel Scala collections, and a simplified API is shown below.

abstract class RDD[T] {
    def map[U](f: T => U): RDD[U] = ...
    def flatMap[U](f: T => TraversableOnce[U]): RDD[U] = ...
    def filter(f: T => Boolean): RDD[T] = ...
    def reduce(f: (T, T) => T): T = ...
}

Most operations on RDDs, like those on Scala’s immutable List and on Scala’s parallel collections, are higher-order functions: methods that operate on an RDD, take a function as an argument, and typically return an RDD.

In fact, Scala collections and Spark RDDs have very similar APIs in terms of signatures and semantics:

map[B](f: A => B): List[B] // Scala List
map[B](f: A => B): RDD[B]  // Spark RDD

flatMap[B](f: A => TraversableOnce[B]): List[B] // Scala List
flatMap[B](f: A => TraversableOnce[B]): RDD[B]  // Spark RDD

filter(pred: A => Boolean): List[A]   // Scala List
filter(pred: A => Boolean): RDD[A]    // Spark RDD

reduce(op: (A, A) => A): A // Scala List
reduce(op: (A, A) => A): A // Spark RDD

fold(z: A)(op: (A, A) => A): A // Scala List
fold(z: A)(op: (A, A) => A): A // Spark RDD

aggregate[B](z: => B)(seqop: (B, A) => B, combop: (B, B) => B): B // Scala List
aggregate[B](z: B)(seqop: (B, A) => B, combop: (B, B) => B): B // Spark RDD

Using RDDs in Spark feels a lot like using normal Scala sequential or parallel collections, with the added knowledge that the data is distributed across several machines.
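
As a sketch of how similar the two feel, here is the same word count on a local List and on an RDD (sc is assumed to be an existing SparkContext, the input path is hypothetical, and reduceByKey is a pair-RDD operation not shown in the simplified API above):

// Local Scala collection
val localLines  = List("hello spark", "hello scala")
val localCounts = localLines.flatMap(_.split(" ")).groupBy(identity).mapValues(_.size)

// Spark RDD: the code has the same shape, but the data and the work are distributed
val lines  = sc.textFile("input.txt")            // hypothetical path
val counts = lines.flatMap(_.split(" "))
                  .map(word => (word, 1))
                  .reduceByKey(_ + _)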

Creation

RDDs can be created in two ways:

  • Transforming an existing RDD
  • From a SparkContext (or SparkSession) object

Transforming an existing RDD means calling one of the higher-order functions defined on RDDs and getting back another RDD.

Creating an RDD from a SparkContext object means calling a method defined on SparkContext and getting back a new RDD. The SparkContext object can be thought of as our handle to the Spark cluster; it represents the connection between the Spark cluster and our running application. It defines a handful of methods which can be used to create and populate a new RDD:

  • parallelize: convert a local Scala collection to an RDD (requires all the data to fit in the driver’s memory; less frequently used in production).
  • textFile: read a text file from HDFS or a local file system and return an RDD of String.
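
A minimal sketch of both creation methods (the application name and file path are hypothetical, and local[*] is used just so the snippet is self-contained):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

val conf = new SparkConf().setAppName("creation-demo").setMaster("local[*]")
val sc   = new SparkContext(conf)

val numbers: RDD[Int]  = sc.parallelize(1 to 1000)      // from an in-memory collection
val lines: RDD[String] = sc.textFile("data/words.txt")  // one String element per line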

Transformations and Actions

An RDD offers two types of operations:

  • Transformations: always return new RDDs as results.
  • Actions: compute a result based on an RDD, and either return it to the driver program or save it to an external storage system (e.g., HDFS).

Transformations are lazy: their result RDD is not immediately computed. Actions are eager: their result is computed immediately. This is an enormous difference between transformations and actions.

Consider the following example:

val largeList: List[String] = ...
// sc represents an instance of SparkContext
val wordsRdd = sc.parallelize(largeList)
val lengthsRdd = wordsRdd.map(_.length)

At this point nothing is running on the cluster, because the execution of map (a transformation) is deferred; all we get back is a reference to an RDD that does not yet exist. To kick off the computation and get a result, we need to add an action:

val totalChars = lengthsRdd.reduce(_ + _)

Common transformations include map, flatMap, filter, and distinct; common actions include collect, count, take, reduce, foreach, and saveAsTextFile.

Laziness and eagerness matter because they are how we can limit network communication using the programming model.
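
Continuing the word-length example above (a sketch; wordsRdd is the RDD defined earlier and the output path is hypothetical), the comments mark which lines actually trigger work on the cluster:

// Transformations are lazy: nothing runs on the cluster here.
val longWords = wordsRdd.filter(_.length > 5)

// Actions are eager: each of these triggers a distributed computation.
val howMany = longWords.count()               // Long, returned to the driver
val sample  = longWords.take(3)               // Array[String], returned to the driver
longWords.saveAsTextFile("out/longWords")     // saved to external storage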

Evaluation in Spark

The main reason Spark is much faster than Hadoop is that Spark can do in-memory computations, while Hadoop has to write intermediate results to disk and read them back.

The difference can be illustrated in the following diagram:

Iteration in Hadoop and Spark

One example of in-memory computation is logistic regression, where the classifier’s weights are iteratively updated based on the training dataset:
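
The weight-update rule (the formula image is missing here; this is its standard form for this example, where α is the learning rate, x_i the feature vector, and y_i ∈ {−1, +1} the label):

w \leftarrow w - \alpha \sum_i \left( \frac{1}{1 + e^{-y_i\, w^{\top} x_i}} - 1 \right) y_i\, x_i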

The above formula can be implemented straightforwardly in Scala:

// Pseudocode in the style of the course slides: parsePoint, d, alpha and
// numIterations are assumed to be defined elsewhere, and x and w are vectors.
case class Point(x: Vector[Double], y: Double)    // feature vector and label
val points = sc.textFile(...).map(parsePoint)
var w = Vector.zeros(d)                           // initial weight vector
for (i <- 1 to numIterations) {
    val gradient = points.map { p =>
        (1 / (1 + exp(-p.y * w.dot(p.x))) - 1) * p.y * p.x
    }.reduce(_ + _)
    w -= alpha * gradient
}

By default, RDDs are recomputed each time we run an action on them. This gives the above code a significant disadvantage: points is re-evaluated on every iteration, i.e. on each iteration the points are re-read from the text file and re-parsed. This can be avoided by persisting the RDD.

If we would like to reuse an RDD in multiple actions, we can ask Spark to persist it using RDD.persist() or RDD.cache(). The above dilemma can therefore be solved by:

val points = sc.textFile(...).map(parsePoint).persist()

After points is first read and parsed, Spark will store its contents in memory for faster access in future operations.

Caching and persistence

Spark allows many ways to configure how our data is persisted:

  • in memory as regular Java objects
  • on disk as regular Java objects
  • in memory as serialized Java objects (more compact)
  • on disk as serialized Java objects (more compact)
  • both in memory and on disk (spill over to disk to avoid re-computation)

cache()

The cache() method is shorthand for the default storage level, which is in memory only, as regular Java objects.

persist()

Persistence can be customized with this method: we pass the storage level we’d like as a parameter to persist. The available storage levels are as follows:

Storage Level
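
For instance, a minimal sketch using one of these levels (sc and parsePoint are from the logistic-regression example above, the path is hypothetical, and the StorageLevel constant comes from the Spark API):

import org.apache.spark.storage.StorageLevel

// Store partitions as serialized objects in memory, spilling to disk if they don't fit.
val points = sc.textFile("data/points.txt")
  .map(parsePoint)
  .persist(StorageLevel.MEMORY_AND_DISK_SER)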

Benefits of laziness

In fact, laziness is one of the ways Spark tackles network latency: lazy evaluation of transformations allows Spark to stage computations. Spark can therefore make important optimizations to the chain of operations before execution and reduce the amount of data that needs to be shuffled over the network.

Example #1

val lastYearsLogs: RDD[String] = ...
val firstLogsWithErrors = lastYearsLogs.filter(_.contains("ERROR")).take(10)

In the above example, the execution of filter is deferred until the take action is applied. Spark leverages this by analyzing and optimizing the chain of operations before executing it. As a result, Spark does not need to compute the full intermediate RDD: as soon as 10 elements of the filtered RDD have been computed, firstLogsWithErrors is done. At that point Spark stops working, saving the time and space it would have spent computing the unused elements of the filter result.

Example #2

val lastYearsLogs: RDD[String] = ...
val numErrors = lastYearsLogs.map(_.toLowerCase).filter(_.contains("error")).count()

With the count action, Spark can optimize the chain and knows that it can avoid doing multiple passes through the data. That is, Spark traverses the RDD once, computing the results of map and filter in a single pass, before returning the resulting count.

RDDs differ a lot from Scala collections

In Spark, all work is expressed as creating new RDDs, transforming existing RDDs, or calling actions on RDDs to compute a result. Although RDDs look like Scala collections on the surface, they behave quite differently: rather than thinking of an RDD as containing specific data, it is best to think of each RDD as a set of instructions on how to compute the data, built up through transformations.

Cluster Topology

Spark’s cluster topology has a significant impact on how code behaves. Consider a simple println call. Assume we have an RDD populated with Person objects:

case class Person(name: String, age: Int)

What does the following code snippet do?

val people: RDD[Person] = ...
people.foreach(println)

Before answering that question, let’s consider another example. Assume we have an RDD populated with the same Person objects. What does the following code snippet do?

val people: RDD[Person] = ...
val first10 = people.take(10)

Where will the Array[Person] representing first10 end up?

To answer the above questions, we need to understand Spark’s topology:

Spark Topology

Spark is organised in a master-worker topology. The master node is the node that runs the driver program; it’s the node we interact with when writing Spark programs. The worker nodes are the ones actually executing the jobs. The master node communicates with the worker nodes via a cluster manager, which is responsible for allocating resources across the cluster and managing scheduling. Examples of cluster managers include YARN and Mesos.

A Spark application is a set of processes running on a cluster, and the driver program is responsible for coordinating them. The driver is:

  • the process where the main() method of our program runs.
  • the process that creates a SparkContext, creates RDDs, and stages up or sends off transformations and actions.

Executors are processes that run computations and store data for our application. They:

  • Run the tasks that represent the application.
  • Return computed results to the driver.
  • Provide in-memory storage for cached RDDs.

With this master-worker topology, a Spark program executes as follows:

  1. The driver program runs the Spark application, which creates a SparkContext upon start-up.
  2. The SparkContext connects to a cluster manager (e.g., Mesos/YARN) which allocates resources.
  3. Spark acquires executors on nodes in the cluster, which are processes that run computations and store data for our application.
  4. Next, the driver program sends our application code to the executors.
  5. Finally, SparkContext sends tasks for the executors to run.
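
As a concrete (hypothetical) illustration of these steps, a minimal self-contained driver program might look like this; local[*] stands in for a real cluster manager, whose master URL would normally be supplied via spark-submit:

import org.apache.spark.{SparkConf, SparkContext}

object MinimalApp {
  def main(args: Array[String]): Unit = {
    // Step 1: the driver creates a SparkContext on start-up.
    // Steps 2-4 (cluster manager, acquiring executors, shipping code) happen behind the scenes.
    val conf = new SparkConf().setAppName("minimal-app").setMaster("local[*]")
    val sc   = new SparkContext(conf)

    // Step 5: invoking an action (count) makes the SparkContext send tasks to the executors.
    val n = sc.parallelize(1 to 1000).map(_ * 2).count()
    println(s"count = $n")   // printed on the driver

    sc.stop()
  }
}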

Having understood Spark’s topology, we can now answer the questions from the previous examples.

For example 1, when we execute:

val people: RDD[Person] = ...
people.foreach(println)

On the master node, we see nothing on the driver’s stdout. The reason is that foreach is an action with return type Unit; it is eagerly executed on the executors, not on the driver. Therefore, any calls to println happen on the stdout of the workers and are not visible in the stdout of the driver node (the driver only gets back Unit).
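
If we do want to inspect the contents from the driver, we first have to bring the data back with an action such as collect (a sketch; only safe when the result fits in the driver’s memory):

// collect() returns an Array[Person] to the driver, so println now runs locally.
people.collect().foreach(println)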

In example 2, when we execute:

val first10 = people.take(10)

first10 ends up on the master node, in the driver program’s memory. In general, executing an action involves communication between the worker nodes and the node running the driver program, and the combined result is sent back to the driver.