Commit 74691d1

Author: David Gingrich

Improve combineByKey docs

* Add note on memory allocation
* Change example code to use different mergeValue and mergeCombiners

1 parent a2d8d76 · commit 74691d1

python/pyspark/rdd.py

Lines changed: 19 additions & 5 deletions
@@ -1804,17 +1804,31 @@ def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
           a one-element list)
         - C{mergeValue}, to merge a V into a C (e.g., adds it to the end of
           a list)
-        - C{mergeCombiners}, to combine two C's into a single one.
+        - C{mergeCombiners}, to combine two C's into a single one (e.g., merges
+          the lists)
+
+        To avoid memory allocation, both mergeValue and mergeCombiners are allowed to
+        modify and return their first argument instead of creating a new C.
 
         In addition, users can control the partitioning of the output RDD.
 
         .. note:: V and C can be different -- for example, one might group an RDD of type
             (Int, Int) into an RDD of type (Int, List[Int]).
 
-        >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
-        >>> def add(a, b): return a + str(b)
-        >>> sorted(x.combineByKey(str, add, add).collect())
-        [('a', '11'), ('b', '1')]
+        >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 2)])
+        >>> def to_list(a):
+        ...     return [a]
+        ...
+        >>> def append(a, b):
+        ...     a.append(b)
+        ...     return a
+        ...
+        >>> def extend(a, b):
+        ...     a.extend(b)
+        ...     return a
+        ...
+        >>> sorted(x.combineByKey(to_list, append, extend).collect())
+        [('a', [1, 2]), ('b', [1])]
         """
         if numPartitions is None:
             numPartitions = self._defaultReducePartitions()
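For context, the new doctest can also be run as a self-contained script. The sketch below is illustrative, assuming a local PySpark installation; the master setting "local" and the app name "combineByKey-demo" are arbitrary choices, and the comments spell out the role each function plays in the combineByKey contract described in the docstring above.

    from pyspark import SparkContext

    # A throwaway local context for demonstration; master and app name are arbitrary.
    sc = SparkContext("local", "combineByKey-demo")

    x = sc.parallelize([("a", 1), ("b", 1), ("a", 2)])

    def to_list(a):
        # createCombiner: turn the first value V seen for a key into a combiner C.
        return [a]

    def append(a, b):
        # mergeValue: fold another V into an existing C. Per the note added in
        # this commit, mutating and returning the first argument is allowed,
        # which avoids allocating a fresh list on every merge.
        a.append(b)
        return a

    def extend(a, b):
        # mergeCombiners: merge two per-partition combiners into one, again
        # reusing the first argument in place.
        a.extend(b)
        return a

    print(sorted(x.combineByKey(to_list, append, extend).collect()))
    # [('a', [1, 2]), ('b', [1])]

    sc.stop()

The switch from a single add function to distinct append and extend also makes the V-versus-C distinction visible: mergeValue receives a plain value (an int here) while mergeCombiners receives another combiner (a list), so the two callbacks typically need different implementations whenever V and C are different types.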
