@@ -914,8 +914,10 @@ def sort_index(self, axis, **kwargs):
914914 ))
915915
916916 @frame_base .with_docs_from (pd .DataFrame )
917- @frame_base .args_to_kwargs (pd .DataFrame )
918- @frame_base .populate_defaults (pd .DataFrame )
917+ @frame_base .args_to_kwargs (
918+ pd .DataFrame , removed_args = ["errors" ] if PD_VERSION >= (2 , 0 ) else None )
919+ @frame_base .populate_defaults (
920+ pd .DataFrame , removed_args = ["errors" ] if PD_VERSION >= (2 , 0 ) else None )
919921 @frame_base .maybe_inplace
920922 def where (self , cond , other , errors , ** kwargs ):
921923 """where is not parallelizable when ``errors="ignore"`` is specified."""
@@ -937,16 +939,19 @@ def where(self, cond, other, errors, **kwargs):
937939 else :
938940 actual_args ['other' ] = other
939941
940- if errors == "ignore" :
941- # We need all data in order to ignore errors and propagate the original
942- # data.
943- requires = partitionings .Singleton (
944- reason = (
945- f"where(errors={ errors !r} ) is currently not parallelizable, "
946- "because all data must be collected on one node to determine if "
947- "the original data should be propagated instead." ))
942+ # For Pandas 2.0, errors was removed as an argument.
943+ if PD_VERSION < (2 , 0 ):
944+ if "errors" in kwargs and kwargs ['errors' ] == "ignore" :
945+ # We need all data in order to ignore errors and propagate the original
946+ # data.
947+ requires = partitionings .Singleton (
948+ reason = (
949+ f"where(errors={ kwargs ['errors' ]!r} ) is currently not "
950+ "parallelizable, because all data must be collected on one "
951+ "node to determine if the original data should be propagated "
952+ "instead." ))
948953
949- actual_args ['errors' ] = errors
954+ actual_args ['errors' ] = kwargs [ ' errors' ] if 'errors' in kwargs else None
950955
951956 def where_execution (df , * args ):
952957 runtime_values = {
@@ -1605,14 +1610,11 @@ def mean(self, skipna, **kwargs):
16051610 return self .sum (skipna = skipna , ** kwargs ) / size
16061611
16071612 @frame_base .with_docs_from (pd .Series )
1608- @frame_base .args_to_kwargs (pd .Series )
1609- @frame_base .populate_defaults (pd .Series )
1613+ @frame_base .args_to_kwargs (
1614+ pd .Series , removed_args = ["level" ] if PD_VERSION >= (2 , 0 ) else None )
1615+ @frame_base .populate_defaults (
1616+ pd .Series , removed_args = ["level" ] if PD_VERSION >= (2 , 0 ) else None )
16101617 def var (self , axis , skipna , level , ddof , ** kwargs ):
1611- """Per-level aggregation is not yet supported
1612- (https://github.com/apache/beam/issues/21829). Only the default,
1613- ``level=None``, is allowed."""
1614- if level is not None :
1615- raise NotImplementedError ("per-level aggregation" )
16161618 if skipna is None or skipna :
16171619 self = self .dropna () # pylint: disable=self-cls-assignment
16181620
@@ -1680,11 +1682,11 @@ def corr(self, other, method, min_periods):
16801682 requires_partition_by = partitionings .Singleton (reason = reason )))
16811683
16821684 @frame_base .with_docs_from (pd .Series )
1683- @frame_base .args_to_kwargs (pd .Series )
1684- @frame_base .populate_defaults (pd .Series )
1685+ @frame_base .args_to_kwargs (
1686+ pd .Series , removed_args = ["level" ] if PD_VERSION >= (2 , 0 ) else None )
1687+ @frame_base .populate_defaults (
1688+ pd .Series , removed_args = ["level" ] if PD_VERSION >= (2 , 0 ) else None )
16851689 def skew (self , axis , skipna , level , numeric_only , ** kwargs ):
1686- if level is not None :
1687- raise NotImplementedError ("per-level aggregation" )
16881690 if skipna is None or skipna :
16891691 self = self .dropna () # pylint: disable=self-cls-assignment
16901692 # See the online, numerically stable formulae at
@@ -1744,11 +1746,11 @@ def combine_moments(data):
17441746 requires_partition_by = partitionings .Singleton ()))
17451747
17461748 @frame_base .with_docs_from (pd .Series )
1747- @frame_base .args_to_kwargs (pd .Series )
1748- @frame_base .populate_defaults (pd .Series )
1749+ @frame_base .args_to_kwargs (
1750+ pd .Series , removed_args = ["level" ] if PD_VERSION >= (2 , 0 ) else None )
1751+ @frame_base .populate_defaults (
1752+ pd .Series , removed_args = ["level" ] if PD_VERSION >= (2 , 0 ) else None )
17491753 def kurtosis (self , axis , skipna , level , numeric_only , ** kwargs ):
1750- if level is not None :
1751- raise NotImplementedError ("per-level aggregation" )
17521754 if skipna is None or skipna :
17531755 self = self .dropna () # pylint: disable=self-cls-assignment
17541756
@@ -2578,7 +2580,8 @@ def align(self, other, join, axis, copy, level, method, **kwargs):
25782580 if kwargs :
25792581 raise NotImplementedError ('align(%s)' % ', ' .join (kwargs .keys ()))
25802582
2581- if level is not None :
2583+ # In Pandas 2.0, all aggregations lost the level keyword.
2584+ if PD_VERSION < (2 , 0 ) and level is not None :
25822585 # Could probably get by partitioning on the used levels.
25832586 requires_partition_by = partitionings .Singleton (reason = (
25842587 f"align(level={ level } ) is not currently parallelizable. Only "
@@ -5393,6 +5396,7 @@ def func(df, *args, **kwargs):
53935396 name ,
53945397 frame_base ._elementwise_method (name , restrictions = {'level' : None },
53955398 base = pd .Series ))
5399+
53965400 if hasattr (pd .DataFrame , name ):
53975401 setattr (
53985402 DeferredDataFrame ,
0 commit comments