Remove the requirements to add comm to every handler #5820
distributed/scheduler.py
Outdated
| task_stream = self.get_task_stream(start=start) | ||
| total_tasks = len(task_stream) | ||
| timespent = defaultdict(int) | ||
| timespent: dict[int, float] = defaultdict(int) |
| timespent: dict[int, float] = defaultdict(int) | |
| timespent: "defaultdict[str, float]" = defaultdict(int) |
looks like we're abusing defaultdict while we should really use a Counter?
sorry, I think my annotation is just wrong and it shouldn't be initialized to int but rather float, if we're 100% accurate
timespent: defaultdict[str, float] = defaultdict(float)
Does mypy recognize defaultdict?
mypy recognizes everything in collections and collections.abc.
Note that you must have either from __future__ import annotations in your module or wrap it in quotes. In scheduler.py you don't have the former option due to Cython.
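A minimal sketch of the quoted form mentioned above. The variable name comes from the diff; the value and the "compute" key are made up for illustration.

```python
from collections import defaultdict

# The quoted annotation is stored as a string and never evaluated at runtime,
# so it does not depend on `from __future__ import annotations` or on the
# interpreter supporting subscripted collections.defaultdict.
timespent: "defaultdict[str, float]" = defaultdict(float)
timespent["compute"] += 0.25
```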
still, I think you should use a Counter?
> still, I think you should use a Counter?
Am I missing something obvious? How is this a counter? start, stop are floats / time.time(), we're not counting
timespent[x["action"]] += x["stop"] - x["start"]
Counter and defaultdict(int) are pretty much the same thing; they only differ in how you initialise them. Never mind, it's really a philosophical difference.
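To make the comparison in this thread concrete, here is a rough sketch that accumulates durations both ways. The task_stream records are hypothetical and carry only the three keys used in the snippet above; the real task stream entries contain more.

```python
from collections import Counter, defaultdict

# Hypothetical records, not actual scheduler output.
task_stream = [
    {"action": "compute", "start": 0.0, "stop": 1.5},
    {"action": "transfer", "start": 1.5, "stop": 2.0},
    {"action": "compute", "start": 2.0, "stop": 2.25},
]

timespent: "defaultdict[str, float]" = defaultdict(float)
as_counter = Counter()  # works at runtime even though the values are floats
for x in task_stream:
    timespent[x["action"]] += x["stop"] - x["start"]
    as_counter[x["action"]] += x["stop"] - x["start"]

# Both containers end up with the same totals per action.
same_totals = dict(timespent) == dict(as_counter)
```

One practical difference: reading a missing key from a Counter returns 0 without storing it, while a defaultdict inserts the default value on first access.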
Unit Test Results 12 files ±0 12 suites ±0 7h 13m 19s ⏱️ + 1m 48s For more details on these failures, see this check. Results for commit e3b1f0c. ± Comparison against base commit 43dfb61. ♻️ This comment has been updated with latest results. |
f3d555b to 69cb2e1
69cb2e1 to 44077d3
distributed/core.py
Outdated
|
|
||
| def _expects_comm(func): | ||
| sig = inspect.signature(func) | ||
| return "comm" in sig.parameters or "stream" in sig.parameters |
Where is stream used? Is it for potential third party scheduler extensions? If so, I think there should be a deprecation cycle.
I could not find any internal use of stream, besides this one which you forgot to remove:
https://github.com/fjetter/distributed/blob/a16c7e713a787b52573c31664db98d2fb275dab7/distributed/publish.py#L50-L52
How about we raise a deprecation warning here if we hit stream? I noticed there is another place and didn't clean it up since I wasn't sure about the actual usage.
Note: The change I am proposing here is technically already a breaking change to user plugins. If they are not using either of the two names, their plugin might break.
I'm wondering if this warrants a deprecation cycle as well.
Thoughts?
I think the deprecation system you put in place is already plenty
distributed/core.py
Outdated
| LOG_PDB = dask.config.get("distributed.admin.pdb-on-err") | ||
|
|
||
|
|
||
| def _expects_comm(func): |
| def _expects_comm(func): | |
| def _expects_comm(func: Callable) -> bool: |
distributed/core.py
Outdated
| sig = inspect.signature(func) | ||
| if "comm" in sig.parameters or "stream" in sig.parameters: | ||
| params = list(sig.parameters) | ||
| if len(params) > 1 and params[1] in ["comm", "stream"]: | ||
| if "stream" in sig.parameters: | ||
| try: | ||
| fname = func.__name__ | ||
| except AttributeError: | ||
| # e.g. partials don't have __name__. We don't want to crash | ||
| # just because we're trying to raise a warning | ||
| fname = "unknown" | ||
| warnings.warn( | ||
| f"Calling the first argument of an RPC handler `stream` is deprecated. Defining this argument is optional. Either remove the argument or rename it to `comm`. Instead got {fname}{sig}", | ||
| FutureWarning, | ||
| ) | ||
| return True |
| sig = inspect.signature(func) | |
| if "comm" in sig.parameters or "stream" in sig.parameters: | |
| params = list(sig.parameters) | |
| if len(params) > 1 and params[1] in ["comm", "stream"]: | |
| if "stream" in sig.parameters: | |
| try: | |
| fname = func.__name__ | |
| except AttributeError: | |
| # e.g. partials don't have __name__. We don't want to crash | |
| # just because we're trying to raise a warning | |
| fname = "unknown" | |
| warnings.warn( | |
| f"Calling the first argument of an RPC handler `stream` is deprecated. Defining this argument is optional. Either remove the argument or rename it to `comm`. Instead got {fname}{sig}", | |
| FutureWarning, | |
| ) | |
| return True | |
| params = list(inspect.signature(func).parameters) | |
| if params[:1] == ["comm"]: | |
| return True | |
| if params[:1] == ["stream"]: | |
| warnings.warn( | |
| f"Calling the first argument of an RPC handler `stream` is deprecated. Defining this argument is optional. Either remove the argument or rename it to `comm` in {func}", | |
| FutureWarning, | |
| ) | |
| return True |
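For readers following the suggestion above, this is a self-contained sketch of the leading-parameter check. It is not the merged implementation: the final `return False`, the shortened warning text, and the three example handlers are assumptions added for illustration, and the function is named expects_comm here to keep it distinct from the real helper.

```python
import inspect
import warnings
from typing import Callable


def expects_comm(func: Callable) -> bool:
    """Return True if the first parameter of func is named comm or stream."""
    params = list(inspect.signature(func).parameters)
    if params[:1] == ["comm"]:
        return True
    if params[:1] == ["stream"]:
        warnings.warn(
            "Calling the first argument of an RPC handler `stream` is "
            f"deprecated. Remove it or rename it to `comm` in {func}",
            FutureWarning,
        )
        return True
    # Assumption: handlers without a leading comm/stream do not receive one.
    return False


# Hypothetical handlers used only to exercise the check.
def with_comm(comm, x): ...
def without_comm(x): ...
def legacy(stream, x): ...
```

Slicing with `params[:1]` avoids an IndexError for handlers that take no arguments. For bound methods, inspect.signature already omits self, so the first listed parameter is the first one a caller supplies.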
distributed/tests/test_core.py
Outdated
| def comm_kwarg_only(self, *, comm, other): | ||
| ... | ||
|
|
||
| def comm_not_leading_position(self, *, other, comm): | ||
| ... | ||
|
|
||
| def stream_not_leading_position(self, *, other, stream): | ||
| ... | ||
|
|
||
| stream_not_leading_partialmethod = partialmethod( | ||
| stream_not_leading_position, other="foo" | ||
| ) |
| def comm_kwarg_only(self, *, comm, other): | |
| ... | |
| def comm_not_leading_position(self, *, other, comm): | |
| ... | |
| def stream_not_leading_position(self, *, other, stream): | |
| ... | |
| stream_not_leading_partialmethod = partialmethod( | |
| stream_not_leading_position, other="foo" | |
| ) |
I actually think the "not leading position" thing is important since I don't want to block any names from signatures. If a user wants to define a stream kwarg or a comm kwarg without using our comm mechanism that's fine imo.
I generally dislike this API quite a lot. I don't think we should silently expect the first argument to be positional-only comm.
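The position-versus-name distinction discussed here can be explored with inspect directly. The sketch below uses stand-alone functions modelled on the test cases above (without self) and only shows what inspect.signature reports. It does not show how the PR resolves the question.

```python
import inspect
from functools import partial


def comm_kwarg_only(*, comm, other): ...
def comm_not_leading(other, comm): ...

# Parameter names in declaration order.
names_kwarg_only = list(inspect.signature(comm_kwarg_only).parameters)
names_not_leading = list(inspect.signature(comm_not_leading).parameters)

# A name-only check cannot tell that comm is keyword-only here, so the
# parameter kind has to be inspected as well if that distinction matters.
kinds = [p.kind for p in inspect.signature(comm_kwarg_only).parameters.values()]

# functools.partial objects have no __name__, which is why the warning code
# in the diff guards the attribute access.
bound = partial(comm_not_leading, other="foo")
has_name = hasattr(bound, "__name__")
```

A check on the first name alone would treat comm_kwarg_only as expecting comm, and passing comm positionally to it would then raise a TypeError.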
e49775e to 537750c
537750c to e3b1f0c
I'm quite open to moving on to a more stable interface overall. I figured this may be a nice transition to something that is cleaner.
I'm happy to support comm as a keyword argument;
We'd still need to go through a deprecation cycle for the stream argument. I think we should do this in another release
Most of our handlers specify comm as their first argument because the handle_comm interface currently requires it. IMO this not only creates very poor function signatures but is also potentially confusing to new developers. Dropping this requirement is pretty easy by simply checking the argument names.
Of course, this checks for the name of the argument, so implicit comm passing is no longer possible, but I'd be fine with that.
I simplified a few signatures already but would leave the bulk of the work to follow-up PRs.
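As a rough illustration of the idea in this description, a dispatcher can pass comm only to handlers that ask for it by name. The call_handler function and the Server class below are hypothetical stand-ins, not the actual handle_comm code.

```python
import inspect
from typing import Any, Callable


def call_handler(handler: Callable, comm: Any, **msg: Any) -> Any:
    """Call handler with the message fields, adding comm only if requested."""
    params = list(inspect.signature(handler).parameters)
    if params[:1] == ["comm"]:
        return handler(comm, **msg)
    return handler(**msg)


class Server:
    # Simplified signature: this handler never needed the comm.
    def identity(self):
        return {"type": type(self).__name__}

    # This handler still declares comm and therefore receives it.
    def echo(self, comm, data=None):
        return (comm, data)


server = Server()
identity_result = call_handler(server.identity, comm="fake-comm")
echo_result = call_handler(server.echo, comm="fake-comm", data=1)
```

Handlers that never touch the comm can drop it from their signature, while existing ones keep working unchanged.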