
Improve async reading from socket#47229

Merged
Avogar merged 31 commits into ClickHouse:master from Avogar:non-blocking-connect
Apr 21, 2023

Conversation

@Avogar
Member

@Avogar Avogar commented Mar 3, 2023

Changelog category (leave one):

  • Improvement

Changelog entry (a user-readable short description of the changes that goes to CHANGELOG.md):

Add async connection to socket and async writing to socket. Make creating connections and sending query/external tables async across shards. Refactor code with fibers. Closes #46931. We will be able to increase connect_timeout_with_failover_ms by default after this PR (#5188)

CC: @KochetovNicolai, @azat
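The non-blocking connect pattern this PR is built around can be sketched outside ClickHouse roughly like this (a minimal Python sketch assuming Linux epoll; `async_connect` and every other name here is invented for illustration and does not come from the ClickHouse codebase):

```python
import errno
import select
import socket

def async_connect(address, timeout_ms=1000):
    """Start a non-blocking connect and wait for the fd via epoll."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setblocking(False)
    err = sock.connect_ex(address)  # returns immediately instead of blocking
    if err not in (0, errno.EINPROGRESS, errno.EWOULDBLOCK):
        sock.close()
        raise OSError(err, "connect failed")
    epoll = select.epoll()
    epoll.register(sock.fileno(), select.EPOLLOUT)
    ready = epoll.poll(timeout_ms / 1000.0)  # a fiber would yield here instead
    epoll.close()
    if not ready:
        sock.close()
        raise TimeoutError("connect timed out")
    # Writability only means the connect finished; SO_ERROR says whether it succeeded.
    err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
    if err != 0:
        sock.close()
        raise OSError(err, "connect failed")
    return sock

# Self-contained demo: connect to a listener we open ourselves.
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.bind(("127.0.0.1", 0))
listener.listen(1)
conn = async_connect(listener.getsockname())
connected = conn.getpeername() == listener.getsockname()
conn.close()
listener.close()
```

In the PR itself the wait is not a blocking `poll` call: the fiber is suspended and the socket fd is watched asynchronously, so many connections can make progress at once. The sketch only shows the socket-level mechanics.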

@Avogar Avogar force-pushed the non-blocking-connect branch from 76b795d to 3fae85f on March 3, 2023 19:58
@robot-clickhouse-ci-1 robot-clickhouse-ci-1 added the pr-improvement Pull request with some product improvements label Mar 3, 2023
@nickitat nickitat self-assigned this Mar 6, 2023
@Avogar Avogar marked this pull request as ready for review March 9, 2023 12:04
@Avogar Avogar force-pushed the non-blocking-connect branch from f30202c to 9596c21 on March 15, 2023 13:01
@Avogar Avogar marked this pull request as draft March 16, 2023 15:38
@Avogar Avogar marked this pull request as ready for review March 27, 2023 11:50

receive_timeout_usec = timeout.totalMicroseconds();
connection_fd_description = fd_description;
epoll.add(connection_fd, events);
Member


There is some intersection with the code in ConnectionEstablisher; maybe it could be moved into a base class.

Member Author


Yes, there is some intersection, but also lots of differences. I tried to come up with a base class, but to be honest it did not look better than the current version, so I decided not to do it.

Member

@azat azat left a comment


Great, plus now the code has fewer atomics and some abstractions!

Also, maybe it is worth writing a test? Introduce another setting like sleep_before_receiving_query_ms (now there is only sleep_after_receiving_query_ms), write a query large enough to exceed the socket buffer, and see how long it takes: without these patches it should take sleep_before_receiving_query_ms * shards, while with them only sleep_before_receiving_query_ms.
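The timing this proposal predicts can be modeled locally (a hypothetical Python sketch, not the proposed ClickHouse test; `send_to_shard` and the constants are invented): sending to the shards one after another costs the per-shard sleep multiplied by the number of shards, while sending concurrently costs roughly one sleep.

```python
import threading
import time

SHARDS = 6
SLEEP = 0.1  # stands in for sleep_before_receiving_query_ms

def send_to_shard():
    time.sleep(SLEEP)  # models the shard stalling before reading the query

# Blocking model: one shard after another.
start = time.monotonic()
for _ in range(SHARDS):
    send_to_shard()
sequential = time.monotonic() - start

# Async model: all shards progress at once (threads stand in for fibers).
start = time.monotonic()
threads = [threading.Thread(target=send_to_shard) for _ in range(SHARDS)]
for t in threads:
    t.start()
for t in threads:
    t.join()
concurrent = time.monotonic() - start
```

Here `sequential` comes out near SLEEP * SHARDS and `concurrent` near a single SLEEP, which is the asymmetry the proposed test would assert on.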

{
checkTimeout(/* blocking= */ true);
to_destroy = std::move(to_destroy).resume();
resumeUnlocked();
Member


Now this code works with the existing fiber, while previously it had been detached first. Though I guess it should not be an issue?

Member Author


It's OK: we call cancelBefore under the fiber_lock mutex in AsyncTaskExecutor.
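That locking invariant can be sketched like this (a rough Python model in which a generator stands in for the fiber; this toy `AsyncTaskExecutor` is invented for illustration and is not the real class): because resume and cancel both take the same lock, cancellation can safely work with the live fiber and can never interleave with a resume in flight.

```python
import threading

class AsyncTaskExecutor:
    def __init__(self, steps):
        self.fiber_lock = threading.Lock()
        self.fiber = iter(steps)          # a generator stands in for the fiber
        self.log = []

    def resume(self):
        with self.fiber_lock:
            try:
                self.log.append(("resumed", next(self.fiber)))
            except StopIteration:
                pass                      # fiber already finished or detached

    def cancel(self):
        with self.fiber_lock:             # cancelBefore runs under fiber_lock
            self.log.append(("cancelled", None))
            self.fiber = iter(())         # detach: nothing left to resume

def task():
    yield "step1"
    yield "step2"

ex = AsyncTaskExecutor(task())
ex.resume()
ex.cancel()
ex.resume()  # fiber already detached; this is a no-op
```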

@Avogar
Member Author

Avogar commented Apr 19, 2023

Also, maybe it is worth writing a test?

Sure, I will try to do it.

@Avogar
Member Author

Avogar commented Apr 19, 2023

@nickitat sorry for the slow response to the comments, I was distracted by other tasks. Could you continue reviewing, please?

@Avogar
Member Author

Avogar commented Apr 20, 2023

Some small testing:
First, test that we send the query asynchronously across shards. We can use the fact that temporary tables are sent together with the query, so we can create a big temporary table and check whether it is sent asynchronously across shards:
Temporary table:

avogar-dev :) create temporary table test (number UInt64, s String)

CREATE TEMPORARY TABLE test
(
    `number` UInt64,
    `s` String
)

Query id: 8edf267a-7897-4a60-89b3-16d47a607dcd

Ok.

0 rows in set. Elapsed: 0.002 sec.

avogar-dev :) insert into test select number, randomString(number % 1000) from numbers(1000000)

INSERT INTO test SELECT
    number,
    randomString(number % 1000)
FROM numbers(1000000)

Query id: 420a0b9e-9c67-45e8-a41d-2dee0f7c1748

Ok.

0 rows in set. Elapsed: 0.550 sec. Processed 1.05 million rows, 8.37 MB (1.90 million rows/s., 15.22 MB/s.)

First, with sleep_after_receiving_query_ms=0:

avogar-dev :) select * from remote('127.0.0.{1,2,3,4,5,6}', 'system.numbers') limit 10 format Null settings prefer_localhost_replica=0, async_query_sending_for_remote=0, max_threads=1

SELECT *
FROM remote('127.0.0.{1,2,3,4,5,6}', 'system.numbers')
LIMIT 10
FORMAT `Null`
SETTINGS prefer_localhost_replica = 0, async_query_sending_for_remote = 0, max_threads = 1

Query id: 021f8fed-14e7-4488-932a-a55aa6db33f0

Ok.

0 rows in set. Elapsed: 7.572 sec.

avogar-dev :) select * from remote('127.0.0.{1,2,3,4,5,6}', 'system.numbers') limit 10 format Null settings prefer_localhost_replica=0, async_query_sending_for_remote=1, max_threads=1

SELECT *
FROM remote('127.0.0.{1,2,3,4,5,6}', 'system.numbers')
LIMIT 10
FORMAT `Null`
SETTINGS prefer_localhost_replica = 0, async_query_sending_for_remote = 1, max_threads = 1

Query id: c1554b49-ef27-452c-baac-ffd212c391e9

Ok.

0 rows in set. Elapsed: 7.193 sec.

We can already see some improvement. Now with sleep_after_receiving_query_ms=5000 (so each shard will sleep for 5 seconds before reading the temporary table):

avogar-dev :) select * from remote('127.0.0.{1,2,3,4,5,6}', 'system.numbers') limit 10 format Null settings prefer_localhost_replica=0, async_query_sending_for_remote=0, max_threads=1

SELECT *
FROM remote('127.0.0.{1,2,3,4,5,6}', 'system.numbers')
LIMIT 10
FORMAT `Null`
SETTINGS prefer_localhost_replica = 0, async_query_sending_for_remote = 0, max_threads = 1

Query id: bd0046ec-35a8-47a2-b9fc-b080f07ddf0e

Ok.

0 rows in set. Elapsed: 37.512 sec.

avogar-dev :) select * from remote('127.0.0.{1,2,3,4,5,6}', 'system.numbers') limit 10 format Null settings prefer_localhost_replica=0, async_query_sending_for_remote=1, max_threads=1

SELECT *
FROM remote('127.0.0.{1,2,3,4,5,6}', 'system.numbers')
LIMIT 10
FORMAT `Null`
SETTINGS prefer_localhost_replica = 0, async_query_sending_for_remote = 1, max_threads = 1

Query id: cc27cab4-cc8c-4745-b125-a5bd00ffbe4c

Ok.

0 rows in set. Elapsed: 11.984 sec.

As we can see, we now definitely send the query/temporary tables asynchronously across shards :)

Now, test that we connect asynchronously across replicas and shards:
Before:

avogar-dev :) select * from remote('{129.0.0.1|127.0.0.1,129.0.0.2|127.0.0.2}', 'system.numbers') limit 10 format Null settings prefer_localhost_replica=0,  max_threads=1, hedged_connection_timeout_ms=1000, connect_timeout_with_failover_ms=5000

SELECT *
FROM remote('{129.0.0.1|127.0.0.1,129.0.0.2|127.0.0.2}', 'system.numbers')
LIMIT 10
FORMAT `Null`
SETTINGS prefer_localhost_replica = 0, max_threads = 1, hedged_connection_timeout_ms = 1000, connect_timeout_with_failover_ms = 5000

Query id: 474fd53e-e064-4ef8-80c1-77c8fffb5dd2

Ok.

0 rows in set. Elapsed: 10.380 sec.

10 seconds: we spend 5 seconds trying to connect to a replica on each shard and do not switch replicas during that time.

Now:

:) select * from remote('{129.0.0.1|127.0.0.1,129.0.0.2|127.0.0.2}', 'system.numbers') limit 10 format Null settings prefer_localhost_replica=0, async_query_sending_for_remote=0, max_threads=1, hedged_connection_timeout_ms=1000, connect_timeout_with_failover_ms=5000

SELECT *
FROM remote('{129.0.0.1|127.0.0.1,129.0.0.2|127.0.0.2}', 'system.numbers')
LIMIT 10
FORMAT `Null`
SETTINGS prefer_localhost_replica = 0, async_query_sending_for_remote = 0, max_threads = 1, hedged_connection_timeout_ms = 1000, connect_timeout_with_failover_ms = 5000

Query id: de3ab98c-ee08-4288-9b86-cfce613fbcac

Ok.

0 rows in set. Elapsed: 2.035 sec.

avogar-dev :) select * from remote('{129.0.0.1|127.0.0.1,129.0.0.2|127.0.0.2}', 'system.numbers') limit 10 format Null settings prefer_localhost_replica=0, async_query_sending_for_remote=1, max_threads=1, hedged_connection_timeout_ms=1000, connect_timeout_with_failover_ms=5000

SELECT *
FROM remote('{129.0.0.1|127.0.0.1,129.0.0.2|127.0.0.2}', 'system.numbers')
LIMIT 10
FORMAT `Null`
SETTINGS prefer_localhost_replica = 0, async_query_sending_for_remote = 1, max_threads = 1, hedged_connection_timeout_ms = 1000, connect_timeout_with_failover_ms = 5000

Query id: 9c6c99ae-310b-459c-afff-3c359227331b

Ok.

0 rows in set. Elapsed: 1.145 sec.

With async_query_sending_for_remote=0 we get 2 seconds: connections are not made asynchronously across shards, but they are asynchronous across replicas (according to hedged_connection_timeout_ms = 1000). With async_query_sending_for_remote=1 we get 1 second: we connect asynchronously across both shards and replicas.

I will add some integration tests, but they won't rely on query execution time (that could lead to flakiness); I will use some profile events instead.

Member

@nickitat nickitat left a comment


lgtm


Labels

pr-improvement Pull request with some product improvements


Development

Successfully merging this pull request may close these issues.

Hedged requests still don't allow to quickly select a good replica in presence of timeouts on others.
