Apache Spark Detailed Guide
Table of Contents
Introduction 1.1
Overview of Apache Spark 1.2
StorageTab 3.4
StoragePage 3.4.1
RDDPage 3.4.2
EnvironmentTab 3.5
EnvironmentPage 3.5.1
ExecutorsTab 3.6
ExecutorsPage 3.6.1
ExecutorThreadDumpPage 3.6.2
SparkUI — Web UI of Spark Application 3.7
SparkUITab 3.7.1
BlockStatusListener Spark Listener 3.8
EnvironmentListener Spark Listener 3.9
ExecutorsListener Spark Listener 3.10
JobProgressListener Spark Listener 3.11
StorageStatusListener Spark Listener 3.12
StorageListener — Spark Listener for Tracking Persistence Status of RDD Blocks 3.13
RDDOperationGraphListener Spark Listener 3.14
WebUI — Framework For Web UIs 3.15
WebUIPage — Contract of Pages in Web UI 3.15.1
WebUITab — Contract of Tabs in Web UI 3.15.2
RDDStorageInfo 3.16
RDDInfo 3.17
LiveEntity 3.18
LiveRDD 3.18.1
UIUtils 3.19
JettyUtils 3.20
web UI Configuration Properties 3.21
Sink — Contract of Metrics Sinks 4.5
MetricsServlet JSON Metrics Sink 4.5.1
Metrics Configuration Properties 4.6
Spark MLlib
Spark MLlib — Machine Learning in Spark 6.1
ML Pipelines (spark.ml) 6.2
Pipeline 6.2.1
PipelineStage 6.2.2
Transformers 6.2.3
Transformer 6.2.3.1
Tokenizer 6.2.3.2
Estimators 6.2.4
Estimator 6.2.4.1
StringIndexer 6.2.4.1.1
KMeans 6.2.4.1.2
TrainValidationSplit 6.2.4.1.3
Predictor 6.2.4.2
RandomForestRegressor 6.2.4.2.1
Regressor 6.2.4.3
LinearRegression 6.2.4.3.1
Classifier 6.2.4.4
RandomForestClassifier 6.2.4.4.1
DecisionTreeClassifier 6.2.4.4.2
Models 6.2.5
Model 6.2.5.1
Evaluator — ML Pipeline Component for Model Scoring 6.2.6
BinaryClassificationEvaluator — Evaluator of Binary Classification Models 6.2.6.1
ClusteringEvaluator — Evaluator of Clustering Models 6.2.6.2
MulticlassClassificationEvaluator — Evaluator of Multiclass Classification Models 6.2.6.3
RegressionEvaluator — Evaluator of Regression Models 6.2.6.4
CrossValidator — Model Tuning / Finding The Best Model 6.2.7
CrossValidatorModel 6.2.7.1
ParamGridBuilder 6.2.7.2
CrossValidator with Pipeline Example 6.2.7.3
Params and ParamMaps 6.2.8
ValidatorParams 6.2.8.1
HasParallelism 6.2.8.2
ML Persistence — Saving and Loading Models and Pipelines 6.3
MLWritable 6.3.1
MLReader 6.3.2
Example — Text Classification 6.4
Example — Linear Regression 6.5
Logistic Regression 6.6
LogisticRegression 6.6.1
Latent Dirichlet Allocation (LDA) 6.7
Vector 6.8
LabeledPoint 6.9
Streaming MLlib 6.10
GeneralizedLinearRegression 6.11
Alternating Least Squares (ALS) Matrix Factorization 6.12
ALS — Estimator for ALSModel 6.12.1
ALSModel — Model for Predictions 6.12.2
ALSModelReader 6.12.3
Instrumentation 6.13
MLUtils 6.14
Inside Creating SparkContext 9.3.2
ConsoleProgressBar 9.3.3
SparkStatusTracker 9.3.4
Local Properties — Creating Logical Job Groups 9.3.5
RDD — Resilient Distributed Dataset 9.4
RDD 9.4.1
RDD Lineage — Logical Execution Plan 9.4.2
TaskLocation 9.4.3
ParallelCollectionRDD 9.4.4
MapPartitionsRDD 9.4.5
OrderedRDDFunctions 9.4.6
CoGroupedRDD 9.4.7
SubtractedRDD 9.4.8
HadoopRDD 9.4.9
NewHadoopRDD 9.4.10
ShuffledRDD 9.4.11
Operators 9.5
Transformations 9.5.1
PairRDDFunctions 9.5.1.1
Actions 9.5.2
Caching and Persistence 9.6
StorageLevel 9.6.1
Partitions and Partitioning 9.7
Partition 9.7.1
Partitioner 9.7.2
HashPartitioner 9.7.2.1
Shuffling 9.8
Checkpointing 9.9
CheckpointRDD 9.9.1
RDD Dependencies 9.10
NarrowDependency — Narrow Dependencies 9.10.1
ShuffleDependency — Shuffle Dependencies 9.10.2
Map/Reduce-side Aggregator 9.11
AppStatusStore 9.12
AppStatusPlugin 9.13
AppStatusListener 9.14
KVStore 9.15
KVStoreView 9.15.1
ElementTrackingStore 9.15.2
InMemoryStore 9.15.3
LevelDB 9.15.4
InterruptibleIterator — Iterator With Support For Task Cancellation 9.16
TaskScheduler — Spark Scheduler 11.5
Tasks 11.5.1
ShuffleMapTask — Task for ShuffleMapStage 11.5.1.1
ResultTask 11.5.1.2
FetchFailedException 11.5.2
MapStatus — Shuffle Map Output Status 11.5.3
TaskSet — Set of Tasks for Stage 11.5.4
TaskSetManager 11.5.5
Schedulable 11.5.5.1
Schedulable Pool 11.5.5.2
Schedulable Builders 11.5.5.3
FIFOSchedulableBuilder 11.5.5.3.1
FairSchedulableBuilder 11.5.5.3.2
Scheduling Mode — spark.scheduler.mode Spark Property 11.5.5.4
TaskInfo 11.5.5.5
TaskDescription — Metadata of Single Task 11.5.6
TaskSchedulerImpl — Default TaskScheduler 11.5.7
Speculative Execution of Tasks 11.5.7.1
TaskResultGetter 11.5.7.2
TaskContext 11.5.8
TaskContextImpl 11.5.8.1
TaskResults — DirectTaskResult and IndirectTaskResult 11.5.9
TaskMemoryManager — Memory Manager of Single Task 11.5.10
MemoryConsumer 11.5.10.1
TaskMetrics 11.5.11
ShuffleWriteMetrics 11.5.11.1
TaskSetBlacklist — Blacklisting Executors and Nodes For TaskSet 11.5.12
SchedulerBackend — Pluggable Scheduler Backends 11.6
CoarseGrainedSchedulerBackend 11.6.1
DriverEndpoint — CoarseGrainedSchedulerBackend RPC Endpoint 11.6.1.1
ExecutorBackend — Pluggable Executor Backends 11.7
CoarseGrainedExecutorBackend 11.7.1
MesosExecutorBackend 11.7.2
BlockManager — Key-Value Store of Blocks of Data 11.8
MemoryStore 11.8.1
BlockEvictionHandler 11.8.2
StorageMemoryPool 11.8.3
MemoryPool 11.8.4
DiskStore 11.8.5
BlockDataManager 11.8.6
RpcHandler 11.8.7
RpcResponseCallback 11.8.8
TransportRequestHandler 11.8.9
TransportContext 11.8.10
TransportServer 11.8.11
TransportClientFactory 11.8.12
MessageHandler 11.8.13
BlockManagerMaster — BlockManager for Driver 11.8.14
BlockManagerMasterEndpoint — BlockManagerMaster RPC Endpoint 11.8.14.1
DiskBlockManager 11.8.15
BlockInfoManager 11.8.16
BlockInfo 11.8.16.1
BlockManagerSlaveEndpoint 11.8.17
DiskBlockObjectWriter 11.8.18
BlockManagerSource — Metrics Source for BlockManager 11.8.19
ShuffleMetricsSource — Metrics Source of BlockManager for Shuffle-Related Metrics 11.8.20
StorageStatus 11.8.21
ManagedBuffer 11.8.22
MapOutputTracker — Shuffle Map Output Registry 11.9
MapOutputTrackerMaster — MapOutputTracker For Driver 11.9.1
MapOutputTrackerMasterEndpoint 11.9.1.1
MapOutputTrackerWorker — MapOutputTracker for Executors 11.9.2
ShuffleManager — Pluggable Shuffle Systems 11.10
SortShuffleManager — The Default Shuffle System 11.10.1
ExternalShuffleService 11.10.2
OneForOneStreamManager 11.10.3
ShuffleBlockResolver 11.10.4
IndexShuffleBlockResolver 11.10.4.1
ShuffleWriter 11.10.5
BypassMergeSortShuffleWriter 11.10.5.1
SortShuffleWriter 11.10.5.2
UnsafeShuffleWriter — ShuffleWriter for SerializedShuffleHandle 11.10.5.3
BaseShuffleHandle — Fallback Shuffle Handle 11.10.6
BypassMergeSortShuffleHandle — Marker Interface for Bypass Merge Sort Shuffle Handles 11.10.7
SerializedShuffleHandle — Marker Interface for Serialized Shuffle Handles 11.10.8
ShuffleReader 11.10.9
BlockStoreShuffleReader 11.10.9.1
ShuffleBlockFetcherIterator 11.10.10
ShuffleExternalSorter — Cache-Efficient Sorter 11.10.11
ExternalSorter 11.10.12
Serialization 11.11
Serializer — Task SerDe 11.11.1
SerializerInstance 11.11.2
SerializationStream 11.11.3
DeserializationStream 11.11.4
ExternalClusterManager — Pluggable Cluster Managers 11.12
BroadcastManager 11.13
BroadcastFactory — Pluggable Broadcast Variable Factories 11.13.1
TorrentBroadcastFactory 11.13.1.1
TorrentBroadcast 11.13.1.2
CompressionCodec 11.13.2
ContextCleaner — Spark Application Garbage Collector 11.14
CleanerListener 11.14.1
Dynamic Allocation (of Executors) 11.15
ExecutorAllocationManager — Allocation Manager for Spark Core 11.15.1
ExecutorAllocationClient 11.15.2
ExecutorAllocationListener 11.15.3
ExecutorAllocationManagerSource 11.15.4
HTTP File Server 11.16
Data Locality 11.17
Cache Manager 11.18
OutputCommitCoordinator 11.19
RpcEnv — RPC Environment 11.20
RpcEndpoint 11.20.1
RpcEndpointRef 11.20.2
RpcEnvFactory 11.20.3
Netty-based RpcEnv 11.20.4
TransportConf — Transport Configuration 11.21
Utils Helper Object 11.22
Spark on YARN
Spark on YARN 14.1
YarnShuffleService — ExternalShuffleService on YARN 14.2
ExecutorRunnable 14.3
Client 14.4
YarnRMClient 14.5
ApplicationMaster 14.6
AMEndpoint — ApplicationMaster RPC Endpoint 14.6.1
YarnClusterManager — ExternalClusterManager for YARN 14.7
TaskSchedulers for YARN 14.8
YarnScheduler 14.8.1
YarnClusterScheduler 14.8.2
SchedulerBackends for YARN 14.9
YarnSchedulerBackend 14.9.1
YarnClientSchedulerBackend 14.9.2
YarnClusterSchedulerBackend 14.9.3
YarnSchedulerEndpoint RPC Endpoint 14.9.4
YarnAllocator 14.10
Introduction to Hadoop YARN 14.11
Setting up YARN Cluster 14.12
Kerberos 14.13
ConfigurableCredentialManager 14.13.1
ClientDistributedCacheManager 14.14
YarnSparkHadoopUtil 14.15
Settings 14.16
Spark Standalone
Spark Standalone 15.1
Standalone Master — Cluster Manager of Spark Standalone 15.2
Standalone Worker 15.3
web UI 15.4
ApplicationPage 15.4.1
LocalSparkCluster — Single-JVM Spark Standalone Cluster 15.5
Submission Gateways 15.6
Management Scripts for Standalone Master 15.7
Management Scripts for Standalone Workers 15.8
Checking Status 15.9
Example 2-workers-on-1-node Standalone Cluster (one executor per worker) 15.10
StandaloneSchedulerBackend 15.11
Spark on Mesos
Spark on Mesos 16.1
MesosCoarseGrainedSchedulerBackend 16.2
About Mesos 16.3
Execution Model
Execution Model 17.1
Varia
Building Apache Spark from Sources 19.1
Spark and Hadoop 19.2
SparkHadoopUtil 19.2.1
Spark and software in-memory file systems 19.3
Spark and The Others 19.4
Distributed Deep Learning on Spark 19.5
Spark Packages 19.6
Interactive Notebooks
Interactive Notebooks 20.1
Apache Zeppelin 20.1.1
Spark Notebook 20.1.2
Exercises
One-liners using PairRDDFunctions 22.1
Learning Jobs and Partitions Using take Action 22.2
Spark Standalone - Using ZooKeeper for High-Availability of Master 22.3
Spark’s Hello World using Spark shell and Scala 22.4
WordCount using Spark shell 22.5
Your first complete Spark application (using Scala and sbt) 22.6
Spark (notable) use cases 22.7
Using Spark SQL to update data in Hive using ORC files 22.8
Developing Custom SparkListener to monitor DAGScheduler in Scala 22.9
Developing RPC Environment 22.10
Developing Custom RDD 22.11
Working with Datasets from JDBC Data Sources (and PostgreSQL) 22.12
Causing Stage to Fail 22.13
Further Learning
Courses 23.1
Books 23.2
Introduction
I’m Jacek Laskowski, an independent consultant, software developer and technical instructor
specializing in Apache Spark, Apache Kafka and Kafka Streams (with Scala, sbt,
Kubernetes, DC/OS, Apache Mesos, and Hadoop YARN).
I offer software development and consultancy services with very hands-on in-depth
workshops and mentoring. Reach out to me at [email protected] or @jaceklaskowski to
discuss opportunities.
Consider joining me at Warsaw Scala Enthusiasts and Warsaw Spark meetups in Warsaw,
Poland.
Tip: I’m also writing the Mastering Spark SQL, Mastering Kafka Streams, Apache Kafka
Notebook and Spark Structured Streaming Notebook gitbooks.
Expect text and code snippets from a variety of public sources. Attribution follows.
Overview of Apache Spark
Apache Spark
Apache Spark is an open-source, distributed, general-purpose cluster computing
framework with a (mostly) in-memory data processing engine. It can do ETL, analytics,
machine learning and graph processing on large volumes of data at rest (batch processing)
or in motion (stream processing), with rich, concise high-level APIs for the programming
languages Scala, Python, Java, R, and SQL.
Using Spark Application Frameworks, Spark simplifies access to machine learning and
predictive analytics at scale.
Spark is mainly written in Scala, but provides developer APIs for languages like Java, Python,
and R.
If you have large amounts of data that require low-latency processing that a typical
MapReduce program cannot provide, Spark is a viable alternative.
The Apache Spark project is an umbrella for SQL (with Datasets), streaming, machine
learning (pipelines) and graph processing engines built atop Spark Core. You can run them
all in a single application using a consistent API.
Spark runs locally as well as in clusters, on-premises or in cloud. It runs on top of Hadoop
YARN, Apache Mesos, standalone or in the cloud (Amazon EC2 or IBM Bluemix).
Apache Spark’s Streaming and SQL programming models with MLlib and GraphX make it
easier for developers and data scientists to build applications that exploit machine learning
and graph analytics.
At a high level, any Spark application creates RDDs out of some input, runs (lazy)
transformations of these RDDs into some other form (shape), and finally performs actions to
collect or store data. Not much, huh?
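To make that create-transform-act cycle concrete, here is a minimal sketch in Scala (assuming a SparkContext available as sc , e.g. in spark-shell; the input file name is just an example):

// create an RDD out of some input
val lines = sc.textFile("README.md")
// lazy transformations into some other form (shape)
val words = lines.flatMap(_.split("\\s+"))
val counts = words.map(word => (word, 1)).reduceByKey(_ + _)
// an action that brings (a sample of) the data back to the driver
counts.take(10).foreach(println)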
You can look at Spark from a programmer’s, a data engineer’s and an administrator’s point of view.
And to be honest, all three types of people will spend quite a lot of their time with Spark before
they reach the point where they exploit all the available features. Programmers use
language-specific APIs (and work at the level of RDDs using transformations and actions),
data engineers use higher-level abstractions like the DataFrames or Pipelines APIs or external
tools (that connect to Spark), and it is all only possible to run because
administrators set up Spark clusters to deploy Spark applications to.
Note: When you hear "Apache Spark" it can be two things — the Spark engine aka
Spark Core, or the Apache Spark open source project which is an "umbrella"
term for Spark Core and the accompanying Spark Application Frameworks, i.e.
Spark SQL, Spark Streaming, Spark MLlib and Spark GraphX, that sit on top of
Spark Core and the main data abstraction in Spark called RDD - Resilient
Distributed Dataset.
Why Spark
Let’s list a few of the many reasons for Spark. We do that first, and then comes an
overview that lends a more technical helping hand.
You could then use Spark Standalone built-in cluster manager to deploy your Spark
applications to a production-grade cluster to run on a full dataset.
One of the Spark project goals was to deliver a platform that supports a very wide array
of diverse workflows - not only MapReduce batch jobs (which were already available in
Hadoop at that time), but also iterative computations like graph algorithms or
Machine Learning.
And also different scales of workloads from sub-second interactive jobs to jobs that run
for many hours.
Spark combines batch, interactive, and streaming workloads under one rich concise API.
Spark supports near real-time streaming workloads via Spark Streaming application
framework.
ETL workloads and Analytics workloads are different, however Spark attempts to offer a
unified platform for a wide variety of workloads.
Graph and Machine Learning algorithms are iterative by nature, and fewer saves to disk or
transfers over the network mean better performance.
You should watch the video What is Apache Spark? by Mike Olson, Chief Strategy Officer
and Co-Founder at Cloudera, who provides an excellent overview of Apache Spark, its
rise in popularity in the open source community, and how Spark is primed to replace
MapReduce as the general processing engine in Hadoop.
Spark draws many ideas out of Hadoop MapReduce. They work together well - Spark on
YARN and HDFS - while improving on the performance and simplicity of the distributed
computing engine.
And it should not come as a surprise, without Hadoop MapReduce (its advances and
deficiencies), Spark would not have been born at all.
It is also exposed in Java, Python and R (as well as SQL, i.e. SparkSQL, in a sense).
So, when you have a need for distributed Collections API in Scala, Spark with RDD API
should be a serious contender.
It expanded on the available computation styles beyond the only map-and-reduce available
in Hadoop MapReduce.
and Spark GraphX, you still use the same development and deployment environment for
large data sets to yield a result, be it a prediction (Spark MLlib), a structured data query
(Spark SQL) or just a large distributed batch (Spark Core) or streaming (Spark Streaming)
computation.
It is also very productive that teams can exploit the different skills their members
have acquired so far. Data analysts, data scientists, and Python, Java, Scala, or R
programmers can all use the same Spark platform through tailor-made APIs. It brings
skilled people with their expertise in different programming languages together on a
Spark project.
Using the Spark shell you can execute computations to process large amounts of data (The
Big Data). It’s all interactive and very useful for exploring the data before the final production
release.
Also, using the Spark shell you can access any Spark cluster as if it was your local machine.
Just point the Spark shell to a 20-node cluster with 10TB of RAM in total (using --master ) and
use all the components (and their abstractions) like Spark SQL, Spark MLlib, Spark
Streaming, and Spark GraphX.
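For example (the master URL below is hypothetical), you point the shell at a cluster with the --master option and then work with the remote cluster exactly as you would locally:

$ ./bin/spark-shell --master spark://master.example.com:7077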
Depending on your needs and skills, you may see a better fit for SQL vs programming APIs
or apply machine learning algorithms (Spark MLlib) from data in graph data structures
(Spark GraphX).
Single Environment
Regardless of which programming language you are good at, be it Scala, Java, Python, R or
SQL, you can use the same single clustered runtime environment for prototyping, ad hoc
queries, and deploying your applications leveraging the many ingestion data points offered
by the Spark platform.
You can be as low-level as using RDD API directly or leverage higher-level APIs of Spark
SQL (Datasets), Spark MLlib (ML Pipelines), Spark GraphX (Graphs) or Spark Streaming
(DStreams).
The single programming model and execution engine for different kinds of workloads
simplify development and deployment architectures.
The broad range of input and output data sources lets programmers and data engineers use Spark as
the platform where large amounts of data are read from or saved to for processing,
interactively (using the Spark shell) or in applications.
Spark embraces many concepts in a single unified development and runtime environment.
Machine learning that is so tool- and feature-rich in Python, e.g. the scikit-learn library, can now
be used by Scala developers (as the Pipeline API in Spark MLlib or by calling pipe() ).
This single platform gives plenty of opportunities for Python, Scala, Java, and R
programmers as well as data engineers (SparkR) and scientists (using proprietary enterprise
data warehouses with Thrift JDBC/ODBC Server in Spark SQL).
Mind the proverb: if all you have is a hammer, everything looks like a nail.
Low-level Optimizations
Apache Spark uses a directed acyclic graph (DAG) of computation stages (aka execution
DAG). It postpones any processing until really required for actions. Spark’s lazy evaluation
gives plenty of opportunities to induce low-level optimizations (so users have to know less to
do more).
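A small sketch of that lazy evaluation at work (assuming a SparkContext sc ): no job is submitted until the action at the end, so Spark sees the whole lineage before deciding how to execute it.

val numbers = sc.parallelize(1 to 1000000)
val evens   = numbers.filter(_ % 2 == 0)   // nothing is computed yet
val doubled = evens.map(_ * 2)             // still nothing, only the lineage (DAG) grows
doubled.count()                            // the action triggers the entire computation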
Spark supports diverse workloads, but successfully targets low-latency iterative ones. They
are often used in Machine Learning and graph algorithms.
Many Machine Learning algorithms require plenty of iterations before the resulting models get
optimal, like logistic regression. The same applies to graph algorithms that traverse all the
nodes and edges when needed. Such computations can increase their performance when
the interim partial results are stored in memory or on very fast solid state drives.
Spark can cache intermediate data in memory for faster model building and training. Once
the data is loaded to memory (as an initial step), reusing it multiple times incurs no
performance slowdowns.
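A sketch of that reuse pattern (assuming a SparkContext sc and an example input path): the dataset is materialized in memory on the first action and served from memory afterwards.

val data = sc.textFile("hdfs:///data/events").cache() // kept in memory after the first action
(1 to 10).foreach { i =>
  // every iteration reads the cached data instead of re-reading the input
  println(s"iteration $i: " + data.filter(_.contains(i.toString)).count())
}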
Also, graph algorithms can traverse graphs one connection per iteration with the partial
result in memory.
Less disk access and network traffic can make a huge difference when you need to process lots of
data, especially when it is Big Data.
Scala in Spark, especially, makes for much less boilerplate code (compared to other
languages and approaches like MapReduce in Java).
Developers no longer have to learn many different processing engines and platforms, and can
spend their time mastering framework APIs per use case (atop a single computation
engine, Spark).
In the not-so-long-ago times, when the most prevalent distributed computing framework was
Hadoop MapReduce, you could reuse data between computations (even partial ones!) only
after you had written it to an external storage like Hadoop Distributed Filesystem (HDFS). It
can cost you a lot of time to compute even very basic multi-stage computations. It simply
suffers from IO (and perhaps network) overhead.
One of the many motivations to build Spark was to have a framework that is good at data
reuse.
Spark cuts it out in a way that keeps as much data as possible in memory and keeps it there
until a job is finished. It doesn’t matter how many stages belong to a job. What does matter
is the available memory and how effective you are in using the Spark API (so that no shuffles occur).
The less network and disk IO, the better performance, and Spark tries hard to find ways to
minimize both.
The reasonably small codebase of Spark invites project contributors - programmers who
extend the platform and fix bugs at a steadier pace.
ShuffleClient — Contract to Fetch Shuffle Blocks
ShuffleClient can optionally be initialized with an appId (that actually does nothing by
default)
ShuffleClient has shuffle-related Spark metrics that are used when BlockManager is
requested for a shuffle-related Spark metrics source (only when Executor is created for a
non-local / cluster mode).
package org.apache.spark.network.shuffle;
Table 2. ShuffleClients
ShuffleClient Description
BlockTransferService
ExternalShuffleClient
init Method
MetricSet shuffleMetrics()
BlockTransferService — Pluggable Block Transfers (To Fetch and Upload Blocks)
BlockTransferService is the base for ShuffleClients that can fetch and upload blocks of data
synchronously or asynchronously.
package org.apache.spark.network
fetchBlockSync Method
fetchBlockSync(
host: String,
port: Int,
execId: String,
blockId: String,
tempFileManager: TempFileManager): ManagedBuffer
fetchBlockSync …FIXME
Synchronous (and hence blocking) fetchBlockSync to fetch one block blockId (that
corresponds to the ShuffleClient parent’s asynchronous fetchBlocks).
fetchBlockSync is a mere wrapper around fetchBlocks to fetch one blockId block that
uploadBlockSync(
hostname: String,
port: Int,
execId: String,
blockId: BlockId,
blockData: ManagedBuffer,
level: StorageLevel,
classTag: ClassTag[_]): Unit
uploadBlockSync …FIXME
uploadBlockSync is a mere blocking wrapper around uploadBlock that waits until the upload
finishes.
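A hedged, generic sketch of that blocking-wrapper pattern (not the Spark source itself; the function names below are made up for illustration): the synchronous variant just calls the asynchronous one and waits for its Future.

import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global

// a stand-in for the asynchronous uploadBlock: completes some time in the future
def uploadAsync(blockId: String): Future[Unit] = Future { /* send the block... */ }

// the synchronous variant is the asynchronous one plus a blocking wait
def uploadSync(blockId: String): Unit =
  Await.result(uploadAsync(blockId), Duration.Inf)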
ExternalShuffleClient
ExternalShuffleClient is a ShuffleClient that…FIXME
void registerWithShuffleServer(
String host,
int port,
String execId,
ExecutorShuffleInfo executorInfo) throws IOException, InterruptedException
registerWithShuffleServer …FIXME
fetchBlocks Method
void fetchBlocks(
String host,
int port,
String execId,
String[] blockIds,
BlockFetchingListener listener,
TempFileManager tempFileManager)
fetchBlocks …FIXME
NettyBlockTransferService — Netty-Based BlockTransferService
NettyBlockTransferService is a BlockTransferService that uses Netty for uploading or
Refer to Logging.
fetchBlocks Method
fetchBlocks(
host: String,
port: Int,
execId: String,
blockIds: Array[String],
listener: BlockFetchingListener): Unit
When executed, fetchBlocks prints out the following TRACE message in the logs:
createAndStart method…FIXME
If however the number of retries is not greater than 0 (it could be 0 or less), the
RetryingBlockFetcher.BlockFetchStarter created earlier is started (with the input blockIds
and listener ).
In case of any Exception , you should see the following ERROR message in the logs and
the input BlockFetchingListener gets notified (using onBlockFetchFailure for every block
id).
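A hedged, condensed sketch of the dispatch logic described above (not the verbatim Spark source; it assumes the surrounding transportConf , blockFetchStarter , blockIds and listener values): with a positive number of allowed IO retries the starter is wrapped in a RetryingBlockFetcher, otherwise it is started directly.

val maxRetries = transportConf.maxIORetries()
if (maxRetries > 0) {
  // retry transient fetch failures up to maxRetries times
  new RetryingBlockFetcher(transportConf, blockFetchStarter, blockIds, listener).start()
} else {
  // no retries: start fetching the requested blocks right away
  blockFetchStarter.createAndStart(blockIds, listener)
}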
Caution FIXME
close(): Unit
close …FIXME
In the end, you should see the INFO message in the logs:
uploadBlock(
hostname: String,
port: Int,
execId: String,
blockId: BlockId,
blockData: ManagedBuffer,
level: StorageLevel,
classTag: ClassTag[_]): Future[Unit]
The UploadBlock message holds the application id, the input execId and blockId . It also
holds the serialized bytes for block metadata with level and classTag serialized (using
the internal JavaSerializer ) as well as the serialized bytes for the input blockData itself
(this time however the serialization uses ManagedBuffer.nioByteBuffer method).
When blockId block was successfully uploaded, you should see the following TRACE
message in the logs:
When an upload failed, you should see the following ERROR message in the logs:
UploadBlock Message
UploadBlock is a BlockTransferMessage that describes a block being uploaded, i.e. send
metadata
As an Encodable , UploadBlock can calculate the encoded size and do encoding and
decoding itself to or from a ByteBuf , respectively.
createServer …FIXME
SparkConf
SecurityManager
Port number
NettyBlockRpcServer — NettyBlockTransferService’s RpcHandler
NettyBlockRpcServer is a RpcHandler that handles messages for
NettyBlockTransferService.
Tip: Enable TRACE logging level to see received messages in the logs.
Refer to Logging.
NettyBlockRpcServer then registers a stream of ManagedBuffer s (for the blocks) with the
In the end, NettyBlockRpcServer responds with a StreamHandle (with the streamId and the
number of blocks). The response is serialized as a ByteBuffer .
Application ID
Serializer
BlockDataManager
receive(
client: TransportClient,
rpcMessage: ByteBuffer,
responseContext: RpcResponseCallback): Unit
receive …FIXME
BlockFetchingListener
BlockFetchingListener is the contract of EventListeners that want to be notified about
package org.apache.spark.network.shuffle;
Table 2. BlockFetchingListeners
BlockFetchingListener Description
RetryingBlockFetchListener
"Unnamed" in
ShuffleBlockFetcherIterator
"Unnamed" in
BlockTransferService
RetryingBlockFetcher
RetryingBlockFetcher is…FIXME
0 which it is by default)
At initiateRetry, RetryingBlockFetcher prints out the following INFO message to the logs
(with the number of outstandingBlocksIds):
TransportConf
BlockFetchStarter
BlockFetchingListener
void start()
initiateRetry …FIXME
void fetchAllOutstanding()
outstandingBlocksIds.
RetryingBlockFetchListener
RetryingBlockFetchListener is a BlockFetchingListener that RetryingBlockFetcher uses to
onBlockFetchSuccess Method
onBlockFetchSuccess …FIXME
onBlockFetchFailure Method
onBlockFetchFailure …FIXME
BlockFetchStarter
BlockFetchStarter is the contract of…FIXME…to createAndStart.
Web UI — Spark Application’s Web Console
web UI comes with the following tabs (which may not all be visible immediately, but only
after the respective modules are in use, e.g. the SQL or Streaming tabs):
1. Jobs
2. Stages
3. Storage
4. Environment
5. Executors
Tip: You can use the web UI after the application has finished by persisting events
(using EventLoggingListener) and using Spark History Server.
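A hedged example of the event-log settings involved (the directory is just an example; it has to be a location the Spark History Server can read):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.eventLog.enabled", "true")                      // turn on EventLoggingListener
  .set("spark.eventLog.dir", "hdfs://namenode/spark-events")  // where the events are persisted

With the events persisted, ./sbin/start-history-server.sh can then serve the same web UI for finished applications.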
Jobs Tab
Jobs tab in web UI shows status of all Spark jobs in a Spark application (i.e. a
SparkContext).
Details for Job page is registered under /job URL, i.e. http://localhost:4040/jobs/job/?id=0,
and accepts one mandatory id request parameter as a job identifier.
When a job id is not found, you should see "No information to display for job ID" message.
Figure 4. "No information to display for job" in Details for Job Page
JobPage displays the job’s status, group (if available), and the stages per state: active,
Figure 5. Details for Job Page with Active and Pending Stages
Stages Tab
Stages tab in web UI shows…FIXME
Storage Tab
Storage tab in web UI shows…FIXME
Environment Tab
Environment tab in web UI shows…FIXME
Executors Tab
Executors tab in web UI shows…FIXME
What’s interesting in how Storage Memory is displayed in the Executors tab is that the default
in a way that is different from what the page displays (using the custom JavaScript
getExecInfo Method
getExecInfo(
listener: ExecutorsListener,
statusId: Int,
isActive: Boolean): ExecutorSummary
Caution FIXME
Settings
spark.ui.threadDumpsEnabled
spark.ui.threadDumpsEnabled (default: true ) is to enable ( true ) or disable ( false )
ExecutorThreadDumpPage.
JobsTab
JobsTab is a SparkUITab with jobs prefix.
Parent SparkUI
AppStatusStore
When created, JobsTab creates the following pages and attaches them immediately:
AllJobsPage
JobPage
handleKillRequest Method
handleKillRequest …FIXME
AllJobsPage
AllJobsPage renders a summary, an event timeline, and active, completed, and failed jobs
of a Spark application.
Tip: Jobs (in any state) are displayed when their number is greater than 0 .
AllJobsPage displays the Summary section with the current Spark user, total uptime,
When you hover over a job in the Event Timeline not only do you see the job legend but the
job is also highlighted in the Summary section.
Figure 4. Hovering Over Job in Event Timeline Highlights The Job in Status Section
The Event Timeline section shows not only jobs but also executors.
Parent JobsTab
AppStatusStore
JobPage
JobPage is a WebUIPage with job prefix.
Parent JobsTab
AppStatusStore
StagesTab — Stages for All Jobs
When created, StagesTab creates the following pages and attaches them immediately:
AllStagesPage
StagePage
PoolPage
Stages tab in web UI shows the current state of all stages of all jobs in a Spark application
(i.e. a SparkContext) with two optional pages for the tasks and statistics for a stage (when a
stage is selected) and pool details (when the application works in FAIR scheduling mode).
You can access the Stages tab under /stages URL, i.e. http://localhost:4040/stages.
With no jobs submitted yet (and hence no stages to display), the page shows nothing but the
title.
Note: The state sections are only displayed when there are stages in a given state.
Refer to Stages for All Jobs.
In FAIR scheduling mode you have access to the table showing the scheduler pools.
The page uses the parent’s SparkUI to access required services, i.e. SparkContext,
SparkConf, JobProgressListener, RDDOperationGraphListener, and to know whether kill is
enabled or not.
killEnabled flag
Caution FIXME
SparkUI
AppStatusStore
handleKillRequest …FIXME
AllStagesPage — Stages for All Jobs
stages in a Spark application - active, pending, completed, and failed stages with their count.
Figure 1. Stages Tab in web UI for FAIR scheduling mode (with pools only)
In FAIR scheduling mode you have access to the table showing the scheduler pools as well
as the pool names per stage.
Internally, AllStagesPage is a WebUIPage with access to the parent Stages tab and more
importantly the JobProgressListener to have access to current state of the entire Spark
application.
There are 4 different tables for the different states of stages - active, pending, completed,
and failed. They are displayed only when there are stages in a given state.
Figure 2. Stages Tab in web UI for FAIR scheduling mode (with pools and stages)
You could also notice "retry" for stage when it was retried.
StagePage — Stage Details
StagePage is a WebUIPage with stage prefix.
StagePage shows the task details for a stage given its id and attempt id.
StagePage uses ExecutorsListener to display stdout and stderr logs of the executors in
Tasks section.
Tasks Section
Note: The section uses ExecutorsListener to access stdout and stderr logs for the
Executor ID / Host column.
The table consists of the following columns: Metric, Min, 25th percentile, Median, 75th
percentile, Max.
The 1st row is Duration which includes the quantiles based on executorRunTime .
The 2nd row is the optional Scheduler Delay which includes the time to ship the task from
the scheduler to executors, and the time to send the task result from the executors to the
scheduler. It is not enabled by default and you should select Scheduler Delay checkbox
under Show Additional Metrics to include it in the summary table.
The 3rd row is the optional Task Deserialization Time which includes the quantiles based
on executorDeserializeTime task metric. It is not enabled by default and you should select
Task Deserialization Time checkbox under Show Additional Metrics to include it in the
summary table.
The 4th row is GC Time which is the time that an executor spent paused for Java garbage
collection while the task was running (using jvmGCTime task metric).
The 5th row is the optional Result Serialization Time which is the time spent serializing the
task result on an executor before sending it back to the driver (using
resultSerializationTime task metric). It is not enabled by default and you should select
Result Serialization Time checkbox under Show Additional Metrics to include it in the
summary table.
The 6th row is the optional Getting Result Time which is the time that the driver spends
fetching task results from workers. It is not enabled by default and you should select Getting
Result Time checkbox under Show Additional Metrics to include it in the summary table.
Tip: If Getting Result Time is large, consider decreasing the amount of data returned
from each task.
If Tungsten is enabled (it is by default), the 7th row is the optional Peak Execution Memory
which is the sum of the peak sizes of the internal data structures created during shuffles,
aggregations and joins (using peakExecutionMemory task metric). For SQL jobs, this only
tracks all unsafe operators, broadcast joins, and external sort. It is not enabled by default
and you should select Peak Execution Memory checkbox under Show Additional Metrics
to include it in the summary table.
If the stage has an input, the 8th row is Input Size / Records which is the bytes and records
read from Hadoop or from a Spark storage (using inputMetrics.bytesRead and
inputMetrics.recordsRead task metrics).
If the stage has an output, the 9th row is Output Size / Records which is the bytes and
records written to Hadoop or to a Spark storage (using outputMetrics.bytesWritten and
outputMetrics.recordsWritten task metrics).
If the stage has shuffle read there will be three more rows in the table. The first row is
Shuffle Read Blocked Time which is the time that tasks spent blocked waiting for shuffle
data to be read from remote machines (using shuffleReadMetrics.fetchWaitTime task
metric). The other row is Shuffle Read Size / Records which is the total shuffle bytes and
records read (including both data read locally and data read from remote executors using
shuffleReadMetrics.totalBytesRead and shuffleReadMetrics.recordsRead task metrics). And
the last row is Shuffle Remote Reads which is the total shuffle bytes read from remote
executors (which is a subset of the shuffle read bytes; the remaining shuffle data is read
locally). It uses shuffleReadMetrics.remoteBytesRead task metric.
If the stage has shuffle write, the following row is Shuffle Write Size / Records (using
shuffleWriteMetrics.bytesWritten and shuffleWriteMetrics.recordsWritten task metrics).
If the stage has bytes spilled, the following two rows are Shuffle spill (memory) (using
memoryBytesSpilled task metric) and Shuffle spill (disk) (using diskBytesSpilled task
metric).
Request Parameters
id is…
attempt is…
Metrics
Scheduler Delay is…FIXME
Executor ID
Address
Task Time
Total Tasks
Failed Tasks
Killed Tasks
Succeeded Tasks
(optional) Input Size / Records (only when the stage has an input)
(optional) Output Size / Records (only when the stage has an output)
(optional) Shuffle Read Size / Records (only when the stage read bytes for a shuffle)
(optional) Shuffle Write Size / Records (only when the stage wrote bytes for a shuffle)
(optional) Shuffle Spill (Memory) (only when the stage spilled memory bytes)
(optional) Shuffle Spill (Disk) (only when the stage spilled bytes to disk)
Accumulators
Stage page displays the table with named accumulators (only if they exist). It contains the
name and value of the accumulators.
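A hedged example of a named accumulator that would show up in that table (assuming a SparkContext sc ; the name is arbitrary):

val badRecords = sc.longAccumulator("badRecords") // the name is what the stage page displays
sc.parallelize(1 to 100).foreach { n =>
  if (n % 10 == 0) badRecords.add(1)
}
println(badRecords.value)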
Parent StagesTab
AppStatusStore
PoolPage — Pool Details
The Fair Scheduler Pool Details page shows information about a Schedulable pool and is
only available when a Spark application uses the FAIR scheduling mode (which is controlled
by spark.scheduler.mode setting).
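A hedged example of turning the FAIR mode on (and optionally assigning jobs to a named pool, which is what this page then shows):

import org.apache.spark.SparkConf

val conf = new SparkConf().set("spark.scheduler.mode", "FAIR")
// later, per thread, jobs can be put into a specific schedulable pool:
// sc.setLocalProperty("spark.scheduler.pool", "production")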
PoolPage uses the parent’s SparkContext to access information about the pool and
order by default).
Summary Table
The Summary table shows the details of a Schedulable pool.
Pool Name
Minimum Share
Pool Weight
Running Tasks
SchedulingMode
All the columns are the attributes of a Schedulable except the number of active stages, which is
calculated using the list of active stages of a pool (from the parent’s JobProgressListener).
Stage Id
Description
Submitted
Duration
Tasks: Succeeded/Total
Shuffle Read — Total shuffle bytes and records read (includes both data read locally
and data read from remote executors).
The table uses JobProgressListener for information per stage in the pool.
Request Parameters
poolname
poolname is the name of the scheduler pool to display on the page. It is a mandatory
request parameter.
StorageTab
StorageTab is a SparkUITab with storage prefix.
Parent SparkUI
AppStatusStore
When created, StorageTab creates the following pages and attaches them immediately:
StoragePage
RDDPage
StoragePage
StoragePage is a WebUIPage with an empty prefix.
Parent SparkUITab
AppStatusStore
rddRow …FIXME
rddTable …FIXME
receiverBlockTables Method
receiverBlockTables …FIXME
render requests the AppStatusStore for rddList and renders an HTML table with their
render requests the AppStatusStore for streamBlocksList and renders an HTML table with
RDDPage
RDDPage is a WebUIPage with rdd prefix.
Parent SparkUITab
AppStatusStore
render Method
render …FIXME
EnvironmentTab
EnvironmentTab is a SparkUITab with environment prefix.
Parent SparkUI
AppStatusStore
EnvironmentPage
EnvironmentPage is a WebUIPage with an empty prefix.
Parent EnvironmentTab
SparkConf
AppStatusStore
ExecutorsTab
ExecutorsTab is a SparkUITab with executors prefix.
When created, ExecutorsTab creates the following pages and attaches them immediately:
ExecutorsPage
ExecutorThreadDumpPage
application.
ExecutorsPage
ExecutorsPage is a WebUIPage with an empty prefix.
Parent SparkUITab
threadDumpEnabled flag
ExecutorThreadDumpPage
ExecutorThreadDumpPage is a WebUIPage with threadDump prefix.
SparkUITab
Optional SparkContext
SparkUI — Web UI of Spark Application
application)
Name of the Spark application that is exactly the value of spark.app.name configuration
property
When started, SparkUI binds to appUIAddress address that you can control using
SPARK_PUBLIC_DNS environment variable or spark.driver.host Spark property.
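A hedged example of the two knobs mentioned above (the host names are hypothetical):

$ SPARK_PUBLIC_DNS=ui.example.com ./bin/spark-shell --conf spark.driver.host=driver.example.com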
taskList
Refer to Logging.
stop(): Unit
stop stops the HTTP server and prints the following INFO message to the logs:
appUIAddress Method
appUIAddress: String
appUIAddress returns the entire URL of a Spark application’s web UI, including http://
scheme.
getSparkUser: String
getSparkUser returns the name of the user a Spark application runs as.
createLiveUI Method
createLiveUI(
sc: SparkContext,
conf: SparkConf,
listenerBus: SparkListenerBus,
jobProgressListener: JobProgressListener,
securityManager: SecurityManager,
appName: String,
startTime: Long): SparkUI
createHistoryUI Method
Caution FIXME
appUIHostPort Method
appUIHostPort: String
appUIHostPort returns the Spark application’s web UI which is the public hostname and
getAppName Method
getAppName: String
getAppName returns the name of the Spark application (of a SparkUI instance).
create(
sc: Option[SparkContext],
store: AppStatusStore,
conf: SparkConf,
securityManager: SecurityManager,
appName: String,
basePath: String = "",
startTime: Long,
appSparkVersion: String = org.apache.spark.SPARK_VERSION): SparkUI
Internally, create simply creates a new SparkUI (with the predefined Spark version).
AppStatusStore
SparkContext
SparkConf
SecurityManager
Application name
basePath
Start time
appSparkVersion
SparkUI initializes the internal registries and counters and the tabs and handlers.
initialize(): Unit
initialize creates and attaches the following tabs (with the reference to the SparkUI and
its AppStatusStore):
1. JobsTab
2. StagesTab
3. StorageTab
4. EnvironmentTab
5. ExecutorsTab
1. Creates a static handler for serving files from a static directory, i.e. /static to serve
static files from org/apache/spark/ui/static directory (on CLASSPATH)
2. Creates a redirect handler to redirect / to /jobs/ (and so the Jobs tab is the
welcome tab when you open the web UI)
3. Creates the /api/* context handler for the Status REST API
SparkUITab
SparkUITab is the contract of WebUITab extensions with two additional properties:
appName
appSparkVersion
package org.apache.spark.ui
Table 2. SparkUITabs
SparkUITab Description
EnvironmentTab
ExecutorsTab
JobsTab
StagesTab
StorageTab
BlockStatusListener Spark Listener
blockManagers
The lookup table for a collection of BlockId and
BlockUIData per BlockManagerId.
onBlockManagerAdded
Registers a BlockManager in blockManagers internal
registry (with no blocks).
onBlockManagerRemoved
Removes a BlockManager from blockManagers internal
registry.
onBlockUpdated
Ignores updates for unregistered BlockManager s or
non- StreamBlockId s.
EnvironmentListener Spark Listener
Caution FIXME
ExecutorsListener Spark Listener
application for Stage Details page, Jobs tab and /allexecutors REST endpoint.
onExecutorBlacklisted FIXME
onExecutorUnblacklisted FIXME
onNodeBlacklisted FIXME
onNodeUnblacklisted FIXME
A collection of SparkListenerEvents.
executorEvents
Used to build the event timeline in AllJobsPage and
Details for Job pages.
updateExecutorBlacklist Method
Caution FIXME
Caution FIXME
Caution FIXME
Caution FIXME
Caution FIXME
defined) and finds the driver’s active StorageStatus (using the current
StorageStatusListener). onApplicationStart then uses the driver’s StorageStatus (if
defined) to set executorLogs .
onExecutorAdded finds the executor (using the input executorAdded ) in the internal
totalCores totalCores
removes the oldest event if the number of elements in executorEvents collection is greater
than spark.ui.timeline.executors.maximum configuration property.
SparkListenerTaskStart ).
Note: onTaskEnd is part of the SparkListener contract to announce that a task has ended.
tasksActive is decremented but only when the number of active tasks for the executor is
greater than 0 .
If the TaskMetrics (in the input taskEnd ) is available, the metrics are added to the
taskSummary for the task’s executor.
inputRecords inputMetrics.recordsRead
outputBytes outputMetrics.bytesWritten
outputRecords outputMetrics.recordsWritten
shuffleRead shuffleReadMetrics.remoteBytesRead
shuffleWrite shuffleWriteMetrics.bytesWritten
jvmGCTime metrics.jvmGCTime
activeStorageStatusList: Seq[StorageStatus]
executors).
FIXME
Note: AllExecutorListResource does executorList; ExecutorListResource does executorList.
JobProgressListener Spark Listener
onExecutorMetricsUpdate
onBlockManagerAdded
Records an executor and its block manager in the internal
executorIdToBlockManagerId registry.
onBlockManagerRemoved
Removes the executor from the internal
executorIdToBlockManagerId registry.
Does nothing.
onTaskGettingResult
FIXME: Why is this event intercepted at all?!
updateAggregateMetrics Method
Caution FIXME
numFailedStages
stageIdToData
Holds StageUIData per stage, i.e. the stage and stage
attempt ids.
stageIdToInfo
stageIdToActiveJobIds
poolToActiveStages
activeJobs
completedJobs
failedJobs
jobIdToData
jobGroupToJobIds
pendingStages
activeStages
completedStages
skippedStages
failedStages
onJobStart Callback
onJobStart reads the optional Spark Job group id as spark.jobGroup.id (from properties
onJobStart then creates a JobUIData using the input jobStart with status attribute set
onJobStart looks the job ids for the group id (in jobGroupToJobIds registry) and adds the
job id.
The internal pendingStages is updated with StageInfo for the stage id (for every StageInfo
in SparkListenerJobStart.stageInfos collection).
onJobEnd Method
onJobEnd removes the job from activeJobs registry. It removes stages from pendingStages
registry.
When completed successfully, the job is added to completedJobs registry with status
attribute set to JobExecutionStatus.SUCCEEDED . numCompletedJobs gets incremented.
When failed, the job is added to failedJobs registry with status attribute set to
JobExecutionStatus.FAILED . numFailedJobs gets incremented.
For every stage in the job, the stage is removed from the active jobs (in
stageIdToActiveJobIds) that can remove the entire entry if no active jobs exist.
onExecutorMetricsUpdate Method
onExecutorMetricsUpdate(executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit
onTaskStart Method
onTaskStart looks the StageUIData for the stage and stage attempt ids up (in
stageIdToData registry).
stageData.taskData .
Ultimately, onTaskStart looks the stage in the internal stageIdToActiveJobIds and for each
active job reads its JobUIData (from jobIdToData). It then increments numActiveTasks .
onTaskEnd Method
onTaskEnd looks the StageUIData for the stage and stage attempt ids up (in stageIdToData
registry).
onTaskEnd reads the ExecutorSummary for the executor (the task has finished on).
Again, depending on the task end’s reason onTaskEnd computes errorMessage and
updates StageUIData .
Ultimately, onTaskEnd looks the stage in the internal stageIdToActiveJobIds and for each
active job reads its JobUIData (from jobIdToData). It then decrements numActiveTasks and
increments numCompletedTasks , numKilledTasks or numFailedTasks depending on the task’s
end reason.
onStageSubmitted Method
onStageCompleted Method
stageIdToInfo registry.
onStageCompleted looks the StageUIData for the stage and the stage attempt ids up in
stageIdToData registry.
If the stage completed successfully (i.e. has no failureReason ), onStageCompleted adds the
stage to completedStages registry and increments numCompletedStages counter. It trims
completedStages.
Otherwise, when the stage failed, onStageCompleted adds the stage to failedStages registry
and increments numFailedStages counter. It trims failedStages.
Ultimately, onStageCompleted looks the stage in the internal stageIdToActiveJobIds and for
each active job reads its JobUIData (from jobIdToData). It then decrements
numActiveStages . When completed successfully, it adds the stage to
JobUIData
Caution FIXME
blockManagerIds method
blockManagerIds: Seq[BlockManagerId]
Caution FIXME
StageUIData
Caution FIXME
Settings
Table 3. Spark Properties
spark.ui.retainedJobs (default: 1000): The number of jobs to hold information about
spark.ui.retainedStages (default: 1000): The number of stages to hold information about
spark.ui.retainedTasks (default: 100000): The number of tasks to hold information about
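A hedged example of lowering these limits (e.g. to reduce the driver memory used for UI data):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.ui.retainedJobs", "100")
  .set("spark.ui.retainedStages", "200")
  .set("spark.ui.retainedTasks", "10000")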
StorageStatusListener Spark Listener
Caution FIXME
storageStatusList: Seq[StorageStatus]
internal registry).
deadStorageStatusList Method
deadStorageStatusList: Seq[StorageStatus]
updateStorageStatus(unpersistedRDDId: Int)
updateStorageStatus then finds RDD blocks for unpersistedRDDId RDD (for every
StorageListener — Spark Listener for Tracking Persistence Status of RDD Blocks
StorageStatusListener
activeStorageStatusList: Seq[StorageStatus]
executors).
updates registered RDDInfos (with block updates from BlockManagers) (passing in BlockId
and BlockStatus as a single-element collection of updated blocks).
onStageCompleted finds the identifiers of the RDDs that have participated in the completed
stage and removes them from _rddInfoMap registry as well as the RDDs that are no longer
cached.
stageSubmitted , possibly adding new RDDInfo instances if they were not registered yet.
onUnpersistRDD removes the RDDInfo from _rddInfoMap registry for the unpersisted RDD
(from unpersistRDD ).
updateRDDInfo finds the RDDs for the input updatedBlocks (for BlockIds).
updateRDDInfo takes RDDInfo entries (in _rddInfoMap registry) for which there are blocks in
Caution FIXME
RDDOperationGraphListener Spark Listener
Caution FIXME
WebUI — Framework For Web UIs
WebUI — Base Web UI
WebUI is the base of the web UIs in Apache Spark:
Note Spark on YARN uses a different web framework for the web UI.
package org.apache.spark.ui
WebUI is a Scala abstract class and cannot be created directly, but only as one of the web
UIs.
Table 2. WebUIs
WebUI Description
Once bound to a Jetty HTTP server, WebUI is available at an HTTP port (and is used in the
web URL as boundPort ).
publicHostName is…FIXME and the boundPort is the port that the Jetty HTTP
Server bound to.
WebUITabs
tabs
Used when…FIXME
ServletContextHandlers
handlers
Used when…FIXME
className
Used when…FIXME
Enable INFO or ERROR logging level for the corresponding loggers of the
WebUIs, e.g. org.apache.spark.ui.SparkUI , to see what happens inside.
Refer to Logging.
SecurityManager
SSLOptions
Port number
SparkConf
Note: WebUI is a Scala abstract class and cannot be created directly, but only as one
of the implementations.
detachPage …FIXME
detachTab …FIXME
detachHandler …FIXME
detachHandler …FIXME
Internally, attachPage creates the path of the WebUIPage that is / (forward slash)
followed by the prefix of the page.
addStaticHandler …FIXME
attachHandler simply adds the input Jetty ServletContextHandler to handlers registry and
getBasePath Method
getBasePath: String
getTabs: Seq[WebUITab]
Note getTabs is used exclusively when WebUITab is requested for the header tabs.
getHandlers: Seq[ServletContextHandler]
bind(): Unit
bind …FIXME
stop(): Unit
stop …FIXME
WebUIPage — Contract of Pages in Web UI
JSON.
attached to a WebUITab
package org.apache.spark.ui
render
Used exclusively when WebUI is requested to attach a
page (and…FIXME)
Table 2. WebUIPages
WebUIPage Description
AllExecutionsPage Used in Spark SQL module
AllJobsPage
AllStagesPage
EnvironmentPage
ExecutorsPage
ExecutorThreadDumpPage
JobPage
PoolPage
RDDPage
StagePage
StoragePage
WebUITab — Contract of Tabs in Web UI
attached to a WebUITab
attachPage prepends the page prefix (of the input WebUIPage ) with the tab prefix (with no
basePath: String
headerTabs: Seq[WebUITab]
Parent WebUI
Prefix
Note: WebUITab is a Scala abstract class and cannot be created directly, but only as
one of the implementations.
RDDStorageInfo
RDDStorageInfo contains information about RDD persistence:
RDD id
RDD name
Storage level ID
Memory used
Disk used
requested to write).
1. web UI’s StoragePage is requested to render an HTML table row and an entire table for
RDD details
RDDInfo
RDDInfo is…FIXME
LiveEntity
LiveEntity is the contract of a live entity in Spark that…FIXME
package org.apache.spark.status
LiveEntity tracks the last write time (in lastWriteTime internal registry).
write Method
LiveRDD
LiveRDD is a LiveEntity that…FIXME
onStageSubmitted event
doUpdate Method
doUpdate(): Any
doUpdate …FIXME
UIUtils
UIUtils is a utility object for…FIXME
headerSparkPage Method
headerSparkPage(
request: HttpServletRequest,
title: String,
content: => Seq[Node],
activeTab: SparkUITab,
refreshInterval: Option[Int] = None,
helpText: Option[String] = None,
showVisualization: Boolean = false,
useDataTables: Boolean = false): Seq[Node]
headerSparkPage …FIXME
JettyUtils
JettyUtils is a set of utility methods for creating Jetty HTTP Server-specific components.
createRedirectHandler
createServletHandler(
path: String,
servlet: HttpServlet,
basePath: String): ServletContextHandler (1)
createServletHandler[T <: AnyRef](
path: String,
servletParams: ServletParams[T],
securityMgr: SecurityManager,
conf: SparkConf,
basePath: String = ""): ServletContextHandler (2)
createServletHandler …FIXME
createServlet creates the X-Frame-Options header that can be either ALLOW-FROM with the
createServlet creates a Java Servlets HttpServlet with support for GET requests.
When handling GET requests, the HttpServlet first checks view permissions of the remote
user (by requesting the SecurityManager to checkUIViewPermissions of the remote user).
Tip: Enable DEBUG logging level for the org.apache.spark.SecurityManager logger to see what happens when
SecurityManager does the security check.
log4j.logger.org.apache.spark.SecurityManager=DEBUG
With view permissions check passed, the HttpServlet sends a response with the following:
FIXME
In case the view permissions didn’t allow to view the page, the HttpServlet sends an error
response with the following:
Status 403
successful, sets resourceBase init parameter of the Jetty DefaultServlet to the URL.
Note resourceBase init parameter is used to replace the context resource base.
resolved.
createRedirectHandler Method
createRedirectHandler(
srcPath: String,
destPath: String,
beforeRedirect: HttpServletRequest => Unit = x => (),
basePath: String = "",
httpMethods: Set[String] = Set("GET")): ServletContextHandler
createRedirectHandler …FIXME
web UI Configuration Properties
spark.ui.consoleProgress.update.interval (default: 200 ms): Update interval, i.e. how often to show the progress.
deadExecutorStorageStatus (in StorageStatusListener ) internal registries.
spark.ui.timeline.tasks.maximum (default: 1000)
Spark Metrics
Spark Metrics gives you execution metrics of Spark subsystems (aka metrics instances),
e.g. the driver of a Spark application or the master of a Spark Standalone cluster.
Spark Metrics uses Dropwizard Metrics 3.1.0 Java library for the metrics infrastructure.
Metrics is a Java library which gives you unparalleled insight into what your code does
in production.
MetricsSystem uses Dropwizard Metrics' MetricRegistry that acts as the integration point
configuration properties.
Among the metrics sinks is MetricsServlet that is used when the sink.servlet metrics sink is configured in metrics configuration.
With the JmxSink metrics sink configured (as follows), you can use jconsole to access Spark metrics through JMX.
*.sink.jmx.class=org.apache.spark.metrics.sink.JmxSink
Content-Type: text/json;charset=utf-8
Date: Sat, 25 Feb 2017 14:14:16 GMT
Server: Jetty(9.2.z-SNAPSHOT)
X-Frame-Options: SAMEORIGIN
{
"counters": {
"app-20170225151406-0000.driver.HiveExternalCatalog.fileCacheHits": {
"count": 0
},
"app-20170225151406-0000.driver.HiveExternalCatalog.filesDiscovered": {
"count": 0
},
"app-20170225151406-0000.driver.HiveExternalCatalog.hiveClientCalls": {
"count": 2
},
"app-20170225151406-0000.driver.HiveExternalCatalog.parallelListingJobCount":
{
"count": 0
},
"app-20170225151406-0000.driver.HiveExternalCatalog.partitionsFetched": {
"count": 0
}
},
"gauges": {
...
"timers": {
"app-20170225151406-0000.driver.DAGScheduler.messageProcessingTime": {
"count": 0,
"duration_units": "milliseconds",
"m15_rate": 0.0,
"m1_rate": 0.0,
"m5_rate": 0.0,
"max": 0.0,
"mean": 0.0,
"mean_rate": 0.0,
"min": 0.0,
"p50": 0.0,
"p75": 0.0,
"p95": 0.0,
"p98": 0.0,
"p99": 0.0,
"p999": 0.0,
"rate_units": "calls/second",
"stddev": 0.0
}
},
"version": "3.0.0"
}
Note You have to use the trailing slash ( / ) to have the output.
$ http http://192.168.1.4:8080/metrics/master/json/path
HTTP/1.1 200 OK
Cache-Control: no-cache, no-store, must-revalidate
Content-Length: 207
Content-Type: text/json;charset=UTF-8
Server: Jetty(8.y.z-SNAPSHOT)
X-Frame-Options: SAMEORIGIN
{
"counters": {},
"gauges": {
"master.aliveWorkers": {
"value": 0
},
"master.apps": {
"value": 0
},
"master.waitingApps": {
"value": 0
},
"master.workers": {
"value": 0
}
},
"histograms": {},
"meters": {},
"timers": {},
"version": "3.0.0"
}
MetricsSystem
metricsConfig: MetricsConfig that is initialized when MetricsSystem is created and used when MetricsSystem registers sinks and sources.
Refer to Logging.
registerSource creates an identifier for the metrics source and registers it with
MetricRegistry.
When registerSource tries to register a name more than once, you should see the following
INFO message in the logs:
DAGScheduler
BlockManager
registerSources(): Unit
registerSources finds the configuration of all the metrics sources for the subsystem (as the source.-prefixed configuration properties).
For every metrics source, registerSources finds the class property, creates an instance, and in the end registers it.
When registerSources fails, you should see the following ERROR message in the logs
followed by the exception.
getServletHandlers: Array[ServletContextHandler]
If the MetricsSystem is running and the MetricsServlet is defined for the metrics system,
getServletHandlers simply requests the MetricsServlet for the JSON servlet handler.
Note
getServletHandlers is used when:
SparkContext is created
Spark Standalone’s Master and Worker are requested to start (as onStart)
registerSinks(): Unit
registerSinks requests the MetricsConfig for the configuration of all metrics sinks (i.e. the sink.-prefixed configuration properties).
For every metrics sink configuration, registerSinks takes the class property and (if defined) creates an instance of the metrics sink using a constructor that takes the configuration, MetricRegistry and SecurityManager.
For a single servlet metrics sink, registerSinks converts the sink to a MetricsServlet and
sets the metricsServlet internal registry.
For all other metrics sinks, registerSinks adds the sink to the sinks internal registry.
In case of an Exception , registerSinks prints out the following ERROR message to the
logs:
stop Method
stop(): Unit
stop …FIXME
getSourcesByName Method
getSourcesByName …FIXME
removeSource Method
removeSource …FIXME
MetricsSystem takes the following to be created:
Instance name
SparkConf
SecurityManager
createMetricsSystem(
  instance: String,
  conf: SparkConf,
  securityMgr: SecurityManager): MetricsSystem
report(): Unit
start(): Unit
start registers the "static" metrics sources for Spark SQL, i.e. CodegenMetrics and
HiveCatalogMetrics .
start then registers the configured metrics sources and sinks for the Spark instance.
Note start is used when SparkContext is created.
MetricsConfig — Metrics System Configuration
MetricsConfig is the configuration of the MetricsSystem (i.e. metrics sources and sinks). The default configuration file is metrics.properties, which can be changed using the spark.metrics.conf configuration property. The file is first loaded from the path directly before using Spark’s CLASSPATH.
MetricsConfig also accepts a metrics configuration using spark.metrics.conf.-prefixed configuration properties.
MetricsConfig makes sure that the default metrics properties are always defined.
*.sink.servlet.path /metrics/json
master.sink.servlet.path /metrics/master/json
applications.sink.servlet.path /metrics/applications/json
initialize(): Unit
initialize sets the default properties and loads the configuration properties from a metrics configuration file (if specified using spark.metrics.conf).
initialize takes all Spark properties that start with the spark.metrics.conf. prefix from SparkConf and adds them (with the prefix removed) to the metrics configuration.
In the end, initialize splits the configuration per Spark subsystem with the default configuration (denoted as * ) assigned to all subsystems afterwards.
loadPropertiesFromFile tries to open the input path file (if defined) or the default metrics configuration file (metrics.properties) on the CLASSPATH.
If either file is available, loadPropertiesFromFile loads the properties (to properties registry).
In case of exceptions, you should see the following ERROR message in the logs followed by
the exception.
subProperties takes the properties and destructures the keys per the given regex. subProperties takes the matching prefix (of a key per regex) and uses it as a new key with the value(s) being the matching suffix(es).
getInstance Method
getInstance …FIXME
Source — Contract of Metrics Sources
package org.apache.spark.metrics.source
trait Source {
def sourceName: String
def metricRegistry: MetricRegistry
}
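For illustration only, here is a minimal sketch of a custom metrics source (the class name and the metric name are made up; note that Source is private[spark], so a custom source has to live in an org.apache.spark package):
package org.apache.spark.metrics.source
import com.codahale.metrics.{Counter, MetricRegistry}

class MyAppSource extends Source {
  override val sourceName: String = "myApp"
  override val metricRegistry: MetricRegistry = new MetricRegistry
  // a simple counter metric registered with the source's MetricRegistry
  val queriesExecuted: Counter = metricRegistry.counter(MetricRegistry.name("queriesExecuted"))
}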
Table 2. Sources
Source Description
ApplicationSource
BlockManagerSource
CacheMetrics
CodegenMetrics
DAGSchedulerSource
ExecutorAllocationManagerSource
ExecutorSource
ExternalShuffleServiceSource
HiveCatalogMetrics
JvmSource
LiveListenerBusMetrics
MasterSource
MesosClusterSchedulerSource
ShuffleMetricsSource
StreamingSource
WorkerSource
Sink — Contract of Metrics Sinks
package org.apache.spark.metrics.sink
trait Sink {
def start(): Unit
def stop(): Unit
def report(): Unit
}
Table 2. Sinks
Sink Description
ConsoleSink
CsvSink
GraphiteSink
JmxSink
MetricsServlet
Slf4jSink
StatsdSink
MetricsServlet JSON Metrics Sink
MetricsServlet is a "special" sink as it is only available to the metrics instances with a web
UI:
You can access the metrics from MetricsServlet at /metrics/json URI by default. The entire
URL depends on a metrics instance, e.g. http://localhost:4040/metrics/json/ for a running
Spark application.
$ http http://localhost:4040/metrics/json/
HTTP/1.1 200 OK
Cache-Control: no-cache, no-store, must-revalidate
Content-Length: 5005
Content-Type: text/json;charset=utf-8
Date: Mon, 11 Jun 2018 06:29:03 GMT
Server: Jetty(9.3.z-SNAPSHOT)
X-Content-Type-Options: nosniff
X-Frame-Options: SAMEORIGIN
X-XSS-Protection: 1; mode=block
{
"counters": {
"local-1528698499919.driver.HiveExternalCatalog.fileCacheHits": {
"count": 0
},
"local-1528698499919.driver.HiveExternalCatalog.filesDiscovered": {
"count": 0
},
"local-1528698499919.driver.HiveExternalCatalog.hiveClientCalls": {
"count": 0
},
"local-1528698499919.driver.HiveExternalCatalog.parallelListingJobCount": {
"count": 0
},
"local-1528698499919.driver.HiveExternalCatalog.partitionsFetched": {
"count": 0
},
"local-1528698499919.driver.LiveListenerBus.numEventsPosted": {
"count": 7
},
"local-1528698499919.driver.LiveListenerBus.queue.appStatus.numDroppedEvents":
{
"count": 0
},
"local-1528698499919.driver.LiveListenerBus.queue.executorManagement.numDroppe
dEvents": {
"count": 0
}
},
...
MetricsServlet can be configured using configuration properties with sink.servlet prefix (in
metrics configuration). That is not required since MetricsConfig makes sure that
MetricsServlet is always configured.
MetricsServlet uses jackson-databind, the general data-binding package for Jackson (as
ObjectMapper) with Dropwizard Metrics library (i.e. registering a Coda Hale MetricsModule ).
sample (default: false): whether to show the entire set of samples for histograms.
SecurityManager
getHandlers returns just a single ServletContextHandler (in a collection) that gives the metrics in JSON format at the configured servlet path.
Metrics Configuration Properties
Status REST API — Monitoring Spark Applications Using REST API
SparkUI - Application UI for an active Spark application (i.e. a Spark application that is
still running)
HistoryServer - Application UI for active and completed Spark applications (i.e. Spark
applications that are still running or have already finished)
Status REST API uses ApiRootResource main resource class that registers /api/v1 URI
path and the subpaths.
Jersey RESTful Web Services framework with support for the Java API for RESTful
Web Services (JAX-RS API)
Eclipse Jetty as the lightweight HTTP server and the Java Servlet container
ApiRootResource — /api/v1 URI Handler
ApiRootResource uses @Path("/v1") annotation at the class level. It is a partial URI path
template relative to the base URI of the server on which the resource is deployed, the
context root of the application, and the URL pattern to which the JAX-RS runtime responds.
Tip Learn more about the @Path annotation in The @Path Annotation and URI Path Templates.
ApiRootResource registers the /api/* context handler (with the REST resources and providers).
With the @Path("/v1") annotation and after registering the /api/* context handler,
ApiRootResource serves HTTP requests for paths under the /api/v1 URI paths for SparkUI
and HistoryServer.
ApiRootResource gives the metrics of a Spark application in JSON format (using JAX-RS
API).
// start spark-shell
$ http http://localhost:4040/api/v1/applications
HTTP/1.1 200 OK
Content-Encoding: gzip
Content-Length: 257
Content-Type: application/json
Date: Tue, 05 Jun 2018 18:36:16 GMT
Server: Jetty(9.3.z-SNAPSHOT)
Vary: Accept-Encoding, User-Agent
[
{
"attempts": [
{
"appSparkVersion": "2.3.1-SNAPSHOT",
"completed": false,
"duration": 0,
"endTime": "1969-12-31T23:59:59.999GMT",
"endTimeEpoch": -1,
"lastUpdated": "2018-06-05T15:04:48.328GMT",
"lastUpdatedEpoch": 1528211088328,
"sparkUser": "jacek",
"startTime": "2018-06-05T15:04:48.328GMT",
"startTimeEpoch": 1528211088328
}
],
"id": "local-1528211089216",
"name": "Spark shell"
}
]
{
"spark": "2.3.1"
}
applications: delegates to the ApplicationListResource resource class
applications/{appId}: delegates to the OneApplicationResource resource class
ApplicationListResource — applications URI Handler
ApplicationListResource is an ApiRequestContext that ApiRootResource uses to handle the applications URI path.
// start spark-shell
// there should be a single Spark application -- the spark-shell itself
$ http http://localhost:4040/api/v1/applications
HTTP/1.1 200 OK
Content-Encoding: gzip
Content-Length: 255
Content-Type: application/json
Date: Wed, 06 Jun 2018 12:40:33 GMT
Server: Jetty(9.3.z-SNAPSHOT)
Vary: Accept-Encoding, User-Agent
[
{
"attempts": [
{
"appSparkVersion": "2.3.1-SNAPSHOT",
"completed": false,
"duration": 0,
"endTime": "1969-12-31T23:59:59.999GMT",
"endTimeEpoch": -1,
"lastUpdated": "2018-06-06T12:30:19.220GMT",
"lastUpdatedEpoch": 1528288219220,
"sparkUser": "jacek",
"startTime": "2018-06-06T12:30:19.220GMT",
"startTimeEpoch": 1528288219220
}
],
"id": "local-1528288219790",
"name": "Spark shell"
}
]
isAttemptInRange(
attempt: ApplicationAttemptInfo,
minStartDate: SimpleDateParam,
maxStartDate: SimpleDateParam,
minEndDate: SimpleDateParam,
maxEndDate: SimpleDateParam,
anyRunning: Boolean): Boolean
isAttemptInRange …FIXME
appList Method
appList(
@QueryParam("status") status: JList[ApplicationStatus],
@DefaultValue("2010-01-01") @QueryParam("minDate") minDate: SimpleDateParam,
@DefaultValue("3000-01-01") @QueryParam("maxDate") maxDate: SimpleDateParam,
@DefaultValue("2010-01-01") @QueryParam("minEndDate") minEndDate: SimpleDateParam,
@DefaultValue("3000-01-01") @QueryParam("maxEndDate") maxEndDate: SimpleDateParam,
@QueryParam("limit") limit: Integer)
: Iterator[ApplicationInfo]
appList …FIXME
OneApplicationResource — applications/appId URI Handler
OneApplicationResource is an AbstractApplicationResource (and so an ApiRequestContext indirectly) that ApiRootResource uses to handle the applications/{appId} URI path.
// start spark-shell
// there should be a single Spark application -- the spark-shell itself
$ http http://localhost:4040/api/v1/applications
HTTP/1.1 200 OK
Content-Encoding: gzip
Content-Length: 255
Content-Type: application/json
Date: Wed, 06 Jun 2018 12:40:33 GMT
Server: Jetty(9.3.z-SNAPSHOT)
Vary: Accept-Encoding, User-Agent
[
{
"attempts": [
{
"appSparkVersion": "2.3.1-SNAPSHOT",
"completed": false,
"duration": 0,
"endTime": "1969-12-31T23:59:59.999GMT",
"endTimeEpoch": -1,
"lastUpdated": "2018-06-06T12:30:19.220GMT",
"lastUpdatedEpoch": 1528288219220,
"sparkUser": "jacek",
"startTime": "2018-06-06T12:30:19.220GMT",
"startTimeEpoch": 1528288219220
}
],
"id": "local-1528288219790",
"name": "Spark shell"
}
]
$ http http://localhost:4040/api/v1/applications/local-1528288219790
HTTP/1.1 200 OK
Content-Encoding: gzip
Content-Length: 255
Content-Type: application/json
Date: Wed, 06 Jun 2018 12:41:43 GMT
Server: Jetty(9.3.z-SNAPSHOT)
Vary: Accept-Encoding, User-Agent
{
"attempts": [
{
"appSparkVersion": "2.3.1-SNAPSHOT",
"completed": false,
"duration": 0,
"endTime": "1969-12-31T23:59:59.999GMT",
"endTimeEpoch": -1,
"lastUpdated": "2018-06-06T12:30:19.220GMT",
"lastUpdatedEpoch": 1528288219220,
"sparkUser": "jacek",
"startTime": "2018-06-06T12:30:19.220GMT",
"startTimeEpoch": 1528288219220
}
],
"id": "local-1528288219790",
"name": "Spark shell"
}
getApp Method
getApp(): ApplicationInfo
getApp requests the UIRoot for the application info (given the appId).
StagesResource
StagesResource is…FIXME
GET (root path) handled by stageList
GET {stageId: \d+}/{stageAttemptId: \d+} handled by oneAttemptData
GET {stageId: \d+}/{stageAttemptId: \d+}/taskSummary handled by taskSummary
GET {stageId: \d+}/{stageAttemptId: \d+}/taskList handled by taskList
stageList Method
stageList …FIXME
stageData Method
stageData(
@PathParam("stageId") stageId: Int,
@QueryParam("details") @DefaultValue("true") details: Boolean): Seq[StageData]
stageData …FIXME
oneAttemptData Method
oneAttemptData(
@PathParam("stageId") stageId: Int,
@PathParam("stageAttemptId") stageAttemptId: Int,
@QueryParam("details") @DefaultValue("true") details: Boolean): StageData
oneAttemptData …FIXME
taskSummary Method
taskSummary(
@PathParam("stageId") stageId: Int,
@PathParam("stageAttemptId") stageAttemptId: Int,
@DefaultValue("0.05,0.25,0.5,0.75,0.95") @QueryParam("quantiles") quantileString: St
ring)
: TaskMetricDistributions
taskSummary …FIXME
taskList Method
taskList(
@PathParam("stageId") stageId: Int,
@PathParam("stageAttemptId") stageAttemptId: Int,
@DefaultValue("0") @QueryParam("offset") offset: Int,
@DefaultValue("20") @QueryParam("length") length: Int,
@DefaultValue("ID") @QueryParam("sortBy") sortBy: TaskSorting): Seq[TaskData]
taskList …FIXME
OneApplicationAttemptResource
OneApplicationAttemptResource is an AbstractApplicationResource (and so an ApiRequestContext indirectly) that handles a single application attempt (applicationAttempt).
// start spark-shell
// there should be a single Spark application -- the spark-shell itself
// CAUTION: FIXME Demo of OneApplicationAttemptResource in Action
getAttempt Method
getAttempt(): ApplicationAttemptInfo
getAttempt requests the UIRoot for the application info (given the appId) and finds the attempt (given the attemptId). getAttempt throws a NotFoundException when the attempt could not be found.
AbstractApplicationResource
AbstractApplicationResource is a BaseAppResource with a set of URI paths that are common to all application resources.
// start spark-shell
$ http http://localhost:4040/api/v1/applications
HTTP/1.1 200 OK
Content-Encoding: gzip
Content-Length: 257
Content-Type: application/json
Date: Tue, 05 Jun 2018 18:46:32 GMT
Server: Jetty(9.3.z-SNAPSHOT)
Vary: Accept-Encoding, User-Agent
[
{
"attempts": [
{
"appSparkVersion": "2.3.1-SNAPSHOT",
"completed": false,
"duration": 0,
"endTime": "1969-12-31T23:59:59.999GMT",
"endTimeEpoch": -1,
"lastUpdated": "2018-06-05T15:04:48.328GMT",
"lastUpdatedEpoch": 1528211088328,
"sparkUser": "jacek",
"startTime": "2018-06-05T15:04:48.328GMT",
"startTimeEpoch": 1528211088328
}
],
"id": "local-1528211089216",
"name": "Spark shell"
}
]
$ http http://localhost:4040/api/v1/applications/local-1528211089216/storage/rdd
HTTP/1.1 200 OK
Content-Length: 3
Content-Type: application/json
Date: Tue, 05 Jun 2018 18:48:00 GMT
Server: Jetty(9.3.z-SNAPSHOT)
Vary: Accept-Encoding, User-Agent
[]
$ http http://localhost:4040/api/v1/applications/local-1528211089216/storage/rdd
// output omitted for brevity
Table 1. AbstractApplicationResources
AbstractApplicationResource Description
OneApplicationAttemptResource
OneApplicationResource
stages GET stages
storage/rdd/{rddId: \\d+} GET rddData
rddList Method
rddList(): Seq[RDDStorageInfo]
rddList …FIXME
environmentInfo Method
environmentInfo(): ApplicationEnvironmentInfo
environmentInfo …FIXME
rddData Method
rddData …FIXME
allExecutorList Method
allExecutorList(): Seq[ExecutorSummary]
allExecutorList …FIXME
executorList Method
executorList(): Seq[ExecutorSummary]
executorList …FIXME
oneJob Method
oneJob …FIXME
jobsList Method
jobsList …FIXME
BaseAppResource
BaseAppResource is the contract of ApiRequestContexts that can withUI and use the appId and attemptId path parameters.
@PathParam("attemptId") attemptId Used when…FIXME
Table 2. BaseAppResources
BaseAppResource Description
AbstractApplicationResource
BaseStreamingAppResource
StagesResource
withUI Method
withUI …FIXME
ApiRequestContext
ApiRequestContext is the contract of…FIXME
package org.apache.spark.status.api.v1
trait ApiRequestContext {
// only required methods that have no implementation
// the others follow
@Context
var servletContext: ServletContext = _
@Context
var httpRequest: HttpServletRequest = _
}
Table 2. ApiRequestContexts
ApiRequestContext Description
ApiRootResource
ApiStreamingApp
ApplicationListResource
BaseAppResource
SecurityFilter
uiRoot: UIRoot
uiRoot simply requests UIRootFromServletContext to get the current UIRoot (for the given
servletContext).
UIRoot — Contract for Root Containers of Application UI Information
package org.apache.spark.status.api.v1
trait UIRoot {
// only required methods that have no implementation
// the others follow
def withSparkUI[T](appId: String, attemptId: Option[String])(fn: SparkUI => T): T
def getApplicationInfoList: Iterator[ApplicationInfo]
def getApplicationInfo(appId: String): Option[ApplicationInfo]
def securityManager: SecurityManager
}
withSparkUI: Used exclusively when BaseAppResource is requested to withUI
Table 2. UIRoots
UIRoot Description
HistoryServer Application UI for active and completed Spark applications (i.e. Spark applications that are still running or have already finished)
SparkUI Application UI for an active Spark application (i.e. a Spark application that is still running)
writeEventLogs Method
writeEventLogs …FIXME
UIRootFromServletContext
UIRootFromServletContext manages the current UIRoot object in a Jetty ContextHandler .
UIRootFromServletContext uses its canonical name for the context attribute that is used to set and get the current UIRoot object (in a Jetty ContextHandler’s ServletContext).
setUiRoot …FIXME
getUiRoot Method
getUiRoot …FIXME
Spark MLlib
Caution I’m new to Machine Learning as a discipline and Spark MLlib in particular, so mistakes in this document are considered a norm (not an exception).
You can find the following types of machine learning algorithms in MLlib:
Classification
Regression
Recommendation
Clustering
Statistics
Linear Algebra
Pipelines
Machine Learning uses large datasets to identify (infer) patterns and make decisions (aka predictions). Automated decision making is what makes Machine Learning so appealing. You can teach a system from a dataset and let the system act by itself to predict the future.
The amount of data (measured in TB or PB) is what makes Spark MLlib especially important
since a human could not possibly extract much value from the dataset in a short time.
Spark handles data distribution and makes the huge data available by means of RDDs,
DataFrames, and recently Datasets.
Use cases for Machine Learning (and hence Spark MLlib that comes with appropriate
algorithms):
Operational optimizations
Concepts
This section introduces the concepts of Machine Learning and how they are modeled in
Spark MLlib.
Observation
An observation is used to learn about or evaluate (i.e. draw conclusions about) the
observed item’s target value.
Feature
A feature (aka dimension or variable) is an attribute of an observation. It is an independent
variable.
Spark models features as columns in a DataFrame (one per feature or a set of features).
Categorical with discrete values, i.e. the set of possible values is limited, and can range
from one to many thousands. There is no ordering implied, and so the values are
incomparable.
Numerical with quantitative values, i.e. any numerical values that you can compare to
each other. You can further classify them into discrete and continuous features.
Label
A label is a variable that a machine learning system learns to predict that are assigned to
observations.
FP-growth Algorithm
Spark 1.5 has significantly improved on frequent pattern mining capabilities with new algorithms for association rule generation and sequential pattern mining.
Frequent Itemset Mining using the Parallel FP-growth algorithm (since Spark 1.3)
finds popular routing paths that generate most traffic in a particular region
the algorithm looks for common subsets of items that appear across transactions,
e.g. sub-paths of the network that are frequently traversed.
A naive solution: generate all possible itemsets and count their occurrence
the algorithm finds all frequent itemsets without generating and testing all
candidates
A retailer could then use this information to put both toothbrush and floss on sale, but raise the price of toothpaste to increase overall profit.
FPGrowth model
extract frequent sequential patterns like routing updates, activation failures, and
broadcasting timeouts that could potentially lead to customer complaints and
proactively reach out to customers when it happens.
Power Iteration Clustering (PIC) in MLlib, a simple and scalable graph clustering
method
org.apache.spark.mllib.clustering.PowerIterationClustering
a graph algorithm
takes an undirected graph with similarities defined on edges and outputs clustering
assignment on nodes
The edge properties are cached and remain static during the power iterations.
New MLlib Algorithms in Spark 1.3: FP-Growth and Power Iteration Clustering
(video) GOTO 2015 • A Taste of Random Decision Forests on Apache Spark • Sean
Owen
ML Pipelines (spark.ml)
ML Pipeline API (aka Spark ML or spark.ml due to the package the API lives in) lets Spark
users quickly and easily assemble and configure practical distributed Machine Learning
pipelines (aka workflows) by standardizing the APIs for different Machine Learning concepts.
Note Both scikit-learn and GraphLab have the concept of pipelines built into their systems.
Pipeline
PipelineStage
Transformers
Models
Estimators
Evaluator
You may also think of two additional steps before the final model becomes production ready
and hence of any use:
You use a collection of Transformer instances to prepare input DataFrame - the dataset
with proper input data (in columns) for a chosen ML algorithm.
With a Model you can calculate predictions (in prediction column) on features input
column through DataFrame transformation.
Example: In text classification, preprocessing steps like n-gram extraction, and TF-IDF
feature weighting are often necessary before training of a classification model like an SVM.
Upon deploying a model, your system must not only know the SVM weights to apply to input
features, but also transform raw data into the format the model is trained on.
Components of ML Pipeline:
Pipelines become objects that can be saved out and applied in real-time to new
data.
You could persist (i.e. save to a persistent storage) or unpersist (i.e. load from a
persistent storage) ML components as described in Persisting Machine Learning
Components.
Parameter tuning
Pipelines
A ML pipeline (or a ML workflow) is a sequence of Transformers and Estimators to fit a
PipelineModel to an input dataset.
import org.apache.spark.ml.Pipeline
Pipeline instances).
The Pipeline object can read or load pipelines (refer to Persisting Machine Learning
Components page).
read: MLReader[Pipeline]
load(path: String): Pipeline
You can create a Pipeline with an optional uid identifier. It is of the format
pipeline_[randomUid] when unspecified.
scala> println(pipeline.uid)
pipeline_94be47c3b709
scala> println(pipeline.uid)
my_pipeline
The fit method returns a PipelineModel that holds a collection of Transformer objects that are results of Estimator.fit method for every Estimator in the Pipeline (with possibly-modified dataset) or simply input Transformer objects. The input dataset DataFrame is first validated using transformSchema.
fit then searches for the index of the last Estimator to calculate Transformers for every Estimator (and simply pass input Transformers through) up to that index in the pipeline. For each Estimator the fit method is called with the input dataset. The result DataFrame is passed to the next stage, and the transform method is called for every Transformer calculated but the last one (that is not needed to prepare the dataset for the next stage).
The method returns a PipelineModel with uid and transformers. The parent Estimator is the Pipeline itself.
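For illustration, a minimal end-to-end sketch follows; the dataset, the column names and the chosen stages are assumptions (not taken from the book), and the "my_pipeline" uid matches the earlier example:
import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.classification.LogisticRegression

val training = Seq(
  (0L, "spark is great", 1.0),
  (1L, "who needs hadoop", 0.0)).toDF("id", "text", "label")

val tok = new Tokenizer().setInputCol("text").setOutputCol("words")
val hashingTF = new HashingTF().setInputCol("words").setOutputCol("features")
val lr = new LogisticRegression().setMaxIter(5)

// the stages are executed in order when the pipeline is fit
val pipeline = new Pipeline("my_pipeline").setStages(Array[PipelineStage](tok, hashingTF, lr))
val pipelineModel = pipeline.fit(training)
pipelineModel.transform(training).select("text", "prediction").show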
(video) Building, Debugging, and Tuning Spark Machine Learning Pipelines - Joseph
Bradley (Databricks)
(video) Spark MLlib: Making Practical Machine Learning Easy and Scalable
(video) Apache Spark MLlib 2 0 Preview: Data Science and Production by Joseph K.
Bradley (Databricks)
Pipeline
PipelineStage
PipelineStage has the following direct implementations (of which few are abstract classes,
too):
Estimators
Models
Pipeline
Predictor
Transformer
Transformers
A transformer is a ML Pipeline component that transforms a DataFrame into another
DataFrame (both called datasets).
Transformers prepare a dataset for a machine learning algorithm to work with. They are also very helpful to transform DataFrames in general (even outside the machine learning space).
StopWordsRemover
Binarizer
SQLTransformer
UnaryTransformer
Tokenizer
RegexTokenizer
NGram
HashingTF
OneHotEncoder
Model
StopWordsRemover
StopWordsRemover is a machine learning feature transformer that takes a string array column
and outputs a string array column with all defined stop words removed. The transformer
comes with a standard set of English stop words as default (that are the same as scikit-learn
uses, i.e. from the Glasgow Information Retrieval Group).
import org.apache.spark.ml.feature.StopWordsRemover
val stopWords = new StopWordsRemover
scala> println(stopWords.explainParams)
caseSensitive: whether to do case-sensitive comparison during filtering (default: false
)
inputCol: input column name (undefined)
outputCol: output column name (default: stopWords_9c2c0fdd8a68__output)
stopWords: stop words (default: [Ljava.lang.String;@5dabe7c8)
Note null values from the input array are preserved unless adding null to stopWords explicitly.
import org.apache.spark.ml.feature.RegexTokenizer
val regexTok = new RegexTokenizer("regexTok")
.setInputCol("text")
.setPattern("\\W+")
import org.apache.spark.ml.feature.StopWordsRemover
val stopWords = new StopWordsRemover("stopWords")
.setInputCol(regexTok.getOutputCol)
scala> stopWords.transform(regexTok.transform(df)).show(false)
+-------------------------------+---+------------------------------------+------------
-----+
|text |id |regexTok__output |stopWords__o
utput|
+-------------------------------+---+------------------------------------+------------
-----+
|please find it done (and empty)|0 |[please, find, it, done, and, empty]|[]
|
|About to be rich! |1 |[about, to, be, rich] |[rich]
|
|empty |2 |[empty] |[]
|
+-------------------------------+---+------------------------------------+------------
-----+
Binarizer
Binarizer is a Transformer that splits the values in the input column into two groups -
"ones" for values larger than the threshold and "zeros" for the others.
It works with DataFrames with the input column of DoubleType or VectorUDT. The type of
the result output column matches the type of the input column, i.e. DoubleType or
VectorUDT .
import org.apache.spark.ml.feature.Binarizer
val bin = new Binarizer()
.setInputCol("rating")
.setOutputCol("label")
.setThreshold(3.5)
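// The input Dataset is not shown above; a minimal sketch matching the output below:
val doubles = Seq((0, 1.0), (1, 1.0), (2, 5.0)).toDF("id", "rating")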
scala> println(bin.explainParams)
inputCol: input column name (current: rating)
outputCol: output column name (default: binarizer_dd9710e2a831__output, current: label
)
threshold: threshold used to binarize continuous features (default: 0.0, current: 3.5)
scala> bin.transform(doubles).show
+---+------+-----+
| id|rating|label|
+---+------+-----+
| 0| 1.0| 0.0|
| 1| 1.0| 0.0|
| 2| 5.0| 1.0|
+---+------+-----+
import org.apache.spark.mllib.linalg.Vectors
val denseVec = Vectors.dense(Array(4.0, 0.4, 3.7, 1.5))
val vectors = Seq((0, denseVec)).toDF("id", "rating")
scala> bin.transform(vectors).show
+---+-----------------+-----------------+
| id| rating| label|
+---+-----------------+-----------------+
| 0|[4.0,0.4,3.7,1.5]|[1.0,0.0,1.0,0.0]|
+---+-----------------+-----------------+
SQLTransformer
SQLTransformer is a Transformer that does transformations by executing SELECT … FROM __THIS__ with __THIS__ being the underlying temporary table registered for the input dataset.
Internally, __THIS__ is replaced with a random name for a temporary table (using registerTempTable).
It requires that the SELECT query uses __THIS__ that corresponds to a temporary table and simply executes the mandatory statement using sql method.
You have to specify the mandatory statement parameter using setStatement method.
import org.apache.spark.ml.feature.SQLTransformer
val sql = new SQLTransformer()
scala> println(sql.explainParams)
statement: SQL statement (current: SELECT sentence FROM __THIS__ WHERE label = 0)
VectorAssembler
VectorAssembler is a feature transformer that assembles (merges) multiple columns into a (feature) vector column.
It supports columns of the types NumericType , BooleanType , and VectorUDT . Doubles are passed on untouched. Other numeric types and booleans are cast to doubles.
import org.apache.spark.ml.feature.VectorAssembler
val vecAssembler = new VectorAssembler()
scala> print(vecAssembler.explainParams)
inputCols: input column names (undefined)
outputCol: output column name (default: vecAssembler_5ac31099dbee__output)
final case class Record(id: Int, n1: Int, n2: Double, flag: Boolean)
val ds = Seq(Record(0, 4, 2.0, true)).toDS
scala> ds.printSchema
root
|-- id: integer (nullable = false)
|-- n1: integer (nullable = false)
|-- n2: double (nullable = false)
|-- flag: boolean (nullable = false)
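// The assembling step that produces the features Dataset is not shown above;
// a minimal sketch (the input columns come from the schema above):
val features = vecAssembler
  .setInputCols(Array("n1", "n2", "flag"))
  .setOutputCol("features")
  .transform(ds)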
scala> features.printSchema
root
|-- id: integer (nullable = false)
|-- n1: integer (nullable = false)
|-- n2: double (nullable = false)
|-- flag: boolean (nullable = false)
|-- features: vector (nullable = true)
scala> features.show
+---+---+---+----+-------------+
| id| n1| n2|flag| features|
+---+---+---+----+-------------+
| 0| 4|2.0|true|[4.0,2.0,1.0]|
+---+---+---+----+-------------+
UnaryTransformers
The UnaryTransformer abstract class is a specialized Transformer that applies
transformation to one input column and writes results to another (by appending a new
column).
Each UnaryTransformer defines the input and output columns using the following "chain"
methods (they return the transformer on which they were executed and so are chainable):
setInputCol(value: String)
setOutputCol(value: String)
When transform is called, it first calls transformSchema (with DEBUG logging enabled) and
then adds the column as a result of calling a protected abstract createTransformFunc .
Internally, transform method uses Spark SQL’s udf to define a function (based on
createTransformFunc function described above) that will create the new output column (with
appropriate outputDataType ). The UDF is later applied to the input column of the input
DataFrame and the result becomes the output column (using DataFrame.withColumn
method).
Tokenizer that converts a string column to lowercase and then splits it by white spaces.
NGram that converts the input array of strings into an array of n-grams.
HashingTF that maps a sequence of terms to their term frequencies (cf. SPARK-13998
HashingTF should extend UnaryTransformer)
OneHotEncoder that maps a numeric input column of label indices onto a column of
binary vectors.
RegexTokenizer
RegexTokenizer is a UnaryTransformer that tokenizes a String into a collection of String .
import org.apache.spark.ml.feature.RegexTokenizer
val regexTok = new RegexTokenizer()
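// The input Dataset and the tokenized result are not shown above; a minimal sketch
// (the "label" and "sentence" column names match the output below):
val df = Seq((0, "hello world"), (1, "two  spaces inside")).toDF("label", "sentence")
val tokenized = regexTok.setInputCol("sentence").transform(df)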
scala> tokenized.show(false)
+-----+------------------+-----------------------------+
|label|sentence |regexTok_810b87af9510__output|
+-----+------------------+-----------------------------+
|0 |hello world |[hello, world] |
|1 |two spaces inside|[two, spaces, inside] |
+-----+------------------+-----------------------------+
It supports minTokenLength parameter that is the minimum token length that you can change
using setMinTokenLength method. It simply filters out smaller tokens and defaults to 1 .
scala> rt.setInputCol("line").setMinTokenLength(6).transform(df).show
+-----+--------------------+-----------------------------+
|label| line|regexTok_8c74c5e8b83a__output|
+-----+--------------------+-----------------------------+
| 1| hello world| []|
| 2|yet another sentence| [another, sentence]|
+-----+--------------------+-----------------------------+
It has gaps parameter that indicates whether regex splits on gaps ( true ) or matches
tokens ( false ). You can set it using setGaps . It defaults to true .
When set to true (i.e. splits on gaps) it uses Regex.split while Regex.findAllIn for false .
scala> rt.setInputCol("line").setGaps(false).transform(df).show
+-----+--------------------+-----------------------------+
|label| line|regexTok_8c74c5e8b83a__output|
+-----+--------------------+-----------------------------+
| 1| hello world| []|
| 2|yet another sentence| [another, sentence]|
+-----+--------------------+-----------------------------+
scala> rt.setInputCol("line").setGaps(false).setPattern("\\W").transform(df).show(false
)
+-----+--------------------+-----------------------------+
|label|line |regexTok_8c74c5e8b83a__output|
+-----+--------------------+-----------------------------+
|1 |hello world |[] |
|2 |yet another sentence|[another, sentence] |
+-----+--------------------+-----------------------------+
It has pattern parameter that is the regex for tokenizing. It uses Scala’s .r method to
convert the string to regex. Use setPattern to set it. It defaults to \\s+ .
It has toLowercase parameter that indicates whether to convert all characters to lowercase
before tokenizing. Use setToLowercase to change it. It defaults to true .
NGram
In this example you use org.apache.spark.ml.feature.NGram that converts the input
collection of strings into a collection of n-grams (of n words).
import org.apache.spark.ml.feature.NGram
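// The NGram transformer and the input dataset are not shown above; a minimal sketch
// (the "bigrams" uid and the "tokens" column match the output below):
val bigrams = new NGram("bigrams").setInputCol("tokens")
val df = Seq((0, Seq("hello", "world"))).toDF("id", "tokens")
bigrams.transform(df).show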
+---+--------------+---------------+
| id| tokens|bigrams__output|
+---+--------------+---------------+
| 0|[hello, world]| [hello world]|
+---+--------------+---------------+
HashingTF
Another example of a transformer is org.apache.spark.ml.feature.HashingTF that works on a
Column of ArrayType .
It transforms the rows for the input column into a sparse term frequency vector.
import org.apache.spark.ml.feature.HashingTF
val hashingTF = new HashingTF()
.setInputCol("words")
.setOutputCol("features")
.setNumFeatures(5000)
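// The input Dataset with a "words" column is not shown above; a minimal sketch
// matching the output below:
val regexedDF = Seq(
  (0, "hello world", Seq("hello", "world")),
  (1, "two  spaces inside", Seq("two", "spaces", "inside"))).toDF("id", "text", "words")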
// Use HashingTF
val hashedDF = hashingTF.transform(regexedDF)
scala> hashedDF.show(false)
+---+------------------+---------------------+-----------------------------------+
|id |text |words |features |
+---+------------------+---------------------+-----------------------------------+
|0 |hello world |[hello, world] |(5000,[2322,3802],[1.0,1.0])
|
|1 |two spaces inside|[two, spaces, inside]|(5000,[276,940,2533],[1.0,1.0,1.0])|
+---+------------------+---------------------+-----------------------------------+
The name of the output column is optional, and if not specified, it becomes the identifier of a
HashingTF object with the __output suffix.
scala> hashingTF.uid
res7: String = hashingTF_fe3554836819
scala> hashingTF.transform(regexDF).show(false)
+---+------------------+---------------------+----------------------------------------
---+
|id |text |words |hashingTF_fe3554836819__output
|
+---+------------------+---------------------+----------------------------------------
---+
|0 |hello world |[hello, world] |(262144,[71890,72594],[1.0,1.0])
|
|1 |two spaces inside|[two, spaces, inside]|(262144,[53244,77869,115276],[1.0,1.0,1.0
])|
+---+------------------+---------------------+----------------------------------------
---+
OneHotEncoder
OneHotEncoder is a Transformer that maps a numeric input column of label indices onto a column of binary vectors.
// dataset to transform
val df = Seq(
(0, "a"), (1, "b"),
(2, "c"), (3, "a"),
(4, "a"), (5, "c"))
.toDF("label", "category")
import org.apache.spark.ml.feature.StringIndexer
val indexer = new StringIndexer().setInputCol("category").setOutputCol("cat_index").fi
t(df)
val indexed = indexer.transform(df)
import org.apache.spark.sql.types.NumericType
scala> indexed.schema("cat_index").dataType.isInstanceOf[NumericType]
res0: Boolean = true
import org.apache.spark.ml.feature.OneHotEncoder
val oneHot = new OneHotEncoder()
.setInputCol("cat_index")
.setOutputCol("cat_vec")
scala> oneHotted.show(false)
+-----+--------+---------+-------------+
|label|category|cat_index|cat_vec |
+-----+--------+---------+-------------+
|0 |a |0.0 |(2,[0],[1.0])|
|1 |b |2.0 |(2,[],[]) |
|2 |c |1.0 |(2,[1],[1.0])|
|3 |a |0.0 |(2,[0],[1.0])|
|4 |a |0.0 |(2,[0],[1.0])|
|5 |c |1.0 |(2,[1],[1.0])|
+-----+--------+---------+-------------+
scala> oneHotted.printSchema
root
|-- label: integer (nullable = false)
|-- category: string (nullable = true)
|-- cat_index: double (nullable = true)
|-- cat_vec: vector (nullable = true)
scala> oneHotted.schema("cat_vec").dataType.isInstanceOf[VectorUDT]
res1: Boolean = true
Custom UnaryTransformer
The following class is a custom UnaryTransformer that transforms words to upper case.
package pl.japila.spark
import org.apache.spark.ml._
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.types._
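// The transformer class itself is not included above; a minimal sketch that matches
// the "upper" uid prefix and the usage below (the details are assumptions):
class UpperTransformer(override val uid: String)
    extends UnaryTransformer[String, String, UpperTransformer] {

  def this() = this(Identifiable.randomUID("upper"))

  // the actual transformation: upper-case the input String
  override protected def createTransformFunc: String => String = _.toUpperCase

  override protected def validateInputType(inputType: DataType): Unit =
    require(inputType == StringType, s"Input type must be StringType but got $inputType")

  override protected def outputDataType: DataType = StringType
}

// a hypothetical input dataset and the transformer instance used below
val df = Seq((0, "hello"), (1, "world")).toDF("id", "text")
val upper = new UpperTransformer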
scala> upper.setInputCol("text").transform(df).show
+---+-----+--------------------------+
| id| text|upper_0b559125fd61__output|
+---+-----+--------------------------+
| 0|hello| HELLO|
| 1|world| WORLD|
+---+-----+--------------------------+
Transformer
Transformer is the contract in Spark MLlib for transformers that transform one dataset into
another.
Caution FIXME
Transformer Contract
package org.apache.spark.ml
Tokenizer
Tokenizer is a unary transformer that converts the column of String values to lowercase and then splits it by white spaces.
import org.apache.spark.ml.feature.Tokenizer
val tok = new Tokenizer()
// dataset to transform
val df = Seq(
(1, "Hello world!"),
(2, "Here is yet another sentence.")).toDF("id", "sentence")
Estimators
An estimator is an abstraction of a learning algorithm that fits a model on a dataset.
Note That was so machine learning to explain an estimator this way, wasn’t it? It is that the more time I spend with the Pipeline API, the more often I use the terms and phrases from this space. Sorry.
Technically, an Estimator produces a Model (i.e. a Transformer) for a given DataFrame and
parameters (as ParamMap ). It fits a model to the input DataFrame and ParamMap to produce
a Transformer (a Model ) that can calculate predictions for any DataFrame -based input
datasets.
It is basically a function that maps a DataFrame onto a Model through fit method, i.e. it
takes a DataFrame and produces a Transformer as a Model .
fit(dataset: DataFrame): M
Estimator
Estimator is the contract in Spark MLlib for estimators that fit models to a dataset.
Estimator accepts parameters that you can set through dedicated setter methods upon
creating an Estimator . You could also fit a model with extra parameters.
import org.apache.spark.ml.classification.LogisticRegression
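A minimal sketch of fitting with extra parameters follows; the tiny training dataset and the parameter values are assumptions:
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
val training = Seq(
  LabeledPoint(1.0, Vectors.dense(0.0, 1.1, 0.1)),
  LabeledPoint(0.0, Vectors.dense(2.0, 1.0, -1.0))).toDF

import org.apache.spark.ml.param.ParamMap
val lr = new LogisticRegression().setMaxIter(10).setRegParam(0.01)
// extra parameters passed at fit time override the ones set on the estimator
val model = lr.fit(training, ParamMap(lr.maxIter -> 20, lr.regParam -> 0.1))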
Estimator Contract
package org.apache.spark.ml
fit copies the extra paramMap and fits a model (of type M ).
Note fit is used mainly for model tuning to find the best model (using CrossValidator and TrainValidationSplit).
StringIndexer
org.apache.spark.ml.feature.StringIndexer is an Estimator that produces a
StringIndexerModel .
import org.apache.spark.ml.feature.StringIndexer
val strIdx = new StringIndexer()
.setInputCol("label")
.setOutputCol("index")
scala> println(strIdx.explainParams)
handleInvalid: how to handle invalid entries. Options are skip (which will filter out
rows with bad values), or error (which will throw an error). More options may be added
later (default: error)
inputCol: input column name (current: label)
outputCol: output column name (default: strIdx_ded89298e014__output, current: index)
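// The input Dataset and the fit + transform steps are not shown above; a minimal sketch
// (ten rows with distinct labels a..j, matching the output below):
val df = ('a' to 'j').zipWithIndex.map { case (label, id) => (id, label.toString) }.toDF("id", "label")
val indexed = strIdx.fit(df).transform(df)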
scala> indexed.show
+---+-----+-----+
| id|label|index|
+---+-----+-----+
| 0| a| 3.0|
| 1| b| 5.0|
| 2| c| 7.0|
| 3| d| 9.0|
| 4| e| 0.0|
| 5| f| 2.0|
| 6| g| 6.0|
| 7| h| 8.0|
| 8| i| 4.0|
| 9| j| 1.0|
+---+-----+-----+
KMeans
KMeans class is an implementation of the K-means clustering algorithm in machine learning with support for k-means|| (aka k-means parallel) in Spark MLlib.
import org.apache.spark.ml.clustering._
val kmeans = new KMeans()
scala> println(kmeans.explainParams)
featuresCol: features column name (default: features)
initMode: initialization algorithm (default: k-means||)
initSteps: number of steps for k-means|| (default: 5)
k: number of clusters to create (default: 2)
maxIter: maximum number of iterations (>= 0) (default: 20)
predictionCol: prediction column name (default: prediction)
seed: random seed (default: -1689246527)
tol: the convergence tolerance for iterative algorithms (default: 1.0E-4)
The prediction column is of type IntegerType .
Internally, fit method "unwraps" the feature vector in featuresCol column in the input DataFrame and creates an RDD[Vector] . It then hands the call over to the MLlib variant of KMeans (org.apache.spark.mllib.clustering.KMeans).
Each item (row) in a data set is described by a numeric vector of attributes called features .
A single feature (a dimension of the vector) represents a word (token) with a value that is a
metric that defines the importance of that word or term in the document.
Refer to Logging.
KMeans Example
You can represent a text corpus (document collection) using the vector space model. In this
representation, the vectors have dimension that is the number of different words in the
corpus. It is quite natural to have vectors with a lot of zero values as not all words will be in a
document. We will use an optimized memory representation to avoid zero values using
sparse vectors.
This example shows how to use k-means to classify emails as a spam or not.
// NOTE Don't copy and paste the final case class with the other lines
// It won't work with paste mode in spark-shell
final case class Email(id: Int, text: String)
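// The training dataset and the tokenizer step are not shown above; a minimal sketch
// (the emails, the column names and the RegexTokenizer settings are assumptions):
val emails = Seq(
  Email(0, "hello mom"),
  Email(1, "cheap spam spam spam")).toDF
import org.apache.spark.ml.feature.RegexTokenizer
val regexTok = new RegexTokenizer().setInputCol("text").setOutputCol("tokens")
import org.apache.spark.ml.feature.HashingTF
val hashingTF = new HashingTF()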
.setInputCol("tokens")
.setOutputCol("features")
.setNumFeatures(20)
import org.apache.spark.ml.clustering.KMeans
val kmeans = new KMeans
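// The fit step is not shown above; a minimal sketch using the values defined earlier:
val trainDF = hashingTF.transform(regexTok.transform(emails))
val kmModel = kmeans.fit(trainDF)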
scala> kmModel.clusterCenters.map(_.toSparse)
res36: Array[org.apache.spark.mllib.linalg.SparseVector] = Array((20,[13],[3.0]), (20,[
0,2,3,6,7,8,10,11,17,19],[1.5,0.5,1.0,0.5,0.5,0.5,1.5,1.0,1.0,1.0]))
scala> kmModel.transform(trainDF).show(false)
+---------+------------+---------------------+----------+
|text |tokens |features |prediction|
+---------+------------+---------------------+----------+
|hello mom|[hello, mom]|(20,[2,19],[1.0,1.0])|1 |
+---------+------------+---------------------+----------+
TrainValidationSplit
TrainValidationSplit is…FIXME
Predictor
Predictor is an Estimator for a PredictionModel with its own abstract train method.
train(dataset: DataFrame): M
The train method is supposed to ease dealing with schema validation and copying
parameters to a trained PredictionModel model. It also sets the parent of the model to itself.
It implements the abstract fit(dataset: DataFrame) of the Estimator abstract class that
validates and transforms the schema of a dataset (using a custom transformSchema of
PipelineStage), and then calls the abstract train method.
RandomForestRegressor
RandomForestRegressor is a Predictor for Random Forest machine learning algorithm that
trains a RandomForestRegressionModel .
import org.apache.spark.mllib.linalg.Vectors
val features = Vectors.sparse(10, Seq((2, 0.2), (4, 0.4)))
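// The training dataset and the fit step are not shown above; a minimal sketch
// (the output below suggests a third element (6, 0.6) in the feature vector):
val data = (0 to 4).map(n => (n.toDouble, features)).toDF("label", "features")
import org.apache.spark.ml.regression.RandomForestRegressor
val rfr = new RandomForestRegressor
val model = rfr.fit(data)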
scala> data.show(false)
+-----+--------------------------+
|label|features |
+-----+--------------------------+
|0.0 |(10,[2,4,6],[0.2,0.4,0.6])|
|1.0 |(10,[2,4,6],[0.2,0.4,0.6])|
|2.0 |(10,[2,4,6],[0.2,0.4,0.6])|
|3.0 |(10,[2,4,6],[0.2,0.4,0.6])|
|4.0 |(10,[2,4,6],[0.2,0.4,0.6])|
+-----+--------------------------+
scala> model.trees.foreach(println)
DecisionTreeRegressionModel (uid=dtr_247e77e2f8e0) of depth 1 with 3 nodes
DecisionTreeRegressionModel (uid=dtr_61f8eacb2b61) of depth 2 with 7 nodes
DecisionTreeRegressionModel (uid=dtr_63fc5bde051c) of depth 2 with 5 nodes
DecisionTreeRegressionModel (uid=dtr_64d4e42de85f) of depth 2 with 5 nodes
DecisionTreeRegressionModel (uid=dtr_693626422894) of depth 3 with 9 nodes
DecisionTreeRegressionModel (uid=dtr_927f8a0bc35e) of depth 2 with 5 nodes
DecisionTreeRegressionModel (uid=dtr_82da39f6e4e1) of depth 3 with 7 nodes
DecisionTreeRegressionModel (uid=dtr_cb94c2e75bd1) of depth 0 with 1 nodes
DecisionTreeRegressionModel (uid=dtr_29e3362adfb2) of depth 1 with 3 nodes
DecisionTreeRegressionModel (uid=dtr_d6d896abcc75) of depth 3 with 7 nodes
DecisionTreeRegressionModel (uid=dtr_aacb22a9143d) of depth 2 with 5 nodes
DecisionTreeRegressionModel (uid=dtr_18d07dadb5b9) of depth 2 with 7 nodes
DecisionTreeRegressionModel (uid=dtr_f0615c28637c) of depth 2 with 5 nodes
DecisionTreeRegressionModel (uid=dtr_4619362d02fc) of depth 2 with 5 nodes
DecisionTreeRegressionModel (uid=dtr_d39502f828f4) of depth 2 with 5 nodes
DecisionTreeRegressionModel (uid=dtr_896f3a4272ad) of depth 3 with 9 nodes
DecisionTreeRegressionModel (uid=dtr_891323c29838) of depth 3 with 7 nodes
DecisionTreeRegressionModel (uid=dtr_d658fe871e99) of depth 2 with 5 nodes
DecisionTreeRegressionModel (uid=dtr_d91227b13d41) of depth 2 with 5 nodes
DecisionTreeRegressionModel (uid=dtr_4a7976921f4b) of depth 2 with 5 nodes
scala> model.treeWeights
res12: Array[Double] = Array(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0
, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0)
scala> model.featureImportances
res13: org.apache.spark.mllib.linalg.Vector = (1,[0],[1.0])
Regressor
Regressor is…FIXME
LinearRegression
LinearRegression is a Regressor that represents the linear regression algorithm in Machine
Learning.
import org.apache.spark.ml.regression.LinearRegression
val lr = new LinearRegression
scala> println(lr.explainParams)
elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the
penalty is an L2 penalty. For alpha = 1, it is an L1 penalty (default: 0.0)
featuresCol: features column name (default: features)
fitIntercept: whether to fit an intercept term (default: true)
labelCol: label column name (default: label)
maxIter: maximum number of iterations (>= 0) (default: 100)
predictionCol: prediction column name (default: prediction)
regParam: regularization parameter (>= 0) (default: 0.0)
solver: the solver algorithm for optimization. If this is not set or empty, default va
lue is 'auto' (default: auto)
standardization: whether to standardize the training features before fitting the model
(default: true)
tol: the convergence tolerance for iterative algorithms (default: 1.0E-6)
weightCol: weight column name. If this is not set or empty, we treat all instance weig
hts as 1.0 (default: )
LinearRegression Example
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
val data = (0.0 to 9.0 by 1) // create a collection of Doubles
.map(n => (n, n)) // make it pairs
.map { case (label, features) =>
LabeledPoint(label, Vectors.dense(features)) } // create labeled points of dense v
ectors
.toDF // make it a DataFrame
scala> data.show
+-----+--------+
|label|features|
+-----+--------+
| 0.0| [0.0]|
| 1.0| [1.0]|
| 2.0| [2.0]|
| 3.0| [3.0]|
| 4.0| [4.0]|
| 5.0| [5.0]|
| 6.0| [6.0]|
| 7.0| [7.0]|
| 8.0| [8.0]|
| 9.0| [9.0]|
+-----+--------+
import org.apache.spark.ml.regression.LinearRegression
val lr = new LinearRegression
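// The fit step is not shown above:
val model = lr.fit(data)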
scala> model.intercept
res1: Double = 0.0
scala> model.coefficients
res2: org.apache.spark.mllib.linalg.Vector = [1.0]
// make predictions
scala> val predictions = model.transform(data)
predictions: org.apache.spark.sql.DataFrame = [label: double, features: vector ... 1 m
ore field]
scala> predictions.show
+-----+--------+----------+
|label|features|prediction|
+-----+--------+----------+
| 0.0| [0.0]| 0.0|
| 1.0| [1.0]| 1.0|
| 2.0| [2.0]| 2.0|
| 3.0| [3.0]| 3.0|
| 4.0| [4.0]| 4.0|
| 5.0| [5.0]| 5.0|
| 6.0| [6.0]| 6.0|
| 7.0| [7.0]| 7.0|
| 8.0| [8.0]| 8.0|
| 9.0| [9.0]| 9.0|
+-----+--------+----------+
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.mllib.linalg.DenseVector
// NOTE Follow along to learn spark.ml-way (not RDD-way)
predictions.rdd.map { r =>
  (r(0).asInstanceOf[Double], r(1).asInstanceOf[DenseVector](0).toDouble, r(2).asInstanceOf[Double]) }
  .toDF("label", "feature0", "prediction").show
+-----+--------+----------+
|label|feature0|prediction|
+-----+--------+----------+
| 0.0| 0.0| 0.0|
| 1.0| 1.0| 1.0|
| 2.0| 2.0| 2.0|
| 3.0| 3.0| 3.0|
| 4.0| 4.0| 4.0|
| 5.0| 5.0| 5.0|
| 6.0| 6.0| 6.0|
| 7.0| 7.0| 7.0|
| 8.0| 8.0| 8.0|
| 9.0| 9.0| 9.0|
+-----+--------+----------+
import org.apache.spark.sql.Row
import org.apache.spark.mllib.linalg.DenseVector
case class Prediction(label: Double, feature0: Double, prediction: Double)
object Prediction {
def apply(r: Row) = new Prediction(
label = r(0).asInstanceOf[Double],
feature0 = r(1).asInstanceOf[DenseVector](0).toDouble,
prediction = r(2).asInstanceOf[Double])
}
import org.apache.spark.sql.Row
import org.apache.spark.mllib.linalg.DenseVector
defined class Prediction
defined object Prediction
scala> predictions.rdd.map(Prediction.apply).toDF.show
+-----+--------+----------+
|label|feature0|prediction|
+-----+--------+----------+
| 0.0| 0.0| 0.0|
| 1.0| 1.0| 1.0|
train Method
train expects a dataset with label and features columns and returns a LinearRegressionModel .
It first counts the number of elements in features column (usually features ). The column
has to be of mllib.linalg.Vector type (and can easily be prepared using HashingTF
transformer).
import org.apache.spark.ml.feature.RegexTokenizer
val regexTok = new RegexTokenizer()
val spamTokens = regexTok.setInputCol("email").transform(spam)
scala> spamTokens.show(false)
+---+--------------------------------+---------------------------------------+
|id |email |regexTok_646b6bcc4548__output |
+---+--------------------------------+---------------------------------------+
|0 |Hi Jacek. Wanna more SPAM? Best!|[hi, jacek., wanna, more, spam?, best!]|
|1 |This is SPAM. This is SPAM |[this, is, spam., this, is, spam] |
+---+--------------------------------+---------------------------------------+
import org.apache.spark.ml.feature.HashingTF
val hashTF = new HashingTF()
.setInputCol(regexTok.getOutputCol)
.setOutputCol("features")
.setNumFeatures(5000)
scala> spamLabeled.show
+---+--------------------+-----------------------------+--------------------+-----+
| id| email|regexTok_646b6bcc4548__output| features|label|
+---+--------------------+-----------------------------+--------------------+-----+
| 0|Hi Jacek. Wanna m...| [hi, jacek., wann...|(5000,[2525,2943,...| 1.0|
| 1|This is SPAM. Thi...| [this, is, spam.,...|(5000,[1713,3149,...| 1.0|
+---+--------------------+-----------------------------+--------------------+-----+
scala> training.show
+---+--------------------+-----------------------------+--------------------+-----+
| id| email|regexTok_646b6bcc4548__output| features|label|
+---+--------------------+-----------------------------+--------------------+-----+
| 2|Hi Jacek. I hope ...| [hi, jacek., i, h...|(5000,[72,105,942...| 0.0|
| 3|Welcome to Apache...| [welcome, to, apa...|(5000,[2894,3365,...| 0.0|
| 0|Hi Jacek. Wanna m...| [hi, jacek., wann...|(5000,[2525,2943,...| 1.0|
| 1|This is SPAM. Thi...| [this, is, spam.,...|(5000,[1713,3149,...| 1.0|
+---+--------------------+-----------------------------+--------------------+-----+
import org.apache.spark.ml.regression.LinearRegression
val lr = new LinearRegression
scala> lrModel.transform(emailHashed).select("prediction").show
+-----------------+
| prediction|
+-----------------+
|0.563603440350882|
+-----------------+
Classifier
Classifier is a Predictor that…FIXME
extractLabeledPoints Method
extractLabeledPoints …FIXME
getNumClasses Method
getNumClasses …FIXME
RandomForestClassifier
RandomForestClassifier is a probabilistic Classifier for…FIXME
DecisionTreeClassifier
DecisionTreeClassifier is a probabilistic Classifier for…FIXME
ML Pipeline Models
Model abstract class is a Transformer with the optional Estimator that has produced it (as a transient parent attribute).
Note An Estimator is optional and is available only after fit (of an Estimator) has been executed, as a model is its result.
There are two direct implementations of the Model class that are not directly related to a
concrete ML algorithm:
PipelineModel
PredictionModel
PipelineModel
Once fit, you can use the result model as any other models to transform datasets (as
DataFrame ).
// Transformer #1
import org.apache.spark.ml.feature.Tokenizer
val tok = new Tokenizer().setInputCol("text")
// Transformer #2
import org.apache.spark.ml.feature.HashingTF
val hashingTF = new HashingTF().setInputCol(tok.getOutputCol).setOutputCol("features")
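The Pipeline construction and the fit step are not shown above; a minimal sketch (the input dataset is an assumption):
import org.apache.spark.ml.{Pipeline, PipelineStage}
val df = Seq((0, "hello world"), (1, "two spaces inside")).toDF("id", "text")
val pipeline = new Pipeline().setStages(Array[PipelineStage](tok, hashingTF))
val pipelineModel = pipeline.fit(df)   // PipelineModel
pipelineModel.transform(df).show(false)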
PredictionModel
PredictionModel is an abstract class to represent a model for prediction algorithms like
regression and classification (that have their own specialized models - details coming up
below).
import org.apache.spark.ml.PredictionModel
The contract of PredictionModel class requires that every custom implementation defines
predict method (with FeaturesType type being the type of features ).
RegressionModel
ClassificationModel
RandomForestRegressionModel
Internally, transform first ensures that the type of the features column matches the type
of the model and adds the prediction column of type Double to the schema of the result
DataFrame .
It then creates the result DataFrame and adds the prediction column with a predictUDF
function applied to the values of the features column.
Caution: FIXME A diagram to show the transformation from a dataframe (on the left) and another (on the right) with an arrow to represent the transformation method.
Refer to Logging.
ClassificationModel
ClassificationModel is a PredictionModel that transforms a DataFrame with mandatory features , label , and rawPrediction (of type Vector) columns to a DataFrame with the prediction column added.
ClassificationModel comes with its own transform (as Transformer) and predict (as PredictionModel).
The known ClassificationModel implementations (as classification models) are:
DecisionTreeClassificationModel ( final )
LogisticRegressionModel
NaiveBayesModel
RandomForestClassificationModel ( final )
RegressionModel
RegressionModel is a PredictionModel that transforms a DataFrame with mandatory label , features , and prediction columns.
It comes with no own methods or values and so is more a marker abstract class (to combine
different features of regression models under one type).
LinearRegressionModel
LinearRegressionModel represents a model produced by a LinearRegression estimator. It supports the following parameters:
label (required)
features (required)
prediction
regParam
elasticNetParam
maxIter (Int)
tol (Double)
fitIntercept (Boolean)
standardization (Boolean)
weightCol (String)
solver (String)
With DEBUG logging enabled (see above) you can see the following messages in the logs
when transform is called and transforms the schema.
Note: The coefficients Vector and intercept Double are the integral part of LinearRegressionModel as the required input parameters of the constructor.
LinearRegressionModel Example
import org.apache.spark.ml.regression.LinearRegression
val lr = new LinearRegression
// Importing LinearRegressionModel and being explicit about the type of model value
// is for learning purposes only
import org.apache.spark.ml.regression.LinearRegressionModel
val model: LinearRegressionModel = lr.fit(ds)
RandomForestRegressionModel
RandomForestRegressionModel is a PredictionModel with features column of type Vector.
KMeansModel
KMeansModel is a Model of KMeans algorithm.
// See spark-mllib-estimators.adoc#KMeans
val kmeans: KMeans = ???
val trainingDF: DataFrame = ???
val kmModel = kmeans.fit(trainingDF)
scala> kmModel.transform(inputDF).show(false)
+-----+---------+----------+
|label|features |prediction|
+-----+---------+----------+
|0.0 |[0.2,0.4]|0 |
+-----+---------+----------+
Model
Model is the contract for a fitted model, i.e. a Transformer that was produced by an
Estimator.
Model Contract
package org.apache.spark.ml
Evaluator — ML Pipeline Component for Model Scoring
ML Pipeline evaluators are transformers that take DataFrames and compute metrics
indicating how good a model is.
Evaluator is used to evaluate models and is usually (if not always) used for best model selection.
Evaluator uses the isLargerBetter method to indicate whether the Double metric should be maximized (true) or minimized (false).
Table 1. Evaluators
Evaluator                            Description
BinaryClassificationEvaluator        Evaluator of binary classification models
ClusteringEvaluator                  Evaluator of clustering models
MulticlassClassificationEvaluator    Evaluator of multiclass classification models
RegressionEvaluator                  Evaluator of regression models
Evaluator Contract
package org.apache.spark.ml.evaluation
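A simplified sketch of the contract (not the full source):
abstract class Evaluator extends Params {
  def evaluate(dataset: Dataset[_]): Double
  def copy(extra: ParamMap): Evaluator
  def isLargerBetter: Boolean = true
}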
BinaryClassificationEvaluator — Evaluator of Binary Classification Models
BinaryClassificationEvaluator is an Evaluator of binary classification models (e.g. cross-validated models of binary classifiers). The metric is the area under the specified curve, i.e. areaUnderROC or areaUnderPR (and so isLargerBetter is turned on for either metric).
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
val binEval = new BinaryClassificationEvaluator().
setMetricName("areaUnderROC").
setRawPredictionCol("rawPrediction").
setLabelCol("label")
scala> binEval.isLargerBetter
res0: Boolean = true
scala> println(binEval.explainParams)
labelCol: label column name (default: label)
metricName: metric name in evaluation (areaUnderROC|areaUnderPR) (default: areaUnderROC)
rawPredictionCol: raw prediction (a.k.a. confidence) column name (default: rawPrediction)
Parameter           Default Value     Description
rawPredictionCol    rawPrediction     Column name with raw predictions (a.k.a. confidence)
labelCol            label             Name of the column with indexed labels (i.e. 0s or 1s)
evaluate …FIXME
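A usage sketch of evaluate, assuming a predictions DataFrame with rawPrediction and label columns (e.g. the output of a fitted classification model's transform):
// predictions is a placeholder DataFrame with rawPrediction and label columns
val areaUnderROC = binEval.evaluate(predictions)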
ClusteringEvaluator — Evaluator of Clustering Models
ClusteringEvaluator is an Evaluator of clustering models (e.g. KMeans, GaussianMixture or BisectingKMeans models).
ClusteringEvaluator finds the best model by maximizing the model evaluation metric (i.e. the silhouette).
import org.apache.spark.ml.evaluation.ClusteringEvaluator
val cluEval = new ClusteringEvaluator().
setPredictionCol("prediction").
setFeaturesCol("features").
setMetricName("silhouette")
scala> cluEval.isLargerBetter
res0: Boolean = true
scala> println(cluEval.explainParams)
featuresCol: features column name (default: features, current: features)
metricName: metric name in evaluation (silhouette) (default: silhouette, current: silhouette)
predictionCol: prediction column name (default: prediction, current: prediction)
Parameter        Default Value    Description
featuresCol      features         Name of the column with features (of type VectorUDT)
predictionCol    prediction       Name of the column with prediction (of type NumericType)
evaluate …FIXME
MulticlassClassificationEvaluator — Evaluator of Multiclass Classification Models
MulticlassClassificationEvaluator is an Evaluator that takes datasets with the following two columns: prediction (of DoubleType) and label (of a numeric type).
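A usage sketch follows (predictions is a placeholder DataFrame with prediction and label columns):
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
val mcEval = new MulticlassClassificationEvaluator()
  .setMetricName("accuracy") // f1, weightedPrecision, weightedRecall and accuracy are supported
  .setPredictionCol("prediction")
  .setLabelCol("label")
val accuracy = mcEval.evaluate(predictions)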
RegressionEvaluator — Evaluator of Regression Models
RegressionEvaluator is an Evaluator of regression models (e.g. ALS,
GeneralizedLinearRegression).
import org.apache.spark.ml.evaluation.RegressionEvaluator
val regEval = new RegressionEvaluator().
setMetricName("r2").
setPredictionCol("prediction").
setLabelCol("label")
scala> regEval.isLargerBetter
res0: Boolean = true
scala> println(regEval.explainParams)
labelCol: label column name (default: label, current: label)
metricName: metric name in evaluation (mse|rmse|r2|mae) (default: rmse, current: r2)
predictionCol: prediction column name (default: prediction, current: prediction)
// tok (a Tokenizer), lr (a LinearRegression), dataset and model come from earlier snippets
import org.apache.spark.ml.feature.HashingTF
val hashTF = new HashingTF()
.setInputCol(tok.getOutputCol) // it reads the output of tok
.setOutputCol("features")
import org.apache.spark.ml.Pipeline
val pipeline = new Pipeline().setStages(Array(tok, hashTF, lr))
// Let's do prediction
// Note that we're using the same dataset as for fitting the model
// Something you'd definitely not be doing in prod
val predictions = model.transform(dataset)
import org.apache.spark.ml.evaluation.RegressionEvaluator
val regEval = new RegressionEvaluator
scala> regEval.evaluate(predictions)
res0: Double = 0.0
evaluate …FIXME
CrossValidator — Model Tuning / Finding The Best Model
CrossValidator is an Estimator for model tuning, i.e. finding the best model for a given dataset based on average cross-validation metrics.
Note: CrossValidator takes any Estimator for model selection, including the Pipeline that is used to transform raw datasets and generate a Model.
Note: Use ParamGridBuilder for the parameter grid, i.e. collection of ParamMaps for model tuning.
import org.apache.spark.ml.Pipeline
val pipeline: Pipeline = ...
import org.apache.spark.ml.param.ParamMap
val paramGrid: Array[ParamMap] = new ParamGridBuilder().
addGrid(...).
addGrid(...).
build
import org.apache.spark.ml.tuning.CrossValidator
val cv = new CrossValidator().
setEstimator(pipeline).
setEvaluator(...).
setEstimatorParamMaps(paramGrid).
setNumFolds(...).
setParallelism(...)
import org.apache.spark.ml.tuning.CrossValidatorModel
val bestModel: CrossValidatorModel = cv.fit(training)
CrossValidator is an MLWritable.
Refer to Logging.
Note: fit is part of the Estimator Contract to fit a model (i.e. produce a model).
fit creates an Instrumentation and requests it to print out the parameters (numFolds among them) to the logs:
INFO ...FIXME
fit requests Instrumentation to print out the tuning parameters to the logs.
INFO ...FIXME
fit kFolds the RDD of the dataset per numFolds and seed parameters.
fit computes metrics for every pair of training and validation RDDs.
fit requests the Estimator to fit the best model (for the dataset and the best set of
estimatorParamMap).
In the end, fit creates a CrossValidatorModel (for the ID, the best model and the average
metrics for every kFold) and copies parameters to it.
Tip You can monitor the storage for persisting the datasets in web UI’s Storage tab.
For every map in estimatorParamMaps parameter fit fits a model using the Estimator.
Note: fit unpersists the training data (per pair of training and validation RDDs) when all models have been trained.
fit requests the models to transform their respective validation datasets (with the corresponding estimator parameter maps).
fit waits until all metrics are available and unpersists the validation dataset.
Unique ID
CrossValidatorModel
CrossValidatorModel is a Model that is created when CrossValidator is requested to find and fit the best model.
Unique ID
Best Model
ParamGridBuilder
ParamGridBuilder is…FIXME
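A minimal sketch of building a parameter grid for a LinearRegression estimator (parameter values are illustrative only):
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.tuning.ParamGridBuilder
val lr = new LinearRegression
val paramGrid = new ParamGridBuilder()
  .addGrid(lr.regParam, Array(0.01, 0.1))
  .addGrid(lr.fitIntercept) // for a BooleanParam both values are added
  .build
// paramGrid is an Array[ParamMap] with 4 combinations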
CrossValidator with Pipeline Example
import org.apache.spark.ml.classification.RandomForestClassifier
val rfc = new RandomForestClassifier
import org.apache.spark.ml.Pipeline
val pipeline = new Pipeline()
.setStages(Array(tok, hashTF, rfc))
+--------------------------+-----+--------------------------+----------+
|text |label|features |prediction|
+--------------------------+-----+--------------------------+----------+
|[science] hello world |0.0 |(10,[0,8],[2.0,1.0]) |0.0 |
|long text |1.0 |(10,[4,9],[1.0,1.0]) |1.0 |
|[science] hello all people|0.0 |(10,[0,6,8],[1.0,1.0,2.0])|0.0 |
|[science] hello hello |0.0 |(10,[0,8],[1.0,2.0]) |0.0 |
+--------------------------+-----+--------------------------+----------+
+-------------+--------------------------------------+----------+
|text |rawPrediction |prediction|
+-------------+--------------------------------------+----------+
|Hello ScienCE|[12.666666666666668,7.333333333333333]|0.0 |
+-------------+--------------------------------------+----------+
import org.apache.spark.ml.tuning.ParamGridBuilder
val paramGrid = new ParamGridBuilder().build
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
val binEval = new BinaryClassificationEvaluator
import org.apache.spark.ml.tuning.CrossValidator
val cv = new CrossValidator()
.setEstimator(pipeline) // <-- pipeline is the estimator
.setEvaluator(binEval) // has to match the estimator
.setEstimatorParamMaps(paramGrid)
Params and ParamMaps
import org.apache.spark.ml.recommendation.ALS
val als = new ALS().
setMaxIter(5).
setRegParam(0.01).
setUserCol("userId").
setItemCol("movieId").
setRatingCol("rating")
scala> :type als.params
Array[org.apache.spark.ml.param.Param[_]]
scala> println(als.explainParams)
alpha: alpha for implicit preference (default: 1.0)
checkpointInterval: set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations (default: 10)
coldStartStrategy: strategy for dealing with unknown or new users/items at prediction time. This may be useful in cross-validation or production scenarios, for handling user/item ids the model has not seen in the training data. Supported values: nan,drop. (default: nan)
finalStorageLevel: StorageLevel for ALS model factors. (default: MEMORY_AND_DISK)
implicitPrefs: whether to use implicit preference (default: false)
intermediateStorageLevel: StorageLevel for intermediate datasets. Cannot be 'NONE'. (default: MEMORY_AND_DISK)
itemCol: column name for item ids. Ids must be within the integer value range. (default: item, current: movieId)
maxIter: maximum number of iterations (>= 0) (default: 10, current: 5)
nonnegative: whether to use nonnegative constraint for least squares (default: false)
numItemBlocks: number of item blocks (default: 10)
numUserBlocks: number of user blocks (default: 10)
predictionCol: prediction column name (default: prediction)
rank: rank of the factorization (default: 10)
ratingCol: column name for ratings (default: rating, current: rating)
regParam: regularization parameter (>= 0) (default: 0.1, current: 0.01)
seed: random seed (default: 1994790107)
userCol: column name for user ids. Ids must be within the integer value range. (default: user, current: userId)
import org.apache.spark.ml.tuning.CrossValidator
val cv = new CrossValidator
scala> println(cv.explainParams)
estimator: estimator for selection (undefined)
estimatorParamMaps: param maps for the estimator (undefined)
evaluator: evaluator used to select hyper-parameters that maximize the validated metric (undefined)
numFolds: number of folds for cross validation (>= 2) (default: 3)
seed: random seed (default: -1191137437)
Params comes with the $ (dollar) method for Spark MLlib developers to access the user-defined value of a parameter (or its default).
Params Contract
package org.apache.spark.ml.param
trait Params {
def copy(extra: ParamMap): Params
}
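A hypothetical custom Params implementation (the names are made up for illustration) that shows registering a param, accessing it with $ and implementing copy:
import org.apache.spark.ml.param.{DoubleParam, ParamMap, Params}
import org.apache.spark.ml.util.Identifiable

class MyThresholdParams(override val uid: String) extends Params {
  def this() = this(Identifiable.randomUID("myThresholdParams"))
  final val threshold = new DoubleParam(this, "threshold", "decision threshold")
  setDefault(threshold -> 0.5)
  def getThreshold: Double = $(threshold)                            // user-defined or default value
  def setThreshold(value: Double): this.type = set(threshold, value)
  override def copy(extra: ParamMap): MyThresholdParams = defaultCopy(extra)
}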
explainParams(): String
explainParams gives the corresponding help text for every param: the param name, the description and optionally the default and the user-defined values if available.
import org.apache.spark.ml.recommendation.ALS
val als = new ALS().
setMaxIter(5).
setRegParam(0.01).
setUserCol("userId").
setItemCol("movieId").
setRatingCol("rating")
scala> println(als.explainParams)
alpha: alpha for implicit preference (default: 1.0)
checkpointInterval: set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations (default: 10)
coldStartStrategy: strategy for dealing with unknown or new users/items at prediction time. This may be useful in cross-validation or production scenarios, for handling user/item ids the model has not seen in the training data. Supported values: nan,drop. (default: nan)
finalStorageLevel: StorageLevel for ALS model factors. (default: MEMORY_AND_DISK)
implicitPrefs: whether to use implicit preference (default: false)
intermediateStorageLevel: StorageLevel for intermediate datasets. Cannot be 'NONE'. (default: MEMORY_AND_DISK)
itemCol: column name for item ids. Ids must be within the integer value range. (default: item, current: movieId)
maxIter: maximum number of iterations (>= 0) (default: 10, current: 5)
nonnegative: whether to use nonnegative constraint for least squares (default: false)
numItemBlocks: number of item blocks (default: 10)
numUserBlocks: number of user blocks (default: 10)
predictionCol: prediction column name (default: prediction)
rank: rank of the factorization (default: 10)
ratingCol: column name for ratings (default: rating, current: rating)
regParam: regularization parameter (>= 0) (default: 0.1, current: 0.01)
seed: random seed (default: 1994790107)
userCol: column name for user ids. Ids must be within the integer value range. (default: user, current: userId)
copyValues iterates over the params collection and sets the default value of every param followed by what may have been set explicitly by the user.
ValidatorParams
Table 1. ValidatorParams' Parameters
Parameter Default Value Description
estimator (undefined) Estimator for best model selection
logTuningParams Method
logTuningParams …FIXME
loadImpl Method
loadImpl[M](
path: String,
sc: SparkContext,
expectedClassName: String): (Metadata, Estimator[M], Evaluator, Array[ParamMap])
loadImpl …FIXME
transformSchemaImpl Method
transformSchemaImpl …FIXME
HasParallelism
HasParallelism is a Scala trait for Spark MLlib components that allow for specifying the level of parallelism, i.e. the number of threads to use when running parallel algorithms (the parallelism parameter).
getExecutionContext Method
getExecutionContext: ExecutionContext
getExecutionContext …FIXME
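The parallelism parameter that HasParallelism backs can be set on the tuning components, e.g.:
import org.apache.spark.ml.tuning.CrossValidator
val cv = new CrossValidator().setParallelism(4) // evaluate up to 4 models in parallel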
ML Persistence — Saving and Loading Models and Pipelines
MLWriter and MLReader allow you to save and load models across languages — Scala, Java, Python or R — regardless of the language they were saved in.
MLWriter
MLWriter abstract class comes with save(path: String) method to save a ML component
to a given path .
It comes with another (chainable) method overwrite to overwrite the output path if it
already exists.
overwrite(): this.type
The component is saved into a JSON file (see MLWriter Example section below).
Tip: Enable INFO logging level for the MLWriter implementation logger to see what happens inside.
Add the following line to conf/log4j.properties :
log4j.logger.org.apache.spark.ml.Pipeline$.PipelineWriter=INFO
Refer to Logging.
Caution: FIXME The logging doesn't work and overwriting does not print out INFO message to the logs :(
MLWriter Example
import org.apache.spark.ml._
val pipeline = new Pipeline().setStages(Array.empty[PipelineStage])
pipeline.write.overwrite.save("sample-pipeline")
The result of save for "unfitted" pipeline is a JSON file for metadata (as shown below).
$ cat sample-pipeline/metadata/part-00000 | jq
{
"class": "org.apache.spark.ml.Pipeline",
"timestamp": 1472747720477,
"sparkVersion": "2.1.0-SNAPSHOT",
"uid": "pipeline_181c90b15d65",
"paramMap": {
"stageUids": []
}
}
The result of save for pipeline model is a JSON file for metadata while Parquet for model
data, e.g. coefficients.
$ cat sample-model/metadata/part-00000 | jq
{
"class": "org.apache.spark.ml.PipelineModel",
"timestamp": 1472748168005,
"sparkVersion": "2.1.0-SNAPSHOT",
"uid": "pipeline_3ed598da1c4b",
"paramMap": {
"stageUids": [
"regexTok_bf73e7c36e22",
"hashingTF_ebece38da130",
"logreg_819864aa7120"
]
}
}
$ tree sample-model/stages/
sample-model/stages/
|-- 0_regexTok_bf73e7c36e22
| `-- metadata
| |-- _SUCCESS
| `-- part-00000
|-- 1_hashingTF_ebece38da130
| `-- metadata
| |-- _SUCCESS
| `-- part-00000
`-- 2_logreg_819864aa7120
|-- data
| |-- _SUCCESS
| `-- part-r-00000-56423674-0208-4768-9d83-2e356ac6a8d2.snappy.parquet
`-- metadata
|-- _SUCCESS
`-- part-00000
7 directories, 8 files
MLReader
MLReader abstract class comes with load(path: String) method to load a ML component from a given path.
import org.apache.spark.ml._
val pipeline = Pipeline.read.load("sample-pipeline")

// pipelineModel below is assumed to be a fitted PipelineModel loaded the same way,
// e.g. val pipelineModel = PipelineModel.read.load("sample-model")
scala> pipelineModel.stages
res1: Array[org.apache.spark.ml.Transformer] = Array(regexTok_bf73e7c36e22, hashingTF_ebece38da130, logreg_819864aa7120)
MLWritable
MLWritable is…FIXME
MLReader
MLReader is the contract for…FIXME
MLReader Contract
package org.apache.spark.ml.util
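A simplified sketch of the contract (not the full source):
abstract class MLReader[T] {
  def load(path: String): T
}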
Example — Text Classification
Note: The example was inspired by the video Building, Debugging, and Tuning Spark Machine Learning Pipelines - Joseph Bradley (Databricks).
Note: The example uses a case class LabeledText to have the schema described nicely.
import spark.implicits._
scala> data.show
+-----+-------------+
|label| text|
+-----+-------------+
| 0| hello world|
| 1|witaj swiecie|
+-----+-------------+
It is then tokenized and transformed into another DataFrame with an additional column
called features that is a Vector of numerical values.
Note Paste the code below into Spark Shell using :paste mode.
import spark.implicits._
Now, the tokenization part comes that maps the input text of each text document into tokens
(a Seq[String] ) and then into a Vector of numerical values that can only then be
understood by a machine learning algorithm (that operates on Vector instances).
scala> articles.show
+---+------------+--------------------+
| id| topic| text|
+---+------------+--------------------+
| 0| sci.math| Hello, Math!|
| 1|alt.religion| Hello, Religion!|
| 2| sci.physics| Hello, Physics!|
| 3| sci.math|Hello, Math Revised!|
| 4| sci.math| Better Math|
| 5|alt.religion| TGIF|
+---+------------+--------------------+
scala> trainDF.show
+---+------------+--------------------+-----+
| id| topic| text|label|
+---+------------+--------------------+-----+
| 1|alt.religion| Hello, Religion!| 0.0|
| 3| sci.math|Hello, Math Revised!| 1.0|
+---+------------+--------------------+-----+
scala> testDF.show
+---+------------+---------------+-----+
| id| topic| text|label|
+---+------------+---------------+-----+
| 0| sci.math| Hello, Math!| 1.0|
| 2| sci.physics|Hello, Physics!| 1.0|
| 4| sci.math| Better Math| 1.0|
| 5|alt.religion| TGIF| 0.0|
+---+------------+---------------+-----+
The train a model phase uses the logistic regression machine learning algorithm to build a
model and predict label for future input text documents (and hence classify them as
scientific or non-scientific).
import org.apache.spark.ml.feature.RegexTokenizer
val tokenizer = new RegexTokenizer()
.setInputCol("text")
.setOutputCol("words")
import org.apache.spark.ml.feature.HashingTF
val hashingTF = new HashingTF()
  .setInputCol(tokenizer.getOutputCol) // it does not wire transformers -- it's just a column name
  .setOutputCol("features")
  .setNumFeatures(5000)
import org.apache.spark.ml.classification.LogisticRegression
val lr = new LogisticRegression().setMaxIter(20).setRegParam(0.01)
import org.apache.spark.ml.Pipeline
val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, lr))
It uses two columns, namely label and features vector to build a logistic regression
model to make predictions.
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
val evaluator = new BinaryClassificationEvaluator().setMetricName("areaUnderROC")
import org.apache.spark.ml.param.ParamMap
val evaluatorParams = ParamMap(evaluator.metricName -> "areaUnderROC")
import org.apache.spark.ml.tuning.ParamGridBuilder
val paramGrid = new ParamGridBuilder()
.addGrid(hashingTF.numFeatures, Array(100, 1000))
.addGrid(lr.regParam, Array(0.05, 0.2))
.addGrid(lr.maxIter, Array(5, 10, 15))
.build
paramGrid: Array[org.apache.spark.ml.param.ParamMap] =
Array({
logreg_cdb8970c1f11-maxIter: 5,
hashingTF_8d7033d05904-numFeatures: 100,
logreg_cdb8970c1f11-regParam: 0.05
}, {
logreg_cdb8970c1f11-maxIter: 5,
hashingTF_8d7033d05904-numFeatures: 1000,
logreg_cdb8970c1f11-regParam: 0.05
}, {
logreg_cdb8970c1f11-maxIter: 10,
hashingTF_8d7033d05904-numFeatures: 100,
logreg_cdb8970c1f11-regParam: 0.05
}, {
logreg_cdb8970c1f11-maxIter: 10,
hashingTF_8d7033d05904-numFeatures: 1000,
logreg_cdb8970c1f11-regParam: 0.05
}, {
logreg_cdb8970c1f11-maxIter: 15,
hashingTF_8d7033d05904-numFeatures: 100,
logreg_cdb8970c1f11-regParam: 0.05
}, {
logreg_cdb8970c1f11-maxIter: 15,
hashingTF_8d7033d05904-numFeatures: 1000,
logreg_cdb8970c1f11-...
import org.apache.spark.ml.tuning.CrossValidator
import org.apache.spark.ml.param._
val cv = new CrossValidator()
.setEstimator(pipeline)
.setEstimatorParamMaps(paramGrid)
.setEvaluator(evaluator)
.setNumFolds(10)
Let’s use the cross-validated model to calculate predictions and evaluate their precision.
Caution: FIXME Review https://github.com/apache/spark/blob/master/mllib/src/test/scala/org/apache/spark/ml/tuning
cvModel.write.overwrite.save("model")
Example — Linear Regression
The DataFrame used for Linear Regression has to have a features column of org.apache.spark.mllib.linalg.VectorUDT type.
Note: You can change the name of the column using the featuresCol parameter.
scala> println(lr.explainParams)
elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty (default: 0.0)
featuresCol: features column name (default: features)
fitIntercept: whether to fit an intercept term (default: true)
labelCol: label column name (default: label)
maxIter: maximum number of iterations (>= 0) (default: 100)
predictionCol: prediction column name (default: prediction)
regParam: regularization parameter (>= 0) (default: 0.0)
solver: the solver algorithm for optimization. If this is not set or empty, default value is 'auto' (default: auto)
standardization: whether to standardize the training features before fitting the model (default: true)
tol: the convergence tolerance for iterative algorithms (default: 1.0E-6)
weightCol: weight column name. If this is not set or empty, we treat all instance weights as 1.0 (default: )
import org.apache.spark.ml.Pipeline
val pipeline = new Pipeline("my_pipeline")
import org.apache.spark.ml.regression._
val lr = new LinearRegression
Logistic Regression
In statistics, logistic regression, or logit regression, or logit model is a regression
model where the dependent variable (DV) is categorical.
LogisticRegression
LogisticRegression is…FIXME
Latent Dirichlet Allocation (LDA)
Topic modeling is a type of model that can be very useful in identifying hidden thematic
structure in documents. Broadly speaking, it aims to find structure within an unstructured
collection of documents. Once the structure is "discovered", you may answer questions like:
Spark MLlib offers out-of-the-box support for Latent Dirichlet Allocation (LDA) which is the
first MLlib algorithm built upon GraphX.
Example
Vector
Vector sealed trait represents a numeric vector of values (of Double type) and their indices (of Int type). It belongs to the org.apache.spark.mllib.linalg package.
Note: It is not the Vector type in Scala or Java. Train your eyes to see two types of the same name. You've been warned.
There are exactly two available implementations of Vector sealed trait (that also belong to
org.apache.spark.mllib.linalg package):
DenseVector
SparseVector
import org.apache.spark.mllib.linalg.Vectors
// You can create dense vectors explicitly by giving values per index
val denseVec = Vectors.dense(Array(0.0, 0.4, 0.3, 1.5))
val almostAllZeros = Vectors.dense(Array(0.0, 0.4, 0.3, 1.5, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0))
// You can however create a sparse vector by the size and non-zero elements
val sparse = Vectors.sparse(10, Seq((1, 0.4), (2, 0.3), (3, 1.5)))
import org.apache.spark.mllib.linalg._
// sv below is assumed to be a sparse vector of five 1.0 values, e.g.
// val sv = Vectors.sparse(5, (0 until 5).toArray, Array.fill(5)(1.0))
scala> sv.size
res0: Int = 5
scala> sv.toArray
res1: Array[Double] = Array(1.0, 1.0, 1.0, 1.0, 1.0)
scala> sv == sv.copy
res2: Boolean = true
scala> sv.toJson
res3: String = {"type":0,"size":5,"indices":[0,1,2,3,4],"values":[1.0,1.0,1.0,1.0,1.0]}
LabeledPoint
Caution FIXME
LabeledPoint is a convenient class for declaring a schema for DataFrames that are used as input data for machine learning algorithms.
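A minimal sketch of creating a LabeledPoint:
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
// label 1.0 with a dense feature vector
val lp = LabeledPoint(1.0, Vectors.dense(0.0, 1.1, 0.1))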
Streaming MLlib
The following Machine Learning algorithms have their streaming variants in MLlib:
k-means
Linear Regression
Logistic Regression
Note The streaming algorithms belong to spark.mllib (the older RDD-based API).
Streaming k-means
org.apache.spark.mllib.clustering.StreamingKMeans
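A minimal sketch of the streaming k-means estimator (trainingStream is a placeholder DStream of feature vectors; parameter values are illustrative only):
import org.apache.spark.mllib.clustering.StreamingKMeans
val skm = new StreamingKMeans()
  .setK(3)
  .setDecayFactor(1.0)
  .setRandomCenters(dim = 2, weight = 0.0)
// trainingStream: DStream[org.apache.spark.mllib.linalg.Vector]
skm.trainOn(trainingStream)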
Sources
Streaming Machine Learning in Spark- Jeremy Freeman (HHMI Janelia Research
Center)
GeneralizedLinearRegression (GLM)
GeneralizedLinearRegression is a regression algorithm. It supports the following error distribution families:
1. gaussian
2. binomial
3. poisson
4. gamma
It also supports the following link functions (relationships between the linear predictor and the mean of the distribution):
1. identity
2. logit
3. log
4. inverse
5. probit
6. cloglog
7. sqrt
import org.apache.spark.ml.regression._
val glm = new GeneralizedLinearRegression()
import org.apache.spark.ml.linalg._
val features = Vectors.sparse(5, Seq((3,1.0)))
val trainDF = Seq((0, features, 1)).toDF("id", "features", "label")
val glmModel = glm.fit(trainDF)
fit produces a GeneralizedLinearRegressionModel.
GeneralizedLinearRegressionModel
Regressor
Regressor is a custom Predictor.
Alternating Least Squares (ALS) Matrix Factorization
Tip: Read the original paper Scalable Collaborative Filtering with Jointly Derived Neighborhood Interpolation Weights by Robert M. Bell and Yehuda Koren.
Our method is very fast in practice, generating a prediction in about 0.2 milliseconds.
Importantly, it does not require training many parameters or a lengthy preprocessing,
making it very practical for large scale applications. Finally, we show how to apply these
methods to the perceivably much slower user-oriented approach. To this end, we
suggest a novel scheme for low dimensional embedding of the users. We evaluate
these methods on the Netflix dataset, where they deliver significantly better results than
the commercial Netflix Cinematch recommender system.
ALS Example
import spark.implicits._
import org.apache.spark.ml.recommendation.ALS
val als = new ALS().
setMaxIter(5).
setRegParam(0.01).
setUserCol("userId").
setItemCol("movieId").
setRatingCol("rating")
import org.apache.spark.ml.recommendation.ALS.Rating
// FIXME Use a much richer dataset, i.e. Spark's data/mllib/als/sample_movielens_ratings.txt
// FIXME Load it using spark.read
val ratings = Seq(
Rating(0, 2, 3),
Rating(0, 3, 1),
Rating(0, 5, 2),
Rating(1, 2, 2)).toDF("userId", "movieId", "rating")
val Array(training, testing) = ratings.randomSplit(Array(0.8, 0.2))
import org.apache.spark.ml.recommendation.ALSModel
val model = als.fit(training)
// drop NaNs
model.setColdStartStrategy("drop")
val predictions = model.transform(testing)
import org.apache.spark.ml.evaluation.RegressionEvaluator
val evaluator = new RegressionEvaluator().
setMetricName("rmse"). // root mean squared error
setLabelCol("rating").
setPredictionCol("prediction")
val rmse = evaluator.evaluate(predictions)
println(s"Root-mean-square error = $rmse")
System.exit(0)
ALS — Estimator for ALSModel
coldStartStrategy (default: nan). Supported values:
nan - predicted value for unknown ids will be NaN
drop - rows in the input DataFrame containing unknown ids are dropped from the output DataFrame (with predictions)
predictionCol (default: prediction): The main purpose of the estimator
ratingCol (default: rating): Of type FloatType
regParam (default: 0.1): Regularization parameter
computeFactors[ID](
srcFactorBlocks: RDD[(Int, FactorBlock)],
srcOutBlocks: RDD[(Int, OutBlock)],
dstInBlocks: RDD[(Int, InBlock[ID])],
rank: Int,
regParam: Double,
srcEncoder: LocalIndexEncoder,
implicitPrefs: Boolean = false,
alpha: Double = 1.0,
solver: LeastSquaresNESolver): RDD[(Int, FactorBlock)]
computeFactors …FIXME
Internally, fit validates the schema of the dataset (to make sure that the types of the
columns are correct and the prediction column is not available yet).
fit casts the rating column (as defined using ratingCol parameter) to FloatType .
fit selects user, item and rating columns (from the dataset ) and converts it to RDD of
Rating instances.
fit prints out the training parameters as INFO message to the logs:
INFO ...FIXME
fit trains a model, i.e. generates a pair of RDDs of user and item factors.
fit converts the RDDs with user and item factors to corresponding DataFrames with id and features columns.
partitionRatings[ID](
ratings: RDD[Rating[ID]],
srcPart: Partitioner,
dstPart: Partitioner): RDD[((Int, Int), RatingBlock[ID])]
partitionRatings …FIXME
makeBlocks[ID](
prefix: String,
ratingBlocks: RDD[((Int, Int), RatingBlock[ID])],
srcPart: Partitioner,
dstPart: Partitioner,
storageLevel: StorageLevel)(
implicit srcOrd: Ordering[ID]): (RDD[(Int, InBlock[ID])], RDD[(Int, OutBlock)])
makeBlocks …FIXME
train Method
train[ID](
ratings: RDD[Rating[ID]],
rank: Int = 10,
numUserBlocks: Int = 10,
numItemBlocks: Int = 10,
maxIter: Int = 10,
regParam: Double = 0.1,
implicitPrefs: Boolean = false,
alpha: Double = 1.0,
nonnegative: Boolean = false,
intermediateRDDStorageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK,
finalRDDStorageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK,
checkpointInterval: Int = 10,
seed: Long = 0L)(
implicit ord: Ordering[ID]): (RDD[(ID, Array[Float])], RDD[(ID, Array[Float])])
train partitions the ratings RDD (using two HashPartitioners with numUserBlocks and numItemBlocks partitions, respectively).
train creates a pair of user in and out block RDDs for blockRatings .
train creates a pair of user in and out block RDDs for the swappedBlockRatings RDD.
Caution FIXME train gets too "heavy", i.e. advanced. Gave up for now. Sorry.
requirement failed: ALS is not designed to run without persisting intermediate RDDs.
validateAndTransformSchema …FIXME
ALSModel — Model for Predictions
ALSModel is a MLWritable.
import org.apache.spark.sql._
class MyALS(spark: SparkSession) {
import spark.implicits._
val userFactors = Seq((0, Seq(0.3, 0.2))).toDF("id", "features")
val itemFactors = Seq((0, Seq(0.3, 0.2))).toDF("id", "features")
import org.apache.spark.ml.recommendation._
val alsModel = new ALSModel(uid = "uid", rank = 10, userFactors, itemFactors)
}
// END :pa -raw
import org.apache.spark.sql.types._
val mySchema = new StructType().
add($"user".float).
add($"item".float)
transform left-joins the dataset with the userFactors dataset (using the userCol column of the dataset and the id column of userFactors).
Note: A left join takes two datasets and gives all the rows from the left side (of the join) combined with the corresponding row from the right side if available, or null otherwise.
transform left-joins the dataset with the itemFactors dataset (using the itemCol column of the dataset and the id column of itemFactors).
transform makes predictions using the features columns of userFactors and itemFactors.
transform takes (selects) all the columns from the dataset plus predictionCol with the predictions.
Ultimately, transform drops rows containing null or NaN values for predictions if coldStartStrategy is drop.
Note: The default value of coldStartStrategy is nan, which does not drop missing values from the predictions column.
transformSchema Method
Unique ID
Rank
predict: UserDefinedFunction
predict is a user-defined function (UDF) that takes two collections of float numbers (the user and item factors) and returns their dot product as the predicted rating (or Float.NaN when either is missing).
copy then copies extra parameters to the new ALSModel and sets the parent.
ALSModelReader
ALSModelReader is…FIXME
load Method
load …FIXME
Instrumentation
Instrumentation is…FIXME
logParams …FIXME
create …FIXME
MLUtils
MLUtils is…FIXME
kFold Method
kFold …FIXME
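A sketch of what kFold gives you, i.e. numFolds pairs of (training, validation) RDDs:
import org.apache.spark.mllib.util.MLUtils
val rdd = sc.parallelize(1 to 100)
// Array of (training, validation) RDD pairs
val folds = MLUtils.kFold(rdd, numFolds = 3, seed = 42)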
Spark Shell — spark-shell shell script
Under the covers, Spark shell is a standalone Spark application written in Scala that offers an interactive command-line environment with auto-completion (using the TAB key) where you can run ad-hoc queries and get familiar with the features of Spark (that help you in developing your own standalone Spark applications). It is a very convenient tool to explore the many things available in Spark with immediate feedback. It is one of the many reasons why Spark is so helpful for tasks to process datasets of any size.
There are variants of Spark shell for different languages: spark-shell for Scala, pyspark
for Python and sparkR for R.
Note This document (and the book in general) uses spark-shell for Scala only.
$ ./bin/spark-shell
scala>
scala> :imports
1) import spark.implicits._ (59 terms, 38 are implicit)
2) import spark.sql (1 terms)
Note: When you execute spark-shell you actually execute Spark submit as follows:
org.apache.spark.deploy.SparkSubmit --class org.apache.spark.repl.Main --name "Spark shell" spark-shell
$ ./bin/spark-shell
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Spark context Web UI available at http://10.47.71.138:4040
Spark context available as 'sc' (master = local[*], app id = local-1477858597347).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.1.0-SNAPSHOT
/_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_112)
Type in expressions to have them evaluated.
Type :help for more information.
scala>
Spark shell creates an instance of SparkSession under the name spark for you (so you
don’t have to know the details how to do it yourself on day 1).
scala> :type sc
org.apache.spark.SparkContext
To close Spark shell, you press Ctrl+D or type in :q (or any subset of :quit ).
scala> :q
Settings
Table 1. Spark Properties
Spark Property Default Value Description
Used in spark-shell to create REPL
ClassLoader to load new classes defined
in the Scala REPL as a user types code.
Spark Submit — spark-submit shell script
You can submit your Spark application to a Spark deployment environment for execution, kill
or request status of Spark applications.
You can find spark-submit script in bin directory of the Spark distribution.
$ ./bin/spark-submit
Usage: spark-submit [options] <app jar | python file> [app arguments]
Usage: spark-submit --kill [submission ID] --master [spark://...]
Usage: spark-submit --status [submission ID] --master [spark://...]
Usage: spark-submit run-example [options] example-class [example args]
...
When executed, spark-submit script first checks whether SPARK_HOME environment variable
is set and sets it to the directory that contains bin/spark-submit shell script if not. It then
executes spark-class shell script to run SparkSubmit standalone application.
Caution: FIXME Add Cluster Manager and Deploy Mode to the table below (see options value)
Table 1. Command-Line Options, Spark Properties and Environment Variables (from SparkSubmitArguments' handle)
Command-Line Option      Spark Property                 Environment Variable     Description
action                                                                           Defaults to SUBMIT
--archives
--conf
--deploy-mode            spark.submit.deployMode        DEPLOY_MODE              Deploy mode
--driver-class-path      spark.driver.extraClassPath                             The driver's class path
--driver-java-options    spark.driver.extraJavaOptions                           The driver's JVM options
--driver-library-path    spark.driver.extraLibraryPath                           The driver's native library path
--driver-memory          spark.driver.memory            SPARK_DRIVER_MEMORY      The driver's memory
--driver-cores           spark.driver.cores
--exclude-packages       spark.jars.excludes
--executor-cores         spark.executor.cores           SPARK_EXECUTOR_CORES     The number of executor cores
--executor-memory        spark.executor.memory          SPARK_EXECUTOR_MEMORY    An executor's memory
--files                  spark.files
ivyRepoPath              spark.jars.ivy
--jars                   spark.jars
--keytab                 spark.yarn.keytab
--kill                                                                           submissionToKill and action set to KILL
--class                                                                          mainClass
--name                   spark.app.name                 SPARK_YARN_APP_NAME      Uses mainClass or (YARN only) off primaryResource when no other ways set it
--num-executors          spark.executor.instances
--packages               spark.jars.packages
--principal              spark.yarn.principal
--properties-file
--proxy-user
--py-files
--queue
--repositories
--status                                                                         submissionToRequestStatus and action set to REQUEST_STATUS
--supervise
--total-executor-cores   spark.cores.max
--verbose
--version                                                                        SparkSubmit.printVersionAndExit
--help                                                                           printUsageAndExit(0)
--usage-error                                                                    printUsageAndExit(1)
Tip: Use SPARK_PRINT_LAUNCH_COMMAND to see the launch command of the Spark scripts:
$ SPARK_PRINT_LAUNCH_COMMAND=1 ./bin/spark-shell
Spark Command: /Library/Ja...
Tip: Avoid using scala.App trait for a Spark application's main class in Scala as reported in SPARK-4170 Closure problems when running Scala app that "extends App".
Refer to Executing Main — runMain internal method in this document.
prepareSubmitEnvironment(args: SparkSubmitArguments): (Seq[String], Seq[String], Map[String, String], String)
prepareSubmitEnvironment returns a 4-element tuple of (childArgs, childClasspath, sysProps, childMainClass).
Caution FIXME
Tip See the elements of the return tuple using --verbose command-line option.
--properties-file [FILE]
--properties-file command-line option sets the path to a file FILE from which Spark loads extra Spark properties.
--driver-cores NUM
--driver-cores command-line option sets the number of cores to NUM for the driver in the cluster deploy mode.
--jars JARS
--jars is a comma-separated list of local jars to include on the driver’s and executors'
classpaths.
Caution FIXME
--files FILES
Caution FIXME
--archives ARCHIVES
Caution FIXME
--queue QUEUE_NAME
With --queue you can choose the YARN resource queue to submit a Spark application to.
The default queue name is default .
Actions
runMain(
childArgs: Seq[String],
childClasspath: Seq[String],
sysProps: Map[String, String],
childMainClass: String,
verbose: Boolean): Unit
runMain is an internal method to build execution environment and invoke the main method of the Spark application.
When verbose input flag is enabled (i.e. true ) runMain prints out all the input
parameters, i.e. childMainClass , childArgs , sysProps , and childClasspath (in that
order).
Main class:
[childMainClass]
Arguments:
[childArgs one per line]
System properties:
[sysProps one per line]
Classpath elements:
[childClasspath one per line]
Note Use spark-submit 's --verbose command-line option to enable verbose flag.
runMain builds the context classloader (depending on the spark.driver.userClassPathFirst flag).
It adds the jars specified in childClasspath input parameter to the context classloader (that is later responsible for loading the childMainClass main class).
It sets all the system properties specified in sysProps input parameter (using Java’s
System.setProperty method).
Note childMainClass is the main class spark-submit has been invoked with.
Tip: Avoid using scala.App trait for a Spark application's main class in Scala as reported in SPARK-4170 Closure problems when running Scala app that "extends App".
If you use scala.App for the main class, you should see the following warning message in
the logs:
Warning: Subclasses of scala.App may not work correctly. Use a main() method instead.
Finally, runMain executes the main method of the Spark application passing in the
childArgs arguments.
Any SparkUserAppException exceptions lead to System.exit while the others are simply re-
thrown.
addJarToClasspath is an internal method to add file or local jars (as localJar ) to the
loader classloader.
Internally, addJarToClasspath resolves the URI of localJar . If the URI is file or local
and the file denoted by localJar exists, localJar is added to loader . Otherwise, the
following warning is printed out to the logs:
For all other URIs, the following warning is printed out to the logs:
Caution: FIXME What is a URI fragment? How does this change re YARN distributed cache? See Utils#resolveURI.
Command-line Options
Execute spark-submit --help to know about the command-line options supported.
Options:
  --master MASTER_URL         spark://host:port, mesos://host:port, yarn, or local.
  --deploy-mode DEPLOY_MODE   Whether to launch the driver program locally ("client") or on one of the worker machines inside the cluster ("cluster") (Default: client).
  --class CLASS_NAME          Your application's main class (for Java / Scala apps).
  --name NAME                 A name of your application.
  --jars JARS                 Comma-separated list of local jars to include on the driver and executor classpaths.
  --packages                  Comma-separated list of maven coordinates of jars to include on the driver and executor classpaths. Will search the local maven repo, then maven central and any additional remote repositories given by --repositories. The format for the coordinates should be groupId:artifactId:version.
  --exclude-packages          Comma-separated list of groupId:artifactId, to exclude while resolving the dependencies provided in --packages to avoid dependency conflicts.
  --repositories              Comma-separated list of additional remote repositories to search for the maven coordinates given with --packages.
  --py-files PY_FILES         Comma-separated list of .zip, .egg, or .py files to place on the PYTHONPATH for Python apps.
  --files FILES               Comma-separated list of files to be placed in the working directory of each executor.
  --driver-memory MEM         Memory for driver (e.g. 1000M, 2G) (Default: 1024M).
  --driver-java-options       Extra Java options to pass to the driver.
  --driver-library-path       Extra library path entries to pass to the driver.
  --driver-class-path         Extra class path entries to pass to the driver. Note that jars added with --jars are automatically included in the classpath.
  --executor-memory MEM       Memory per executor (e.g. 1000M, 2G) (Default: 1G).

 YARN-only:
  --driver-cores NUM          Number of cores used by the driver, only in cluster mode (Default: 1).
  --queue QUEUE_NAME          The YARN queue to submit to (Default: "default").
  --num-executors NUM         Number of executors to launch (Default: 2).
  --archives ARCHIVES         Comma separated list of archives to be extracted into the working directory of each executor.
  --principal PRINCIPAL       Principal to be used to login to KDC, while running on secure HDFS.
  --keytab KEYTAB             The full path to the file that contains the keytab for the principal specified above. This keytab will be copied to the node running the Application Master via the Secure Distributed Cache, for renewing the login tickets and the delegation tokens periodically.
--class
--conf or -c
--driver-java-options
--driver-library-path
--driver-memory
--executor-memory
--files
--jars
--master
--name
--packages
--exclude-packages
--proxy-user
--py-files
--repositories
--total-executor-cores
--help or -h
--usage-error
YARN-only options:
--archives
--executor-cores
--keytab
--num-executors
--principal
--driver-class-path command-line option sets the extra class path entries (e.g. jars and directories) for the driver. It is recorded as an internal property when SparkSubmitArguments.handle is called.
$ ./bin/spark-submit --version
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.1.0-SNAPSHOT
/_/
Branch master
Compiled by user jacek on 2016-09-30T07:08:39Z
Revision 1fad5596885aab8b32d2307c0edecbae50d5bd7a
Url https://github.com/apache/spark.git
Type --help for more information.
In verbose mode, the parsed arguments are printed out to the System error output.
FIXME
It also prints out propertiesFile and the properties from the file.
FIXME
Environment Variables
The following is the list of environment variables that are considered when command-line
options are not specified:
SPARK_EXECUTOR_CORES
DEPLOY_MODE
SPARK_YARN_APP_NAME
_SPARK_CMD_USAGE
./bin/spark-submit \
--packages my:awesome:package \
--repositories s3n://$aws_ak:$aws_sak@bucket/path/to/repo
When executed, spark-submit script simply passes the call to spark-class with
org.apache.spark.deploy.SparkSubmit class followed by command-line arguments.
It then relays the execution to action-specific internal methods (with the application arguments):
Note: The action can only have one of the three available values: SUBMIT , KILL , or REQUEST_STATUS .
export JAVA_HOME=/your/directory/java
export HADOOP_HOME=/usr/lib/hadoop
export SPARK_WORKER_CORES=2
export SPARK_WORKER_MEMORY=1G
SparkSubmitArguments — spark-submit's Command-Line Argument Parser
SparkSubmitArguments is a custom SparkSubmitArgumentsParser to handle the command-line
arguments of spark-submit script that the actions (i.e. submit, kill and status) use for their
execution (possibly with the explicit env environment).
loadEnvironmentArguments(): Unit
loadEnvironmentArguments calculates the Spark properties for the current execution of spark-
submit.
Spark config properties start with spark. prefix and can be set using --conf
Note
[key=value] command-line option.
handle Method
handle parses the input opt argument and returns true or throws an exception when it finds an unsupported option.
handle sets the internal properties in the table Command-Line Options, Spark Properties and Environment Variables.
mergeDefaultSparkProperties(): Unit
mergeDefaultSparkProperties merges Spark properties from the default Spark properties file, i.e. spark-defaults.conf, with the Spark properties that were already specified (e.g. through --conf).
SparkSubmitOptionParser — spark-submit's Command-Line Parser
SparkSubmitOptionParser is the parser of spark-submit's command-line options.
--driver-cores
--exclude-packages
--executor-cores
--executor-memory
--files
--jars
--keytab
--name
--num-executors
--packages
--principal
--proxy-user
--py-files
--queue
--repositories
--supervise
--total-executor-cores
--verbose or -v
SparkSubmitOptionParser Callbacks
SparkSubmitOptionParser is supposed to be overridden for the following capabilities (as callbacks).
Table 2. Callbacks
Callback         Description
handle           Executed when an option with an argument is parsed.
handleUnknown    Executed when an unrecognized option is parsed.
handleExtraArgs  Executed for the command-line arguments that handle and handleUnknown callbacks have not processed.
Note: org.apache.spark.launcher.SparkSubmitArgumentsParser is a custom SparkSubmitOptionParser.
parse calls the handle callback whenever it finds a known command-line option or a switch (a command-line option with no value).
SparkSubmitCommandBuilder Command Builder
SparkSubmitCommandBuilder recognizes the following special application resources:
1. pyspark-shell-main
2. sparkr-shell-main
3. run-example
SparkSubmitCommandBuilder parses command-line arguments using OptionParser (a SparkSubmitOptionParser) with:
1. handle to handle the known options (see the table below). It sets up master and the other internal properties as well as the application arguments.
Note: For spark-shell it assumes that the application arguments are after spark-submit's arguments.
SparkSubmitCommandBuilder.buildCommand /
buildSparkSubmitCommand
buildSparkSubmitCommand builds the first part of the Java command, passing in the extra class path and the relevant environment variables.
addPermGenSizeOpt case…elaborate
buildSparkSubmitArgs method
List<String> buildSparkSubmitArgs()
buildSparkSubmitArgs builds a list of command-line arguments that spark-submit recognizes (when it is executed later on and uses the very same SparkSubmitOptionParser parser to parse command-line arguments).
verbose VERBOSE
getEffectiveConfig internal method builds effectiveConfig that is conf with the Spark
properties file loaded (using loadPropertiesFile internal method) skipping keys that have
already been loaded (it happened when the command-line options were parsed in handle
method).
isClientMode checks master first (from the command-line options) and then the spark.master Spark property.
Caution: FIXME Review master and deployMode. How are they set?
isClientMode responds positive when there is no explicit master set and the client deploy mode is used (explicitly or by default).
OptionParser
OptionParser is a custom SparkSubmitOptionParser that SparkSubmitCommandBuilder uses
callbacks).
--deploy-mode: Sets deployMode
--properties-file: Sets propertiesFile
--driver-java-options: Sets spark.driver.extraJavaOptions (in conf)
--driver-library-path: Sets spark.driver.extraLibraryPath (in conf)
--driver-class-path: Sets spark.driver.extraClassPath (in conf)
--version: Disables isAppResourceReq and adds itself to sparkArgs
Otherwise, handleUnknown sets appResource and stops further parsing of the argument list.
spark-class shell script
Note Ultimately, any shell script in Spark, e.g. spark-submit, calls spark-class script.
You can find spark-class script in bin directory of the Spark distribution.
Depending on the Spark distribution (or rather lack thereof), i.e. whether RELEASE file exists or not, it sets SPARK_JARS_DIR environment variable to [SPARK_HOME]/jars or [SPARK_HOME]/assembly/target/scala-[SPARK_SCALA_VERSION]/jars, respectively (with the latter being a local build).
If SPARK_JARS_DIR does not exist, spark-class prints the following error message and exits with the code 1.
spark-class sets LAUNCH_CLASSPATH environment variable to include all the jars under SPARK_JARS_DIR.
spark-class then executes org.apache.spark.launcher.Main (with LAUNCH_CLASSPATH on the class path) to compute the Spark command to launch. The Main class programmatically computes the command that spark-class executes afterwards.
Main expects that the first parameter is the class name that is the "operation mode":
$ ./bin/spark-class org.apache.spark.launcher.Main
Exception in thread "main" java.lang.IllegalArgumentException: Not enough arguments: m
issing class name.
at org.apache.spark.launcher.CommandBuilderUtils.checkArgument(CommandBuilderU
tils.java:241)
at org.apache.spark.launcher.Main.main(Main.java:51)
the command.
AbstractCommandBuilder
AbstractCommandBuilder is the base command builder for SparkSubmitCommandBuilder and SparkClassCommandBuilder specialized command builders.
buildJavaCommand
getConfDir
buildJavaCommand builds the Java command for a Spark application (which is a collection of
elements with the path to java executable, JVM options from java-opts file, and a class
path).
buildJavaCommand loads extra Java options from the java-opts file in configuration
directory if the file exists and adds them to the result Java command.
Eventually, buildJavaCommand builds the class path (with the extra class path if non-empty)
and adds it as -cp to the result Java command.
buildClassPath method
Note: Directories always end up with the OS-specific file separator at the end of their paths.
Properties loadPropertiesFile()
loadPropertiesFile loads the Spark settings from a properties file (when specified on the command line) or spark-defaults.conf in the configuration directory.
It loads the settings from the following files starting from the first and checking every location until the first properties file is found:
1. The properties file specified with --properties-file command-line option (i.e. set by AbstractCommandBuilder.setPropertiesFile).
2. [SPARK_CONF_DIR]/spark-defaults.conf
3. [SPARK_HOME]/conf/spark-defaults.conf
getSparkHome finds the Spark home of a Spark application. When it cannot be found, you should see the following error message:
Spark home not found; set it explicitly or use the SPARK_HOME environment variable.
SparkLauncher — Launching Spark Applications Programmatically
SparkLauncher is an interface to launch Spark applications programmatically, i.e. from code (not spark-submit directly). It uses a builder pattern to configure a Spark application and launch it as a child process using spark-submit.
SparkLauncher belongs to the org.apache.spark.launcher package in the spark-launcher build module.
The following builder methods set up the invocation of the Spark application to launch:
addAppArgs(String… args): Adds command line arguments for a Spark application.
addFile(String file): Adds a file to be submitted with a Spark application.
addJar(String jar): Adds a jar file to be submitted with the application.
addPyFile(String file): Adds a python file / zip / egg to be submitted with a Spark application.
addSparkArg(String arg): Adds a no-value argument to the Spark invocation.
directory(File dir): Sets the working directory of spark-submit.
redirectError(File errFile): Redirects error output to the specified errFile file.
redirectOutput(File outFile): Redirects output to the specified outFile file.
setVerbose(boolean verbose): Enables verbose reporting for SparkSubmit.
After the invocation of a Spark application is set up, use launch() method to launch a sub-
process that will start the configured Spark application. It is however recommended to use
startApplication method instead.
344
SparkLauncher — Launching Spark Applications Programmatically
import org.apache.spark.launcher.SparkLauncher
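A minimal launch sketch that continues from the import above (the jar path and main class are placeholders):
val handle = new SparkLauncher()
  .setAppResource("/path/to/my-spark-app.jar")  // placeholder
  .setMainClass("com.example.MySparkApp")       // placeholder
  .setMaster("local[*]")
  .setConf(SparkLauncher.DRIVER_MEMORY, "2g")
  .setVerbose(true)
  .startApplication() // returns a SparkAppHandle to monitor the child process
println(handle.getState)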
Spark Architecture
Spark uses a master/worker architecture. There is a driver that talks to a single
coordinator called master that manages workers in which executors run.
Driver
A Spark driver (aka an application’s driver process) is a JVM process that hosts
SparkContext for a Spark application. It is the master node in a Spark application.
It is the cockpit of jobs and tasks execution (using DAGScheduler and Task Scheduler). It
hosts Web UI for the environment.
A driver is where the task scheduler lives and spawns tasks across workers.
Note: Spark shell is a Spark application and the driver. It creates a SparkContext that is available as sc.
Driver requires the additional services (beside the common ones like ShuffleManager,
MemoryManager, BlockTransferService, BroadcastManager, CacheManager):
Listener Bus
RPC Environment
HttpFileServer
Launches tasks
Driver’s Memory
It can be set first using spark-submit’s --driver-memory command-line option or
spark.driver.memory and falls back to SPARK_DRIVER_MEMORY if not set earlier.
Note It is printed out to the standard error output in spark-submit’s verbose mode.
Driver’s Cores
It can be set first using spark-submit’s --driver-cores command-line option for cluster
deploy mode.
Note In client deploy mode the driver's memory corresponds to the memory of the JVM process the Spark application runs on.
Note It is printed out to the standard error output in spark-submit’s verbose mode.
Settings
Table 1. Spark Properties
spark.driver.blockManager.port (default: the value of spark.blockManager.port ): Port to use for the BlockManager on the driver. More precisely, spark.driver.blockManager.port is used when NettyBlockTransferService is created (while SparkEnv is created for the driver).
spark.driver.extraLibraryPath
spark.driver.appUIAddress : Used exclusively in Spark on YARN. It is set when YarnClientSchedulerBackend starts to run ExecutorLauncher (and register ApplicationMaster for the Spark application).
spark.driver.libraryPath
spark.driver.extraClassPath
spark.driver.extraClassPath system property sets the additional classpath entries (e.g. jars
and directories) that should be added to the driver’s classpath in cluster deploy mode.
For client deploy mode you can use a properties file or command line to set
spark.driver.extraClassPath .
Note Do not use SparkConf since it is too late for client deploy mode given the JVM has already been set up to start a Spark application.
Refer to buildSparkSubmitCommand Internal Method for the very low-level details
of how it is handled internally.
Executor
Executor is a distributed agent that is responsible for executing tasks.
Executor typically runs for the entire lifetime of a Spark application which is called static
allocation of executors (but you could also opt in for dynamic allocation).
Executors report heartbeats and partial metrics for active tasks to the HeartbeatReceiver RPC Endpoint on the driver.
When an executor starts it first registers with the driver and communicates directly to
execute tasks.
Executors can run multiple tasks over their lifetime, both in parallel and sequentially. They track running tasks (by their task ids in the runningTasks internal registry). Consult the Launching Tasks section.
Executors use an Executor task launch worker thread pool for launching tasks.
Executors send metrics (and heartbeats) using the internal heartbeater - Heartbeat Sender
Thread.
It is recommended to have as many executors as data nodes and as many cores as you can
get from the cluster.
Executors are described by their id, hostname, environment (as SparkEnv ), and classpath (and, less importantly and more for internal optimization, whether they run in local or cluster mode).
maxDirectResultSize
maxResultSize
Refer to Logging.
updateDependencies …FIXME
createClassLoader Method
Caution FIXME
addReplClassLoaderIfNeeded Method
Caution FIXME
Executor ID
SparkEnv
Collection of user-defined JARs (to add to tasks' class path). Empty by default
Flag that says whether the executor runs in local or cluster mode (default: false , i.e.
cluster mode is preferred)
Note isLocal is enabled exclusively for LocalEndpoint (for Spark in local mode).
When created, you should see the following INFO messages in the logs:
(only for non-local modes) Executor requests the BlockManager to initialize (with the Spark
application id of the SparkConf).
(only for non-local modes) Executor requests the MetricsSystem to register the
ExecutorSource and shuffleMetricsSource of the BlockManager.
Executor creates a task class loader (optionally with REPL support) that the current Serializer is requested to use (when deserializing tasks later on).
Executor initializes the internal registries and counters in the meantime (not necessarily at the very end).
launchTask(
context: ExecutorBackend,
taskId: Long,
attemptNumber: Int,
taskName: String,
serializedTask: ByteBuffer): Unit
Note Executors track the TaskRunners that run tasks. A task might not be assigned to a TaskRunner yet when the executor sends a heartbeat.
A blocking Heartbeat message that holds the executor id, all accumulator updates (per task
id), and BlockManagerId is sent to HeartbeatReceiver RPC endpoint (with
spark.executor.heartbeatInterval timeout).
If the response requests to reregister BlockManager, you should see the following INFO
message in the logs:
If there are any issues with communicating with the driver, you should see the following
WARN message in the logs:
The internal heartbeatFailures counter is incremented and checked to be less than the acceptable number of failures (i.e. spark.executor.heartbeat.maxFailures Spark property). If the number is greater, the following ERROR message is printed out to the logs:
ERROR Executor: Exit as unable to send heartbeats to driver more than [HEARTBEAT_MAX_FAILURES] times
reportHeartBeat(): Unit
reportHeartBeat collects TaskRunners for currently running tasks (aka active tasks) with
their tasks deserialized (i.e. either ready for execution or already started).
reportHeartBeat then records the latest values of internal and external accumulators for
every task.
reportHeartBeat then sends a blocking Heartbeat message to the HeartbeatReceiver RPC endpoint (on the driver).
In case of a non-fatal exception, you should see the following WARN message in the logs
(followed by the stack trace).
Coarse-Grained Executors
Coarse-grained executors are executors that use CoarseGrainedExecutorBackend for task
scheduling.
Resource Offers
Read resourceOffers in TaskSchedulerImpl and resourceOffer in TaskSetManager.
Executors use a daemon cached thread pool with threads named Executor task launch worker-[ID] (with ID being the task id) for launching tasks.
threadPool is created when Executor is created and shut down when it stops.
You can change the assigned memory per executor per node in a standalone cluster using the SPARK_EXECUTOR_MEMORY environment variable.
You can find the value displayed as Memory per Node in web UI for standalone Master (as
depicted in the figure below).
Metrics
Every executor registers its own ExecutorSource to report metrics.
stop(): Unit
Settings
Table 3. Spark Properties
spark.executor.cores : Number of cores for an executor.
spark.executor.id
spark.executor.logs.rolling.maxSize
spark.executor.logs.rolling.maxRetainedFiles
spark.executor.logs.rolling.strategy
spark.executor.logs.rolling.time.interval
spark.executor.memory (default: 1g ): Equivalent to the SPARK_EXECUTOR_MEMORY environment variable.
spark.executor.port
spark.executor.uri : Equivalent to SPARK_EXECUTOR_URI
spark.task.maxDirectResultSize (default: 1048576B )
TaskRunner
TaskRunner is a thread of execution of a single task.
taskId : FIXME. Used when…FIXME
threadName : FIXME. Used when…FIXME
taskName : FIXME. Used when…FIXME
finished : FIXME. Used when…FIXME
killed : FIXME. Used when…FIXME
threadId : FIXME. Used when…FIXME
startGCTime : FIXME. Used when…FIXME
task : FIXME. Used when…FIXME
replClassLoader : FIXME. Used when…FIXME
Refer to Logging.
ExecutorBackend
TaskDescription
computeTotalGcTime Method
Caution FIXME
updateDependencies Method
Caution FIXME
setTaskFinishedAndClearInterruptStatus Method
Caution FIXME
Lifecycle
It is created with an ExecutorBackend (to send the task’s status updates to), task and
attempt ids, task name, and serialized version of the task (as ByteBuffer ).
run(): Unit
When executed, run initializes threadId as the current thread identifier (using Java's Thread).
run then sets the name of the current thread as threadName (using Java’s Thread).
Note run uses ExecutorBackend that was specified when TaskRunner was created.
run deserializes the task (using the context class loader) and sets its localProperties and
TaskMemoryManager . run sets the task internal reference to hold the deserialized task.
run records the current time as the task’s start time (as taskStart ).
run runs the task (with taskAttemptId as taskId, attemptNumber from TaskDescription , and metricsSystem as the current MetricsSystem).
Note The task runs inside a "monitored" block (i.e. try-finally block) to detect any memory and lock leaks after the task's run finishes regardless of the final outcome - the computed value or an exception thrown.
After the task’s run has finished (inside the "finally" block of the "monitored" block), run
requests BlockManager to release all locks of the task (for the task’s taskId). The locks are
later used for lock leak detection.
run then requests TaskMemoryManager to clean up allocated memory (that helps finding
memory leaks).
If run detects memory leak of the managed memory (i.e. the memory freed is greater than
0 ) and spark.unsafe.exceptionOnMemoryLeak Spark property is enabled (it is not by
default) and no exception was reported while the task ran, run reports a SparkException :
ERROR Executor: Managed memory leak detected; size = [freedMemory] bytes, TID = [taskId]
If run detects lock leaking (i.e. the number of locks released) and
spark.storage.exceptionOnPinLeak Spark property is enabled (it is not by default) and no
exception was reported while the task ran, run reports a SparkException :
INFO Executor: [releasedLocks] block locks were not released by TID = [taskId]:
[releasedLocks separated by comma]
Right after the "monitored" block, run records the current time as the task's finish time (as
taskFinish ).
If the task was killed (while it was running), run reports a TaskKilledException (and the
TaskRunner exits).
run creates a Serializer and serializes the task's result. run measures the time to serialize the result. run then records the following task metrics:
executorDeserializeTime
executorDeserializeCpuTime
executorRunTime
executorCpuTime
jvmGCTime
resultSerializationTime
run collects the latest values of internal and external accumulators used in the task.
run creates a DirectTaskResult (with the serialized result and the latest values of
accumulators).
run serializes the DirectTaskResult and gets the byte buffer’s limit.
run selects the proper serialized version of the result before sending it to ExecutorBackend .
run branches off based on the serialized DirectTaskResult byte buffer’s limit.
When maxResultSize is greater than 0 and the serialized DirectTaskResult buffer limit
exceeds it, the following WARN message is displayed in the logs:
WARN Executor: Finished [taskName] (TID [taskId]). Result is larger than maxResultSize ([resultSize] > [maxResultSize]), dropping it.
$ ./bin/spark-shell -c spark.driver.maxResultSize=1m
scala> sc.version
res0: String = 2.0.0-SNAPSHOT
scala> sc.getConf.get("spark.driver.maxResultSize")
res1: String = 1m
In this case, run creates an IndirectTaskResult (with a TaskResultBlockId for the task's taskId and resultSize ) and serializes it.
When the serialized DirectTaskResult is within maxResultSize but still larger than maxDirectResultSize, run stores it in BlockManager and you should see the following INFO message in the logs:
INFO Executor: Finished [taskName] (TID [taskId]). [resultSize] bytes result sent via BlockManager)
In this case too, run creates an IndirectTaskResult (with a TaskResultBlockId for the task's taskId and resultSize ) and serializes it.
Note The difference between the two above cases is that the result is either dropped or stored in BlockManager with MEMORY_AND_DISK_SER storage level.
When the two cases above do not hold, you should see the following INFO message in the
logs:
INFO Executor: Finished [taskName] (TID [taskId]). [resultSize] bytes result sent to driver
run uses the serialized DirectTaskResult byte buffer as the final serializedResult .
run notifies ExecutorBackend that taskId is in TaskState.FINISHED state with the serialized
result and removes taskId from the owning executor’s runningTasks registry.
When run catches an exception while executing the task, run acts according to its type (as
presented in the following "run’s Exception Cases" table and the following sections linked
from the table).
FetchFailedException
When FetchFailedException is reported while running a task, run setTaskFinishedAndClearInterruptStatus and notifies ExecutorBackend that the task has failed (with taskId, TaskState.FAILED , and a serialized reason).
Note run uses a closure Serializer to serialize the failure reason. The Serializer was created before run ran the task.
TaskKilledException
When TaskKilledException is reported while running a task, you should see the following INFO message in the logs:
run then setTaskFinishedAndClearInterruptStatus and notifies ExecutorBackend that the task has been killed (with taskId, TaskState.KILLED , and a serialized TaskKilled object).
CommitDeniedException
When CommitDeniedException is reported while running a task, run
setTaskFinishedAndClearInterruptStatus and notifies ExecutorBackend that the task has
failed (with taskId, TaskState.FAILED , and a serialized TaskKilled object).
Throwable
When run catches a Throwable , you should see the following ERROR message in the
logs (followed by the exception).
run then records the following task metrics (only when Task is available):
executorRunTime
jvmGCTime
run then collects the latest values of internal and external accumulators (with taskFailed enabled) and notifies ExecutorBackend that the task has failed with a serialized ExceptionFailure error.
Note The difference between this Throwable case and the other FAILED cases (i.e. FetchFailedException and CommitDeniedException) is just the serialized ExceptionFailure vs a reason being sent to ExecutorBackend , respectively.
kill marks the TaskRunner as killed and kills the task (if available and not finished
already).
Note kill passes the input interruptThread on to the task itself while killing it.
When executed, you should see the following INFO message in the logs:
Note killed flag is checked periodically in run to stop executing the task. Once killed, the task will eventually stop.
Settings
Table 3. Spark Properties
spark.unsafe.exceptionOnMemoryLeak (default: false ): FIXME
ExecutorSource
ExecutorSource is a metrics source of an Executor. It uses an executor's threadPool for calculating the gauges.
Note Every executor has its own separate ExecutorSource that is registered when CoarseGrainedExecutorBackend receives a RegisteredExecutor .
Master
A master is a running Spark instance that connects to a cluster manager for resources.
Workers
Workers (aka slaves) are running Spark instances where executors live to execute tasks.
They are the compute nodes in Spark.
Each worker hosts a local Block Manager that serves blocks to other workers in a Spark cluster.
Workers communicate among themselves using their Block Manager instances.
Explain task execution in Spark and understand Spark’s underlying execution model.
When you create SparkContext, each worker starts an executor. This is a separate process
(JVM), and it loads your jar, too. The executors connect back to your driver program. Now
the driver can send them commands, like flatMap , map and reduceByKey . When the
driver quits, the executors shut down.
A new process is not started for each step. A new process is started on each worker when
the SparkContext is constructed.
The executor deserializes the command (this is possible because it has loaded your jar),
and executes it on a partition.
1. Create RDD graph, i.e. DAG (directed acyclic graph) of RDDs to represent entire
computation.
2. Create stage graph, i.e. a DAG of stages that is a logical execution plan based on the
RDD graph. Stages are created by breaking the RDD graph at shuffle boundaries.
Based on this graph, two stages are created. The stage creation rule is based on the idea of
pipelining as many narrow transformations as possible. RDD operations with "narrow"
dependencies, like map() and filter() , are pipelined together into one set of tasks in
each stage.
In the end, every stage will only have shuffle dependencies on other stages, and may
compute multiple operations inside it.
In the WordCount example, the narrow transformation finishes at per-word count. Therefore,
you get two stages:
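one stage with the narrow transformations up to (and including) the map side of the shuffle, and a second stage with the per-key aggregation and the final action. The sketch below (assuming a plain text file as input; the path is a placeholder) shows where the boundary falls:

// Stage 0: textFile, flatMap and map are pipelined (narrow dependencies)
val counts = sc.textFile("README.md")
  .flatMap(_.split("\\s+"))
  .map(word => (word, 1))
  .reduceByKey(_ + _)   // shuffle boundary: a new stage starts here

// Stage 1: the post-shuffle aggregation plus the action
counts.collect()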
Once stages are defined, Spark will generate tasks from stages. The first stage will create
ShuffleMapTasks with the last stage creating ResultTasks because in the last stage, one
action operation is included to produce results.
The number of tasks to be generated depends on how your files are distributed. Suppose that you have three different files in three different nodes; the first stage will generate 3 tasks: one task per partition.
Therefore, you should not map your steps to tasks directly. A task belongs to a stage, and is
related to a partition.
The number of tasks being generated in each stage will be equal to the number of partitions.
Cleanup
Caution FIXME
Settings
spark.worker.cleanup.enabled (default: false ) Cleanup enabled.
Anatomy of Spark Application
A Spark application is uniquely identified by a pair of the application and application attempt
ids.
For it to work, you have to create a Spark configuration using SparkConf or use a custom
SparkContext constructor.
package pl.japila.spark

import org.apache.spark.{SparkConf, SparkContext}

object SparkMeApp {
  def main(args: Array[String]) {
    // a minimal sketch: create the Spark context, use it, and stop it
    val sc = new SparkContext(new SparkConf().setAppName("SparkMe App"))
    // ...create and transform RDDs, execute actions...
    sc.stop()
  }
}
Tip Spark shell creates a Spark context and SQL context for you at startup.
You can then create RDDs, transform them to other RDDs and ultimately execute actions.
You can also cache interim RDDs to speed up data processing.
After all the data processing is completed, the Spark application finishes by stopping the
Spark context.
SparkConf — Programmable Configuration for Spark Applications
Caution TODO Describe SparkConf object for the application configuration:
the default configs
system properties
…
setIfMissing Method
Caution FIXME
isExecutorStartupConf Method
Caution FIXME
set Method
Caution FIXME
Spark Properties
Every user program starts with creating an instance of SparkConf that holds the master URL to connect to ( spark.master ), the name for your Spark application (that is later displayed in web UI and becomes spark.app.name ) and other Spark properties required for a proper run of your Spark application.
Start Spark shell with --conf spark.logConf=true to log the effective Spark
configuration as INFO when SparkContext is started.
You can query for the values of Spark properties in Spark shell as follows:
scala> sc.getConf.getOption("spark.local.dir")
res0: Option[String] = None
scala> sc.getConf.getOption("spark.app.name")
res1: Option[String] = Some(Spark shell)
scala> sc.getConf.get("spark.master")
res2: String = local[*]
Read spark-defaults.conf.
--conf or -c : the command-line option used by spark-submit (and other shell scripts, e.g. spark-shell) to set Spark properties directly.
SparkConf
Default Configuration
The default Spark configuration is created when you execute the following code:
import org.apache.spark.SparkConf
val conf = new SparkConf
You can use conf.toDebugString or conf.getAll to print out the spark.* system properties that are loaded.
scala> conf.getAll
res0: Array[(String, String)] = Array((spark.app.name,Spark shell), (spark.jars,""), (
spark.master,local[*]), (spark.submit.deployMode,client))
scala> conf.toDebugString
res1: String =
spark.app.name=Spark shell
spark.jars=
spark.master=local[*]
spark.submit.deployMode=client
scala> println(conf.toDebugString)
spark.app.name=Spark shell
spark.jars=
spark.master=local[*]
spark.submit.deployMode=client
getAppId: String
Settings
Spark Properties and spark-defaults.conf Properties File
Deploy Mode
Deploy mode specifies where the driver executes in the deployment environment.
client (default) - the driver runs on the machine that the Spark application was launched from.
cluster - the driver runs on a node inside the cluster.
Note cluster deploy mode is only available for non-local cluster deployments.
You can control the deploy mode of a Spark application using spark-submit’s --deploy-mode
command-line option or spark.submit.deployMode Spark property.
Caution FIXME
spark.submit.deployMode
spark.submit.deployMode (default: client ) can be client or cluster .
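For example, you can check the effective deploy mode from spark-shell (which always uses client deploy mode):

scala> sc.deployMode
res0: String = client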
SparkContext
Note You could also assume that a SparkContext instance is a Spark application.
Spark context sets up internal services and establishes a connection to a Spark execution
environment.
Once a SparkContext is created you can use it to create RDDs, accumulators and
broadcast variables, access Spark services and run jobs (until SparkContext is stopped).
A Spark context is essentially a client of Spark’s execution environment and acts as the
master of your Spark application (don’t get confused with the other meaning of Master in
Spark, though).
SparkEnv
SparkConf
application name
deploy mode
default level of parallelism that specifies the number of partitions in RDDs when
they are created without specifying the number explicitly by a user.
Spark user
URL of web UI
Spark version
Storage status
Setting Configuration
master URL
RDDs
Accumulators
Broadcast variables
Cancelling a job
Cancelling a stage
Closure cleaning
Registering SparkListener
persistRDD
persistentRdds
getRDDStorageInfo
getPersistentRDDs
unpersistRDD
Refer to Logging.
addFile Method
unpersistRDD requests BlockManagerMaster to remove the blocks for the RDD (given
rddId ).
Caution FIXME
Caution FIXME
postApplicationEnd Method
Caution FIXME
clearActiveContext Method
Caution FIXME
cancelJob(jobId: Int)
reason ).
SparkContext can also request and kill executors:
requestExecutors
killExecutors
requestTotalExecutors
(private!) getExecutorIds
CoarseGrainedSchedulerBackend.
Caution FIXME
requestTotalExecutors(
numExecutors: Int,
localityAwareTasks: Int,
hostToLocalTaskCount: Map[String, Int]): Boolean
When called for other scheduler backends you should see the following WARN message in
the logs:
contract. It simply passes the call on to the current coarse-grained scheduler backend, i.e.
calls getExecutorIds .
When called for other scheduler backends you should see the following WARN message in
the logs:
Note You may want to read Inside Creating SparkContext to learn what happens behind the scenes when SparkContext is created.
getOrCreate(): SparkContext
getOrCreate(conf: SparkConf): SparkContext
getOrCreate methods allow you to get the existing SparkContext or create a new one.
import org.apache.spark.SparkContext
val sc = SparkContext.getOrCreate()
The no-param getOrCreate method requires that the two mandatory Spark settings - master
and application name - are specified using spark-submit.
Constructors
SparkContext()
SparkContext(conf: SparkConf)
SparkContext(master: String, appName: String, conf: SparkConf)
SparkContext(
master: String,
appName: String,
sparkHome: String = null,
jars: Seq[String] = Nil,
environment: Map[String, String] = Map())
import org.apache.spark.SparkConf
val conf = new SparkConf()
.setMaster("local[*]")
.setAppName("SparkMe App")
import org.apache.spark.SparkContext
val sc = new SparkContext(conf)
When a Spark context starts up you should see the following INFO in the logs (amongst the
other messages that come from the Spark services):
Note Only one SparkContext may be running in a single JVM (check out SPARK-2243 Support multiple SparkContexts in the same JVM). Sharing access to a SparkContext in the JVM is the solution to share data within Spark (without relying on other means of data sharing using external data stores).
Caution FIXME
getConf: SparkConf
Note Changing the SparkConf object does not change the current configuration (as the method returns a copy).
master: String
master method returns the current value of spark.master which is the deployment
environment in use.
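For example, in spark-shell started with the default master URL:

scala> sc.master
res0: String = local[*]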
appName: String
applicationAttemptId: Option[String]
application.
getExecutorStorageStatus: Array[StorageStatus]
BlockManagers).
deployMode: String
deployMode returns the current value of spark.submit.deployMode or client if not set.
getSchedulingMode: SchedulingMode.SchedulingMode
Note getPoolForName is part of the Developer’s API and may change in the future.
Internally, it requests the TaskScheduler for the root pool and looks up the Schedulable by
the pool name.
getAllPools: Seq[Schedulable]
Note getAllPools is used to calculate pool names for the Stages tab in web UI with FAIR scheduling mode used.
defaultParallelism: Int
taskScheduler: TaskScheduler
taskScheduler_=(ts: TaskScheduler): Unit
version: String
makeRDD Method
Caution FIXME
submitJob[T, U, R](
rdd: RDD[T],
processPartition: Iterator[T] => U,
partitions: Seq[Int],
resultHandler: (Int, U) => Unit,
resultFunc: => R): SimpleFutureAction[R]
It is used in:
AsyncRDDActions methods
Spark Configuration
Caution FIXME
When an RDD is created, it belongs to and is completely owned by the Spark context it
originated from. RDDs can’t by design be shared between SparkContexts.
Caution FIXME
unpersist removes an RDD from the master's Block Manager (calls removeRdd(rddId: Int, blocking: Boolean) ).
setCheckpointDir(directory: String)
Caution FIXME
register registers the acc accumulator. You can optionally give an accumulator a name .
You can create built-in accumulators for longs, doubles, and collection types
Tip
using specialized methods.
longAccumulator: LongAccumulator
longAccumulator(name: String): LongAccumulator
doubleAccumulator: DoubleAccumulator
doubleAccumulator(name: String): DoubleAccumulator
collectionAccumulator[T]: CollectionAccumulator[T]
collectionAccumulator[T](name: String): CollectionAccumulator[T]
collectionAccumulator[T] creates a CollectionAccumulator that accumulates elements of type T into a java.util.List[T] .
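The spark-shell session below sketches creating and using a named long accumulator (the accumulator name and the 1 to 9 range are assumptions chosen to match the values shown):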
scala> val counter = sc.longAccumulator("counter")

scala> counter.value
res0: Long = 0

scala> sc.parallelize(1 to 9).foreach(n => counter.add(n))

scala> counter.value
res3: Long = 45
The name input parameter allows you to give a name to an accumulator and have it
displayed in Spark UI (under Stages tab for a given stage).
broadcast method creates a broadcast variable. It is a shared memory with value (as broadcast blocks) on the driver and later on all Spark executors.
Spark transfers the value to Spark executors once, and tasks can share it without incurring
repetitive network transmissions when the broadcast variable is used multiple times.
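A minimal spark-shell sketch (the array contents and the broadcast id depend on the session):

scala> val b = sc.broadcast(Array(1, 2, 3))
b: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

scala> sc.parallelize(1 to 3).map(_ + b.value.sum).collect
res0: Array[Int] = Array(7, 8, 9)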
Once created, the broadcast variable (and other blocks) are displayed per executor and the
driver in web UI (under Executors tab).
scala> sc.addJar("build.sbt")
15/11/11 21:54:54 INFO SparkContext: Added JAR build.sbt at http://192.168.1.4:49427/jars/build.sbt with timestamp 1447275294457
SparkContext tracks shuffle ids using the nextShuffleId internal counter (for registering shuffle dependencies to Shuffle Service).
runJob[T, U](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
resultHandler: (Int, U) => Unit): Unit
runJob[T, U](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int]): Array[U]
runJob[T, U](
rdd: RDD[T],
func: Iterator[T] => U,
partitions: Seq[Int]): Array[U]
runJob[T, U](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U): Array[U]
runJob[T, U](rdd: RDD[T], func: Iterator[T] => U): Array[U]
runJob[T, U](
rdd: RDD[T],
processPartition: (TaskContext, Iterator[T]) => U,
resultHandler: (Int, U) => Unit)
runJob[T, U: ClassTag](
rdd: RDD[T],
processPartition: Iterator[T] => U,
resultHandler: (Int, U) => Unit)
runJob executes a function on one or many partitions of an RDD (in a SparkContext space).
Internally, runJob first makes sure that the SparkContext is not stopped. If it is, you should
see the following IllegalStateException exception in the logs:
runJob then calculates the call site and cleans a func closure.
With spark.logLineage enabled (which is not by default), you should see the following INFO
message with toDebugString (executed on rdd ):
Tip runJob just prepares input parameters for DAGScheduler to run a job.
After DAGScheduler is done and the job has finished, runJob stops ConsoleProgressBar
and performs RDD checkpointing of rdd .
Tip For some actions, e.g. first() and lookup() , there is no need to compute all the partitions of the RDD in a job. And Spark knows it.
import org.apache.spark.TaskContext
scala> sc.runJob(lines, (t: TaskContext, i: Iterator[String]) => 1) (1)
res0: Array[Int] = Array(1, 1) (2)
1. Run a job using runJob on lines RDD with a function that returns 1 for every partition
(of lines RDD).
2. What can you say about the number of partitions of the lines RDD? Is your result
res0 different than mine? Why?
partition).
stop(): Unit
Internally, stop enables stopped internal flag. If already stopped, you should see the
following INFO message in the logs:
3. Stops web UI
5. Stops ContextCleaner
Ultimately, you should see the following INFO message in the logs:
Note You can also register custom listeners using spark.extraListeners setting.
Events
setLogLevel(logLevel: String)
setLogLevel allows you to set the root logging level in a Spark application, e.g. Spark shell.
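For example, to switch the root logging level to WARN in spark-shell:

scala> sc.setLogLevel("WARN")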
Every time an action is called, Spark cleans up the closure, i.e. the body of the action, before
it is serialized and sent over the wire to executors.
Not only does the ClosureCleaner.clean method clean the closure, it also does so transitively, i.e. referenced closures are cleaned as well.
Refer to Logging.
With DEBUG logging level you should see the following messages in the logs:
Serialization is verified using a new instance of Serializer (as closure Serializer). Refer to
Serialization.
Hadoop Configuration
While a SparkContext is being created, so is a Hadoop configuration (as an instance of
org.apache.hadoop.conf.Configuration that is available as _hadoopConfiguration ).
fs.s3.awsAccessKeyId , fs.s3n.awsAccessKeyId and fs.s3a.access.key are set to the value of AWS_ACCESS_KEY_ID (when defined), and the corresponding secret-key settings to the value of AWS_SECRET_ACCESS_KEY .
Every spark.hadoop. -prefixed setting becomes a setting of the Hadoop configuration with the spark.hadoop. prefix removed from the key.
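As a sketch (the property name my.custom.key is an assumption), a Spark property passed with the spark.hadoop. prefix shows up in the Hadoop configuration without the prefix:

$ ./bin/spark-shell --conf spark.hadoop.my.custom.key=some-value

scala> sc.hadoopConfiguration.get("my.custom.key")
res0: String = some-value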
startTime: Long
scala> sc.startTime
res0: Long = 1464425605653
sparkUser: String
submitMapStage[K, V, C](
dependency: ShuffleDependency[K, V, C]): SimpleFutureAction[MapOutputStatistics]
submitMapStage submits the given ShuffleDependency for execution and returns a SimpleFutureAction .
Internally, submitMapStage calculates the call site first and submits it with localProperties .
Caution FIXME
cancelJobGroup(groupId: String)
Caution FIXME
setJobGroup(
groupId: String,
description: String,
interruptOnCancel: Boolean = false): Unit
spark.jobGroup.id as groupId
spark.job.description as description
spark.job.interruptOnCancel as interruptOnCancel
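A short sketch of using job groups (the group id and description are arbitrary):

sc.setJobGroup("nightly-report", "Jobs of the nightly report", interruptOnCancel = true)
sc.parallelize(1 to 100).count()   // runs within the "nightly-report" job group

// later, possibly from another thread
sc.cancelJobGroup("nightly-report")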
cleaner Method
cleaner: Option[ContextCleaner]
getPreferredLocs simply requests DAGScheduler for the preferred locations for partition .
getRDDStorageInfo takes all the RDDs (from persistentRdds registry) that match filter
getRDDStorageInfo then updates the RDDInfos with the current status of all BlockManagers
In the end, getRDDStorageInfo gives only the RDDs that are cached (i.e. the sum of memory and disk sizes as well as the number of partitions cached are greater than 0 ).
Note getRDDStorageInfo is used when RDD is requested for RDD lineage graph.
Settings
spark.driver.allowMultipleContexts
Quoting the scaladoc of org.apache.spark.SparkContext:
Only one SparkContext may be active per JVM. You must stop() the active
SparkContext before creating a new one.
If enabled (i.e. true ), Spark prints the following WARN message to the logs:
Only one SparkContext may be running in this JVM (see SPARK-2243). To ignore this error, set spark.driver.allowMultipleContexts = true. The currently running SparkContext was created at:
[ctx.creationSite.longForm]
When creating an instance of SparkContext , Spark marks the current thread as having it
being created (very early in the instantiation process).
It’s not guaranteed that Spark will work properly with two or more
Caution
SparkContexts. Consider the feature a work in progress.
statusStore: AppStatusStore
uiWebUrl: Option[String]
Environment Variables
Table 3. Environment Variables
Environment Variable Default Value Description
HeartbeatReceiver RPC Endpoint
HeartbeatReceiver is a ThreadSafeRpcEndpoint registered on the driver under the name HeartbeatReceiver .
HeartbeatReceiver receives Heartbeat messages from executors that Spark uses as the
mechanism to receive accumulator updates (with task metrics and a Spark application’s
accumulators) and pass them along to TaskScheduler .
ExpireDeadHosts : FIXME
executorLastSeen : Executor ids and the timestamps of when the last heartbeat was received.
scheduler : TaskScheduler
Refer to Logging.
SparkContext
Clock
ExecutorRegistered
ExecutorRegistered(executorId: String)
When received, HeartbeatReceiver registers the executorId executor and the current time
(in executorLastSeen internal registry).
Note HeartbeatReceiver uses the internal Clock to know the current time.
ExecutorRemoved
ExecutorRemoved(executorId: String)
ExpireDeadHosts
ExpireDeadHosts
When ExpireDeadHosts arrives the following TRACE is printed out to the logs:
Each executor (in executorLastSeen registry) is checked whether the time it was last seen is
not longer than spark.network.timeout.
For any such executor, the following WARN message is printed out to the logs:
killExecutorThread).
Heartbeat
Heartbeat(executorId: String,
accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
blockManagerId: BlockManagerId)
When the executor is found, HeartbeatReceiver updates the time the heartbeat was
received (in executorLastSeen).
Note HeartbeatReceiver uses the internal Clock to know the current time.
HeartbeatReceiver then notifies TaskScheduler that the heartbeat was received from the executor (using the TaskScheduler internal reference).
HeartbeatReceiver posts a HeartbeatResponse back to the executor (with the response from
TaskScheduler whether the executor has been registered already or not so it may eventually
need to re-register).
If however the executor was not found (in executorLastSeen registry), i.e. the executor was
not registered before, you should see the following DEBUG message in the logs and the
response is to notify the executor to re-register.
In a very rare case, when TaskScheduler is not yet assigned to HeartbeatReceiver , you
should see the following WARN message in the logs and the response is to notify the
executor to re-register.
TaskSchedulerIsSet
TaskSchedulerIsSet
onExecutorAdded Method
onExecutorAdded sends an ExecutorRegistered message to itself (that in turn registers an executor).
onExecutorRemoved Method
onExecutorRemoved sends an ExecutorRemoved message to itself (that in turn removes the executor).
When called, HeartbeatReceiver cancels the checking task (that sends a blocking
ExpireDeadHosts every spark.network.timeoutInterval on eventLoopThread - Heartbeat
Receiver Event Loop Thread - see Starting (onStart method)) and shuts down
eventLoopThread and killExecutorThread executors.
expireDeadHosts(): Unit
Caution FIXME
Settings
Table 3. Spark Properties
Spark Property Default Value
spark.storage.blockManagerTimeoutIntervalMs 60s
spark.storage.blockManagerSlaveTimeoutMs 120s
spark.network.timeout spark.storage.blockManagerSlaveTimeoutMs
spark.network.timeoutInterval spark.storage.blockManagerTimeoutIntervalMs
Inside Creating SparkContext
Note The example uses Spark in local mode, but the initialization with the other cluster modes would follow similar steps.
Note Creating an instance of SparkContext is wrapped as follows:
SparkContext.markPartiallyConstructed(this, allowMultipleContexts)
// the SparkContext code goes here
SparkContext.setActiveContext(this, allowMultipleContexts)
The very first information printed out is the version of Spark as an INFO message:
Tip You can use the version method to learn about the current Spark version or the org.apache.spark.SPARK_VERSION value.
Detected yarn cluster mode, but isn't running on a cluster. Deployment to YARN is not
supported directly by SparkContext. Please use spark-submit.
Caution FIXME How to "trigger" the exception? What are the steps?
The driver’s host and port are set if missing. spark.driver.host becomes the value of
Utils.localHostName (or an exception is thrown) while spark.driver.port is set to 0 .
It sets the jars and files based on spark.jars and spark.files , respectively. These are
files that are required for proper task execution on executors.
If event logging is enabled, i.e. spark.eventLog.enabled flag is true , the internal field
_eventLogDir is set to the value of spark.eventLog.dir setting or the default value
/tmp/spark-events .
Also, if spark.eventLog.compress is enabled (it is not by default), the short name of the
CompressionCodec is assigned to _eventLogCodec . The config key is
spark.io.compression.codec (default: lz4 ).
Creating LiveListenerBus
SparkContext creates a LiveListenerBus.
SparkContext creates an AppStatusStore (for a live Spark application) and requests LiveListenerBus to add the AppStatusListener to the status queue.
Creating SparkEnv
SparkContext creates a SparkEnv and requests SparkEnv to use the instance as the
default SparkEnv.
MetadataCleaner is created.
Creating SparkStatusTracker
SparkContext creates a SparkStatusTracker (with itself and the AppStatusStore).
Creating ConsoleProgressBar
Creating SparkUI
SparkContext creates a SparkUI when spark.ui.enabled configuration property is enabled (i.e. true ) with the following:
AppStatusStore
Name of the Spark application that is exactly the value of spark.app.name configuration property
If there are jars given through the SparkContext constructor, they are added using addJar .
At this point in time, the amount of memory to allocate to each executor (as
_executorMemory ) is calculated. It is the value of spark.executor.memory setting, or the SPARK_EXECUTOR_MEMORY environment variable (and defaults to 1024 MB). The value is also used by CoarseMesosSchedulerBackend.
Caution FIXME
What's _executorMemory ?
What's the unit of the value of _executorMemory exactly?
What are "SPARK_TESTING", "spark.testing"? How do they contribute to executorEnvs ?
What's executorEnvs ?
Starting TaskScheduler
SparkContext starts TaskScheduler .
Initializing BlockManager
Starting MetricsSystem
SparkContext requests the MetricsSystem to start.
FIXME It’d be quite useful to have all the properties with their default values
Caution in sc.getConf.toDebugString , so when a configuration is not included but
does change Spark runtime configuration, it should be added to _conf .
SparkContext posts a SparkListenerEnvironmentUpdate message on LiveListenerBus with information about Task Scheduler's scheduling mode, added jar and file paths, and other environmental details. They are displayed in web UI's Environment tab.
1. DAGScheduler
2. BlockManager
createTaskScheduler(
sc: SparkContext,
master: String,
deployMode: String): (SchedulerBackend, TaskScheduler)
Caution FIXME
If there are two or more external cluster managers that could handle url , a
SparkException is thrown:
setupAndStartListenerBus
setupAndStartListenerBus(): Unit
It expects that the class name represents a SparkListenerInterface listener with one of the
following constructors (in this order):
a single-argument constructor that accepts SparkConf
a zero-argument constructor
When no single- SparkConf or zero-argument constructor could be found for a class name in
spark.extraListeners setting, a SparkException is thrown with the message:
createSparkEnv(
conf: SparkConf,
isLocal: Boolean,
listenerBus: LiveListenerBus): SparkEnv
createSparkEnv simply delegates the call to SparkEnv to create a SparkEnv for the driver.
It calculates the number of cores to 1 for local master URL, the number of processors
available for JVM for * or the exact number in the master URL, or 0 for the cluster
master URLs.
Utils.getCurrentUserName Method
getCurrentUserName(): String
getCurrentUserName computes the user name who has started the SparkContext instance.
Internally, it reads SPARK_USER environment variable and, if not set, reverts to Hadoop
Security API’s UserGroupInformation.getCurrentUser().getShortUserName() .
Note It is another place where Spark relies on Hadoop API for its operation.
Utils.localHostName Method
localHostName computes the local host name.
stopped Flag
ConsoleProgressBar
ConsoleProgressBar shows the progress of active stages to standard error, i.e. stderr . It uses SparkStatusTracker to poll the status of stages periodically and print out active stages with more than one task. It keeps overwriting itself to hold the status of at most the 3 first concurrent stages in a single line.
The progress includes the stage id, the number of completed, active, and total tasks.
Tip ConsoleProgressBar may be useful when you ssh to workers and want to see the progress of active stages.
import org.apache.log4j._
Logger.getLogger("org.apache.spark.SparkContext").setLevel(Level.WARN)
The progress bar prints out the status after a stage has run for at least 500 milliseconds, every spark.ui.consoleProgress.update.interval milliseconds.
import org.apache.log4j._
scala> Logger.getLogger("org.apache.spark.SparkContext").setLevel(Level.WARN) (3)
4. Run a job with 4 tasks with 500ms initial sleep and 200ms sleep chunks to see the
progress bar.
You may want to use the following example to see the progress bar in full glory - all 3
concurrent stages in console (borrowed from a comment to [SPARK-4017] show progress
bar in console #3029):
> ./bin/spark-shell
scala> val a = sc.makeRDD(1 to 1000, 10000).map(x => (x, x)).reduceByKey(_ + _)
scala> val b = sc.makeRDD(1 to 1000, 10000).map(x => (x, x)).reduceByKey(_ + _)
scala> a.union(b).count()
ConsoleProgressBar starts the internal timer refresh progress that does refresh and shows
progress.
finishAll Method
Caution FIXME
stop Method
stop(): Unit
refresh(): Unit
refresh …FIXME
SparkStatusTracker
SparkStatusTracker is…FIXME
SparkContext
AppStatusStore
Local Properties — Creating Logical Job Groups
You can set a local property that will affect Spark jobs submitted from a thread, such as the
Spark fair scheduler pool. You can use your own custom properties. The properties are
propagated through to worker tasks and can be accessed there via
TaskContext.getLocalProperty.
Note Local properties are used to group jobs into pools in FAIR job scheduler by the spark.scheduler.pool per-thread property and in SQLExecution.withNewExecutionId Helper Methods.
A common use case for the local property concept is to set a local property in a thread, say
spark.scheduler.pool, after which all jobs submitted within the thread will be grouped, say
into a pool by FAIR job scheduler.
sc.setLocalProperty("spark.scheduler.pool", "myPool")
// these two jobs (one per action) will run in the myPool pool
rdd.count
rdd.collect
sc.setLocalProperty("spark.scheduler.pool", null)
localProperties: InheritableThreadLocal[Properties]
Tip When value is null the key property is removed from localProperties.
getLocalProperty gets a local property by key in this thread. It returns null if key is
missing.
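A sketch of reading a custom local property inside a task (the property name is arbitrary):

import org.apache.spark.TaskContext

sc.setLocalProperty("my.custom.property", "42")
sc.parallelize(1 to 2, 2).foreach { _ =>
  // local properties are propagated to the tasks running on executors
  println(TaskContext.get.getLocalProperty("my.custom.property")) // prints 42
}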
getLocalProperties: Properties
setLocalProperties Method
RDD — Resilient Distributed Dataset
An RDD is a resilient and distributed collection of records spread over one or many partitions. Using the RDD abstraction, Spark hides data partitioning and distribution, which in turn allowed the designers to build a parallel computational framework with a higher-level programming interface (API) for four mainstream programming languages.
Resilient, i.e. fault-tolerant with the help of RDD lineage graph and so able to
recompute missing or damaged partitions due to node failures.
Dataset is a collection of partitioned data with primitive values or values of values, e.g.
tuples or other objects (that represent records of the data you work with).
Figure 1. RDDs
From the original paper about RDD - Resilient Distributed Datasets: A Fault-Tolerant
Abstraction for In-Memory Cluster Computing:
Resilient Distributed Datasets (RDDs) are a distributed memory abstraction that lets
programmers perform in-memory computations on large clusters in a fault-tolerant
manner.
Beside the above traits (that are directly embedded in the name of the data abstraction -
RDD) it has the following additional traits:
In-Memory, i.e. data inside RDD is stored in memory as much (size) and long (time) as
possible.
Immutable or Read-Only, i.e. it does not change once created and can only be
transformed using transformations to new RDDs.
Lazy evaluated, i.e. the data inside RDD is not available or transformed until an action
is executed that triggers the execution.
Cacheable, i.e. you can hold all the data in a persistent "storage" like memory (default
and the most preferred) or disk (the least preferred due to access speed).
Partitioned — records are partitioned (split into logical partitions) and distributed across
nodes in a cluster.
Computing partitions in a RDD is a distributed process by design and to achieve even data
distribution as well as leverage data locality (in distributed systems like HDFS or
Cassandra in which data is partitioned by default), they are partitioned to a fixed number of
partitions - logical chunks (parts) of data. The logical division is for processing only; internally the data is not divided whatsoever. Each partition comprises records.
Figure 2. RDDs
Partitions are the units of parallelism. You can control the number of partitions of a RDD
using repartition or coalesce transformations. Spark tries to be as close to data as possible
without wasting time to send data across network by means of RDD shuffling, and creates
as many partitions as required to follow the storage layout and thus optimize data access. It
leads to a one-to-one mapping between (physical) data in distributed data storage, e.g.
HDFS or Cassandra, and partitions.
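A quick spark-shell sketch of inspecting and changing the number of partitions (the numbers are arbitrary):

scala> val nums = sc.parallelize(1 to 100, 4)

scala> nums.getNumPartitions
res0: Int = 4

scala> nums.repartition(8).getNumPartitions
res1: Int = 8

scala> nums.coalesce(2).getNumPartitions
res2: Int = 2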
The motivation to create RDDs was (after the authors) two types of applications that current computing frameworks handle inefficiently:
iterative algorithms (in machine learning and graph computations)
interactive data mining tools (ad-hoc queries on the same dataset)
Technically, RDDs follow the contract defined by the five main intrinsic properties:
A list of partitions the dataset is divided into
A function to compute every partition (of the dataset)
A list of parent RDDs, i.e. the dependencies
An optional Partitioner that defines how keys are hashed, and the pairs partitioned (for
key-value RDDs)
Optional preferred locations (aka locality info), i.e. hosts for a partition where the
records live or are the closest to read from.
This RDD abstraction supports an expressive set of operations without having to modify
scheduler for each one.
An RDD is a named (by name ) and uniquely identified (by id ) entity in a SparkContext
(available as context property).
RDDs live in one and only one SparkContext that creates a logical boundary.
An RDD can optionally have a friendly name accessible using name that can be changed
using = :
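In the spark-shell session below, ns is assumed to be a parallelized collection, e.g.:

scala> val ns = sc.parallelize(0 to 10)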
scala> ns.id
res0: Int = 2
scala> ns.name
res1: String = null
scala> ns.name = "Friendly name"

scala> ns.name
res2: String = Friendly name
scala> ns.toDebugString
res3: String = (8) Friendly name ParallelCollectionRDD[2] at parallelize at <console>:
24 []
RDDs are a container of instructions on how to materialize big (arrays of) distributed data,
and how to split it into partitions so Spark (using executors) can hold some of them.
In general data distribution can help executing processing in parallel so a task processes a
chunk of data that it could eventually keep in memory.
Spark does jobs in parallel, and RDDs are split into partitions to be processed and written in
parallel. Inside a partition, data is processed sequentially.
Saving partitions results in part-files instead of one single file (unless there is a single
partition).
Caution FIXME
isCheckpointedAndMaterialized Method
Caution FIXME
getNarrowAncestors Method
Caution FIXME
toLocalIterator Method
Caution FIXME
cache Method
Caution FIXME
persist Methods
persist(): this.type
persist(newLevel: StorageLevel): this.type
Caution FIXME
Note persist is used when RDD is requested to persist itself and marks itself for local checkpointing.
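A minimal usage sketch (the input path is an assumption):

import org.apache.spark.storage.StorageLevel

val lines = sc.textFile("README.md")
lines.persist(StorageLevel.MEMORY_AND_DISK)   // mark the RDD for persistence
lines.count()                                 // the first action materializes and caches it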
unpersist Method
Caution FIXME
localCheckpoint Method
localCheckpoint(): this.type
RDD Contract
getPartitions
Used exclusively when RDD is requested for its partitions
(called only once as the value is cached).
getDependencies
Used when RDD is requested for its dependencies
(called only once as the value is cached).
Types of RDDs
ParallelCollectionRDD
CoGroupedRDD
HadoopRDD is an RDD that provides core functionality for reading data stored in HDFS
using the older MapReduce API. The most notable use case is the return RDD of
SparkContext.textFile .
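For instance (a sketch; the file, RDD ids and partition count depend on your session), SparkContext.textFile builds a HadoopRDD under the covers:

scala> sc.textFile("README.md").toDebugString
res0: String =
(2) README.md MapPartitionsRDD[1] at textFile at <console>:25 []
 |  README.md HadoopRDD[0] at textFile at <console>:25 []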