Hello!
My setup consists of:
- a Flask backend
- a Flask worker, run via a Redis message queue with Flask-RQ2
- a JavaScript client
I'm trying to block my worker until it receives an acknowledgement from the client, but the acknowledgement never reaches the worker. The logs show that the client receives the event and sends an ack back, and that the backend then publishes the ack to Redis, but the worker never picks it up. Eventually the call() in the worker times out.
Each job emits events to its client through a room that the client joins when it connects.
Any tips? Below you can find sample code and logs.
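To make the intent concrete, the behaviour the job expects from call() can be sketched without Socket.IO at all: block until a reply for this job arrives, or give up after a timeout. This is only a minimal in-process illustration. `AckChannel`, `AckTimeout`, `publish_ack` and `wait_for_ack` are hypothetical names made up for this sketch; they are not part of Flask-SocketIO, python-socketio or Flask-RQ2.

```python
import queue
import threading


class AckTimeout(Exception):
    """Raised when no acknowledgement arrives in time (hypothetical name)."""


class AckChannel:
    """In-memory stand-in for a store shared by backend and worker.

    Hypothetical helper for illustration only: it works inside a single
    process and keeps one queue of pending acknowledgements per job ID.
    """

    def __init__(self):
        self._queues = {}
        self._lock = threading.Lock()

    def _queue_for(self, job_id):
        # Create the per-job queue on first use, from either side.
        with self._lock:
            return self._queues.setdefault(job_id, queue.Queue())

    def publish_ack(self, job_id, data):
        # Would be called where the client's reply is received.
        self._queue_for(job_id).put(data)

    def wait_for_ack(self, job_id, timeout):
        # Would be called by the job: block until the reply arrives
        # or the timeout (in seconds) expires.
        try:
            return self._queue_for(job_id).get(timeout=timeout)
        except queue.Empty:
            raise AckTimeout(job_id) from None


if __name__ == "__main__":
    channel = AckChannel()
    # Simulate the client acknowledging shortly after the job starts waiting.
    threading.Timer(0.1, channel.publish_ack, args=("job-1", "ACK")).start()
    print(channel.wait_for_ack("job-1", timeout=5))
```

In the real setup the backend and the worker are separate processes, so the in-memory queues would have to be replaced by something both can reach, for example a Redis list written with RPUSH and read with BLPOP. That substitution is an assumption and is not shown here.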
Backend code
socketio = SocketIO(
    app,
    cors_allowed_origins=app.config['CORS_ORIGINS'],
    message_queue='redis://redis',
    logger=app.logger,
    engineio_logger=app.logger,
)

@socketio.on('connect')
def on_connect():
    job_id = request.args.get('job')
    join_room(job_id)
    socketio.emit('backend', {'data': f'Connected to room {job_id}'})

Worker code
socketio = SocketIO(
    message_queue='redis://redis',
    logger=logger,
    engineio_logger=logger
)

@rq.job
def job(...):
    # do stuff
    # problem is that this call times out even though client acks it
    socketio.server.call('hello', timeout=5, to=job_id)
    # do stuff...

Client
client.on('hello', (callback: (data: string) => void) => {
  callback('ACK');
});

Logs
backend_1 | INFO:app:P021wWvox66xcrjYAABq: Sending packet OPEN data {'sid': 'P021wWvox66xcrjYAABq', 'upgrades': [], 'pingTimeout': 20000, 'pingInterval': 25000}
backend_1 | INFO:app:P021wWvox66xcrjYAABq: Received packet MESSAGE data 0
backend_1 | INFO:app:YIcViNvSyeiKZP5wAABr is entering room 119eb333-ecc0-4323-8d4b-2e4ac3e9bb31 [/]
backend_1 | INFO:app:emitting event "backend" to all [/]
backend_1 | INFO:app:pubsub message: emit
backend_1 | INFO:app:P021wWvox66xcrjYAABq: Sending packet MESSAGE data 2["backend",{"data":"Connected to room 119eb333-ecc0-4323-8d4b-2e4ac3e9bb31"}]
backend_1 | INFO:app:P021wWvox66xcrjYAABq: Sending packet MESSAGE data 0{"sid":"YIcViNvSyeiKZP5wAABr"}
worker_1 | 17:50:29 Server initialized for eventlet.
worker_1 | 17:50:30 emitting event "hello" to 119eb333-ecc0-4323-8d4b-2e4ac3e9bb31 [/]
backend_1 | INFO:app:pubsub message: emit
backend_1 | INFO:app:P021wWvox66xcrjYAABq: Sending packet MESSAGE data 21["hello"]
backend_1 | INFO:app:P021wWvox66xcrjYAABq: Received packet MESSAGE data 31["ACK"]
backend_1 | INFO:app:received ack from YIcViNvSyeiKZP5wAABr [/]
backend_1 | INFO:app:pubsub message: callback
worker_1 | 17:50:35 Traceback (most recent call last):
worker_1 | File "/usr/local/lib/python3.9/site-packages/rq/worker.py", line 1008, in perform_job
worker_1 | rv = job.perform()
worker_1 | File "/usr/local/lib/python3.9/site-packages/rq/job.py", line 706, in perform
worker_1 | self._result = self._execute()
worker_1 | File "/usr/local/lib/python3.9/site-packages/rq/job.py", line 729, in _execute
worker_1 | result = self.func(*self.args, **self.kwargs)
worker_1 | File "./worker.py", line 68, in run_card
worker_1 | socketio.server.call('hello', timeout=5, to=job_id)
worker_1 | File "/usr/local/lib/python3.9/site-packages/socketio/server.py", line 384, in call
worker_1 | raise exceptions.TimeoutError()
worker_1 | socketio.exceptions.TimeoutError
worker_1 | Traceback (most recent call last):
worker_1 | File "/usr/local/lib/python3.9/site-packages/rq/worker.py", line 1008, in perform_job
worker_1 | rv = job.perform()
worker_1 | File "/usr/local/lib/python3.9/site-packages/rq/job.py", line 706, in perform
worker_1 | self._result = self._execute()
worker_1 | File "/usr/local/lib/python3.9/site-packages/rq/job.py", line 729, in _execute
worker_1 | result = self.func(*self.args, **self.kwargs)
worker_1 | File "./worker.py", line 68, in run_card
worker_1 | socketio.server.call('hello', timeout=5, to=job_id)
worker_1 | File "/usr/local/lib/python3.9/site-packages/socketio/server.py", line 384, in call
worker_1 | raise exceptions.TimeoutError()
worker_1 | socketio.exceptions.TimeoutError