Skip to content

Commit 2bc9a6d

Browse files
Backport #66898 to 24.3: [CI Fest] Better processing of broken parts and their projections (fixes rare cases of parts being lost forever)
1 parent f4fbb01 commit 2bc9a6d

File tree

3 files changed

+31
-22
lines changed

3 files changed

+31
-22
lines changed

src/Storages/MergeTree/DataPartsExchange.cpp

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@
1414
#include <Storages/MergeTree/ReplicatedFetchList.h>
1515
#include <Storages/StorageReplicatedMergeTree.h>
1616
#include <Storages/MergeTree/DataPartStorageOnDiskFull.h>
17+
#include <Storages/MergeTree/MergeTreeData.h>
18+
#include <Storages/MergeTree/MergeTreeSettings.h>
19+
#include <Storages/MergeTree/checkDataPart.h>
1720
#include <Common/CurrentMetrics.h>
1821
#include <Common/NetException.h>
1922
#include <Disks/IO/createReadBufferFromFileBase.h>
@@ -210,14 +213,18 @@ void Service::processQuery(const HTMLForm & params, ReadBuffer & /*body*/, Write
210213
}
211214
catch (const Exception & e)
212215
{
213-
if (e.code() != ErrorCodes::ABORTED && e.code() != ErrorCodes::CANNOT_WRITE_TO_OSTREAM)
216+
if (e.code() != ErrorCodes::CANNOT_WRITE_TO_OSTREAM
217+
&& !isRetryableException(std::current_exception()))
218+
{
214219
report_broken_part();
220+
}
215221

216222
throw;
217223
}
218224
catch (...)
219225
{
220-
report_broken_part();
226+
if (!isRetryableException(std::current_exception()))
227+
report_broken_part();
221228
throw;
222229
}
223230
}

src/Storages/MergeTree/MergeTreeSequentialSource.cpp

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,11 @@
1414
#include <Processors/QueryPlan/FilterStep.h>
1515
#include <Common/logger_useful.h>
1616
#include <Processors/Merges/Algorithms/MergeTreePartLevelInfo.h>
17+
#include <Storages/MergeTree/checkDataPart.h>
1718

1819
namespace DB
1920
{
2021

21-
namespace ErrorCodes
22-
{
23-
extern const int MEMORY_LIMIT_EXCEEDED;
24-
}
25-
26-
2722
/// Lightweight (in terms of logic) stream for reading single part from
2823
/// MergeTree, used for merges and mutations.
2924
///
@@ -275,7 +270,7 @@ try
275270
catch (...)
276271
{
277272
/// Suspicion of the broken part. A part is added to the queue for verification.
278-
if (getCurrentExceptionCode() != ErrorCodes::MEMORY_LIMIT_EXCEEDED)
273+
if (!isRetryableException(std::current_exception()))
279274
storage.reportBrokenPart(data_part);
280275
throw;
281276
}

src/Storages/MergeTree/checkDataPart.cpp

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,13 @@ namespace ErrorCodes
3838
extern const int CANNOT_ALLOCATE_MEMORY;
3939
extern const int CANNOT_MUNMAP;
4040
extern const int CANNOT_MREMAP;
41+
extern const int CANNOT_SCHEDULE_TASK;
4142
extern const int UNEXPECTED_FILE_IN_DATA_PART;
4243
extern const int NO_FILE_IN_DATA_PART;
4344
extern const int NETWORK_ERROR;
4445
extern const int SOCKET_TIMEOUT;
4546
extern const int BROKEN_PROJECTION;
47+
extern const int ABORTED;
4648
}
4749

4850

@@ -88,11 +90,11 @@ bool isRetryableException(std::exception_ptr exception_ptr)
8890
}
8991
catch (const Exception & e)
9092
{
91-
if (isNotEnoughMemoryErrorCode(e.code()))
92-
return true;
93-
94-
if (e.code() == ErrorCodes::NETWORK_ERROR || e.code() == ErrorCodes::SOCKET_TIMEOUT)
95-
return true;
93+
return isNotEnoughMemoryErrorCode(e.code())
94+
|| e.code() == ErrorCodes::NETWORK_ERROR
95+
|| e.code() == ErrorCodes::SOCKET_TIMEOUT
96+
|| e.code() == ErrorCodes::CANNOT_SCHEDULE_TASK
97+
|| e.code() == ErrorCodes::ABORTED;
9698
}
9799
catch (const Poco::Net::NetException &)
98100
{
@@ -335,16 +337,21 @@ static IMergeTreeDataPart::Checksums checkDataPart(
335337
projections_on_disk.erase(projection_file);
336338
}
337339

338-
if (throw_on_broken_projection && !broken_projections_message.empty())
340+
if (throw_on_broken_projection)
339341
{
340-
throw Exception(ErrorCodes::BROKEN_PROJECTION, "{}", broken_projections_message);
341-
}
342+
if (!broken_projections_message.empty())
343+
{
344+
throw Exception(ErrorCodes::BROKEN_PROJECTION, "{}", broken_projections_message);
345+
}
342346

343-
if (require_checksums && !projections_on_disk.empty())
344-
{
345-
throw Exception(ErrorCodes::UNEXPECTED_FILE_IN_DATA_PART,
346-
"Found unexpected projection directories: {}",
347-
fmt::join(projections_on_disk, ","));
347+
/// This one is actually not broken, just redundant files on disk which
348+
/// MergeTree will never use.
349+
if (require_checksums && !projections_on_disk.empty())
350+
{
351+
throw Exception(ErrorCodes::UNEXPECTED_FILE_IN_DATA_PART,
352+
"Found unexpected projection directories: {}",
353+
fmt::join(projections_on_disk, ","));
354+
}
348355
}
349356

350357
if (is_cancelled())

0 commit comments

Comments
 (0)