Skip to content

Worker not receiving an acknowledgement from client via redis although it's received by backend and published to redis #1533

@antoinert

Description

@antoinert

Hello!

In my setup I have

  • flask backend
  • flask worker via Redis MQ with flask-RQ2
  • javascript client

I'm attempting to halt my worker until it receives an acknowledgement from the client. However, for some reason the acknowledgement does not get back to the worker. From logs I can see that client receives the event and sends an ack back and then backend publishes it to redis but worker does not notice it. Eventually the call() in worker times out.

Each job emits events to a client via a room where the client joins.

Any tips? Below you can find sample code and logs.

Backend code

socketio = SocketIO(
    app,
    cors_allowed_origins=app.config['CORS_ORIGINS'],
    message_queue='redis://redis',
    logger=app.logger,
    engineio_logger=app.logger,
)

@socketio.on('connect')
def on_connect():
    job_id = request.args.get('job')
    join_room(job_id)
    socketio.emit('backend', {'data': f'Connected to room {job_id}'})

Worker code

socketio = SocketIO(
    message_queue='redis://redis',
    logger=logger,
    engineio_logger=logger
)

@rq.job
def job(...):
    # do stuff

    # problem is that this call times out even though client acks it
    socketio.server.call('hello', timeout=5, to=job_id)

    # do stuff...

Client

client.on('hello', (callback: (data: string) => void) => {
    callback('ACK');
});

Logs

backend_1  | INFO:app:P021wWvox66xcrjYAABq: Sending packet OPEN data {'sid': 'P021wWvox66xcrjYAABq', 'upgrades': [], 'pingTimeout': 20000, 'pingInterval': 25000}
backend_1  | INFO:app:P021wWvox66xcrjYAABq: Received packet MESSAGE data 0
backend_1  | INFO:app:YIcViNvSyeiKZP5wAABr is entering room 119eb333-ecc0-4323-8d4b-2e4ac3e9bb31 [/]
backend_1  | INFO:app:emitting event "backend" to all [/]
backend_1  | INFO:app:pubsub message: emit
backend_1  | INFO:app:P021wWvox66xcrjYAABq: Sending packet MESSAGE data 2["backend",{"data":"Connected to room 119eb333-ecc0-4323-8d4b-2e4ac3e9bb31"}]
backend_1  | INFO:app:P021wWvox66xcrjYAABq: Sending packet MESSAGE data 0{"sid":"YIcViNvSyeiKZP5wAABr"}
worker_1  | 17:50:29 Server initialized for eventlet.
worker_1  | 17:50:30 emitting event "hello" to 119eb333-ecc0-4323-8d4b-2e4ac3e9bb31 [/]
backend_1  | INFO:app:pubsub message: emit
backend_1  | INFO:app:P021wWvox66xcrjYAABq: Sending packet MESSAGE data 21["hello"]
backend_1  | INFO:app:P021wWvox66xcrjYAABq: Received packet MESSAGE data 31["ACK"]
backend_1  | INFO:app:received ack from YIcViNvSyeiKZP5wAABr [/]
backend_1  | INFO:app:pubsub message: callback
worker_1  | 17:50:35 Traceback (most recent call last):
worker_1  |   File "/usr/local/lib/python3.9/site-packages/rq/worker.py", line 1008, in perform_job
worker_1  |     rv = job.perform()
worker_1  |   File "/usr/local/lib/python3.9/site-packages/rq/job.py", line 706, in perform
worker_1  |     self._result = self._execute()
worker_1  |   File "/usr/local/lib/python3.9/site-packages/rq/job.py", line 729, in _execute
worker_1  |     result = self.func(*self.args, **self.kwargs)
worker_1  |   File "./worker.py", line 68, in run_card
worker_1  |     socketio.server.call('hello', timeout=5, to=job_id)
worker_1  |   File "/usr/local/lib/python3.9/site-packages/socketio/server.py", line 384, in call
worker_1  |     raise exceptions.TimeoutError()
worker_1  | socketio.exceptions.TimeoutError
worker_1  | Traceback (most recent call last):
worker_1  |   File "/usr/local/lib/python3.9/site-packages/rq/worker.py", line 1008, in perform_job
worker_1  |     rv = job.perform()
worker_1  |   File "/usr/local/lib/python3.9/site-packages/rq/job.py", line 706, in perform
worker_1  |     self._result = self._execute()
worker_1  |   File "/usr/local/lib/python3.9/site-packages/rq/job.py", line 729, in _execute
worker_1  |     result = self.func(*self.args, **self.kwargs)
worker_1  |   File "./worker.py", line 68, in run_card
worker_1  |     socketio.server.call('hello', timeout=5, to=job_id)
worker_1  |   File "/usr/local/lib/python3.9/site-packages/socketio/server.py", line 384, in call
worker_1  |     raise exceptions.TimeoutError()
worker_1  | socketio.exceptions.TimeoutError

Metadata

Metadata

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions