Member

@fjetter fjetter commented Feb 16, 2022

Most of our handlers specify comm as their first argument because the handle_comm interface currently requires it. IMO this not only creates very poor function signatures but is also potentially confusing to new developers. Dropping this requirement is pretty easy: we simply check the argument names.

Of course, this checks for the name of the argument, so implicit comm passing is no longer possible, but I'd be fine with that.

I simplified a few signatures already but would leave the bulk of the work to follow-up PRs.
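
For illustration, a minimal sketch of what such a name check could look like. The dispatcher `call_handler` and the two handlers are hypothetical stand-ins, not the actual handle_comm code; only `inspect.signature` is a real library call.

```python
import inspect


def expects_comm(func):
    """Return True if the handler declares a parameter named ``comm``."""
    return "comm" in inspect.signature(func).parameters


def call_handler(handler, comm, **msg):
    # Hypothetical dispatcher: hand over the comm only to handlers that
    # explicitly ask for it by name.
    if expects_comm(handler):
        return handler(comm, **msg)
    return handler(**msg)


def ping(comm):
    # Handler that wants access to the comm object.
    return "pong"


def add(x, y):
    # Handler with a plain signature and no comm argument.
    return x + y
```

With this kind of check, `call_handler(add, comm, x=1, y=2)` never passes the comm to `add`, so its signature stays clean.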

@fjetter fjetter requested a review from crusaderky February 16, 2022 14:00
  task_stream = self.get_task_stream(start=start)
  total_tasks = len(task_stream)
- timespent = defaultdict(int)
+ timespent: dict[int, float] = defaultdict(int)

Collaborator

@crusaderky crusaderky Feb 16, 2022

Suggested change
- timespent: dict[int, float] = defaultdict(int)
+ timespent: "defaultdict[str, float]" = defaultdict(int)

Collaborator

Looks like we're abusing defaultdict when we should really use a Counter?

Member Author

Sorry, I think my annotation is just wrong, and it shouldn't be initialized to int but rather to float, if we want to be 100% accurate:

timespent: defaultdict[str, float] = defaultdict(float)

Does mypy recognize defaultdict?

Collaborator

@crusaderky crusaderky Feb 17, 2022

mypy recognizes everything in collections and collections.abc.
Note that you must either have from __future__ import annotations in your module or wrap the annotation in quotes. In scheduler.py you don't have the former option due to Cython.
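
A minimal sketch of the quoted-annotation variant. The `task_stream` records below are made-up sample data, not real scheduler output.

```python
from collections import defaultdict

# The annotation is quoted, so it is stored as a plain string and never
# evaluated at runtime. This works without
# ``from __future__ import annotations`` and on Python versions where
# ``defaultdict[str, float]`` is not subscriptable.
timespent: "defaultdict[str, float]" = defaultdict(float)

# Hypothetical task-stream records with float timestamps.
task_stream = [
    {"action": "compute", "start": 1.0, "stop": 3.5},
    {"action": "transfer", "start": 3.5, "stop": 4.0},
    {"action": "compute", "start": 4.0, "stop": 5.0},
]

for x in task_stream:
    # Missing keys start at 0.0 because the default factory is float.
    timespent[x["action"]] += x["stop"] - x["start"]
```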

Collaborator

still, I think you should use a Counter?

Member Author

> still, I think you should use a Counter?

Am I missing something obvious? How is this a counter? start, stop are floats / time.time(), we're not counting
timespent[x["action"]] += x["stop"] - x["start"]

Collaborator

Counter and defaultdict(int) are pretty much the same thing; they only differ in how you initialise them. Never mind, it's really a philosophical difference.
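
A small sketch comparing the two for this accumulation pattern. The `durations` data is invented for illustration.

```python
from collections import Counter, defaultdict

# Hypothetical (action, seconds) pairs.
durations = [("compute", 2.5), ("transfer", 0.5), ("compute", 1.0)]

as_defaultdict = defaultdict(float)
as_counter = Counter()

for action, seconds in durations:
    # Both containers treat a missing key as zero when accumulating.
    as_defaultdict[action] += seconds
    as_counter[action] += seconds
```

Both end up holding the same totals. One behavioural difference worth knowing: merely reading a missing key inserts it into a defaultdict but not into a Counter. Static type checkers may also object to float values in a Counter, since its values are typed as int.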

Contributor

github-actions bot commented Feb 17, 2022

Unit Test Results

12 files ±0 | 12 suites ±0 | 7h 13m 19s ⏱️ +1m 48s
2 608 tests +1 | 2 528 ✔️ +2 | 79 💤 ±0 | 1 failed ±0
15 572 runs +6 | 14 503 ✔️ +38 | 1 068 💤 -30 | 1 failed -1

For more details on these failures, see this check.

Results for commit e3b1f0c. ± Comparison against base commit 43dfb61.

♻️ This comment has been updated with latest results.

@fjetter fjetter force-pushed the remove_comm_requ_from_handlers branch from f3d555b to 69cb2e1 Compare February 18, 2022 15:01
@fjetter fjetter requested a review from crusaderky February 18, 2022 15:02
@fjetter fjetter force-pushed the remove_comm_requ_from_handlers branch from 69cb2e1 to 44077d3 Compare February 18, 2022 15:02

def _expects_comm(func):
    sig = inspect.signature(func)
    return "comm" in sig.parameters or "stream" in sig.parameters

Collaborator

Where is stream used? Is it for potential third party scheduler extensions? If so, I think there should be a deprecation cycle.

I could not find any internal use of stream, besides this one which you forgot to remove:
https://github.com/fjetter/distributed/blob/a16c7e713a787b52573c31664db98d2fb275dab7/distributed/publish.py#L50-L52

Member Author

How about we raise a deprecation warning here if we hit stream? I noticed there is another place and didn't clean it up since I wasn't sure about the actual usage.

Note: The change I am proposing here is technically already a breaking change to user plugins. If they are not using either of the two names, their plugin might break.

I'm wondering if this warrants a deprecation cycle as well.

Thoughts?

Collaborator

I think the deprecation system you put in place is already plenty

LOG_PDB = dask.config.get("distributed.admin.pdb-on-err")


def _expects_comm(func):

Collaborator

Suggested change
- def _expects_comm(func):
+ def _expects_comm(func: Callable) -> bool:

Comment on lines 103 to 118
sig = inspect.signature(func)
if "comm" in sig.parameters or "stream" in sig.parameters:
    params = list(sig.parameters)
    if len(params) > 1 and params[1] in ["comm", "stream"]:
        if "stream" in sig.parameters:
            try:
                fname = func.__name__
            except AttributeError:
                # e.g. partials don't have __name__. We don't want to crash
                # just because we're trying to raise a warning
                fname = "unknown"
            warnings.warn(
                f"Calling the first argument of a RPC handler `stream` is deprecated. Defining this argument is optional. Either remove the argument or rename it to `comm`. Instead got {fname}{sig}",
                FutureWarning,
            )
        return True

Collaborator

@crusaderky crusaderky Feb 21, 2022

Suggested change
- sig = inspect.signature(func)
- if "comm" in sig.parameters or "stream" in sig.parameters:
-     params = list(sig.parameters)
-     if len(params) > 1 and params[1] in ["comm", "stream"]:
-         if "stream" in sig.parameters:
-             try:
-                 fname = func.__name__
-             except AttributeError:
-                 # e.g. partials don't have __name__. We don't want to crash
-                 # just because we're trying to raise a warning
-                 fname = "unknown"
-             warnings.warn(
-                 f"Calling the first argument of a RPC handler `stream` is deprecated. Defining this argument is optional. Either remove the argument or rename it to `comm`. Instead got {fname}{sig}",
-                 FutureWarning,
-             )
-         return True
+ params = list(inspect.signature(func).parameters)
+ if params[:1] == ["comm"]:
+     return True
+ if params[:1] == ["stream"]:
+     warnings.warn(
+         f"Calling the first argument of a RPC handler `stream` is deprecated. Defining this argument is optional. Either remove the argument or rename it to `comm` in {func}",
+         FutureWarning,
+     )
+     return True
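
As a self-contained sketch of the suggested logic and how the deprecation path could be exercised: the function name `expects_comm`, the final `return False`, the shortened warning text, and the three sample handlers are assumptions added so the snippet runs on its own.

```python
import inspect
import warnings
from typing import Callable


def expects_comm(func: Callable) -> bool:
    """Return True if the first parameter is ``comm`` (or legacy ``stream``)."""
    params = list(inspect.signature(func).parameters)
    if params[:1] == ["comm"]:
        return True
    if params[:1] == ["stream"]:
        warnings.warn(
            "Naming the first argument of an RPC handler `stream` is "
            f"deprecated. Remove it or rename it to `comm` in {func}",
            FutureWarning,
        )
        return True
    # Assumed fallthrough: handlers without a leading comm get no comm.
    return False


def new_style(comm, x): ...
def old_style(stream, x): ...
def no_comm(x): ...


# Record warnings instead of printing them so they can be inspected.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    results = [expects_comm(f) for f in (new_style, old_style, no_comm)]
```

Only `old_style` should trigger a FutureWarning, while both `new_style` and `old_style` are still treated as expecting a comm.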

Comment on lines 1030 to 1041
def comm_kwarg_only(self, *, comm, other):
    ...

def comm_not_leading_position(self, *, other, comm):
    ...

def stream_not_leading_position(self, *, other, stream):
    ...

stream_not_leading_partialmethod = partialmethod(
    stream_not_leading_position, other="foo"
)

Collaborator

Suggested change
- def comm_kwarg_only(self, *, comm, other):
-     ...
- def comm_not_leading_position(self, *, other, comm):
-     ...
- def stream_not_leading_position(self, *, other, stream):
-     ...
- stream_not_leading_partialmethod = partialmethod(
-     stream_not_leading_position, other="foo"
- )

Member Author

I actually think the "not leading position" thing is important since I don't want to block any names from signatures. If a user wants to define a stream kwarg or a comm kwarg without using our comm mechanism that's fine imo.

I generally dislike this API quite a lot. I don't think we should silently expect the first argument to be positional-only comm.
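
One way to keep the names unblocked, sketched under the assumption that only a leading parameter that can be passed positionally should count. The helper `leading_positional_name` and the sample functions are hypothetical and are not what the PR implements.

```python
import inspect
from functools import partial

# Parameter kinds that can receive a positional argument.
_POSITIONAL = (
    inspect.Parameter.POSITIONAL_ONLY,
    inspect.Parameter.POSITIONAL_OR_KEYWORD,
)


def leading_positional_name(func):
    """Return the first parameter's name if it accepts a positional value."""
    params = list(inspect.signature(func).parameters.values())
    if params and params[0].kind in _POSITIONAL:
        return params[0].name
    return None


def comm_first(comm, other): ...
def comm_not_leading(other, comm): ...
def comm_kwarg_only(*, comm, other): ...


# Binding ``other`` by keyword turns the remaining parameters keyword-only
# in the signature that inspect reports for the partial object.
bound = partial(comm_not_leading, other="foo")
```

Under this check, only `comm_first` would be handed a comm positionally; a keyword-only or non-leading `comm` stays an ordinary user-defined argument.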

@fjetter fjetter force-pushed the remove_comm_requ_from_handlers branch 2 times, most recently from e49775e to 537750c Compare February 21, 2022 11:58
@fjetter fjetter force-pushed the remove_comm_requ_from_handlers branch from 537750c to e3b1f0c Compare February 21, 2022 11:58
Member Author

fjetter commented Feb 21, 2022

I'm quite open to moving on to a more stable interface overall. I figured this may be a nice transition to something that is cleaner.
For instance, we might choose to pass comm as a keyword argument instead of a positional one. I believe this would already provide a clearer API. I'm sure there are even better options.

@crusaderky
Collaborator

I'm happy to support comm as a keyword argument; handle_comm should be changed accordingly (with added unit tests)
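
A rough sketch of what keyword-based passing could look like. The dispatcher `call_handler` and the handlers are hypothetical, and handlers that accept `**kwargs` are deliberately not covered here.

```python
import inspect


def call_handler(handler, comm, **msg):
    """Hypothetical dispatcher: supply ``comm`` by keyword when declared."""
    if "comm" in inspect.signature(handler).parameters:
        # The position of ``comm`` in the signature no longer matters.
        return handler(comm=comm, **msg)
    return handler(**msg)


def leading(comm, value):
    return (comm, value)


def trailing_kwarg_only(*, value, comm):
    return (comm, value)


def without_comm(value):
    return value
```

Because the comm is bound by name, a handler may declare it anywhere in its signature, including as a keyword-only argument.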

Member Author

fjetter commented Feb 22, 2022

> I'm happy to support comm as a keyword argument; handle_comm should be changed accordingly (with added unit tests)

We'd still need to go through a deprecation cycle for the stream argument. I think we should do this in another release.

@crusaderky crusaderky merged commit 9a266a0 into dask:main Feb 22, 2022