[DF] Split HeadNode class in different head nodes according to data source #8485
This pull request fixes 1 alert when merging d309b4e into 645d3d0 - view on LGTM.com fixed alerts:
eguiraud
left a comment
Thank you for the rework, this looks much nicer! Some last comments below.
    # Restrict 'npartitions' if it's greater than 'nentries'
    msg = ("Number of partitions {0} is greater than number of entries {1} "
           "in the dataframe. Using {1} partition(s)".format(self.npartitions, self.nentries))
    warnings.warn(msg, UserWarning, stacklevel=2)
Here, because of the `self.npartitions = 2` above, we warn in the case of nentries == 1, but I think that is a valid (toy/test) use case.
(You mentioned here that distRDF is also not OK with nentries == 0: that case should probably also just work (run locally and produce a bunch of empty histograms, maybe with a warning), but I'm also OK with leaving that behavior as it is. On the other hand, warning on 1 entry but being OK with 3 is weird.)
This depends on whether we can always guarantee that backends will be able to process the case of npartitions == 1. I think it can be done in general, but I'm not sure that belongs in this PR.
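One possible way to address the point raised in this thread, as a minimal sketch only: the helper `restrict_npartitions` and its `user_provided` flag are hypothetical and not part of the PR. The idea is to warn only when the user explicitly asked for more partitions than there are entries, so that a one-entry toy dataframe running with the default partition count stays silent. The nentries == 0 case is deliberately not treated specially here.

```python
import warnings


def restrict_npartitions(npartitions, nentries, user_provided=True):
    """Clamp the number of partitions to the number of entries.

    Hypothetical helper for illustration only. `user_provided` tells whether
    `npartitions` was requested by the user or comes from a backend default.
    """
    if npartitions <= nentries:
        # Nothing to restrict
        return npartitions

    if user_provided:
        # Only an explicit user request is worth a warning
        msg = ("Number of partitions {0} is greater than number of entries {1} "
               "in the dataframe. Using {1} partition(s)".format(npartitions, nentries))
        warnings.warn(msg, UserWarning, stacklevel=2)

    return nentries
```

With this shape, the default of 2 partitions on a single-entry dataframe is silently reduced to 1, while an explicit request for 5 partitions on 3 entries still warns.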
This pull request fixes 1 alert when merging 4d212ec into e1bdaf6 - view on LGTM.com fixed alerts:
Build failed on ROOT-performance-centos8-multicore/default. Failing tests:
This pull request fixes 1 alert when merging 6eab070 into d026b49 - view on LGTM.com fixed alerts:
This pull request fixes 1 alert when merging d84218f into d026b49 - view on LGTM.com fixed alerts:
    # 2. An educated guess according to the backend, using the backend's
    #    `optimize_npartitions` function
    # 3. Set `npartitions` to 2
    npartitions = kwargs.pop("npartitions", self.optimize_npartitions(Base.BaseBackend.MIN_NPARTITIONS))
no need to pass MIN_NPARTITIONS as an argument, it's a data member
Right, initially it was meant to be a generic function: "optimize the partitions if possible, else give me back the argument I pass you". The logic has probably changed since then.
So what should I do here? Change the signature of optimize_npartitions to (self) rather than (self, npartitions) ? Or maybe leave the input parameter but make it optional as (self, npartitions=None)?
(this function might still change / be removed in the future whenever we decide on some coherent structure to "optimize" the number of partitions of the distributed dataframe)
I don't see why this shouldn't be `def optimize_npartitions(self)` (might even change it to a property called `npartitions`). Passing a data member as the only argument of a method looks "wrong".
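The suggestion above could look roughly like the following sketch. The classes are stand-ins written for illustration, not the real ROOT backend classes, and `ClusterBackend` with its `ncores` attribute is a hypothetical example of a backend that has something better to offer than the default. Note that the override keeps the same signature as the base method.

```python
class BaseBackend:
    """Stand-in for the backend base class, not the real ROOT class."""

    # Class-level default, readable from any method through `self`
    MIN_NPARTITIONS = 2

    def optimize_npartitions(self):
        # Nothing better to suggest in the generic case: fall back to the
        # data member instead of echoing back an argument.
        return self.MIN_NPARTITIONS


class ClusterBackend(BaseBackend):
    """Hypothetical backend that may know how many cores it can use."""

    def __init__(self, ncores=None):
        self.ncores = ncores

    def optimize_npartitions(self):
        # Same signature as the base method, only the body differs
        if self.ncores:
            return self.ncores
        return super().optimize_npartitions()
```

A caller then simply writes `backend.optimize_npartitions()` and never needs to know about `MIN_NPARTITIONS`.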
This pull request introduces 1 alert and fixes 1 when merging ad75fbf into b7d0cb1 - view on LGTM.com new alerts:
fixed alerts:
LGTM gracefully telling me I shouldn't override methods with different signatures, will fix it right away 😄
This pull request fixes 1 alert when merging de65bd0 into b3b9aa2 - view on LGTM.com fixed alerts:
In distributed RDataFrame, the head node of the RDF computation graph also stores information about the user-provided arguments to the RDF constructor, i.e. information about the source of data. The current implementation has only one HeadNode class responsible for distinguishing between the two supported data sources, i.e. empty RDF with sequential entries and TTree/TChain-based RDF.

This commit introduces instead multiple head node types, one per data source. In particular, there is now an `EmptySourceHeadNode` and a `TreeHeadNode`. To decide which is the correct type of head node for a particular distributed dataframe, this commit introduces a factory function `get_headnode` that parses the user-provided arguments to the RDataFrame constructor and returns the correct head node instance.

It would follow that each data source might have different information to send to the distributed resources. This is the case, and it will be addressed in the future with the introduction of a different Range object for each head node type.

These changes brought a few more changes along:

* In the `get_headnode` factory function there is an early check for the passed arguments to the RDF constructor. Improve the check by catching the `TypeError` that comes from the C++ overload failing to instantiate an RDF with wrong constructor arguments, and by raising a new `TypeError` with a more user-friendly message.
* The logic of the distributed execution present in the Base backend and in the Spark backend needs to be changed in order to accommodate the different head node types.
* The presence of different head node types collides with the current implementation of the `RangesBuilder` class, which expected a single type of head node to retrieve attributes from. Work around this by using `try except` clauses in the properties of `RangesBuilder`. It is an ugly workaround that will be superseded very soon, when the creation of the ranges will also be separated according to the data source.
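The factory described in this commit message can be pictured with the simplified sketch below. It does not depend on ROOT: `_check_constructor_args` is a hypothetical stand-in for the attempt to build a local RDataFrame, and the dispatch on the type of the first argument is an assumption about how the two data sources could be told apart, not the actual implementation.

```python
class EmptySourceHeadNode:
    """Head node for an empty data source with sequential entries."""

    def __init__(self, nentries):
        self.nentries = nentries


class TreeHeadNode:
    """Head node for a TTree/TChain based data source."""

    def __init__(self, *args):
        self.args = args


def _check_constructor_args(*args):
    # Hypothetical stand-in: the real check would try to instantiate an
    # RDataFrame and let the failed C++ overload resolution raise TypeError.
    if not args or not isinstance(args[0], (int, str)):
        raise TypeError("none of the overloads matched")


def get_headnode(*args):
    """Return the head node type matching the constructor arguments."""
    try:
        _check_constructor_args(*args)
    except TypeError:
        # Replace the low-level error with a more user-friendly message
        raise TypeError(
            "The arguments provided are not accepted by any RDataFrame "
            "constructor: {}".format(args))

    if isinstance(args[0], int):
        # A number of entries means an empty source
        return EmptySourceHeadNode(args[0])

    # Anything else that passed the check is treated as tree-based here
    return TreeHeadNode(*args)
```

For example, `get_headnode(100)` yields an `EmptySourceHeadNode`, `get_headnode("tree", "file.root")` yields a `TreeHeadNode`, and `get_headnode([1, 2])` raises the friendlier `TypeError`.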
The number of partitions into which the distributed RDataFrame should be split is needed in the logic for creating the ranges from the head node, and might also be modified after some checks. Since it should stay a data member of the head node classes, it should also be defined at `__init__` time. This now happens in the call to `make_dataframe` of any distributed backend, which also passes the `npartitions` argument to the `get_headnode` factory function. The `npartitions` parameter is now a required positional argument in all the signatures that expect it.
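The flow described here might be sketched as follows. Everything in the block is a simplified stand-in: the real `make_dataframe` wraps the head node in a dataframe object rather than returning it directly, and the real factory chooses between several head node types. The point is only that `npartitions` is resolved once by the backend and then travels as a required positional argument down to `__init__`.

```python
class TreeHeadNode:
    """Simplified head node: `npartitions` is set at construction time."""

    def __init__(self, npartitions, *args):
        self.npartitions = npartitions
        self.args = args


def get_headnode(npartitions, *args):
    # Simplified factory: a single head node type is enough for this sketch
    return TreeHeadNode(npartitions, *args)


class Backend:
    """Stand-in for a distributed backend, not the real ROOT class."""

    MIN_NPARTITIONS = 2

    def optimize_npartitions(self):
        return self.MIN_NPARTITIONS

    def make_dataframe(self, *args, **kwargs):
        # The user-supplied value wins, otherwise ask the backend
        npartitions = kwargs.pop("npartitions", self.optimize_npartitions())
        # Simplification: return the head node itself
        return get_headnode(npartitions, *args)
```

Calling `Backend().make_dataframe("tree", "file.root", npartitions=4)` in this sketch produces a head node whose `npartitions` attribute is already 4 when `__init__` returns.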
The `optimize_npartitions` method does not really need an extra positional parameter just to return it when it cannot "optimize" it. Switch to returning the default `MIN_NPARTITIONS` data member if nothing better can be done.
de65bd0 to 6110909
This pull request fixes 1 alert when merging 6110909 into b3b9aa2 - view on LGTM.com fixed alerts:
This PR addresses step 1 of #8391
It also addresses the comments from the original PR regarding the files touched in this PR, except for the following:
`TreeHeadNode` and doing the parsing of constructor arguments only once in `__init__`. will be addressed both in another PR. Probably the properties can be removed or changed in favor of having the attributes already set during `__init__` thanks to the mentioned single-pass parsing.
`glob.glob` behaves similarly to TChain's globbing is left for another time