Skip to content

Add listener task to cancel streaming response in case of an early disconnect#839

Merged
lovelydinosaur merged 3 commits intoKludex:masterfrom
Kamforka:fix-streaming-response-keeps-streaming-after-client-disconnected
Feb 26, 2020
Merged

Add listener task to cancel streaming response in case of an early disconnect#839
lovelydinosaur merged 3 commits intoKludex:masterfrom
Kamforka:fix-streaming-response-keeps-streaming-after-client-disconnected

Conversation

@Kamforka
Copy link
Copy Markdown
Contributor

Related to #20

This one is to fix an ongoing stream to an already disconnected client.

All the existing tests are passed, however I'd like to add one to cover a scenario when a "http.disconnect" message is sent via the receive channel, @tomchristie maybe you could suggest some sort of setup for such a test?

@lovelydinosaur
Copy link
Copy Markdown

Okay, this is looking decent yup!

I happened to inroduce a run_until_first_complete since I was looking at a similar pattern for working with websockets.

I reckon we ought to use that here, and have .listen_for_disconnect and .stream_response_body methods. That'd cover us in any wierd broken cases where disconnect is never sent for some reason.

@Kamforka
Copy link
Copy Markdown
Contributor Author

@tomchristie Sure, I'll adjust the code this evening.

Just to be clear, you meant something like this:

await run_until_first_complete(self.stream_response(send), self.listen_for_disconnect(receive))

Right?

@lovelydinosaur
Copy link
Copy Markdown

@Kamforka Yup, I'll be along these lines...

await run_until_first_complete(
    (self.stream_response, {"send": send}),
    (self.listen_for_disconnect, {"receive": receive})
)

@Kamforka
Copy link
Copy Markdown
Contributor Author

Kamforka commented Feb 24, 2020

@tomchristie added the changes.

However I have one observation. Is there any particular reason for the signature of run_until_first_complete?

async def run_until_first_complete(*args: typing.Tuple[typing.Callable, dict]) -> None

I think the following signature would be more intuitive:

async def run_until_first_complete(tasks: typing.Sequence[typing.Awaitable]) -> None:

Note: typing.Awaitable might not be the best type here but you get my point.

So in our particular case one could call it like:

await run_until_first_complete(
    [self.stream_response(send), self.listen_for_disconnect(receive)]
)

As far as I can tell the functionality is the same, however we get less boilerplate with better typing and auto-completion experience. What do you think?

Comment thread starlette/responses.py Outdated
@@ -1,3 +1,4 @@
import asyncio
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like we can drop this now, right?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh yeah

@lovelydinosaur
Copy link
Copy Markdown

Good suggestion yup, although I’m also keeping half an eye here on trio compatibility, which it wouldn’t play so well with.

@Kamforka
Copy link
Copy Markdown
Contributor Author

What do you mean by trio compatibility?

@lovelydinosaur
Copy link
Copy Markdown

What do you mean by trio compatibility?

We might want a future version of Starlette to support being run on Trio https://trio.readthedocs.io/en/stable/ as an alternative to Asyncio. The API that Trio uses for starting tasks doesn't allow passing instantiated coroutines (for some good reasons) so keeping this constraint in our own codebase would make it easier for us support Trio at some point.

@lovelydinosaur
Copy link
Copy Markdown

And, great stuff, thank you!

@Kamforka
Copy link
Copy Markdown
Contributor Author

What do you mean by trio compatibility?

We might want a future version of Starlette to support being run on Trio https://trio.readthedocs.io/en/stable/ as an alternative to Asyncio. The API that Trio uses for starting tasks doesn't allow passing instantiated coroutines (for some good reasons) so keeping this constraint in our own codebase would make it easier for us support Trio at some point.

Got it, thanks for the clarification!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants