Skip to content

Conversation

@bubbleboy14
Copy link
Collaborator

@bubbleboy14 bubbleboy14 commented Jul 11, 2022

run_forever() has a new kwarg, "reconnect".

If "reconnect" is True (which is the default) and the connection ends (or can't be established), it waits for 5 seconds and tries again.

This has been tested with run_forever() and run_forever(dispatcher=rel), and seems to work pretty great.

LMK what you think!

NB: I think I merged in upstream weirdly (maybe forgot to pull first), so there are lots of commits listed going back 7 months. However, the only actual changes (with one small exception - see diff) are in run_forever().

bubbleboy14 and others added 15 commits December 7, 2021 15:44
…d except TimeoutError - raise WebSocketTimeoutException
…ent compatibility): WrappedDispatcher (for use with generic event dispatchers such as pyevent and rel); create_dispatcher() accepts dispatcher kwarg (default None), and if it is specified, returns a WrappedDispatcher; use create_dispatcher() (passing specified dispatcher if any) every time (regardless of dispatcher specification)
@maksimu
Copy link

maksimu commented Aug 12, 2022

Is there an ETA on when this feature will be release?

@engn33r
Copy link
Collaborator

engn33r commented Aug 13, 2022

@maksimu I'm away from my computer right now, but later this month I plan to catch up on everything

@engn33r
Copy link
Collaborator

engn33r commented Aug 24, 2022

Great upgrade, thanks @bubbleboy14! My comments:

  • Nice idea to use inheritance for the Dispatcher classes to reduce redundancy
  • I like how the logging message mentions how many frames are on the stack, which gives an idea of how long the disconnection has lasted
  • Good idea replacing the teardown try/except with functions to improve function call control

I found no issues when testing these changes

  1. on an encrypted wss endpoint on the web where I stopped/restarted my network connection several times
  2. on a local unencrypted ws server where I stopped/restarted the server process several times

To err on the cautious side, I will make a new 1.4.0 release for this in case the upgrades have any unexpected side effects that I didn't catch.

For future reference, the CI linting error exists in the master branch and was not introduced by this PR.

@engn33r engn33r merged commit 43eb344 into websocket-client:master Aug 24, 2022
@NadavK
Copy link

NadavK commented Aug 25, 2022

Firstly, thanks for this important upgrade!
Please note that for me, when the server-side raises an exception, the client terminates.

For instance, changing the echo-server.py app to raise an exception, e.g. await websocket.send(XXX) expectedly fails with name 'XXX' is not defined
But interestingly, also the client fails:

Running forever...
--- request header ---
GET / HTTP/1.1
Upgrade: websocket
Host: localhost:8765
Origin: http://localhost:8765
Sec-WebSocket-Key: 7auS5TiZnKAdDwJTNP883Q==
Sec-WebSocket-Version: 13
Connection: Upgrade


-----------------------
--- response header ---
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: CxFIBMn0pIottRDSWg+4kkuBk1g=
Date: Thu, 25 Aug 2022 14:27:55 GMT
Server: Python/3.10 websockets/10.3
-----------------------
websocket connected
++Sent raw: b'\x81\x87ft\xcbI\x0b\x11\xb8:\x07\x13\xae'
++Sent decoded: fin=1 opcode=1 data=b'message'
++Rcv raw: b'\x88\x02\x03\xf3'
++Rcv decoded: fin=1 opcode=8 data=b'\x03\xf3'
++Sent raw: b'\x88\x82 \xcbP;##'
++Sent decoded: fin=1 opcode=8 data=b'\x03\xe8'
### closed ###: 1011
Thread terminating...

here is my client code:

def on_message(self, message):
    print(message)

def on_error(self, error):
    print("### error ###:", error)

def on_close(self, close_status_code, close_msg):
    print("### closed ###:", close_status_code, close_msg)

if __name__ == '__main__':
    websocket.enableTrace(True)
    ws = websocket.WebSocketApp(
        'ws://localhost:8765/',
        on_message=on_message,
        on_error=on_error,
        on_close=on_close)

    scheduler = BackgroundScheduler(daemon=False)
    scheduler.add_job(ws.send, 'interval', seconds=2, args=['message'])
    scheduler.start()

    print("Running forever...")
    ws.run_forever()
    print("Thread terminating...")

@bubbleboy14
Copy link
Collaborator Author

Hello @NadavK. Hm, you're talking about this one, right:

https://github.com/websocket-client/websocket-client/blob/master/websocket/tests/echo-server.py

On your client, are you using the BackgroundScheduler in PyBackground (https://pypi.org/project/PyBackground/) or apscheduler (https://apscheduler.readthedocs.io/en/stable/userguide.html) or some other project? Could you please show us your full client code, or at least the import lines?

Thanks!

@engn33r
Copy link
Collaborator

engn33r commented Aug 26, 2022

I can recreate what @NadavK shared. My opinion is that this is expected behavior and I updated the README to clarify this detail. I will share my explanation and then the code I used to recreate.

The "client fails" action happens when the on_close callback in the client code is entered. This means the connection has been closed by the server and the client received close code 1011, which is for STATUS_UNEXPECTED_CONDITION. This happened because the server outputs the error NameError: name 'XXX' is not defined, and because the server is in an unexpected condition, it closes the connection. So if the server closes the connection, the client cannot automatically reconnect because the connection is closed, so should the client create a new connection to the server? If the server closes the connection, run_forever does not automatically attempt to create a new connection to the server by default, but adding the line ws.run_forever(dispatcher=rel) into the on_close callback will make the server reconnect when the connection is closed. In this specific example, it will cause an infinite loop, which is not ideal. I like the current way websocket-client handles this because a server that closes the connection and sends a close status code normally has a reason for doing so, meaning custom logic should be added in on_close depending on how the server normally responds and what the client wants to do. If the client or server drops offline without closing the connection, run_forever should attempt to reconnect because the connection is still open because no close status code was sent or received.

Server code (lightly modified from websocket/tests/echo-server.py in this repo)

#!/usr/bin/env python

# From https://github.com/aaugustin/websockets/blob/main/example/echo.py

import asyncio
import websockets
import os

LOCAL_WS_SERVER_PORT = os.environ.get('LOCAL_WS_SERVER_PORT', '8765')


async def echo(websocket, path):
    async for message in websocket:
        await websocket.send(XXX)


async def main():
    async with websockets.serve(echo, "localhost", LOCAL_WS_SERVER_PORT):
        await asyncio.Future()  # run forever

asyncio.run(main())

Client code (lightly modified from Long-Lived Connection code)

import websocket
import _thread
import time
import rel

def on_message(ws, message):
    print(message)

def on_error(ws, error):
    print(error)

def on_close(ws, close_status_code, close_msg):
    print("### closed ###")
    print("close_status_code: " + str(close_status_code))
    print("close_msg: " + close_msg)

def on_open(ws):
    print("Opened connection")
    ws.send("hello")

if __name__ == "__main__":
    websocket.enableTrace(True)
    ws = websocket.WebSocketApp("ws://127.0.0.1:8765",
                              on_open=on_open,
                              on_message=on_message,
                              on_error=on_error,
                              on_close=on_close)

    ws.run_forever(dispatcher=rel)  # Set dispatcher to automatic reconnection
    rel.signal(2, rel.abort)  # Keyboard Interrupt
    rel.dispatch()

@mkapuza
Copy link

mkapuza commented Aug 26, 2022

@engn33r I agree with you logic here and believe this is the right decision.

Do you have a suggestion for an exponential reconnection that doesn't result in a potential stack overflow like calling ws.run_forever(dispatcher=rel) in the on_close method?

@brunchboy
Copy link

brunchboy commented Aug 26, 2022

In my case, I control the server and the client, and my absolute requirement is that no matter what, the client should retry its connection if it goes away for any reason. Because of that, I decided to put that logic outside of websocket-client, in the launchd configuration which runs it, to restart it whenever the process exits. The discussion above reinforces my plan to stick with the older version of websocket-client, and leave the reconnection outside of it.

Actually, I can probably continue to update versions of websocket-client, and simply continue to avoid trying to use a dispatcher. The project I’m using it with is a nice controller for my automated blinds, https://github.com/brunchboy/shade

@bubbleboy14
Copy link
Collaborator Author

bubbleboy14 commented Aug 26, 2022

Hello all! @engn33r , @brunchboy , @mkapuza , check out this pull request:

#854

I adjusted the synchronous reconnect logic to avoid growing the stack. Thoughts?

BTW here's the super basic client I was using to test this:


import rel, websocket

websocket._logging.enableTrace(True, level="INFO")
ws = websocket.WebSocketApp("ws://localhost:8888")
if input("use rel? [N/y]: ") == "y":
    ws.run_forever(dispatcher=rel)
    rel.dispatch()
else:
    ws.run_forever()

@aantn
Copy link

aantn commented Sep 2, 2022

Hi, we're seeing strange behaviour since updating clients to 1.4.0.

On the websocket server (which uses a different package) the number of open websocket clients seems to infinitely increase. That could be due to a server bug, but I'm wondering if the change in the client behaviour could be playing a role in triggering it?

@bubbleboy14
Copy link
Collaborator Author

Hey @aantn, #854 should fix this. Could you please test and let us know?

@engn33r, maybe we should change the "reconnect" kwarg default to 0 to preserve the previous behavior for existing integrations - what do you think?

@milosivanovic
Copy link

There seems to be an issue with rel - it's extremely inefficient. Instead of blocking and waiting for data, it's polling every 0.3ms:

[root@machine ~]# strace -p 12345
strace: Process 12345 attached
poll([{fd=22, events=POLLIN}], 1, 0)    = 0 (Timeout)
pselect6(0, NULL, NULL, NULL, {tv_sec=0, tv_nsec=300000}, NULL) = 0 (Timeout)
poll([{fd=22, events=POLLIN}], 1, 0)    = 0 (Timeout)
pselect6(0, NULL, NULL, NULL, {tv_sec=0, tv_nsec=300000}, NULL) = 0 (Timeout)
poll([{fd=22, events=POLLIN}], 1, 0)    = 0 (Timeout)
pselect6(0, NULL, NULL, NULL, {tv_sec=0, tv_nsec=300000}, NULL) = 0 (Timeout)
poll([{fd=22, events=POLLIN}], 1, 0)    = 0 (Timeout)
pselect6(0, NULL, NULL, NULL, {tv_sec=0, tv_nsec=300000}, NULL) = 0 (Timeout)
poll([{fd=22, events=POLLIN}], 1, 0)    = 0 (Timeout)
pselect6(0, NULL, NULL, NULL, {tv_sec=0, tv_nsec=300000}, NULL) = 0 (Timeout)
poll([{fd=22, events=POLLIN}], 1, 0)    = 0 (Timeout)
pselect6(0, NULL, NULL, NULL, {tv_sec=0, tv_nsec=300000}, NULL) = 0 (Timeout)
poll([{fd=22, events=POLLIN}], 1, 0)    = 0 (Timeout)
pselect6(0, NULL, NULL, NULL, {tv_sec=0, tv_nsec=300000}, NULL) = 0 (Timeout)
poll([{fd=22, events=POLLIN}], 1, 0)    = 0 (Timeout)
pselect6(0, NULL, NULL, NULL, {tv_sec=0, tv_nsec=300000}, NULL) = 0 (Timeout)
poll([{fd=22, events=POLLIN}], 1, 0)    = 0 (Timeout)
pselect6(0, NULL, NULL, NULL, {tv_sec=0, tv_nsec=300000}, NULL) = 0 (Timeout)
poll([{fd=22, events=POLLIN}], 1, 0)    = 0 (Timeout)
pselect6(0, NULL, NULL, NULL, {tv_sec=0, tv_nsec=300000}, NULL) = 0 (Timeout)
poll([{fd=22, events=POLLIN}], 1, 0)    = 0 (Timeout)
pselect6(0, NULL, NULL, NULL, {tv_sec=0, tv_nsec=300000}, NULL) = 0 (Timeout)
poll([{fd=22, events=POLLIN}], 1, 0)    = 0 (Timeout)
pselect6(0, NULL, NULL, NULL, {tv_sec=0, tv_nsec=300000}, NULL) = 0 (Timeout)
poll([{fd=22, events=POLLIN}], 1, 0)    = 0 (Timeout)
pselect6(0, NULL, NULL, NULL, {tv_sec=0, tv_nsec=300000}, NULL) = 0 (Timeout)
poll([{fd=22, events=POLLIN}], 1, 0)    = 0 (Timeout)
pselect6(0, NULL, NULL, NULL, {tv_sec=0, tv_nsec=300000}, NULL) = 0 (Timeout)
poll([{fd=22, events=POLLIN}], 1, 0)    = 0 (Timeout)
pselect6(0, NULL, NULL, NULL, {tv_sec=0, tv_nsec=300000}, NULL) = 0 (Timeout)
poll([{fd=22, events=POLLIN}], 1, 0)    = 0 (Timeout)
pselect6(0, NULL, NULL, NULL, {tv_sec=0, tv_nsec=300000}, NULL) = 0 (Timeout)
poll([{fd=22, events=POLLIN}], 1, 0)    = 0 (Timeout)
pselect6(0, NULL, NULL, NULL, {tv_sec=0, tv_nsec=300000}, NULL) = 0 (Timeout)
poll([{fd=22, events=POLLIN}], 1, 0)    = 0 (Timeout)
pselect6(0, NULL, NULL, NULL, {tv_sec=0, tv_nsec=300000}, NULL) = 0 (Timeout)
poll([{fd=22, events=POLLIN}], 1, 0)    = 0 (Timeout)
pselect6(0, NULL, NULL, NULL, {tv_sec=0, tv_nsec=300000}, NULL) = 0 (Timeout)
poll([{fd=22, events=POLLIN}], 1, 0)    = 0 (Timeout)
pselect6(0, NULL, NULL, NULL, {tv_sec=0, tv_nsec=300000}, NULL) = 0 (Timeout)

You can check this by running strace -p <pid_of_process> against the PID of the Python process where the websocket client is implemented with the rel dispatcher.

Is there an alternative to rel? I don't think we should be suggesting this solution based on the inefficient busy-waiting it's doing.

@bubbleboy14
Copy link
Collaborator Author

@milosivanovic the main alternative is to run without an async dispatcher, which is the default. If you want an alternative async dispatcher, pyevent has the same API, but I'm not sure if it runs on Python 3.

Bear in mind that the sole purpose of an asynchronous dispatcher is to run multiple WebSocketApps in the same process. If you don't have this requirement, the default (synchronous) dispatcher is all you'll ever need.

@milosivanovic
Copy link

@bubbleboy14 I see. In the README for this project, it suggests that if we want automatic reconnection, we should use the rel dispatcher. Did I misunderstand this? Are you suggesting that the client will still try to automatically reconnect without any loss in functionality if we omit rel? If so, would it be helpful if I opened an issue to have that text clarified?

@bubbleboy14
Copy link
Collaborator Author

bubbleboy14 commented Jan 6, 2023

@milosivanovic for that, use the "reconnect" kwarg, eg:

ws.run_forever(reconnect=5)

In the above example, 5 is the reconnect interval, so if the connection drops, it will be re-established 5 seconds later.

And yes, that's a good idea.

@pouya817
Copy link

I can recreate what @NadavK shared. My opinion is that this is expected behavior and I updated the README to clarify this detail. I will share my explanation and then the code I used to recreate.

The "client fails" action happens when the on_close callback in the client code is entered. This means the connection has been closed by the server and the client received close code 1011, which is for STATUS_UNEXPECTED_CONDITION. This happened because the server outputs the error NameError: name 'XXX' is not defined, and because the server is in an unexpected condition, it closes the connection. So if the server closes the connection, the client cannot automatically reconnect because the connection is closed, so should the client create a new connection to the server? If the server closes the connection, run_forever does not automatically attempt to create a new connection to the server by default, but adding the line ws.run_forever(dispatcher=rel) into the on_close callback will make the server reconnect when the connection is closed. In this specific example, it will cause an infinite loop, which is not ideal. I like the current way websocket-client handles this because a server that closes the connection and sends a close status code normally has a reason for doing so, meaning custom logic should be added in on_close depending on how the server normally responds and what the client wants to do. If the client or server drops offline without closing the connection, run_forever should attempt to reconnect because the connection is still open because no close status code was sent or received.

Server code (lightly modified from websocket/tests/echo-server.py in this repo)

#!/usr/bin/env python

# From https://github.com/aaugustin/websockets/blob/main/example/echo.py

import asyncio
import websockets
import os

LOCAL_WS_SERVER_PORT = os.environ.get('LOCAL_WS_SERVER_PORT', '8765')


async def echo(websocket, path):
    async for message in websocket:
        await websocket.send(XXX)


async def main():
    async with websockets.serve(echo, "localhost", LOCAL_WS_SERVER_PORT):
        await asyncio.Future()  # run forever

asyncio.run(main())

Client code (lightly modified from Long-Lived Connection code)

import websocket
import _thread
import time
import rel

def on_message(ws, message):
    print(message)

def on_error(ws, error):
    print(error)

def on_close(ws, close_status_code, close_msg):
    print("### closed ###")
    print("close_status_code: " + str(close_status_code))
    print("close_msg: " + close_msg)

def on_open(ws):
    print("Opened connection")
    ws.send("hello")

if __name__ == "__main__":
    websocket.enableTrace(True)
    ws = websocket.WebSocketApp("ws://127.0.0.1:8765",
                              on_open=on_open,
                              on_message=on_message,
                              on_error=on_error,
                              on_close=on_close)

    ws.run_forever(dispatcher=rel)  # Set dispatcher to automatic reconnection
    rel.signal(2, rel.abort)  # Keyboard Interrupt
    rel.dispatch()

@pouya817
Copy link

Hello dear,
now I have an issue with this part ['rel'].
my problem is my code connects to a socket successfully but my problem showed the during connection.
error code 1001 CloudFlare WebSocket proxy restarting,
I should use a thread in my code and the 'rel' does not support multithread activity and raises an error.
what can I do to solve this problem ( I mean disconnected from server unwanted )

` def start(self):
self.ws = websocket.WebSocketApp(self.subscribe_url,
on_message=self.on_message, on_error=self.on_error, on_close=self.on_close)
self.ws.on_open = self.on_open
self.websocket_thread = threading.Thread(target=self.ws.run_forever, kwargs={'reconnect': True})
self.websocket_thread.start()

def stop(self):
    if self.ws:
        self.ws.close()
    if self.websocket_thread:
        self.websocket_thread.join()
    logger.info({'msg': f' count of datahub thread: {threading.active_count()}'})

def on_close(self, ws, close_status_code, close_msg):
    sub_str = json.dumps(self.channel)
    logger.info({'msg': f'Unsubscribed {sub_str}',
                 'close_status_code': close_status_code, 'close_msg': close_msg})

def on_message(self, ws, message):
    compressed_data = gzip.GzipFile(fileobj=io.BytesIO(message), mode='rb')
    decompressed_data = compressed_data.read()
    utf8_data = decompressed_data.decode('utf-8')
    if utf8_data == 'Ping':
        ws.send('Pong')
    elif json.loads(utf8_data).get('code') is not None:
        print(json.loads(utf8_data))
        self.callback(msg=json.loads(utf8_data))
def on_open(self, ws):
    sub_str = json.dumps(self.channel)
    ws.send(sub_str)
    msg = f'Start subscribing to {sub_str}'
    logger.info({'msg': msg})

def on_error(self, ws, error):
    sub_str = json.dumps(self.channel)
    msg = f'Websocket error: {error}, {sub_str}'
    logger.error({'msg': msg})`

@bubbleboy14
Copy link
Collaborator Author

Regarding the threading/rel conflict, @pouya817, what's the error?

The reconnect issue is with your run_forever() kwargs. In particular, you currently have True for reconnect, but that should be an integer - try 1.

If you still have problems, try out this branch I've been working on:

https://github.com/websocket-client/websocket-client/tree/asyncpong

It should work with rel 0.4.9.11, but feel free to try the current working branch:

https://github.com/bubbleboy14/registeredeventlistener/tree/bwtweax

Hope it helps, LMK how it goes!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

9 participants