Skip to content

Commit f49b9a2

Browse files
committed
[SPARK-16931][PYTHON] PySpark APIS for bucketBy and sortBy
1 parent 929cb8b commit f49b9a2

File tree

1 file changed

+40
-0
lines changed

1 file changed

+40
-0
lines changed

python/pyspark/sql/readwriter.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -501,6 +501,46 @@ def partitionBy(self, *cols):
501501
self._jwrite = self._jwrite.partitionBy(_to_seq(self._spark._sc, cols))
502502
return self
503503

504+
@since(2.1)
def bucketBy(self, numBuckets, *cols):
    """Buckets the output by the given columns on the file system.

    Columns may be passed either as varargs or as a single list/tuple.

    :param numBuckets: the number of buckets to save
    :param cols: name of columns

    :raises TypeError: if ``numBuckets`` is not an int
    :raises ValueError: if no bucketing columns are given

    >>> (df.write.format('parquet')
    ...     .bucketBy(100, 'year', 'month')
    ...     .saveAsTable(os.path.join(tempfile.mkdtemp(), 'bucketed_table')))
    """
    # Fail fast with a clear message instead of an opaque Py4J error
    # when a non-int bucket count reaches the JVM side.
    if not isinstance(numBuckets, int):
        raise TypeError("numBuckets should be an int, got {0}.".format(type(numBuckets)))

    # Accept bucketBy(n, ['a', 'b']) as well as bucketBy(n, 'a', 'b').
    if len(cols) == 1 and isinstance(cols[0], (list, tuple)):
        cols = cols[0]

    # Without this, cols[0] below raises a cryptic IndexError for
    # bucketBy(n) / bucketBy(n, []).
    if not cols:
        raise ValueError("at least one bucketing column must be specified")

    # The JVM API takes (numBuckets, firstCol, restCols...).
    col = cols[0]
    cols = cols[1:]

    self._jwrite = self._jwrite.bucketBy(numBuckets, col, _to_seq(self._spark._sc, cols))
    return self
523+
524+
@since(2.1)
def sortBy(self, *cols):
    """Sorts the output in each bucket by the given columns on the file system.

    Columns may be passed either as varargs or as a single list/tuple.

    :param cols: name of columns

    :raises ValueError: if no sort columns are given

    >>> (df.write.format('parquet')
    ...     .bucketBy(100, 'year', 'month')
    ...     .sortBy('day')
    ...     .saveAsTable(os.path.join(tempfile.mkdtemp(), 'sorted_bucketed_table')))
    """
    # Accept sortBy(['a', 'b']) as well as sortBy('a', 'b').
    if len(cols) == 1 and isinstance(cols[0], (list, tuple)):
        cols = cols[0]

    # Without this, cols[0] below raises a cryptic IndexError for
    # sortBy() / sortBy([]).
    if not cols:
        raise ValueError("at least one sort column must be specified")

    # The JVM API takes (firstCol, restCols...).
    col = cols[0]
    cols = cols[1:]

    self._jwrite = self._jwrite.sortBy(col, _to_seq(self._spark._sc, cols))
    return self
543+
504544
@since(1.4)
505545
def save(self, path=None, format=None, mode=None, partitionBy=None, **options):
506546
"""Saves the contents of the :class:`DataFrame` to a data source.

0 commit comments

Comments
 (0)