Mastering Spark SQL PDF
Table of Contents
Introduction 1.1
Spark SQL — Structured Data Processing with Relational Queries on Massive Scale 1.2
Datasets vs DataFrames vs RDDs 1.3
Dataset API vs SQL 1.4
Notable Features
Vectorized Parquet Decoding (Reader) 3.1
Dynamic Partition Inserts 3.2
Bucketing 3.3
Whole-Stage Java Code Generation (Whole-Stage CodeGen) 3.4
CodegenContext 3.4.1
CodeGenerator 3.4.2
GenerateColumnAccessor 3.4.2.1
GenerateOrdering 3.4.2.2
GeneratePredicate 3.4.2.3
GenerateSafeProjection 3.4.2.4
BytesToBytesMap Append-Only Hash Map 3.4.3
Vectorized Query Execution (Batch Decoding) 3.5
ColumnarBatch — ColumnVectors as Row-Wise Table 3.5.1
Data Source API V2 3.6
Subqueries 3.7
Hint Framework 3.8
Adaptive Query Execution 3.9
ExchangeCoordinator 3.9.1
Subexpression Elimination For Code-Generated Expression Evaluation (Common
Expression Reuse) 3.10
EquivalentExpressions 3.10.1
Cost-Based Optimization (CBO) 3.11
CatalogStatistics — Table Statistics in Metastore (External Catalog) 3.11.1
ColumnStat — Column Statistics 3.11.2
EstimationUtils 3.11.3
CommandUtils — Utilities for Table Statistics 3.11.4
Catalyst DSL — Implicit Conversions for Catalyst Data Structures 3.12
DataFrameStatFunctions — Working With Statistic Functions 4.5.6
Column 4.6
Column API — Column Operators 4.6.1
TypedColumn 4.6.2
Basic Aggregation — Typed and Untyped Grouping Operators 4.7
RelationalGroupedDataset — Untyped Row-based Grouping 4.7.1
KeyValueGroupedDataset — Typed Grouping 4.7.2
Dataset Join Operators 4.8
Broadcast Joins (aka Map-Side Joins) 4.8.1
Window Aggregation 4.9
WindowSpec — Window Specification 4.9.1
Window Utility Object — Defining Window Specification 4.9.2
Standard Functions — functions Object 4.10
Aggregate Functions 4.10.1
Collection Functions 4.10.2
Date and Time Functions 4.10.3
Regular Functions (Non-Aggregate Functions) 4.10.4
Window Aggregation Functions 4.10.5
User-Defined Functions (UDFs) 4.11
UDFs are Blackbox — Don’t Use Them Unless You’ve Got No Choice 4.11.1
UserDefinedFunction 4.11.2
Schema — Structure of Data 4.12
StructType 4.12.1
StructField — Single Field in StructType 4.12.2
Data Types 4.12.3
Multi-Dimensional Aggregation 4.13
Dataset Caching and Persistence 4.14
User-Friendly Names Of Cached Queries in web UI’s Storage Tab 4.14.1
Dataset Checkpointing 4.15
UserDefinedAggregateFunction — Contract for User-Defined Untyped Aggregate
Functions (UDAFs) 4.16
Aggregator — Contract for User-Defined Typed Aggregate Functions (UDAFs) 4.17
Configuration Properties 4.18
SparkSession Registries
Catalog — Metastore Management Interface 5.1
CatalogImpl 5.1.1
ExecutionListenerManager — Management Interface of QueryExecutionListeners 5.2
ExperimentalMethods 5.3
ExternalCatalog Contract — External Catalog (Metastore) of Permanent Relational Entities 5.4
InMemoryCatalog 5.4.1
HiveExternalCatalog — Hive-Aware Metastore of Permanent Relational Entities 5.4.2
FunctionRegistry — Contract for Function Registries (Catalogs) 5.5
GlobalTempViewManager — Management Interface of Global Temporary Views 5.6
SessionCatalog — Session-Scoped Catalog of Relational Entities 5.7
CatalogTable — Table Specification (Native Table Metadata) 5.7.1
CatalogStorageFormat — Storage Specification of Table or Partition 5.7.1.1
CatalogTablePartition — Partition Specification of Table 5.7.1.2
BucketSpec — Bucketing Specification of Table 5.7.1.3
HiveSessionCatalog — Hive-Specific Catalog of Relational Entities 5.7.2
HiveMetastoreCatalog — Legacy SessionCatalog for Converting Hive Metastore
Relations to Data Source Relations 5.7.3
SessionState 5.8
BaseSessionStateBuilder — Generic Builder of SessionState 5.8.1
SessionStateBuilder 5.8.2
HiveSessionStateBuilder — Builder of Hive-Specific SessionState 5.8.3
SharedState — State Shared Across SparkSessions 5.9
CacheManager — In-Memory Cache for Tables and Views 5.10
CachedRDDBuilder 5.10.1
RuntimeConfig — Management Interface of Runtime Configuration 5.11
SQLConf — Internal Configuration Store 5.12
StaticSQLConf — Cross-Session, Immutable and Static SQL Configuration 5.12.1
CatalystConf 5.12.2
UDFRegistration — Session-Scoped FunctionRegistry 5.13
Avro Data Source
Avro Data Source 8.1
AvroFileFormat — FileFormat For Avro-Encoded Files 8.2
AvroOptions — Avro Data Source Options 8.3
CatalystDataToAvro Unary Expression 8.4
AvroDataToCatalyst Unary Expression 8.5
DataSourceReader 11.4
SupportsPushDownFilters 11.4.1
SupportsPushDownRequiredColumns 11.4.2
SupportsReportPartitioning 11.4.3
SupportsReportStatistics 11.4.4
SupportsScanColumnarBatch 11.4.5
DataSourceWriter 11.5
SessionConfigSupport 11.6
InputPartition 11.7
InputPartitionReader 11.8
DataWriter 11.9
DataWriterFactory 11.10
InternalRowDataWriterFactory 11.10.1
DataSourceV2StringFormat 11.11
DataSourceRDD — Input RDD Of DataSourceV2ScanExec Physical Operator 11.12
DataSourceRDDPartition 11.12.1
DataWritingSparkTask Partition Processing Function 11.13
DataSourceV2Utils Helper Object 11.14
BaseRelation — Collection of Tuples with Schema 14.1
HadoopFsRelation — Relation for File-Based Data Source 14.1.1
CatalystScan Contract 14.2
InsertableRelation Contract — Non-File-Based Relations with Inserting or Overwriting Data Support 14.3
PrunedFilteredScan Contract — Relations with Column Pruning and Filter Pushdown 14.4
PrunedScan Contract 14.5
TableScan Contract — Relations with Column Pruning 14.6
Others
FileFormatWriter Helper Object 15.1
Data Source Filter Predicate (For Filter Pushdown) 15.2
FileRelation Contract 15.3
FilterEstimation 16.5.5.2
JoinEstimation 16.5.5.3
ProjectEstimation 16.5.5.4
Partitioning — Specification of Physical Operator’s Output Partitions 16.6
Distribution Contract — Data Distribution Across Partitions 16.7
AllTuples 16.7.1
BroadcastDistribution 16.7.2
ClusteredDistribution 16.7.3
HashClusteredDistribution 16.7.4
OrderedDistribution 16.7.5
UnspecifiedDistribution 16.7.6
Catalyst Expressions
Catalyst Expression — Executable Node in Catalyst Tree 17.1
AggregateExpression 17.2
AggregateFunction Contract — Aggregate Function Expressions 17.3
AggregateWindowFunction Contract — Declarative Window Aggregate Function
Expressions 17.4
AttributeReference 17.5
Alias 17.6
Attribute 17.7
BoundReference 17.8
CallMethodViaReflection 17.9
Coalesce 17.10
CodegenFallback 17.11
CollectionGenerator 17.12
ComplexTypedAggregateExpression 17.13
CreateArray 17.14
CreateNamedStruct 17.15
CreateNamedStructLike Contract 17.16
CreateNamedStructUnsafe 17.17
CumeDist 17.18
DeclarativeAggregate Contract — Unevaluable Aggregate Function Expressions 17.19
ExecSubqueryExpression 17.20
Exists 17.21
ExpectsInputTypes Contract 17.22
ExplodeBase Contract 17.23
First 17.24
Generator 17.25
GetArrayStructFields 17.26
GetArrayItem 17.27
GetMapValue 17.28
GetStructField 17.29
ImperativeAggregate 17.30
In 17.31
Inline 17.32
InSet 17.33
InSubquery 17.34
JsonToStructs 17.35
JsonTuple 17.36
ListQuery 17.37
Literal 17.38
MonotonicallyIncreasingID 17.39
Murmur3Hash 17.40
NamedExpression Contract 17.41
Nondeterministic Contract 17.42
OffsetWindowFunction Contract — Unevaluable Window Function Expressions 17.43
ParseToDate 17.44
ParseToTimestamp 17.45
PlanExpression 17.46
PrettyAttribute 17.47
RankLike Contract 17.48
ResolvedStar 17.49
RowNumberLike Contract 17.50
RuntimeReplaceable Contract 17.51
ScalarSubquery SubqueryExpression 17.52
ScalarSubquery ExecSubqueryExpression 17.53
ScalaUDF 17.54
ScalaUDAF 17.55
SimpleTypedAggregateExpression 17.56
SizeBasedWindowFunction Contract — Declarative Window Aggregate Functions with
Window Size 17.57
SortOrder 17.58
Stack 17.59
Star 17.60
StaticInvoke 17.61
SubqueryExpression 17.62
TimeWindow 17.63
TypedAggregateExpression 17.64
TypedImperativeAggregate 17.65
UnaryExpression Contract 17.66
UnixTimestamp 17.67
UnresolvedAttribute 17.68
UnresolvedFunction 17.69
UnresolvedGenerator 17.70
UnresolvedOrdinal 17.71
UnresolvedRegex 17.72
UnresolvedStar 17.73
UnresolvedWindowExpression 17.74
WindowExpression 17.75
WindowFunction Contract — Window Function Expressions With WindowFrame 17.76
WindowSpecDefinition 17.77
Logical Operators
RunnableCommand Contract — Generic Logical Command with Side Effects 19.3
DataWritingCommand Contract — Logical Commands That Write Query Data 19.4
SaveAsHiveFile Contract — DataWritingCommands That Write Query Result As Hive
Files 19.5
Hint 20.28
HiveTableRelation 20.29
InMemoryRelation 20.30
InsertIntoDataSourceCommand 20.31
InsertIntoDataSourceDirCommand 20.32
InsertIntoDir 20.33
InsertIntoHadoopFsRelationCommand 20.34
InsertIntoHiveDirCommand 20.35
InsertIntoHiveTable 20.36
InsertIntoTable 20.37
Intersect 20.38
Join 20.39
LeafNode 20.40
LocalRelation 20.41
LogicalRDD 20.42
LogicalRelation 20.43
OneRowRelation 20.44
Pivot 20.45
Project 20.46
Range 20.47
Repartition and RepartitionByExpression 20.48
ResolvedHint 20.49
SaveIntoDataSourceCommand 20.50
ShowCreateTableCommand 20.51
ShowTablesCommand 20.52
Sort 20.53
SubqueryAlias 20.54
TypedFilter 20.55
Union 20.56
UnresolvedCatalogRelation 20.57
UnresolvedHint 20.58
UnresolvedInlineTable 20.59
UnresolvedRelation 20.60
UnresolvedTableValuedFunction 20.61
Window 20.62
WithWindowDefinition 20.63
WriteToDataSourceV2 20.64
View 20.65
Physical Operators
SparkPlan Contract — Physical Operators in Physical Query Plan of Structured Query 21.1
CodegenSupport Contract — Physical Operators with Java Code Generation 21.2
DeserializeToObjectExec 22.9
ExecutedCommandExec 22.10
ExpandExec 22.11
ExternalRDDScanExec 22.12
FileSourceScanExec 22.13
FilterExec 22.14
GenerateExec 22.15
HashAggregateExec 22.16
HiveTableScanExec 22.17
InMemoryTableScanExec 22.18
LocalTableScanExec 22.19
MapElementsExec 22.20
ObjectHashAggregateExec 22.21
ObjectProducerExec 22.22
ProjectExec 22.23
RangeExec 22.24
RDDScanExec 22.25
ReusedExchangeExec 22.26
RowDataSourceScanExec 22.27
SampleExec 22.28
ShuffleExchangeExec 22.29
ShuffledHashJoinExec 22.30
SerializeFromObjectExec 22.31
SortAggregateExec 22.32
SortMergeJoinExec 22.33
SortExec 22.34
SubqueryExec 22.35
InputAdapter 22.36
WindowExec 22.37
AggregateProcessor 22.37.1
WindowFunctionFrame 22.37.2
WholeStageCodegenExec 22.38
WriteToDataSourceV2Exec 22.39
Logical Analysis Rules (Check, Evaluation,
Conversion and Resolution)
AliasViewChild 23.1
CleanupAliases 23.2
DataSourceAnalysis 23.3
DetermineTableStats 23.4
ExtractWindowExpressions 23.5
FindDataSourceTable 23.6
HandleNullInputsForUDF 23.7
HiveAnalysis 23.8
InConversion 23.9
LookupFunctions 23.10
PreprocessTableCreation 23.11
PreWriteCheck 23.12
RelationConversions 23.13
ResolveAliases 23.14
ResolveBroadcastHints 23.15
ResolveCoalesceHints 23.16
ResolveCreateNamedStruct 23.17
ResolveFunctions 23.18
ResolveHiveSerdeTable 23.19
ResolveInlineTables 23.20
ResolveMissingReferences 23.21
ResolveOrdinalInOrderByAndGroupBy 23.22
ResolveOutputRelation 23.23
ResolveReferences 23.24
ResolveRelations 23.25
ResolveSQLOnFile 23.26
ResolveSubquery 23.27
ResolveWindowFrame 23.28
ResolveWindowOrder 23.29
TimeWindowing 23.30
UpdateOuterReferences 23.31
WindowFrameCoercion 23.32
WindowsSubstitution 23.33
Extended Logical Optimizations
(SparkOptimizer)
ExtractPythonUDFFromAggregate 25.1
OptimizeMetadataOnlyQuery 25.2
PruneFileSourcePartitions 25.3
PushDownOperatorsToDataSource 25.4
Encoders
Encoder — Internal Row Converter 28.1
Encoders Factory Object 28.1.1
ExpressionEncoder — Expression-Based Encoder 28.1.2
RowEncoder — Encoder for DataFrames 28.1.3
LocalDateTimeEncoder — Custom ExpressionEncoder for
java.time.LocalDateTime 28.1.4
RDDs
ShuffledRowRDD 29.1
Monitoring
SQL Tab — Monitoring Structured Queries in web UI 30.1
SQLListener Spark Listener 30.1.1
QueryExecutionListener 30.2
SQLAppStatusListener Spark Listener 30.3
SQLAppStatusPlugin 30.4
SQLAppStatusStore 30.5
WriteTaskStats 30.6
BasicWriteTaskStats 30.6.1
WriteTaskStatsTracker 30.7
BasicWriteTaskStatsTracker 30.7.1
WriteJobStatsTracker 30.8
BasicWriteJobStatsTracker 30.8.1
Logging 30.9
QueryPlan — Structured Query Plan 32.2.1
RuleExecutor Contract — Tree Transformation Rule Executor 32.3
Catalyst Rule — Named Transformation of TreeNodes 32.3.1
QueryPlanner — Converting Logical Plan to Physical Trees 32.4
GenericStrategy 32.5
SQL Support
SQL Parsing Framework 34.1
AbstractSqlParser — Base SQL Parsing Infrastructure 34.2
AstBuilder — ANTLR-based SQL Parser 34.3
CatalystSqlParser — DataTypes and StructTypes Parser 34.4
ParserInterface — SQL Parser Contract 34.5
SparkSqlAstBuilder 34.6
SparkSqlParser — Default SQL Parser 34.7
Thrift JDBC/ODBC Server — Spark Thrift Server (STS) 35.1
SparkSQLEnv 35.2
Varia / Uncategorized
SQLExecution Helper Object 36.1
RDDConversions Helper Object 36.2
CatalystTypeConverters Helper Object 36.3
StatFunctions Helper Object 36.4
SubExprUtils Helper Object 36.5
PredicateHelper Scala Trait 36.6
SchemaUtils Helper Object 36.7
AggUtils Helper Object 36.8
ScalaReflection 36.9
CreateStruct Function Builder 36.10
MultiInstanceRelation 36.11
TypeCoercion Object 36.12
TypeCoercionRule — Contract For Type Coercion Rules 36.13
ExtractEquiJoinKeys — Scala Extractor for Destructuring Join Logical Operators 36.14
PhysicalAggregation — Scala Extractor for Destructuring Aggregate Logical Operators 36.15
PhysicalOperation — Scala Extractor for Destructuring Logical Query Plans 36.16
PartitioningAwareFileIndex 36.26
BufferedRowIterator 36.27
CompressionCodecs 36.28
(obsolete) SQLContext 36.29
Introduction
I’m Jacek Laskowski, a freelance IT consultant, software engineer and technical instructor
specializing in Apache Spark, Apache Kafka and Kafka Streams (with Scala and sbt).
I offer software development and consultancy services with hands-on in-depth workshops
and mentoring. Reach out to me at [email protected] or @jaceklaskowski to discuss
opportunities.
Consider joining me at Warsaw Scala Enthusiasts and Warsaw Spark meetups in Warsaw,
Poland.
Tip: I’m also writing other books in the "The Internals of" series about Apache Spark, Spark Structured Streaming, Apache Kafka, and Kafka Streams.
Expect text and code snippets from a variety of public sources. Attribution follows.
Spark SQL — Structured Data Processing with Relational Queries on Massive Scale
Quoting the Spark SQL: Relational Data Processing in Spark paper on Spark SQL:
Spark SQL is a new module in Apache Spark that integrates relational processing with
Spark’s functional programming API.
Spark SQL lets Spark programmers leverage the benefits of relational processing (e.g.,
declarative queries and optimized storage), and lets SQL users call complex analytics
libraries in Spark (e.g., machine learning).
The primary difference between the computation models of Spark SQL and Spark Core is
the relational framework for ingesting, querying and persisting (semi)structured data using
relational queries (aka structured queries) that can be expressed in good ol' SQL (with
many features of HiveQL) and the high-level SQL-like functional declarative Dataset API
(aka Structured Query DSL).
Note: Semi- and structured data are collections of records that can be described using a schema with column names, their types and whether a column can be null or not (aka nullability).
Whichever query interface you use to describe a structured query, i.e. SQL or Query DSL,
the query becomes a Dataset (with a mandatory Encoder).
From Shark, Spark SQL, Hive on Spark, and the future of SQL on Apache Spark:
For SQL users, Spark SQL provides state-of-the-art SQL performance and maintains
compatibility with Shark/Hive. In particular, like Shark, Spark SQL supports all existing
Hive data formats, user-defined functions (UDF), and the Hive metastore.
For Spark users, Spark SQL becomes the narrow-waist for manipulating (semi-)
structured data as well as ingesting data from sources that provide schema, such as
JSON, Parquet, Hive, or EDWs. It truly unifies SQL and sophisticated analysis, allowing
users to mix and match SQL and more imperative programming APIs for advanced
analytics.
For open source hackers, Spark SQL proposes a novel, elegant way of building query
planners. It is incredibly easy to add new optimizations under this framework.
Internally, a structured query is a Catalyst tree of (logical and physical) relational operators
and expressions.
When an action is executed on a Dataset (directly, e.g. show or count, or indirectly, e.g. save or saveAsTable), the structured query (behind the Dataset) goes through the following execution stages:
1. Logical Analysis
2. Caching Replacement
3. Logical Query Optimization
4. Physical Planning
5. Physical Optimization
6. Constructing the RDD of Internal Binary Rows (that represents the structured query in terms of Spark Core’s RDD API)
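These stages are observable from any Dataset. A minimal sketch (the toy query below is mine, not the book's): explain(extended = true) prints the parsed, analyzed, optimized and physical plans, while QueryExecution gives programmatic access to them.

val q = spark.range(10).filter("id % 2 = 0")
q.explain(extended = true)
// Access the final physical plan (after physical optimizations) directly
println(q.queryExecution.executedPlan.numberedTreeString)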
As of Spark 2.0, Spark SQL is now de facto the primary and feature-rich interface to Spark’s
underlying in-memory distributed platform (hiding Spark Core’s RDDs behind higher-level
abstractions that allow for logical and physical query optimization strategies even without
your consent).
Note: You can find out more on the core of Apache Spark (aka Spark Core) in the Mastering Apache Spark 2 gitbook.
In other words, Spark SQL’s Dataset API describes a distributed computation that will
eventually be converted to a DAG of RDDs for execution.
Spark SQL supports structured queries in batch and streaming modes (with the latter as a
separate module of Spark SQL called Spark Structured Streaming).
Note: You can find out more on Spark Structured Streaming in the Spark Structured Streaming (Apache Spark 2.2+) gitbook.
scala> teenagers.show
+-----+
| name|
+-----+
|Jacek|
+-----+
// You could however want to use good ol' SQL, couldn't you?
Spark SQL supports loading datasets from various data sources including tables in Apache
Hive. With Hive support enabled, you can load datasets from existing Apache Hive
deployments and save them back to Hive tables if needed.
sql("CREATE OR REPLACE TEMPORARY VIEW v1 (key INT, value STRING) USING csv OPTIONS ('p
ath'='people.csv', 'header'='true')")
Like SQL and NoSQL databases, Spark SQL offers performance query optimizations using a rule-based query optimizer (aka Catalyst Optimizer), whole-stage Java code generation (aka Whole-Stage Codegen, which can often be better than your own custom hand-written code!) and the Tungsten execution engine with its own internal binary row format.
As of Spark SQL 2.2, structured queries can be further optimized using Hint Framework.
Spark SQL introduces a tabular data abstraction called Dataset (that was previously DataFrame). The Dataset abstraction is designed to make processing large amounts of structured tabular data on Spark infrastructure simpler and faster.
The following snippet shows a batch ETL pipeline that processes JSON files and saves a subset of their columns as CSV files.
spark.read
.format("json")
.load("input-json")
.select("name", "score")
.where($"score" > 15)
.write
.format("csv")
.save("output-csv")
With Structured Streaming feature however, the above static batch query becomes dynamic
and continuous paving the way for continuous applications.
import org.apache.spark.sql.types._
val schema = StructType(
StructField("id", LongType, nullable = false) ::
StructField("name", StringType, nullable = false) ::
StructField("score", DoubleType, nullable = false) :: Nil)
spark.readStream
.format("json")
.schema(schema)
.load("input-json")
.select("name", "score")
.where('score > 15)
.writeStream
.format("console")
.start
// -------------------------------------------
// Batch: 1
// -------------------------------------------
// +-----+-----+
// | name|score|
// +-----+-----+
// |Jacek| 20.5|
// +-----+-----+
As of Spark 2.0, the main data abstraction of Spark SQL is Dataset. It represents structured data, i.e. records with a known schema. This representation enables a compact binary encoding using a compressed columnar format that is stored in managed objects outside the JVM heap. It is supposed to speed computations up by reducing memory usage and GC pressure.
Spark SQL supports predicate pushdown to optimize performance of Dataset queries and
can also generate optimized code at runtime.
1. Dataset API (formerly DataFrame API) with a strongly-typed LINQ-like Query DSL that
Scala programmers will likely find very appealing to use.
3. Non-programmers will likely use SQL as their query language through direct integration
with Hive
4. JDBC/ODBC fans can use JDBC interface (through Thrift JDBC/ODBC Server) and
connect their tools to Spark’s distributed query engine.
Spark SQL comes with a uniform interface for data access in distributed storage systems
like Cassandra or HDFS (Hive, Parquet, JSON) using specialized DataFrameReader and
DataFrameWriter objects.
Spark SQL allows you to execute SQL-like queries on large volume of data that can live in
Hadoop HDFS or Hadoop-compatible file systems like S3. It can access data from different
data sources - files or tables.
standard functions or User-Defined Functions (UDFs) that take values from a single row
as input to generate a single return value for every input row.
basic aggregate functions that operate on a group of rows and calculate a single return
value per group.
window aggregate functions that operate on a group of rows and calculate a single
return value for each row in a group.
There are two supported catalog implementations — in-memory (default) and hive — that
you can set using spark.sql.catalogImplementation property.
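Since spark.sql.catalogImplementation is a static configuration property, it has to be set when the SparkSession is built (enableHiveSupport() sets it to hive). A minimal sketch, with an app name of my own choosing:

import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("catalog-demo")
  .config("spark.sql.catalogImplementation", "in-memory") // or "hive"
  .getOrCreate()
assert(spark.conf.get("spark.sql.catalogImplementation") == "in-memory")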
From user@spark:
If you already loaded csv data into a dataframe, why not register it as a table, and use
Spark SQL to find max/min or any other aggregates? SELECT MAX(column_name)
FROM dftable_name … seems natural.
If you’re more comfortable with SQL, it might be worth registering this DataFrame as a table and generating a SQL query against it (generate a string with a series of min-max calls).
You can parse data from external data sources and let the schema inferencer deduce the schema.
// Example 1
import spark.implicits._
import org.apache.spark.sql.functions.{max, sum}
val df = Seq(1 -> 2).toDF("i", "j")
val query = df.groupBy('i)
  .agg(max('j).as("aggOrdering"))
  .orderBy(sum('j))
  .as[(Int, Int)]
query.collect contains (1, 2) // true
// Example 2
val df = Seq((1, 1), (-1, 1)).toDF("key", "value")
df.createOrReplaceTempView("src")
scala> sql("SELECT IF(a > 0, a, 0) FROM (SELECT key a FROM src) temp").show
+-------------------+
|(IF((a > 0), a, 0))|
+-------------------+
| 1|
| 0|
+-------------------+
Datasets vs DataFrames vs RDDs
In RDD, you have to do an additional hop over a case class and access fields by name.
Dataset API vs SQL
It turns out that some structured queries can be expressed easier using Dataset API, but
there are some that are only possible in SQL. In other words, you may find mixing Dataset
API and SQL modes challenging yet rewarding.
You could at some point consider writing structured queries using Catalyst data structures
directly hoping to avoid the differences and focus on what is supported in Spark SQL, but
that could quickly become unwieldy for maintenance (i.e. finding Spark SQL developers who
could be comfortable with it as well as being fairly low-level and therefore possibly too
dependent on a specific Spark SQL version).
This section describes the differences between Spark SQL features to develop Spark
applications using Dataset API and SQL mode.
1. RuntimeReplaceable Expressions are only available using SQL mode by means of SQL
functions like nvl , nvl2 , ifnull , nullif , etc.
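A hedged illustration of the point: nvl is available in SQL mode only, with no counterpart among the standard functions of the Dataset API.

spark.sql("SELECT nvl(NULL, 'fallback') AS value").show()
// +--------+
// |   value|
// +--------+
// |fallback|
// +--------+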
VectorizedParquetRecordReader
VectorizedParquetRecordReader is a concrete SpecificParquetRecordReaderBase used for Vectorized Parquet Decoding.
columnarBatch
ColumnarBatch
missingColumns
Bitmap of columns (per index) that are missing (or simply the ones that the reader should not read)
numBatched
totalCountLoadedSoFar
nextKeyValue Method
nextKeyValue …FIXME
resultBatch Method
ColumnarBatch resultBatch()
initialize …FIXME
enableReturningBatches Method
void enableReturningBatches()
1. Uses MEMORY_MODE
initBatch creates the batch schema that is the sparkSchema and the input partitionColumns schema.
initBatch allocates OffHeapColumnVector or OnHeapColumnVector column vectors per the input memMode , i.e. OFF_HEAP or ON_HEAP memory modes, respectively.
initBatch records the allocated column vectors as the internal WritableColumnVectors.
initBatch creates new slots in the allocated WritableColumnVectors for the input
nextBatch reads at least capacity rows and returns true when there are rows available.
Otherwise, nextBatch returns false (to "announce" there are no rows available).
nextBatch requests the ColumnarBatch to specify the number of rows (in batch) as 0
When the rowsReturned is greater than the totalRowCount, nextBatch finishes with
(returns) false (to "announce" there are no rows available).
nextBatch checkEndOfRowGroup.
nextBatch calculates the number of rows left to be returned as a minimum of the capacity
Note: The number of rows in the internal ColumnarBatch matches the number of rows that VectorizedColumnReaders decoded and stored in corresponding WritableColumnVectors.
nextBatch requests the internal ColumnarBatch to set the number of rows (in batch) to be the number of rows read.
nextBatch finishes with (returns) true (to "announce" there are rows available).
checkEndOfRowGroup …FIXME
Object getCurrentValue()
VectorizedColumnReader
VectorizedColumnReader is a vectorized column reader that
Parquet ColumnDescriptor
Parquet OriginalType
Parquet PageReader
void readBatch(
int total,
WritableColumnVector column) throws IOException
readBatch …FIXME
SpecificParquetRecordReaderBase — Hadoop RecordReader
SpecificParquetRecordReaderBase is the base Hadoop RecordReader for parquet format
initialize Method
initialize …FIXME
ColumnVector Contract — In-Memory Columnar Data
getDecimal
Decimal getDecimal(int rowId, int precision, int scale)
Used when…FIXME
hasNull
boolean hasNull()

numNulls
int numNulls()
OrcColumnVector
getInterval …FIXME
getStruct …FIXME
WritableColumnVector Contract
WritableColumnVector is the extension of the ColumnVector contract for writable column vectors.

getBytesAsUTF8String
UTF8String getBytesAsUTF8String(int rowId, int count)
Used when…FIXME

putArray
void putArray(int rowId, int offset, int length)
Used when…FIXME

putBoolean
void putBoolean(int rowId, boolean value)
Used when…FIXME
putByte
void putByte(int rowId, byte value)
Used when…FIXME

putByteArray
int putByteArray(int rowId, byte[] value, int offset, int count)
Used when…FIXME

putBytes
void putBytes(int rowId, int count, byte value)
void putBytes(int rowId, int count, byte[] src, int srcIndex)
Used when…FIXME

putDouble
void putDouble(int rowId, double value)
Used when…FIXME

putDoubles
void putDoubles(int rowId, int count, byte[] src, int srcIndex)
void putDoubles(int rowId, int count, double value)
void putDoubles(int rowId, int count, double[] src, int srcIndex)
Used when…FIXME

putFloat
void putFloat(int rowId, float value)
Used when…FIXME

putFloats
void putFloats(int rowId, int count, byte[] src, int srcIndex)
void putFloats(int rowId, int count, float value)
void putFloats(int rowId, int count, float[] src, int srcIndex)
Used when…FIXME

putInt
void putInt(int rowId, int value)
Used when…FIXME

putInts
void putInts(int rowId, int count, byte[] src, int srcIndex)
void putInts(int rowId, int count, int value)
void putInts(int rowId, int count, int[] src, int srcIndex)
Used when…FIXME

putIntsLittleEndian
void putIntsLittleEndian(int rowId, int count, byte[] src, int srcIndex)
Used when…FIXME
putLong
void putLong(int rowId, long value)
Used when…FIXME

putLongs
void putLongs(int rowId, int count, byte[] src, int srcIndex)
void putLongs(int rowId, int count, long value)
void putLongs(int rowId, int count, long[] src, int srcIndex)
Used when…FIXME

putLongsLittleEndian
void putLongsLittleEndian(int rowId, int count, byte[] src, int srcIndex)
Used when…FIXME

putNotNull
Used when…FIXME

putNotNulls
void putNotNulls(int rowId, int count)
Used when…FIXME
putNulls
void putNulls(int rowId, int count)
Used when…FIXME

putShort
void putShort(int rowId, short value)
Used when…FIXME

putShorts
void putShorts(int rowId, int count, byte[] src, int srcIndex)
void putShorts(int rowId, int count, short value)
void putShorts(int rowId, int count, short[] src, int srcIndex)
Used when…FIXME

reserveInternal
Used when:
OffHeapColumnVector and OnHeapColumnVector are created
WritableColumnVector is requested to reserve memory of a given required capacity

reserveNewColumn
WritableColumnVector reserveNewColumn(int capacity, DataType type)
Used when…FIXME
Table 2. WritableColumnVectors
WritableColumnVector Description
OffHeapColumnVector
OnHeapColumnVector
reset Method
void reset()
reset …FIXME
reserve …FIXME
reserveDictionaryIds Method
reserveDictionaryIds …FIXME
appendNotNulls …FIXME
OnHeapColumnVector
OnHeapColumnVector is a concrete WritableColumnVector that…FIXME
allocateColumns creates an array of OnHeapColumnVector for every field (to hold capacity
AggregateHashMap is created
When created, OnHeapColumnVector reserveInternal (for the given capacity) and reset.
reserveInternal Method
reserveInternal …FIXME
reserveNewColumn Method
reserveNewColumn …FIXME
OffHeapColumnVector
OffHeapColumnVector is a concrete WritableColumnVector that…FIXME
allocateColumns creates an array of OffHeapColumnVector for every field (to hold capacity
Vectorized Parquet Decoding (Reader)
The parquet encodings are largely designed to decode faster in batches, column by
column. This can speed up the decoding considerably.
spark.sql.parquet.enableVectorizedReader Configuration Property
spark.sql.parquet.enableVectorizedReader configuration property is on by default.
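A minimal sketch of checking the property and falling back to row-by-row parquet-mr decoding (e.g. while troubleshooting):

assert(spark.conf.get("spark.sql.parquet.enableVectorizedReader").toBoolean)
spark.conf.set("spark.sql.parquet.enableVectorizedReader", false)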
Dynamic Partition Inserts
With a partitioned dataset, Spark SQL can load only the parts (partitions) that are really needed (and avoid filtering out unnecessary data on the JVM). That leads to faster load times and more efficient memory consumption, which gives better performance overall.
With a partitioned dataset, Spark SQL can also be executed over different subsets
(directories) in parallel at the same time.
spark.range(10)
.withColumn("p1", 'id % 2)
.write
.mode("overwrite")
.partitionBy("p1")
.saveAsTable("partitioned_table")
Dynamic Partition Inserts is a feature of Spark SQL that allows for executing INSERT
OVERWRITE TABLE SQL statements over partitioned HadoopFsRelations that limits what
partitions are deleted to overwrite the partitioned table (and its partitions) with new data.
Dynamic partitions are the partition columns that have no values defined explicitly in the
PARTITION clause of INSERT OVERWRITE TABLE SQL statements (in the partitionSpec
part).
Static partitions are the partition columns that have values defined explicitly in the
PARTITION clause of INSERT OVERWRITE TABLE SQL statements (in the partitionSpec
part).
Dynamic Partition Inserts is only supported in SQL mode (for INSERT OVERWRITE TABLE
SQL statements).
Dynamic Partition Inserts is not supported for non-file-based data sources, i.e.
InsertableRelations.
When the dynamic overwrite mode is enabled, Spark only deletes the partitions for which it has data to write. All the other partitions remain intact.
Spark now writes data partitioned just as Hive would — which means only the partitions
that are touched by the INSERT query get overwritten and the others are not touched.
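A hedged sketch of the dynamic overwrite mode in action, reusing the partitioned_table created above (the spark.sql.sources.partitionOverwriteMode property is available as of Spark 2.3):

spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
// Only the partitions produced by this query get replaced; the others remain intact
spark.sql("""
  INSERT OVERWRITE TABLE partitioned_table
  PARTITION (p1)
  SELECT id, id % 2 AS p1 FROM range(2)
""")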
Bucketing
Bucketing is an optimization technique that uses buckets (and bucketing columns) to
determine data partitioning and avoid data shuffle.
Note: Bucketing can show the biggest benefit when pre-shuffled bucketed tables are used more than once, as bucketing itself takes time (which you will offset by executing multiple join queries later).
import org.apache.spark.sql.SaveMode
spark.range(10e4.toLong).write.mode(SaveMode.Overwrite).saveAsTable("t10e4")
spark.range(10e6.toLong).write.mode(SaveMode.Overwrite).saveAsTable("t10e6")
// Make sure that you don't end up with a BroadcastHashJoin and a BroadcastExchange
// For that, let's disable auto broadcasting
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
val t4 = spark.table("t10e4")
val t6 = spark.table("t10e6")
assert(t4.count == 10e4)
assert(t6.count == 10e6)
The above join query is a fine example of a SortMergeJoinExec (aka SortMergeJoin) of two
FileSourceScanExecs (aka Scan). The join query uses ShuffleExchangeExec physical
operators (aka Exchange) to shuffle the table datasets for the SortMergeJoin.
You use DataFrameWriter.bucketBy method to specify the number of buckets and the
bucketing columns.
You can optionally sort the output rows in buckets using DataFrameWriter.sortBy method.
people.write
.bucketBy(42, "name")
.sortBy("age")
.saveAsTable("people_bucketed")
Unlike bucketing in Apache Hive, Spark SQL creates the bucket files per the number of
buckets and partitions. In other words, the number of bucketing files is the number of
buckets multiplied by the number of task writers (one per partition).
scala> println(large.queryExecution.toRdd.getNumPartitions)
8
With bucketing, the Exchanges are no longer needed (as the tables are already pre-
shuffled).
The above join query of the bucketed tables shows no ShuffleExchangeExec physical
operators (aka Exchange) as the shuffling has already been executed (before the query was
run).
Use SessionCatalog or DESCRIBE EXTENDED SQL command to find the bucketing information.
|Bucket Columns              |[`id`]                                                          |       |
|Sort Columns                |[`id`]                                                          |       |
|Table Properties            |[transient_lastDdlTime=1538470250]                              |       |
|Statistics                  |413954 bytes                                                    |       |
|Location                    |file:/Users/jacek/dev/oss/spark/spark-warehouse/bucketed_4_10e4|       |
|Serde Library               |org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe              |       |
|InputFormat                 |org.apache.hadoop.mapred.SequenceFileInputFormat                |       |
|OutputFormat                |org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat       |       |
|Storage Properties          |[serialization.format=1]                                        |       |
+----------------------------+----------------------------------------------------------------+-------+
import org.apache.spark.sql.catalyst.TableIdentifier
val metadata = spark.sessionState.catalog.getTableMetadata(TableIdentifier(demoTable))
scala> metadata.bucketSpec.foreach(println)
4 buckets, bucket columns: [id], sort columns: [id]
The number of buckets has to be between 0 and 100000 exclusive or Spark SQL throws
an AnalysisException :
Number of buckets should be greater than 0 but less than 100000. Got `[numBuckets]`
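A hedged example of the validation (the table name is made up); the exception is thrown when the bucketed write is executed:

import scala.util.Try
val attempt = Try {
  spark.range(10).write.bucketBy(0, "id").saveAsTable("zero_buckets")
}
assert(attempt.isFailure)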
There are however requirements that have to be met before Spark Optimizer gives a no-
Exchange query plan:
1. The number of partitions on both sides of a join has to be exactly the same.
// Make sure that you don't end up with a BroadcastHashJoin and a BroadcastExchange
// For this, let's disable auto broadcasting
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
val t1 = spark
.range(4)
.repartition(4, $"id") // Make sure that the number of partitions matches the other
side
EqualTo ( = )
EqualNullSafe ( <=> )
In
InSet
// Enable INFO logging level of FileSourceStrategy logger to see the details of the strategy
import org.apache.spark.sql.execution.datasources.FileSourceStrategy
val logger = FileSourceStrategy.getClass.getName.replace("$", "")
import org.apache.log4j.{Level, Logger}
Logger.getLogger(logger).setLevel(Level.INFO)
scala> println(sparkPlan57.numberedTreeString)
00 *(1) Filter id#0L IN (50,70)
01 +- *(1) FileScan parquet default.bucketed_4_id[id#0L,part#1L] Batched: true, Format: Parquet, Location: CatalogFileIndex[file:/Users/jacek/dev/oss/spark/spark-warehouse/bucketed_4_id], PartitionCount: 2, PartitionFilters: [], PushedFilters: [In(id, [50,70])], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 2 out of 4
import org.apache.spark.sql.execution.FileSourceScanExec
val scan57 = sparkPlan57.collectFirst { case exec: FileSourceScanExec => exec }.get
import org.apache.spark.sql.execution.datasources.FileScanRDD
val rdd57 = scan57.inputRDDs.head.asInstanceOf[FileScanRDD]
import org.apache.spark.sql.execution.datasources.FilePartition
val bucketFiles57 = for {
FilePartition(bucketId, files) <- rdd57.filePartitions
f <- files
} yield s"Bucket $bucketId => $f"
scala> println(bucketFiles57.size)
24
Sorting
// Make sure that you don't end up with a BroadcastHashJoin and a BroadcastExchange
// Disable auto broadcasting
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
val t1 = spark.range(4)
.repartition(2, $"id") // Use just 2 partitions
.sortWithinPartitions("id") // sort partitions
Warning: There are two exchanges and sorts which makes the above use case almost unusable. I filed an issue at SPARK-24025 Join of bucketed and non-bucketed tables can give two exchanges and sorts for non-bucketed side.
Figure 4. SortMergeJoin of Sorted Dataset and Bucketed Table (Details for Query)
// Bucketing is on by default
assert(spark.sessionState.conf.bucketingEnabled, "Bucketing disabled?!")
Whole-Stage Java Code Generation (Whole-Stage CodeGen)
import org.apache.spark.sql.internal.SQLConf.WHOLESTAGE_CODEGEN_ENABLED
scala> spark.conf.get(WHOLESTAGE_CODEGEN_ENABLED)
res0: String = true
Note:
scala> spark.sessionState.conf.wholeStageEnabled
res1: Boolean = true
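To see the Java code that whole-stage code generation produces for a query, you can use the debug package. A minimal sketch (the toy query is mine):

import org.apache.spark.sql.execution.debug._
val q = spark.range(10).selectExpr("id + 1 AS x")
q.debugCodegen() // prints the generated code per WholeStageCodegen subtree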
Note: Janino is used to compile Java source code into a Java class at runtime.
There are the following code generation paths (as coined in this commit):
1. Non-whole-stage-codegen path
Tip: Review SPARK-12795 Whole stage codegen to learn about the work to support it.
BenchmarkWholeStageCodegen — Performance Benchmark
BenchmarkWholeStageCodegen class provides a benchmark to measure whole stage codegen
performance.
CodegenContext
CodegenContext is…FIXME
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
val ctx = new CodegenContext
Name Description

Mutable Scala Map with function names, their Java source code and a class name

equivalentExpressions
EquivalentExpressions
Expressions are added and then fetched as equivalent sets when CodegenContext is requested to subexpressionElimination (for generateExpressions with subexpression elimination enabled)

INPUT_ROW
The variable name of the input row of the current operator

references
Elements are added when:
CodegenContext is requested to addReferenceObj
GenerateSafeProjection
GenerateUnsafeProjection
WholeStageCodegenExec

subExprEliminationExprs
SubExprEliminationStates by Expression
Used when…FIXME
generateExpressions(
expressions: Seq[Expression],
doSubexpressionElimination: Boolean = false): Seq[ExprCode]
In the end, generateExpressions requests every expression to generate the Java source code for code-generated (non-interpreted) expression evaluation.
addReferenceObj Method
addReferenceObj …FIXME
subexpressionEliminationForWholeStageCodegen
Method
subexpressionEliminationForWholeStageCodegen …FIXME
addNewFunction(
funcName: String,
funcCode: String,
inlineToOuterClass: Boolean = false): String
addNewFunction …FIXME
1. Takes the first expression and requests it to generate a Java source code for the
expression tree
addMutableState(
javaType: String,
variableName: String,
initFunc: String => String = _ => "",
forceInline: Boolean = false,
useFreshName: Boolean = true): String
addMutableState …FIXME
addImmutableStateIfNotExists(
javaType: String,
variableName: String,
initFunc: String => String = _ => ""): Unit
addImmutableStateIfNotExists …FIXME
freshName Method
freshName …FIXME
addNewFunctionToClass(
funcName: String,
funcCode: String,
className: String): mutable.Map[String, mutable.Map[String, String]]
addNewFunctionToClass …FIXME
addClass …FIXME
declareAddedFunctions Method
declareAddedFunctions(): String
declareAddedFunctions …FIXME
declareMutableStates Method
declareMutableStates(): String
declareMutableStates …FIXME
initMutableStates Method
initMutableStates(): String
initMutableStates …FIXME
initPartition Method
initPartition(): String
initPartition …FIXME
emitExtraCode Method
emitExtraCode(): String
emitExtraCode …FIXME
addPartitionInitializationStatement Method
addPartitionInitializationStatement …FIXME
CodeGenerator
CodeGenerator is a base class for generators of JVM bytecode for expression evaluation.
cache
Guava’s LoadingCache with at most 100 pairs of
CodeAndComment and GeneratedClass .
genericMutableRowType
Refer to Logging.
CodeGenerator Contract
package org.apache.spark.sql.catalyst.expressions.codegen
Caution FIXME
create Method
Caution FIXME
newCodeGenContext(): CodegenContext
GenerateColumnAccessor
GenerateColumnAccessor is a CodeGenerator for…FIXME
create …FIXME
GenerateOrdering
GenerateOrdering is…FIXME
create …FIXME
genComparisons Method
genComparisons …FIXME
GeneratePredicate
GeneratePredicate is…FIXME
create …FIXME
GenerateSafeProjection
GenerateSafeProjection is…FIXME
create …FIXME
BytesToBytesMap Append-Only Hash Map
lookup Method
Caution FIXME
safeLookup Method
void safeLookup(Object keyBase, long keyOffset, int keyLength, Location loc, int hash)
safeLookup …FIXME
Vectorized Query Execution (Batch Decoding)
ColumnarBatch — ColumnVectors as Row-Wise Table
ColumnarBatch allows working with multiple ColumnVectors as a row-wise table.
import org.apache.spark.sql.types._
val schema = new StructType()
  .add("intCol", IntegerType)
  .add("doubleCol", DoubleType)
  .add("intCol2", IntegerType)
  .add("string", BinaryType)

// Allocate the column vectors the batch wraps (the capacity of 4 * 1024 rows is just an example)
val capacity = 4 * 1024
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
val columns = schema.fields.map { field =>
  new OnHeapColumnVector(capacity, field.dataType)
}

import org.apache.spark.sql.vectorized.ColumnarBatch
val batch = new ColumnarBatch(columns.toArray)
// Register one (still empty) row with the batch so that getRow can be used
batch.setNumRows(1)
assert(batch.getRow(0).numFields == 4)
newReaderIterator )
The number of columns in a ColumnarBatch is the number of ColumnVectors (this batch was
created with).
Iterator<InternalRow> rowIterator()
rowIterator …FIXME
In essence, setNumRows resets the batch and makes it available for reuse.
Data Source API V2
ReadSupport
DataSourceReader
WriteSupport
DataSourceWriter
SessionConfigSupport
DataSourceV2StringFormat
InputPartition
Note: The work on Data Source API V2 was tracked under SPARK-15689 Data source API v2 that was fixed in Apache Spark 2.3.0.
Note Data Source API V2 is already heavily used in Spark Structured Streaming.
Data Reading
Data Source API V2 uses DataSourceV2Relation logical operator to represent data reading
(aka data scan).
partitions.
Data Writing
Data Source API V2 uses WriteToDataSourceV2 and AppendData logical operators to
represent data writing (over a DataSourceV2Relation logical operator). As of Spark SQL
2.4.0, WriteToDataSourceV2 operator was deprecated for the more specific AppendData
operator (compare "data writing" to "data append" which is certainly more specific).
Tip: Enable INFO logging level for the DataSourceV2Strategy logger to be told what the pushed filters are.
Subqueries
A subquery (aka subquery expression) is a query that is nested inside of another query.
A scalar subquery is a structured query that returns a single row and a single column only.
Spark SQL uses ScalarSubquery (SubqueryExpression) expression to represent scalar
subqueries (while parsing a SQL statement).
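A hedged example of an (uncorrelated) scalar subquery in the SELECT list — it returns exactly one row and one column:

val q = spark.sql("""
  SELECT id, (SELECT max(id) FROM range(5)) AS max_id
  FROM range(2)
""")
q.show()
// +---+------+
// | id|max_id|
// +---+------+
// |  0|     4|
// |  1|     4|
// +---+------+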
It is said that scalar subqueries should be used very rarely if at all and you should join
instead.
Spark Analyzer uses ResolveSubquery resolution rule to resolve subqueries and at the end
makes sure that they are valid.
Spark Physical Optimizer uses PlanSubqueries physical optimization to plan queries with
scalar subqueries.
Hint Framework
Structured queries can be optimized using Hint Framework that allows for specifying query
hints.
Query hints allow for annotating a query and give a hint to the query optimizer how to
optimize logical plans. This can be very useful when the query optimizer cannot make
optimal decision, e.g. with respect to join methods due to conservativeness or the lack of
proper statistics.
Spark SQL supports COALESCE, REPARTITION and BROADCAST hints. All remaining unresolved hints are silently removed from a query plan at analysis.
// Dataset API
val q = spark.range(1).hint(name = "myHint", 100, true)
val plan = q.queryExecution.logical
scala> println(plan.numberedTreeString)
00 'UnresolvedHint myHint, [100, true]
01 +- Range (0, 1, step=1, splits=Some(8))
// SQL
val q = sql("SELECT /*+ myHint (100, true) */ 1")
val plan = q.queryExecution.logical
scala> println(plan.numberedTreeString)
00 'UnresolvedHint myHint, [100, true]
01 +- 'Project [unresolvedalias(1, None)]
02 +- OneRowRelation
Spark SQL 2.4 added support for COALESCE and REPARTITION hints (using SQL
comments):
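A hedged sketch of the syntax:

spark.sql("SELECT /*+ COALESCE(3) */ * FROM range(10)")
spark.sql("SELECT /*+ REPARTITION(5) */ * FROM range(10)")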
Broadcast Hints
Spark SQL 2.2 supports BROADCAST hints using broadcast standard function or SQL
comments:
broadcast standard function is used for broadcast joins (aka map-side joins), i.e. to hint the Spark planner to broadcast the dataset regardless of its size.
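A hedged sketch of both variants (the small and large datasets are my own toy examples):

import org.apache.spark.sql.functions.broadcast
val small = spark.range(1)
val large = spark.range(100)
// Dataset API: wrap the smaller side in broadcast()
large.join(broadcast(small), "id")
// SQL comments: BROADCAST, BROADCASTJOIN and MAPJOIN are equivalent hint names
small.createOrReplaceTempView("small")
large.createOrReplaceTempView("large")
spark.sql("SELECT /*+ BROADCAST(small) */ * FROM large JOIN small USING (id)")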
// Please note that broadcast standard function uses ResolvedHint not UnresolvedHint
// join is "clever"
// i.e. resolves UnresolvedHint into ResolvedHint immediately
val q = large.join(smallHinted, "id")
val plan = q.queryExecution.logical
scala> println(plan.numberedTreeString)
00 'Join UsingJoin(Inner,List(id))
01 :- Range (0, 100, step=1, splits=Some(8))
02 +- ResolvedHint (broadcast)
03 +- Range (0, 1, step=1, splits=Some(8))
Spark Analyzer
There are the following logical rules that Spark Analyzer uses to analyze logical plans with
the UnresolvedHint logical operator:
Adaptive Query Execution
At runtime, the adaptive execution mode can change shuffle join to broadcast join if it
finds the size of one table is less than the broadcast threshold. It can also handle
skewed input data for join and change the partition number of the next stage to better fit
the data scale. In general, adaptive execution decreases the effort involved in tuning
SQL query parameters and improves the execution performance by choosing a better
execution plan and parallelism at runtime.
EnsureRequirements
EnsureRequirements is…FIXME
2. An adaptive execution mode for Spark SQL by Carson Wang (Intel), Yucai Yu (Intel) at
Strata Data Conference in Singapore, December 7, 2017
ExchangeCoordinator
ExchangeCoordinator is created when EnsureRequirements physical query optimization is
postShuffleRDD Method
postShuffleRDD …FIXME
doEstimationIfNecessary(): Unit
doEstimationIfNecessary …FIXME
estimatePartitionStartIndices Method
estimatePartitionStartIndices(
mapOutputStatistics: Array[MapOutputStatistics]): Array[Int]
estimatePartitionStartIndices …FIXME
registerExchange Method
Subexpression Elimination For Code-Generated Expression Evaluation (Common Expression Reuse)
ProjectExec
ObjectHashAggregateExec
SortAggregateExec
WindowExec (and creates a lookup table for WindowExpressions and factory functions
for WindowFunctionFrame)
spark.sql.subexpressionElimination.enabled Configuration
Property
spark.sql.subexpressionElimination.enabled internal configuration property controls whether
the subexpression elimination optimization is enabled or not.
scala> conf.subexpressionEliminationEnabled
res1: Boolean = true
EquivalentExpressions
EquivalentExpressions is…FIXME
addExprTree Method
addExprTree …FIXME
addExpr Method
addExpr …FIXME
getAllEquivalentExprs: Seq[Seq[Expression]]
Cost-Based Optimization (CBO)
You first use ANALYZE TABLE COMPUTE STATISTICS SQL command to compute table
statistics. Use DESCRIBE EXTENDED SQL command to inspect the statistics.
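A hedged sketch, using the t1 table that is created later in this chapter:

spark.sql("ANALYZE TABLE t1 COMPUTE STATISTICS")
spark.sql("ANALYZE TABLE t1 COMPUTE STATISTICS FOR COLUMNS id, p1, p2")
spark.sql("DESC EXTENDED t1").show(numRows = 50, truncate = false)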
Logical operators have statistics support that is used for query planning.
Table Statistics
The table statistics can be computed for tables, partitions and columns and are as follows:
Depending on the variant, ANALYZE TABLE computes different statistics, i.e. of a table,
partitions or columns.
1. ANALYZE TABLE with neither PARTITION specification nor FOR COLUMNS clause
Note: ANALYZE TABLE with PARTITION specification and FOR COLUMNS clause is incorrect. In such a case, SparkSqlAstBuilder reports a WARN message to the logs and simply ignores the partition specification.
When executed, the above ANALYZE TABLE variants are translated to the following logical
commands (in a logical query plan), respectively:
1. AnalyzeTableCommand
2. AnalyzePartitionCommand
3. AnalyzeColumnCommand
Table-level statistics are in Statistics row while partition-level statistics are in Partition
Statistics row.
Tip: Use DESC EXTENDED tableName for table-level statistics and DESC EXTENDED tableName PARTITION (p1, p2, …) for partition-level statistics only.
|p2                          |string                                                        |null   |
|# Partition Information     |                                                              |       |
|# col_name                  |data_type                                                     |comment|
|p1                          |int                                                           |null   |
|p2                          |string                                                        |null   |
|                            |                                                              |       |
|# Detailed Table Information|                                                              |       |
|Database                    |default                                                       |       |
|Table                       |t1                                                            |       |
|Owner                       |jacek                                                         |       |
|Created Time                |Wed Dec 27 14:10:44 CET 2017                                  |       |
|Last Access                 |Thu Jan 01 01:00:00 CET 1970                                  |       |
|Created By                  |Spark 2.3.0                                                   |       |
|Type                        |MANAGED                                                       |       |
|Provider                    |parquet                                                       |       |
|Table Properties            |[transient_lastDdlTime=1514453141]                            |       |
|Statistics                  |714 bytes, 2 rows                                             |       |
|Location                    |file:/Users/jacek/dev/oss/spark/spark-warehouse/t1            |       |
|Serde Library               |org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe   |       |
|InputFormat                 |org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat |       |
|OutputFormat                |org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat|       |
|Storage Properties          |[serialization.format=1]                                      |       |
|Partition Provider          |Catalog                                                       |       |
+----------------------------+--------------------------------------------------------------+-------+
scala> spark.table("t1").show
+---+---+----+
| id| p1| p2|
+---+---+----+
| 0| 0|zero|
| 1| 1| one|
+---+---+----+
|# Storage Information           |                                                               |       |
|Location                        |file:/Users/jacek/dev/oss/spark/spark-warehouse/t1             |       |
|Serde Library                   |org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe    |       |
|InputFormat                     |org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat  |       |
|OutputFormat                    |org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat |       |
|Storage Properties              |[serialization.format=1]                                       |       |
+--------------------------------+---------------------------------------------------------------+-------+
You can view the statistics of a single column using DESC EXTENDED tableName columnName; the result is a Dataset with two columns, i.e. info_name and info_value.
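A hedged example (it assumes ANALYZE TABLE … FOR COLUMNS has been run for the column):

spark.sql("DESC EXTENDED t1 id").show(truncate = false)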
Cost-Based Optimizations
The Spark Optimizer uses heuristics (rules) that are applied to a logical query plan for cost-
based optimization.
1. AnalyzeTableCommand
2. AnalyzeColumnCommand
3. AlterTableAddPartitionCommand
4. AlterTableDropPartitionCommand
5. AlterTableSetLocationCommand
6. TruncateTableCommand
7. InsertIntoHiveTable
8. InsertIntoHadoopFsRelationCommand
9. LoadDataCommand
For equi-height histogram, the heights of all bins(intervals) are the same. The default
number of bins we use is 254.
Note that this method takes two table scans. In the future we may provide other
algorithms which need only one table scan.
For equi-height histogram, all buckets (intervals) have the same height (frequency).
2. construct range values of buckets, e.g. [p(0), p(1/n)], [p(1/n), p(2/n)] … [p((n-1)/n),
p(1)], and use ApproxCountDistinctForIntervals to count ndv in each bucket. Each
bucket is of the form: (lowerBound, higherBound, ndv).
Spark SQL uses column statistics that may optionally hold the histogram of values (which is empty by default). With the spark.sql.statistics.histogram.enabled configuration property turned on, the ANALYZE TABLE COMPUTE STATISTICS FOR COLUMNS SQL command generates column (equi-height) histograms.
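A hedged sketch of enabling histograms before computing the column statistics:

spark.sql("SET spark.sql.statistics.histogram.enabled=true")
spark.sql("ANALYZE TABLE t1 COMPUTE STATISTICS FOR COLUMNS id, p1, p2")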
// CREATE TABLE t1
import spark.implicits._
val tableName = "t1"
Seq((0, 0, "zero"), (1, 1, "one")).
  toDF("id", "p1", "p2").
  write.
  saveAsTable(tableName)
// As we drop and create immediately we may face problems with unavailable partition files
// Invalidate cache
spark.sql(s"REFRESH TABLE $tableName")
You can inspect the column statistics using DESCRIBE EXTENDED SQL command.
CatalogStatistics — Table Statistics in Metastore (External Catalog)
metastore):
CommandUtils is requested for updating existing table statistics, the current statistics (if
changed)
Hive Metastore)
scala> stats.map(_.simpleString).foreach(println)
714 bytes, 2 rows
toPlanStats converts the table statistics (from an external metastore) to Spark statistics.
With cost-based optimization enabled and row count statistics available, toPlanStats
creates a Statistics with the estimated total (output) size, row count and column statistics.
Caution FIXME Why does toPlanStats compute sizeInBytes differently per CBO?
ColumnStat — Column Statistics
ColumnStat holds the statistics of a table column (as part of the table statistics in a
metastore).
histogram
Histogram of values (as Histogram which is empty by
default)
ColumnStat is computed (and created from the result row) using ANALYZE TABLE
ColumnStat may optionally hold the histogram of values which is empty by default. With
You can inspect the column statistics using DESCRIBE EXTENDED SQL command.
// Make sure that you ran ANALYZE TABLE (as described above)
val db = spark.catalog.currentDatabase
val tableName = "t1"
val metadata = spark.sharedState.externalCatalog.getTable(db, tableName)
val stats = metadata.stats.get
metastore.
import org.apache.spark.sql.types.DoubleType
val props = p1stats.toMap(colName, dataType = DoubleType)
scala> println(props)
Map(distinctCount -> 2, min -> 0.0, version -> 1, max -> 1.4, maxLen -> 8, avgLen -> 8, nullCount -> 0)
requested for restoring table statistics from properties (from a Hive Metastore).
import org.apache.spark.sql.types.StructField
val p1 = $"p1".double
import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
val colStatsOpt = ColumnStat.fromMap(table = "t1", field = p1, map = props)
// CREATE TABLE t1
import spark.implicits._
val tableName = "t1"
Seq((0, 0, "zero"), (1, 1, "one")).
  toDF("id", "p1", "p2").
  write.
  saveAsTable(tableName)
// As we drop and create immediately we may face problems with unavailable partition files
// Invalidate cache
spark.sql(s"REFRESH TABLE $tableName")
Note: ColumnStat does not support minimum and maximum metrics for binary (i.e. Array[Byte]) and string types.
toExternalString …FIXME
supportsHistogram Method
supportsHistogram …FIXME
toMap produces the following entries:
distinctCount - distinctCount
nullCount - nullCount
avgLen - avgLen
maxLen - maxLen
histogram - Serialized version of Histogram (using HistogramSerializer.serialize )
Note toMap adds min , max , histogram entries only if they are available.
fromMap creates a ColumnStat by fetching properties of every column statistic from the
input map .
fromMap returns None when recovering column statistics fails for whatever reason.
WARN Failed to parse column statistics for column [fieldName] in table [table]
rowToColumnStat(
row: InternalRow,
attr: Attribute,
rowCount: Long,
percentiles: Option[ArrayData]): ColumnStat
rowToColumnStat creates a ColumnStat from the input row and the following positions:
0. distinctCount
1. min
2. max
3. nullCount
4. avgLen
5. maxLen
statExprs Method
statExprs(
col: Attribute,
conf: SQLConf,
colPercentiles: AttributeMap[ArrayData]): CreateNamedStruct
statExprs …FIXME
EstimationUtils
EstimationUtils is…FIXME
getOutputSize Method
getOutputSize(
attributes: Seq[Attribute],
outputRowCount: BigInt,
attrStats: AttributeMap[ColumnStat] = AttributeMap(Nil)): BigInt
getOutputSize …FIXME
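As a rough, hedged sketch (not the exact Spark code), the output size estimate can be thought of as the estimated row count multiplied by an estimated row width, where the width of every output column comes from its avgLen column statistic when available:

// A simplified sketch of an output-size estimate:
// every row pays a fixed overhead plus the average length of every output column.
// avgLens and rowOverheadInBytes are hypothetical inputs used for illustration.
def estimateOutputSize(
    avgLens: Seq[Long],
    outputRowCount: BigInt,
    rowOverheadInBytes: Long = 8L): BigInt = {
  val sizePerRow = rowOverheadInBytes + avgLens.sum
  // never report less than one byte
  (outputRowCount * sizePerRow).max(1)
}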
nullColumnStat Method
nullColumnStat …FIXME
rowCountsExist is positive (i.e. true ) when every logical plan (in the input plans ) has the row count statistic available.
CommandUtils — Utilities for Table Statistics
CommandUtils is a helper object that logical commands use to manage table statistics.
Refer to Logging.
updateTableStats updates the table statistics of the input CatalogTable (only if the statistics are already defined for the table).
calculateTotalSize calculates the total file size for the entire input CatalogTable (when it has no partitions) or for all of its partitions.
calculateLocationSize(
sessionState: SessionState,
identifier: TableIdentifier,
locationUri: Option[URI]): Long
INFO CommandUtils: Starting to calculate the total file size under path [locationUri].
calculateLocationSize calculates the sum of the length of all the files under the input
locationUri .
In the end, you should see the following INFO message in the logs:
INFO CommandUtils: It took [durationInMs] ms to calculate the total file size under path [locationUri].
compareAndGetNewStats(
oldStats: Option[CatalogStatistics],
newTotalSize: BigInt,
newRowCount: Option[BigInt]): Option[CatalogStatistics]
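The signature above suggests the general contract: return updated statistics only when something actually changed. A hedged sketch of that idea (not the exact Spark implementation) could look like this:

import org.apache.spark.sql.catalyst.catalog.CatalogStatistics

// Simplified sketch: return new statistics only when the size or row count changed.
def compareAndGetNewStatsSketch(
    oldStats: Option[CatalogStatistics],
    newTotalSize: BigInt,
    newRowCount: Option[BigInt]): Option[CatalogStatistics] = {
  val sizeChanged = !oldStats.map(_.sizeInBytes).contains(newTotalSize)
  val rowCountChanged = newRowCount.isDefined && oldStats.flatMap(_.rowCount) != newRowCount
  if (sizeChanged || rowCountChanged) {
    Some(CatalogStatistics(sizeInBytes = newTotalSize, rowCount = newRowCount))
  } else {
    None
  }
}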
Catalyst DSL — Implicit Conversions for Catalyst Data Structures
The goal of Catalyst DSL is to make working with Spark SQL’s building blocks easier (e.g.
for testing or Spark SQL internals exploration).
The plans conversions offer logical operators such as hint , join and table (available through the DslLogicalPlan implicit class).
import org.apache.spark.sql.catalyst.dsl.expressions._
scala> :type $"hello"
org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
Some implicit conversions from the Catalyst DSL interfere with the implicit conversions from SQLImplicits that are in scope automatically in spark-shell (through spark.implicits._ ).
scala> 'hello.decimal
<console>:30: error: type mismatch;
found : Symbol
required: ?{def decimal: ?}
Note that implicit conversions are not applicable because they are ambiguous:
both method symbolToColumn in class SQLImplicits of type (s: Symbol)org.apache.spark.
and method DslSymbol in trait ExpressionConversions of type (sym: Symbol)org.apache.s
are possible conversion functions from Symbol to ?{def decimal: ?}
'hello.decimal
^
<console>:30: error: value decimal is not a member of Symbol
'hello.decimal
^
Important Use sbt console with Spark libraries defined (in build.sbt ) instead.
You can also disable an implicit conversion using the trick described in the StackOverflow question How can an implicit be disabled?
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
// ExpressionConversions
import org.apache.spark.sql.catalyst.expressions.Literal
scala> val trueLit: Literal = true
trueLit: org.apache.spark.sql.catalyst.expressions.Literal = true
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
scala> val name: UnresolvedAttribute = 'name
name: org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute = 'name
import org.apache.spark.sql.catalyst.expressions.Expression
scala> val expr: Expression = sum('id)
expr: org.apache.spark.sql.catalyst.expressions.Expression = sum('id)
scala> 'hello.attr
res4: org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute = 'hello
scala> "helo".attr
res1: org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute = 'helo
// logical plans
expressions.
The Catalyst DSL converts Scala native types (e.g. Boolean , Long , String , Date , Timestamp ) and Spark SQL types (i.e. Decimal ) to Literal expressions.
// DEMO FIXME
AttributeReference expressions.
// DEMO FIXME
expression.
// DEMO FIXME
import org.apache.spark.sql.catalyst.dsl.expressions._
val s = star()
import org.apache.spark.sql.catalyst.analysis.UnresolvedStar
assert(s.isInstanceOf[UnresolvedStar])
function and distinctFunction create UnresolvedFunction expressions.
import org.apache.spark.sql.catalyst.dsl.expressions._
val f = 'f.function()
scala> f.isDistinct
res0: Boolean = false
val g = 'g.distinctFunction()
scala> g.isDistinct
res1: Boolean = true
notNull: AttributeReference
canBeNull: AttributeReference
// DEMO FIXME
at creates BoundReference expressions.
import org.apache.spark.sql.catalyst.dsl.expressions._
val boundRef = 'hello.string.at(4)
scala> println(boundRef)
input[4, string, true]
join(
otherPlan: LogicalPlan,
joinType: JoinType = Inner,
condition: Option[Expression] = None): LogicalPlan
import org.apache.spark.sql.catalyst.dsl.plans._
val t1 = table("t1")
scala> println(t1.treeString)
'UnresolvedRelation `t1`
DslLogicalPlan implicit class is part of the plans implicit conversions and adds extension methods for building logical plans.
import org.apache.spark.sql.catalyst.dsl.expressions._
val id = 'id.long
val logicalPlan = t1.select(id)
scala> println(logicalPlan.numberedTreeString)
00 'Project [id#1L]
01 +- 'UnresolvedRelation `t1`
val t2 = table("t2")
import org.apache.spark.sql.catalyst.plans.LeftSemi
val logicalPlan = t1.join(t2, joinType = LeftSemi, condition = Some(id))
scala> println(logicalPlan.numberedTreeString)
00 'Join LeftSemi, id#1: bigint
01 :- 'UnresolvedRelation `t1`
02 +- 'UnresolvedRelation `t2`
analyze: LogicalPlan
// DEMO FIXME
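A small sketch of what such a demo could look like (assuming a LocalRelation so the test-only SimpleAnalyzer can resolve attributes without a catalog):

import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

// A local relation with two attributes so analysis can succeed without a catalog
val relation = LocalRelation('id.long, 'name.string)
val plan = relation.select('id).where('id > 0)

// analyze runs the (test-only) analyzer and gives a resolved logical plan
val analyzed = plan.analyze
scala> println(analyzed.numberedTreeString)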
Fundamentals of Spark SQL Application Development
3. Creating SparkSession
SparkSession — The Entry Point to Spark SQL
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder
  .appName("My Spark Application") // optional and will be autogenerated if not specified
  .master("local[*]")              // only for demo and testing purposes, use spark-submit instead
.enableHiveSupport() // self-explanatory, isn't it?
.config("spark.sql.warehouse.dir", "target/spark-warehouse")
.withExtensions { extensions =>
extensions.injectResolutionRule { session =>
...
}
extensions.injectOptimizerRule { session =>
...
}
}
.getOrCreate
Once created, SparkSession allows for creating a DataFrame (based on an RDD or a Scala Seq ), creating a Dataset, accessing the Spark SQL services (e.g. ExperimentalMethods or ExecutionListenerManager) and executing SQL queries.
You can enable Apache Hive support with support for an external Hive metastore.
You can have as many SparkSessions as you want in a single Spark application. The
common use case is to keep relational entities separate logically in catalogs per
SparkSession .
spark.stop
active: SparkSession - (New in 2.4.0)
builder(): Builder - Object method to create a Builder to get the current SparkSession instance or create a new one.
catalog: Catalog - Access to the current metadata catalog of relational entities, e.g. database(s), tables, functions, table columns, and temporary views.
clearActiveSession(): Unit - Object method
clearDefaultSession(): Unit - Object method
conf: RuntimeConfig - Access to the current runtime configuration
experimental: ExperimentalMethods - Access to the current ExperimentalMethods
getActiveSession: Option[SparkSession] - Object method
getDefaultSession: Option[SparkSession] - Object method
implicits - Implicit conversions ( import spark.implicits._ )
listenerManager: ExecutionListenerManager - Access to the current ExecutionListenerManager
newSession(): SparkSession - Creates a new SparkSession
range - Creates a Dataset[java.lang.Long]
read: DataFrameReader - Access to the current DataFrameReader to load data from external data sources
sessionState: SessionState
sharedState: SharedState - Access to the current SharedState
sparkContext: SparkContext - Access to the underlying SparkContext
sqlContext: SQLContext
stop(): Unit - Stops the associated SparkContext
time - Executes a code block and prints out (to standard output) the time taken to execute it
udf: UDFRegistration - Access to the current UDFRegistration
version: String - Returns the version of Apache Spark
builder(): Builder
builder creates a new Builder that you use to build a fully-configured SparkSession using
a fluent API.
import org.apache.spark.sql.SparkSession
val builder = SparkSession.builder
Tip Read about Fluent interface design pattern in Wikipedia, the free encyclopedia.
version: String
Internally, version uses spark.SPARK_VERSION value that is the version property in spark-
version-info.properties properties file on CLASSPATH.
emptyDataset creates an empty Dataset (assuming that future records are of type T ).
val strings = spark.emptyDataset[String]
scala> strings.printSchema
root
|-- value: string (nullable = true)
createDataset is an experimental API to create a Dataset from a local Scala collection, i.e. Seq[T] , Java’s List[T] , or a distributed RDD[T] .
val one = spark.createDataset(Seq(1))
scala> one.show
+-----+
|value|
+-----+
| 1|
+-----+
createDataset creates a LocalRelation (for the input data collection) or LogicalRDD (for the input RDD[T] ) logical operator.
You may want to consider using the implicits object and the toDS method instead.
Internally, createDataset first looks up the implicit expression encoder in scope to access
the AttributeReference s (of the schema).
The expression encoder is then used to map elements (of the input Seq[T] ) into a
collection of InternalRows. With the references and rows, createDataset returns a Dataset
with a LocalRelation logical query plan.
Note The first three variants of range (that do not specify numPartitions explicitly) use SparkContext.defaultParallelism for the number of partitions ( numPartitions ).
Internally, range creates a new Dataset[Long] with Range logical plan and Encoders.LONG
encoder.
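For example (the number of partitions is given explicitly here):

val ids = spark.range(start = 0, end = 10, step = 2, numPartitions = 4)
scala> ids.show
+---+
| id|
+---+
|  0|
|  2|
|  4|
|  6|
|  8|
+---+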
emptyDataFrame: DataFrame
Caution FIXME
Internally, sql requests the current ParserInterface to execute a SQL query that gives a
LogicalPlan.
sql then creates a DataFrame using the current SparkSession (itself) and the LogicalPlan.
spark-sql is the main SQL environment in Spark to work with pure SQL
statements (where you do not have to use Scala to execute them).
udf: UDFRegistration
table creates a DataFrame (wrapper) from the input tableName table (but only if available in the session catalog).
scala> spark.catalog.tableExists("t1")
res1: Boolean = true
catalog: Catalog
catalog attribute is a (lazy) interface to the current metastore, i.e. the data catalog of relational entities (e.g. databases, tables, functions, table columns and temporary views).
scala> spark.catalog.listTables.show
+------------------+--------+-----------+---------+-----------+
| name|database|description|tableType|isTemporary|
+------------------+--------+-----------+---------+-----------+
|my_permanent_table| default| null| MANAGED| false|
| strs| null| null|TEMPORARY| true|
+------------------+--------+-----------+---------+-----------+
read: DataFrameReader
read method returns a DataFrameReader that is used to read data from external storage systems and load it into a DataFrame .
conf: RuntimeConfig
Internally, conf creates a RuntimeConfig (when requested the very first time and cached
afterwards) with the SQLConf of the SessionState.
readStream method
readStream: DataStreamReader
streams Attribute
streams: StreamingQueryManager
experimentalMethods Attribute
experimental: ExperimentalMethods
newSession(): SparkSession
newSession creates (starts) a new SparkSession (with the current SparkContext and
SharedState).
stop(): Unit
instantiateSessionState finds the className that is then used to create and build a
BaseSessionStateBuilder .
The class name of the SessionState builder depends on spark.sql.catalogImplementation, i.e. org.apache.spark.sql.internal.SessionStateBuilder for in-memory and org.apache.spark.sql.hive.HiveSessionStateBuilder for hive .
internalCreateDataFrame(
catalystRows: RDD[InternalRow],
schema: StructType,
isStreaming: Boolean = false): DataFrame
Optional SharedState
Optional SessionState
SparkSessionExtensions
clearActiveSession(): Unit
clearActiveSession …FIXME
clearDefaultSession(): Unit
clearDefaultSession …FIXME
experimental: ExperimentalMethods
experimental …FIXME
getActiveSession: Option[SparkSession]
getActiveSession …FIXME
getDefaultSession: Option[SparkSession]
getDefaultSession …FIXME
Accessing ExecutionListenerManager — listenerManager Method
listenerManager: ExecutionListenerManager
listenerManager …FIXME
sessionState: SessionState
sessionState …FIXME
setActiveSession …FIXME
setDefaultSession …FIXME
sharedState: SharedState
sharedState …FIXME
time …FIXME
Builder — Building SparkSession using Fluent API
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder
  .appName("My Spark Application") // optional and will be autogenerated if not specified
  .master("local[*]")              // only for demo and testing purposes, use spark-submit instead
.enableHiveSupport() // self-explanatory, isn't it?
.config("spark.sql.warehouse.dir", "target/spark-warehouse")
.withExtensions { extensions =>
extensions.injectResolutionRule { session =>
...
}
extensions.injectOptimizerRule { session =>
...
}
}
.getOrCreate
Note You can have multiple SparkSession s in a single Spark application for different data catalogs (of relational entities).
options
Used when…FIXME
getOrCreate(): SparkSession
getOrCreate …FIXME
enableHiveSupport(): Builder
enableHiveSupport enables Hive support, i.e. running structured queries on Hive tables (and
a persistent Hive metastore, support for Hive serdes and Hive user-defined functions).
Note You do not need any existing Hive installation to use Spark’s Hive support. SparkSession will automatically create metastore_db in the current directory of a Spark application and a directory configured by spark.sql.warehouse.dir. Refer to SharedState.
Internally, enableHiveSupport makes sure that the Hive classes are on CLASSPATH, i.e.
Spark SQL’s org.apache.hadoop.hive.conf.HiveConf , and sets
spark.sql.catalogImplementation internal configuration property to hive .
withExtensions Method
appName Method
appName …FIXME
config Method
config …FIXME
master Method
master …FIXME
implicits Object — Implicits Conversions
Encoders - Encoders for primitive and object types in Scala and Java (aka boxed types)
The implicits object is defined inside SparkSession and hence requires that you build a SparkSession instance first (before importing spark.implicits._ ).
import org.apache.spark.sql.SparkSession
val spark: SparkSession = ...
import spark.implicits._
scala> :imports
1) import org.apache.spark.SparkContext._ (69 terms, 1 are implicit)
2) import spark.implicits._ (1 types, 67 terms, 37 are implicit)
3) import spark.sql (1 terms)
4) import org.apache.spark.sql.functions._ (354 terms)
DatasetHolder has toDS and toDF methods that simply return the Dataset[T] (it was created with) or convert it to a DataFrame .
toDS(): Dataset[T]
toDF(): DataFrame
toDF(colNames: String*): DataFrame
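For example, with the implicits in scope, local collections get toDS and toDF through DatasetHolder :

import spark.implicits._

val tokens = Seq("one", "two", "three").toDS              // Dataset[String]
val df = Seq((0, "zero"), (1, "one")).toDF("id", "token") // DataFrame with named columns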
SparkSessionExtensions
SparkSessionExtensions is an interface that a Spark developer can use to extend a
SparkSession with custom query execution rules and a relational entity parser.
create one).
buildOptimizerRules gives the optimizerRules logical rules that are associated with the
input SparkSession.
injectCheckRule …FIXME
injectParser …FIXME
injectPlannerStrategy …FIXME
injectPostHocResolutionRule …FIXME
injectResolutionRule …FIXME
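A minimal sketch of injecting a custom (do-nothing) resolution rule using Builder.withExtensions. The rule and its name are made up for illustration only:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// A no-op analysis rule, just to show the wiring
case class NoopResolutionRule(session: SparkSession) extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}

val spark = SparkSession.builder
  .master("local[*]")
  .withExtensions { extensions =>
    extensions.injectResolutionRule { session => NoopResolutionRule(session) }
  }
  .getOrCreate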
Dataset — Structured Query with Data Encoder
The following figure shows the relationship between different entities of Spark SQL that all
together give the Dataset data structure.
1. QueryExecution (of the structured query)
2. Encoder (of the type of the records for fast serialization and deserialization to and from InternalRow)
3. SparkSession
Datasets are lazy and structured query operators and expressions are only triggered when
an action is invoked.
import org.apache.spark.sql.SparkSession
val spark: SparkSession = ...
The Dataset API offers declarative and type-safe operators that make for an improved experience for data processing (compared to DataFrames, which were a set of index- or column name-based Rows).
A Dataset combines the performance optimizations of DataFrames with the strong static type-safety of Scala. The last feature of bringing the strong type-safety to DataFrame makes Dataset so appealing. All the features together give you a more functional programming interface to work with structured data.
== Physical Plan ==
*Filter (id#51L = 0)
+- *Range (0, 1, splits=8)
== Physical Plan ==
*Filter <function1>.apply
+- *Range (0, 1, splits=8)
Only Datasets allow for syntax and analysis checks at compile time (which was not possible using DataFrame, regular SQL queries or even RDDs).
Using Dataset objects turns DataFrames of Row instances into Datasets of case classes with proper names and types (following their equivalents in the case classes). Instead of using indices to access respective fields in a DataFrame and casting them to a type, all this is automatically handled by Datasets and checked by the Scala compiler.
If however a LogicalPlan is used to create a Dataset , the logical plan is first executed
(using the current SessionState in the SparkSession ) that yields the QueryExecution plan.
You can request the "untyped" view of a Dataset or access the RDD that is generated after
executing the query. It is supposed to give you a more pleasant experience while
transitioning from the legacy RDD-based or DataFrame-based APIs you may have used in
the earlier versions of Spark SQL or encourage migrating from Spark Core’s RDD API to
Spark SQL’s Dataset API.
The default storage level for Datasets is MEMORY_AND_DISK because recomputing the
in-memory columnar representation of the underlying table is expensive. You can however
persist a Dataset .
Note Spark 2.0 introduced a new query model called Structured Streaming for continuous incremental execution of structured queries. That made it possible to consider Datasets both static and bounded as well as streaming and unbounded data sets, with a single unified API for different execution models.
exprEnc - Implicit ExpressionEncoder. Used when…FIXME
logicalPlan: LogicalPlan - Analyzed logical plan with all logical commands executed and turned into a LocalRelation.
rdd: RDD[T]
Note rdd gives an RDD with an extra execution step to convert rows from their internal binary row format to JVM objects, which will impact the JVM memory as the objects are inside the JVM (while they were outside before). You should not use rdd directly.
Internally, rdd first creates a new logical plan that deserializes the Dataset’s logical plan.
scala> dataset.queryExecution.toRdd.toDebugString
res2: String =
(8) MapPartitionsRDD[11] at toRdd at <console>:26 []
| MapPartitionsRDD[10] at toRdd at <console>:26 []
| ParallelCollectionRDD[9] at toRdd at <console>:26 []
rdd then requests SessionState to execute the logical plan to get the corresponding RDD of binary rows.
rdd then requests the Dataset’s ExpressionEncoder for the data type (and the deserializer expression) and maps over them (per partition) to create records of the expected type T .
Note rdd is at the "boundary" between the internal binary row format and the JVM type of the dataset. Avoid the extra deserialization step to lower the JVM memory requirements of your Spark application.
sqlContext - Lazily-created SQLContext. Used when…FIXME
inputFiles: Array[String]
inputFiles requests QueryExecution for optimized logical plan and collects the following
logical operators:
FileRelation
HiveTableRelation
inputFiles then requests the logical operators for their underlying files:
Caution FIXME
SparkSession
QueryExecution
Note You can also create a Dataset using a LogicalPlan that is immediately executed using SessionState .
isLocal: Boolean
isLocal flag is enabled (i.e. true ) when operators like collect or take could be run locally, i.e. without using executors.
Internally, isLocal checks whether the logical query plan of a Dataset is LocalRelation.
isStreaming: Boolean
Internally, isStreaming takes the Dataset’s logical plan and gives whether the plan is
streaming or not.
Queryable
Caution FIXME
withNewRDDExecutionId executes the input body action under new execution id.
ofRows returns DataFrame (which is the type alias for Dataset[Row] ). ofRows uses
RowEncoder to convert the schema (based on the input logicalPlan logical plan).
Internally, ofRows prepares the input logicalPlan for execution and creates a
Dataset[Row] with the current SparkSession, the QueryExecution and RowEncoder.
withNewExecutionId executes the input body action under new execution id.
withAction requests QueryExecution for the optimized physical query plan and resets the metrics of every physical operator (in the physical plan).
withAction requests SQLExecution to execute the input action with the executable physical plan.
In the end, withAction notifies ExecutionListenerManager that the name action has finished successfully or with an exception.
apply …FIXME
collectFromPlan …FIXME
selectUntyped …FIXME
withTypedPlan …FIXME
withSetOperator …FIXME
sortInternal creates a Dataset with a Sort unary logical operator (and the logicalPlan as the child logical plan).
Internally, sortInternal firstly builds ordering expressions for the given sortExprs columns, i.e. takes the sortExprs columns and makes sure that they are SortOrder expressions already (and leaves them untouched) or wraps them into SortOrder expressions (with the Ascending sort direction).
In the end, sortInternal creates a Dataset with Sort unary logical operator (with the
ordering expressions, the given global flag, and the logicalPlan as the child logical plan).
withPlan simply uses the ofRows internal factory method to create a DataFrame for the input LogicalPlan and the current SparkSession.
Note withPlan is annotated with Scala’s @inline annotation that requests the Scala compiler to try especially hard to inline it.
DataFrame — Dataset of Rows with RowEncoder
DataFrame is a collection of rows with a schema that is the result of executing a structured
query (once it will have been executed).
DataFrame uses the immutable, in-memory, resilient, distributed and parallel capabilities of
RDD, and applies a structure called schema to the data.
See org.apache.spark.package.scala.
DataFrame is a distributed collection of tabular data organized into rows and named columns.
data.groupBy('Product_ID).sum('Score)
Spark SQL borrowed the concept of DataFrame from pandas' DataFrame and made it
immutable, parallel (one machine, perhaps with many processors and cores) and
distributed (many machines, perhaps with many processors and cores).
Hey, big data consultants, time to help teams migrate the code from pandas'
Note DataFrame into Spark’s DataFrames (at least to PySpark’s DataFrame) and
offer services to set up large clusters!
DataFrames in Spark SQL strongly rely on the features of RDD - it’s basically a RDD
exposed as structured DataFrame by appropriate operations to handle very big data from
the day one. So, petabytes of data should not scare you (unless you’re an administrator to
create such clustered Spark environment - contact me when you feel alone with the task).
scala> df.show
+----+-----+
|word|count|
+----+-----+
| one| 1|
| one| 1|
| two| 1|
+----+-----+
scala> counted.show
+----+-----+
|word|count|
+----+-----+
| two| 1|
| one| 2|
+----+-----+
You can create DataFrames by loading data from structured files (JSON, Parquet, CSV),
RDDs, tables in Hive, or external databases (JDBC). You can also create DataFrames from
scratch and build upon them (as in the above example). See DataFrame API. You can read
any format given you have appropriate Spark SQL extension of DataFrameReader to format
the dataset appropriately.
the good ol' SQL - helps migrating from "SQL databases" world into the world of
DataFrame in Spark SQL
Query DSL - an API that helps ensuring proper syntax at compile time.
Filtering
DataFrames use the Catalyst query optimizer to produce efficient queries (and so they are
supposed to be faster than corresponding RDD-based queries).
Note Your DataFrames can also be type-safe and, moreover, further improve their performance through specialized encoders that can significantly cut serialization and deserialization times.
You can enforce types on generic rows and hence bring type safety (at compile time) by
encoding rows into type-safe Dataset object. As of Spark 2.0 it is a preferred way of
developing Spark applications.
Features of DataFrame
A DataFrame is a collection of "generic" Row instances (as RDD[Row] ) and a schema.
The quickest and easiest way to work with Spark SQL is to use Spark shell and spark
object.
scala> spark
res1: org.apache.spark.sql.SQLContext = org.apache.spark.sql.hive.HiveContext@60ae950f
The Apache Hive™ data warehouse software facilitates querying and managing large
datasets residing in distributed storage.
Using toDF
After you import spark.implicits._ (which is done for you by Spark shell) you may apply
toDF method to convert objects to DataFrames.
This method assumes the data comes from a Scala case class that will describe the
schema.
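The creating code is not shown here; reconstructed from the output below, it could look like this (the Person case class name is made up for illustration):

case class Person(name: String, age: Int)

import spark.implicits._
val df = Seq(Person("Jacek", 42), Person("Patryk", 19), Person("Maksym", 5)).toDF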
scala> df.show
+------+---+
| name|age|
+------+---+
| Jacek| 42|
|Patryk| 19|
|Maksym| 5|
+------+---+
scala> auctions.printSchema
root
|-- auctionid: string (nullable = true)
|-- bid: string (nullable = true)
|-- bidtime: string (nullable = true)
|-- bidder: string (nullable = true)
|-- bidderrate: string (nullable = true)
|-- openbid: string (nullable = true)
|-- price: string (nullable = true)
scala> auctions.dtypes
res28: Array[(String, String)] = Array((auctionid,StringType), (bid,StringType), (bidt
ime,StringType), (bidder,StringType), (bidderrate,StringType), (openbid,StringType), (
price,StringType))
scala> auctions.show(5)
+----------+----+-----------+-----------+----------+-------+-----+
| auctionid| bid| bidtime| bidder|bidderrate|openbid|price|
+----------+----+-----------+-----------+----------+-------+-----+
|1638843936| 500|0.478368056| kona-java| 181| 500| 1625|
|1638843936| 800|0.826388889| doc213| 60| 500| 1625|
|1638843936| 600|3.761122685| zmxu| 7| 500| 1625|
|1638843936|1500|5.226377315|carloss8055| 5| 500| 1625|
|1638843936|1600| 6.570625| jdrinaz| 6| 500| 1625|
+----------+----+-----------+-----------+----------+-------+-----+
only showing top 5 rows
scala> lines.count
res3: Long = 1349
scala> case class Auction(auctionid: String, bid: Float, bidtime: Float, bidder: String, bidderrate: Int, openbid: Float, price: Float)
defined class Auction
scala> df.printSchema
root
|-- auctionid: string (nullable = true)
|-- bid: float (nullable = false)
|-- bidtime: float (nullable = false)
|-- bidder: string (nullable = true)
|-- bidderrate: integer (nullable = false)
|-- openbid: float (nullable = false)
|-- price: float (nullable = false)
scala> df.show
+----------+------+----------+-----------------+----------+-------+------+
| auctionid| bid| bidtime| bidder|bidderrate|openbid| price|
+----------+------+----------+-----------------+----------+-------+------+
|1638843936| 500.0|0.47836804| kona-java| 181| 500.0|1625.0|
|1638843936| 800.0| 0.8263889| doc213| 60| 500.0|1625.0|
|1638843936| 600.0| 3.7611227| zmxu| 7| 500.0|1625.0|
|1638843936|1500.0| 5.2263775| carloss8055| 5| 500.0|1625.0|
|1638843936|1600.0| 6.570625| jdrinaz| 6| 500.0|1625.0|
|1638843936|1550.0| 6.8929167| carloss8055| 5| 500.0|1625.0|
|1638843936|1625.0| 6.8931136| carloss8055| 5| 500.0|1625.0|
|1638844284| 225.0| 1.237419|[email protected]| 0| 200.0| 500.0|
|1638844284| 500.0| 1.2524074| njbirdmom| 33| 200.0| 500.0|
|1638844464| 300.0| 1.8111342| aprefer| 58| 300.0| 740.0|
|1638844464| 305.0| 3.2126737| 19750926o| 3| 300.0| 740.0|
Note Support for CSV data sources is available by default in Spark 2.0.0. No need for an external module.
scala> df.printSchema
root
|-- auctionid: string (nullable = true)
|-- bid: string (nullable = true)
|-- bidtime: string (nullable = true)
|-- bidder: string (nullable = true)
|-- bidderrate: string (nullable = true)
|-- openbid: string (nullable = true)
|-- price: string (nullable = true)
scala> df.show
+----------+------+-----------+-----------------+----------+-------+-----+
| auctionid| bid| bidtime| bidder|bidderrate|openbid|price|
+----------+------+-----------+-----------------+----------+-------+-----+
|1638843936| 500|0.478368056| kona-java| 181| 500| 1625|
|1638843936| 800|0.826388889| doc213| 60| 500| 1625|
|1638843936| 600|3.761122685| zmxu| 7| 500| 1625|
|1638843936| 1500|5.226377315| carloss8055| 5| 500| 1625|
|1638843936| 1600| 6.570625| jdrinaz| 6| 500| 1625|
|1638843936| 1550|6.892916667| carloss8055| 5| 500| 1625|
|1638843936| 1625|6.893113426| carloss8055| 5| 500| 1625|
|1638844284| 225|1.237418982|[email protected]| 0| 200| 500|
|1638844284| 500|1.252407407| njbirdmom| 33| 200| 500|
|1638844464| 300|1.811134259| aprefer| 58| 300| 740|
|1638844464| 305|3.212673611| 19750926o| 3| 300| 740|
|1638844464| 450|4.165798611| coharley| 30| 300| 740|
|1638844464| 450|6.736319444| adammurry| 5| 300| 740|
|1638844464| 500|6.736469907| adammurry| 5| 300| 740|
|1638844464|505.78|6.988194444| 19750926o| 3| 300| 740|
|1638844464| 551|6.989652778| 19750926o| 3| 300| 740|
|1638844464| 570|6.993148148| 19750926o| 3| 300| 740|
|1638844464| 601|6.993900463| 19750926o| 3| 300| 740|
|1638844464| 610|6.994965278| 19750926o| 3| 300| 740|
|1638844464| 560| 6.99537037| ps138| 5| 300| 740|
+----------+------+-----------+-----------------+----------+-------+-----+
only showing top 20 rows
read: DataFrameReader
Among the supported structured data (file) formats are (consult Specifying Data Format
(format method) for DataFrameReader ):
JSON
parquet
JDBC
ORC
libsvm
reader.parquet("file.parquet")
reader.json("file.json")
reader.format("libsvm").load("sample_libsvm_data.txt")
Querying DataFrame
Note This variant (in which you use stringified column names) can only select existing columns, i.e. you cannot create new ones using select expressions.
scala> predictions.printSchema
root
|-- id: long (nullable = false)
|-- topic: string (nullable = true)
|-- text: string (nullable = true)
|-- label: double (nullable = true)
|-- words: array (nullable = true)
| |-- element: string (containsNull = true)
|-- features: vector (nullable = true)
|-- rawPrediction: vector (nullable = true)
|-- probability: vector (nullable = true)
|-- prediction: double (nullable = true)
scala> auctions.groupBy("bidder").count().show(5)
+--------------------+-----+
| bidder|count|
+--------------------+-----+
| dennisthemenace1| 1|
| amskymom| 5|
| [email protected]| 4|
| millyjohn| 1|
|ykelectro@hotmail...| 2|
+--------------------+-----+
only showing top 5 rows
In the following example you query for the top 5 of the most active bidders.
Note the tiny $ and desc together with the column name to sort the rows by.
scala> auctions.groupBy("bidder").count().sort($"count".desc).show(5)
+------------+-----+
| bidder|count|
+------------+-----+
| lass1004| 22|
| pascal1666| 19|
| freembd| 17|
|restdynamics| 17|
| happyrova| 17|
+------------+-----+
only showing top 5 rows
scala> auctions.groupBy("bidder").count().sort(desc("count")).show(5)
+------------+-----+
| bidder|count|
+------------+-----+
| lass1004| 22|
| pascal1666| 19|
| freembd| 17|
|restdynamics| 17|
| happyrova| 17|
+------------+-----+
only showing top 5 rows
scala> df.select("auctionid").distinct.count
res88: Long = 97
scala> df.groupBy("bidder").count.show
+--------------------+-----+
| bidder|count|
+--------------------+-----+
| dennisthemenace1| 1|
| amskymom| 5|
| [email protected]| 4|
| millyjohn| 1|
|ykelectro@hotmail...| 2|
| [email protected]| 1|
| rrolex| 1|
| bupper99| 2|
| cheddaboy| 2|
| adcc007| 1|
| varvara_b| 1|
| yokarine| 4|
| steven1328| 1|
| anjara| 2|
| roysco| 1|
|lennonjasonmia@ne...| 2|
|northwestportland...| 4|
| bosspad| 10|
| 31strawberry| 6|
| nana-tyler| 11|
+--------------------+-----+
only showing top 20 rows
Using SQL
Register a DataFrame as a named temporary table to run SQL.
You can execute a SQL query on a DataFrame using sql operation, but before the query is
executed it is optimized by Catalyst query optimizer. You can print the physical plan for a
DataFrame using the explain operation.
scala> sql.explain
== Physical Plan ==
TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[
count#148L])
TungstenExchange SinglePartition
TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], outp
ut=[currentCount#156L])
TungstenProject
Scan PhysicalRDD[auctionid#49,bid#50,bidtime#51,bidder#52,bidderrate#53,openbid#54
,price#55]
scala> sql.show
+-----+
|count|
+-----+
| 1348|
+-----+
Filtering
scala> df.show
+----+---------+-----+
|name|productId|score|
+----+---------+-----+
| aaa| 100| 0.12|
| aaa| 200| 0.29|
| bbb| 200| 0.53|
| bbb| 300| 0.42|
+----+---------+-----+
scala> df.filter($"name".like("a%")).show
+----+---------+-----+
|name|productId|score|
+----+---------+-----+
| aaa| 100| 0.12|
| aaa| 200| 0.29|
+----+---------+-----+
And then…
import org.apache.spark.sql.SaveMode
val df = spark.read.format("com.databricks.spark.avro").load("test.avro")
Example Datasets
eBay online auctions
Row
Row is a generic row object with an ordered collection of fields that can be accessed by an
ordinal / an index (aka generic access by ordinal), a name (aka native primitive access) or
using Scala’s pattern matching.
import org.apache.spark.sql.Row
Caution FIXME
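The examples below assume a simple two-field row:

val row = Row(1, "hello")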
scala> row(1)
res0: Any = hello
scala> row.get(1)
res1: Any = hello
Note Generic access by ordinal (using apply or get ) returns a value of type Any .
You can query for fields with their proper types using getAs with an index
scala> row.getAs[Int](0)
res1: Int = 1
scala> row.getAs[String](1)
res2: String = hello
FIXME
Note row.getAs[String](null)
Schema
A Row instance can have a schema defined.
Unless you are instantiating Row yourself (using Row Object), a Row has
Note
always a schema.
Row Object
Row companion object offers factory methods to create Row instances from a collection of elements ( apply ), a sequence of elements ( fromSeq ) and tuples ( fromTuple ).
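For example (a small sketch):

import org.apache.spark.sql.Row

val r1 = Row(1, "hello")              // apply
val r2 = Row.fromSeq(Seq(1, "hello")) // fromSeq
val r3 = Row.fromTuple((1, "hello"))  // fromTuple

// Rows also support Scala's pattern matching
val description = r1 match {
  case Row(id: Int, word: String) => s"$id -> $word"
}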
DataSource API — Managing Datasets in External Data Sources
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.getOrCreate
As of Spark 2.0, DataFrameReader can read text files using textFile methods that return
Dataset[String] (not DataFrames ).
spark.read.textFile("README.md")
There are two operation modes in Spark SQL, i.e. batch and streaming (part of Spark
Structured Streaming).
import org.apache.spark.sql.streaming.DataStreamReader
val stream: DataStreamReader = spark.readStream
Saving Datasets
Spark SQL can save data to external storage systems like files, Hive tables and JDBC databases through the DataFrameWriter interface (in batch fashion).
import org.apache.spark.sql.streaming.DataStreamWriter
val writer: DataStreamWriter[String] = papers.writeStream
DataFrameReader — Loading Data From External Data Sources
jdbc(
url: String,
table: String,
predicates: Array[String],
connectionProperties: Properties): DataFrame
jdbc(
url: String,
table: String,
properties: Properties): DataFrame
jdbc(
url: String,
table: String,
columnName: String,
lowerBound: Long,
upperBound: Long,
numPartitions: Int,
connectionProperties: Properties): DataFrame
load(): DataFrame
load(path: String): DataFrame
load(paths: String*): DataFrame
import org.apache.spark.sql.SparkSession
val spark: SparkSession = ...
import org.apache.spark.sql.DataFrameReader
val reader: DataFrameReader = spark.read
DataFrameReader supports many file formats natively and offers the interface to define
custom formats.
Note DataFrameReader assumes the parquet data source file format by default, which you can change using the spark.sql.sources.default property.
After you have described the loading pipeline (i.e. the "Extract" part of ETL in Spark SQL),
you eventually "trigger" the loading using format-agnostic load or format-specific (e.g. json,
csv, jdbc) operators.
import org.apache.spark.sql.SparkSession
val spark: SparkSession = ...
import org.apache.spark.sql.DataFrame
DataFrameReader can read text files using textFile methods that return typed Datasets .
import org.apache.spark.sql.SparkSession
val spark: SparkSession = ...
import org.apache.spark.sql.Dataset
val lines: Dataset[String] = spark
.read
.textFile("README.md")
(New in Spark 2.2) DataFrameReader can load datasets from Dataset[String] (with lines
being complete "files") using format-specific csv and json operators.
import org.apache.spark.sql.Dataset
val csvLine = "0,Warsaw,Poland"
val cities: Dataset[String] = Seq(csvLine).toDS
scala> cities.show
+---------------+
| value|
+---------------+
|0,Warsaw,Poland|
+---------------+
import org.apache.spark.sql.DataFrame
val citiesDF: DataFrame = spark
.read
.schema(schema)
.csv(cities)
scala> citiesDF.show
+---+------+-------+
| id| city|country|
+---+------+-------+
| 0|Warsaw| Poland|
+---+------+-------+
extraOptions
Used when…FIXME
json
orc
text
jdbc
Note Spark SQL allows for developing custom data source formats.
schema allows for specifying the schema of a data source (that the DataFrameReader is about to read a dataset from).
import org.apache.spark.sql.types.StructType
val schema = new StructType()
.add($"id".long.copy(nullable = false))
.add($"city".string)
.add($"country".string)
scala> schema.printTreeString
root
|-- id: long (nullable = false)
|-- city: string (nullable = true)
|-- country: string (nullable = true)
import org.apache.spark.sql.DataFrameReader
val r: DataFrameReader = spark.read.schema(schema)
Note Some formats can infer schema from datasets (e.g. csv or json) using the inferSchema option.
You can also use options method to describe different options in a single Map .
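For example (the file path is hypothetical; header and inferSchema are CSV options):

val people = spark.read
  .options(Map("header" -> "true", "inferSchema" -> "true"))
  .csv("people.csv")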
JSON
CSV
parquet
ORC
text
json method
csv method
parquet method
New in 2.0.0: snappy is the default Parquet codec. See [SPARK-14482][SQL] Change
default Parquet codec from gzip to snappy.
none or uncompressed
lzo
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.
scala:137)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:134)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:117)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.sca
la:65)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:65)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:390)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:230)
... 48 elided
orc method
Optimized Row Columnar (ORC) file format is a highly efficient columnar format to store
Hive data with more than 1,000 columns and improve performance. ORC format was
introduced in Hive version 0.11 to use and retain the type information from the table
definition.
Tip Read ORC Files document to learn about the ORC file format.
text method
Example
scala> lines.show
+--------------------+
| value|
+--------------------+
| # Apache Spark|
| |
|Spark is a fast a...|
|high-level APIs i...|
|supports general ...|
|rich set of highe...|
|MLlib for machine...|
|and Spark Streami...|
| |
|<http://spark.apa...|
| |
| |
|## Online Documen...|
| |
|You can find the ...|
|guide, on the [pr...|
|and [project wiki...|
|This README file ...|
| |
| ## Building Spark|
+--------------------+
only showing top 20 rows
table loads the content of the tableName table into an untyped DataFrame.
scala> spark.catalog.tableExists("t1")
res1: Boolean = true
Note table simply passes the call to SparkSession.table after making sure that a user-defined schema has not been specified.
jdbc loads data from an external table using the JDBC data source.
Internally, jdbc creates a JDBCOptions from the input url , table and extraOptions
with connectionProperties .
In the end, jdbc requests the SparkSession to create a DataFrame for a JDBCRelation
(with JDBCPartitions and JDBCOptions created earlier).
Tip Review the exercise Creating DataFrames from Tables using JDBC and PostgreSQL.
import org.apache.spark.sql.SparkSession
val spark: SparkSession = ...
import org.apache.spark.sql.Dataset
val lines: Dataset[String] = spark
.read
.textFile("README.md")
Note textFile methods are similar to the text family of methods in that they both read text files, but text methods return an untyped DataFrame while textFile methods return a typed Dataset[String] .
Internally, textFile passes calls on to text method and selects the only value column
before it applies Encoders.STRING encoder.
SparkSession
loadV1Source creates a DataSource and requests it to resolve the underlying relation (as a
BaseRelation).
load(): DataFrame
load(path: String): DataFrame
load(paths: String*): DataFrame
load loads a dataset from a data source (with optional support for multiple paths ) as an
untyped DataFrame.
Internally, load looks up the data source (per the source format). load then branches off per its type (i.e. whether it is of the DataSourceV2 marker type or not).
Otherwise, if the source is not a "Data Source V2" data source, load simply uses loadV1Source.
load does not support the hive source and reports an AnalysisException :
Hive data source can only be used with tables, you can not read files of Hive data source directly.
verifyColumnNameOfCorruptRecord(
schema: StructType,
columnNameOfCorruptRecord: String): Unit
verifyColumnNameOfCorruptRecord …FIXME
DataFrameWriter — Saving Data To External Data Sources
save(): Unit
save(path: String): Unit
save - Saves a DataFrame (i.e. writes the result of executing a structured query) to a data source
scala> :type df
org.apache.spark.sql.DataFrame
DataFrameWriter supports many file formats and JDBC databases. It also allows for plugging in new formats.
DataFrameWriter defaults to the parquet data source format. You can change the default format using the spark.sql.sources.default configuration property.
In the end, you trigger the actual saving of the content of a Dataset (i.e. the result of
executing a structured query) using save method.
writer.save
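For example (the output path is hypothetical):

df.write
  .format("parquet")
  .mode("overwrite")
  .save("output/parquet-datasets")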
DataFrameWriter uses the following internal properties (set by the corresponding operators):
mode - mode
partitioningColumns - partitionBy
bucketColumnNames - bucketBy
numBuckets - bucketBy
sortColumnNames - sortBy
runCommand uses the input SparkSession to access the SessionState that is in turn
runCommand records the current time (start time) and uses the SQLExecution helper object
to execute the action (under a new execution id) that simply requests the QueryExecution
for the RDD[InternalRow] (and triggers execution of logical commands).
Tip Use web UI’s SQL tab to see the execution or a SparkListener to be notified when the execution is started and finished. The SparkListener should intercept SparkListenerSQLExecutionStart and SparkListenerSQLExecutionEnd events.
Internally, saveAsTable requests the current ParserInterface to parse the input table
name.
saveAsTable then requests the SessionCatalog to check whether the table exists or not.
In the end, saveAsTable branches off per whether the table exists or not and the save
mode.
save(): Unit
Internally, save uses DataSource to look up the class of the requested data source (for the
source option and the SQLConf).
save does not support saving to Hive (i.e. the source is hive ) and throws an AnalysisException :
Hive data source can only be used with tables, you can not write files of Hive data source directly.
save does not support bucketing (i.e. when the numBuckets or sortColumnNames options are defined) and throws an AnalysisException .
jdbc method saves the content of the DataFrame to an external database table via JDBC.
You can use mode to control save mode, i.e. what happens when an external table exists
when save is executed.
It is assumed that the jdbc save pipeline is not partitioned and bucketed.
driver which is the class name of the JDBC driver (that is passed to Spark’s own DriverRegistry.register and later used to connect).
When the table exists and the Overwrite save mode is in use, DROP TABLE table is executed. It then creates the input table (using CREATE TABLE table (schema) , where schema is the schema of the DataFrame ).
bucketBy Method
bucketBy simply sets the internal numBuckets and bucketColumnNames to the input arguments.
val df = spark.range(5)
import org.apache.spark.sql.DataFrameWriter
val writer: DataFrameWriter[java.lang.Long] = df.write
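A minimal sketch of bucketing in action (bucketing requires saveAsTable ; the table name is made up for illustration):

df.write
  .bucketBy(4, "id")
  .sortBy("id")
  .mode("overwrite")
  .saveAsTable("bucketed_ids")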
partitionBy Method
Caution FIXME
mode defines the behaviour of save when an external file or table (Spark writes to) already exists.
Ignore - Do not save the records and do not change the existing data in any way.
sortBy simply sets sorting columns to the input colName and colNames column names.
option …FIXME
options …FIXME
Caution FIXME
Parquet
Caution FIXME
insertInto inserts the content of the DataFrame to the specified tableName table.
partitioningColumns is defined.
getBucketSpec: Option[BucketSpec]
In the end, createTable creates a CreateTable logical command (with the CatalogTable ,
mode and the logical query plan of the dataset) and runs it.
saveToV1Source(): Unit
saveToV1Source creates a DataSource (for the source class name, the partitioningColumns
and the extraOptions) and requests it for the logical command for writing (with the mode and
the analyzed logical plan of the structured query).
assertNotPartitioned …FIXME
csv Method
csv …FIXME
json Method
json …FIXME
orc Method
orc …FIXME
parquet Method
parquet …FIXME
text Method
text …FIXME
partitionBy Method
Dataset API — Dataset Operators
An untyped transformation
apply
An untyped transformation to select a column based on the column name (i.e. maps a Dataset onto a Column )
A typed transformation
cache(): this.type
cache
A basic action that is a mere synonym of persist.
checkpoint(): Dataset[T]
checkpoint checkpoint(eager: Boolean): Dataset[T]
collect(): Array[T]
collect
An action
colRegex
An untyped transformation to create a column (reference) based on the column name specified as a regex
columns: Array[String]
columns
A basic action
count(): Long
count
An action to count the number of rows
A basic action
An untyped transformation
distinct(): Dataset[T]
distinct
A typed transformation that is a mere synonym of dropDuplicates (with all the columns of the Dataset )
An untyped transformation
dropDuplicates(): Dataset[T]
dropDuplicates(colNames: Array[String]): Dataset[T]
dropDuplicates(colNames: Seq[String]): Dataset[T]
dropDuplicates(col1: String, cols: String*): Dataset[T]
A typed transformation
dtypes
A basic action
explain(): Unit
explain(extended: Boolean): Unit
explain
A basic action to display the logical and physical plans of the Dataset , i.e. displays the logical and physical plans (with optional cost and codegen summaries) to the standard output
A typed transformation
first(): T
first
An action that is a mere synonym of head
An untyped transformation
head(): T (1)
head(n: Int): Array[T]
head
1. Uses 1 for n
An action
inputFiles: Array[String]
inputFiles
A basic action
isEmpty: Boolean
isEmpty
(New in 2.4.4) A basic action
isLocal: Boolean
isLocal
A basic action
An untyped transformation
A typed transformation
localCheckpoint(): Dataset[T]
localCheckpoint(eager: Boolean): Dataset[T]
localCheckpoint
na: DataFrameNaFunctions
na
An untyped transformation
A typed transformation
persist(): this.type
persist(newLevel: StorageLevel): this.type
printSchema(): Unit
printSchema
A basic action
rdd: RDD[T]
rdd
A basic action
A typed transformation
An untyped transformation
A typed transformation
schema: StructType
schema
A basic action
show(): Unit
show(truncate: Boolean): Unit
show(numRows: Int): Unit
show(numRows: Int, truncate: Boolean): Unit
show(numRows: Int, truncate: Int): Unit
show(numRows: Int, truncate: Int, vertical: Boolean): Unit
An action
sort
stat: DataFrameStatFunctions
stat
An untyped transformation
storageLevel: StorageLevel
storageLevel
A basic action
summary
An action to calculate statistics (e.g. count , mean , stddev , min , max and 25% , 50% , 75% percentiles)
toDF(): DataFrame
toDF(colNames: String*): DataFrame
toDF
toJSON: Dataset[String]
toJSON
A typed transformation
toLocalIterator(): java.util.Iterator[T]
toLocalIterator
An action that returns an iterator with all rows in the Dataset . The iterator will consume as much memory as the largest partition in the Dataset .
unpersist
1. Uses unpersist with blocking disabled ( false )
A basic action to unpersist the Dataset
A typed transformation
write: DataFrameWriter[T]
write
A basic action that returns a DataFrameWriter for saving the content of the (non-streaming) Dataset out to an external storage
Typed Transformations
Note Typed transformations are the methods in the Dataset Scala class that are grouped in the typedrel group name, i.e. @group typedrel .
coalesce(numPartitions: Int): Dataset[T]
coalesce - Repartitions a Dataset
dropDuplicates(): Dataset[T]
dropDuplicates(colNames: Array[String]): Dataset[T]
dropDuplicates(colNames: Seq[String]): Dataset[T]
dropDuplicates(col1: String, cols: String*): Dataset[T]
as Typed Transformation
as …FIXME
as[T] allows for converting from a weakly-typed Dataset of Rows to a Dataset[T] with T being a domain class (that can enforce a stronger schema).
scala> df.printSchema
root
|-- id: integer (nullable = false)
|-- token: string (nullable = true)
Internally, coalesce creates a Repartition logical operator with shuffle disabled (which
is marked as false in the below explain 's output).
== Physical Plan ==
Coalesce 1
+- *Range (0, 5, step=1, splits=Some(8))
dropDuplicates(): Dataset[T]
dropDuplicates(colNames: Array[String]): Dataset[T]
dropDuplicates(colNames: Seq[String]): Dataset[T]
dropDuplicates(col1: String, cols: String*): Dataset[T]
dropDuplicates …FIXME
except …FIXME
exceptAll …FIXME
filter …FIXME
flatMap returns a new Dataset (of type U ) with all records (of type T ) mapped over using the function func and then flattening the results.
intersect …FIXME
intersectAll …FIXME
joinWith …FIXME
limit …FIXME
map …FIXME
mapPartitions …FIXME
You can define seed and if you don’t, a random seed will be used.
Note randomSplit is commonly used in Spark MLlib to split an input Dataset into two datasets for training and validation.
val ds = spark.range(10)
scala> ds.randomSplit(Array[Double](2, 3)).foreach(_.show)
+---+
| id|
+---+
| 0|
| 1|
| 2|
+---+
+---+
| id|
+---+
| 3|
| 4|
| 5|
| 6|
| 7|
| 8|
| 9|
+---+
partitionExprs expressions.
== Physical Plan ==
Exchange RoundRobinPartitioning(1)
+- *Range (0, 5, step=1, splits=Some(8))
operator.
scala> spark.version
res1: String = 2.3.1
scala> println(q.queryExecution.toRdd.getNumPartitions)
5
scala> println(q.queryExecution.toRdd.toDebugString)
(5) ShuffledRowRDD[18] at toRdd at <console>:26 []
+-(8) MapPartitionsRDD[17] at toRdd at <console>:26 []
| MapPartitionsRDD[13] at toRdd at <console>:26 []
| MapPartitionsRDD[12] at toRdd at <console>:26 []
| ParallelCollectionRDD[11] at toRdd at <console>:26 []
repartitionByRange uses a SortOrder with the Ascending sort order, i.e. ascending nulls first, when no explicit sort order is specified for a partition expression.
sample …FIXME
select …FIXME
sort …FIXME
sortWithinPartitions simply calls the internal sortInternal method with the global flag
disabled ( false ).
toJSON: Dataset[String]
scala> ds.toJSON.show
+-------------------+
| value|
+-------------------+
| {"value":"hello"}|
| {"value":"world"}|
|{"value":"foo bar"}|
+-------------------+
Internally, toJSON grabs the RDD[InternalRow] (of the QueryExecution of the Dataset ) and
maps the records (per RDD partition) into JSON.
// Transformation t
import org.apache.spark.sql.Dataset
def withDoubled(longs: Dataset[java.lang.Long]) = longs.withColumn("doubled", 'id * 2)
val dataset = spark.range(5)
scala> dataset.transform(withDoubled).show
+---+-------+
| id|doubled|
+---+-------+
| 0| 0|
| 1| 2|
| 2| 4|
| 3| 6|
| 4| 8|
+---+-------+
union …FIXME
unionByName creates a new Dataset that is an union of the rows in this and the other
Datasets column-wise, i.e. the order of columns in Datasets does not matter as long as their
names and number match.
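For example (a minimal sketch with the columns in different order):

val left = Seq((0, "zero"), (1, "one")).toDF("id", "name")
val right = Seq(("two", 2)).toDF("name", "id")

scala> left.unionByName(right).show
+---+----+
| id|name|
+---+----+
|  0|zero|
|  1| one|
|  2| two|
+---+----+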
Internally, unionByName creates a Union logical operator for this Dataset and Project logical
operator with the other Dataset.
In the end, unionByName applies the CombineUnions logical optimization to the Union
logical operator and requests the result LogicalPlan to wrap the child operators with
AnalysisBarriers.
scala> println(q.queryExecution.logical.numberedTreeString)
00 'Union
01 :- AnalysisBarrier
02 : +- Project [id#90L, rand#92]
03 : +- Project [id#90L, rand(-9144575865446031058) AS rand#92]
04 : +- Range (0, 1, step=1, splits=Some(8))
05 +- AnalysisBarrier
06 +- Project [id#103, rand#102]
07 +- Project [_1#99 AS rand#102, _2#100 AS id#103]
08 +- LocalRelation [_1#99, _2#100]
where is simply a synonym of the filter operator, i.e. passes the input parameters along to
filter .
Untyped Transformations
Note Untyped transformations are the methods in the Dataset Scala class that are grouped in the untypedrel group name, i.e. @group untypedrel .
colRegex
  Selects a column based on the column name specified as a regex (i.e. maps a Dataset onto a Column)
na na: DataFrameNaFunctions
agg …FIXME
apply selects a column based on the column name (i.e. maps a Dataset onto a Column ).
col selects a column based on the column name (i.e. maps a Dataset onto a Column ).
If the column name is * (a star), col simply creates a Column with ResolvedStar
expression (with the schema output attributes of the analyzed logical plan of the
QueryExecution).
colRegex selects a column based on the column name specified as a regex (i.e. maps a Dataset onto a Column).
Internally, colRegex matches the input column name against different regular expressions (in the following order):
1. For column names with quotes without a qualifier, colRegex simply creates a Column
with a UnresolvedRegex (with no table)
2. For column names with quotes with a qualifier, colRegex simply creates a Column with
a UnresolvedRegex (with a table specified)
3. For other column names, colRegex (behaves like col and) creates a Column with the
column name resolved (as a NamedExpression)
crossJoin …FIXME
cube …FIXME
drop …FIXME
groupBy …FIXME
join …FIXME
na Untyped Transformation
na: DataFrameNaFunctions
rollup …FIXME
select …FIXME
val ds = spark.range(5)
Internally, it executes select with every expression in exprs mapped to Column (using
SparkSqlParser.parseExpression).
stat: DataFrameStatFunctions
withColumn …FIXME
withColumnRenamed …FIXME
Basic Actions
Note: Basic actions are the methods in the Dataset Scala class that are grouped in the basic group name, i.e. @group basic.
cache
  cache(): this.type
checkpoint
  checkpoint(): Dataset[T]
  checkpoint(eager: Boolean): Dataset[T]
  Checkpoints the Dataset in a reliable way (using a reliable HDFS-compliant file system, e.g. Hadoop HDFS or Amazon S3)
explain
  explain(): Unit
  explain(extended: Boolean): Unit
  Displays the logical and physical plans of the Dataset, i.e. displays the logical and physical plans (with optional cost and codegen summaries) to the standard output
isEmpty
  isEmpty: Boolean
  (New in 2.4.4)
localCheckpoint
  localCheckpoint(): Dataset[T]
  localCheckpoint(eager: Boolean): Dataset[T]
  Checkpoints the Dataset locally on executors (and therefore unreliably)
toDF
  toDF(): DataFrame
  toDF(colNames: String*): DataFrame
unpersist
  unpersist(): this.type
  unpersist(blocking: Boolean): this.type
write
  write: DataFrameWriter[T]
  Returns a DataFrameWriter for saving the content of the (non-streaming) Dataset out to an external storage
checkpoint simply requests the Dataset to checkpoint with the given eager flag and the reliableCheckpoint flag enabled (true).
createTempView …FIXME
createOrReplaceTempView …FIXME
createGlobalTempView …FIXME
createOrReplaceGlobalTempView …FIXME
createTempViewCommand(
viewName: String,
replace: Boolean,
global: Boolean): CreateViewCommand
createTempViewCommand …FIXME
explain prints the physical plan and, with the extended flag enabled, also the logical plans (with optional cost and codegen summaries) to the console.
Tip: Use explain to review the structured queries and optimizations applied.
explain then requests QueryExecution for the optimized physical query plan and collects the records (as InternalRows).
In the end, explain goes over the InternalRow records and converts them to lines to display to the console.
Tip: If you are serious about query debugging you could also use the Debugging Query Execution facility.
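For example, the physical plan below could come from explain over a query as simple as the following (a sketch):
scala> spark.range(10).explain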
== Physical Plan ==
*Range (0, 10, step=1, splits=Some(8))
hint operator is part of Hint Framework to specify a hint (by name and parameters ) for a
Dataset .
val ds = spark.range(3)
val plan = ds.queryExecution.logical
scala> println(plan.numberedTreeString)
00 Range (0, 3, step=1, splits=Some(8))
// Attach a hint
val dsHinted = ds.hint("myHint", 100, true)
val plan = dsHinted.queryExecution.logical
scala> println(plan.numberedTreeString)
00 'UnresolvedHint myHint, [100, true]
01 +- Range (0, 3, step=1, splits=Some(8))
localCheckpoint simply uses the Dataset.checkpoint operator with the input eager flag and the reliableCheckpoint flag disabled (false).
Internally, checkpoint requests the QueryExecution (of the Dataset) for the RDD of internal binary rows (aka internalRdd) and then requests the RDD to make a copy of all the rows (by adding a MapPartitionsRDD).
With eager flag on, checkpoint counts the number of records in the RDD (by executing
RDD.count ) that gives the effect of immediate eager checkpointing.
checkpoint requests QueryExecution (of the Dataset ) for optimized physical query plan
(the plan is used to get the outputPartitioning and outputOrdering for the result Dataset ).
In the end, checkpoint creates a DataFrame with a new logical plan node for scanning data
from an RDD of InternalRows ( LogicalRDD ).
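A minimal sketch (the checkpoint directory path is an assumption):
spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")
val nums = spark.range(5)
val checkpointed = nums.checkpoint() // eager by default, so the data is materialized here
// checkpointed now scans a LogicalRDD instead of recomputing the query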
rdd: RDD[T]
Whenever you need to convert a Dataset into an RDD, the rdd method gives you the RDD of the proper input object type (not Row as in DataFrames) that sits behind the Dataset.
Internally, it looks up the ExpressionEncoder (for the Dataset) and accesses the deserializer expression. That gives the DataType of the result of evaluating the expression.
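For example:
val ints = spark.range(3)   // Dataset[java.lang.Long]
val longsRDD = ints.rdd     // RDD[java.lang.Long], i.e. the input object type
val rowsRDD = ints.toDF.rdd // RDD[org.apache.spark.sql.Row] for a DataFrame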
schema: StructType
Tip: You may also use the following methods to learn about the schema:
  printSchema(): Unit
  explain
toDF(): DataFrame
toDF(colNames: String*): DataFrame
Internally, the empty-argument toDF creates a Dataset[Row] using the Dataset 's
SparkSession and QueryExecution with the encoder being RowEncoder.
unpersist(): this.type
unpersist(blocking: Boolean): this.type
Caution FIXME
write: DataFrameWriter[T]
isEmpty: Boolean
isEmpty …FIXME
isLocal: Boolean
isLocal …FIXME
Dataset API — Actions
Actions are part of the Dataset API for…FIXME
Note: Actions are the methods in the Dataset Scala class that are grouped in the action group name, i.e. @group action.
first
  first(): T
head
  head(): T
  head(n: Int): Array[T]
show
  show(): Unit
  show(truncate: Boolean): Unit
  show(numRows: Int): Unit
  show(numRows: Int, truncate: Boolean): Unit
  show(numRows: Int, truncate: Int): Unit
  show(numRows: Int, truncate: Int, vertical: Boolean): Unit
collect Action
collect(): Array[T]
collect …FIXME
count Action
count(): Long
count …FIXME
describe …FIXME
first Action
first(): T
first …FIXME
foreach Action
foreach …FIXME
foreachPartition Action
foreachPartition …FIXME
head Action
head(): T (1)
head(n: Int): Array[T]
1. Calls the other head with n as 1 and takes the first element
head …FIXME
reduce Action
reduce …FIXME
show Action
show(): Unit
show(truncate: Boolean): Unit
show(numRows: Int): Unit
show(numRows: Int, truncate: Boolean): Unit
show(numRows: Int, truncate: Int): Unit
show(numRows: Int, truncate: Int, vertical: Boolean): Unit
show …FIXME
The default statistics are: count , mean , stddev , min , max and 25% , 50% , 75%
percentiles.
Internally, summary uses the StatFunctions to calculate the requested summaries for the
Dataset.
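For example, a minimal sketch:
val ds = spark.range(5)
ds.summary().show             // all the default statistics
ds.summary("min", "max").show // or only the requested ones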
Warning: take loads all the data into the memory of the Spark application's driver process and for a large n could result in OutOfMemoryError.
Internally, take creates a new Dataset with Limit logical plan for Literal expression
and the current LogicalPlan . It then runs the SparkPlan that produces a
Array[InternalRow] that is in turn decoded to Array[T] using a bounded encoder.
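For example:
val ds = spark.range(10)
val firstThree = ds.take(3) // Array(0, 1, 2) of java.lang.Long, collected to the driver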
toLocalIterator Action
toLocalIterator(): java.util.Iterator[T]
toLocalIterator …FIXME
DataFrameNaFunctions — Working With Missing Data
DataFrameNaFunctions is used to work with missing data in a structured query (a
DataFrame).
drop
  drop(): DataFrame
  drop(cols: Array[String]): DataFrame
  drop(minNonNulls: Int): DataFrame
  drop(minNonNulls: Int, cols: Array[String]): DataFrame
  drop(minNonNulls: Int, cols: Seq[String]): DataFrame
  drop(cols: Seq[String]): DataFrame
  drop(how: String): DataFrame
  drop(how: String, cols: Array[String]): DataFrame
  drop(how: String, cols: Seq[String]): DataFrame
convertToDouble …FIXME
drop Method
drop(): DataFrame
drop(cols: Array[String]): DataFrame
drop(minNonNulls: Int): DataFrame
drop(minNonNulls: Int, cols: Array[String]): DataFrame
drop(minNonNulls: Int, cols: Seq[String]): DataFrame
drop(cols: Seq[String]): DataFrame
drop(how: String): DataFrame
drop(how: String, cols: Array[String]): DataFrame
drop(how: String, cols: Seq[String]): DataFrame
drop …FIXME
fill Method
fill …FIXME
fillCol …FIXME
fillMap …FIXME
fillValue …FIXME
replace0 …FIXME
replace Method
replace …FIXME
replaceCol …FIXME
DataFrameStatFunctions — Working With Statistic Functions
DataFrameStatFunctions is used to work with statistic functions in a structured query (a
DataFrame).
approxQuantile(
cols: Array[String],
probabilities: Array[Double],
relativeError: Double): Array[Array[Double]]
approxQuantile approxQuantile(
col: String,
probabilities: Array[Double],
relativeError: Double): Array[Double]
approxQuantile Method
approxQuantile(
cols: Array[String],
probabilities: Array[Double],
relativeError: Double): Array[Array[Double]]
approxQuantile(
col: String,
probabilities: Array[Double],
relativeError: Double): Array[Double]
approxQuantile …FIXME
bloomFilter Method
bloomFilter …FIXME
buildBloomFilter …FIXME
corr Method
corr …FIXME
countMinSketch Method
countMinSketch …FIXME
cov Method
cov …FIXME
crosstab Method
crosstab …FIXME
freqItems Method
freqItems …FIXME
sampleBy Method
sampleBy …FIXME
Column
Column represents a column in a Dataset that holds a Catalyst Expression that produces a value per row.
With the implicit conversions imported, you can create "free" column references using Scala's symbols.
import org.apache.spark.sql.Column
scala> val nameCol: Column = 'name
nameCol: org.apache.spark.sql.Column = name
You can also create free column references from $ -prefixed strings.
import org.apache.spark.sql.Column
Besides using the implicit conversions, you can create columns using the col and column functions.
import org.apache.spark.sql.functions._
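For example:
val idCol = col("id")
val nameCol = column("name")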
Finally, you can create a bound Column for the Dataset the column is supposed to be part of, using the Dataset.apply factory method or the Dataset.col operator.
Note: You can use bound Column references only with the Datasets they have been created from.
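For example:
val dataset = spark.range(5)
val idCol1 = dataset.col("id") // Dataset.col operator
val idCol2 = dataset("id")     // Dataset.apply factory method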
name
Column has a reference to Catalyst’s Expression it was created for using expr method.
as creates a TypedColumn (that gives a type hint about the expected return value of the
column).
scala> $"id".as[Int]
res1: org.apache.spark.sql.TypedColumn[Any,Int] = id
name Operator
name …FIXME
withColumn method returns a new DataFrame with the new column col added under the colName name.
scala> df.show
+------+------+
|number|polish|
+------+------+
| 1| jeden|
| 2| dwa|
+------+------+
like Operator
Caution FIXME
scala> df.select('id)
res0: org.apache.spark.sql.DataFrame = [id: int]
scala> df.select('id).show
+---+
| id|
+---+
| 0|
| 1|
+---+
over(): Column
over(window: WindowSpec): Column
over creates a windowing column (aka analytic clause) that allows executing an aggregate function over a window (i.e. a group of records that are in some relation to the current record).
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.expressions.WindowSpec
val spec: WindowSpec = Window.rangeBetween(Window.unboundedPreceding, Window.currentRow)
scala> val overRange = $"someColumn" over spec
overRange: org.apache.spark.sql.Column = someColumn OVER (RANGE BETWEEN UNBOUNDED PREC
EDING AND CURRENT ROW)
cast Operator
cast method casts a column to a data type. It makes for type-safe maps with Row objects of the proper type (not Any).
cast uses CatalystSqlParser to parse the data type from its canonical string
representation.
cast Example
scala> df.printSchema
root
|-- label: float (nullable = false)
|-- text: string (nullable = true)
// without cast
import org.apache.spark.sql.Row
scala> df.select("label").map { case Row(label) => label.getClass.getName }.show(false
)
+---------------+
|value |
+---------------+
|java.lang.Float|
+---------------+
// with cast
import org.apache.spark.sql.types.DoubleType
scala> df.select(col("label").cast(DoubleType)).map { case Row(label) => label.getClass.getName }.show(false)
+----------------+
|value |
+----------------+
|java.lang.Double|
+----------------+
generateAlias Method
generateAlias …FIXME
named Method
named: NamedExpression
named …FIXME
Note Dataset.select
KeyValueGroupedDataset.agg
Column API — Column Operators
isInCollection
(New in 2.4.4) An expression operator that is true if the value
of the column is in the given values collection
isin Operator
TypedColumn
TypedColumn is a Column with the ExpressionEncoder for the types of the input and the
output.
scala> id.expr
res1: org.apache.spark.sql.catalyst.expressions.Expression = 'id
name Operator
name …FIXME
withInputType(
inputEncoder: ExpressionEncoder[_],
inputAttributes: Seq[Attribute]): TypedColumn[T, U]
withInputType …FIXME
Dataset.select
Note
KeyValueGroupedDataset.agg
RelationalGroupedDataset.agg
Catalyst expression
Basic Aggregation — Typed and Untyped Grouping Operators
You can also use SparkSession to execute good ol' SQL with GROUP BY should
you prefer.
SQL or Dataset API’s operators go through the same query planning and
optimizations, and have the same performance characteristic in the end.
agg applies an aggregate function on a subset or the entire Dataset (i.e. considering the
entire data set as one group).
groupBy operator groups the rows in a Dataset by columns (as Column expressions or
names).
val q = nms.
groupBy('m).
agg(sum('n) as "sum").
orderBy('m)
scala> q.show
+---+------+
| m| sum|
+---+------+
| 0|250500|
| 1|250000|
+---+------+
Note The following uses the data setup as described in Test Setup section below.
scala> tokens.show
+----+---------+-----+
|name|productId|score|
+----+---------+-----+
| aaa| 100| 0.12|
| aaa| 200| 0.29|
| bbb| 200| 0.53|
| bbb| 300| 0.42|
+----+---------+-----+
scala> tokens.groupBy('name).avg().show
+----+--------------+----------+
|name|avg(productId)|avg(score)|
+----+--------------+----------+
| aaa| 150.0| 0.205|
| bbb| 250.0| 0.475|
+----+--------------+----------+
scala> tokens.groupBy('name).count.show
+----+-----+
|name|count|
+----+-----+
| aaa| 2|
| bbb| 2|
+----+-----+
scala> tokens.groupBy('name).max("score").show
+----+----------+
|name|max(score)|
+----+----------+
| aaa| 0.29|
| bbb| 0.53|
+----+----------+
scala> tokens.groupBy('name).sum("score").show
+----+----------+
|name|sum(score)|
+----+----------+
| aaa| 0.41|
| bbb| 0.95|
+----+----------+
scala> tokens.groupBy('productId).sum("score").show
+---------+------------------+
|productId| sum(score)|
+---------+------------------+
| 300| 0.42|
| 100| 0.12|
| 200|0.8200000000000001|
+---------+------------------+
groupByKey groups records (of type T) by the input func and in the end returns a KeyValueGroupedDataset.
scala> tokens.groupByKey(_.productId).count.orderBy($"value").show
+-----+--------+
|value|count(1)|
+-----+--------+
| 100| 1|
| 200| 2|
| 300| 1|
+-----+--------+
import org.apache.spark.sql.expressions.scalalang._
val q = tokens.
groupByKey(_.productId).
agg(typed.sum[Token](_.score)).
toDF("productId", "sum").
orderBy('productId)
scala> q.show
+---------+------------------+
|productId| sum|
+---------+------------------+
| 100| 0.12|
| 200|0.8200000000000001|
| 300| 0.42|
+---------+------------------+
Test Setup
This is a setup for learning GroupedData . Paste it into Spark Shell using :paste .
import spark.implicits._
1. Cache the dataset so the following queries won’t load/recompute data over and over
again.
RelationalGroupedDataset — Untyped Row-based Grouping
RelationalGroupedDataset is an interface to calculate aggregates over groups of rows in a
DataFrame.
groupBy
rollup
cube
pivot
  Pivots on a column (with new columns per distinct value); one variant is new in 2.4.0
avg
count
max
mean
min
sum
scala> spark.conf.get("spark.sql.retainGroupColumns")
Note res1: String = true
agg creates a DataFrame with the rows being the result of executing the grouping expressions and aggregate functions over the row groups.
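A hypothetical query that produces the logical plan below (assuming spark.implicits._ is in scope):
import org.apache.spark.sql.functions.{count, sum}
val countsAndSums = spark.range(10)
  .withColumn("group", 'id % 2)
  .groupBy("group")
  .agg(count("id") as "count", sum("id") as "sum")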
// groupBy above
scala> println(countsAndSums.queryExecution.logical.numberedTreeString)
00 'Aggregate [group#179L], [group#179L, count('id) AS count#188, sum('id) AS sum#190]
01 +- Project [id#176L, (id#176L % cast(2 as bigint)) AS group#179L]
02 +- Range (0, 10, step=1, splits=Some(8))
// rollup operator
val rollupQ = spark.range(2).rollup('id).agg(count('id))
scala> println(rollupQ.queryExecution.logical.numberedTreeString)
00 'Aggregate [rollup('id)], [unresolvedalias('id, None), count('id) AS count(id)#267]
01 +- Range (0, 2, step=1, splits=Some(8))
// cube operator
val cubeQ = spark.range(2).cube('id).agg(count('id))
scala> println(cubeQ.queryExecution.logical.numberedTreeString)
00 'Aggregate [cube('id)], [unresolvedalias('id, None), count('id) AS count(id)#280]
01 +- Range (0, 2, step=1, splits=Some(8))
// pivot operator
val pivotQ = spark.
range(10).
withColumn("group", 'id % 2).
groupBy("group").
pivot("group").
agg(count("id"))
scala> println(pivotQ.queryExecution.logical.numberedTreeString)
00 'Pivot [group#296L], group#296: bigint, [0, 1], [count('id)]
01 +- Project [id#293L, (id#293L % cast(2 as bigint)) AS group#296L]
02 +- Range (0, 10, step=1, splits=Some(8))
Caution FIXME
Caution FIXME
For PivotType , toDF creates a DataFrame with Pivot unary logical operator.
aggregateNumericColumns …FIXME
DataFrame
Grouping expressions
CubeType
RollupType
PivotType
pivot Operator
1. Selects distinct and sorted values on pivotColumn and calls the other pivot (that
results in 3 extra "scanning" jobs)
2. Preferred as more efficient because the unique values are already provided
3. New in 2.4.0
pivot pivots on a pivotColumn column, i.e. adds new columns per distinct values in
pivotColumn .
val q = visits
.groupBy("city") // <-- rows in pivot table
.pivot("year") // <-- columns (unique values queried)
.count() // <-- values in cells
scala> q.show
+------+----+----+----+
| city|2015|2016|2017|
+------+----+----+----+
|Warsaw| 1| 1|null|
|Boston|null|null| 1|
+------+----+----+----+
scala> q.explain
== Physical Plan ==
HashAggregate(keys=[city#8], functions=[pivotfirst(year#9, count(1) AS `count`#222L, 2
015, 2016, 2017, 0, 0)])
+- Exchange hashpartitioning(city#8, 200)
+- HashAggregate(keys=[city#8], functions=[partial_pivotfirst(year#9, count(1) AS `
count`#222L, 2015, 2016, 2017, 0, 0)])
+- *HashAggregate(keys=[city#8, year#9], functions=[count(1)])
+- Exchange hashpartitioning(city#8, year#9, 200)
+- *HashAggregate(keys=[city#8, year#9], functions=[partial_count(1)])
+- LocalTableScan [city#8, year#9]
scala> visits
.groupBy('city)
.pivot("year", Seq("2015")) // <-- one column in pivot table
.count
.show
+------+----+
| city|2015|
+------+----+
|Warsaw| 1|
|Boston|null|
+------+----+
Important: Use pivot with a list of distinct values to pivot on so Spark does not have to compute the list itself (and run three extra "scanning" jobs).
Figure 2. pivot in web UI — Three Extra Scanning Jobs Due to Unspecified Distinct Values
Note: spark.sql.pivotMaxValues (default: 10000) controls the maximum number of (distinct) values that will be collected without error (when doing pivot without specifying the values for the pivot column).
expressions.
toDF internal method maps PivotType group type to a DataFrame with Pivot
unary logical operator.
scala> q.queryExecution.logical
Note res0: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Pivot [city#8], year#9: int, [2015, 2016, 2017], [count(1) AS count#24L]
+- Project [_1#3 AS id#7, _2#4 AS city#8, _3#5 AS year#9]
+- LocalRelation [_1#3, _2#4, _3#5]
strToExpr …FIXME
alias Method
alias …FIXME
KeyValueGroupedDataset — Typed Grouping
KeyValueGroupedDataset is an experimental interface to calculate aggregates over groups of records (in a typed Dataset).
cogroup
count
flatMapGroups
flatMapGroupsWithState
keys
keyAs
mapGroups
mapGroupsWithState
mapValues
reduceGroups
scala> tokensByName.keys.show
+-----+
|value|
+-----+
| aaa|
| bbb|
+-----+
aggUntyped …FIXME
logicalPlan: AnalysisBarrier
logicalPlan …FIXME
Dataset Join Operators
Queries can access multiple tables at once, or access the same table in such a way
that multiple rows of the table are being processed at the same time. A query that
accesses multiple rows of the same or different tables at one time is called a join
query.
You can join two datasets using the join operators with an optional join condition.
You can also use SQL mode to join datasets using good ol' SQL.
You can specify a join condition (aka join expression) as part of join operators or using
where or filter operators.
You can specify the join type as part of join operators (using joinType optional parameter).
ExistenceJoin is an artificial join type used to express an existential sub-query, that is often referred to as an existential join.
You can also find that Spark SQL uses the following two families of joins:
Tip: Names are case-insensitive and can use the underscore (_) at any position, i.e. left_anti and LEFT_ANTI are equivalent.
Note: Spark SQL offers different join strategies with Broadcast Joins (aka Map-Side Joins) among them that are supposed to optimize your join queries over large distributed datasets.
join Operators
5. Inner join
// Inner join
scala> left.join(right, "id").show
+---+----+-----+
| id|left|right|
+---+----+-----+
| 0|zero| zero|
+---+----+-----+
// Full outer
scala> left.join(right, Seq("id"), "fullouter").show
+---+----+-----+
| id|left|right|
+---+----+-----+
| 1| one| null|
| 3|null|three|
| 2|null| two|
| 0|zero| zero|
+---+----+-----+
// Left anti
scala> left.join(right, Seq("id"), "leftanti").show
+---+----+
| id|left|
+---+----+
| 1| one|
+---+----+
Note That is usually considered a trivially true condition and refused as acceptable.
crossJoin Method
crossJoin joins two Datasets using Cross join type with no condition.
Note: crossJoin creates an explicit cartesian join that can be very expensive without an extra filter (that can be pushed down).
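For example, a minimal sketch:
val left = Seq(0, 1).toDF("id")
val right = Seq("a", "b").toDF("letter")
left.crossJoin(right).show // 4 rows: every id paired with every letter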
1. inner equi-join
joinWith creates a Dataset with two columns _1 and _2 that each contain records for which the join condition holds.
Broadcast Joins (aka Map-Side Joins)
Broadcast join can be very efficient for joins between a large table (fact) with relatively small
tables (dimensions) that could then be used to perform a star-schema join. It can avoid
sending all data of the large table over the network.
You can use broadcast function or SQL’s broadcast hints to mark a dataset to be broadcast
when used in a join query.
Note: According to the article Map-Side Join in Spark, broadcast join is also called a replicated join (in the distributed system community) or a map-side join (in the Hadoop community).
CanBroadcast object matches a LogicalPlan with output small enough for broadcast join.
Note: Currently statistics are only supported for Hive Metastore tables where the command ANALYZE TABLE [tableName] COMPUTE STATISTICS noscan has been run.
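A hypothetical query that gives plans like the ones below:
import org.apache.spark.sql.functions.broadcast
val q = spark.range(100).join(broadcast(spark.range(100)), "id")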
scala> println(q.queryExecution.sparkPlan.numberedTreeString)
00 BroadcastHashJoin [id#0L], [id#4L], Inner, BuildRight
01 :- Range (0, 100, step=1, splits=8)
02 +- Range (0, 100, step=1, splits=8)
scala> q.explain
== Physical Plan ==
*BroadcastHashJoin [id#0L], [id#4L], Inner, BuildRight
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
scala> spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
res1: String = -1
scala> q.explain
== Physical Plan ==
*SortMergeJoin [id#0L], [id#4L], Inner
:- *Sort [id#0L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(id#0L, 200)
: +- *Range (0, 100, step=1, splits=8)
+- *Sort [id#4L ASC NULLS FIRST], false, 0
+- ReusedExchange [id#4L], Exchange hashpartitioning(id#0L, 200)
Window Aggregation
Window Aggregation is…FIXME
WindowSpec — Window Specification
WindowSpec is a window specification that defines which rows are included in a window
(frame), i.e. the set of rows that are associated with the current row by some relation.
import org.apache.spark.sql.expressions.Window
scala> val byHTokens = Window.partitionBy('token startsWith "h")
byHTokens: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressi
ons.WindowSpec@574985d8
Once the initial version of a WindowSpec is created, you use the methods to further configure
the window specification.
With a window specification fully defined, you use Column.over operator that associates the
WindowSpec with an aggregate or window function.
import org.apache.spark.sql.functions.rank
val c = rank over windowSpec
withAggregate …FIXME
Window Utility Object — Defining Window Specification
currentRow
  currentRow: Long
  Value representing the current row that is used to define frame boundaries.
rangeBetween
  Creates a WindowSpec with the frame boundaries defined, from start (inclusive) to end (inclusive). Both start and end are relative to the current row based on the actual value of the ORDER BY expression(s).
unboundedFollowing
  unboundedFollowing: Long
unboundedPreceding
  unboundedPreceding: Long
  Value representing the first row in a partition (equivalent to "UNBOUNDED PRECEDING" in SQL) that is used to define frame boundaries.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{currentRow, lit}
val windowSpec = Window
.partitionBy($"orderId")
.orderBy($"time")
.rangeBetween(currentRow, lit(1))
scala> :type windowSpec
org.apache.spark.sql.expressions.WindowSpec
spec: WindowSpec
spec creates an "empty" WindowSpec, i.e. with empty partition and ordering specifications,
and a UnspecifiedFrame .
Standard Functions — functions Object
You can access the standard functions using the following import statement in your Scala
application:
import org.apache.spark.sql.functions._
Collection functions:
explode_outer
  explode_outer(e: Column): Column
  Creates a new row for each element in the given array or map column. If the array/map is null or empty then null is produced.
from_json
  Parses a column with a JSON string into a StructType or ArrayType of StructType elements with the specified schema.
window(
timeColumn: Column,
windowDuration: String): Column
window(
timeColumn: Column,
window windowDuration: String,
slideDuration: String): Column
window(
timeColumn: Column,
windowDuration: String,
slideDuration: String,
startTime: String): Column
Math functions:
bin
  Converts the value of a long column to binary format
Regular functions (Non-aggregate functions):
array
broadcast
coalesce
  Gives the first non-null value among the given columns
expr
lit
map
struct
typedLit
when
String functions:
split
upper
Window functions:
cume_dist
  cume_dist(): Column
  Computes the cumulative distribution of records across window partitions
dense_rank
  dense_rank(): Column
  Computes the rank of records per window partition
ntile
  ntile(n: Int): Column
  Computes the ntile group
percent_rank
  percent_rank(): Column
  Computes the rank of records per window partition
rank
  rank(): Column
  Computes the rank of records per window partition
row_number
  row_number(): Column
  Computes the sequential numbering per window partition
Tip: The page gives only a brief overview of the many functions available in the functions object and so you should read the official documentation of the functions object.
The udf family of functions allows you to create user-defined functions (UDFs) based on a
user-defined function in Scala. It accepts f function of 0 to 10 arguments and the input and
output types are automatically inferred (given the types of the respective input and output
types of the function f ).
import org.apache.spark.sql.functions._
val _length: String => Int = _.length
val _lengthUDF = udf(_length)
// define a dataframe
val df = sc.parallelize(0 to 3).toDF("num")
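Applying the UDF to the num column (the explicit cast matches the UDF's String input type):
df.withColumn("len", _lengthUDF($"num".cast("string"))).show
// every len is 1 since the numbers 0 to 3 are single-character strings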
udf(f: AnyRef, dataType: DataType) allows you to use a Scala closure for the function
argument (as f ) and explicitly declaring the output data type (as dataType ).
import org.apache.spark.sql.types.IntegerType
val byTwo = udf((n: Int) => n * 2, IntegerType)
split Function
split function splits str column using pattern . It returns a new Column .
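A hypothetical setup matching the output below (the pipe goes in a character class since split takes a regex):
import org.apache.spark.sql.functions.split
val df = Seq((0, "hello|world"), (1, "witaj|swiecie")).toDF("num", "input")
val withSplit = df.withColumn("split", split($"input", "[|]"))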
scala> withSplit.show
+---+-------------+----------------+
|num| input| split|
+---+-------------+----------------+
| 0| hello|world| [hello, world]|
| 1|witaj|swiecie|[witaj, swiecie]|
+---+-------------+----------------+
Note .$|()[{^?*+\ are RegEx’s meta characters and are considered special.
upper Function
upper function converts a string column into one with all letters in upper case. It returns a new Column.
Note: The following example uses two functions that accept a Column and return another to showcase how to chain them.
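A hypothetical reconstruction of the query whose output follows (reverse and upper chained):
import org.apache.spark.sql.functions.{reverse, upper}
val df = Seq((0, 1, "hello"), (2, 3, "world"), (2, 4, "ala")).toDF("id", "val", "name")
val withUpperReversed = df.withColumn("upper", upper(reverse($"name")))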
scala> withUpperReversed.show
+---+---+-----+-----+
| id|val| name|upper|
+---+---+-----+-----+
| 0| 1|hello|OLLEH|
| 2| 3|world|DLROW|
| 2| 4| ala| ALA|
+---+---+-----+-----+
bin converts the long value in a column to its binary format, i.e. the string representation of the binary value of the given long column.
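A hypothetical query matching the logical plan below:
import org.apache.spark.sql.functions.bin
val withBin = spark.range(5).withColumn("binary", bin('id))
withBin.show // ids 0 to 4 give the strings 0, 1, 10, 11 and 100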
scala> withBin.queryExecution.logical
res2: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
'Project [*, bin('id) AS binary#14]
+- Range (0, 5, step=1, splits=Some(8))
Aggregate Functions
or not and:
Note: grouping can only be used with cube, rollup or GROUPING SETS multi-dimensional aggregate operators (and is verified when Analyzer does check analysis).
From Hive’s documentation about Grouping__ID function (that can somehow help to
understand grouping ):
When aggregates are displayed for a column its value is null . This may conflict in
case the column itself has some null values. There needs to be some way to identify
NULL in column, which means aggregate and NULL in column, which means value.
scala> workshops.show
+-------+----+-----+
| city|year|count|
+-------+----+-----+
| Warsaw|2016| 2|
|Toronto|2016| 4|
|Toronto|2017| 1|
| null|2016| 2|
+-------+----+-----+
val q = workshops
.cube("city", "year")
.agg(grouping("city"), grouping("year")) // <-- grouping here
.sort($"city".desc_nulls_last, $"year".desc_nulls_last)
scala> q.show
+-------+----+--------------+--------------+
| city|year|grouping(city)|grouping(year)|
+-------+----+--------------+--------------+
| Warsaw|2016| 0| 0|
| Warsaw|null| 0| 1|
|Toronto|2017| 0| 0|
|Toronto|2016| 0| 0|
|Toronto|null| 0| 1|
| null|2017| 1| 0|
| null|2016| 1| 0|
| null|2016| 0| 0| <-- null is city
| null|null| 0| 1| <-- null is city
| null|null| 1| 1|
+-------+----+--------------+--------------+
scala> println(q.queryExecution.analyzed)
Aggregate [city#724, year#725, spark_grouping_id#721], [city#724, year#725, cast((shif
tright(spark_grouping_id#721, 1) & 1) as tinyint) AS grouping(city)#720]
+- Expand [List(city#182, year#183, count#184, city#722, year#723, 0), List(city#182,
year#183, count#184, city#722, null, 1), List(city#182, year#183, count#184, null, yea
r#723, 2), List(city#182, year#183, count#184, null, null, 3)], [city#182, year#183, c
ount#184, city#724, year#725, spark_grouping_id#721]
+- Project [city#182, year#183, count#184, city#182 AS city#722, year#183 AS year#7
23]
+- Union
:- Project [_1#178 AS city#182, _2#179 AS year#183, _3#180 AS count#184]
: +- LocalRelation [_1#178, _2#179, _3#180]
+- Project [_1#192 AS city#196, _2#193 AS year#197, _3#194 AS count#198]
+- LocalRelation [_1#192, _2#193, _3#194]
1. Calls the first grouping_id with colName and colNames as objects of type Column
And so on…
scala> workshops.show
+-------+----+-----+
| city|year|count|
+-------+----+-----+
| Warsaw|2016| 2|
|Toronto|2016| 4|
|Toronto|2017| 1|
| null|2016| 2|
+-------+----+-----+
scala> spark.catalog.listFunctions.filter(_.name.contains("grouping_id")).show(false)
+-----------+--------+-----------+----------------------------------------------------
+-----------+
|name |database|description|className
|isTemporary|
+-----------+--------+-----------+----------------------------------------------------
+-----------+
|grouping_id|null |null |org.apache.spark.sql.catalyst.expressions.GroupingID|
true |
+-----------+--------+-----------+----------------------------------------------------
+-----------+
// bin function gives the string representation of the binary value of the given long
column
The list of columns of grouping_id should match grouping columns (in cube or rollup )
exactly, or empty which means all the grouping columns (which is exactly what the function
expects).
Note: grouping_id can only be used with cube, rollup or GROUPING SETS multi-dimensional aggregate operators (and is verified when Analyzer does check analysis).
When aggregates are displayed for a column its value is null . This may conflict in
case the column itself has some null values. There needs to be some way to identify
NULL in column, which means aggregate and NULL in column, which means value.
id()#742]
+- Union
:- Project [_1#178 AS city#182, _2#179 AS year#183, _3#180 AS count#184]
: +- LocalRelation [_1#178, _2#179, _3#180]
+- Project [_1#192 AS city#196, _2#193 AS year#197, _3#194 AS count#198]
+- LocalRelation [_1#192, _2#193, _3#194]
== Physical Plan ==
*HashAggregate(keys=[city#757, year#758, spark_grouping_id#754], functions=[], output=
[city#757, year#758, grouping_id()#742])
+- Exchange hashpartitioning(city#757, year#758, spark_grouping_id#754, 200)
+- *HashAggregate(keys=[city#757, year#758, spark_grouping_id#754], functions=[], o
utput=[city#757, year#758, spark_grouping_id#754])
+- *Expand [List(city#755, year#756, 0), List(city#755, null, 1), List(null, yea
r#756, 2), List(null, null, 3)], [city#757, year#758, spark_grouping_id#754]
+- Union
:- LocalTableScan [city#755, year#756]
+- LocalTableScan [city#755, year#756]
Collection Functions
explode_outer
Creates a new row for each element in the given array or map column.
If the array/map is null or empty then null is produced.
reverse
Returns a reversed string or an array with reverse order of elements
reverse …FIXME
size returns the size of the given array or map. Returns -1 if null .
import org.apache.spark.sql.functions.size
val c = size('id)
scala> println(c.expr.asCode)
Size(UnresolvedAttribute(ArrayBuffer(id)))
posexplode …FIXME
posexplode_outer …FIXME
Caution FIXME
explode_outer generates a new row for each element in e array or map column.
2. (fixme)
5. Uses schema as DataType in the JSON format or falls back to StructType in the DDL
format
import org.apache.spark.sql.types._
val schema = new StructType()
.add($"id".int.copy(nullable = false))
import org.apache.spark.sql.functions.from_json
scala> jsons.select(from_json($"json", schema) as "ids").show
+---+
|ids|
+---+
|[0]|
+---+
"nullable" : true,
"metadata" : { }
}, {
"name" : "addresses",
"type" : {
"type" : "array",
"elementType" : {
"type" : "struct",
"fields" : [ {
"name" : "city",
"type" : "string",
"nullable" : true,
"metadata" : { }
}, {
"name" : "state",
"type" : "string",
"nullable" : true,
"metadata" : { }
}, {
"name" : "zip",
"type" : "string",
"nullable" : true,
"metadata" : { }
} ]
},
"containsNull" : true
},
"nullable" : true,
"metadata" : { }
} ]
}
}
""").toDF("rawjson")
val people = rawJsons
.select(from_json($"rawjson", schemaAsJson, Map.empty[String, String]) as "json")
.select("json.*") // <-- flatten the struct field
.withColumn("address", explode($"addresses")) // <-- explode the array field
.drop("addresses") // <-- no longer needed
.select("firstName", "lastName", "email", "address.*") // <-- flatten the struct fie
ld
scala> people.show
+---------+---------+---------------+------+-----+------+
|firstName| lastName| email| city|state| zip|
+---------+---------+---------------+------+-----+------+
| Jacek|Laskowski|[email protected]|Warsaw| N/A|02-791|
+---------+---------+---------------+------+-----+------+
Note: options controls how a JSON is parsed and contains the same options as the json format.
import org.apache.spark.sql.types._
val schema = new StructType()
.add($"id".int.copy(nullable = false))
.add($"corrupted_records".string)
val opts = Map("columnNameOfCorruptRecord" -> "corrupted_records")
scala> jsons.select(from_json($"json", schema, opts) as "ids").show
+----+
| ids|
+----+
|null|
+----+
array_contains creates a Column for a column argument as an array and the value of the same type as the type of the array elements.
// Arguments must be an array followed by a value of same type as the array elements
import org.apache.spark.sql.functions.array_contains
val c = array_contains(column = $"ids", value = 1)
import org.apache.spark.sql.functions.array_contains
val c = array_contains(column = $"ids", value = Array(1, 2))
val e = c.expr
scala> println(e.sql)
array_contains(`ids`, [1,2])
Tip: Use SQL's array_contains to use values from columns for the column and value arguments.
map_keys …FIXME
map_values …FIXME
Date and Time Functions
current_timestamp
date_format
current_date(): Column
val df = spark.range(1).select(current_date)
scala> df.show
+--------------+
|current_date()|
+--------------+
| 2017-09-16|
+--------------+
scala> df.printSchema
root
|-- current_date(): date (nullable = false)
val c = current_date()
import org.apache.spark.sql.catalyst.expressions.CurrentDate
val cd = c.expr.asInstanceOf[CurrentDate]
scala> println(cd.prettyName)
current_date
scala> println(cd.numberedTreeString)
00 current_date(None)
date_format Function
import org.apache.spark.sql.catalyst.expressions.DateFormatClass
val dfc = c.expr.asInstanceOf[DateFormatClass]
scala> println(dfc.prettyName)
date_format
scala> println(dfc.numberedTreeString)
00 date_format('date, dd/MM/yyyy, None)
01 :- 'date
02 +- dd/MM/yyyy
current_timestamp Function
current_timestamp(): Column
Caution FIXME
2. Converts time string in format yyyy-MM-dd HH:mm:ss to Unix timestamp (in seconds)
unix_timestamp converts the current or specified time in the specified format to a Unix timestamp (in seconds).
window(
timeColumn: Column,
windowDuration: String): Column (1)
window(
timeColumn: Column,
windowDuration: String,
slideDuration: String): Column (2)
window(
timeColumn: Column,
windowDuration: String,
slideDuration: String,
startTime: String): Column (3)
Note: Tumbling windows group elements of a stream into finite sets where each set corresponds to an interval. Tumbling windows discretize a stream into non-overlapping windows.
// https://docs.oracle.com/javase/8/docs/api/java/time/LocalDateTime.html
import java.time.LocalDateTime
// https://docs.oracle.com/javase/8/docs/api/java/sql/Timestamp.html
import java.sql.Timestamp
val levels = Seq(
// (year, month, dayOfMonth, hour, minute, second)
((2012, 12, 12, 12, 12, 12), 5),
((2012, 12, 12, 12, 12, 14), 9),
((2012, 12, 12, 13, 13, 14), 4),
((2016, 8, 13, 0, 0, 0), 10),
((2017, 5, 27, 0, 0, 0), 15)).
map { case ((yy, mm, dd, h, m, s), a) => (LocalDateTime.of(yy, mm, dd, h, m, s), a)
}.
map { case (ts, a) => (Timestamp.valueOf(ts), a) }.
toDF("time", "level")
scala> levels.show
+-------------------+-----+
| time|level|
+-------------------+-----+
|2012-12-12 12:12:12| 5|
|2012-12-12 12:12:14| 9|
|2012-12-12 13:13:14| 4|
|2016-08-13 00:00:00| 10|
|2017-05-27 00:00:00| 15|
+-------------------+-----+
scala> q.printSchema
root
|-- window: struct (nullable = true)
| |-- start: timestamp (nullable = true)
| |-- end: timestamp (nullable = true)
|-- level: integer (nullable = false)
windowDuration and slideDuration are strings specifying the width of the window for
scala> println(timeColumn.expr.numberedTreeString)
00 timewindow('time, 5000000, 5000000, 0) AS window#22
01 +- timewindow('time, 5000000, 5000000, 0)
02 +- 'time
Example — Traffic Sensor
Note The example is borrowed from Introducing Stream Windows in Apache Flink.
The example shows how to use window function to model a traffic sensor that counts every
15 seconds the number of vehicles passing a certain location.
Tip Use ParseToDate expression to use a column for the values of fmt .
Tip Use ParseToTimestamp expression to use a column for the values of fmt .
Regular Functions (Non-Aggregate Functions)
array
broadcast
expr
lit
map
monotonically_increasing_id
struct
typedLit
when
broadcast Function
broadcast function marks the input Dataset as small enough to be used in broadcast join.
== Physical Plan ==
*Project [token#127, id#126, prob#140]
+- *BroadcastHashJoin [token#127], [token#139], Inner, BuildRight
:- *Project [_1#123 AS id#126, _2#124 AS token#127]
: +- *Filter isnotnull(_2#124)
: +- LocalTableScan [_1#123, _2#124]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
+- *Project [_1#136 AS token#139, _2#137 AS prob#140]
+- *Filter isnotnull(_1#136)
+- LocalTableScan [_1#136, _2#137]
coalesce Function
coalesce gives the first non- null value among the given columns or null .
coalesce requires at least one column and all columns have to be of the same or
compatible types.
Internally, coalesce creates a Column with a Coalesce expression (with the children being
the expressions of the input Column ).
val q = spark.range(2)
.select(
coalesce(
lit(null),
lit(null),
lit(2) + 2,
$"id") as "first non-null value")
scala> q.show
+--------------------+
|first non-null value|
+--------------------+
| 4|
| 4|
+--------------------+
col and column methods create a Column that you can later use to reference a column in
a dataset.
import org.apache.spark.sql.functions._
expr Function
expr function parses the input expr SQL statement to a Column it represents.
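A hypothetical setup matching the output below:
import org.apache.spark.sql.functions.expr
val ds = Seq((0, "hello"), (1, "world")).toDF("id", "token")
val filterExpr = expr("token = 'hello'")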
scala> ds.show
+---+-----+
| id|token|
+---+-----+
| 0|hello|
| 1|world|
+---+-----+
scala> ds.filter(filterExpr).show
+---+-----+
| id|token|
+---+-----+
| 0|hello|
+---+-----+
Internally, expr uses the active session’s sqlParser or creates a new SparkSqlParser to call
parseExpression method.
lit Function
lit function…FIXME
struct Functions
struct family of functions allows you to create a new struct column based on a collection of
368
Regular Functions (Non-Aggregate Functions)
Note: The difference between struct and another similar array function is that the types of the columns can be different (in struct).
typedLit Function
typedLit …FIXME
array Function
array …FIXME
map Function
map …FIXME
when Function
when …FIXME
monotonically_increasing_id Function
monotonically_increasing_id(): Column
The generated IDs are guaranteed to be monotonically increasing and unique, but not consecutive (unless all rows are in the same single partition which you rarely want due to the amount of the data).
val q = spark.range(1).select(monotonically_increasing_id)
scala> q.show
+-----------------------------+
|monotonically_increasing_id()|
+-----------------------------+
| 60129542144|
+-----------------------------+
The current implementation uses the partition ID in the upper 31 bits, and the lower 33 bits
represent the record number within each partition. That assumes that the data set has less
than 1 billion partitions, and each partition has less than 8 billion records.
// Make sure that every partition has the same number of rows
q.mapPartitions(rows => Iterator(rows.size)).foreachPartition(rows => assert(rows.next
== 2))
q.select(monotonically_increasing_id).show
Window Aggregation Functions
In other words, when executed, a window function computes a value for each and every row
in a window (per window specification).
Note: Window functions are also called over functions due to how they are applied using the over operator.
ranking functions
analytic functions
aggregate functions
Ranking functions:
  dense_rank
  percent_rank
  ntile
  row_number
Analytic functions:
  cume_dist
  lag
  lead
For aggregate functions, you can use the existing aggregate functions as window functions,
e.g. sum , avg , min , max and count .
import org.apache.spark.sql.expressions.Window
// Windows are partitions of deptName
scala> val byDepName = Window.partitionBy('depName)
byDepName: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressi
ons.WindowSpec@1a711314
You describe a window using the convenient factory methods in Window object that create a
window specification that you can further refine with partitioning, ordering, and frame
boundaries.
After you describe a window you can apply window aggregate functions like ranking
functions (e.g. RANK ), analytic functions (e.g. LAG ), and the regular aggregate functions,
e.g. sum , avg , max .
Note: Window functions are supported in structured queries using SQL and Column-based expressions.
Although similar to aggregate functions, a window function does not group rows into a single
output row and retains their separate identities. A window function can access rows that are
linked to the current row.
You can mark a function window by OVER clause after a function in SQL, e.g. avg(revenue)
OVER (…) or over method on a function in the Dataset API, e.g. rank().over(…) .
Note Window functions belong to Window functions group in Spark’s Scala API.
Window object
Window object provides functions to define windows (as WindowSpec instances).
functions.
import org.apache.spark.sql.expressions.Window
There are two families of the functions available in Window object that create WindowSpec
instance for one or many Column instances:
partitionBy
orderBy
partitionBy creates an instance of WindowSpec with partition expression(s) defined for one
or more columns.
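A hypothetical reconstruction of the query whose output follows:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.sum
val dataset = Seq((0, "hello"), (1, "henry"), (2, "and"), (3, "harry")).toDF("id", "token")
val byHTokens = Window.partitionBy('token startsWith "h")
val q = dataset.withColumn("sum over h tokens", sum('id) over byHTokens)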
scala> q.show
+---+-----+-----------------+
| id|token|sum over h tokens|
+---+-----+-----------------+
| 0|hello| 4|
| 1|henry| 4|
| 2| and| 2|
| 3|harry| 4|
+---+-----+-----------------+
import org.apache.spark.sql.expressions.Window
val byDepnameSalaryDesc = Window.partitionBy('depname).orderBy('salary desc)
// a numerical rank within the current row's partition for each distinct ORDER BY value
rangeBetween Method
rangeBetween creates a WindowSpec with the frame boundaries from start (inclusive) to
end (inclusive).
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.expressions.WindowSpec
val spec: WindowSpec = Window.rangeBetween(Window.unboundedPreceding, Window.currentRow)
Frame
At its core, a window function calculates a return value for every input row of a table based
on a group of rows, called the frame. Every input row can have a unique frame associated
with it.
When you define a frame you have to specify three components of a frame specification -
the start and end boundaries, and the type.
CURRENT ROW
<value> PRECEDING
<value> FOLLOWING
Types of frames:
ROW - based on physical offsets from the position of the current input row
RANGE - based on logical offsets from the position of the current input row
In the current implementation of WindowSpec you can use two methods to define a frame:
rowsBetween
rangeBetween
3. RANGE , ROWS , RANGE BETWEEN , and ROWS BETWEEN for window frame types,
Examples
Question: What are the best-selling and the second best-selling products in every category?
scala> dataset.show
+----------+----------+-------+
| product| category|revenue|
+----------+----------+-------+
| Thin|cell phone| 6000|
| Normal| tablet| 1500|
| Mini| tablet| 5500|
|Ultra thin|cell phone| 5000|
| Very thin|cell phone| 6000|
| Big| tablet| 2500|
| Bendable|cell phone| 3000|
| Foldable|cell phone| 3000|
| Pro| tablet| 4500|
| Pro2| tablet| 6500|
+----------+----------+-------+
The question boils down to ranking products in a category based on their revenue, and to pick the best-selling and the second best-selling products based on the ranking.
import org.apache.spark.sql.expressions.Window
val overCategory = Window.partitionBy('category).orderBy('revenue.desc)
scala> ranked.show
+----------+----------+-------+----+
| product| category|revenue|rank|
+----------+----------+-------+----+
| Pro2| tablet| 6500| 1|
| Mini| tablet| 5500| 2|
| Pro| tablet| 4500| 3|
| Big| tablet| 2500| 4|
| Normal| tablet| 1500| 5|
| Thin|cell phone| 6000| 1|
| Very thin|cell phone| 6000| 1|
|Ultra thin|cell phone| 5000| 2|
| Bendable|cell phone| 3000| 3|
| Foldable|cell phone| 3000| 3|
+----------+----------+-------+----+
This example is the 2nd example from an excellent article Introducing Window
Note
Functions in Spark SQL.
import org.apache.spark.sql.expressions.Window
val reveDesc = Window.partitionBy('category).orderBy('revenue.desc)
val reveDiff = max('revenue).over(reveDesc) - 'revenue
Difference on Column
Compute a difference between values in rows in a column.
scala> ds.show
+---+----+
| ns|tens|
+---+----+
| 1| 10|
| 1| 20|
| 2| 20|
| 2| 40|
| 3| 30|
| 3| 60|
| 4| 40|
| 4| 80|
| 5| 50|
| 5| 100|
+---+----+
import org.apache.spark.sql.expressions.Window
val overNs = Window.partitionBy('ns).orderBy('tens)
val diff = lead('tens, 1).over(overNs)
Please note that Why do Window functions fail with "Window function X does not take a
frame specification"?
The key here is to remember that DataFrames are RDDs under the covers and hence
aggregation like grouping by a key in DataFrames is RDD’s groupBy (or worse,
reduceByKey or aggregateByKey transformations).
Running Total
The running total is the sum of all previous lines including the current one.
scala> sales.show
+---+-------+------+--------+
| id|orderID|prodID|orderQty|
+---+-------+------+--------+
| 0| 0| 0| 5|
| 1| 0| 1| 3|
| 2| 0| 2| 1|
| 3| 1| 0| 2|
| 4| 2| 0| 8|
| 5| 2| 2| 8|
+---+-------+------+--------+
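A hypothetical reconstruction of the running-total queries shown below (sales is the Dataset above):
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.sum
val salesTotalQty = sales
  .withColumn("running_total", sum('orderQty) over Window.orderBy('id))
val salesTotalQtyPerOrder = sales
  .withColumn("running_total_per_order",
    sum('orderQty) over Window.partitionBy('orderID).orderBy('id))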
scala> salesTotalQty.show
16/04/10 23:01:52 WARN Window: No Partition Defined for Window operation! Moving all d
ata to a single partition, this can cause serious performance degradation.
+---+-------+------+--------+-------------+
| id|orderID|prodID|orderQty|running_total|
+---+-------+------+--------+-------------+
| 0| 0| 0| 5| 5|
| 1| 0| 1| 3| 8|
| 2| 0| 2| 1| 9|
| 3| 1| 0| 2| 11|
| 4| 2| 0| 8| 19|
| 5| 2| 2| 8| 27|
+---+-------+------+--------+-------------+
scala> salesTotalQtyPerOrder.show
+---+-------+------+--------+-----------------------+
| id|orderID|prodID|orderQty|running_total_per_order|
+---+-------+------+--------+-----------------------+
| 0| 0| 0| 5| 5|
| 1| 0| 1| 3| 8|
| 2| 0| 2| 1| 9|
| 3| 1| 0| 2| 2|
| 4| 2| 0| 8| 8|
| 5| 2| 2| 8| 16|
+---+-------+------+--------+-----------------------+
With the Interval data type, you could use intervals as values specified in <value> PRECEDING
and <value> FOLLOWING for RANGE frame. It is specifically suited for time-series analysis with
window functions.
Moving Average
Cumulative Aggregates
Eg. cumulative sum
With the window function support, you could use user-defined aggregate functions as
window functions.
import org.apache.spark.sql.expressions.Window
val byDepnameSalaryDesc = Window.partitionBy('depname).orderBy('salary desc)
== Physical Plan ==
Window [rank(salary#7L) windowspecdefinition(depname#5, salary#7L DESC, ROWS BETWEEN U
NBOUNDED PRECEDING AND CURRENT ROW) AS rank#9], [depname#5], [salary#7L DESC]
+- *Sort [depname#5 ASC, salary#7L DESC], false, 0
+- Exchange hashpartitioning(depname#5, 200)
+- LocalTableScan [depName#5, empNo#6L, salary#7L]
lag returns the value in e / columnName column that is offset records before the
current record. lag returns null value if the number of records in a window partition is
less than offset or defaultValue .
import org.apache.spark.sql.expressions.Window
val windowSpec = Window.partitionBy('bucket).orderBy('id)
scala> dataset.withColumn("lag", lag('id, 1) over windowSpec).show
+---+------+----+
| id|bucket| lag|
+---+------+----+
| 0| 0|null|
| 3| 0| 0|
| 6| 0| 3|
| 1| 1|null|
| 4| 1| 1|
| 7| 1| 4|
| 2| 2|null|
| 5| 2| 2|
| 8| 2| 5|
+---+------+----+
Caution: FIXME It looks like lag with a default value has a bug — the default value's not used at all.
lead returns the value that is offset records after the current record, and defaultValue if there are fewer than offset records after the current record. lead returns a null value if the number of records in a window partition is less than offset or defaultValue.
import org.apache.spark.sql.expressions.Window
val windowSpec = Window.partitionBy('bucket).orderBy('id)
scala> dataset.withColumn("lead", lead('id, 1) over windowSpec).show
+---+------+----+
| id|bucket|lead|
+---+------+----+
| 0| 0| 0|
| 0| 0| 3|
| 3| 0| 3|
| 3| 0| 6|
| 6| 0| 6|
| 6| 0|null|
| 1| 1| 1|
| 1| 1| 4|
| 4| 1| 4|
| 4| 1| 7|
| 7| 1| 7|
| 7| 1|null|
| 2| 2| 2|
| 2| 2| 5|
| 5| 2| 5|
| 5| 2| 8|
| 8| 2| 8|
| 8| 2|null|
+---+------+----+
| 2| 2| 5|
| 5| 2| 8|
| 5| 2| 8|
| 8| 2|null|
| 8| 2|null|
+---+------+----+
Caution: FIXME It looks like lead with a default value has a bug — the default value's not used at all.
cume_dist(): Column
cume_dist computes the cumulative distribution of the records in window partitions. This is equivalent to SQL's CUME_DIST function.
import org.apache.spark.sql.expressions.Window
val windowSpec = Window.partitionBy('bucket).orderBy('id)
scala> dataset.withColumn("cume_dist", cume_dist over windowSpec).show
+---+------+------------------+
| id|bucket| cume_dist|
+---+------+------------------+
| 0| 0|0.3333333333333333|
| 3| 0|0.6666666666666666|
| 6| 0| 1.0|
| 1| 1|0.3333333333333333|
| 4| 1|0.6666666666666666|
| 7| 1| 1.0|
| 2| 2|0.3333333333333333|
| 5| 2|0.6666666666666666|
| 8| 2| 1.0|
+---+------+------------------+
row_number(): Column
import org.apache.spark.sql.expressions.Window
val windowSpec = Window.partitionBy('bucket).orderBy('id)
scala> dataset.withColumn("row_number", row_number() over windowSpec).show
+---+------+----------+
| id|bucket|row_number|
+---+------+----------+
| 0| 0| 1|
| 0| 0| 2|
| 3| 0| 3|
| 3| 0| 4|
| 6| 0| 5|
| 6| 0| 6|
| 1| 1| 1|
| 1| 1| 2|
| 4| 1| 3|
| 4| 1| 4|
| 7| 1| 5|
| 7| 1| 6|
| 2| 2| 1|
| 2| 2| 2|
| 5| 2| 3|
| 5| 2| 4|
| 8| 2| 5|
| 8| 2| 6|
+---+------+----------+
ntile computes the ntile group id (from 1 to n inclusive) in an ordered window partition.
import org.apache.spark.sql.expressions.Window
val byBuckets = Window.partitionBy('bucket).orderBy('id)
scala> dataset.select('*, ntile(3) over byBuckets as "ntile").show
+---+------+-----+
| id|bucket|ntile|
+---+------+-----+
| 0| 0| 1|
| 3| 0| 2|
| 6| 0| 3|
| 1| 1| 1|
| 4| 1| 2|
| 2| 2| 1|
| 5| 2| 2|
+---+------+-----+
Caution: FIXME How is ntile different from rank? What about performance?
rank(): Column
dense_rank(): Column
percent_rank(): Column
rank functions assign the sequential rank of each distinct value per window partition. They
are equivalent to RANK , DENSE_RANK and PERCENT_RANK functions in the good ol' SQL.
import org.apache.spark.sql.expressions.Window
val byBucket = Window.partitionBy('bucket).orderBy('id)
rank function assigns the same rank for duplicate rows with a gap in the sequence
(similarly to Olympic medal places). dense_rank is like rank for duplicate rows but
compacts the ranks and removes the gaps.
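A minimal sketch of queries that produce ranking output like the (truncated) tables below, assuming the same dataset with id and bucket columns and the byBucket window specification defined above:

import org.apache.spark.sql.functions.{dense_rank, percent_rank, rank}

dataset.withColumn("rank", rank() over byBucket).show
dataset.withColumn("dense_rank", dense_rank() over byBucket).show
dataset.withColumn("percent_rank", percent_rank() over byBucket).show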
| 6| 0| 5|
| 1| 1| 1|
| 1| 1| 1|
| 4| 1| 3|
| 4| 1| 3|
| 7| 1| 5|
| 7| 1| 5|
| 2| 2| 1|
| 2| 2| 1|
| 5| 2| 3|
| 5| 2| 3|
| 8| 2| 5|
| 8| 2| 5|
+---+------+----+
| 1| 1| 0.0|
| 1| 1| 0.0|
| 4| 1| 0.4|
| 4| 1| 0.4|
| 7| 1| 0.8|
| 7| 1| 0.8|
| 2| 2| 0.0|
| 2| 2| 0.0|
| 5| 2| 0.4|
| 5| 2| 0.4|
| 8| 2| 0.8|
| 8| 2| 0.8|
+---+------+------------+
currentRow(): Column
currentRow …FIXME
unboundedFollowing(): Column
unboundedFollowing …FIXME
unboundedPreceding(): Column
unboundedPreceding …FIXME
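These are frame-boundary markers for window specifications. A minimal sketch of an explicit frame using the analogous Long-based constants on the Window utility object (assuming the dataset with bucket and id columns from the examples above):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.sum

// running total: from the beginning of the partition up to the current row
val runningTotalSpec = Window
  .partitionBy('bucket)
  .orderBy('id)
  .rowsBetween(Window.unboundedPreceding, Window.currentRow)

dataset.withColumn("running_total", sum('id) over runningTotalSpec).show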
UDFs — User-Defined Functions
User-Defined Functions (aka UDF) is a feature of Spark SQL to define new Column-based
functions that extend the vocabulary of Spark SQL’s DSL for transforming Datasets.
You define a new UDF by passing a Scala function as an input parameter to the udf function,
which accepts Scala functions of up to 10 input parameters.
// A regular Scala function (not a Dataset operator)
val upper: String => String = _.toUpperCase

// Define a UDF that wraps the upper Scala function defined above
// You could also define the function in place, i.e. inside udf
// but separating Scala functions from Spark SQL's UDFs allows for easier testing
import org.apache.spark.sql.functions.udf
val upperUDF = udf(upper)
You can register UDFs to use in SQL-based query expressions via UDFRegistration (that is
available through SparkSession.udf attribute).
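A minimal sketch of registering a UDF and using it in a SQL query (the myUpper name is hypothetical):

// register a UDF under a name usable in SQL text
spark.udf.register("myUpper", (s: String) => s.toUpperCase)
spark.sql("SELECT myUpper('hello') AS upper").show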
You can query for available standard and user-defined functions using the Catalog interface
(that is available through SparkSession.catalog attribute).
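A minimal sketch of browsing the registered functions via the Catalog interface (user-defined functions registered in the session show up as temporary functions):

spark.catalog.listFunctions.filter($"isTemporary").show(truncate = false)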
Note: UDFs play a vital role in Spark MLlib to define new Transformers that are function objects that transform DataFrames into DataFrames by introducing new columns.
org.apache.spark.sql.functions object comes with udf function to let you define a UDF for
a Scala function f .
val df = Seq(
  (0, "hello"),
  (1, "world")).toDF("id", "text")

// A regular Scala function (not a Dataset operator)
val toUpper: String => String = _.toUpperCase

import org.apache.spark.sql.functions.udf
val upper = udf(toUpper)
UDFs are Blackbox — Don’t Use Them Unless You’ve Got No Choice
scala> spark.conf.get("spark.sql.parquet.filterPushdown")
res0: String = true
You are going to use the following cities dataset that is based on a Parquet file (as used in
the Predicate Pushdown / Filter Pushdown for Parquet Data Source section). The reason for
Parquet is that it is an external data source that supports the optimizations Spark uses to
optimize queries, such as predicate pushdown.
cities6chars.explain(true)

// The filter predicate is pushed down fine for Dataset's Column-based query in where operator
scala> cities.where('name === "Warsaw").queryExecution.executedPlan
res29: org.apache.spark.sql.execution.SparkPlan =
*Project [id#128L, name#129]
+- *Filter (isnotnull(name#129) && (name#129 = Warsaw))
+- *FileScan parquet [id#128L,name#129] Batched: true, Format: ParquetFormat, Input
Paths: file:/Users/jacek/dev/oss/spark/cities.parquet, PartitionFilters: [], PushedFil
ters: [IsNotNull(name), EqualTo(name,Warsaw)], ReadSchema: struct<id:bigint,name:strin
g>
UserDefinedFunction
UserDefinedFunction represents a user-defined function.
A UDF can also be registered under a name in the session's FunctionRegistry (via UDFRegistration).
import org.apache.spark.sql.functions.udf
val lengthUDF = udf { s: String => s.length }
val r = lengthUDF($"name")
scala> :type r
org.apache.spark.sql.Column
UserDefinedFunction is deterministic by default, i.e. produces the same result for the same input.
assert(lengthUDF.deterministic)
val ndUDF = lengthUDF.asNondeterministic
assert(ndUDF.deterministic == false)
import org.apache.spark.sql.functions.udf
scala> val lengthUDF = udf { s: String => s.length }
lengthUDF: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(
<function1>,IntegerType,Some(List(StringType)))
scala> lengthUDF($"name")
res1: org.apache.spark.sql.Column = UDF(name)
asNonNullable(): UserDefinedFunction
asNonNullable …FIXME
withName(name: String): UserDefinedFunction
withName …FIXME
Schema — Structure of Data
A schema is the description of the structure of your data (the schema and the data together
make up a Dataset in Spark SQL). It can be implicit (and inferred at runtime) or explicit
(and known at compile time).
import org.apache.spark.sql.types.StructType
val schemaUntyped = new StructType()
.add("a", "int")
.add("b", "string")
You can use the canonical string representation of SQL types to describe the types in a
schema (that is inherently untyped at compile time) or use type-safe types from the
org.apache.spark.sql.types package.
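A minimal sketch of the type-safe variant (it defines the schemaTyped value that the examples below refer to):

import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
val schemaTyped = new StructType()
  .add("a", IntegerType)
  .add("b", StringType)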
It is however recommended to use the singleton DataTypes class with static methods to
create schema types.
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.types.DataTypes._
val schemaWithMap = StructType(
  StructField("map", createMapType(LongType, StringType), false) :: Nil)
StructType offers printTreeString that makes presenting the schema more user-friendly.
scala> schemaTyped.printTreeString
root
|-- a: integer (nullable = true)
|-- b: string (nullable = true)
scala> schemaWithMap.printTreeString
root
|-- map: map (nullable = false)
| |-- key: long
| |-- value: string (valueContainsNull = true)
As of Spark 2.0, you can describe the schema of your strongly-typed datasets using
encoders.
import org.apache.spark.sql.Encoders
scala> Encoders.INT.schema.printTreeString
root
|-- value: integer (nullable = true)
Implicit Schema
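A minimal sketch of a DataFrame whose schema is inferred implicitly from a local collection (the values are hypothetical; only the column names and types match the output below; assumes spark.implicits._ in scope, as in spark-shell):

val df = Seq(
  (0, "hello world"),
  (1, "spark sql")).toDF("label", "sentence")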
scala> df.printSchema
root
|-- label: integer (nullable = false)
|-- sentence: string (nullable = true)
scala> df.schema
res0: org.apache.spark.sql.types.StructType = StructType(StructField(label,IntegerType,
false), StructField(sentence,StringType,true))
scala> df.schema("label").dataType
res1: org.apache.spark.sql.types.DataType = IntegerType
StructType
You can compare two StructType instances to see whether they are equal.
Note: StructType is a Seq[StructField], so you can use the regular Scala collection API with it, e.g.

import org.apache.spark.sql.types.StructType
scala> schemaTyped.foreach(println)
StructField(a,IntegerType,true)
StructField(b,StringType,true)
As of Spark 2.4.0, StructType can be converted to DDL format using toDDL method.
fromAttributes Method
fromAttributes(attributes: Seq[Attribute]): StructType
fromAttributes …FIXME
toAttributes Method
toAttributes: Seq[AttributeReference]
toAttributes …FIXME
simpleString: String
catalogString: String
sql: String
StructType as a custom DataType is used in query plans or SQL. It can present itself using
simpleString, catalogString or sql (as the examples below show).
scala> schemaTyped.simpleString
res0: String = struct<a:int,b:string>
scala> schemaTyped.catalogString
res1: String = struct<a:int,b:string>
scala> schemaTyped.sql
res2: String = STRUCT<`a`: INT, `b`: STRING>
StructType defines its own apply method that gives you an easy access to a
StructField by name.
scala> schemaTyped.printTreeString
root
|-- a: integer (nullable = true)
|-- b: string (nullable = true)
scala> schemaTyped("a")
res4: org.apache.spark.sql.types.StructField = StructField(a,IntegerType,true)
This variant of apply lets you create a StructType out of an existing StructType with the
names only.
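A minimal sketch of that Set-based apply variant (it selects the fields with the given names):

import org.apache.spark.sql.types.StructType
val aOnly: StructType = schemaTyped(names = Set("a"))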
printTreeString(): Unit
scala> schemaTyped.printTreeString
root
|-- a: integer (nullable = true)
|-- b: string (nullable = true)
Internally, it uses treeString method to build the tree and then println it.
fromDDL(ddl: String): StructType
fromDDL …FIXME
toDDL: String
toDDL converts all the fields to DDL format and concatenates them using the comma ( , ).
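A minimal sketch (for the schemaTyped schema above, toDDL gives a comma-separated list of the fields' DDL, e.g. `a` INT,`b` STRING):

println(schemaTyped.toDDL)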
StructField — Single Field in StructType
A StructField describes a single field in a StructType with the following:
Name
DataType
nullable flag (enabled by default)
Metadata (empty by default)
A comment is part of metadata under comment key and is used to build a Hive column or
when describing a table.
scala> schemaTyped("a").getComment
res0: Option[String] = None
As of Spark 2.4.0, StructField can be converted to DDL format using toDDL method.
import org.apache.spark.sql.types.MetadataBuilder
val metadata = new MetadataBuilder()
.putString("comment", "this is a comment")
.build
import org.apache.spark.sql.types.{LongType, StructField}
val f = new StructField(name = "id", dataType = LongType, nullable = false, metadata)
scala> println(f.toDDL)
`id` BIGINT COMMENT 'this is a comment'
toDDL: String
Data Types
DataType abstract class is the base type of all built-in data types in Spark SQL, e.g. strings,
longs. The type families include Atomic Types (an internal type to represent types that are
not null, UDTs, arrays, structs, and maps) as well as the fractional and integral numeric
types (see the table below).
Atomic Types (except fractional and integral types): BooleanType, DateType, StringType, TimestampType (java.sql.Timestamp)
Fractional Types (concrete NumericType): DecimalType, DoubleType, FloatType
Integral Types (concrete NumericType): ByteType, IntegerType, LongType, ShortType
Other types: ArrayType, CalendarIntervalType, MapType, NullType, ObjectType, StructType, UserDefinedType
AnyDataType: Matches any concrete data type
You can extend the type system and create your own user-defined types (UDTs).
The DataType Contract defines methods to build SQL, JSON and string representations.
import org.apache.spark.sql.types.StringType
scala> StringType.json
res0: String = "string"
scala> StringType.sql
res1: String = STRING
scala> StringType.catalogString
res2: String = string
You should use DataTypes object in your code to create complex Spark SQL types, i.e.
arrays or maps.
import org.apache.spark.sql.types.DataTypes
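For instance, a minimal sketch of creating array and map types with the DataTypes factory methods:

import org.apache.spark.sql.types.DataTypes
val longs = DataTypes.createArrayType(DataTypes.LongType)
val longsToStrings = DataTypes.createMapType(DataTypes.LongType, DataTypes.StringType)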
DataType has support for Scala’s pattern matching using unapply method.
???
DataType Contract
Any type in Spark SQL follows the DataType contract, which means that the types define the
following methods: json and prettyJson, defaultSize, simpleString, catalogString and sql
(as the example below demonstrates).
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.types.DataTypes._
val maps = StructType(
  StructField("longs2strings", createMapType(LongType, StringType), false) :: Nil)

scala> maps.prettyJson
res0: String =
{
"type" : "struct",
"fields" : [ {
"name" : "longs2strings",
"type" : {
"type" : "map",
"keyType" : "long",
"valueType" : "string",
"valueContainsNull" : true
},
"nullable" : false,
"metadata" : { }
} ]
}
scala> maps.defaultSize
res1: Int = 2800
scala> maps.simpleString
res2: String = struct<longs2strings:map<bigint,string>>
scala> maps.catalogString
res3: String = struct<longs2strings:map<bigint,string>>
scala> maps.sql
res4: String = STRUCT<`longs2strings`: MAP<BIGINT, STRING>>
import org.apache.spark.sql.types.DataTypes
UDTs — User-Defined Types
Caution FIXME
Multi-Dimensional Aggregation
Multi-dimensional aggregate operators are enhanced variants of the groupBy operator that
allow you to create queries for subtotals, grand totals and supersets of subtotals in one go.
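The examples below use a small sales dataset. A minimal sketch that reproduces the figures shown in the outputs (assumes spark.implicits._ in scope, as in spark-shell):

val sales = Seq(
  ("Warsaw", 2016, 100),
  ("Warsaw", 2017, 200),
  ("Boston", 2015, 50),
  ("Boston", 2016, 150),
  ("Toronto", 2017, 50)).toDF("city", "year", "amount")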
// very labor-intense
// groupBy's unioned
import org.apache.spark.sql.functions.{lit, sum}
val groupByCityAndYear = sales
.groupBy("city", "year") // <-- subtotals (city, year)
.agg(sum("amount") as "amount")
val groupByCityOnly = sales
.groupBy("city") // <-- subtotals (city)
.agg(sum("amount") as "amount")
.select($"city", lit(null) as "year", $"amount") // <-- year is null
val withUnion = groupByCityAndYear
.union(groupByCityOnly)
.sort($"city".desc_nulls_last, $"year".asc_nulls_last)
scala> withUnion.show
+-------+----+------+
| city|year|amount|
+-------+----+------+
| Warsaw|2016| 100|
| Warsaw|2017| 200|
| Warsaw|null| 300|
|Toronto|2017| 50|
|Toronto|null| 50|
| Boston|2015| 50|
| Boston|2016| 150|
| Boston|null| 200|
+-------+----+------+
Note: It is assumed that using one of the operators is usually more efficient (than union and groupBy) as it gives more freedom for query optimization.
Besides the cube and rollup multi-dimensional aggregate operators, Spark SQL supports
the GROUPING SETS clause in SQL mode only.
Note: SQL's GROUPING SETS is the most general aggregate "operator" and can generate the same dataset as using a simple groupBy, cube and rollup operators.
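A minimal sketch of GROUPING SETS in SQL mode, over the sales dataset registered as a temporary view (the grouping sets below give subtotals per (city, year) and (city) plus a grand total):

sales.createOrReplaceTempView("sales")
spark.sql("""
  SELECT city, year, sum(amount) AS amount
  FROM sales
  GROUP BY city, year
  GROUPING SETS ((city, year), (city), ())
  ORDER BY city DESC, year ASC
""").show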
import java.time.LocalDate
import java.time.Month
import java.sql.Date
val expenses = Seq(
((2012, Month.DECEMBER, 12), 5),
((2016, Month.AUGUST, 13), 10),
((2017, Month.MAY, 27), 15))
.map { case ((yy, mm, dd), a) => (LocalDate.of(yy, mm, dd), a) }
.map { case (d, a) => (d.toString, a) }
.map { case (d, a) => (Date.valueOf(d), a) }
.toDF("date", "amount")
scala> expenses.show
+----------+------+
| date|amount|
+----------+------+
|2012-12-12| 5|
|2016-08-13| 10|
|2017-05-27| 15|
+----------+------+
// rollup time!
import org.apache.spark.sql.functions.{month, sum, year}
val q = expenses
.rollup(year($"date") as "year", month($"date") as "month")
.agg(sum("amount") as "amount")
.sort($"year".asc_nulls_last, $"month".asc_nulls_last)
scala> q.show
+----+-----+------+
|year|month|amount|
+----+-----+------+
|2012| 12| 5|
|2012| null| 5|
|2016| 8| 10|
|2016| null| 10|
|2017| 5| 15|
|2017| null| 15|
|null| null| 30|
+----+-----+------+
rollup Operator
rollup calculates subtotals and a grand total across the specified group of n + 1 dimensions
(with n being the number of columns as cols and col1, and 1 for the level where values
become null, i.e. undefined).
Note: rollup operator is commonly used for analysis over hierarchical data; e.g. total salary by department, division, and company-wide total. See PostgreSQL's 7.2.4. GROUPING SETS, CUBE, and ROLLUP.
val q = sales
.rollup("city", "year")
.agg(sum("amount") as "amount")
.sort($"city".desc_nulls_last, $"year".asc_nulls_last)
scala> q.show
+-------+----+------+
| city|year|amount|
+-------+----+------+
| Warsaw|2016| 100| <-- subtotal for Warsaw in 2016
| Warsaw|2017| 200|
| Warsaw|null| 300| <-- subtotal for Warsaw (across years)
|Toronto|2017| 50|
|Toronto|null| 50|
| Boston|2015| 50|
| Boston|2016| 150|
| Boston|null| 200|
| null|null| 550| <-- grand total
+-------+----+------+
From Using GROUP BY with ROLLUP, CUBE, and GROUPING SETS in Microsoft’s
TechNet:
The ROLLUP, CUBE, and GROUPING SETS operators are extensions of the GROUP
BY clause. The ROLLUP, CUBE, or GROUPING SETS operators can generate the
same result set as when you use UNION ALL to combine single grouping queries;
however, using one of the GROUP BY operators is usually more efficient.
References to the grouping columns or expressions are replaced by null values in result
rows for grouping sets in which those columns do not appear.
The ROLLUP operator is useful in generating reports that contain subtotals and totals.
(…) ROLLUP generates a result set that shows aggregates for a hierarchy of values in
the selected columns.
scala> inventory.show
+-----+-----+--------+
| item|color|quantity|
+-----+-----+--------+
|chair| blue| 101|
|chair| red| 210|
|table| blue| 124|
|table| red| 223|
+-----+-----+--------+
WITH ROLLUP is used with the GROUP BY only. ROLLUP clause is used with GROUP
BY to compute the aggregate at the hierarchy levels of a dimension.
GROUP BY a, b, c with ROLLUP assumes that the hierarchy is "a" drilling down to "b"
drilling down to "c".
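A minimal sketch of SQL's WITH ROLLUP over the inventory dataset shown above (registered as a temporary view):

inventory.createOrReplaceTempView("inventory")
spark.sql("""
  SELECT item, color, sum(quantity) AS quantity
  FROM inventory
  GROUP BY item, color WITH ROLLUP
  ORDER BY item, color
""").show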
scala> quarterlyScores.show
+----------+-------+-----+
| period|student|score|
+----------+-------+-----+
|winter2014| Agata| 99|
|winter2014| Jacek| 97|
|summer2015| Agata| 100|
|summer2015| Jacek| 63|
|winter2015| Agata| 97|
|winter2015| Jacek| 55|
|summer2016| Agata| 98|
|summer2016| Jacek| 97|
+----------+-------+-----+
Internally, rollup converts the Dataset into a DataFrame (i.e. uses RowEncoder as the
encoder) and then creates a RelationalGroupedDataset (with RollupType group type).
Tip: Read up on rollup in Deeper into Postgres 9.5 - New Group By Options for Aggregation.
cube Operator
cube calculates subtotals and a grand total across all combinations of the specified group of
n + 1 dimensions (with n being the number of columns as cols and col1, and 1 for the level
where values become null, i.e. undefined).
cube returns a RelationalGroupedDataset that you can use to execute an aggregate function or
operator.
Note: cube is more than the rollup operator, i.e. cube does rollup with aggregation over all the missing combinations given the columns.
The GROUPING SETS clause generates a dataset that is equivalent to a union of multiple
groupBy operators.
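A minimal sketch of cube over the sales dataset (assuming the imports from the rollup example above); the tail of the output of such a query is shown below:

val q = sales
  .cube("city", "year")
  .agg(sum("amount") as "amount")
  .sort($"city".desc_nulls_last, $"year".asc_nulls_last)
q.show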
| Boston|null| 200|
| null|2015| 50| <-- total across all cities in 2015
| null|2016| 250| <-- total across all cities in 2016
| null|2017| 250| <-- total across all cities in 2017
| null|null| 550|
+-------+----+------+
Rollup(groupByExprs: Seq[Expression])
extends GroupingSet
Rollup expression represents rollup operator in Spark’s Catalyst Expression tree (after
Dataset Caching and Persistence
The Dataset API comes with the cache, persist and unpersist operators.
At this point you could use web UI’s Storage tab to review the Datasets persisted. Visit
http://localhost:4040/storage.
Caching registers structured queries (and their subqueries) as InMemoryRelation leaf logical
operators with CacheManager (available as spark.sharedState.cacheManager).
val data = spark.range(1).cache
scala> println(data.queryExecution.withCachedData.numberedTreeString)
00 InMemoryRelation [id#9L], StorageLevel(disk, memory, deserialized, 1 replicas)
01 +- *(1) Range (0, 1, step=1, splits=8)
== Physical Plan ==
*Project [id#0L, id#0L AS newId#16L]
+- InMemoryTableScan [id#0L]
+- InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory, deserialize
d, 1 replicas)
+- *Range (0, 1, step=1, splits=Some(8))
You can also use SQL’s CACHE TABLE [tableName] to cache tableName table in
memory. Unlike cache and persist operators, CACHE TABLE is an eager
operation which is executed as soon as the statement is executed.
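A minimal sketch of the eager SQL variant (the nums view name is hypothetical):

spark.range(5).createOrReplaceTempView("nums")
// executes eagerly and materializes the cache right away
spark.sql("CACHE TABLE nums")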
Be careful what you cache, i.e. which Dataset is cached, as it determines which queries get
registered in the cache.
Tip: You can check whether a Dataset was cached or not using the following code:

scala> :type q2
org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]

val cache = spark.sharedState.cacheManager
scala> cache.lookupCachedData(q2.queryExecution.logical).isDefined
res0: Boolean = false
User-Friendly Names Of Cached Queries in web UI’s Storage Tab
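The snippet below assumes a temporary view named one that has already been cached via the Catalog interface. A minimal sketch of that setup:

val one = spark.range(1)
one.createOrReplaceTempView("one")
spark.catalog.cacheTable("one")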
scala> spark.catalog.isCached("one")
res0: Boolean = true
one.unpersist
import org.apache.spark.storage.StorageLevel
// caching is lazy
spark.catalog.cacheTable("one", StorageLevel.MEMORY_ONLY)
// The following gives "In-memory table one"
one.show
spark.range(100).createOrReplaceTempView("hundred")
// SQL's CACHE TABLE is eager
// The following gives "In-memory table `hundred`"
// WHY single quotes?
spark.sql("CACHE TABLE hundred")
Dataset Checkpointing
Dataset Checkpointing is a feature of Spark SQL to truncate a logical query plan, which could
specifically be useful for highly iterative data algorithms (e.g. Spark MLlib that uses Spark
SQL's Dataset API for data manipulation).
Note: Checkpointing is actually a feature of Spark Core (that Spark SQL uses for distributed computations) that allows a driver to be restarted on failure with previously computed state of a distributed computation described as an RDD. That has been successfully used in Spark Streaming - the now-obsolete Spark module for stream processing based on the RDD API.
Checkpointing truncates the lineage of a RDD to be checkpointed. That has been successfully used in Spark MLlib in iterative machine learning algorithms like ALS.
Dataset checkpointing in Spark SQL uses checkpointing to truncate the lineage of the underlying RDD of a Dataset being checkpointed.
Checkpointing can be eager or lazy per eager flag of checkpoint operator. Eager
checkpointing is the default checkpointing and happens immediately when requested. Lazy
checkpointing does not and will only happen when an action is executed.
Using Dataset checkpointing requires that you specify the checkpoint directory. The
directory stores the checkpoint files for RDDs to be checkpointed. Use
SparkContext.setCheckpointDir to set the path to a checkpoint directory.
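A minimal sketch of eager and lazy checkpointing (the checkpoint directory path is hypothetical):

spark.sparkContext.setCheckpointDir("/tmp/checkpoints")

val nums = spark.range(100)
val eagerly = nums.checkpoint()              // eager by default
val lazily = nums.checkpoint(eager = false)  // happens at the first action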
Checkpointing can be local or reliable which defines how reliable the checkpoint directory is