Skip to content
Merged
16 changes: 13 additions & 3 deletions src/Storages/MergeTree/DataPartStorageOnDiskBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,16 @@ std::string DataPartStorageOnDiskBase::getRelativePath() const
return fs::path(root_path) / part_dir / "";
}

std::string DataPartStorageOnDiskBase::getParentDirectory() const
{
/// Cut last "/" if it exists (it shouldn't). Otherwise fs::path behave differently.
fs::path part_dir_without_slash = part_dir.ends_with("/") ? part_dir.substr(0, part_dir.size() - 1) : part_dir;

if (part_dir_without_slash.has_parent_path())
return part_dir_without_slash.parent_path();
return "";
}

std::optional<String> DataPartStorageOnDiskBase::getRelativePathForPrefix(LoggerPtr log, const String & prefix, bool detached, bool broken) const
{
assert(!broken || detached);
Expand Down Expand Up @@ -674,17 +684,17 @@ void DataPartStorageOnDiskBase::remove(

if (!has_delete_prefix)
{
if (part_dir_without_slash.has_parent_path())
auto parent_path = getParentDirectory();
if (!parent_path.empty())
{
auto parent_path = part_dir_without_slash.parent_path();
if (parent_path == MergeTreeData::DETACHED_DIR_NAME)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Trying to remove detached part {} with path {} in remove function. It shouldn't happen",
part_dir,
root_path);

part_dir_without_slash = parent_path / ("delete_tmp_" + std::string{part_dir_without_slash.filename()});
part_dir_without_slash = fs::path(parent_path) / ("delete_tmp_" + std::string{part_dir_without_slash.filename()});
}
else
{
Expand Down
1 change: 1 addition & 0 deletions src/Storages/MergeTree/DataPartStorageOnDiskBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class DataPartStorageOnDiskBase : public IDataPartStorage
std::string getRelativePath() const override;
std::string getPartDirectory() const override;
std::string getFullRootPath() const override;
std::string getParentDirectory() const override;

Poco::Timestamp getLastModified() const override;
UInt64 calculateTotalSizeOnDisk() const override;
Expand Down
11 changes: 6 additions & 5 deletions src/Storages/MergeTree/IDataPartStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,12 @@ class IDataPartStorage : public boost::noncopyable
virtual MergeTreeDataPartStorageType getType() const = 0;

/// Methods to get path components of a data part.
virtual std::string getFullPath() const = 0; /// '/var/lib/clickhouse/data/database/table/moving/all_1_5_1'
virtual std::string getRelativePath() const = 0; /// 'database/table/moving/all_1_5_1'
virtual std::string getPartDirectory() const = 0; /// 'all_1_5_1'
virtual std::string getFullRootPath() const = 0; /// '/var/lib/clickhouse/data/database/table/moving'
/// Can add it if needed /// 'database/table/moving'
virtual std::string getFullPath() const = 0; /// '/var/lib/clickhouse/data/database/table/moving/all_1_5_1'
virtual std::string getRelativePath() const = 0; /// 'database/table/moving/all_1_5_1'
virtual std::string getPartDirectory() const = 0; /// 'all_1_5_1'
virtual std::string getFullRootPath() const = 0; /// '/var/lib/clickhouse/data/database/table/moving'
virtual std::string getParentDirectory() const = 0; /// '' (or 'detached' for 'detached/all_1_5_1')
/// Can add it if needed /// 'database/table/moving'
/// virtual std::string getRelativeRootPath() const = 0;

/// Get a storage for projection.
Expand Down
6 changes: 5 additions & 1 deletion src/Storages/MergeTree/IMergeTreeDataPart.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -737,7 +737,11 @@ void IMergeTreeDataPart::loadColumnsChecksumsIndexes(bool require_columns_checks
{
/// Don't scare people with broken part error
if (!isRetryableException(std::current_exception()))
LOG_ERROR(storage.log, "Part {} is broken and need manual correction", getDataPartStorage().getFullPath());
{
auto message = getCurrentExceptionMessage(true);
LOG_ERROR(storage.log, "Part {} is broken and need manual correction. Reason: {}",
getDataPartStorage().getFullPath(), message);
}

// There could be conditions that data part to be loaded is broken, but some of meta infos are already written
// into meta data before exception, need to clean them all.
Expand Down
96 changes: 75 additions & 21 deletions src/Storages/MergeTree/MergeTreeData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3894,7 +3894,7 @@ void MergeTreeData::checkPartDynamicColumns(MutableDataPartPtr & part, DataParts
}
}

void MergeTreeData::preparePartForCommit(MutableDataPartPtr & part, Transaction & out_transaction, bool need_rename)
void MergeTreeData::preparePartForCommit(MutableDataPartPtr & part, Transaction & out_transaction, bool need_rename, bool rename_in_transaction)
{
part->is_temp = false;
part->setState(DataPartState::PreActive);
Expand All @@ -3906,12 +3906,15 @@ void MergeTreeData::preparePartForCommit(MutableDataPartPtr & part, Transaction
return !may_be_cleaned_up || temporary_parts.contains(dir_name);
}());

if (need_rename)
if (need_rename && !rename_in_transaction)
part->renameTo(part->name, true);

LOG_TEST(log, "preparePartForCommit: inserting {} into data_parts_indexes", part->getNameWithState());
data_parts_indexes.insert(part);
out_transaction.addPart(part);
if (rename_in_transaction)
out_transaction.addPart(part, need_rename);
else
out_transaction.addPart(part, /* need_rename= */ false);
}

bool MergeTreeData::addTempPart(
Expand Down Expand Up @@ -3960,7 +3963,8 @@ bool MergeTreeData::renameTempPartAndReplaceImpl(
MutableDataPartPtr & part,
Transaction & out_transaction,
DataPartsLock & lock,
DataPartsVector * out_covered_parts)
DataPartsVector * out_covered_parts,
bool rename_in_transaction)
{
LOG_TRACE(log, "Renaming temporary part {} to {} with tid {}.", part->getDataPartStorage().getPartDirectory(), part->name, out_transaction.getTID());

Expand Down Expand Up @@ -3999,7 +4003,7 @@ bool MergeTreeData::renameTempPartAndReplaceImpl(

/// All checks are passed. Now we can rename the part on disk.
/// So, we maintain invariant: if a non-temporary part in filesystem then it is in data_parts
preparePartForCommit(part, out_transaction, /* need_rename */ true);
preparePartForCommit(part, out_transaction, /* need_rename= */ true, rename_in_transaction);

if (out_covered_parts)
{
Expand All @@ -4014,29 +4018,31 @@ bool MergeTreeData::renameTempPartAndReplaceUnlocked(
MutableDataPartPtr & part,
Transaction & out_transaction,
DataPartsLock & lock,
DataPartsVector * out_covered_parts)
bool rename_in_transaction)
{
return renameTempPartAndReplaceImpl(part, out_transaction, lock, out_covered_parts);
return renameTempPartAndReplaceImpl(part, out_transaction, lock, /*out_covered_parts=*/ nullptr, rename_in_transaction);
}

MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace(
MutableDataPartPtr & part,
Transaction & out_transaction)
Transaction & out_transaction,
bool rename_in_transaction)
{
auto part_lock = lockParts();
DataPartsVector covered_parts;
renameTempPartAndReplaceImpl(part, out_transaction, part_lock, &covered_parts);
renameTempPartAndReplaceImpl(part, out_transaction, part_lock, &covered_parts, rename_in_transaction);
return covered_parts;
}

bool MergeTreeData::renameTempPartAndAdd(
MutableDataPartPtr & part,
Transaction & out_transaction,
DataPartsLock & lock)
DataPartsLock & lock,
bool rename_in_transaction)
{
DataPartsVector covered_parts;

if (!renameTempPartAndReplaceImpl(part, out_transaction, lock, &covered_parts))
if (!renameTempPartAndReplaceImpl(part, out_transaction, lock, &covered_parts, rename_in_transaction))
return false;

if (!covered_parts.empty())
Expand Down Expand Up @@ -4077,9 +4083,9 @@ void MergeTreeData::removePartsFromWorkingSet(MergeTreeTransaction * txn, const
resetObjectColumnsFromActiveParts(acquired_lock);
}

void MergeTreeData::removePartsFromWorkingSetImmediatelyAndSetTemporaryState(const DataPartsVector & remove)
void MergeTreeData::removePartsFromWorkingSetImmediatelyAndSetTemporaryState(const DataPartsVector & remove, DataPartsLock * acquired_lock)
{
auto lock = lockParts();
auto lock = (acquired_lock) ? DataPartsLock() : lockParts();

for (const auto & part : remove)
{
Expand Down Expand Up @@ -4245,8 +4251,9 @@ MergeTreeData::PartsToRemoveFromZooKeeper MergeTreeData::removePartsInRangeFromW
auto [new_data_part, tmp_dir_holder] = createEmptyPart(empty_info, partition, empty_part_name, NO_TRANSACTION_PTR);

MergeTreeData::Transaction transaction(*this, NO_TRANSACTION_RAW);
renameTempPartAndAdd(new_data_part, transaction, lock); /// All covered parts must be already removed
renameTempPartAndAdd(new_data_part, transaction, lock, /*rename_in_transaction=*/ true); /// All covered parts must be already removed

transaction.renameParts();
/// It will add the empty part to the set of Outdated parts without making it Active (exactly what we need)
transaction.rollback(&lock);
new_data_part->remove_time.store(0, std::memory_order_relaxed);
Expand Down Expand Up @@ -6617,25 +6624,54 @@ TransactionID MergeTreeData::Transaction::getTID() const
return Tx::PrehistoricTID;
}

void MergeTreeData::Transaction::addPart(MutableDataPartPtr & part)
void MergeTreeData::Transaction::addPart(MutableDataPartPtr & part, bool need_rename)
{
precommitted_parts.insert(part);
if (need_rename)
precommitted_parts_need_rename.insert(part);
}

void MergeTreeData::Transaction::rollback(DataPartsLock * lock)
{
if (!isEmpty())
{
for (const auto & part : precommitted_parts)
part->version.creation_csn.store(Tx::RolledBackCSN);

auto non_detached_precommitted_parts = precommitted_parts;

/// Remove detached parts from working set.
///
/// It is possible to have detached parts here, only when rename (in
/// commit()) of detached parts had been broken (i.e. during ATTACH),
/// i.e. the part itself is broken.
DataPartsVector detached_precommitted_parts;
for (auto it = non_detached_precommitted_parts.begin(); it != non_detached_precommitted_parts.end();)
{
const auto & part = *it;
if (part->getDataPartStorage().getParentDirectory() == DETACHED_DIR_NAME)
{
detached_precommitted_parts.push_back(part);
it = non_detached_precommitted_parts.erase(it);
}
else
++it;
}

WriteBufferFromOwnString buf;
buf << "Removing parts:";
for (const auto & part : precommitted_parts)
for (const auto & part : non_detached_precommitted_parts)
buf << " " << part->getDataPartStorage().getPartDirectory();
buf << ".";
if (!detached_precommitted_parts.empty())
{
buf << " Rollbacking parts state to temporary and removing from working set:";
for (const auto & part : detached_precommitted_parts)
buf << " " << part->getDataPartStorage().getPartDirectory();
buf << ".";
}
LOG_DEBUG(data.log, "Undoing transaction {}. {}", getTID(), buf.str());

for (const auto & part : precommitted_parts)
part->version.creation_csn.store(Tx::RolledBackCSN);

/// It would be much better with TSA...
auto our_lock = (lock) ? DataPartsLock() : data.lockParts();

Expand All @@ -6645,7 +6681,7 @@ void MergeTreeData::Transaction::rollback(DataPartsLock * lock)
if (!data.all_data_dropped)
{
Strings part_names;
for (const auto & part : precommitted_parts)
for (const auto & part : non_detached_precommitted_parts)
part_names.emplace_back(part->name);
throw Exception(ErrorCodes::LOGICAL_ERROR, "There are some PreActive parts ({}) to rollback, "
"but data parts set is empty and table {} was not dropped. It's a bug",
Expand All @@ -6654,8 +6690,12 @@ void MergeTreeData::Transaction::rollback(DataPartsLock * lock)
}
else
{
data.removePartsFromWorkingSetImmediatelyAndSetTemporaryState(
detached_precommitted_parts,
&our_lock);

data.removePartsFromWorkingSet(txn,
DataPartsVector(precommitted_parts.begin(), precommitted_parts.end()),
DataPartsVector(non_detached_precommitted_parts.begin(), non_detached_precommitted_parts.end()),
/* clear_without_timeout = */ true, &our_lock);
}
}
Expand All @@ -6665,7 +6705,16 @@ void MergeTreeData::Transaction::rollback(DataPartsLock * lock)

void MergeTreeData::Transaction::clear()
{
chassert(precommitted_parts.size() >= precommitted_parts_need_rename.size());
precommitted_parts.clear();
precommitted_parts_need_rename.clear();
}

void MergeTreeData::Transaction::renameParts()
{
for (const auto & part_need_rename : precommitted_parts_need_rename)
part_need_rename->renameTo(part_need_rename->name, true);
precommitted_parts_need_rename.clear();
}

MergeTreeData::DataPartsVector MergeTreeData::Transaction::commit(DataPartsLock * acquired_parts_lock)
Expand All @@ -6674,6 +6723,9 @@ MergeTreeData::DataPartsVector MergeTreeData::Transaction::commit(DataPartsLock

if (!isEmpty())
{
if (!precommitted_parts_need_rename.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Parts not renamed");

auto settings = data.getSettings();
auto parts_lock = acquired_parts_lock ? DataPartsLock() : data.lockParts();
auto * owing_parts_lock = acquired_parts_lock ? acquired_parts_lock : &parts_lock;
Expand All @@ -6682,6 +6734,8 @@ MergeTreeData::DataPartsVector MergeTreeData::Transaction::commit(DataPartsLock
if (part->getDataPartStorage().hasActiveTransaction())
part->getDataPartStorage().commitTransaction();

renameParts();

if (txn)
{
for (const auto & part : precommitted_parts)
Expand Down
26 changes: 17 additions & 9 deletions src/Storages/MergeTree/MergeTreeData.h
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,9 @@ class MergeTreeData : public IStorage, public WithMutableContext

DataPartsVector commit(DataPartsLock * acquired_parts_lock = nullptr);

void addPart(MutableDataPartPtr & part);
void renameParts();

void addPart(MutableDataPartPtr & part, bool need_rename);

void rollback(DataPartsLock * lock = nullptr);

Expand Down Expand Up @@ -286,9 +288,9 @@ class MergeTreeData : public IStorage, public WithMutableContext

MergeTreeData & data;
MergeTreeTransaction * txn;
MutableDataParts precommitted_parts;
MutableDataParts locked_parts;

MutableDataParts precommitted_parts;
MutableDataParts precommitted_parts_need_rename;
};

using TransactionUniquePtr = std::unique_ptr<Transaction>;
Expand Down Expand Up @@ -588,25 +590,27 @@ class MergeTreeData : public IStorage, public WithMutableContext
bool renameTempPartAndAdd(
MutableDataPartPtr & part,
Transaction & transaction,
DataPartsLock & lock);
DataPartsLock & lock,
bool rename_in_transaction);

/// The same as renameTempPartAndAdd but the block range of the part can contain existing parts.
/// Returns all parts covered by the added part (in ascending order).
DataPartsVector renameTempPartAndReplace(
MutableDataPartPtr & part,
Transaction & out_transaction);
Transaction & out_transaction,
bool rename_in_transaction);

/// Unlocked version of previous one. Useful when added multiple parts with a single lock.
bool renameTempPartAndReplaceUnlocked(
MutableDataPartPtr & part,
Transaction & out_transaction,
DataPartsLock & lock,
DataPartsVector * out_covered_parts = nullptr);
bool rename_in_transaction);

/// Remove parts from working set immediately (without wait for background
/// process). Transfer part state to temporary. Have very limited usage only
/// for new parts which aren't already present in table.
void removePartsFromWorkingSetImmediatelyAndSetTemporaryState(const DataPartsVector & remove);
void removePartsFromWorkingSetImmediatelyAndSetTemporaryState(const DataPartsVector & remove, DataPartsLock * acquired_lock = nullptr);

/// Removes parts from the working set parts.
/// Parts in add must already be in data_parts with PreActive, Active, or Outdated states.
Expand Down Expand Up @@ -1602,15 +1606,19 @@ class MergeTreeData : public IStorage, public WithMutableContext

/// Preparing itself to be committed in memory: fill some fields inside part, add it to data_parts_indexes
/// in precommitted state and to transaction
void preparePartForCommit(MutableDataPartPtr & part, Transaction & out_transaction, bool need_rename);
///
/// @param need_rename - rename the part
/// @param rename_in_transaction - if set, the rename will be done as part of transaction (without holding DataPartsLock), otherwise inplace (when it does not make sense).
void preparePartForCommit(MutableDataPartPtr & part, Transaction & out_transaction, bool need_rename, bool rename_in_transaction = false);

/// Low-level method for preparing parts for commit (in-memory).
/// FIXME Merge MergeTreeTransaction and Transaction
bool renameTempPartAndReplaceImpl(
MutableDataPartPtr & part,
Transaction & out_transaction,
DataPartsLock & lock,
DataPartsVector * out_covered_parts);
DataPartsVector * out_covered_parts,
bool rename_in_transaction);

/// RAII Wrapper for atomic work with currently moving parts
/// Acquire them in constructor and remove them in destructor
Expand Down
5 changes: 4 additions & 1 deletion src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -748,7 +748,10 @@ MergeTreeData::DataPartPtr MergeTreeDataMergerMutator::renameMergedTemporaryPart
"but transactions were enabled for this table");

/// Rename new part, add to the set and remove original parts.
auto replaced_parts = data.renameTempPartAndReplace(new_data_part, out_transaction);
auto replaced_parts = data.renameTempPartAndReplace(new_data_part, out_transaction, /*rename_in_transaction=*/ true);

/// Explicitly rename part while still holding the lock for tmp folder to avoid cleanup
out_transaction.renameParts();

/// Let's check that all original parts have been deleted and only them.
if (replaced_parts.size() != parts.size())
Expand Down
Loading