-
-
Notifications
You must be signed in to change notification settings - Fork 1.8k
Cleanup dd.groupby metadata handling #1017
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
dask/dataframe/core.py
Outdated
| def head(self, n=5, compute=True): | ||
| """ First n items of the Index. | ||
| Caveat, the only checks the first partition. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Typo. the -> this.
|
This looks like a positive improvement to me. @sinhrks are you still working on this? Should we try to merge it soon or wait? |
|
Still working, hopefully finish in this weekend. |
afb8d62 to
6983f20
Compare
dask/dataframe/tests/test_groupby.py
Outdated
|
|
||
| assert sorted(d.groupby('a').apply(func).dask) == \ | ||
| sorted(d.groupby('a').apply(func).dask) | ||
| # assert sorted(d.groupby('a').apply(func).dask) == \ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, mostly done except for one point. I've changed groupby.apply to actually use shuffle to get correct result. However, the change makes the above test fail.
It is because shuffle (set_partition) has a barrier logic which always use new token. I don't know the background, but can it be the same key if input is the same?
dask/dask/dataframe/shuffle.py
Line 116 in d9b8a14
barrier_token = 'barrier-' + always_new_token
Previous logic
Previous logic doesn't actually use shuffled data, so the test has been passed (but lead to incorrect results).
df = shuffle(self.df, self.index, **self.kwargs)
return map_partitions(_groupby_apply,
columns or self.df.columns,
self.df, self.index, func)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess that that makes sense. The order to a shuffle is non-deterministic. It is probably correct that this test fail.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks. Removed the test.
|
@sinhrks, what's the status of this? Is it ready to merge? |
|
@jcrist Yes, it is ready for review. Modified the title. |
dask/dataframe/groupby.py
Outdated
| Return an empty pd.DataFrameGroupBy / pd.SeriesGroupBy to emulate | ||
| calculation result. | ||
| """ | ||
| return self._pd_cache |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not just use the _pd attribute directly in all cases, and ditch the _pd_cache?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ping @sinhrks. If there's a good reason for this design, then this looks good to me. I'd like to get this in soon before the next release.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, _pd is enough. Originally needed for the logic which is not chosen.
Let me fix it today (7th) including DataFrame.
|
@jcrist updated, could you check? |
|
Looks good to me. Thanks, merging. |
* Make column projections stricter (dask#881) * Simplify again after lowering (dask#884) * Visual EXPLAIN (dask#885) * Fix merge predicate pushdowns with weird predicates (dask#888) * Handle futures that are put into map_partitions (dask#892) * Remove eager divisions from indexing (dask#891) * Add shuffle if objects are not aligned and partitions are unknown in assign (dask#887) Co-authored-by: Hendrik Makait <[email protected]> * Add support for dd.Aggregation (dask#893) * Fix random_split for series (dask#894) * Update dask version * Use Aggregation from dask/dask (dask#895) * Fix meta calculation error in groupby (dask#897) * Revert "Use Aggregation from dask/dask" (dask#898) * Parquet reader using Pyarrow FileSystem (dask#882) Co-authored-by: Patrick Hoefler <[email protected]> * Fix assign for empty indexer (dask#901) * Add dask.dataframe import at start (dask#903) * Add indicator support to merge (dask#902) * Fix detection of parquet filter pushdown (dask#910) * Speedup init of `ReadParquetPyarrowFS` (dask#909) * Don't rely on sets in are_co_aligned (dask#908) * Implement more efficient GroupBy.mean (dask#906) * Refactor GroupByReduction (dask#920) * Implement array inference in new_collection (dask#922) * Add support for convert string option (dask#912) * P2P shuffle drops partitioning column early (dask#899) * Avoid culling for SetIndexBlockwise with divisions (dask#925) * Re-run versioneer install to fix version number (tag_prefix) (dask#926) * Sort if split_out=1 in value_counts (dask#924) * Wrap fragments (dask#911) * Ensure that columns are copied in projection (dask#927) * Raise in map if pandas < 2.1 (dask#929) * Add _repr_html_ and updated __repr__ for FrameBase (dask#930) * Support token for map_partitions (dask#931) * Fix Copy-on-Write related bug in groupby.transform (dask#933) * Fix to_dask_dataframe test after switching to dask-expr by default (dask#935) * Use multi-column assign in groupby apply (dask#934) * Enable copy on write by default (dask#932) Co-authored-by: Patrick Hoefler <[email protected]> * Avoid fusing from_pandas ops to avoid duplicating data (dask#938) * Adjust automatic split_out parameter (dask#940) * Revert "Add _repr_html_ and updated __repr__ for FrameBase (dask#930)" (dask#941) * Remove repartition from P2P shuffle (dask#942) * [Parquet] Calculate divisions from statistics (dask#917) * Accept user arguments for arrow_to_pandas (dask#936) * Add _repr_html_ and prettier __repr__ w/o graph materialization (dask#943) * Add dask tokenize for fragment wrapper (dask#948) * Warn if annotations are ignored (dask#947) * Require `pyarrow>=7` (dask#949) * Implement string conversion for from_array (dask#950) * Add dtype and columns type check for shuffle (dask#951) * Concat arrow tables before converting to pandas (dask#928) * MINOR: Avoid confusion around shuffle method (dask#956) Co-authored-by: Patrick Hoefler <[email protected]> * Set pa cpu count (dask#954) Co-authored-by: Patrick Hoefler <[email protected]> * Update for pandas nighlies (dask#953) * Fix bug with split_out in groupby aggregate (dask#957) * Fix default observed value (dask#960) * Ensure that we respect shuffle in context manager (dask#958) Co-authored-by: Hendrik Makait <[email protected]> * Fix 'Empty' prefix to non-empty Series repr (dask#963) * Update README.md (dask#964) * Adjust split_out values to be consistent with other methods (dask#961) * bump version to 1.0 * Raise an error if the optimizer cannot terminate (dask#966) * Fix non-converging optimizer (dask#967) * Fixup filter pushdown through merges with ands and column reuse (dask#969) * Fix unique with shuffle and strings (dask#971) * Implement custom reductions (dask#970) * Fixup set_index with one partition but more divisions by user (dask#972) * Fixup predicate pushdown for query 19 (dask#973) Co-authored-by: Miles <[email protected]> * Revert enabling pandas cow (dask#974) * Update changelog for 1.0.2 * Fix set-index preserving divisions for presorted (dask#977) * Fixup reduction with split_every=False (dask#978) * Release for dask 2024.3.1 * Raise better error for repartition on divisions with unknown divisions (dask#980) * Fix concat of series objects with column projection (dask#981) * Fix some reset_index optimization issues (dask#982) * Remove keys() (dask#983) * Ensure wrapping an array when comparing to Series works if columns are empty (dask#984) * Version v1.0.4 * Visual ANALYZE (dask#889) Co-authored-by: fjetter <[email protected]> * Support ``prefix`` argument in ``from_delayed`` (dask#991) * Ensure drop matches column names exactly (dask#992) * Fix SettingWithCopyWarning in _merge.py (dask#990) * Update pyproject.toml (dask#994) * Allow passing of boolean index for column index in loc (dask#995) * Ensure that repr doesn't raise if an operand is a pandas object (dask#996) * Version v1.0.5 * Reduce coverage target a little bit (dask#999) * Nicer read_parquet prefix (dask#998) Co-authored-by: Patrick Hoefler <[email protected]> * Set divisions with divisions already known (dask#997) * Start building and publishing conda nightlies (dask#986) * Fix zero division error when reading index from parquet (dask#1000) * Rename overloaded `to/from_dask_dataframe` API (dask#987) * Register json and orc APIs for "pandas" dispatch (dask#1004) * Fix pyarrow fs reads for list of directories (dask#1006) * Release for dask 2024.4.0 * Fix meta caclulation in drop_duplicates (dask#1007) * Release 1.0.7 * Support named aggregations in groupby.aggregate (dask#1009) * Make release 1.0.9 * Adjust version number in changes * Make setattr work (dask#1011) * Release for dask 2024.4.1 * Fix head for npartitions=-1 and optimizer step (dask#1014) * Deprecate ``to/from_dask_dataframe`` API (dask#1001) * Fix projection for rename if projection isn't renamed (dask#1016) * Fix unique with numeric columns (dask#1017) * Add changes for new version * Fix column projections in merge when suffixes are relevant (dask#1019) * Simplify dtype casting logic for shuffle (dask#1012) * Use implicit knowledge about divisions for efficient grouping (dask#946) Co-authored-by: Patrick Hoefler <[email protected]> Co-authored-by: Hendrik Makait <[email protected]> * Fix assign after set index incorrect projections (dask#1020) * Fix read_parquet if directory is empty (dask#1023) * Rename uniuqe_partition_mapping property and add docs (dask#1022) * Add docs for usefule optimizer methods (dask#1025) * Fix doc build error (dask#1026) * Fix error in analyze for scalar (dask#1027) * Add nr of columns to explain output for projection (dask#1030) Co-authored-by: Hendrik Makait <[email protected]> * Fuse more aggressively if parquet files are tiny (dask#1029) * Move IO docstrings over (dask#1033) * Release for dask 2024.4.2 * Add cudf support to ``to_datetime`` and ``_maybe_from_pandas`` (dask#1035) * Fix backend dispatching for `read_csv` (dask#1028) * Fix loc accessing index for element wise op (dask#1037) * Fix loc slicing with Datetime Index (dask#1039) * Fix shuffle after set_index from 1 partition df (dask#1040) * Bugfix release * Fix bug in ``Series`` reductions (dask#1041) * Fix shape returning integer (dask#1043) * Fix xarray integration with scalar columns (dask#1046) * Fix None min/max statistics and missing statistics generally (dask#1045) * Fix drop with set (dask#1047) * Fix delayed in fusing with multipled dependencies (dask#1038) * Add bugfix release * Optimize when from-delayed is called (dask#1048) * Fix default name conversion in `ToFrame` (dask#1044) Co-authored-by: Patrick Hoefler <[email protected]> * Add support for ``DataFrame.melt`` (dask#1049) * Fixup failing test (dask#1052) * Generalize ``get_dummies`` (dask#1053) * reduce pickle size of parquet fragments (dask#1050) * Add a bunch of docs (dask#1051) Co-authored-by: Hendrik Makait <[email protected]> * Release for dask 2024.5.0 * Fix to_parquet in append mode (dask#1057) * Fix sort_values for unordered categories (dask#1058) * Fix dropna before merge (dask#1062) * Fix non-integer divisions in FusedIO (dask#1063) * Add cache argument to ``lower_once`` (dask#1059) * Use ensure_deterministic kwarg instead of config (dask#1064) * Fix isin with strings (dask#1067) * Fix isin for head computation (dask#1068) * Fix read_csv with positional usecols (dask#1069) * Release for dask 2024.5.1 * Use `is_categorical_dtype` dispatch for `sort_values` (dask#1070) * Fix meta for string accessors (dask#1071) * Fix projection to empty from_pandas (dask#1072) * Release for dask 2024.5.2 * Fix categorize if columns are dropped (dask#1074) * Fix resample divisions propagation (dask#1075) * Release for dask 2024.6.0 * Fix get_group for multiple keys (dask#1080) * Skip distributed tests (dask#1081) * Fix cumulative aggregations for empty partitions (dask#1082) * Move another test to distributed folder (dask#1085) * Release 1.1.4 * Release for dask 2024.6.2 * Add minimal subset of interchange protocol (dask#1087) * Add from_map docstring (dask#1088) * Ensure 1 task group per from_delayed (dask#1084) * Advise against using from_delayed (dask#1089) * Refactor shuffle method to handle invalid columns (dask#1091) * Fix freq behavior on ci (dask#1092) * Add first array draft (dask#1090) * Fix array import stuff (dask#1094) * Add asarray (dask#1095) * Implement arange (dask#1097) * Implement linspace (dask#1098) * Implement zeros and ones (dask#1099) * Remvoe pandas 2 checks (dask#1100) * Add unify-chunks draft to arrays (dask#1101) Co-authored-by: Patrick Hoefler <[email protected]> * Release for dask 2024.7.0 * Skip test if optional xarray cannot be imported (dask#1104) * Fix deepcopying FromPandas class (dask#1105) * Fix from_pandas with chunksize and empty df (dask#1106) * Link fix in readme (dask#1107) * Fix shuffle blowing up the task graph (dask#1108) Co-authored-by: Hendrik Makait <[email protected]> * Release for dask 2024.7.1 * Fix some things for pandas 3 (dask#1110) * Fixup remaining upstream failures (dask#1111) * Release for dask 2024.8.0 * Drop support for Python 3.9 (dask#1109) Co-authored-by: James Bourbeau <[email protected]> * Fix tuples as on argument in merge (dask#1117) * Fix merging when index name in meta missmatches actual name (dask#1119) Co-authored-by: Hendrik Makait <[email protected]> * Register `read_parquet` and `read_csv` as "dispatchable" (dask#1114) * Fix projection for Index class in read_parquet (dask#1120) * Fix result index of merge (dask#1121) * Introduce `ToBackend` expression (dask#1115) * Avoid calling ``array`` attribute on ``cudf.Series`` (dask#1122) * Make split_out for categorical default smarter (dask#1124) * Release for dask 2024.8.1 * Fix scalar detection of columns coming from sql (dask#1125) * Bump `pyarrow>=14.0.1` minimum versions (dask#1127) Co-authored-by: Patrick Hoefler <[email protected]> * Fix concat axis 1 bug in divisions (dask#1128) * Release for dask 2024.8.2 * Use task-based rechunking as default (dask#1131) * Improve performance of `DelayedsExpr` through caching (dask#1132) * Import from tokenize (dask#1133) * Release for dask 2024.9.0 * Add concatenate flag to .compute() (dask#1138) * Release for dask 2024.9.1 * Fix displaying timestamp scalar (dask#1141) * Fix alignment issue with groupby index accessors (dask#1142) * Improve handling of optional dependencies in `analyze` and `explain` (dask#1146) * Switch from mambaforge to miniforge in CI (dask#1147) * Fix merge_asof for single partition (dask#1145) * Raise exception when calculating divisons (dask#1149) * Fix binary operations with scalar on the left (dask#1150) * Explicitly list setuptools as a build dependency in conda recipe (dask#1151) * Version v1.1.16 * Fix ``Merge`` divisions after filtering partitions (dask#1152) * Fix meta calculation for to_datetime (dask#1153) * Internal cleanup of P2P code (dask#1154) * Migrate P2P shuffle and merge to TaskSpec (dask#1155) * Improve Aggregation docstring explicitly mentionning SeriesGroupBy (dask#1156) * Migrate shuffle and merge to `P2PBarrierTask` (dask#1157) * Migrate Blockwise to use taskspec (dask#1159) * Add support for Python 3.13 (dask#1160) * Release for dask 2024.11.0 * Fix fusion calling things multiple times (dask#1161) * Version 1.1.18 * Version 1.1.19 * Fix orphaned dependencies in Fused expression (dask#1163) * Use Taskspec fuse implementation (dask#1162) Co-authored-by: Patrick Hoefler <[email protected]> * Introduce more caching when walking the expression (dask#1165) * Avoid exponentially growing graph for Assign-Projection combinations (dask#1164) * Remove ``from_dask_dataframe`` (dask#1167) * Deprecated and remove from_legacy_dataframe usage (dask#1168) Co-authored-by: James Bourbeau <[email protected]> * Remove recursion in task spec (dask#1158) * Fix value_counts with split_out != 1 (dask#1170) * Release 2024.12.0 * Use new blockwise unpack collection in array (dask#1173) * Propagate group_keys in DataFrameGroupBy (dask#1174) * Fix assign optimization when overwriting columns (dask#1176) * Remove custom read-csv stuff (dask#1178) * Fixup install paths (dask#1179) * Version 1.1.21 * Remove legacy conversion functions (dask#1177) * Remove duplicated files * Move repository * Clean up docs and imports * Clean up docs and imports --------- Co-authored-by: Hendrik Makait <[email protected]> Co-authored-by: Florian Jetter <[email protected]> Co-authored-by: Miles <[email protected]> Co-authored-by: Joris Van den Bossche <[email protected]> Co-authored-by: Richard (Rick) Zamora <[email protected]> Co-authored-by: Charles Blackmon-Luca <[email protected]> Co-authored-by: James Bourbeau <[email protected]> Co-authored-by: alex-rakowski <[email protected]> Co-authored-by: Matthew Rocklin <[email protected]> Co-authored-by: Sandro <[email protected]> Co-authored-by: Ben <[email protected]> Co-authored-by: James Bourbeau <[email protected]> Co-authored-by: Guillaume Eynard-Bontemps <[email protected]> Co-authored-by: Tom Augspurger <[email protected]>
Follow-up for #961. I think
groupbylogics can be simplified further.