Skip to content

Commit 6110909

Browse files
committed
[DF] Change signature of optimize_npartitions in backends
This method does not really need an extra positional parameter whose only purpose is to be returned when the backend cannot "optimize" the partition count. Instead, switch to returning the default `MIN_NPARTITIONS` data member when nothing better can be done.
1 parent a9f8bac commit 6110909

File tree

3 files changed

+8
-8
lines changed

3 files changed

+8
-8
lines changed

bindings/experimental/distrdf/python/DistRDF/Backends/Base.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -305,12 +305,12 @@ def distribute_unique_paths(self, paths):
305305
"""
306306
pass
307307

308-
def optimize_npartitions(self, npartitions):
308+
def optimize_npartitions(self):
309309
"""
310310
Distributed backends may optimize the number of partitions of the
311311
current dataset or leave it as it is.
312312
"""
313-
return npartitions
313+
return self.MIN_NPARTITIONS
314314

315315
def distribute_files(self, files_paths):
316316
"""

bindings/experimental/distrdf/python/DistRDF/Backends/Spark/Backend.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -64,16 +64,16 @@ def __init__(self, sparkcontext=None):
6464
else:
6565
self.sc = pyspark.SparkContext.getOrCreate()
6666

67-
def optimize_npartitions(self, npartitions):
67+
def optimize_npartitions(self):
6868
numex = self.sc.getConf().get("spark.executor.instances")
6969
numcoresperex = self.sc.getConf().get("spark.executor.cores")
7070

71-
if numex:
72-
if numcoresperex:
71+
if numex is not None:
72+
if numcoresperex is not None:
7373
return int(numex) * int(numcoresperex)
7474
return int(numex)
7575
else:
76-
return npartitions
76+
return self.MIN_NPARTITIONS
7777

7878
def ProcessAndMerge(self, ranges, mapper, reducer):
7979
"""
@@ -158,6 +158,6 @@ def make_dataframe(self, *args, **kwargs):
158158
# 2. An educated guess according to the backend, using the backend's
159159
# `optimize_npartitions` function
160160
# 3. Set `npartitions` to 2
161-
npartitions = kwargs.pop("npartitions", self.optimize_npartitions(Base.BaseBackend.MIN_NPARTITIONS))
161+
npartitions = kwargs.pop("npartitions", self.optimize_npartitions())
162162
headnode = HeadNode.get_headnode(npartitions, *args)
163163
return DataFrame.RDataFrame(headnode, self)

bindings/experimental/distrdf/test/backend/test_spark.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ def test_optimize_npartitions_with_num_executors(self):
7070
sc = pyspark.SparkContext(conf=sconf)
7171
backend = Backend.SparkBackend(sparkcontext=sc)
7272

73-
self.assertEqual(backend.optimize_npartitions(1), 10)
73+
self.assertEqual(backend.optimize_npartitions(), 10)
7474

7575

7676
class OperationSupportTest(unittest.TestCase):

0 commit comments

Comments (0)