Skip to content

Commit 985048c

Browse files
author
Alexey Katsman
authored
Merge pull request #63206 from alexkats/drop-connections
Reject queries when the server is overloaded
2 parents 5a030f8 + 476c374 commit 985048c

File tree

21 files changed

+194
-77
lines changed

21 files changed

+194
-77
lines changed

src/Common/AsynchronousMetrics.cpp

Lines changed: 6 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
#include <Common/formatReadable.h>
1212
#include <Common/logger_useful.h>
1313
#include <Common/setThreadName.h>
14+
#include <Core/ServerSettings.h>
1415

1516
#include <boost/locale/date_time_facet.hpp>
1617

@@ -23,14 +24,13 @@
2324
#endif
2425

2526

26-
namespace ProfileEvents
27+
namespace DB
2728
{
28-
extern const Event OSCPUWaitMicroseconds;
29-
extern const Event OSCPUVirtualTimeMicroseconds;
30-
}
3129

32-
namespace DB
30+
namespace ServerSetting
3331
{
32+
extern const ServerSettingsUInt64 os_cpu_busy_time_threshold;
33+
}
3434

3535
namespace ErrorCodes
3636
{
@@ -1826,7 +1826,7 @@ void AsynchronousMetrics::update(TimePoint update_time, bool force_update)
18261826
}
18271827
}
18281828

1829-
new_values["OSCPUOverload"] = { getCPUOverloadMetric(), "Relative CPU deficit, calculated as: how many threads are waiting for CPU relative to the number of threads, using CPU. If it is greater than zero, the server would benefit from more CPU. If it is significantly greater than zero, the server could become unresponsive. The metric is accumulated between the updates of asynchronous metrics." };
1829+
new_values["OSCPUOverload"] = { ProfileEvents::global_counters.getCPUOverload(context->getServerSettings()[ServerSetting::os_cpu_busy_time_threshold], /*reset*/ true), "Relative CPU deficit, calculated as: how many threads are waiting for CPU relative to the number of threads, using CPU. If it is greater than zero, the server would benefit from more CPU. If it is significantly greater than zero, the server could become unresponsive. The metric is accumulated between the updates of asynchronous metrics." };
18301830

18311831
/// Add more metrics as you wish.
18321832

@@ -1849,22 +1849,4 @@ void AsynchronousMetrics::update(TimePoint update_time, bool force_update)
18491849
}
18501850
}
18511851

1852-
double AsynchronousMetrics::getCPUOverloadMetric()
1853-
{
1854-
Int64 curr_cpu_wait_microseconds = ProfileEvents::global_counters[ProfileEvents::OSCPUWaitMicroseconds];
1855-
Int64 curr_cpu_virtual_time_microseconds = ProfileEvents::global_counters[ProfileEvents::OSCPUVirtualTimeMicroseconds];
1856-
1857-
Int64 os_cpu_wait_microseconds = curr_cpu_wait_microseconds - prev_cpu_wait_microseconds;
1858-
Int64 os_cpu_virtual_time_microseconds = curr_cpu_virtual_time_microseconds - prev_cpu_virtual_time_microseconds;
1859-
1860-
prev_cpu_wait_microseconds = curr_cpu_wait_microseconds;
1861-
prev_cpu_virtual_time_microseconds = curr_cpu_virtual_time_microseconds;
1862-
1863-
/// If we used less than one CPU core, we cannot detect overload.
1864-
if (os_cpu_virtual_time_microseconds < 1'000'000 || os_cpu_wait_microseconds <= 0)
1865-
return 0;
1866-
1867-
return static_cast<double>(os_cpu_wait_microseconds) / os_cpu_virtual_time_microseconds;
1868-
}
1869-
18701852
}

src/Common/AsynchronousMetrics.h

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -124,12 +124,6 @@ class AsynchronousMetrics
124124
[[maybe_unused]] const bool update_rss;
125125
ContextPtr context;
126126

127-
128-
Int64 prev_cpu_wait_microseconds = 0;
129-
Int64 prev_cpu_virtual_time_microseconds = 0;
130-
131-
double getCPUOverloadMetric();
132-
133127
#if defined(OS_LINUX)
134128
std::optional<ReadBufferFromFilePRead> meminfo TSA_GUARDED_BY(data_mutex);
135129
std::optional<ReadBufferFromFilePRead> loadavg TSA_GUARDED_BY(data_mutex);

src/Common/ErrorCodes.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -624,6 +624,7 @@
624624
M(742, DELTA_KERNEL_ERROR) \
625625
M(743, ICEBERG_SPECIFICATION_VIOLATION) \
626626
M(744, SESSION_ID_EMPTY) \
627+
M(745, SERVER_OVERLOADED) \
627628
\
628629
M(900, DISTRIBUTED_CACHE_ERROR) \
629630
M(901, CANNOT_USE_DISTRIBUTED_CACHE) \

src/Common/ProfileEvents.cpp

Lines changed: 48 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#include <Common/ProfileEvents.h>
22
#include <Common/CurrentThread.h>
33
#include <Common/TraceSender.h>
4+
#include <Interpreters/Context.h>
45

56

67
// clang-format off
@@ -11,7 +12,7 @@
1112
M(Query, "Number of queries to be interpreted and potentially executed. Does not include queries that failed to parse or were rejected due to AST size limits, quota limits or limits on the number of simultaneously running queries. May include internal queries initiated by ClickHouse itself. Does not count subqueries.", ValueType::Number) \
1213
M(SelectQuery, "Same as Query, but only for SELECT queries.", ValueType::Number) \
1314
M(InsertQuery, "Same as Query, but only for INSERT queries.", ValueType::Number) \
14-
M(InitialQuery, "Same as Query, but only counts initial queries (see is_initial_query).", ValueType::Number)\
15+
M(InitialQuery, "Same as Query, but only counts initial queries (see is_initial_query).", ValueType::Number) \
1516
M(QueriesWithSubqueries, "Count queries with all subqueries", ValueType::Number) \
1617
M(SelectQueriesWithSubqueries, "Count SELECT queries with all subqueries", ValueType::Number) \
1718
M(InsertQueriesWithSubqueries, "Count INSERT queries with all subqueries", ValueType::Number) \
@@ -712,7 +713,7 @@ The server successfully detected this situation and will download merged part fr
712713
M(KeeperLatency, "Keeper latency", ValueType::Milliseconds) \
713714
M(KeeperTotalElapsedMicroseconds, "Keeper total latency for a single request", ValueType::Microseconds) \
714715
M(KeeperProcessElapsedMicroseconds, "Keeper commit latency for a single request", ValueType::Microseconds) \
715-
M(KeeperPreprocessElapsedMicroseconds, "Keeper preprocessing latency for a single reuquest", ValueType::Microseconds)\
716+
M(KeeperPreprocessElapsedMicroseconds, "Keeper preprocessing latency for a single reuquest", ValueType::Microseconds) \
716717
M(KeeperStorageLockWaitMicroseconds, "Time spent waiting for acquiring Keeper storage lock", ValueType::Microseconds) \
717718
M(KeeperCommitWaitElapsedMicroseconds, "Time spent waiting for certain log to be committed", ValueType::Microseconds) \
718719
M(KeeperBatchMaxCount, "Number of times the size of batch was limited by the amount", ValueType::Number) \
@@ -743,29 +744,29 @@ The server successfully detected this situation and will download merged part fr
743744
M(S3QueueSetFileProcessingMicroseconds, "Time spent to set file as processing", ValueType::Microseconds) \
744745
M(S3QueueSetFileProcessedMicroseconds, "Time spent to set file as processed", ValueType::Microseconds) \
745746
M(S3QueueSetFileFailedMicroseconds, "Time spent to set file as failed", ValueType::Microseconds) \
746-
M(ObjectStorageQueueFailedFiles, "Number of files which failed to be processed", ValueType::Number)\
747-
M(ObjectStorageQueueProcessedFiles, "Number of files which were processed", ValueType::Number)\
747+
M(ObjectStorageQueueFailedFiles, "Number of files which failed to be processed", ValueType::Number) \
748+
M(ObjectStorageQueueProcessedFiles, "Number of files which were processed", ValueType::Number) \
748749
M(ObjectStorageQueueCleanupMaxSetSizeOrTTLMicroseconds, "Time spent to set file as failed", ValueType::Microseconds) \
749750
M(ObjectStorageQueuePullMicroseconds, "Time spent to read file data", ValueType::Microseconds) \
750751
M(ObjectStorageQueueLockLocalFileStatusesMicroseconds, "Time spent to lock local file statuses", ValueType::Microseconds) \
751-
M(ObjectStorageQueueFailedToBatchSetProcessing, "Number of times batched set processing request failed", ValueType::Number)\
752-
M(ObjectStorageQueueTrySetProcessingRequests, "The number of times we tried to make set processing request", ValueType::Number)\
753-
M(ObjectStorageQueueTrySetProcessingSucceeded, "The number of times we successfully set file as processing", ValueType::Number)\
754-
M(ObjectStorageQueueTrySetProcessingFailed, "The number of times we unsuccessfully set file as processing", ValueType::Number)\
755-
M(ObjectStorageQueueListedFiles, "Number of listed files in StorageS3(Azure)Queue", ValueType::Number)\
756-
M(ObjectStorageQueueFilteredFiles, "Number of filtered files in StorageS3(Azure)Queue", ValueType::Number)\
757-
M(ObjectStorageQueueReadFiles, "Number of read files (not equal to the number of actually inserted files)", ValueType::Number)\
758-
M(ObjectStorageQueueReadRows, "Number of read rows (not equal to the number of actually inserted rows)", ValueType::Number)\
759-
M(ObjectStorageQueueReadBytes, "Number of read bytes (not equal to the number of actually inserted bytes)", ValueType::Number)\
760-
M(ObjectStorageQueueExceptionsDuringRead, "Number of exceptions during read in S3(Azure)Queue", ValueType::Number)\
761-
M(ObjectStorageQueueExceptionsDuringInsert, "Number of exceptions during insert in S3(Azure)Queue", ValueType::Number)\
762-
M(ObjectStorageQueueRemovedObjects, "Number of objects removed as part of after_processing = delete", ValueType::Number)\
763-
M(ObjectStorageQueueInsertIterations, "Number of insert iterations", ValueType::Number)\
764-
M(ObjectStorageQueueCommitRequests, "Number of keeper requests to commit files as either failed or processed", ValueType::Number)\
765-
M(ObjectStorageQueueSuccessfulCommits, "Number of successful keeper commits", ValueType::Number)\
766-
M(ObjectStorageQueueUnsuccessfulCommits, "Number of unsuccessful keeper commits", ValueType::Number)\
767-
M(ObjectStorageQueueCancelledFiles, "Number cancelled files in StorageS3(Azure)Queue", ValueType::Number)\
768-
M(ObjectStorageQueueProcessedRows, "Number of processed rows in StorageS3(Azure)Queue", ValueType::Number)\
752+
M(ObjectStorageQueueFailedToBatchSetProcessing, "Number of times batched set processing request failed", ValueType::Number) \
753+
M(ObjectStorageQueueTrySetProcessingRequests, "The number of times we tried to make set processing request", ValueType::Number) \
754+
M(ObjectStorageQueueTrySetProcessingSucceeded, "The number of times we successfully set file as processing", ValueType::Number) \
755+
M(ObjectStorageQueueTrySetProcessingFailed, "The number of times we unsuccessfully set file as processing", ValueType::Number) \
756+
M(ObjectStorageQueueListedFiles, "Number of listed files in StorageS3(Azure)Queue", ValueType::Number) \
757+
M(ObjectStorageQueueFilteredFiles, "Number of filtered files in StorageS3(Azure)Queue", ValueType::Number) \
758+
M(ObjectStorageQueueReadFiles, "Number of read files (not equal to the number of actually inserted files)", ValueType::Number) \
759+
M(ObjectStorageQueueReadRows, "Number of read rows (not equal to the number of actually inserted rows)", ValueType::Number) \
760+
M(ObjectStorageQueueReadBytes, "Number of read bytes (not equal to the number of actually inserted bytes)", ValueType::Number) \
761+
M(ObjectStorageQueueExceptionsDuringRead, "Number of exceptions during read in S3(Azure)Queue", ValueType::Number) \
762+
M(ObjectStorageQueueExceptionsDuringInsert, "Number of exceptions during insert in S3(Azure)Queue", ValueType::Number) \
763+
M(ObjectStorageQueueRemovedObjects, "Number of objects removed as part of after_processing = delete", ValueType::Number) \
764+
M(ObjectStorageQueueInsertIterations, "Number of insert iterations", ValueType::Number) \
765+
M(ObjectStorageQueueCommitRequests, "Number of keeper requests to commit files as either failed or processed", ValueType::Number) \
766+
M(ObjectStorageQueueSuccessfulCommits, "Number of successful keeper commits", ValueType::Number) \
767+
M(ObjectStorageQueueUnsuccessfulCommits, "Number of unsuccessful keeper commits", ValueType::Number) \
768+
M(ObjectStorageQueueCancelledFiles, "Number cancelled files in StorageS3(Azure)Queue", ValueType::Number) \
769+
M(ObjectStorageQueueProcessedRows, "Number of processed rows in StorageS3(Azure)Queue", ValueType::Number) \
769770
\
770771
M(ServerStartupMilliseconds, "Time elapsed from starting server to listening to sockets in milliseconds", ValueType::Milliseconds) \
771772
M(IOUringSQEsSubmitted, "Total number of io_uring SQEs submitted", ValueType::Number) \
@@ -938,7 +939,7 @@ The server successfully detected this situation and will download merged part fr
938939
M(StorageConnectionsPreserved, "Number of preserved connections for storages", ValueType::Number) \
939940
M(StorageConnectionsExpired, "Number of expired connections for storages", ValueType::Number) \
940941
M(StorageConnectionsErrors, "Number of cases when creation of a connection for storage is failed", ValueType::Number) \
941-
M(StorageConnectionsElapsedMicroseconds, "Total time spend on creating connections for storages", ValueType::Microseconds) \
942+
M(StorageConnectionsElapsedMicroseconds, "Total time spend on creating connections for storages", ValueType::Microseconds) \
942943
\
943944
M(DiskConnectionsCreated, "Number of created connections for disk", ValueType::Number) \
944945
M(DiskConnectionsReused, "Number of reused connections for disk", ValueType::Number) \
@@ -1122,6 +1123,30 @@ void incrementNoTrace(Event event, Count amount)
11221123
DB::CurrentThread::getProfileEvents().incrementNoTrace(event, amount);
11231124
}
11241125

1126+
/// Computes the ratio between the growth of the OSCPUWaitMicroseconds counter and the growth of the
/// OSCPUVirtualTimeMicroseconds counter, where "growth" is the current counter value minus the stored prev_* value.
///
/// os_cpu_busy_time_threshold - the function returns 0 unless the virtual-time growth is strictly greater than this value.
/// reset - if true, the current counter values are stored as the new prev_* values, so the next call measures from this point.
///
/// Returns 0 when the virtual-time growth does not exceed the threshold or when the wait growth is not positive;
/// otherwise returns wait growth divided by virtual-time growth, as a double.
double Counters::getCPUOverload(Int64 os_cpu_busy_time_threshold, bool reset)
1127+
{
1128+
/// It's possible that we'll have slightly inconsistent values between wait time and busy time. But since we take the value of CPU wait time first,
1129+
/// it should not affect the situation a lot. In the worst case scenario we will have a slightly lower CPU overload value than it should be, but it's fine.
1130+
Int64 curr_cpu_wait_microseconds = counters[OSCPUWaitMicroseconds];
1131+
Int64 curr_cpu_virtual_time_microseconds = counters[OSCPUVirtualTimeMicroseconds];
1132+
1133+
/// Differences between the values just read and the values remembered in prev_*.
Int64 os_cpu_wait_microseconds = curr_cpu_wait_microseconds - prev_cpu_wait_microseconds.load(std::memory_order_acquire);
1134+
Int64 os_cpu_virtual_time_microseconds = curr_cpu_virtual_time_microseconds - prev_cpu_virtual_time_microseconds.load(std::memory_order_acquire);
1135+
1136+
if (reset)
1137+
{
1138+
/// It's important to update wait time first, since the atomicity is not guaranteed for both counters at the same time.
1139+
/// So in the worst case scenario, we'll update prev wait time first, which will result in an underestimated wait time and lower CPU overload value.
1140+
prev_cpu_wait_microseconds.store(curr_cpu_wait_microseconds, std::memory_order_release);
1141+
prev_cpu_virtual_time_microseconds.store(curr_cpu_virtual_time_microseconds, std::memory_order_release);
1142+
}
1143+
1144+
/// The prev_* values (when reset is requested) have already been stored above, so returning 0 here does not skip that update.
/// Because the comparison is `<=`, a zero virtual-time difference also returns 0 here whenever the threshold is non-negative.
if (os_cpu_virtual_time_microseconds <= os_cpu_busy_time_threshold || os_cpu_wait_microseconds <= 0)
1145+
return 0;
1146+
1147+
/// The cast makes this a floating-point division rather than an integer one.
return static_cast<double>(os_cpu_wait_microseconds) / os_cpu_virtual_time_microseconds;
1148+
}
1149+
11251150
void Counters::increment(Event event, Count amount)
11261151
{
11271152
Counters * current = this;

src/Common/ProfileEvents.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
#include <Common/VariableContext.h>
44
#include <Common/Stopwatch.h>
5+
#include <Interpreters/Context_fwd.h>
56
#include <base/types.h>
67
#include <base/strong_typedef.h>
78
#include <Poco/Message.h>
@@ -62,6 +63,8 @@ namespace ProfileEvents
6263
/// Used to propagate increments
6364
std::atomic<Counters *> parent = {};
6465
bool trace_profile_events = false;
66+
Counter prev_cpu_wait_microseconds = 0;
67+
Counter prev_cpu_virtual_time_microseconds = 0;
6568

6669
public:
6770

@@ -86,6 +89,8 @@ namespace ProfileEvents
8689
return counters[event];
8790
}
8891

92+
double getCPUOverload(Int64 os_cpu_busy_time_threshold, bool reset = false);
93+
8994
void increment(Event event, Count amount = 1);
9095
void incrementNoTrace(Event event, Count amount = 1);
9196

src/Core/ServerSettings.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1035,7 +1035,8 @@ The policy on how to perform a scheduling of CPU slots specified by `concurrent_
10351035
<wait_dictionaries_load_at_startup>true</wait_dictionaries_load_at_startup>
10361036
```
10371037
)", 0) \
1038-
DECLARE(Bool, storage_shared_set_join_use_inner_uuid, true, "If enabled, an inner UUID is generated during the creation of SharedSet and SharedJoin. ClickHouse Cloud only", 0)
1038+
DECLARE(Bool, storage_shared_set_join_use_inner_uuid, true, "If enabled, an inner UUID is generated during the creation of SharedSet and SharedJoin. ClickHouse Cloud only", 0) \
1039+
DECLARE(UInt64, os_cpu_busy_time_threshold, 1'000'000, "Threshold of OS CPU busy time in microseconds (OSCPUVirtualTimeMicroseconds metric) to consider CPU doing some useful work, no CPU overload would be considered if busy time was below this value.", 0) \
10391040

10401041

10411042
// clang-format on

src/Core/Settings.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6420,6 +6420,8 @@ Note that initially (24.12) there was a server setting (`send_settings_to_client
64206420
DECLARE(Milliseconds, low_priority_query_wait_time_ms, 1000, R"(
64216421
Wait time in milliseconds when lower priority query meets higher priority query.
64226422
)", BETA) \
6423+
DECLARE(Float, min_os_cpu_wait_time_ratio_to_throw, 2.0, "Min ratio between OS CPU wait (OSCPUWaitMicroseconds metric) and busy (OSCPUVirtualTimeMicroseconds metric) times to consider rejecting queries. Linear interpolation between min and max ratio is used to calculate the probability, the probability is 0 at this point.", 0) \
6424+
DECLARE(Float, max_os_cpu_wait_time_ratio_to_throw, 6.0, "Max ratio between OS CPU wait (OSCPUWaitMicroseconds metric) and busy (OSCPUVirtualTimeMicroseconds metric) times to consider rejecting queries. Linear interpolation between min and max ratio is used to calculate the probability, the probability is 1 at this point.", 0) \
64236425
\
64246426
/* ####################################################### */ \
64256427
/* ########### START OF EXPERIMENTAL FEATURES ############ */ \

src/Core/SettingsChangesHistory.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,8 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
7676
{"low_priority_query_wait_time_ms", 1000, 1000, "New setting."},
7777
{"allow_experimental_shared_set_join", 0, 1, "A setting for ClickHouse Cloud to enable SharedSet and SharedJoin"},
7878
{"distributed_cache_read_request_max_tries", 20, 20, "New setting"},
79+
{"min_os_cpu_wait_time_ratio_to_throw", 0, 2, "New setting"},
80+
{"max_os_cpu_wait_time_ratio_to_throw", 0, 6, "New setting"},
7981
});
8082
addSettingsChanges(settings_changes_history, "25.3",
8183
{

0 commit comments

Comments
 (0)