Spark week four notes

2017-06-13

Structured data: SQL, DataFrames, and Datasets

The topic of this post is Spark SQL. Before diving into Spark SQL, let’s first talk about structure and optimization to understand the motivation of Spark SQL.

Structured vs Unstructured Data

All data is not equal, structurally. It falls on a spectrum from unstructured to structured.

Data Spectrum

Given an arbitrary RDD, Spark only knows that the RDD is parameterized with arbitrary types, but it doesn’t know anything about these types’ structure. As a result, we have to do all the optimization work ourselves.

For example, assuming we have a dataset of Account objects:

case class Account(name: String, balance: Double, risk: Boolean)

With an RDD[Account], all Spark sees is blobs of objects called Account. It cannot look inside these Account objects to inspect their structure and analyze which parts of that structure might be used in the subsequent computation.

For example, it’s conceivable that this Account object might be bigger than these three fields; it could have hundreds of fields, while a certain computation only needs one of them. But Spark, as is, will serialize each of these really big Account objects and send them all over the network, even though most of the data it’s sending around isn’t needed. Spark cannot do optimizations like these, because it cannot see inside of these Account objects, and therefore it cannot optimize based on their structure.

On the other hand, in a structured dataset, like some kind of database table, for example, computations are done on columns of named and typed values. So absolutely everything about the structure of a dataset is known in a structured setting like in a database table or in Hive. And in fact, databases tend to be heavily optimized.

So given a structured dataset, optimizations can be done, because we know the structure of the dataset that we’re operating on, and we know which parts of it we are going to use, and we can optimize based on that information.

There is also structured and unstructured computation. So far, in Spark, what we’ve been doing is functional transformations on data: we pass some kind of user-defined function literal to a higher-order function like map, flatMap, or filter. These function literals are completely unstructured and opaque; Spark cannot look at the operations we’re trying to do and make optimizations based on those operations. In databases, however, we typically do declarative transformations on structured data, and all of these operations tend to be very specialized, structured, fixed, rigid, predefined operations.

Wouldn’t it be nice if Spark could do some of this optimization for us? Well, that’s the whole point of Spark SQL, which makes these optimizations possible.

Spark SQL

Spark SQL makes it possible to seamlessly intermix SQL queries with Scala and optimizes Spark SQL code using techniques from the databases world. Using Spark SQL, though, we give up some of the freedom, flexibility, and generality of the functional collections API in order to give Spark more opportunities to optimize.

Goals of Spark SQL

Spark SQL has three main goals:

  1. Support relational processing both within Spark programs (on RDDs) and on external data sources with a friendly API.
  2. High performance, achieved by using techniques from research in databases.
  3. Easily support new data sources such as semi-structured data and external databases.

Component

Actually, Spark SQL is a component of the Spark stack.

  • It is a Spark module for structured data processing.
  • It is implemented as a library on top of Spark.

Visually, Spark SQL relates to the rest of Spark like this: Spark Architecture

Spark SQL offers three main APIs:

  • SQL literal syntax
  • DataFrames
  • Datasets

and two specialized backend components:

  • Catalyst, a query optimizer that optimizes our code
  • Tungsten, an off-heap serializer

DataFrame

DataFrame is the core abstraction of Spark SQL. Conceptually, DataFrames are RDDs full of records with a known schema. DataFrames are also untyped: a DataFrame has no type parameter, and the Scala compiler doesn’t check the types in its schema. Transformations on DataFrames are therefore known as untyped transformations.

SparkSession

The SparkSession is basically the SparkContext for Spark SQL. Creating a SparkSession is pretty straightforward: we just import the SparkSession object and then use the builder pattern:

import org.apache.spark.sql.SparkSession

val spark = SparkSession
            .builder()
            .appName("My App")
            //.config("spark.some.config.option","some-value")
            .getOrCreate()

Creating DataFrames

There are two ways to create a DataFrame:

  1. From an existing RDD. Either with schema inference or with an explicit schema.
    Given an RDD of tuples, RDD[(T1, T2, ... TN)], a DataFrame can be created with its schema automatically inferred simply by using the toDF method:
      val tupleRDD = ... // Assume RDD[(Int, String, String, String)]
      // creating a DataFrame with four columns
      val tupleDF = tupleRDD.toDF("id","name","city","country")
    

    If we use toDF without arguments, Spark will assign numbered column names (_1, _2, ...) to our DataFrame.
    If we already have an RDD containing case class instances, then Spark can infer the attributes (column names) from the case class’s fields.

      case class Person(id: Int, name: String, city: String)
      val peopleRDD = ... // Assume RDD[Person]
      val peopleDF = peopleRDD.toDF
    

    In this case, the column names are automatically inferred from the field names of the case class.
    The other way of creating a DataFrame from an existing RDD is to explicitly specify a schema. This is useful when we don’t have a predefined case class. It takes three steps:

    1. First, create an RDD of Rows from the original RDD.
    2. Then create a schema separately represented by a StructType matching the structure of Rows in the RDD created in Step 1.
    3. Finally, apply the schema to the RDD of Rows via the createDataFrame method provided by SparkSession.

Given:

  // Assume peopleRDD is an RDD[String] of lines such as "Michael, 29"
  val peopleRDD = sc.textFile(...)

  // The following code shows how to convert the original RDD to a DataFrame
  import org.apache.spark.sql.Row
  import org.apache.spark.sql.types._

  // Encode the schema in a string
  val schemaString = "name age"

  // Generate the schema based on the string of schema
  val fields = schemaString.split(" ")
        .map(fieldName => StructField(fieldName, StringType, nullable = true))
  val schema = StructType(fields)

  // Convert records of the RDD to Rows
  val rowRDD = peopleRDD
              .map(_.split(","))
              .map(attributes => Row(attributes(0), attributes(1).trim))

  // Apply the schema to the RDD of Rows
  // Recall spark is an instance of SparkSession
  val peopleDF = spark.createDataFrame(rowRDD, schema)
  2. Reading in a specific data source from a file. Common structured or semi-structured formats include JSON, CSV, Parquet, and JDBC.
    Using the SparkSession object, we can read in semi-structured/structured data by using the read method. For example, to read in data and infer a schema from a JSON file:
      val df = spark.read.json(".../people.json")
    

    SparkSession will parse the semi-structured/structured data and build up a DataFrame with the schema of that data.
    Spark SQL supports creating a DataFrame directly from:

    • JSON
    • CSV
    • Parquet
    • JDBC

SQL literals

With a DataFrame, we can freely write SQL syntax to operate on our dataset.

Given a DataFrame called peopleDF, we just have to register our DataFrame as a temporary SQL view first:

// Register the DataFrame as a SQL temporary view
peopleDF.createOrReplaceTempView("people")

// This essentially gives a name to our DataFrame in
// SQL, so we can refer to it in an SQL FROM statement

// SQL literals can be passed to Spark SQL's sql method
val adultsDF = spark.sql("SELECT * FROM people WHERE age > 17")

The supported SQL syntax is detailed in Supported syntax of Spark SQL.

Data types

To enable optimization opportunities, Spark SQL’s DataFrames operate on a restricted set of data types. The basic types include the numeric types (ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType, DecimalType) as well as StringType, BinaryType, BooleanType, TimestampType, and DateType.

In addition to these basic data types, Spark SQL has a handful of complex data types that can contain basic types or other complex data types: ArrayType, MapType, and StructType (case classes).

Accessing Spark SQL types

In order to access any of these data types, either basic or complex, we must first import Spark SQL types:

import org.apache.spark.sql.types._
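For instance, here’s a minimal sketch (the column names name and tags are made up for illustration) of a schema that mixes a basic type with a complex ArrayType:

import org.apache.spark.sql.types._

// A schema with a basic StringType column and a complex ArrayType column
val exampleSchema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("tags", ArrayType(StringType), nullable = true)
))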

Common Transformations

Transformations on DataFrames can be thought of as a relational API over RDDs. Example methods include:

/** 
  * selects a set of named columns and returns a new DataFrame 
  * with these columns as a result
*/
def select(col: String, cols: String*): DataFrame

/** 
  * performs aggregations on a series of columns and returns a 
  * new DataFrame with the calculated output
*/
def agg(expr: Column, exprs: Column*): DataFrame

/** 
  * groups the DataFrame using the specified columns.
*/
def groupBy(col1: String, cols: String*): DataFrame 

/** 
  * inner join with another DataFrame
*/
def join(right: DataFrame): DataFrame

The main difference between the RDD API and the DataFrame API is that DataFrame methods accept Spark SQL expressions instead of the arbitrary user-defined function literals we were used to on RDDs. This allows the optimizer to understand what the computation represents; with filter, for example, it can often skip reading unnecessary records.
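For example, a minimal sketch reusing the peopleDF DataFrame from earlier (name and age are its assumed columns):

// select and filter take column names / SQL expressions rather than
// arbitrary lambdas, so Catalyst can see exactly what the query needs
peopleDF.select("name", "age")
        .filter("age > 17")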

Specifying Columns

Most methods on DataFrames tend to be some well-understood, pre-defined operation on a column of the dataset. We can select and work with columns in three ways:

  1. Using $- notation
    /** 
      * $-notation requires import spark.implicits._
    */
    df.filter($"age" > 18)
    
  2. Referring to the DataFrame
    df.filter(df("age") > 18)
    
  3. Using SQL query string
    df.filter("age > 18")
    

Grouping and Aggregating on DataFrames

One of the most common tasks on tables is to:

  1. group data by a certain attribute, and then
  2. do some kind of aggregation on it like a count

For grouping and aggregating, Spark SQL provides:

  • a groupBy function which returns a RelationalGroupedDataset
  • which has several standard aggregation functions defined on it like count, sum, max, min, and avg.

The basic pattern is as follows:

  • call groupBy on specific attribute/column(s) of a DataFrame
  • followed by a call to a method on RelationalGroupedDataset like count, max, or agg (for agg, we also specify which aggregation function, e.g. count, sum, max, should be applied to which attribute/column(s)).

// Example 1 (sum and the other aggregation functions come from org.apache.spark.sql.functions._)
df.groupBy($"attribute1")
  .agg(sum($"attribute2"))

// Example 2 (count on RelationalGroupedDataset takes no arguments)
df.groupBy($"attribute1")
  .count()

Common Actions

DataFrames also have their own set of actions. Some examples are:

/**
  * Returns an array that contains all of the Rows in the DataFrame
*/
def collect(): Array[Row]

/**
  * Returns the number of rows in the DataFrame
*/
def count(): Long

/**
  * Returns the first row in the DataFrame
*/
def first(): Row
def head(): Row

/**
  * Displays the top 20 rows of the DataFrame in a tabular form
*/
def show(): Unit

/**
  * Returns the first n rows in the DataFrame
*/
def take(n: Int): Array[Row]
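
For instance, a quick sketch using the adultsDF DataFrame from the SQL-literal example above:

adultsDF.show()                    // prints the first 20 rows in tabular form
val numAdults = adultsDF.count()   // number of rows, as a Long
val firstFive = adultsDF.take(5)   // first 5 rows, as an Array[Row]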

Joins

Joins on DataFrames are similar to those on Pair RDDs, but since DataFrames aren’t key/value pairs, we have to specify which columns we should join on.

Several types of joins are available:

  • inner
  • outer
  • left_outer
  • right_outer
  • leftsemi

Given two DataFrames, df1 and df2 each with a column/attribute called id, we can perform an inner join as follows:

df1.join(df2, df1("id") === df2("id"))

It’s possible to change the join type by passing an additional string parameter to join specifying which type of join to perform, e.g.:

df1.join(df2, df1("id") === df2("id"), "right_outer")

Optimizations

Optimizations are done with the help of Catalyst and Tungsten.

Recall Catalyst is Spark SQL’s query optimizer, which:

  • has full knowledge and understanding of all data types
  • knows the exact schema of data
  • has detailed knowledge of the computations

Catalyst makes optimizations by:

  • Reordering operations
    Analyze and rearrange the DAG of computation, i.e. the logical operations the user would like to do, before they’re executed.
  • Reducing the amount of data that must be read
    Skip reading in, serializing, and sending around parts of the dataset that aren’t needed for our computation.
  • Pruning unneeded partitions
    Analyze DataFrame and filter operations to figure out which partitions are unneeded in the computation and skip them.
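
We can peek at what Catalyst plans to do by printing the query plan. A minimal sketch, assuming the peopleDF DataFrame from earlier and import spark.implicits._ for the $ syntax:

// explain(true) prints the parsed, analyzed, optimized, and physical plans,
// showing e.g. where the filter ends up relative to the data source
peopleDF.filter($"age" > 17)
        .select($"name")
        .explain(true)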

Tungsten is Spark SQL’s off-heap data encoder, and it can provide:

  • highly-specialized data encoders
    Tungsten can take schema information and tightly pack serialized data into memory. This means more data can fit in memory, and serialization/deserialization (a CPU-bound task) is faster. The idea is that we keep data serialized in a special format in memory, so that we can keep more data in memory and access it more quickly.
  • column-based
    Storing tabular data in a column format comes from the observation that most operations that are actually done on tables tend to be focused on specific columns/attributes of the data set. Thus, when storing data, group data by column instead of row for faster lookups of data associated with specific attributes/columns.
  • off-heap (free from garbage collection overhead!)
    Tungsten tightly packs all of the data in columnar format, off-heap, in memory that it manages itself. This means we can avoid garbage collection overhead and garbage collection pauses.

Limitations of DataFrames

  • Untyped
    With an untyped API, our code may compile but fail at runtime (see the sketch after this list).
  • Limited Data Types
    If our data cannot be expressed by case classes/Products and standard Spark SQL data types, it may be difficult to ensure that a Tungsten encoder exists for our data type.
  • Require Semi-Structured/Structured Data
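
As a sketch of the untyped pitfall, assume the peopleDF DataFrame from earlier, which has no column called salary:

// This compiles, because "salary" is just a string as far as the compiler
// is concerned, but it fails at runtime with an AnalysisException
val oops = peopleDF.select("salary")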

Datasets

Datasets can be thought of as typed distributed collections of data. Datasets require structured/semi-structured data, and schemas and Encoders are a core part of Datasets.

In fact, DataFrames are actually Datasets.

type DataFrame = Dataset[Row]

Datasets can be thought of as a compromise between RDDs and DataFrames: their API unifies the DataFrame and RDD APIs. We get more type information on Datasets than on DataFrames, and more optimizations on Datasets than on RDDs.

We can mix and match the DataFrame API and the RDD API. For example, assume we have a variable called listingsDS of type Dataset[Listing]. We can perform the following operations:

listingsDS.groupByKey(l => l.zip)  // looks like groupByKey on RDDs
          .agg(avg($"price").as[Double]) // looks like DataFrame operator

Datasets can be used when we want a mix of functional and relational transformations while benefiting from some of the optimizations on DataFrames. And we’ve almost got a type safe API as well.

Creation

Datasets can be created in the following ways:

  1. From a DataFrame by calling toDS method
     myDF.toDS  // requires import spark.implicits._
    
  2. From an RDD by calling toDS method
     myRDD.toDS  // requires import spark.implicits._
    
  3. Read from a structured/semi-structured data file
    Use the read method on the SparkSession object to get a DataFrame and then convert it to a Dataset (see the sketch after this list).

  4. From common Scala types
     List("yay", "ohnoes", "hooray!").toDS // requires import spark.implicits._
    

Transformations on Datasets

The Dataset API includes both untyped and typed transformations:

  • untyped transformations are transformations we see on DataFrames
  • typed transformations are typed variants of many DataFrame transformations plus additional transformations such as RDD-like higher-order functions map, flatMap, etc.

Because DataFrames are just Datasets (of Row), these APIs are seamlessly integrated. This means that we can call map on a DataFrame and get back a Dataset. But remember, we may have to explicitly provide type information when going from a DataFrame to a Dataset via typed transformations.

// toDF and the Encoder for map's result both require import spark.implicits._
val keyValuesDF = List((3, "Me"), (1, "Thi"), (2, "Se"), (3, "ssa"), (3, "-)"), (2, "cre"), (2, "t")).toDF
// map on a DataFrame returns a Dataset; here a Dataset[Int]
val res = keyValuesDF.map(row => row(0).asInstanceOf[Int] + 1)

Aggregators

Aggregator is a class that helps us generically aggregate data.

class Aggregator[-IN, BUF, OUT]

  • IN is the input to the aggregator. When using an aggregator after groupByKey, this is the type that represents the value in the key/value pair.
  • BUF is the intermediate type during aggregation.
  • OUT is the type of the output of the aggregation.

This is how we implement our own Aggregator:

val myAgg = new Aggregator[IN, BUF, OUT] {
  /**
    * The initial value
  */
  def zero: BUF = ...  

  /**
    * Add an element to the running total
  */
  def reduce(b: BUF, a: IN): BUF = ...

  /**
    * Merge intermediate values
  */
  def merge(b1: BUF, b2: BUF): BUF = ...

  /**
    * Return the final result
  */
  def finish(b: BUF): OUT = ...

  /**
    * Encoders for the intermediate (BUF) and output (OUT) types
    * (see the Encoders section below)
  */
  def bufferEncoder: Encoder[BUF] = ...
  def outputEncoder: Encoder[OUT] = ...
}.toColumn
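
As a concrete sketch, here’s a hypothetical aggregator that concatenates the String values of (Int, String) pairs, grouped by key:

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

val strConcat = new Aggregator[(Int, String), String, String] {
  def zero: String = ""                                       // the initial value
  def reduce(b: String, a: (Int, String)): String = b + a._2  // add an element
  def merge(b1: String, b2: String): String = b1 + b2         // merge intermediate values
  def finish(b: String): String = b                           // the final result
  def bufferEncoder: Encoder[String] = Encoders.STRING
  def outputEncoder: Encoder[String] = Encoders.STRING
}

// requires import spark.implicits._
val keyValuesDS = List((3, "Me"), (1, "Thi"), (2, "Se")).toDS
val concatenated = keyValuesDS.groupByKey(_._1)
                              .agg(strConcat.toColumn)  // Dataset[(Int, String)]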

Encoders

Encoders are what convert our data between JVM objects and Spark SQL’s specialized internal (tabular) representation. They’re required by all Datasets, and they are highly specialized, optimized code generators that generate custom bytecode for serialization and deserialization of our data.

The serialized data is stored using Spark’s internal Tungsten binary format, allowing for operations on serialized data and improved memory utilization.

What sets them apart from regular Java or Kryo serialization:

  • Encoders are limited to, and optimal for, primitives, case classes, and Spark SQL data types.
  • Encoders contain schema information, which makes these highly optimized code generators possible and enables optimization based on the shape of the data. Since Spark understands the structure of data in Datasets, it can create a more optimal layout in memory when caching Datasets.
  • Encoders use significantly less memory than Kryo/Java serialization.
  • Encoders are >10x faster than Kryo serialization.

Introducing encoders

There are two ways to introduce encoders:

  • Automatically via implicits from a SparkSession.
    import spark.implicits._
    
  • Explicitly via org.apache.spark.sql.Encoders, which contains a large selection of methods for creating Encoders from Scala primitive types and Products (see the sketch below).
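
For example, a minimal sketch of supplying Encoders explicitly instead of relying on the implicits:

import org.apache.spark.sql.Encoders

// Create a Dataset[Int] by passing the Encoder explicitly
val ds = spark.createDataset(List(1, 2, 3))(Encoders.scalaInt)

// Encoders.product derives an Encoder for a case class (a Product)
case class Person(name: String, age: Int)
val personEncoder = Encoders.product[Person]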

Datasets vs DataFrames vs RDDs

Use Datasets when:

  • we have structured/semi-structured data
  • we want type safety
  • we need to work with functional APIs
  • we need good performance, but it doesn’t have to be the best

Use DataFrames when:

  • we have structured/semi-structured data
  • we want the best possible performance, automatically optimized for you

Use RDDs when:

  • we have unstructured data
  • we need to fine-tune and manage low-level details of RDD computations
  • we have complex data types that cannot be serialized with Encoders

Limitations of Datasets

One of the limitations of Datasets is that they do not get all of the optimizations that DataFrames get.

Catalyst cannot optimize all operations

  • When using Datasets with higher-order functions like map, we miss out on many Catalyst optimizations (see the sketch after this list).
  • When using Datasets with relational operations like select, we get all of Catalyst’s optimizations.
  • Though not all operations on Datasets benefit from Catalyst’s optimizations, Tungsten is still always running under the hood of Datasets, storing and organizing data in a highly optimized way, which can result in large speedups over RDDs.
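
As a sketch of the difference, assume the listingsDS: Dataset[Listing] from earlier, whose Listing elements have a price field:

// Opaque lambda: Catalyst cannot look inside the function, so it has to
// deserialize every Listing object just to run it
listingsDS.map(l => l.price)

// Relational operation: Catalyst can see that only the price column is
// needed and optimize accordingly
listingsDS.select($"price").as[Double]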

Limited data types

Tungsten encoders cannot encode complicated user-defined data types; they only work for case classes, Products, and standard Spark SQL data types.

Requires Semi-Structured/Structured Data

DataFrames/Datasets require semi-structured/structured data, so it’s better to use RDDs if we cannot reformulate our unstructured data to fit some kind of schema.