Skip to content

Commit 02bbe73

Browse files
zero323 authored and rxin committed
[SPARK-20584][PYSPARK][SQL] Python generic hint support
## What changes were proposed in this pull request?

Adds `hint` method to PySpark `DataFrame`.

## How was this patch tested?

Unit tests, doctests.

Author: zero323 <[email protected]>

Closes #17850 from zero323/SPARK-20584.
1 parent 13eb37c commit 02bbe73

File tree

2 files changed

+45
-0
lines changed

2 files changed

+45
-0
lines changed

python/pyspark/sql/dataframe.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -380,6 +380,35 @@ def withWatermark(self, eventTime, delayThreshold):
380380
jdf = self._jdf.withWatermark(eventTime, delayThreshold)
381381
return DataFrame(jdf, self.sql_ctx)
382382

383+
@since(2.2)
384+
def hint(self, name, *parameters):
385+
"""Specifies some hint on the current DataFrame.
386+
387+
:param name: A name of the hint.
388+
:param parameters: Optional parameters.
389+
:return: :class:`DataFrame`
390+
391+
>>> df.join(df2.hint("broadcast"), "name").show()
392+
+----+---+------+
393+
|name|age|height|
394+
+----+---+------+
395+
| Bob| 5| 85|
396+
+----+---+------+
397+
"""
398+
if len(parameters) == 1 and isinstance(parameters[0], list):
399+
parameters = parameters[0]
400+
401+
if not isinstance(name, str):
402+
raise TypeError("name should be provided as str, got {0}".format(type(name)))
403+
404+
for p in parameters:
405+
if not isinstance(p, str):
406+
raise TypeError(
407+
"all parameters should be str, got {0} of type {1}".format(p, type(p)))
408+
409+
jdf = self._jdf.hint(name, self._jseq(parameters))
410+
return DataFrame(jdf, self.sql_ctx)
411+
383412
@since(1.3)
384413
def count(self):
385414
"""Returns the number of rows in this :class:`DataFrame`.

python/pyspark/sql/tests.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1906,6 +1906,22 @@ def test_functions_broadcast(self):
19061906
# planner should not crash without a join
19071907
broadcast(df1)._jdf.queryExecution().executedPlan()
19081908

1909+
def test_generic_hints(self):
1910+
from pyspark.sql import DataFrame
1911+
1912+
df1 = self.spark.range(10e10).toDF("id")
1913+
df2 = self.spark.range(10e10).toDF("id")
1914+
1915+
self.assertIsInstance(df1.hint("broadcast"), DataFrame)
1916+
self.assertIsInstance(df1.hint("broadcast", []), DataFrame)
1917+
1918+
# Dummy rules
1919+
self.assertIsInstance(df1.hint("broadcast", "foo", "bar"), DataFrame)
1920+
self.assertIsInstance(df1.hint("broadcast", ["foo", "bar"]), DataFrame)
1921+
1922+
plan = df1.join(df2.hint("broadcast"), "id")._jdf.queryExecution().executedPlan()
1923+
self.assertEqual(1, plan.toString().count("BroadcastHashJoin"))
1924+
19091925
def test_toDF_with_schema_string(self):
19101926
data = [Row(key=i, value=str(i)) for i in range(100)]
19111927
rdd = self.sc.parallelize(data, 5)

0 commit comments

Comments
 (0)