Skip to content

Commit 75771d4

Browse files
committed
Merge branch 'master' into merging_53307
2 parents 4b5874b + 428a05a commit 75771d4

26 files changed

+336
-231
lines changed

docker/test/install/deb/Dockerfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ ENV \
1212
# install systemd packages
1313
RUN apt-get update && \
1414
apt-get install -y --no-install-recommends \
15+
sudo \
1516
systemd \
1617
&& \
1718
apt-get clean && \

src/Functions/transform.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -776,8 +776,12 @@ namespace
776776
UInt64 key = 0;
777777
auto * dst = reinterpret_cast<char *>(&key);
778778
const auto ref = cache.from_column->getDataAt(i);
779+
780+
#pragma clang diagnostic push
781+
#pragma clang diagnostic ignored "-Wunreachable-code"
779782
if constexpr (std::endian::native == std::endian::big)
780783
dst += sizeof(key) - ref.size;
784+
#pragma clang diagnostic pop
781785

782786
memcpy(dst, ref.data, ref.size);
783787
table[key] = i;

src/Storages/MergeTree/IMergeTreeDataPart.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1780,7 +1780,8 @@ void IMergeTreeDataPart::renameToDetached(const String & prefix)
17801780
part_is_probably_removed_from_disk = true;
17811781
}
17821782

1783-
DataPartStoragePtr IMergeTreeDataPart::makeCloneInDetached(const String & prefix, const StorageMetadataPtr & /*metadata_snapshot*/) const
1783+
DataPartStoragePtr IMergeTreeDataPart::makeCloneInDetached(const String & prefix, const StorageMetadataPtr & /*metadata_snapshot*/,
1784+
const DiskTransactionPtr & disk_transaction) const
17841785
{
17851786
/// Avoid unneeded duplicates of broken parts if we try to detach the same broken part multiple times.
17861787
/// Otherwise it may pollute detached/ with dirs with _tryN suffix and we will fail to remove broken part after 10 attempts.
@@ -1795,7 +1796,8 @@ DataPartStoragePtr IMergeTreeDataPart::makeCloneInDetached(const String & prefix
17951796
IDataPartStorage::ClonePartParams params
17961797
{
17971798
.copy_instead_of_hardlink = isStoredOnRemoteDiskWithZeroCopySupport() && storage.supportsReplication() && storage_settings->allow_remote_fs_zero_copy_replication,
1798-
.make_source_readonly = true
1799+
.make_source_readonly = true,
1800+
.external_transaction = disk_transaction
17991801
};
18001802
return getDataPartStorage().freeze(
18011803
storage.relative_data_path,

src/Storages/MergeTree/IMergeTreeDataPart.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -371,7 +371,8 @@ class IMergeTreeDataPart : public std::enable_shared_from_this<IMergeTreeDataPar
371371
virtual void renameTo(const String & new_relative_path, bool remove_new_dir_if_exists);
372372

373373
/// Makes clone of a part in detached/ directory via hard links
374-
virtual DataPartStoragePtr makeCloneInDetached(const String & prefix, const StorageMetadataPtr & metadata_snapshot) const;
374+
virtual DataPartStoragePtr makeCloneInDetached(const String & prefix, const StorageMetadataPtr & metadata_snapshot,
375+
const DiskTransactionPtr & disk_transaction) const;
375376

376377
/// Makes full clone of part in specified subdirectory (relative to storage data directory, e.g. "detached") on another disk
377378
MutableDataPartStoragePtr makeCloneOnDisk(const DiskPtr & disk, const String & directory_name) const;

src/Storages/MergeTree/MergeTreeData.cpp

Lines changed: 72 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -2619,8 +2619,50 @@ size_t MergeTreeData::clearOldBrokenPartsFromDetachedDirectory()
26192619
if (detached_parts.empty())
26202620
return 0;
26212621

2622-
PartsTemporaryRename renamed_parts(*this, "detached/");
2622+
auto get_last_touched_time = [&](const DetachedPartInfo & part_info) -> time_t
2623+
{
2624+
auto path = fs::path(relative_data_path) / "detached" / part_info.dir_name;
2625+
time_t last_change_time = part_info.disk->getLastChanged(path);
2626+
time_t last_modification_time = part_info.disk->getLastModified(path).epochTime();
2627+
return std::max(last_change_time, last_modification_time);
2628+
};
26232629

2630+
time_t ttl_seconds = getSettings()->merge_tree_clear_old_broken_detached_parts_ttl_timeout_seconds;
2631+
2632+
size_t unfinished_deleting_parts = 0;
2633+
time_t current_time = time(nullptr);
2634+
for (const auto & part_info : detached_parts)
2635+
{
2636+
if (!part_info.dir_name.starts_with("deleting_"))
2637+
continue;
2638+
2639+
time_t startup_time = current_time - static_cast<time_t>(Context::getGlobalContextInstance()->getUptimeSeconds());
2640+
time_t last_touch_time = get_last_touched_time(part_info);
2641+
2642+
/// Maybe it's being deleted right now (for example, in ALTER DROP DETACHED)
2643+
bool had_restart = last_touch_time < startup_time;
2644+
bool ttl_expired = last_touch_time + ttl_seconds <= current_time;
2645+
if (!had_restart && !ttl_expired)
2646+
continue;
2647+
2648+
/// We were trying to delete this detached part but did not finish deleting, probably because the server crashed
2649+
LOG_INFO(log, "Removing detached part {} that we failed to remove previously", part_info.dir_name);
2650+
try
2651+
{
2652+
removeDetachedPart(part_info.disk, fs::path(relative_data_path) / "detached" / part_info.dir_name / "", part_info.dir_name);
2653+
++unfinished_deleting_parts;
2654+
}
2655+
catch (...)
2656+
{
2657+
tryLogCurrentException(log);
2658+
}
2659+
}
2660+
2661+
if (!getSettings()->merge_tree_enable_clear_old_broken_detached)
2662+
return unfinished_deleting_parts;
2663+
2664+
const auto full_path = fs::path(relative_data_path) / "detached";
2665+
size_t removed_count = 0;
26242666
for (const auto & part_info : detached_parts)
26252667
{
26262668
if (!part_info.valid_name || part_info.prefix.empty())
@@ -2635,31 +2677,24 @@ size_t MergeTreeData::clearOldBrokenPartsFromDetachedDirectory()
26352677
if (!can_be_removed_by_timeout)
26362678
continue;
26372679

2638-
time_t current_time = time(nullptr);
2639-
ssize_t threshold = current_time - getSettings()->merge_tree_clear_old_broken_detached_parts_ttl_timeout_seconds;
2640-
auto path = fs::path(relative_data_path) / "detached" / part_info.dir_name;
2641-
time_t last_change_time = part_info.disk->getLastChanged(path);
2642-
time_t last_modification_time = part_info.disk->getLastModified(path).epochTime();
2643-
time_t last_touch_time = std::max(last_change_time, last_modification_time);
2680+
ssize_t threshold = current_time - ttl_seconds;
2681+
time_t last_touch_time = get_last_touched_time(part_info);
26442682

26452683
if (last_touch_time == 0 || last_touch_time >= threshold)
26462684
continue;
26472685

2648-
renamed_parts.addPart(part_info.dir_name, "deleting_" + part_info.dir_name, part_info.disk);
2649-
}
2650-
2651-
LOG_INFO(log, "Will clean up {} detached parts", renamed_parts.old_and_new_names.size());
2686+
const String & old_name = part_info.dir_name;
2687+
String new_name = "deleting_" + part_info.dir_name;
2688+
part_info.disk->moveFile(fs::path(full_path) / old_name, fs::path(full_path) / new_name);
26522689

2653-
renamed_parts.tryRenameAll();
2654-
2655-
for (auto & [old_name, new_name, disk] : renamed_parts.old_and_new_names)
2656-
{
2657-
removeDetachedPart(disk, fs::path(relative_data_path) / "detached" / new_name / "", old_name);
2690+
removeDetachedPart(part_info.disk, fs::path(relative_data_path) / "detached" / new_name / "", old_name);
26582691
LOG_WARNING(log, "Removed broken detached part {} due to a timeout for broken detached parts", old_name);
2659-
old_name.clear();
2692+
++removed_count;
26602693
}
26612694

2662-
return renamed_parts.old_and_new_names.size();
2695+
LOG_INFO(log, "Cleaned up {} detached parts", removed_count);
2696+
2697+
return removed_count + unfinished_deleting_parts;
26632698
}
26642699

26652700
size_t MergeTreeData::clearOldWriteAheadLogs()
@@ -4035,7 +4070,7 @@ void MergeTreeData::restoreAndActivatePart(const DataPartPtr & part, DataPartsLo
40354070
void MergeTreeData::outdateUnexpectedPartAndCloneToDetached(const DataPartPtr & part_to_detach)
40364071
{
40374072
LOG_INFO(log, "Cloning part {} to unexpected_{} and making it obsolete.", part_to_detach->getDataPartStorage().getPartDirectory(), part_to_detach->name);
4038-
part_to_detach->makeCloneInDetached("unexpected", getInMemoryMetadataPtr());
4073+
part_to_detach->makeCloneInDetached("unexpected", getInMemoryMetadataPtr(), /*disk_transaction*/ {});
40394074

40404075
DataPartsLock lock = lockParts();
40414076
part_to_detach->is_unexpected_local_part = true;
@@ -5797,18 +5832,21 @@ MergeTreeData::MutableDataPartsVector MergeTreeData::tryLoadPartsToAttach(const
57975832
{
57985833
const String source_dir = "detached/";
57995834

5800-
std::map<String, DiskPtr> name_to_disk;
5801-
58025835
/// Let's compose a list of parts that should be added.
58035836
if (attach_part)
58045837
{
58055838
const String part_id = partition->as<ASTLiteral &>().value.safeGet<String>();
58065839
validateDetachedPartName(part_id);
5807-
auto disk = getDiskForDetachedPart(part_id);
5808-
renamed_parts.addPart(part_id, "attaching_" + part_id, disk);
5809-
5810-
if (MergeTreePartInfo::tryParsePartName(part_id, format_version))
5811-
name_to_disk[part_id] = getDiskForDetachedPart(part_id);
5840+
if (temporary_parts.contains(String(DETACHED_DIR_NAME) + "/" + part_id))
5841+
{
5842+
LOG_WARNING(log, "Will not try to attach part {} because its directory is temporary, "
5843+
"probably it's being detached right now", part_id);
5844+
}
5845+
else
5846+
{
5847+
auto disk = getDiskForDetachedPart(part_id);
5848+
renamed_parts.addPart(part_id, "attaching_" + part_id, disk);
5849+
}
58125850
}
58135851
else
58145852
{
@@ -5825,6 +5863,12 @@ MergeTreeData::MutableDataPartsVector MergeTreeData::tryLoadPartsToAttach(const
58255863

58265864
for (const auto & part_info : detached_parts)
58275865
{
5866+
if (temporary_parts.contains(String(DETACHED_DIR_NAME) + "/" + part_info.dir_name))
5867+
{
5868+
LOG_WARNING(log, "Will not try to attach part {} because its directory is temporary, "
5869+
"probably it's being detached right now", part_info.dir_name);
5870+
continue;
5871+
}
58285872
LOG_DEBUG(log, "Found part {}", part_info.dir_name);
58295873
active_parts.add(part_info.dir_name);
58305874
}
@@ -5835,6 +5879,8 @@ MergeTreeData::MutableDataPartsVector MergeTreeData::tryLoadPartsToAttach(const
58355879
for (const auto & part_info : detached_parts)
58365880
{
58375881
const String containing_part = active_parts.getContainingPart(part_info.dir_name);
5882+
if (containing_part.empty())
5883+
continue;
58385884

58395885
LOG_DEBUG(log, "Found containing part {} for part {}", containing_part, part_info.dir_name);
58405886

src/Storages/MergeTree/MergeTreeDataPartInMemory.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ namespace DB
1717
namespace ErrorCodes
1818
{
1919
extern const int DIRECTORY_ALREADY_EXISTS;
20+
extern const int NOT_IMPLEMENTED;
2021
}
2122

2223
MergeTreeDataPartInMemory::MergeTreeDataPartInMemory(
@@ -138,8 +139,12 @@ MutableDataPartStoragePtr MergeTreeDataPartInMemory::flushToDisk(const String &
138139
return new_data_part_storage;
139140
}
140141

141-
DataPartStoragePtr MergeTreeDataPartInMemory::makeCloneInDetached(const String & prefix, const StorageMetadataPtr & metadata_snapshot) const
142+
DataPartStoragePtr MergeTreeDataPartInMemory::makeCloneInDetached(const String & prefix,
143+
const StorageMetadataPtr & metadata_snapshot,
144+
const DiskTransactionPtr & disk_transaction) const
142145
{
146+
if (disk_transaction)
147+
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "InMemory parts are not compatible with disk transactions");
143148
String detached_path = *getRelativePathForDetachedPart(prefix, /* broken */ false);
144149
return flushToDisk(detached_path, metadata_snapshot);
145150
}

src/Storages/MergeTree/MergeTreeDataPartInMemory.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,8 @@ class MergeTreeDataPartInMemory : public IMergeTreeDataPart
4242
bool hasColumnFiles(const NameAndTypePair & column) const override { return !!getColumnPosition(column.getNameInStorage()); }
4343
String getFileNameForColumn(const NameAndTypePair & /* column */) const override { return ""; }
4444
void renameTo(const String & new_relative_path, bool remove_new_dir_if_exists) override;
45-
DataPartStoragePtr makeCloneInDetached(const String & prefix, const StorageMetadataPtr & metadata_snapshot) const override;
45+
DataPartStoragePtr makeCloneInDetached(const String & prefix, const StorageMetadataPtr & metadata_snapshot,
46+
const DiskTransactionPtr & disk_transaction) const override;
4647
std::optional<time_t> getColumnModificationTime(const String & /* column_name */) const override { return {}; }
4748

4849
MutableDataPartStoragePtr flushToDisk(const String & new_relative_path, const StorageMetadataPtr & metadata_snapshot) const;

src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -149,8 +149,7 @@ Float32 ReplicatedMergeTreeCleanupThread::iterate()
149149
/// do it under share lock
150150
cleaned_other += storage.clearOldWriteAheadLogs();
151151
cleaned_part_like += storage.clearOldTemporaryDirectories(storage.getSettings()->temporary_directories_lifetime.totalSeconds());
152-
if (storage.getSettings()->merge_tree_enable_clear_old_broken_detached)
153-
cleaned_part_like += storage.clearOldBrokenPartsFromDetachedDirectory();
152+
cleaned_part_like += storage.clearOldBrokenPartsFromDetachedDirectory();
154153
}
155154

156155
/// This is loose condition: no problem if we actually had lost leadership at this moment

src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -633,8 +633,8 @@ void ReplicatedMergeTreeSinkImpl<true>::finishDelayedChunk(const ZooKeeperWithFa
633633
delayed_chunk.reset();
634634
}
635635

636-
template<bool async_insert>
637-
void ReplicatedMergeTreeSinkImpl<async_insert>::writeExistingPart(MergeTreeData::MutableDataPartPtr & part)
636+
template<>
637+
bool ReplicatedMergeTreeSinkImpl<false>::writeExistingPart(MergeTreeData::MutableDataPartPtr & part)
638638
{
639639
/// NOTE: No delay in this case. That's Ok.
640640
auto origin_zookeeper = storage.getZooKeeper();
@@ -649,8 +649,13 @@ void ReplicatedMergeTreeSinkImpl<async_insert>::writeExistingPart(MergeTreeData:
649649
try
650650
{
651651
part->version.setCreationTID(Tx::PrehistoricTID, nullptr);
652-
commitPart(zookeeper, part, BlockIDsType(), replicas_num, true);
653-
PartLog::addNewPart(storage.getContext(), PartLog::PartLogEntry(part, watch.elapsed(), profile_events_scope.getSnapshot()));
652+
String block_id = deduplicate ? fmt::format("{}_{}", part->info.partition_id, part->checksums.getTotalChecksumHex()) : "";
653+
bool deduplicated = commitPart(zookeeper, part, block_id, replicas_num, /* writing_existing_part */ true).second;
654+
655+
/// Set a special error code if the block is duplicate
656+
int error = (deduplicate && deduplicated) ? ErrorCodes::INSERT_WAS_DEDUPLICATED : 0;
657+
PartLog::addNewPart(storage.getContext(), PartLog::PartLogEntry(part, watch.elapsed(), profile_events_scope.getSnapshot()), ExecutionStatus(error));
658+
return deduplicated;
654659
}
655660
catch (...)
656661
{

src/Storages/MergeTree/ReplicatedMergeTreeSink.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ class ReplicatedMergeTreeSinkImpl : public SinkToStorage
5656
String getName() const override { return "ReplicatedMergeTreeSink"; }
5757

5858
/// For ATTACHing existing data on filesystem.
59-
void writeExistingPart(MergeTreeData::MutableDataPartPtr & part);
59+
bool writeExistingPart(MergeTreeData::MutableDataPartPtr & part);
6060

6161
/// For proper deduplication in MaterializedViews
6262
bool lastBlockIsDuplicate() const override

0 commit comments

Comments
 (0)