What’s Next for the Upcoming Apache Spark 4.0 Release?
Wenchen Fan (cloud-fan)
Xiao Li (gatorsmile)
Data + AI Summit 2024
©2024 Databricks Inc. — All rights reserved
Apache Spark 4.0
GA Major Features
• Spark Connect • ANSI Mode • Arrow optimized Python UDF • pandas 2 API parity • Structured Logging • Variant Data Types [WIP] • Python Data Source APIs • SQL UDF/UDTF [WIP]
• Python: applyInArrow, DF.toArrow, Python UDTF, Polymorphic Python UDTF, PySpark UDF Unified Profiling, UDF-level Dependency Control [WIP]
• SQL: Collation Support [WIP], Stored Procedures, Execute Immediate, View Evolution
• Streaming: Arbitrary Stateful Processing V2, State Data Source Reader, Streaming Python Data Sources, New Streaming Doc
• More: Error Class Enhancements, XML Connectors, Spark K8S operator, Java 21
Agenda
• New Functionalities: Spark Connect, ANSI Mode, Arbitrary Stateful Processing V2, Collation Support, Variant Data Types, pandas 2.x Support
• Extensions: Python Data Source APIs, XML/Databricks Connectors and DSV2 Extension, Delta 4.0
• Custom Functions and Procedures: SQL UDFs, SQL Scripting, Python UDTF, Arrow optimized Python UDF, PySpark UDF Unified Profiler
• Usability: Structured Logging Framework, Error Class Framework, Behavior Change Process
Spark Connect
How to embed Spark in applications?
Up until Spark 3.4: hard to support today's developer experience requirements.
[Diagram: modern data applications, IDEs/notebooks, and programming languages/SDKs all talk to Spark's monolithic driver, which bundles application logic, analyzer, optimizer, scheduler, and the distributed execution engine. Limitations: applications get SQL only, IDEs/notebooks must stay close to the REPL, and other languages/SDKs have no JVM interop.]
Connect to Spark from Any App
Thin client, with the full power of Apache Spark.
[Diagram: modern data applications, IDEs/notebooks, and programming languages/SDKs use the Spark Connect Client API to reach an application gateway in front of Spark's driver (analyzer, optimizer, scheduler) and the distributed execution engine.]
Spark Connect GA in Apache Spark 4.0
• Interactively develop & debug from your IDE: pip install pyspark>=3.4.0 and use it in your favorite IDE!
• New connectors and SDKs in any language (Scala 3, Go, Databricks Connect): check out, use & contribute to the Go client!
• Build interactive data applications: get started with our GitHub example!
The lightweight Spark Connect Package
• pip install pyspark-connect
• Pure Python library, no JVM.
• Pure Spark Connect client, not the entire PySpark package
• Only 1.5 MB (vs. 355 MB for full PySpark)
• Preferred if your application has fully migrated to the Spark Connect API.
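As a quick illustration, here is a minimal sketch of the slim client in action; the endpoint URL is a placeholder and assumes a Spark Connect server is already running.

from pyspark.sql import SparkSession

# Connect to a running Spark Connect server (placeholder endpoint).
spark = (
    SparkSession.builder
    .remote("sc://localhost:15002")
    .getOrCreate()
)
spark.range(5).show()
Python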
ANSI MODE
ON by default in 4.0
Migration to ANSI ON
Action: turn on ANSI mode now to catch (and fix) silent data corruption!
Without ANSI mode
Data Corruption!
With ANSI mode (3.5)
Error callsite is captured
With ANSI mode (4.0)
Error callsite is highlighted
With ANSI mode (4.0)
DataFrame queries with error callsite
(The error message highlights the culprit operation and its line number.)
DataFrame queries with error callsite
• PySpark support is on the way.
• Spark Connect support is on the way.
• Native notebook integration is on the way (so that you can see the highlighting).
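A minimal sketch of the behavior difference described above; the literal is illustrative.

# ANSI mode off: an invalid cast silently produces NULL.
spark.conf.set("spark.sql.ansi.enabled", "false")
spark.sql("SELECT CAST('not a number' AS INT)").show()

# ANSI mode on (the 4.0 default): the same cast raises an error that
# identifies the offending expression instead of corrupting the result.
spark.conf.set("spark.sql.ansi.enabled", "true")
spark.sql("SELECT CAST('not a number' AS INT)").show()
Python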
Variant Data Type for Semi-
Structured Data
Motivation
Data Engineer's Dilemma: of Flexible, Fast, and Open, you can only pick 2 out of 3…
What if you could ingest JSON, maintain flexibility, boost performance, and use an open standard?
Variant is flexible
INSERT INTO variant_tbl (event_data)
VALUES
(
PARSE_JSON(
'{"level": "warning",
"message": "invalid request",
"user_agent": "Mozilla/5.0 ..."}'
)
);
SELECT
*
FROM
variant_tbl
WHERE
event_data:user_agent ilike '%Mozilla%';
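As a further sketch reusing the table and column above, individual fields can be pulled out of the variant with the colon (:) path syntax and cast to a concrete type when needed.

spark.sql("""
    SELECT
      CAST(event_data:level AS STRING)      AS level,
      CAST(event_data:user_agent AS STRING) AS user_agent
    FROM variant_tbl
""").show(truncate=False)
Python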
Performance
String Collation Support
ANSI SQL COLLATE
Sorting and comparing strings according to locale
● Associate columns, fields, and array elements with a collation of choice (see the sketch after this list)
○ Case insensitive
○ Accent insensitive
○ Locale aware
● Supported by many string functions such as
○ lower()/upper()
○ substr()
○ locate()
○ like
● GROUP BY, ORDER BY, comparisons, …
● Supported by Delta and Photon
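A minimal sketch of column-level collation (table and column names are illustrative); once declared on the column, the collation applies to comparisons, GROUP BY and ORDER BY without an explicit COLLATE in every query.

spark.sql("""
    CREATE TABLE customers (
      name STRING COLLATE UNICODE_CI
    )
""")

# 'anthony' and 'Anthony' compare equal and group together under UNICODE_CI.
spark.sql("""
    SELECT name, count(*) AS cnt
    FROM customers
    WHERE name = 'anthony'
    GROUP BY name
""").show()
Python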
A look at the default collation
A<Z<a<z<Ā
> SELECT name FROM names ORDER BY name;
name
Anthony
Bertha
anthony
bertha
Ānthōnī
SQL
Is this really what we want here?
COLLATE UNICODE
One size, fits most
> SELECT name
FROM names
ORDER BY name COLLATE unicode;
name
Ānthōnī
anthony
Anthony
bertha
Bertha
SQL
Root collation with decent sort order for most locales
COLLATE UNICODE_CI
Case insensitive comparisons have entered the chat
> SELECT name
FROM names
WHERE startswith(name COLLATE unicode_ci, 'a')
ORDER BY name COLLATE unicode_ci;
name
anthony
Anthony
SQL
Case insensitive is not accent insensitive: We lost Ānthōnī
COLLATE UNICODE_CI_AI
Equality from a to Ź
> SELECT name
FROM names
WHERE startswith(name COLLATE unicode_ci_ai, 'a')
ORDER BY name COLLATE unicode_ci_ai;
name
Ānthōnī
anthony
Anthony
SQL
100s of supported predefined collations across many locales
Streaming State Data Source
Stateful Stream Processing
Streaming State data source
• Allows you to inspect the internal states of streaming applications,
for debugging, profiling, testing, troubleshooting, etc.
• Allows you to manipulate the internal state as a quick workaround to recover from urgent issues.
• All with your familiar data source APIs.
State Reader API: state-metadata
High-level API
State Reader API: statestore
Granular API
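A minimal sketch of both readers (the checkpoint path is a placeholder): "state-metadata" lists the stateful operators and batches recorded in a checkpoint, while "statestore" reads the key/value state rows themselves.

checkpoint = "/tmp/checkpoints/my-streaming-query"

# High-level view: which stateful operators and batches exist in the checkpoint.
spark.read.format("state-metadata").load(checkpoint).show()

# Granular view: the key/value state rows of the stateful operator.
spark.read.format("statestore").load(checkpoint).show()
Python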
Arbitrary Stateful Processing V2
(flat)MapGroupsWithState: current V1 version
• Supports a single user-defined state object per grouping key.
• The state object can be updated while evaluating the current group, and the updated value will be available in the next trigger.

val ds = spark.readStream.json(path)
  .as[CreditCardTransaction]

ds.groupByKey(_.cardId)
  .flatMapGroupsWithState[
    CreditCardTransactionState,
    CreditCardTransaction
  ](
    OutputMode.Append(),
    GroupStateTimeout.NoTimeout())(
    (_, txns, groupState) => {
      // read state, compute new average, and save to state
      ...
    }
  )
Scala
Limitations of (flat)MapGroupsWithState
Existing API
• Lack of data modelling flexibility: prevents users from splitting state (for a grouping key) into multiple logical instances that can be read/updated independently.
• Lack of composite types: values stored in GroupState are single types and cannot efficiently support data structures like List, Map, etc.; the current approach requires users to read/update the entire data structure.
• Lack of state eviction support: no support for eventual state cleanup using TTL.
• Lack of state schema evolution: no support for changes to the state schema once the streaming query has started.
transformWithState: the V2 version
Layered, Flexible,
Extensible State API
pandas 2 support/API parity
Pandas API on Spark (Koalas)
pandas:
import pandas as pd
df = pd.read_csv("my_data.csv")
df.columns = ['x', 'y', 'z1']
df['x2'] = df.x * df.x
Python

Pandas API on Spark:
import pyspark.pandas as ps
df = ps.read_csv("my_data.csv")
df.columns = ['x', 'y', 'z1']
df['x2'] = df.x * df.x
Python
Pandas 2.x Support
API parity with pandas 2.2.2
• Backwards-incompatible API changes from pandas 2.0.0 and 2.1.0 are reflected
• See SPARK-44101 and the Spark migration guide
Agenda
• New Functionalities: Spark Connect, ANSI Mode, Arbitrary Stateful Processing V2, Collation Support, Variant Data Types, pandas 2.x Support
• Extensions: Python Data Source APIs, XML/Databricks Connectors and DSV2 Extension, Delta 4.0
• Custom Functions and Procedures: SQL UDFs, SQL Scripting, Python UDTF, Arrow optimized Python UDF, PySpark UDF Unified Profiler
• Usability: Structured Logging Framework, Error Class Framework, Behavior Change Process
Streaming and Batch Python Data Sources
Why Python Data Source?
• People like writing Python!
• pip install is so convenient.
• Simplified API, without the complicated performance-oriented features of Data Source V2.
Python Data Source APIs
• SPIP: Python Data Source API (SPARK-44076)
• Available in Spark 4.0 preview version and Databricks Runtime 15.2
• Support both batch and streaming, read and write
Python Data Source Overview
Easy three steps to create and use your custom data sources:

Step 1: Create a data source
class MySource(DataSource):
    …

Step 2: Register the data source
Register the data source in the current Spark session using the Python data source class:
spark.dataSource.register(MySource)

Step 3: Read from or write to the data source
spark.read
    .format("my-source")
    .load(...)

df.write
    .format("my-source")
    .mode("append")
    .save(...)
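Putting the three steps together, here is a minimal end-to-end sketch; the source name, schema, and rows are illustrative, and it assumes the pyspark.sql.datasource module that ships with the 4.0 preview.

from pyspark.sql.datasource import DataSource, DataSourceReader

class MySourceReader(DataSourceReader):
    def read(self, partition):
        # Yield rows that match the schema declared by the data source.
        yield (0, "hello")
        yield (1, "world")

class MySource(DataSource):
    @classmethod
    def name(cls):
        return "my-source"

    def schema(self):
        return "id INT, value STRING"

    def reader(self, schema):
        return MySourceReader()

spark.dataSource.register(MySource)
spark.read.format("my-source").load().show()
Python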
DataFrame.toArrow
GroupedData.applyInArrow
DataFrame.toArrow
• A simple API to convert a PySpark DataFrame to a PyArrow Table.
• Makes it easier to integrate with the Arrow ecosystem.
• Note: all the data is loaded into the driver's memory, which may cause out-of-memory errors for large datasets.
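A minimal sketch (column names are illustrative): collect a small DataFrame to the driver as a pyarrow.Table.

df = spark.range(3).selectExpr("id", "id * 2 AS doubled")
table = df.toArrow()   # returns a pyarrow.Table on the driver
print(table.schema)
Python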
GroupedData.applyInArrow
• Utilizes Apache Arrow to map functions over DataFrame groups
• Returns the result as a DataFrame
• Supports functions taking a pyarrow.Table or a tuple of grouping keys and a pyarrow.Table
• Note: the function requires a full shuffle and may cause out-of-memory errors for large data groups

>>> import pyarrow
>>> import pyarrow.compute as pc
>>> df = spark.createDataFrame(
...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
...     ("id", "v"))
>>> def normalize(table):
...     v = table.column("v")
...     norm = pc.divide(pc.subtract(v, pc.mean(v)),
...                      pc.stddev(v, ddof=1))
...     return table.set_column(1, "v", norm)
>>> df.groupby("id").applyInArrow(
...     normalize, schema="id long, v double").show()
+---+-------------------+
| id|                  v|
+---+-------------------+
|  1|-0.7071067811865475|
|  1| 0.7071067811865475|
|  2|-0.8320502943378437|
|  2|-0.2773500981126146|
|  2| 1.1094003924504583|
+---+-------------------+
Python
XML Connectors
Reading XML files out of the box
spark.read.xml("/path/to/my/file.xml").show()
+-------+-----+
| name | age |
+-------+-----+
| Alice | 23 |
| Bob   |  32 |
+-------+-----+
Python
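When the repeating record element is not the default, the reader can be pointed at it explicitly; a small sketch with an assumed rowTag and file path:

(spark.read
    .option("rowTag", "person")   # the repeating element to treat as a row
    .xml("/path/to/people.xml")
    .show())
Python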
More Than Just a Simple Port
Databricks JDBC Dialect
Reading Databricks SQL out of the box
spark.read.jdbc(
"jdbc:databricks://…",
"my_table",
properties
).show()
+-------+-----+
| name | age |
+-------+-----+
| Alice | 23 |
| Bob   |  32 |
+-------+-----+
Python
Delta Lake 4.0
The biggest Delta release yet
Delta 3.x: Deletion Vectors, Liquid clustering, Optimized Writes, Delta Kernel, Incremental checkpoints, Log compactions, Table features, Row IDs, MERGE improvements, Auto-compaction, Table cloning, CDF

Delta 4.0:
• VARIANT: lightning-fast semi-structured data
• UniForm GA: write once, read as all formats
• Liquid GA: easy migration from partitioned tables
• Multi-cluster writes: cross-cloud, cross-engine
• Delta Connect: Spark Connect support
• Collations: flexible sort and comparison
• Identity columns: pain-free primary and foreign keys
• Type widening: data types expand with your data
UniForm GA
Towards full lakehouse format interoperability
[Diagram: with UniForm, a single copy of Parquet data carries the metadata for multiple lakehouse formats.]
Liquid clustering Usage Walkthrough
Create a new Delta table with liquid clustering
CREATE [EXTERNAL] TABLE tbl (id INT, name STRING) CLUSTER BY(id)
Change Liquid Clustering keys on existing clustered table:
ALTER TABLE tbl CLUSTER BY (name);
Clustering data in a Delta table with liquid clustering:
OPTIMIZE tbl;
What you don’t need to worry about:
● Optimal file sizes
● Whether a column can be used as a clustering key
● Order of clustering keys
Public doc: https://docs.databricks.com/delta/clustering.html
Liquid Clustering GA
Easy to use
Up to 7x faster writes
Up to 12x faster reads
Highly flexible
Agenda
• New Functionalities: Spark Connect, ANSI Mode, Arbitrary Stateful Processing V2, Collation Support, Variant Data Types, pandas 2.x Support
• Extensions: Python Data Source APIs, XML/Databricks Connectors and DSV2 Extension, Delta 4.0
• Custom Functions and Procedures: SQL UDFs, SQL Scripting, Python UDTF, Arrow optimized Python UDF, PySpark UDF Unified Profiler
• Usability: Structured Logging Framework, Error Class Framework, Behavior Change Process
Python UDTF
Apache Spark 3.5
Python User Defined Table Functions
This is a new kind of function that returns
an entire table as output instead of a
single scalar result value
○ Once registered, they can appear in
the FROM clause of a SQL query
○ Or use the DataFrame API to call them
from pyspark.sql.functions import udtf

@udtf(returnType="num: int, squared: int")
class SquareNumbers:
    def eval(self, start: int, end: int):
        for num in range(start, end + 1):
            yield (num, num * num)
Python
Apache Spark 3.5
Python User Defined Table Functions

SQL Lang:
SELECT * FROM SquareNumbers(start => 1, end => 3);
+-----+--------+
| num | squared|
+-----+--------+
|   1 |      1 |
|   2 |      4 |
|   3 |      9 |
+-----+--------+
SQL

PySpark DataFrame APIs:
SquareNumbers(lit(1), lit(3)).show()
+-----+--------+
| num | squared|
+-----+--------+
|   1 |      1 |
|   2 |      4 |
|   3 |      9 |
+-----+--------+
Python
Apache Spark 4.0
Python UDTF: Polymorphic Analysis and Input Table Partitioning

Polymorphic analysis: compute the output schema for each call depending on the arguments, using analyze.

class ReadFromConfigFile:
    @staticmethod
    def analyze(filename: AnalyzeArgument):
        with open(os.path.join(
                SparkFiles.getRootDirectory(),
                filename.value), "r") as f:
            # Compute the UDTF output schema
            # based on the contents of the file.
            return AnalyzeResult(
                from_file(f.read()))
    ...

ReadFromConfigFile(lit("config.txt")).show()
+------------+-------------+
| start_date | other_field |
+------------+-------------+
| 2024-04-02 |           1 |
+------------+-------------+
Python

Input table partitioning: split input rows among class instances; eval runs once for each row, then terminate runs last.

class CountAndMax:
    def __init__(self):
        self._count = 0
        self._max = 0
    def eval(self, row: Row):
        self._count += 1
        self._max = max(self._max, row[0])
    def terminate(self):
        yield self._count, self._max

WITH t AS (SELECT id FROM RANGE(0, 100))
SELECT * FROM CountAndMax(
  TABLE(t) PARTITION BY id / 10 ORDER BY id);
+-------+-----+
| count | max |
+-------+-----+
|    10 |   0 |
|    10 |   1 |
...
Python
Apache Spark 4.0
Python UDTF: Variable Keyword Arguments and Custom Initialization

Variable keyword arguments: the analyze and eval methods may accept *args or **kwargs.

class VarArgs:
    @staticmethod
    def analyze(**kwargs: AnalyzeArgument):
        return AnalyzeResult(StructType(
            [StructField(key, arg.dataType)
             for key, arg in sorted(
                 kwargs.items())]))
    def eval(self, **kwargs):
        yield tuple(value for _, value
                    in sorted(kwargs.items()))

SELECT * FROM VarArgs(a => 10, b => 'x');
+----+-----+
|  a |  b  |
+----+-----+
| 10 | "x" |
+----+-----+
Python

Custom initialization: create a subclass of AnalyzeResult and consume it in each subsequent __init__.

class SplitWords:
    @dataclass
    class MyAnalyzeResult(AnalyzeResult):
        numWords: int
        numArticles: int

    @staticmethod
    def analyze(text: str):
        words = text.split(" ")
        return MyAnalyzeResult(
            schema=StructType()
                .add("word", StringType())
                .add("total", IntegerType()),
            withSinglePartition=True,
            numWords=len(words),
            numArticles=len([
                word for word in words
                if word in ("a", "an", "the")]))

    def __init__(self, r: MyAnalyzeResult):
        ...
Python
Arrow Optimized Python UDF
Enhancing Python UDFs with Apache Arrow
● Introduction to Arrow and Its Role in UDF Optimization:
○ Utilizes Apache Arrow
○ Supported since Spark 3.5 and ON by default since Spark 4.0
● Key Benefits
○ Enhances data serialization and deserialization speed
○ Provides standardized type coercion
Enabling Arrow Optimization

Local activation in a UDF
• Activates Arrow optimization for a specific UDF, improving performance

# An Arrow Python UDF
@udf(returnType='int', useArrow=True)
def arrow_slen(s):
    return len(s)
Python

Global activation in the Spark session
• Activates Arrow optimization for all Python UDFs in the Spark session

spark.conf.set("spark.sql.execution.pythonUDF.arrow.enabled", True)

# An Arrow Python UDF
@udf(returnType='int')
def arrow_slen(s):
    return len(s)
Python
@udf(returnType='int', useArrow=True)
def arrow_slen(s):
    return len(s)
Python
Performance
sdf.select(
udf(lambda v: v + 1, DoubleType(), useArrow=True)("v"),
udf(lambda v: v - 1, DoubleType(), useArrow=True)("v"),
udf(lambda v: v * v, DoubleType(), useArrow=True)("v")
)
Python
Performance
Pickled Python UDF
>>> df.select(udf(lambda x: x, 'string')('value').alias('date_in_string')).show()
+-----------------------------------------------------------------------+
| date_in_string |
+-----------------------------------------------------------------------+
|java.util.GregorianCalendar[time=?,areFieldsSet=false,areAllFieldsSet..|
|java.util.GregorianCalendar[time=?,areFieldsSet=false,areAllFieldsSet..|
+-----------------------------------------------------------------------+
Python
Arrow-optimized Python UDF
>>> df.select(udf(lambda x: x, 'string')('value').alias('date_in_string')).show()
+--------------+
|date_in_string|
+--------------+
|    1970-01-01|
|    1970-01-02|
+--------------+
Python
Comparing Pickled and Arrow-optimized Python UDFs on type coercion [Link]
SQL UDF / UDTF
Easily extend SQL function library
● SQL User Defined Scalar Functions
○ Persisted SQL Expressions
● SQL User Defined Table Functions
○ Persisted Parameterized Views
● Support named parameter invocation and defaulting
● Table functions with lateral correlation
SQL User Defined Scalar Functions
● Encapsulate (complex) expressions
● May contain subqueries
● Return a scalar value
● Can be used in most places where built-in functions go
SQL User Defined Scalar Functions
Persists complex expression patterns
> CREATE FUNCTION roll_dice(
    num_dice INT DEFAULT 1 COMMENT 'number of dice to roll (Default: 1)',
    num_sides INT DEFAULT 6 COMMENT 'number of sides per die (Default: 6)'
  ) COMMENT 'Roll a number of n-sided dice'
  RETURN aggregate(
    sequence(1, roll_dice.num_dice, 1),
    0,
    (acc, x) -> acc + (rand() * roll_dice.num_sides) :: INT,
    acc -> acc + roll_dice.num_dice
  );

> SELECT roll_dice();
 3
-- Roll 3 6-sided dice
> SELECT roll_dice(3);
 15
-- Roll 3 10-sided dice
> SELECT roll_dice(3, 10);
 21
SQL
SQL User Defined Table Functions
● Encapsulate (complex) correlated subqueries aka a parameterized view
● Can be used in the FROM clause
SQL User Defined Table Functions
Persist complex parameterized queries
CREATE FUNCTION weekdays(start DATE, end DATE)
  RETURNS TABLE(day_of_week STRING, day DATE)
  RETURN SELECT
    to_char(day, 'E'),
    day
  FROM
    (
      SELECT sequence(weekdays.start, weekdays.end)
    ) AS t(days),
    LATERAL(explode(days)) AS dates(day)
  WHERE
    extract(DAYOFWEEK_ISO FROM day) BETWEEN 1 AND 5;
SQL
SQL User Defined Table Functions
Persist complex parameterized queries
> SELECT
    day_of_week,
    day
  FROM
    weekdays(DATE '2024-01-01',
             DATE '2024-01-14');
 Mon 2024-01-01
 …
 Fri 2024-01-05
 Mon 2024-01-08
SQL

> -- Return weekdays for date ranges originating from a LATERAL correlation
> SELECT
    weekdays.*
  FROM
    VALUES
      (DATE '2020-01-01'),
      (DATE '2021-01-01') AS starts(start),
    LATERAL weekdays(start, start + INTERVAL '7' DAYS);
 Wed 2020-01-01
 Thu 2020-01-02
 Fri 2020-01-03
 …
SQL
Named parameter invocation
Self documenting and safer SQL UDF invocation
> DESCRIBE FUNCTION roll_dice;
Function: default.roll_dice
Type: SCALAR
Input: num_dice INT
num_sides INT
Returns: INT
> -- Roll 1 10-sided dice - skip dice count
> SELECT roll_dice(num_sides => 10)
7
> -- Roll 3 10-sided dice - reversed order
> SELECT roll_dice(num_sides => 10, num_dice => 3)
21
SQL
Stored Procedure
External Stored Procedures
External Iceberg Stored Procedure
SQL Scripting
It’s SQL, but with control flow!
• Support for control flow, iterators & error handling
Natively in SQL
• Control flow → IF/ELSE, CASE
• Looping → WHILE, REPEAT, ITERATE
• Resultset iterator → FOR
• Exception handling → CONTINUE/EXIT
• Parameterized queries → EXECUTE IMMEDIATE
• Following the SQL/PSM standard
SQL Scripting
BEGIN
DECLARE c INT = 10;
WHILE c > 0 DO
INSERT INTO t VALUES (c);
SET VAR c = c - 1;
END WHILE;
END
SQL
SQL Scripting
-- parameters
DECLARE oldColName = 'ColoUr';
DECLARE newColName = 'color';

BEGIN
  DECLARE tableArray Array < STRING >;
  DECLARE tableType STRING;
  DECLARE i INT = 0;
  DECLARE alterQuery STRING;
  SET
    tableArray = (
      SELECT
        array_agg(table_name)
      FROM
        INFORMATION_SCHEMA.columns
      WHERE
        column_name COLLATE UNICODE_CI = oldColName
    );
  WHILE i < array_size(tableArray) DO
    SET
      tableType = (
        SELECT
          table_type
        FROM
          INFORMATION_SCHEMA.tables
        WHERE
          table_name = tableArray [i]
      );
    IF tableType != 'VIEW' COLLATE UNICODE_CI THEN
      SET
        alterQuery = 'ALTER TABLE ' || tableArray [i] ||
          ' RENAME COLUMN ' || oldColName || ' TO ' || newColName;
      EXECUTE IMMEDIATE alterQuery;
    END IF;
    SET i = i + 1;
  END WHILE;
END;
SQL
PySpark UDF Unified Profiling
Overview of Unified Profiling
• Key Components: Performance and memory profiling
• Benefits: Tracks function calls, execution time, memory usage
• Replaces the legacy profiling and addresses its drawbacks
• Advantages of the new unified profiling: session-based, works with Spark Connect, runtime toggling
Overview of Unified Profiling
• How to Enable:
• Performance Profiler: spark.conf.set("spark.sql.pyspark.udf.profiler", "perf")
• Memory Profiler: spark.conf.set("spark.sql.pyspark.udf.profiler", "memory")
• API Features: "show", "dump", and "clear" commands
• Show results:
• Performance: spark.profile.show(type="perf")
• Memory: spark.profile.show(type="memory")
• Dump results: spark.profile.dump("/your_path/...")
• Clear results: spark.profile.clear()
PySpark
Performance
Profiler
from pyspark.sql.functions import pandas_udf

df = spark.range(10)

@pandas_udf("long")
def add1(x):
    return x + 1

added = df.select(add1("id"))

spark.conf.set("spark.sql.pyspark.udf.profiler", "perf")
added.show()
Python
PySpark
Performance
Profiler
PySpark
Memory
Profiler
from pyspark.sql.functions import pandas_udf

df = spark.range(10)

@pandas_udf("long")
def add1(x):
    return x + 1

added = df.select(add1("id"))

spark.conf.set("spark.sql.pyspark.udf.profiler", "memory")
added.show()
Python
PySpark
Memory
Profiler
Agenda
• New Functionalities: Spark Connect, ANSI Mode, Arbitrary Stateful Processing V2, Collation Support, Variant Data Types, pandas 2.x Support
• Extensions: Python Data Source APIs, XML/Databricks Connectors and DSV2 Extension, Delta 4.0
• Custom Functions and Procedures: SQL UDFs, SQL Scripting, Python UDTF, Arrow optimized Python UDF, PySpark UDF Unified Profiler
• Usability: Structured Logging Framework, Error Class Framework, Behavior Change Process
Structured Logging Framework
What are we going to build to improve this?
• Transition to Structured Logging in Apache Spark
• Introducing Spark System Log Directories
Analyzing Spark logs is challenging due to their
unstructured nature
Structured Spark Logging
Starting from Spark 4.0, the default log format is JSON lines, making it
easier to parse and analyze.
{
"ts": "2023-03-12T12:02:46.661-0700",
"level": "ERROR",
"msg": "Fail to know the executor 289 is alive or not",
"context": {
"executor_id": "289"
},
"exception": {
"class": "org.apache.spark.SparkException",
"msg": "Exception thrown in awaitResult",
"stackTrace": "..."
},
"source": "BlockManagerMasterEndpoint"
}
JSON
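If you prefer the previous plain-text format, the JSON layout can be switched off; a sketch assuming the spark.log.structuredLogging.enabled flag that ships with the framework:

from pyspark.sql import SparkSession

# Opt back out of JSON logs for this application (set before the
# session starts, e.g. via spark-defaults.conf or --conf).
spark = (
    SparkSession.builder
    .config("spark.log.structuredLogging.enabled", "false")
    .getOrCreate()
)
Python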
Use Spark to Analyze Spark Logs
System Log Directories
from pyspark.sql.functions import col

logs = spark.read.json("/var/spark/logs.json")

# To get all the errors on host 100.116.29.4
errors_host_logs = logs.filter(
    (col("context.host") == "100.116.29.4") & (col("level") == "ERROR"))

# To get all the exceptions from Spark
spark_exceptions_logs = logs.filter(
    col("exception.class").startswith("org.apache.spark"))
Python
System Log Directories
from pyspark.sql.functions import col

logs = spark.read.json("/var/spark/logs.json")

# To get all the executor loss logs
executor_lost_logs = logs.filter(
    col("msg").contains("Lost executor"))

# To get all the distributed logs about executor 289
executor_289_logs = logs.filter(
    col("context.executor_id") == 289)
Python
Error Conditions and Messages
Error Conditions
800+ top-frequency error conditions from the server.
Error Conditions in PySpark
All 200+ error conditions issued by the PySpark client
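A small sketch (the table name is illustrative) of inspecting an error condition programmatically via the pyspark.errors API:

from pyspark.errors import AnalysisException

try:
    spark.sql("SELECT * FROM nonexistent_table")
except AnalysisException as e:
    # Error condition name plus its message parameters,
    # e.g. TABLE_OR_VIEW_NOT_FOUND.
    print(e.getErrorClass())
    print(e.getMessageParameters())
Python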
Quality PySpark Errors
• Clear and specific error
classes
• Improved documentation
• Consistency and
standardization
• Enhanced debugging and
maintenance
Spark 3.5
Spark 4.0
Behavior Changes
Overview of Apache Spark Versioning Policy
• Semantic Versioning Structure: [MAJOR].[FEATURE].[MAINTENANCE]
• MAJOR: Long-term API stability
• FEATURE: New features and improvements
• MAINTENANCE: Frequent, urgent patches
• API Compatibility Commitments: Maintain compatibility across
feature releases, reducing the need for users to refactor code.
Categories of Impactful Behavior Changes
• Query Results Impact: Changes affecting the accuracy and outcome of
data queries.
• Schema and Configuration Changes: Adjustments to the database or
application schema and Spark configuration settings.
• API Modifications: Alterations to the public and developer APIs across
multiple programming languages.
• Error Handling Adjustments: Modifications in how errors are classified
and handled within the system.
• Deployment and Management Revisions: Changes in the methods and
tools used for deploying and managing Spark environments.
Best Practices for API Changes
• General Approach:
• Avoid API changes whenever possible.
• Prefer deprecating features over direct modifications to ensure smoother
transitions.
• Implement legacy flags to allow users to opt into previous behaviors
temporarily, easing the transition to new versions.
• Communicate changes clearly through deprecation warnings and
documentation updates.
Best Practices for API Changes
• User-facing documentation
• Migration Guide Updates: Regularly update the migration guide with detailed
information on changes affecting user operations.
• Legacy Configs: Include information on legacy configurations that might help users
transition smoothly between versions.
• Error Messages:
• Clarity and Actionability: Ensure that all error messages are clear and direct, informing
the user precisely what went wrong.
• Workarounds: Wherever possible, provide actionable advice within the error message,
including configuration changes that can revert to previous behaviors or other
immediate solutions.
Best Practices for API Changes
• PR Descriptions
• Detail: Provide comprehensive explanations of the changes, highlighting the
modifications and their implications.
• Transparency: Explain clearly how the new behavior differs from the old, and the
reasons for these changes.
Documentation
PySpark Doc – Dark Mode
PySpark Doc
• SPARK-44728
• More examples
• Environment Setup
• Quickstart
• Type System
Versionless Spark Programming Guide
[Proposal] Versionless Spark Programming Guide
• Motivation: Allow for real-time
updates and rapid content iteration
without release schedule
constraints.
• Small changes pose less SEO
deranking risk than major updates.
• Transition 9 existing programming
guides to Spark website repo.
Python: The Number One Choice
Birth of PySpark: 2013
>330 million
PyPI downloads per year
PyPI downloads of PySpark in the last 12 months, by country and region [world map]
PySpark's View of the World
[Diagram: SQL, DataFrame, and pandas APIs sit on top of PySpark, backed by the Catalyst optimizer, adaptive execution, and the Spark ecosystem (connectors, 3rd-party libraries).]
Key Focus of PySpark
• Functionality parity: complete feature availability; Python native APIs
• Ease of use: no JVM knowledge required; Pythonic APIs
• Performance parity: matched performance with Scala; optimized query compiler and engine
• Ecosystem integration: integrated with the Python ecosystem; Spark ecosystem growth
Key Enhancements in PySpark 3.5 and 4.0
• Functionality parity: 180 new built-in functions in Spark 3.5+; Python native data source APIs; stateful streaming processing V2; Python UDTFs
• Ease of use: Spark Connect; error framework; enriched documentation; unified UDF profiling
• Performance: Spark Connect; Arrow-optimized Python UDF; Variant type for semi-structured processing; advanced optimizer and adaptive execution
• Ecosystem integration: compatible with pandas 2 (pandas API); Delta Lake 4.0; [ML] distributed training with TorchDistributor; Arrow integration: toArrow API
PySpark highlights: Python UDF Memory/CPU Profiler, pandas 2 compatibility, pandas API on Spark coverage, NumPy inputs, UDF-level Dependency Control [WIP], applyInArrow, 180+ new Python functions, Spark Connect, mapInArrow, DeepSpeed Distributor, visualization/plotting, richer connectors, Pythonic error handling, DF/UDF debuggability, Arrow-optimized Python UDFs, DF.toArrow(), Python Arbitrary Stateful Processing, type annotations, Python UDTF, PySpark Testing API
Thank you for your
contributions!