h"p://training.databricks.com/sparkcamp.
zip
Intro to DataFrames and
Spark SQL
Professor Anthony D. Joseph, UC Berkeley
Strata NYC September 2015
DataFrames
• The preferred abstraction in Spark (introduced in 1.3)
• Strongly typed collection of distributed elements
– Built on Resilient Distributed Datasets
• Immutable once constructed
• Track lineage information to efficiently recompute lost data
• Enable operations on collection of elements in parallel
• You construct DataFrames
• by parallelizing existing collections (e.g., Pandas DataFrames)
• by transforming an existing DataFrame
• from files in HDFS or any other storage system (e.g., Parquet)
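For example, a minimal sketch of the three construction routes (assuming a running sqlContext; the column names and sample values are purely illustrative):

# 1. Parallelize an existing local collection (e.g., a Pandas DataFrame)
import pandas as pd
pandas_df = pd.DataFrame({"name": ["Amy", "Ravi"], "age": [29, 33]})
df_a = sqlContext.createDataFrame(pandas_df)

# 2. Transform an existing DataFrame (each transformation yields a new, immutable DataFrame)
df_b = df_a.filter(df_a["age"] > 30)

# 3. Read from files in HDFS or another storage system (e.g., Parquet)
df_c = sqlContext.read.parquet("/path/to/data.parquet")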
DataFrames
[Diagram: a DataFrame of 25 items (item-1 … item-25) split into 5 partitions, distributed across Spark executors running on several Workers. More partitions = more parallelism.]
DataFrames API
• is intended to enable wider audiences beyond “Big
Data” engineers to leverage the power of distributed
processing
• is inspired by data frames in R and Python (Pandas)
• is designed from the ground up to support modern big
data and data science applications
• is an extension of the existing RDD API
See
https://spark.apache.org/docs/latest/sql-programming-guide.html
databricks.com/blog/2015/02/17/introducing-dataframes-in-spark-for-large-scale-data-science.html
DataFrames Features
• Ability to scale from kilobytes of data on a single
laptop to petabytes on a large cluster
• Support for a wide array of data formats and
storage systems
• State-of-the-art optimization and code generation
through the Spark SQL Catalyst optimizer
• Seamless integration with all big data tooling and
infrastructure via Spark
• APIs for Python, Java, Scala, and R
DataFrames versus RDDs
• For new users familiar with data frames in other
programming languages, this API should make
them feel at home
• For existing Spark users, the API will make Spark
easier to program than using RDDs
• For both sets of users, DataFrames will improve
performance through intelligent optimizations and
code-generation
DataFrames and Spark SQL
DataFrames are fundamentally tied to Spark SQL
• The DataFrames API provides a programmatic
interface—really, a domain-specific language (DSL)
—for interacting with your data.
• Spark SQL provides a SQL-like interface.
• Anything you can do in Spark SQL, you can do in
DataFrames
• … and vice versa
DataFrames and Spark SQL
Like Spark SQL, the DataFrames API assumes that
the data has a table-like structure
Formally, a DataFrame is a size-mutable, potentially
heterogeneous tabular data structure with labeled
axes (i.e., rows and columns)
That’s a mouthful – just think of it as a table in a
distributed database: a distributed collection of
data organized into named, typed columns
DataFrames and RDDs
• DataFrames are built on top of the Spark RDD API
• You can use normal RDD operations on DataFrames
• However, use the DataFrame API wherever possible
• Using RDD operations will usually yield an RDD, not a DataFrame
• The DataFrame API is likely to be more efficient, because it can
optimize the underlying operations with Catalyst
• DataFrames can be significantly faster than RDDs
• Performance is language-independent
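A hedged sketch of the difference (assuming a DataFrame df with an age column; illustrative only): dropping down to RDD operations hands back plain RDDs that Catalyst can no longer optimize.

ages_rdd = df.rdd.map(lambda row: row.age * 2)   # an RDD of integers, not a DataFrame
ages_df  = df.select(df['age'] * 2)              # still a DataFrame; Catalyst can optimize it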
[Chart: runtime of aggregating 10 million integer pairs, in seconds – Spark DataFrames (Python and Scala) compared with RDDs (Python and Scala)]
Plan Optimization & Execution
[Diagram: Analysis → Logical Optimization → Physical Planning → Code Generation. A SQL AST or a DataFrame becomes an Unresolved Logical Plan, is resolved against the Catalog into a Logical Plan, rewritten into an Optimized Logical Plan, expanded into candidate Physical Plans, narrowed by a Cost Model to a Selected Physical Plan, and finally compiled down to RDDs.]
DataFrames and SQL share the same optimization/execution pipeline
Transformations, Actions, Laziness
• DataFrames are lazy
• Transformations contribute to the query plan, but
they do not execute anything
• Actions cause the execution of the query
Transformation examples: filter, select, drop, intersect, join
Action examples: count, collect, show, head, take
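A small sketch of this laziness (assuming a DataFrame df; the column names are illustrative):

# Transformations: these only add steps to the query plan
adults = df.filter(df['age'] > 17)
names  = adults.select('first_name')

# Action: only now does Spark read the source and execute the plan
names.show()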
Creating a DataFrame
• You create a DataFrame with a SQLContext object
• In the Spark Scala shell (spark-shell) or pyspark, you
have a SQLContext available automatically, as
sqlContext
• The DataFrame data source API is consistent across
data formats
• “Opening” a data source works pretty much the same way, no
matter the format
Creating a DataFrame (Python)
# The imports aren't necessary in the Spark Shell or Databricks
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext

# The following three lines are not necessary
# in the pyspark shell
conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

df = sqlContext.read.parquet("/path/to/data.parquet")
df2 = sqlContext.read.json("/path/to/data.json")
SQLContext and Hive
Our previous example created a default Spark SQLContext
object
If you are using a version of Spark that has Hive support, you
can also create a HiveContext, which provides additional
features, including:
• the ability to write queries using the more complete HiveQL parser
• access to Hive user-defined functions
• the ability to read data from Hive tables
HiveContext
• To use a HiveContext, you do not need to have an
existing Hive installation, and all of the data sources
available to a SQLContext are still available
• You do, however, need to have a version of Spark that
was built with Hive support – that is not the default
• Hive is packaged separately to avoid including all of Hive’s
dependencies in the default Spark build
• If these dependencies are not a problem for your application,
then using HiveContext is currently recommended
• It is not difficult to build Spark with Hive support
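Creating one looks like this (a sketch, assuming a Hive-enabled build and an existing SparkContext, sc):

from pyspark.sql import HiveContext

sqlContext = HiveContext(sc)
df = sqlContext.read.parquet("/path/to/data.parquet")  # the usual sources still work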
DataFrames Have Schemas
In the previous example, we created DataFrames from
Parquet and JSON data
• A Parquet table has a schema (column names and
types) that Spark can use
• Parquet also allows Spark to be efficient about how it
pares down the data it reads
• Spark can infer a schema from a JSON file
Data Sources supported by
DataFrames
[Diagram: built-in data sources (e.g., JSON, JDBC) plus external Spark packages, and more …]
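Reading a built-in source such as JDBC looks much like any other source (a sketch; the connection URL and table name are placeholders, and the JDBC driver must be on the classpath):

df_jdbc = sqlContext.read.jdbc(url="jdbc:postgresql://dbserver:5432/mydb",
                               table="people")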
A Brief Look at spark-csv
Suppose our data file has a header:
first_name,last_name,gender,age
Erin,Shannon,F,42
Norman,Lockwood,M,81
Miguel,Ruiz,M,64
Rosalita,Ramirez,F,14
Ally,Garcia,F,39
Claire,McBride,F,23
Abigail,Cottrell,F,75
José,Rivera,M,59
Ravi,Dasgupta,M,25
…
From Spark Packages: http://spark-packages.org/package/databricks/spark-csv
A Brief Look at spark-csv
Using spark-csv, we can simply create a DataFrame directly
from our CSV file
# Python
df = sqlContext.read.format("com.databricks.spark.csv").\
         load("people.csv", header="true")
A Brief Look at spark-csv
You can also declare the schema programmatically, which
allows you to specify the column types
from pyspark.sql.types import *

schema = StructType([StructField("firstName", StringType(), False),
                     StructField("gender", StringType(), False),
                     StructField("age", IntegerType(), False)])

df = sqlContext.read.format("com.databricks.spark.csv").\
         schema(schema).\
         load("people.csv")
What Can I Do with a DataFrame?
Once you have a DataFrame, there are a number of
operations you can perform
Let us look at a few of them
But, first, let us talk about columns
Columns
When we say “column” here, what do we mean?
A DataFrame column is an abstraction
It provides a common column-oriented view of the
underlying data, regardless of how the data is really
organized
Columns
Let us see how DataFrame columns map onto some common data sources:

• JSON (dataFrame1):
  [
    {"first": "Amy",  "last": "Bello",   "age": 29},
    {"first": "Ravi", "last": "Agarwal", "age": 33},
    …
  ]

• CSV (dataFrame2):
  first,last,age
  Fred,Hoover,91
  Joaquin,Hernandez,24
  …

• SQL Table (dataFrame3):
  first | last  | age
  Joe   | Smith | 42
  Jill  | Jones | 33
Columns
Each source exposes the same "first" column, regardless of how the data is stored:
• dataFrame1 (JSON) → column "first"
• dataFrame2 (CSV) → column "first"
• dataFrame3 (SQL Table) → column "first"
Columns
When we say “column” here, what do we mean?
Several things:
• A place (a cell) for a data value to reside, within a row of
data. This cell can have several states:
• empty (null)
• missing (not there at all)
• contains a (typed) value (non-null)
• A collection of those cells, from multiple rows
• A syntactic construct we can use to specify or target a cell
(or collections of cells) in a DataFrame query
How do you refer to a column in the DataFrame API?
Columns
Suppose we have a DataFrame, df, that reads a data
source that has "first", "last", and "age" columns
Python:  df['first'] or df.first†
Java:    df.col("first")
Scala:   df("first") or $("first")‡
R:       df$first
†In Python, it’s possible to access a DataFrame’s columns either by attribute
(df.age) or by indexing (df['age']). While the former is convenient for
interactive data exploration, you should use the index form. It's future-proof and
won’t break with column names that are also attributes on the DataFrame class.
‡The $ syntax can be ambiguous, if there are multiple DataFrames in the lineage
cache()
• Spark can cache a DataFrame, using an in-memory
columnar format, by calling df.cache()
• Internally, it calls df.persist(MEMORY_ONLY)
• Spark will scan only those columns used by the
DataFrame and will automatically tune
compression to minimize memory usage and GC
pressure
• You can call the unpersist() method to remove
the cached data from memory
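In code (a sketch, assuming an existing DataFrame df):

df.cache()      # marks df for caching in the in-memory columnar format
df.count()      # the first action materializes the cache
df.count()      # later actions read from the cached columns
df.unpersist()  # releases the cached data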
show()
You can look at the first n elements in a DataFrame with
the show() method (n defaults to 20)
This method is an action that:
• reads (or re-reads) the input source
• executes the RDD DAG across the cluster
• pulls the n elements back to the driver JVM
• displays those elements in a tabular form
show()
In [1]: df.show()
+---------+--------+------+---+
|firstName|lastName|gender|age|
+---------+--------+------+---+
|     Erin| Shannon|     F| 42|
|   Claire| McBride|     F| 23|
|   Norman|Lockwood|     M| 81|
|   Miguel|    Ruiz|     M| 64|
| Rosalita| Ramirez|     F| 14|
|     Ally|  Garcia|     F| 39|
|  Abigail|Cottrell|     F| 75|
|     José|  Rivera|     M| 59|
+---------+--------+------+---+
printSchema()
You can have Spark tell you what it thinks the data
schema is, by calling the printSchema() method
(This is mostly useful in the shell)
In [1]: df.printSchema()
root
 |-- firstName: string (nullable = true)
 |-- lastName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: integer (nullable = false)
select()
select() is like a SQL SELECT, allowing you to
limit the results to specific columns
In [1]: df.select("firstName", "age").show(5)
+---------+---+
|firstName|age|
+---------+---+
|     Erin| 42|
|   Claire| 23|
|   Norman| 81|
|   Miguel| 64|
| Rosalita| 14|
+---------+---+
select()
The DSL also allows you to create derived columns on the fly
In [1]: df.select(df['firstName'], df['age'],
                  df['age'] > 49, df['age'] + 10).show(5)
+---------+---+----------+----------+
|firstName|age|(age > 49)|(age + 10)|
+---------+---+----------+----------+
|     Erin| 42|     false|        52|
|   Claire| 23|     false|        33|
|   Norman| 81|      true|        91|
|   Miguel| 64|      true|        74|
| Rosalita| 14|     false|        24|
+---------+---+----------+----------+
filter()
The filter() method is similar to the RDD filter()
method, except that it supports the DataFrame DSL
In [1]: df.filter(df['age'] > 49).\
           select(df['first_name'], df['age']).\
           show()
+----------+---+
|first_name|age|
+----------+---+
|    Norman| 81|
|    Miguel| 64|
|   Abigail| 75|
+----------+---+
orderBy()
The orderBy() method allows you to sort the results
In [1]: df.filter(df['age'] > 49).\
           select(df['first_name'], df['age']).\
           orderBy(df['age'], df['first_name']).show()
+----------+---+
|first_name|age|
+----------+---+
|    Miguel| 64|
|   Abigail| 75|
|    Norman| 81|
+----------+---+
orderBy()
It’s easy to reverse the sort order
In [1]: df.filter(df['age'] > 49).\
           select(df['first_name'], df['age']).\
           orderBy(df['age'].desc(), df['first_name']).show()
+----------+---+
|first_name|age|
+----------+---+
|    Norman| 81|
|   Abigail| 75|
|    Miguel| 64|
+----------+---+
groupBy()
Often used with count(), groupBy() groups data
items by a specific column value
In [1]: df.groupBy("age").count().show()
+---+-----+
|age|count|
+---+-----+
| 39|    1|
| 42|    2|
| 64|    1|
| 75|    1|
| 81|    1|
| 14|    1|
| 23|    2|
+---+-----+
as() or alias()
as() or alias() allows you to rename a column
It is especially useful with generated columns
In [1]: df.select(df['first_name'],
                  df['age'],
                  (df['age'] < 30).alias('young')).show(5)
+----------+---+-----+
|first_name|age|young|
+----------+---+-----+
|      Erin| 42|false|
|    Claire| 23| true|
|    Norman| 81|false|
|    Miguel| 64|false|
|  Rosalita| 14| true|
+----------+---+-----+
Note: In Python, you must use alias, because as is a keyword
Other Useful Transformations
• limit(n) – Limit the results to n rows. limit() is not an action, like show() or the RDD take() method; it returns another DataFrame.
• distinct() – Returns a new DataFrame containing only the unique rows from the current DataFrame.
• drop(column) – Returns a new DataFrame with a column dropped. column is a name or a Column object.
• intersect(dataframe) – Intersect one DataFrame with another.
• join(dataframe) – Join one DataFrame with another, like a SQL join. We'll discuss this one more in a minute.
There are many more transformations
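A few of these in use (a sketch on the same illustrative df):

df.limit(3).show()                     # a DataFrame with at most 3 rows; show() is the action
df.select('gender').distinct().show()  # the unique gender values
df.drop('gender').printSchema()        # the same data, without the gender column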
Joins
Let’s assume we have a second file, a JSON file that
contains records like this:
[
  {
    "firstName": "Erin",
    "lastName": "Shannon",
    "medium": "oil on canvas"
  },
  {
    "firstName": "Norman",
    "lastName": "Lockwood",
    "medium": "metal (sculpture)"
  },
  …
]
Joins
We can load that into a second DataFrame and join
it with our first one.
In [1]: df2 = sqlContext.read.json("artists.json")
# Schema inferred as DataFrame[firstName: string, lastName: string, medium: string]

In [2]: df.join(df2,
                (df.first_name == df2.firstName) &
                (df.last_name == df2.lastName)).show()
+----------+---------+------+---+---------+--------+-----------------+
|first_name|last_name|gender|age|firstName|lastName|           medium|
+----------+---------+------+---+---------+--------+-----------------+
|    Norman| Lockwood|     M| 81|   Norman|Lockwood|metal (sculpture)|
|      Erin|  Shannon|     F| 42|     Erin| Shannon|    oil on canvas|
|  Rosalita|  Ramirez|     F| 14| Rosalita| Ramirez|         charcoal|
|    Miguel|     Ruiz|     M| 64|   Miguel|    Ruiz|    oil on canvas|
+----------+---------+------+---+---------+--------+-----------------+
Joins
Let’s make that a little more readable by only
selecting some of the columns.
In [3]: df3 = df.join(df2,
                      (df.first_name == df2.firstName) &
                      (df.last_name == df2.lastName))

In [4]: df3.select("first_name", "last_name", "age", "medium").show()
+----------+---------+---+-----------------+
|first_name|last_name|age|           medium|
+----------+---------+---+-----------------+
|    Norman| Lockwood| 81|metal (sculpture)|
|      Erin|  Shannon| 42|    oil on canvas|
|  Rosalita|  Ramirez| 14|         charcoal|
|    Miguel|     Ruiz| 64|    oil on canvas|
+----------+---------+---+-----------------+
What is Spark SQL?
Spark SQL allows you to manipulate distributed
data with SQL queries – currently, two SQL dialects
are supported
• If you are using a Spark SQLContext, the only
supported dialect is "sql", a rich subset of SQL 92
• If you're using a HiveContext, the default dialect
is "hiveql", corresponding to Hive's SQL dialect
• "sql" is also available, but "hiveql" is a richer dialect
What is Spark SQL?
Spark SQL is intimately tied to the DataFrames API
• You issue SQL queries through a SQLContext or
HiveContext, using the sql() method
• The sql() method returns a DataFrame.
• You can mix DataFrame methods and SQL queries
in the same code
• We will see examples in the Labs
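A quick preview of that mixing (a sketch; the temporary table name is arbitrary and df is the earlier DataFrame):

# Register the DataFrame so SQL queries can see it
df.registerTempTable("people")

# sql() returns a DataFrame, so DataFrame methods chain onto the result
older = sqlContext.sql("SELECT first_name, age FROM people WHERE age > 49")
older.orderBy(older['age'].desc()).show()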
End of DataFrames and
Spark SQL Module