fix sink block due to response queue is full#1109
Conversation
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 67640cd084
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| context); | ||
|
|
||
| producer->start(/*need_poll=*/context->isQueryFromMaterializedView()); | ||
| producer->start(/*need_poll=*/true); |
There was a problem hiding this comment.
Keep polling conditional until sink teardown is safe
Starting producer polling unconditionally here enables asynchronous delivery callbacks for non-MV writes as well, but KafkaSink still passes this as message opaque and can exit onFinish() with undelivered messages when rd_kafka_flush(..., 15000) times out (src/Storages/ExternalStream/Kafka/KafkaSink.cpp:444-452). In that failure path, callbacks may still arrive while the sink is being torn down, and because member destruction happens before producer is released (src/Storages/ExternalStream/Kafka/KafkaSink.h:78-117), onMessageDelivery can run on partially destroyed state, leading to crash/UAF under broker outage or slow-ack conditions.
Useful? React with 👍 / 👎.
Please write user-readable short description of the changes:
Kafka inserting message may block if response queue is full and will not be consumed.
Always start background thread to poll for Producer no matter query is from mv or not.