Skip to content

Commit c976b28

Browse files
committed
less pedantic checks in operations, rely on existing tx functionality
1 parent a4d1adf commit c976b28

11 files changed

+351
-225
lines changed

src/Storages/MergeTree/MergeTreeData.cpp

Lines changed: 98 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,7 @@ namespace ErrorCodes
168168
extern const int INCORRECT_QUERY;
169169
extern const int CANNOT_RESTORE_TABLE;
170170
extern const int ZERO_COPY_REPLICATION_ERROR;
171+
extern const int SERIALIZATION_ERROR;
171172
}
172173

173174

@@ -1693,15 +1694,6 @@ scope_guard MergeTreeData::getTemporaryPartDirectoryHolder(const String & part_d
16931694
return [this, part_dir_name]() { temporary_parts.remove(part_dir_name); };
16941695
}
16951696

1696-
std::optional<scope_guard> MergeTreeData::tryGetTemporaryPartDirectoryHolder(const String & part_dir_name)
1697-
{
1698-
bool inserted = temporary_parts.add(part_dir_name);
1699-
if (!inserted)
1700-
return {};
1701-
1702-
return [this, part_dir_name]() { temporary_parts.remove(part_dir_name); };
1703-
}
1704-
17051697
MergeTreeData::MutableDataPartPtr MergeTreeData::preparePartForRemoval(const DataPartPtr & part)
17061698
{
17071699
auto state = part->getState();
@@ -1767,6 +1759,8 @@ MergeTreeData::DataPartsVector MergeTreeData::grabOldParts(bool force)
17671759
const DataPartPtr & part = *it;
17681760

17691761
/// Do not remove an outdated part if it may be visible to some transaction
1762+
LOG_TRACE(log, "grabOldParts: check {}.", part->name);
1763+
17701764
if (!part->version.canBeRemoved())
17711765
{
17721766
skipped_parts.push_back(part->info);
@@ -2140,7 +2134,7 @@ size_t MergeTreeData::clearEmptyParts()
21402134
if (part->getState() != DataPartState::Active)
21412135
continue;
21422136

2143-
DataPartsVector covered_parts = getCoveredOutdatedParts(part->info, lock);
2137+
DataPartsVector covered_parts = getCoveredOutdatedParts(part, lock);
21442138
if (!covered_parts.empty())
21452139
continue;
21462140
}
@@ -2924,14 +2918,13 @@ MergeTreeData::PartsTemporaryRename::~PartsTemporaryRename()
29242918
}
29252919
}
29262920

2927-
void MergeTreeData::getPartHierarchy(
2921+
MergeTreeData::PartHierarchy MergeTreeData::getPartHierarchy(
29282922
const MergeTreePartInfo & part_info,
2929-
const String & part_name,
29302923
DataPartState state,
2931-
DataPartPtr & out_covering_part,
2932-
DataPartsVector & covered_part,
29332924
DataPartsLock & /* data_parts_lock */) const
29342925
{
2926+
PartHierarchy result;
2927+
29352928
/// Parts contained in the part are consecutive in data_parts, intersecting the insertion place for the part itself.
29362929
auto it_middle = data_parts_by_state_and_info.lower_bound(DataPartStateAndInfo{state, part_info});
29372930
auto committed_parts_range = getDataPartsStateRange(state);
@@ -2946,13 +2939,15 @@ void MergeTreeData::getPartHierarchy(
29462939
{
29472940
if ((*prev)->info.contains(part_info))
29482941
{
2949-
out_covering_part = *prev;
2950-
return;
2942+
result.covering_part = *prev;
2943+
return result;
29512944
}
29522945

29532946
if (!part_info.isDisjoint((*prev)->info))
2954-
throw Exception(ErrorCodes::LOGICAL_ERROR, "Part {} intersects previous part {}. It is a bug.",
2955-
part_name, (*prev)->getNameWithState());
2947+
{
2948+
result.intersected_part = *prev;
2949+
return result;
2950+
}
29562951

29572952
break;
29582953
}
@@ -2965,37 +2960,42 @@ void MergeTreeData::getPartHierarchy(
29652960
while (end != committed_parts_range.end())
29662961
{
29672962
if ((*end)->info == part_info)
2968-
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected duplicate part {}. It is a bug.", (*end)->getNameWithState());
2963+
{
2964+
result.duplicate_part = *end;
2965+
return result;
2966+
}
29692967

29702968
if (!part_info.contains((*end)->info))
29712969
{
29722970
if ((*end)->info.contains(part_info))
29732971
{
2974-
out_covering_part = *end;
2975-
return;
2972+
result.covering_part = *end;
2973+
return result;
29762974
}
29772975

29782976
if (!part_info.isDisjoint((*end)->info))
2979-
throw Exception(ErrorCodes::LOGICAL_ERROR, "Part {} intersects next part {}. It is a bug.",
2980-
part_name, (*end)->getNameWithState());
2977+
{
2978+
result.intersected_part = *end;
2979+
return result;
2980+
}
29812981

29822982
break;
29832983
}
29842984

29852985
++end;
29862986
}
29872987

2988-
covered_part = DataPartsVector{begin, end};
2988+
result.covered_parts = DataPartsVector{begin, end};
2989+
return result;
29892990
}
29902991

29912992
MergeTreeData::DataPartsVector MergeTreeData::getCoveredOutdatedParts(
2992-
const MergeTreePartInfo & part_info,
2993+
const DataPartPtr & part,
29932994
DataPartsLock & data_parts_lock) const
29942995
{
2995-
DataPartPtr covering_part;
2996-
DataPartsVector covered_parts;
2997-
getPartHierarchy(part_info, part_info.getPartName(), DataPartState::Outdated, covering_part, covered_parts, data_parts_lock);
2998-
return covered_parts;
2996+
assert(part->getState() == DataPartState::Active);
2997+
PartHierarchy hierarchy = getPartHierarchy(part->info, DataPartState::Outdated, data_parts_lock);
2998+
return hierarchy.covered_parts;
29992999
}
30003000

30013001
MergeTreeData::DataPartsVector MergeTreeData::getActivePartsToReplace(
@@ -3004,32 +3004,22 @@ MergeTreeData::DataPartsVector MergeTreeData::getActivePartsToReplace(
30043004
DataPartPtr & out_covering_part,
30053005
DataPartsLock & data_parts_lock) const
30063006
{
3007-
DataPartsVector covered_parts;
3008-
getPartHierarchy(new_part_info, new_part_name, DataPartState::Active, out_covering_part, covered_parts, data_parts_lock);
3009-
return covered_parts;
3010-
}
3007+
PartHierarchy hierarchy = getPartHierarchy(new_part_info, DataPartState::Active, data_parts_lock);
30113008

3012-
bool MergeTreeData::renameTempPartAndAdd(
3013-
MutableDataPartPtr & part,
3014-
Transaction & out_transaction,
3015-
DataPartsLock & lock)
3016-
{
3017-
DataPartsVector covered_parts;
3009+
if (hierarchy.intersected_part)
3010+
throw Exception(ErrorCodes::LOGICAL_ERROR, "Part {} intersects next part {}. It is a bug.",
3011+
new_part_name, hierarchy.intersected_part->getNameWithState());
30183012

3019-
if (!renameTempPartAndReplaceImpl(part, out_transaction, lock, &covered_parts))
3020-
return false;
3013+
if (hierarchy.duplicate_part)
3014+
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected duplicate part {}. It is a bug.", hierarchy.duplicate_part->getNameWithState());
30213015

3022-
if (!covered_parts.empty())
3023-
throw Exception("Added part " + part->name + " covers " + toString(covered_parts.size())
3024-
+ " existing part(s) (including " + covered_parts[0]->name + ")", ErrorCodes::LOGICAL_ERROR);
3016+
out_covering_part = std::move(hierarchy.covering_part);
30253017

3026-
return true;
3018+
return std::move(hierarchy.covered_parts);
30273019
}
30283020

3029-
void MergeTreeData::checkPartCanBeAddedToTable(MutableDataPartPtr & part, DataPartsLock & lock) const
3021+
void MergeTreeData::checkPartPartition(MutableDataPartPtr & part, DataPartsLock & lock) const
30303022
{
3031-
part->assertState({DataPartState::Temporary});
3032-
30333023
if (DataPartPtr existing_part_in_partition = getAnyPartInPartition(part->info.partition_id, lock))
30343024
{
30353025
if (part->partition.value != existing_part_in_partition->partition.value)
@@ -3038,15 +3028,23 @@ void MergeTreeData::checkPartCanBeAddedToTable(MutableDataPartPtr & part, DataPa
30383028
+ existing_part_in_partition->name + ", newly added part: " + part->name,
30393029
ErrorCodes::CORRUPTED_DATA);
30403030
}
3031+
}
30413032

3042-
if (auto it_duplicate = data_parts_by_info.find(part->info); it_duplicate != data_parts_by_info.end())
3033+
void MergeTreeData::checkPartDuplicate(MutableDataPartPtr & part, Transaction & transaction, DataPartsLock & /*lock*/) const
3034+
{
3035+
auto it_duplicate = data_parts_by_info.find(part->info);
3036+
3037+
if (it_duplicate != data_parts_by_info.end())
30433038
{
30443039
String message = "Part " + (*it_duplicate)->getNameWithState() + " already exists";
30453040

30463041
if ((*it_duplicate)->checkState({DataPartState::Outdated, DataPartState::Deleting}))
30473042
throw Exception(message + ", but it will be deleted soon", ErrorCodes::PART_IS_TEMPORARILY_LOCKED);
30483043

3049-
throw Exception(message, ErrorCodes::DUPLICATE_DATA_PART);
3044+
if (transaction.txn)
3045+
throw Exception(message, ErrorCodes::SERIALIZATION_ERROR);
3046+
else
3047+
throw Exception(message, ErrorCodes::DUPLICATE_DATA_PART);
30503048
}
30513049
}
30523050

@@ -3074,36 +3072,45 @@ bool MergeTreeData::renameTempPartAndReplaceImpl(
30743072
DataPartsLock & lock,
30753073
DataPartsVector * out_covered_parts)
30763074
{
3077-
LOG_TRACE(log, "Renaming temporary part {} to {}.", part->getDataPartStorage().getPartDirectory(), part->name);
3075+
LOG_TRACE(log, "Renaming temporary part {} to {} with tid {}.", part->getDataPartStorage().getPartDirectory(), part->name, out_transaction.getTID());
30783076

30793077
if (&out_transaction.data != this)
30803078
throw Exception("MergeTreeData::Transaction for one table cannot be used with another. It is a bug.",
30813079
ErrorCodes::LOGICAL_ERROR);
30823080

3083-
if (part->hasLightweightDelete())
3084-
has_lightweight_delete_parts.store(true);
3081+
part->assertState({DataPartState::Temporary});
3082+
checkPartPartition(part, lock);
3083+
checkPartDuplicate(part, out_transaction, lock);
3084+
3085+
PartHierarchy hierarchy = getPartHierarchy(part->info, DataPartState::Active, lock);
3086+
3087+
if (hierarchy.intersected_part)
3088+
{
3089+
String message = fmt::format("Part {} intersects next part {}", part->name, hierarchy.intersected_part->getNameWithState());
30853090

3086-
checkPartCanBeAddedToTable(part, lock);
3091+
if (part->isEmpty() || hierarchy.intersected_part->isEmpty())
3092+
throw Exception(message, ErrorCodes::SERIALIZATION_ERROR);
30873093

3088-
DataPartPtr covering_part;
3089-
DataPartsVector covered_parts = getActivePartsToReplace(part->info, part->name, covering_part, lock);
3094+
throw Exception(ErrorCodes::LOGICAL_ERROR, message + " It is a bug.");
3095+
}
30903096

3091-
if (covering_part)
3097+
if (hierarchy.covering_part)
30923098
{
3093-
LOG_WARNING(log, "Tried to add obsolete part {} covered by {}", part->name, covering_part->getNameWithState());
3099+
LOG_WARNING(log, "Tried to add obsolete part {} covered by {}", part->name, hierarchy.covering_part->getNameWithState());
30943100
return false;
30953101
}
30963102

3103+
if (part->hasLightweightDelete())
3104+
has_lightweight_delete_parts.store(true);
3105+
30973106
/// All checks are passed. Now we can rename the part on disk.
30983107
/// So, we maintain invariant: if a non-temporary part in filesystem then it is in data_parts
30993108
preparePartForCommit(part, out_transaction);
31003109

31013110
if (out_covered_parts)
31023111
{
3103-
out_covered_parts->reserve(out_covered_parts->size() + covered_parts.size());
3104-
3105-
for (DataPartPtr & covered_part : covered_parts)
3106-
out_covered_parts->emplace_back(std::move(covered_part));
3112+
out_covered_parts->reserve(out_covered_parts->size() + hierarchy.covered_parts.size());
3113+
std::move(hierarchy.covered_parts.begin(), hierarchy.covered_parts.end(), std::back_inserter(*out_covered_parts));
31073114
}
31083115

31093116
return true;
@@ -3128,6 +3135,23 @@ MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace(
31283135
return covered_parts;
31293136
}
31303137

3138+
bool MergeTreeData::renameTempPartAndAdd(
3139+
MutableDataPartPtr & part,
3140+
Transaction & out_transaction,
3141+
DataPartsLock & lock)
3142+
{
3143+
DataPartsVector covered_parts;
3144+
3145+
if (!renameTempPartAndReplaceImpl(part, out_transaction, lock, &covered_parts))
3146+
return false;
3147+
3148+
if (!covered_parts.empty())
3149+
throw Exception("Added part " + part->name + " covers " + toString(covered_parts.size())
3150+
+ " existing part(s) (including " + covered_parts[0]->name + ")", ErrorCodes::LOGICAL_ERROR);
3151+
3152+
return true;
3153+
}
3154+
31313155
void MergeTreeData::removePartsFromWorkingSet(MergeTreeTransaction * txn, const MergeTreeData::DataPartsVector & remove, bool clear_without_timeout, DataPartsLock & acquired_lock)
31323156
{
31333157
if (txn)
@@ -5213,6 +5237,13 @@ void MergeTreeData::Transaction::rollbackPartsToTemporaryState()
52135237
clear();
52145238
}
52155239

5240+
TransactionID MergeTreeData::Transaction::getTID() const
5241+
{
5242+
if (txn)
5243+
return txn->tid;
5244+
return Tx::PrehistoricTID;
5245+
}
5246+
52165247
void MergeTreeData::Transaction::addPart(MutableDataPartPtr & part)
52175248
{
52185249
precommitted_parts.insert(part);
@@ -5223,11 +5254,14 @@ void MergeTreeData::Transaction::rollback()
52235254
if (!isEmpty())
52245255
{
52255256
WriteBufferFromOwnString buf;
5226-
buf << " Removing parts:";
5257+
buf << "Removing parts:";
52275258
for (const auto & part : precommitted_parts)
52285259
buf << " " << part->getDataPartStorage().getPartDirectory();
52295260
buf << ".";
5230-
LOG_DEBUG(data.log, "Undoing transaction.{}", buf.str());
5261+
LOG_DEBUG(data.log, "Undoing transaction {}. {}", getTID(), buf.str());
5262+
5263+
for (const auto & part : precommitted_parts)
5264+
part->version.creation_csn.store(Tx::RolledBackCSN);
52315265

52325266
auto lock = data.lockParts();
52335267

src/Storages/MergeTree/MergeTreeData.h

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,9 @@ class MergeTreeData : public IStorage, public WithMutableContext
220220
using DataPartsLock = std::unique_lock<std::mutex>;
221221
DataPartsLock lockParts() const { return DataPartsLock(data_parts_mutex); }
222222

223+
using OperationDataPartsLock = std::unique_lock<std::mutex>;
224+
OperationDataPartsLock lockOperationsWithParts() const { return OperationDataPartsLock(operation_with_data_parts_mutex); }
225+
223226
MergeTreeDataPartType choosePartType(size_t bytes_uncompressed, size_t rows_count) const;
224227
MergeTreeDataPartType choosePartTypeOnDisk(size_t bytes_uncompressed, size_t rows_count) const;
225228

@@ -271,6 +274,8 @@ class MergeTreeData : public IStorage, public WithMutableContext
271274
}
272275
}
273276

277+
TransactionID getTID() const;
278+
274279
private:
275280
friend class MergeTreeData;
276281

@@ -1029,7 +1034,6 @@ class MergeTreeData : public IStorage, public WithMutableContext
10291034

10301035
/// Returns an object that protects temporary directory from cleanup
10311036
scope_guard getTemporaryPartDirectoryHolder(const String & part_dir_name);
1032-
std::optional<scope_guard> tryGetTemporaryPartDirectoryHolder(const String & part_dir_name);
10331037

10341038
protected:
10351039
friend class IMergeTreeDataPart;
@@ -1112,6 +1116,10 @@ class MergeTreeData : public IStorage, public WithMutableContext
11121116
DataPartsIndexes::index<TagByInfo>::type & data_parts_by_info;
11131117
DataPartsIndexes::index<TagByStateAndInfo>::type & data_parts_by_state_and_info;
11141118

1119+
/// Mutex for critical sections that alter the set of parts,
1120+
/// for example truncate or drop/detach partition.
1121+
mutable std::mutex operation_with_data_parts_mutex;
1122+
11151123
/// Current description of columns of data type Object.
11161124
/// It changes only when set of parts is changed and is
11171125
/// protected by @data_parts_mutex.
@@ -1222,15 +1230,20 @@ class MergeTreeData : public IStorage, public WithMutableContext
12221230
DataPartsLock & data_parts_lock) const;
12231231

12241232
DataPartsVector getCoveredOutdatedParts(
1225-
const MergeTreePartInfo & part_info,
1233+
const DataPartPtr & part,
12261234
DataPartsLock & data_parts_lock) const;
12271235

1228-
void getPartHierarchy(
1236+
struct PartHierarchy
1237+
{
1238+
DataPartPtr duplicate_part;
1239+
DataPartPtr covering_part;
1240+
DataPartsVector covered_parts;
1241+
DataPartPtr intersected_part;
1242+
};
1243+
1244+
PartHierarchy getPartHierarchy(
12291245
const MergeTreePartInfo & part_info,
1230-
const String & part_name,
12311246
DataPartState state,
1232-
DataPartPtr & out_covering_part,
1233-
DataPartsVector & covered_part,
12341247
DataPartsLock & /* data_parts_lock */) const;
12351248

12361249
/// Checks whether the column is in the primary key, possibly wrapped in a chain of functions with single argument.
@@ -1302,8 +1315,9 @@ class MergeTreeData : public IStorage, public WithMutableContext
13021315
static void incrementMergedPartsProfileEvent(MergeTreeDataPartType type);
13031316

13041317
private:
1305-
/// Checking that candidate part doesn't break invariants: correct partition and doesn't exist already
1306-
void checkPartCanBeAddedToTable(MutableDataPartPtr & part, DataPartsLock & lock) const;
1318+
/// Checking that candidate part doesn't break invariants: correct partition
1319+
void checkPartPartition(MutableDataPartPtr & part, DataPartsLock & lock) const;
1320+
void checkPartDuplicate(MutableDataPartPtr & part, Transaction & transaction, DataPartsLock & lock) const;
13071321

13081322
/// Preparing itself to be committed in memory: fill some fields inside part, add it to data_parts_indexes
13091323
/// in precommitted state and to transaction

src/Storages/MergeTree/MergeTreePartInfo.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ struct MergeTreePartInfo
6060
/// True if contains rhs (this part is obtained by merging rhs with some other parts or mutating rhs)
6161
bool contains(const MergeTreePartInfo & rhs) const
6262
{
63-
/// Containing part may have equal level iff block numbers are equal (unless level is MAX_LEVEL)
63+
/// Containing part may have equal level only if block numbers are equal (unless level is MAX_LEVEL)
6464
/// (e.g. all_0_5_2 does not contain all_0_4_2, but all_0_5_3 or all_0_4_2_9 do)
6565
bool strictly_contains_block_range = (min_block == rhs.min_block && max_block == rhs.max_block) || level > rhs.level
6666
|| level == MAX_LEVEL || level == LEGACY_MAX_LEVEL;

0 commit comments

Comments
 (0)