Skip to content

Commit 5ab1587

Browse files
Backport #84463 to 25.5: S3Queue ordered mode fix: quit earlier if shutdown
1 parent c4ba82b commit 5ab1587

File tree

3 files changed

+32
-10
lines changed

3 files changed

+32
-10
lines changed

src/Storages/ObjectStorageQueue/ObjectStorageQueueOrderedFileMetadata.cpp

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,9 @@ ObjectStorageQueueOrderedFileMetadata::BucketHolderPtr ObjectStorageQueueOrdered
202202

203203
const auto processor_info = getProcessorInfo(processor);
204204

205-
while (true)
205+
const size_t max_num_tries = 1000;
206+
Coordination::Error code;
207+
for (size_t i = 0; i < max_num_tries; ++i)
206208
{
207209
Coordination::Requests requests;
208210

@@ -225,7 +227,7 @@ ObjectStorageQueueOrderedFileMetadata::BucketHolderPtr ObjectStorageQueueOrdered
225227
requests.push_back(zkutil::makeSetRequest(bucket_lock_id_path, processor_info, -1));
226228

227229
Coordination::Responses responses;
228-
const auto code = zk_client->tryMulti(requests, responses);
230+
code = zk_client->tryMulti(requests, responses);
229231
if (code == Coordination::Error::ZOK)
230232
{
231233
const auto & set_response = dynamic_cast<const Coordination::SetResponse &>(*responses.back());
@@ -259,6 +261,11 @@ ObjectStorageQueueOrderedFileMetadata::BucketHolderPtr ObjectStorageQueueOrdered
259261
LOG_INFO(log_, "Bucket lock id path was probably created or removed "
260262
"while acquiring the bucket (error code: {}), will retry", code);
261263
}
264+
265+
throw Exception(
266+
ErrorCodes::LOGICAL_ERROR,
267+
"Failed to set file processing within {} retries, last error: {}",
268+
max_num_tries, code);
262269
}
263270

264271
std::pair<bool, ObjectStorageQueueIFileMetadata::FileStatus::State> ObjectStorageQueueOrderedFileMetadata::setProcessingImpl()
@@ -267,7 +274,9 @@ std::pair<bool, ObjectStorageQueueIFileMetadata::FileStatus::State> ObjectStorag
267274
processing_id = node_metadata.processing_id = getRandomASCIIString(10);
268275
auto processor_info = getProcessorInfo(processing_id.value());
269276

270-
while (true)
277+
const size_t max_num_tries = 1000;
278+
Coordination::Error code;
279+
for (size_t i = 0; i < max_num_tries; ++i)
271280
{
272281
std::optional<NodeMetadata> processed_node;
273282
Coordination::Stat processed_node_stat;
@@ -277,10 +286,10 @@ std::pair<bool, ObjectStorageQueueIFileMetadata::FileStatus::State> ObjectStorag
277286
std::vector<std::string> paths{processed_node_path, failed_node_path};
278287
auto responses = zk_client->tryGet(paths);
279288

280-
auto check_code = [this](auto code)
289+
auto check_code = [this](auto code_)
281290
{
282-
if (!(code == Coordination::Error::ZOK || code == Coordination::Error::ZNONODE))
283-
throw zkutil::KeeperException::fromPath(code, path);
291+
if (!(code_ == Coordination::Error::ZOK || code_ == Coordination::Error::ZNONODE))
292+
throw zkutil::KeeperException::fromPath(code_, path);
284293
};
285294
check_code(responses[0].error);
286295
check_code(responses[1].error);
@@ -367,7 +376,7 @@ std::pair<bool, ObjectStorageQueueIFileMetadata::FileStatus::State> ObjectStorag
367376
zkutil::addCheckNotExistsRequest(requests, *zk_client, processed_node_path);
368377

369378
Coordination::Responses responses;
370-
const auto code = zk_client->tryMulti(requests, responses);
379+
code = zk_client->tryMulti(requests, responses);
371380
auto has_request_failed = [&](size_t request_index) { return responses[request_index]->error != Coordination::Error::ZOK; };
372381

373382
if (code == Coordination::Error::ZOK)
@@ -401,6 +410,11 @@ std::pair<bool, ObjectStorageQueueIFileMetadata::FileStatus::State> ObjectStorag
401410
/// most likely the processing node id path node was removed or created so let's try again
402411
LOG_TRACE(log, "Retrying setProcessing because processing node id path is unexpectedly missing or was created (error code: {})", code);
403412
}
413+
414+
throw Exception(
415+
ErrorCodes::LOGICAL_ERROR,
416+
"Failed to set file processing within {} retries, last error: {}",
417+
max_num_tries, code);
404418
}
405419

406420
void ObjectStorageQueueOrderedFileMetadata::prepareProcessedAtStartRequests(

src/Storages/ObjectStorageQueue/ObjectStorageQueueSource.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -512,7 +512,7 @@ ObjectStorageQueueSource::FileIterator::getNextKeyFromAcquiredBucket(size_t proc
512512
log, "Current processor: {}, acquired bucket: {}",
513513
processor, current_bucket_holder ? toString(current_bucket_holder->getBucket()) : "None");
514514

515-
while (true)
515+
while (!shutdown_called)
516516
{
517517
/// Each processing thread gets next path
518518
/// and checks if corresponding bucket is already acquired by someone.
@@ -710,6 +710,7 @@ ObjectStorageQueueSource::FileIterator::getNextKeyFromAcquiredBucket(size_t proc
710710
if (listed_keys_cache.empty())
711711
return {};
712712
}
713+
return {};
713714
}
714715

715716
ObjectStorageQueueSource::ObjectStorageQueueSource(

src/Storages/ObjectStorageQueue/ObjectStorageQueueUnorderedFileMetadata.cpp

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,13 +83,15 @@ ObjectStorageQueueUnorderedFileMetadata::prepareProcessingRequestsImpl(Coordinat
8383
std::pair<bool, ObjectStorageQueueIFileMetadata::FileStatus::State> ObjectStorageQueueUnorderedFileMetadata::setProcessingImpl()
8484
{
8585
const auto zk_client = getZooKeeper();
86-
while (true)
86+
const size_t max_num_tries = 1000;
87+
Coordination::Error code;
88+
for (size_t i = 0; i < max_num_tries; ++i)
8789
{
8890
Coordination::Requests requests;
8991
auto result_indexes = prepareProcessingRequestsImpl(requests);
9092

9193
Coordination::Responses responses;
92-
const auto code = zk_client->tryMulti(requests, responses);
94+
code = zk_client->tryMulti(requests, responses);
9395
auto has_request_failed = [&](size_t request_index) { return responses[request_index]->error != Coordination::Error::ZOK; };
9496

9597
if (code == Coordination::Error::ZOK)
@@ -118,6 +120,11 @@ std::pair<bool, ObjectStorageQueueIFileMetadata::FileStatus::State> ObjectStorag
118120
log, "Retrying setProcessing because processing node id path "
119121
"is unexpectedly missing or was created (error code: {})", code);
120122
}
123+
124+
throw Exception(
125+
ErrorCodes::LOGICAL_ERROR,
126+
"Failed to set file processing within {} retries, last error: {}",
127+
max_num_tries, code);
121128
}
122129

123130
void ObjectStorageQueueUnorderedFileMetadata::prepareProcessedAtStartRequests(

0 commit comments

Comments
 (0)