Skip to content

Commit bdf67e8

Browse files
authored
Remove level and errors arguments for Pandas 2 compatibility. (#28375)
1 parent 3d6a490 commit bdf67e8

2 files changed

Lines changed: 34 additions & 27 deletions

File tree

sdks/python/apache_beam/dataframe/frames.py

Lines changed: 31 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -914,8 +914,10 @@ def sort_index(self, axis, **kwargs):
914914
))
915915

916916
@frame_base.with_docs_from(pd.DataFrame)
917-
@frame_base.args_to_kwargs(pd.DataFrame)
918-
@frame_base.populate_defaults(pd.DataFrame)
917+
@frame_base.args_to_kwargs(
918+
pd.DataFrame, removed_args=["errors"] if PD_VERSION >= (2, 0) else None)
919+
@frame_base.populate_defaults(
920+
pd.DataFrame, removed_args=["errors"] if PD_VERSION >= (2, 0) else None)
919921
@frame_base.maybe_inplace
920922
def where(self, cond, other, errors, **kwargs):
921923
"""where is not parallelizable when ``errors="ignore"`` is specified."""
@@ -937,16 +939,19 @@ def where(self, cond, other, errors, **kwargs):
937939
else:
938940
actual_args['other'] = other
939941

940-
if errors == "ignore":
941-
# We need all data in order to ignore errors and propagate the original
942-
# data.
943-
requires = partitionings.Singleton(
944-
reason=(
945-
f"where(errors={errors!r}) is currently not parallelizable, "
946-
"because all data must be collected on one node to determine if "
947-
"the original data should be propagated instead."))
942+
# For Pandas 2.0, errors was removed as an argument.
943+
if PD_VERSION < (2, 0):
944+
if "errors" in kwargs and kwargs['errors'] == "ignore":
945+
# We need all data in order to ignore errors and propagate the original
946+
# data.
947+
requires = partitionings.Singleton(
948+
reason=(
949+
f"where(errors={kwargs['errors']!r}) is currently not "
950+
"parallelizable, because all data must be collected on one "
951+
"node to determine if the original data should be propagated "
952+
"instead."))
948953

949-
actual_args['errors'] = errors
954+
actual_args['errors'] = kwargs['errors'] if 'errors' in kwargs else None
950955

951956
def where_execution(df, *args):
952957
runtime_values = {
@@ -1605,14 +1610,11 @@ def mean(self, skipna, **kwargs):
16051610
return self.sum(skipna=skipna, **kwargs) / size
16061611

16071612
@frame_base.with_docs_from(pd.Series)
1608-
@frame_base.args_to_kwargs(pd.Series)
1609-
@frame_base.populate_defaults(pd.Series)
1613+
@frame_base.args_to_kwargs(
1614+
pd.Series, removed_args=["level"] if PD_VERSION >= (2, 0) else None)
1615+
@frame_base.populate_defaults(
1616+
pd.Series, removed_args=["level"] if PD_VERSION >= (2, 0) else None)
16101617
def var(self, axis, skipna, level, ddof, **kwargs):
1611-
"""Per-level aggregation is not yet supported
1612-
(https://github.com/apache/beam/issues/21829). Only the default,
1613-
``level=None``, is allowed."""
1614-
if level is not None:
1615-
raise NotImplementedError("per-level aggregation")
16161618
if skipna is None or skipna:
16171619
self = self.dropna() # pylint: disable=self-cls-assignment
16181620

@@ -1680,11 +1682,11 @@ def corr(self, other, method, min_periods):
16801682
requires_partition_by=partitionings.Singleton(reason=reason)))
16811683

16821684
@frame_base.with_docs_from(pd.Series)
1683-
@frame_base.args_to_kwargs(pd.Series)
1684-
@frame_base.populate_defaults(pd.Series)
1685+
@frame_base.args_to_kwargs(
1686+
pd.Series, removed_args=["level"] if PD_VERSION >= (2, 0) else None)
1687+
@frame_base.populate_defaults(
1688+
pd.Series, removed_args=["level"] if PD_VERSION >= (2, 0) else None)
16851689
def skew(self, axis, skipna, level, numeric_only, **kwargs):
1686-
if level is not None:
1687-
raise NotImplementedError("per-level aggregation")
16881690
if skipna is None or skipna:
16891691
self = self.dropna() # pylint: disable=self-cls-assignment
16901692
# See the online, numerically stable formulae at
@@ -1744,11 +1746,11 @@ def combine_moments(data):
17441746
requires_partition_by=partitionings.Singleton()))
17451747

17461748
@frame_base.with_docs_from(pd.Series)
1747-
@frame_base.args_to_kwargs(pd.Series)
1748-
@frame_base.populate_defaults(pd.Series)
1749+
@frame_base.args_to_kwargs(
1750+
pd.Series, removed_args=["level"] if PD_VERSION >= (2, 0) else None)
1751+
@frame_base.populate_defaults(
1752+
pd.Series, removed_args=["level"] if PD_VERSION >= (2, 0) else None)
17491753
def kurtosis(self, axis, skipna, level, numeric_only, **kwargs):
1750-
if level is not None:
1751-
raise NotImplementedError("per-level aggregation")
17521754
if skipna is None or skipna:
17531755
self = self.dropna() # pylint: disable=self-cls-assignment
17541756

@@ -2578,7 +2580,8 @@ def align(self, other, join, axis, copy, level, method, **kwargs):
25782580
if kwargs:
25792581
raise NotImplementedError('align(%s)' % ', '.join(kwargs.keys()))
25802582

2581-
if level is not None:
2583+
# In Pandas 2.0, all aggregations lost the level keyword.
2584+
if PD_VERSION < (2, 0) and level is not None:
25822585
# Could probably get by partitioning on the used levels.
25832586
requires_partition_by = partitionings.Singleton(reason=(
25842587
f"align(level={level}) is not currently parallelizable. Only "
@@ -5393,6 +5396,7 @@ def func(df, *args, **kwargs):
53935396
name,
53945397
frame_base._elementwise_method(name, restrictions={'level': None},
53955398
base=pd.Series))
5399+
53965400
if hasattr(pd.DataFrame, name):
53975401
setattr(
53985402
DeferredDataFrame,

sdks/python/apache_beam/dataframe/frames_test.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2044,6 +2044,7 @@ def test_dataframe_agg_modes(self):
20442044
self._run_test(lambda df: df.agg({'A': ['sum', 'mean']}), df)
20452045
self._run_test(lambda df: df.agg({'A': ['sum', 'mean'], 'B': 'min'}), df)
20462046

2047+
@unittest.skipIf(PD_VERSION >= (2, 0), "level argument removed in Pandas 2")
20472048
def test_series_agg_level(self):
20482049
self._run_test(
20492050
lambda df: df.set_index(['group', 'foo']).bar.count(level=0),
@@ -2067,6 +2068,7 @@ def test_series_agg_level(self):
20672068
lambda df: df.set_index(['group', 'foo']).bar.median(level=1),
20682069
GROUPBY_DF)
20692070

2071+
@unittest.skipIf(PD_VERSION >= (2, 0), "level argument removed in Pandas 2")
20702072
def test_dataframe_agg_level(self):
20712073
self._run_test(
20722074
lambda df: df.set_index(['group', 'foo']).count(level=0), GROUPBY_DF)
@@ -2234,6 +2236,7 @@ def test_df_agg_method_invalid_kwarg_raises(self):
22342236
self._run_error_test(
22352237
lambda df: df.median(min_count=3, numeric_only=True), GROUPBY_DF)
22362238

2239+
@unittest.skipIf(PD_VERSION >= (2, 0), "level argument removed in Pandas 2")
22372240
def test_agg_min_count(self):
22382241
df = pd.DataFrame({
22392242
'good': [1, 2, 3, np.nan],

0 commit comments

Comments
 (0)