Commit 369e96c

Add wheel package support for PySpark
1 parent d138aa8 commit 369e96c

11 files changed: 147 additions, 38 deletions

core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

Lines changed: 2 additions & 2 deletions
@@ -447,8 +447,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
         |                              coordinates should be groupId:artifactId:version.
         |  --repositories              Comma-separated list of additional remote repositories to
         |                              search for the maven coordinates given with --packages.
-        |  --py-files PY_FILES         Comma-separated list of .zip, .egg, or .py files to place
-        |                              on the PYTHONPATH for Python apps.
+        |  --py-files PY_FILES         Comma-separated list of .whl, .egg, .zip or .py files to
+        |                              place on the PYTHONPATH for Python apps.
         |  --files FILES               Comma-separated list of files to be placed in the working
         |                              directory of each executor.
         |

docs/programming-guide.md

Lines changed: 1 addition & 1 deletion
@@ -204,7 +204,7 @@ For a complete list of options, run `spark-shell --help`. Behind the scenes,
 
 In the PySpark shell, a special interpreter-aware SparkContext is already created for you, in the
 variable called `sc`. Making your own SparkContext will not work. You can set which master the
-context connects to using the `--master` argument, and you can add Python .zip, .egg or .py files
+context connects to using the `--master` argument, and you can add Python .whl, .egg, .zip or .py files
 to the runtime path by passing a comma-separated list to `--py-files`. You can also add dependencies
 (e.g. Spark Packages) to your shell session by supplying a comma-separated list of maven coordinates
 to the `--packages` argument. Any additional repositories where dependencies might exist (e.g. SonaType)

docs/submitting-applications.md

Lines changed: 6 additions & 6 deletions
@@ -18,9 +18,9 @@ as `provided` dependencies; these need not be bundled since they are provided by
 the cluster manager at runtime. Once you have an assembled jar you can call the `bin/spark-submit`
 script as shown here while passing your jar.
 
-For Python, you can use the `--py-files` argument of `spark-submit` to add `.py`, `.zip` or `.egg`
-files to be distributed with your application. If you depend on multiple Python files we recommend
-packaging them into a `.zip` or `.egg`.
+For Python, you can use the `--py-files` argument of `spark-submit` to add `.py`, `.whl`, `.egg`
+or `.zip` files to be distributed with your application. If you depend on multiple Python files we
+recommend packaging them into a `.whl`, `.egg` or `.zip`.
 
 # Launching Applications with spark-submit
 
@@ -62,7 +62,7 @@ the drivers and the executors. Note that `cluster` mode is currently not support
 Mesos clusters or Python applications.
 
 For Python applications, simply pass a `.py` file in the place of `<application-jar>` instead of a JAR,
-and add Python `.zip`, `.egg` or `.py` files to the search path with `--py-files`.
+and add Python `.whl`, `.egg`, `.zip` or `.py` files to the search path with `--py-files`.
 
 There are a few options available that are specific to the
 [cluster manager](#cluster-overview.html#cluster-manager-types) that is being used.
@@ -179,8 +179,8 @@ with `--packages`. All transitive dependencies will be handled when using this c
 repositories (or resolvers in SBT) can be added in a comma-delimited fashion with the flag `--repositories`.
 These commands can be used with `pyspark`, `spark-shell`, and `spark-submit` to include Spark Packages.
 
-For Python, the equivalent `--py-files` option can be used to distribute `.egg`, `.zip` and `.py` libraries
-to executors.
+For Python, the equivalent `--py-files` option can be used to distribute `.whl`, `.egg`, `.zip`
+and `.py` libraries to executors.
 
 # More Information
 
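As an illustration of the documented workflow (not part of this commit), a minimal driver sketch follows. The wheel file name and the `mylib` module are placeholders; the application would be launched with something like `bin/spark-submit --py-files mylib-1.0-py2.py3-none-any.whl my_app.py`.

    # my_app.py -- hypothetical driver; the wheel is shipped via --py-files
    from pyspark import SparkContext

    sc = SparkContext(appName="wheel-deps-demo")

    def process(record):
        # `mylib` is a placeholder package provided by the shipped wheel; once the
        # wheel has been pip-installed into the per-job site-packages directory,
        # it can be imported inside executor tasks as well as on the driver.
        from mylib import transform
        return transform(record)

    print(sc.parallelize(range(10)).map(process).collect())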

launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java

Lines changed: 1 addition & 1 deletion
@@ -225,7 +225,7 @@ public SparkLauncher addFile(String file) {
   }
 
   /**
-   * Adds a python file / zip / egg to be submitted with the application.
+   * Adds a python file / zip / whl / egg to be submitted with the application.
    *
    * @param file Path to the file.
    * @return This launcher.

python/pyspark/context.py

Lines changed: 80 additions & 24 deletions
@@ -21,6 +21,8 @@
 from threading import Lock
 from tempfile import NamedTemporaryFile
 
+from pip.commands.install import InstallCommand as pip_InstallCommand
+
 from py4j.java_collections import ListConverter
 
 from pyspark import accumulators
@@ -62,9 +64,9 @@ class SparkContext(object):
     _next_accum_id = 0
     _active_spark_context = None
     _lock = Lock()
-    _python_includes = None  # zip and egg files that need to be added to PYTHONPATH
+    _python_includes = None  # whl, egg, zip and jar files that need to be added to PYTHONPATH
 
-    PACKAGE_EXTENSIONS = ('.zip', '.egg', '.jar')
+    PACKAGE_EXTENSIONS = ('.whl', '.egg', '.zip', '.jar')
 
     def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
                  environment=None, batchSize=0, serializer=PickleSerializer(), conf=None,
@@ -77,9 +79,9 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
               (e.g. mesos://host:port, spark://host:port, local[4]).
         :param appName: A name for your job, to display on the cluster web UI.
         :param sparkHome: Location where Spark is installed on cluster nodes.
-        :param pyFiles: Collection of .zip or .py files to send to the cluster
-               and add to PYTHONPATH. These can be paths on the local file
-               system or HDFS, HTTP, HTTPS, or FTP URLs.
+        :param pyFiles: Collection of .py, .whl, .egg or .zip files to send
+               to the cluster and add to PYTHONPATH. These can be paths on
+               the local file system or HDFS, HTTP, HTTPS, or FTP URLs.
         :param environment: A dictionary of environment variables to set on
               worker nodes.
         :param batchSize: The number of Python objects represented as a single
@@ -178,18 +180,24 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize,
         sys.path.insert(1, root_dir)
 
         # Deploy any code dependencies specified in the constructor
+        # Wheel files will be installed by pip later.
         self._python_includes = list()
-        for path in (pyFiles or []):
-            self.addPyFile(path)
+        if pyFiles:
+            for path in pyFiles:
+                self.addFile(path)
+            self._include_python_packages(paths=pyFiles)
+        else:
+            pyFiles = []
 
         # Deploy code dependencies set by spark-submit; these will already have been added
-        # with SparkContext.addFile, so we just need to add them to the PYTHONPATH
-        for path in self._conf.get("spark.submit.pyFiles", "").split(","):
-            if path != "":
-                (dirname, filename) = os.path.split(path)
-                if filename[-4:].lower() in self.PACKAGE_EXTENSIONS:
-                    self._python_includes.append(filename)
-                    sys.path.insert(1, os.path.join(SparkFiles.getRootDirectory(), filename))
+        # with SparkContext.addFile, so we just need to include them.
+        # Wheel files will be installed by pip later.
+        spark_submit_pyfiles = self._conf.get("spark.submit.pyFiles", "").split(",")
+        if spark_submit_pyfiles:
+            self._include_python_packages(paths=spark_submit_pyfiles)
+
+        # Install all wheel files at once.
+        self._install_wheel_files(paths=pyFiles + spark_submit_pyfiles)
 
         # Create a temporary directory inside spark.local.dir:
         local_dir = self._jvm.org.apache.spark.util.Utils.getLocalDir(self._jsc.sc().conf())
@@ -693,23 +701,71 @@ def clearFiles(self):
         Clear the job's list of files added by L{addFile} or L{addPyFile} so
         that they do not get downloaded to any new nodes.
         """
-        # TODO: remove added .py or .zip files from the PYTHONPATH?
+        # TODO: remove added .py, .whl, .egg or .zip files from the PYTHONPATH?
         self._jsc.sc().clearFiles()
 
     def addPyFile(self, path):
         """
-        Add a .py or .zip dependency for all tasks to be executed on this
-        SparkContext in the future. The C{path} passed can be either a local
-        file, a file in HDFS (or other Hadoop-supported filesystems), or an
-        HTTP, HTTPS or FTP URI.
+        Add a .py, .whl, .egg or .zip dependency for all tasks to be
+        executed on this SparkContext in the future. The C{path} passed can
+        be either a local file, a file in HDFS (or other Hadoop-supported
+        filesystems), or an HTTP, HTTPS or FTP URI.
         """
         self.addFile(path)
-        (dirname, filename) = os.path.split(path)  # dirname may be directory or HDFS/S3 prefix
+        self._include_python_packages(paths=(path,))
+        self._install_wheel_files(paths=(path,))
 
-        if filename[-4:].lower() in self.PACKAGE_EXTENSIONS:
-            self._python_includes.append(filename)
-            # for tests in local mode
-            sys.path.insert(1, os.path.join(SparkFiles.getRootDirectory(), filename))
+    def _include_python_packages(self, paths):
+        """
+        Add Python package dependencies. Python packages (except for .whl) are
+        added to PYTHONPATH.
+        """
+        root_dir = SparkFiles.getRootDirectory()
+        for path in paths:
+            basename = os.path.basename(path)
+            extname = os.path.splitext(basename)[1].lower()
+            if extname in self.PACKAGE_EXTENSIONS \
+                    and basename not in self._python_includes:
+                self._python_includes.append(basename)
+                if extname != '.whl':
+                    # Prepend the python package (except for *.whl) to sys.path
+                    sys.path.insert(1, os.path.join(root_dir, basename))
+
+    def _install_wheel_files(
+        self,
+        paths,
+        quiet=True,
+        upgrade=True,
+        no_deps=True,
+        no_index=True,
+    ):
+        """
+        Install .whl files at once by pip install.
+        """
+        root_dir = SparkFiles.getRootDirectory()
+        paths = {
+            os.path.join(root_dir, os.path.basename(path))
+            for path in paths
+            if os.path.splitext(path)[1].lower() == '.whl'
+        }
+        if not paths:
+            return
+
+        pip_args = [
+            '--find-links', root_dir,
+            '--target', os.path.join(root_dir, 'site-packages'),
+        ]
+        if quiet:
+            pip_args.append('--quiet')
+        if upgrade:
+            pip_args.append('--upgrade')
+        if no_deps:
+            pip_args.append('--no-deps')
+        if no_index:
+            pip_args.append('--no-index')
+        pip_args.extend(paths)
+
+        pip_InstallCommand().main(args=pip_args)
 
     def setCheckpointDir(self, dirName):
         """

python/pyspark/tests.py

Lines changed: 16 additions & 0 deletions
@@ -367,6 +367,22 @@ def func():
             from userlib import UserClass
         self.assertEqual("Hello World from inside a package!", UserClass().hello())
 
+    def test_add_whl_file_locally(self):
+        # To ensure that we're actually testing addPyFile's effects, check that
+        # this fails due to `testpackage1` or `testpackage2` not being on the
+        # Python path:
+        def func():
+            from testpackage2 import TestPackage1Class
+        self.assertRaises(ImportError, func)
+        paths = [
+            os.path.join(SPARK_HOME, "python/test_support/testpackage1-0.0.1-py2.py3-none-any.whl"),
+            os.path.join(SPARK_HOME, "python/test_support/testpackage2-0.0.1-py2.py3-none-any.whl"),
+        ]
+        for path in paths:
+            self.sc.addPyFile(path)
+        from testpackage2 import TestPackage1Class
+        self.assertEqual("Hello World from inside a package!", TestPackage1Class().hello())
+
     def test_overwrite_system_module(self):
         self.sc.addPyFile(os.path.join(SPARK_HOME, "python/test_support/SimpleHTTPServer.py"))
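
The test above runs in local mode. As a sketch (not included in the patch), the same wheel contents would also be importable inside executor tasks once the worker-side change below installs them; this assumes an existing SparkContext `sc` and that the test wheels above were already added with `addPyFile`.

    # Sketch only: relies on the two test wheels having been shipped to the workers.
    def greet(_):
        from testpackage2 import TestPackage1Class
        return TestPackage1Class().hello()

    # Each worker pip-installs the shipped wheels before running task code (see
    # worker.py below), so the import succeeds on executors as well as the driver.
    print(sc.parallelize([1, 2]).map(greet).collect())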

python/pyspark/worker.py

Lines changed: 39 additions & 2 deletions
@@ -18,12 +18,15 @@
 """
 Worker that receives input from Piped RDD.
 """
+import fcntl
 import os
 import sys
 import time
 import socket
 import traceback
 
+from pip.commands.install import InstallCommand as pip_InstallCommand
+
 from pyspark.accumulators import _accumulatorRegistry
 from pyspark.broadcast import Broadcast, _broadcastRegistry
 from pyspark.files import SparkFiles
@@ -66,12 +69,46 @@ def main(infile, outfile):
         SparkFiles._root_directory = spark_files_dir
         SparkFiles._is_running_on_worker = True
 
-        # fetch names of includes (*.zip and *.egg files) and construct PYTHONPATH
         add_path(spark_files_dir)  # *.py files that were added will be copied here
+
+        # fetch names of includes and construct PYTHONPATH if the file extension
+        # is not '.whl'
         num_python_includes = read_int(infile)
+        wheel_files = set()
         for _ in range(num_python_includes):
             filename = utf8_deserializer.loads(infile)
-            add_path(os.path.join(spark_files_dir, filename))
+            path = os.path.join(spark_files_dir, filename)
+            if os.path.splitext(filename)[1].lower() == '.whl':
+                wheel_files.add(path)
+            else:
+                add_path(path)
+
+        if wheel_files:
+            # Install wheel files
+
+            local_site_packages_dir = os.path.join(
+                spark_files_dir,
+                'site-packages',
+            )
+            with open(os.path.join(
+                spark_files_dir,
+                '.pyspark_pip_install.lock'
+            ), 'w') as f:
+                fcntl.flock(f.fileno(), fcntl.LOCK_EX)
+                try:
+                    if os.path.exists(local_site_packages_dir) is False:
+                        # '--no-deps' is not set.
+                        # All dependencies must be there.
+                        pip_InstallCommand().main(args=[
+                            '--quiet',
+                            '--upgrade',
+                            '--no-index',
+                            '--target', local_site_packages_dir,
+                            '--find-links', spark_files_dir,
+                        ] + list(wheel_files))
+                finally:
+                    fcntl.flock(f.fileno(), fcntl.LOCK_UN)
+            add_path(local_site_packages_dir)
 
         # fetch names and values of broadcast variables
         num_broadcast_variables = read_int(infile)
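
For illustration only (not part of the patch), the concurrency pattern used above in isolation: several worker processes on one host share `spark_files_dir`, so the pip install is guarded by an exclusive advisory lock and skipped once the target directory exists. The `install_once` helper name is hypothetical.

    import fcntl
    import os

    def install_once(lock_path, target_dir, do_install):
        """Run do_install() at most once per target_dir across concurrent worker processes."""
        with open(lock_path, 'w') as lock_file:
            fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX)   # block until we own the lock
            try:
                if not os.path.exists(target_dir):           # another worker may have finished already
                    do_install()
            finally:
                fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN)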

Two binary test wheels added under python/test_support (2.32 KB and 2.09 KB): binary files not shown. These are the wheels referenced by test_add_whl_file_locally above.

yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala

Lines changed: 1 addition & 1 deletion
@@ -92,7 +92,7 @@ class ApplicationMasterArguments(val args: Array[String]) {
      |  --jar JAR_PATH       Path to your application's JAR file
      |  --class CLASS_NAME   Name of your application's main class
      |  --primary-py-file    A main Python file
-     |  --py-files PY_FILES  Comma-separated list of .zip, .egg, or .py files to
+     |  --py-files PY_FILES  Comma-separated list of .whl, .egg, .zip or .py files to
      |                       place on the PYTHONPATH for Python apps.
      |  --args ARGS          Arguments to be passed to your application's main class.
      |                       Multiple invocations are possible, each will be passed in order.
