Implementation of HedgedRequests#19291
Conversation
src/Server/TCPHandler.cpp
Outdated
|
|
||
| void TCPHandler::sendData(const Block & block) | ||
| { | ||
| /// For testing hedged requests |
There was a problem hiding this comment.
Probably we can just run a query like
SELECT sleep(x) and have different x stored in table on different nodes of distributed table? We even can disable replication and insert different rows in replicated table
| while (true) | ||
| { | ||
| { | ||
| AsyncCallbackSetter async_setter(receiver.connection, ReadCallback{receiver, sink}); |
There was a problem hiding this comment.
AsyncCallbackSetter is a lightweight structure, cheap to set per packet.
But do we need to reset it every time? Why not to store it as Routine field...
There was a problem hiding this comment.
I just thought that it's not good to remain this ReadCallback in connection after received packet, because (theoretically) we can use this connection for something else, but maybe there was bad logic. What do you think? Should I move it to Routine field?
There was a problem hiding this comment.
Hm, maybe it is true. In case if we store it in Routine, this Routine can be destroyed later then connection is reused by something else.
There was a problem hiding this comment.
Let's keep it in callback then
|
Everything else LGTM in general. (I did not check tests properly so far). |
| M(Seconds, receive_timeout, DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC, "", 0) \ | ||
| M(Seconds, send_timeout, DBMS_DEFAULT_SEND_TIMEOUT_SEC, "", 0) \ | ||
| M(Seconds, tcp_keep_alive_timeout, 0, "The time in seconds the connection needs to remain idle before TCP starts sending keepalive probes", 0) \ | ||
| M(Milliseconds, hedged_connection_timeout, DBMS_DEFAULT_HEDGED_CONNECTION_TIMEOUT_MS, "Connection timeout for establishing connection with replica for Hedged requests", 0) \ |
There was a problem hiding this comment.
Should it have _ms suffix like other settings?
| M(Int64, sleep_in_send_tables_status, 0, "Time to sleep in sending tables status response in TCPHandler", 0) \ | ||
| M(Int64, sleep_in_send_data, 0, "Time to sleep in sending data in TCPHandler", 0) \ |
There was a problem hiding this comment.
seconds granularity is too high, how about converting to ms?
|
|
||
| for name in NODES: | ||
| if name != 'node': | ||
| NODES[name] = cluster.add_instance(name, with_zookeeper=True, user_configs=['configs/users1.xml']) |
There was a problem hiding this comment.
Looks like the test does not really requires zookeeper and ReplicatedMergeTree since it uses remote_servers, right?
There was a problem hiding this comment.
Yes, you are right, it would be good to remove zookeeper and ReplicatedMergeTree from this test, I will do it, thanks.
|
@azat sleep_in_send_data/sleep_in_send_tables_status settings are needed only for testing and I had no reason to change them to ms. But if you really need this settings to be in ms for some testing reasons, I will change it. |
@Avogar Indeed, I was going to use them for stress testing distributed queries - #21944, it will be great to if you will convert them to ms |
|
@azat Ok, will convert it. |
I hereby agree to the terms of the CLA available at: https://yandex.ru/legal/cla/?lang=en
Changelog category (leave one):
Changelog entry (a user-readable short description of the changes that goes to CHANGELOG.md):
Hedged Requests for remote queries. When setting
use_hedged_requestsenabled (by default), allow to establish many connections with different replicas for query. New connection is enabled in case existent connection(s) with replica(s) were not established withinhedged_connection_timeoutor no data was received withinreceive_data_timeout. Query uses the first connection which send non empty progress packet (or data packet, ifallow_changing_replica_until_first_data_packet); other connections are cancelled. Queries withmax_parallel_replicas > 1are supported.Detailed description / Documentation draft:
Using Hedged Requests allows to reduce tail latency on distributed queries. It presents new timeouts:
hedged_connection_timeout(Milliseconds) - if we can't establish connection with replica after this timeout, we start working with the next replica without cancelling connection to the previous.receive_data_timeout(Seconds) - this timeout is set when the query is sent to the replica, if we don't receive first packet of data and we don't make any progress in query execution after this timeout, we start working with the next replica, without cancelling connection to the previous.Working with multiple replicas is performed by using epoll. Hedged Requests also support parallel distributed query execution (see setting
max_parallel_replicas).Also there is a special setting
allow_changing_replica_until_first_data_packet(0 as default). It it's enabled we can start new connection until receiving first data packet even if we have already made some progress (but progress haven't updated forreceive_data_timeouttimeout), otherwise we disable changing replica after the first time we made progress.This behaviour is worked under special setting
use_hedged_requests(1 as default).