@@ -202,7 +202,9 @@ ObjectStorageQueueOrderedFileMetadata::BucketHolderPtr ObjectStorageQueueOrdered
202202
203203 const auto processor_info = getProcessorInfo (processor);
204204
205- while (true )
205+ const size_t max_num_tries = 1000 ;
206+ Coordination::Error code;
207+ for (size_t i = 0 ; i < max_num_tries; ++i)
206208 {
207209 Coordination::Requests requests;
208210
@@ -225,7 +227,7 @@ ObjectStorageQueueOrderedFileMetadata::BucketHolderPtr ObjectStorageQueueOrdered
225227 requests.push_back (zkutil::makeSetRequest (bucket_lock_id_path, processor_info, -1 ));
226228
227229 Coordination::Responses responses;
228- const auto code = zk_client->tryMulti (requests, responses);
230+ code = zk_client->tryMulti (requests, responses);
229231 if (code == Coordination::Error::ZOK)
230232 {
231233 const auto & set_response = dynamic_cast <const Coordination::SetResponse &>(*responses.back ());
@@ -259,6 +261,11 @@ ObjectStorageQueueOrderedFileMetadata::BucketHolderPtr ObjectStorageQueueOrdered
259261 LOG_INFO (log_, " Bucket lock id path was probably created or removed "
260262 " while acquiring the bucket (error code: {}), will retry" , code);
261263 }
264+
265+ throw Exception (
266+ ErrorCodes::LOGICAL_ERROR,
267+ " Failed to set file processing within {} retries, last error: {}" ,
268+ max_num_tries, code);
262269}
263270
264271std::pair<bool , ObjectStorageQueueIFileMetadata::FileStatus::State> ObjectStorageQueueOrderedFileMetadata::setProcessingImpl ()
@@ -267,7 +274,9 @@ std::pair<bool, ObjectStorageQueueIFileMetadata::FileStatus::State> ObjectStorag
267274 processing_id = node_metadata.processing_id = getRandomASCIIString (10 );
268275 auto processor_info = getProcessorInfo (processing_id.value ());
269276
270- while (true )
277+ const size_t max_num_tries = 1000 ;
278+ Coordination::Error code;
279+ for (size_t i = 0 ; i < max_num_tries; ++i)
271280 {
272281 std::optional<NodeMetadata> processed_node;
273282 Coordination::Stat processed_node_stat;
@@ -277,10 +286,10 @@ std::pair<bool, ObjectStorageQueueIFileMetadata::FileStatus::State> ObjectStorag
277286 std::vector<std::string> paths{processed_node_path, failed_node_path};
278287 auto responses = zk_client->tryGet (paths);
279288
280- auto check_code = [this ](auto code )
289+ auto check_code = [this ](auto code_ )
281290 {
282- if (!(code == Coordination::Error::ZOK || code == Coordination::Error::ZNONODE))
283- throw zkutil::KeeperException::fromPath (code , path);
291+ if (!(code_ == Coordination::Error::ZOK || code_ == Coordination::Error::ZNONODE))
292+ throw zkutil::KeeperException::fromPath (code_ , path);
284293 };
285294 check_code (responses[0 ].error );
286295 check_code (responses[1 ].error );
@@ -367,7 +376,7 @@ std::pair<bool, ObjectStorageQueueIFileMetadata::FileStatus::State> ObjectStorag
367376 zkutil::addCheckNotExistsRequest (requests, *zk_client, processed_node_path);
368377
369378 Coordination::Responses responses;
370- const auto code = zk_client->tryMulti (requests, responses);
379+ code = zk_client->tryMulti (requests, responses);
371380 auto has_request_failed = [&](size_t request_index) { return responses[request_index]->error != Coordination::Error::ZOK; };
372381
373382 if (code == Coordination::Error::ZOK)
@@ -401,6 +410,11 @@ std::pair<bool, ObjectStorageQueueIFileMetadata::FileStatus::State> ObjectStorag
401410 // / most likely the processing node id path node was removed or created so let's try again
402411 LOG_TRACE (log, " Retrying setProcessing because processing node id path is unexpectedly missing or was created (error code: {})" , code);
403412 }
413+
414+ throw Exception (
415+ ErrorCodes::LOGICAL_ERROR,
416+ " Failed to set file processing within {} retries, last error: {}" ,
417+ max_num_tries, code);
404418}
405419
406420void ObjectStorageQueueOrderedFileMetadata::prepareProcessedAtStartRequests (
0 commit comments