@@ -168,6 +168,7 @@ namespace ErrorCodes
168168 extern const int INCORRECT_QUERY;
169169 extern const int CANNOT_RESTORE_TABLE;
170170 extern const int ZERO_COPY_REPLICATION_ERROR;
171+ extern const int SERIALIZATION_ERROR;
171172}
172173
173174
@@ -1693,15 +1694,6 @@ scope_guard MergeTreeData::getTemporaryPartDirectoryHolder(const String & part_d
16931694 return [this , part_dir_name]() { temporary_parts.remove (part_dir_name); };
16941695}
16951696
1696- std::optional<scope_guard> MergeTreeData::tryGetTemporaryPartDirectoryHolder (const String & part_dir_name)
1697- {
1698- bool inserted = temporary_parts.add (part_dir_name);
1699- if (!inserted)
1700- return {};
1701-
1702- return [this , part_dir_name]() { temporary_parts.remove (part_dir_name); };
1703- }
1704-
17051697MergeTreeData::MutableDataPartPtr MergeTreeData::preparePartForRemoval (const DataPartPtr & part)
17061698{
17071699 auto state = part->getState ();
@@ -1767,6 +1759,8 @@ MergeTreeData::DataPartsVector MergeTreeData::grabOldParts(bool force)
17671759 const DataPartPtr & part = *it;
17681760
17691761 // / Do not remove outdated part if it may be visible for some transaction
1762+ LOG_TRACE (log, " grabOldParts: check {}." , part->name );
1763+
17701764 if (!part->version .canBeRemoved ())
17711765 {
17721766 skipped_parts.push_back (part->info );
@@ -2140,7 +2134,7 @@ size_t MergeTreeData::clearEmptyParts()
21402134 if (part->getState () != DataPartState::Active)
21412135 continue ;
21422136
2143- DataPartsVector covered_parts = getCoveredOutdatedParts (part-> info , lock);
2137+ DataPartsVector covered_parts = getCoveredOutdatedParts (part, lock);
21442138 if (!covered_parts.empty ())
21452139 continue ;
21462140 }
@@ -2924,14 +2918,13 @@ MergeTreeData::PartsTemporaryRename::~PartsTemporaryRename()
29242918 }
29252919}
29262920
2927- void MergeTreeData::getPartHierarchy (
2921+ MergeTreeData::PartHierarchy MergeTreeData::getPartHierarchy (
29282922 const MergeTreePartInfo & part_info,
2929- const String & part_name,
29302923 DataPartState state,
2931- DataPartPtr & out_covering_part,
2932- DataPartsVector & covered_part,
29332924 DataPartsLock & /* data_parts_lock */ ) const
29342925{
2926+ PartHierarchy result;
2927+
29352928 // / Parts contained in the part are consecutive in data_parts, intersecting the insertion place for the part itself.
29362929 auto it_middle = data_parts_by_state_and_info.lower_bound (DataPartStateAndInfo{state, part_info});
29372930 auto committed_parts_range = getDataPartsStateRange (state);
@@ -2946,13 +2939,15 @@ void MergeTreeData::getPartHierarchy(
29462939 {
29472940 if ((*prev)->info .contains (part_info))
29482941 {
2949- out_covering_part = *prev;
2950- return ;
2942+ result. covering_part = *prev;
2943+ return result ;
29512944 }
29522945
29532946 if (!part_info.isDisjoint ((*prev)->info ))
2954- throw Exception (ErrorCodes::LOGICAL_ERROR, " Part {} intersects previous part {}. It is a bug." ,
2955- part_name, (*prev)->getNameWithState ());
2947+ {
2948+ result.intersected_part = *prev;
2949+ return result;
2950+ }
29562951
29572952 break ;
29582953 }
@@ -2965,37 +2960,42 @@ void MergeTreeData::getPartHierarchy(
29652960 while (end != committed_parts_range.end ())
29662961 {
29672962 if ((*end)->info == part_info)
2968- throw Exception (ErrorCodes::LOGICAL_ERROR, " Unexpected duplicate part {}. It is a bug." , (*end)->getNameWithState ());
2963+ {
2964+ result.duplicate_part = *end;
2965+ return result;
2966+ }
29692967
29702968 if (!part_info.contains ((*end)->info ))
29712969 {
29722970 if ((*end)->info .contains (part_info))
29732971 {
2974- out_covering_part = *end;
2975- return ;
2972+ result. covering_part = *end;
2973+ return result ;
29762974 }
29772975
29782976 if (!part_info.isDisjoint ((*end)->info ))
2979- throw Exception (ErrorCodes::LOGICAL_ERROR, " Part {} intersects next part {}. It is a bug." ,
2980- part_name, (*end)->getNameWithState ());
2977+ {
2978+ result.intersected_part = *end;
2979+ return result;
2980+ }
29812981
29822982 break ;
29832983 }
29842984
29852985 ++end;
29862986 }
29872987
2988- covered_part = DataPartsVector{begin, end};
2988+ result.covered_parts = DataPartsVector{begin, end};
2989+ return result;
29892990}
29902991
/// Returns the `covered_parts` that getPartHierarchy() reports for `part->info` among parts in the Outdated state.
/// After this change the function takes the part itself instead of its MergeTreePartInfo and asserts that the part is Active.
/// Only `covered_parts` is used here: the covering/intersected/duplicate fields of the returned PartHierarchy are not inspected,
/// so, unlike the removed code path, no exception is raised from here for intersecting or duplicate Outdated parts.
29912992MergeTreeData::DataPartsVector MergeTreeData::getCoveredOutdatedParts (
2992- const MergeTreePartInfo & part_info ,
2993+ const DataPartPtr & part ,
29932994 DataPartsLock & data_parts_lock) const
29942995{
2995- DataPartPtr covering_part;
2996- DataPartsVector covered_parts;
2997- getPartHierarchy (part_info, part_info.getPartName (), DataPartState::Outdated, covering_part, covered_parts, data_parts_lock);
2998- return covered_parts;
2996+ assert (part->getState () == DataPartState::Active);
2997+ PartHierarchy hierarchy = getPartHierarchy (part->info , DataPartState::Outdated, data_parts_lock);
2998+ return hierarchy.covered_parts ;
29992999}
30003000
30013001MergeTreeData::DataPartsVector MergeTreeData::getActivePartsToReplace (
@@ -3004,32 +3004,22 @@ MergeTreeData::DataPartsVector MergeTreeData::getActivePartsToReplace(
30043004 DataPartPtr & out_covering_part,
30053005 DataPartsLock & data_parts_lock) const
30063006{
3007- DataPartsVector covered_parts;
3008- getPartHierarchy (new_part_info, new_part_name, DataPartState::Active, out_covering_part, covered_parts, data_parts_lock);
3009- return covered_parts;
3010- }
3007+ PartHierarchy hierarchy = getPartHierarchy (new_part_info, DataPartState::Active, data_parts_lock);
30113008
3012- bool MergeTreeData::renameTempPartAndAdd (
3013- MutableDataPartPtr & part,
3014- Transaction & out_transaction,
3015- DataPartsLock & lock)
3016- {
3017- DataPartsVector covered_parts;
3009+ if (hierarchy.intersected_part )
3010+ throw Exception (ErrorCodes::LOGICAL_ERROR, " Part {} intersects next part {}. It is a bug." ,
3011+ new_part_name, hierarchy.intersected_part ->getNameWithState ());
30183012
3019- if (! renameTempPartAndReplaceImpl (part, out_transaction, lock, &covered_parts) )
3020- return false ;
3013+ if (hierarchy. duplicate_part )
3014+ throw Exception (ErrorCodes::LOGICAL_ERROR, " Unexpected duplicate part {}. It is a bug. " , hierarchy. duplicate_part -> getNameWithState ()) ;
30213015
3022- if (!covered_parts.empty ())
3023- throw Exception (" Added part " + part->name + " covers " + toString (covered_parts.size ())
3024- + " existing part(s) (including " + covered_parts[0 ]->name + " )" , ErrorCodes::LOGICAL_ERROR);
3016+ out_covering_part = std::move (hierarchy.covering_part );
30253017
3026- return true ;
3018+ return std::move (hierarchy. covered_parts ) ;
30273019}
30283020
3029- void MergeTreeData::checkPartCanBeAddedToTable (MutableDataPartPtr & part, DataPartsLock & lock) const
3021+ void MergeTreeData::checkPartPartition (MutableDataPartPtr & part, DataPartsLock & lock) const
30303022{
3031- part->assertState ({DataPartState::Temporary});
3032-
30333023 if (DataPartPtr existing_part_in_partition = getAnyPartInPartition (part->info .partition_id , lock))
30343024 {
30353025 if (part->partition .value != existing_part_in_partition->partition .value )
@@ -3038,15 +3028,23 @@ void MergeTreeData::checkPartCanBeAddedToTable(MutableDataPartPtr & part, DataPa
30383028 + existing_part_in_partition->name + " , newly added part: " + part->name ,
30393029 ErrorCodes::CORRUPTED_DATA);
30403030 }
3031+ }
30413032
3042- if (auto it_duplicate = data_parts_by_info.find (part->info ); it_duplicate != data_parts_by_info.end ())
/// Throws if a part with the same info as `part` is already present in data_parts_by_info; does nothing otherwise.
///  - PART_IS_TEMPORARILY_LOCKED if the existing part is in the Outdated or Deleting state;
///  - otherwise SERIALIZATION_ERROR when `transaction.txn` is set, and DUPLICATE_DATA_PART when it is not.
/// The lock parameter is unnamed and unused in the body.
/// NOTE(review): presumably it is kept to make callers hold the parts lock — confirm.
3033+ void MergeTreeData::checkPartDuplicate (MutableDataPartPtr & part, Transaction & transaction, DataPartsLock & /* lock*/ ) const
3034+ {
3035+ auto it_duplicate = data_parts_by_info.find (part->info );
3036+
3037+ if (it_duplicate != data_parts_by_info.end ())
30433038 {
30443039 String message = " Part " + (*it_duplicate)->getNameWithState () + " already exists" ;
30453040
30463041 if ((*it_duplicate)->checkState ({DataPartState::Outdated, DataPartState::Deleting}))
30473042 throw Exception (message + " , but it will be deleted soon" , ErrorCodes::PART_IS_TEMPORARILY_LOCKED);
30483043
3049- throw Exception (message, ErrorCodes::DUPLICATE_DATA_PART);
3044+ if (transaction.txn )
3045+ throw Exception (message, ErrorCodes::SERIALIZATION_ERROR);
3046+ else
3047+ throw Exception (message, ErrorCodes::DUPLICATE_DATA_PART);
30503048 }
30513049}
30523050
@@ -3074,36 +3072,45 @@ bool MergeTreeData::renameTempPartAndReplaceImpl(
30743072 DataPartsLock & lock,
30753073 DataPartsVector * out_covered_parts)
30763074{
3077- LOG_TRACE (log, " Renaming temporary part {} to {}." , part->getDataPartStorage ().getPartDirectory (), part->name );
3075+ LOG_TRACE (log, " Renaming temporary part {} to {} with tid {} ." , part->getDataPartStorage ().getPartDirectory (), part->name , out_transaction. getTID () );
30783076
30793077 if (&out_transaction.data != this )
30803078 throw Exception (" MergeTreeData::Transaction for one table cannot be used with another. It is a bug." ,
30813079 ErrorCodes::LOGICAL_ERROR);
30823080
3083- if (part->hasLightweightDelete ())
3084- has_lightweight_delete_parts.store (true );
3081+ part->assertState ({DataPartState::Temporary});
3082+ checkPartPartition (part, lock);
3083+ checkPartDuplicate (part, out_transaction, lock);
3084+
3085+ PartHierarchy hierarchy = getPartHierarchy (part->info , DataPartState::Active, lock);
3086+
3087+ if (hierarchy.intersected_part )
3088+ {
3089+ String message = fmt::format (" Part {} intersects next part {}" , part->name , hierarchy.intersected_part ->getNameWithState ());
30853090
3086- checkPartCanBeAddedToTable (part, lock);
3091+ if (part->isEmpty () || hierarchy.intersected_part ->isEmpty ())
3092+ throw Exception (message, ErrorCodes::SERIALIZATION_ERROR);
30873093
3088- DataPartPtr covering_part ;
3089- DataPartsVector covered_parts = getActivePartsToReplace (part-> info , part-> name , covering_part, lock);
3094+ throw Exception (ErrorCodes::LOGICAL_ERROR, message + " It is a bug. " ) ;
3095+ }
30903096
3091- if (covering_part)
3097+ if (hierarchy. covering_part )
30923098 {
3093- LOG_WARNING (log, " Tried to add obsolete part {} covered by {}" , part->name , covering_part->getNameWithState ());
3099+ LOG_WARNING (log, " Tried to add obsolete part {} covered by {}" , part->name , hierarchy. covering_part ->getNameWithState ());
30943100 return false ;
30953101 }
30963102
3103+ if (part->hasLightweightDelete ())
3104+ has_lightweight_delete_parts.store (true );
3105+
30973106 // / All checks are passed. Now we can rename the part on disk.
30983107 // / So, we maintain invariant: if a non-temporary part in filesystem then it is in data_parts
30993108 preparePartForCommit (part, out_transaction);
31003109
31013110 if (out_covered_parts)
31023111 {
3103- out_covered_parts->reserve (out_covered_parts->size () + covered_parts.size ());
3104-
3105- for (DataPartPtr & covered_part : covered_parts)
3106- out_covered_parts->emplace_back (std::move (covered_part));
3112+ out_covered_parts->reserve (out_covered_parts->size () + hierarchy.covered_parts .size ());
3113+ std::move (hierarchy.covered_parts .begin (), hierarchy.covered_parts .end (), std::back_inserter (*out_covered_parts));
31073114 }
31083115
31093116 return true ;
@@ -3128,6 +3135,23 @@ MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace(
31283135 return covered_parts;
31293136}
31303137
/// Adds `part` through renameTempPartAndReplaceImpl() and requires that it covers no existing parts.
/// Returns false when renameTempPartAndReplaceImpl() returns false, true on success.
/// Throws LOGICAL_ERROR if renameTempPartAndReplaceImpl() reported any covered parts.
/// NOTE(review): the covered-parts check runs only after renameTempPartAndReplaceImpl() has already succeeded;
/// presumably undoing that work is left to `out_transaction` — confirm against callers.
3138+ bool MergeTreeData::renameTempPartAndAdd (
3139+ MutableDataPartPtr & part,
3140+ Transaction & out_transaction,
3141+ DataPartsLock & lock)
3142+ {
3143+ DataPartsVector covered_parts;
3144+
3145+ if (!renameTempPartAndReplaceImpl (part, out_transaction, lock, &covered_parts))
3146+ return false ;
3147+
3148+ if (!covered_parts.empty ())
3149+ throw Exception (" Added part " + part->name + " covers " + toString (covered_parts.size ())
3150+ + " existing part(s) (including " + covered_parts[0 ]->name + " )" , ErrorCodes::LOGICAL_ERROR);
3151+
3152+ return true ;
3153+ }
3154+
31313155void MergeTreeData::removePartsFromWorkingSet (MergeTreeTransaction * txn, const MergeTreeData::DataPartsVector & remove, bool clear_without_timeout, DataPartsLock & acquired_lock)
31323156{
31333157 if (txn)
@@ -5213,6 +5237,13 @@ void MergeTreeData::Transaction::rollbackPartsToTemporaryState()
52135237 clear ();
52145238}
52155239
/// Returns `txn->tid` when `txn` is set, and Tx::PrehistoricTID otherwise.
5240+ TransactionID MergeTreeData::Transaction::getTID () const
5241+ {
5242+ if (txn)
5243+ return txn->tid ;
5244+ return Tx::PrehistoricTID;
5245+ }
5246+
52165247void MergeTreeData::Transaction::addPart (MutableDataPartPtr & part)
52175248{
52185249 precommitted_parts.insert (part);
@@ -5223,11 +5254,14 @@ void MergeTreeData::Transaction::rollback()
52235254 if (!isEmpty ())
52245255 {
52255256 WriteBufferFromOwnString buf;
5226- buf << " Removing parts:" ;
5257+ buf << " Removing parts:" ;
52275258 for (const auto & part : precommitted_parts)
52285259 buf << " " << part->getDataPartStorage ().getPartDirectory ();
52295260 buf << " ." ;
5230- LOG_DEBUG (data.log , " Undoing transaction.{}" , buf.str ());
5261+ LOG_DEBUG (data.log , " Undoing transaction {}. {}" , getTID (), buf.str ());
5262+
5263+ for (const auto & part : precommitted_parts)
5264+ part->version .creation_csn .store (Tx::RolledBackCSN);
52315265
52325266 auto lock = data.lockParts ();
52335267
0 commit comments