Slow down all backup threads after S3 retryable errors, including ‘SlowDown’ #84854
```diff
@@ -110,12 +110,8 @@ bool Client::RetryStrategy::useGCSRewrite(const Aws::Client::AWSError<Aws::Clien
 /// NOLINTNEXTLINE(google-runtime-int)
 long Client::RetryStrategy::CalculateDelayBeforeNextRetry(const Aws::Client::AWSError<Aws::Client::CoreErrors>&, long attemptedRetries) const
 {
-    if (attemptedRetries == 0)
-    {
-        return 0;
-    }
-
-    uint64_t backoffLimitedPow = 1ul << std::min(attemptedRetries, 31l);
+    chassert(attemptedRetries >= 0);
+    uint64_t backoffLimitedPow = 1ul << std::clamp(attemptedRetries, 0l, 31l);
     return std::min<uint64_t>(scaleFactor * backoffLimitedPow, maxDelayMs);
 }
```
```diff
@@ -668,13 +664,13 @@ Client::doRequestWithRetryNetworkErrors(RequestType & request, RequestFn request
     auto with_retries = [this, request_fn_ = std::move(request_fn)] (const RequestType & request_)
     {
         chassert(client_configuration.retryStrategy);
-        const Int64 max_attempts = client_configuration.retryStrategy->GetMaxAttempts();
+        const Int64 max_attempts = client_configuration.s3_retry_attempts + 1;
         chassert(max_attempts > 0);
         std::exception_ptr last_exception = nullptr;
         for (Int64 attempt_no = 0; attempt_no < max_attempts; ++attempt_no)
         {
-            /// Sometimes we need to slow down because other requests failed with network errors to free the S3 server a bit.
-            slowDownAfterNetworkError();
+            /// Slowing down due to a previously encountered retryable error, possibly from another thread.
+            slowDownAfterRetryableError();

             try
             {
@@ -688,7 +684,19 @@ Client::doRequestWithRetryNetworkErrors(RequestType & request, RequestFn request
                 /// Not all requests can be retried in that way.
                 /// Requests that read out response body to build the result are possible to retry.
                 /// Requests that expose the response stream as an answer are not retried with that code. E.g. GetObject.
-                return request_fn_(request_);
+                auto outcome = request_fn_(request_);
+
+                if (!outcome.IsSuccess()
+                    /// AWS SDK's built-in per-thread retry logic is disabled.
+                    && client_configuration.s3_slow_all_threads_after_retryable_error
+                    && attempt_no + 1 < max_attempts
+                    /// Retry attempts are managed by the outer loop, so the attemptedRetries argument can be ignored.
+                    && client_configuration.retryStrategy->ShouldRetry(outcome.GetError(), /*attemptedRetries*/ -1))
+                {
+                    updateNextTimeToRetryAfterRetryableError(outcome.GetError(), attempt_no);
+                    continue;
+                }
+                return outcome;
             }
             catch (Poco::Net::NetException &)
             {
@@ -714,11 +722,12 @@ Client::doRequestWithRetryNetworkErrors(RequestType & request, RequestFn request
             auto error = Aws::Client::AWSError<Aws::Client::CoreErrors>(Aws::Client::CoreErrors::NETWORK_CONNECTION, /*retry*/ true);

-            /// Check if query is canceled
-            if (!client_configuration.retryStrategy->ShouldRetry(error, attempt_no))
+            /// Check if query is canceled.
+            /// Retry attempts are managed by the outer loop, so the attemptedRetries argument can be ignored.
+            if (!client_configuration.retryStrategy->ShouldRetry(error, /*attemptedRetries*/ -1))
                 break;

-            sleepAfterNetworkError(error, attempt_no);
+            updateNextTimeToRetryAfterRetryableError(error, attempt_no);
         }
     }
```

> **Author** (commenting on lines +687 to +699): This is the essential part.
```diff
@@ -749,36 +758,33 @@ RequestResult Client::processRequestResult(RequestResult && outcome) const
     return RequestResult(error);
 }

-void Client::sleepAfterNetworkError(Aws::Client::AWSError<Aws::Client::CoreErrors> error, Int64 attempt_no) const
+void Client::updateNextTimeToRetryAfterRetryableError(Aws::Client::AWSError<Aws::Client::CoreErrors> error, Int64 attempt_no) const
 {
-    auto sleep_ms = client_configuration.retryStrategy->CalculateDelayBeforeNextRetry(error, attempt_no);
-    if (!client_configuration.s3_slow_all_threads_after_network_error)
-    {
-        LOG_WARNING(log, "Request failed, now waiting {} ms before attempting again", sleep_ms);
-        sleepForMilliseconds(sleep_ms);
-        return;
-    }
+    if (!client_configuration.s3_slow_all_threads_after_network_error || !client_configuration.s3_slow_all_threads_after_retryable_error)
+        return;

-    /// Set the time other s3 requests must wait until.
+    auto sleep_ms = client_configuration.retryStrategy->CalculateDelayBeforeNextRetry(error, attempt_no);
+    /// Other S3 requests must wait until this time.
     UInt64 current_time_ms = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
     UInt64 next_time_ms = current_time_ms + sleep_ms;
-    /// next_time_to_retry_after_network_error = std::max(next_time_to_retry_after_network_error, next_time_ms)
-    for (UInt64 stored_next_time = next_time_to_retry_after_network_error;
-        (stored_next_time < next_time_ms) && !next_time_to_retry_after_network_error.compare_exchange_weak(stored_next_time, next_time_ms);)
+    UInt64 stored_next_time = next_time_to_retry_after_retryable_error;
+    while (stored_next_time < next_time_ms
+        && !next_time_to_retry_after_retryable_error.compare_exchange_weak(stored_next_time, next_time_ms))
     {
+        /// Atomically update to a later retry time, but only if it's further in the future.
     }
 }

-void Client::slowDownAfterNetworkError() const
+void Client::slowDownAfterRetryableError() const
 {
-    if (!client_configuration.s3_slow_all_threads_after_network_error)
+    if (!client_configuration.s3_slow_all_threads_after_network_error && !client_configuration.s3_slow_all_threads_after_retryable_error)
        return;

-    /// Wait until `next_time_to_retry_after_network_error`.
+    /// Wait until `next_time_to_retry_after_retryable_error`.
     for (;;)
     {
         UInt64 current_time_ms = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
-        UInt64 next_time_ms = next_time_to_retry_after_network_error.load();
+        UInt64 next_time_ms = next_time_to_retry_after_retryable_error.load();
         if (current_time_ms >= next_time_ms)
             break;
         UInt64 sleep_ms = next_time_ms - current_time_ms;
```

> **Author** (commenting on lines -757 to -758): I think this is incorrect because it changes the behavior of the old versions with
```diff
@@ -1057,7 +1063,9 @@ std::unique_ptr<S3::Client> ClientFactory::create( // NOLINT
         credentials_configuration.role_session_name, credentials_configuration.expiration_window_seconds,
         std::move(credentials_provider), client_configuration, credentials_configuration.sts_endpoint_override);

-    client_configuration.retryStrategy = std::make_shared<Client::RetryStrategy>(client_configuration.s3_retry_attempts);
+    /// Disable per-thread retry loops if global retry coordination is in use.
+    uint32_t retry_attempts = client_configuration.s3_slow_all_threads_after_retryable_error ? 0 : client_configuration.s3_retry_attempts;
+    client_configuration.retryStrategy = std::make_shared<Client::RetryStrategy>(retry_attempts);

     /// Use virtual addressing if endpoint is not specified.
     if (client_configuration.endpointOverride.empty())
@@ -1079,6 +1087,7 @@ PocoHTTPClientConfiguration ClientFactory::createClientConfiguration( // NOLINT
     unsigned int s3_max_redirects,
     unsigned int s3_retry_attempts,
     bool s3_slow_all_threads_after_network_error,
+    bool s3_slow_all_threads_after_retryable_error,
     bool enable_s3_requests_logging,
     bool for_disk_s3,
     const ThrottlerPtr & get_request_throttler,
@@ -1099,6 +1108,7 @@ PocoHTTPClientConfiguration ClientFactory::createClientConfiguration( // NOLINT
         s3_max_redirects,
         s3_retry_attempts,
         s3_slow_all_threads_after_network_error,
+        s3_slow_all_threads_after_retryable_error,
         enable_s3_requests_logging,
         for_disk_s3,
         context->getGlobalContext()->getSettingsRef()[Setting::s3_use_adaptive_timeouts],
```
> **Reviewer:** Do we really need two separate settings? Couldn't you just modify the logic of `s3_slow_all_threads_after_network_error` instead of adding a new setting?

> **Author:** I enabled the new setting only for backups. In other places, wherever the S3 client is configured, the value of this setting is `false`, unlike `s3_slow_all_threads_after_network_error`, which is `true`. For example: `src/Coordination/KeeperSnapshotManagerS3.cpp`. The reason for this is that I prefer to avoid any unexpected delays outside of the backup ecosystem, and the setting `s3_slow_all_threads_after_network_error` is too broad for this use case.

> **Reviewer:** The setting `s3_slow_all_threads_after_network_error` was added only to resolve the backup issue, despite the fact that it doesn't have the word "backup" in its name. Since you've found a better solution, let's remove the previous setting, or at least make it obsolete and make these settings work in exactly the same way, just to avoid creating unnecessary complexity.

> **Author:** It applies not solely to backups; this is why I introduced a new backup-specific setting. I am fine with deprecating `s3_slow_all_threads_after_network_error` in favor of the new setting, but IMO its value for non-backup cases should be set to `false`.

> **Reviewer:** Yes, but I didn't have any intention to do anything special for `KeeperSnapshotManagerS3` or S3 object storage. My idea in #80035 was to provide a general fix for the S3 client. Since you've modified that fix so it works only for backups now, it seems better not to apply the previous insufficient fix #80035 to `KeeperSnapshotManagerS3` or S3 object storage.