Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions src/Common/ProfileEvents.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,10 +191,8 @@
\
M(InsertedWideParts, "Number of parts inserted in Wide format.") \
M(InsertedCompactParts, "Number of parts inserted in Compact format.") \
M(InsertedInMemoryParts, "Number of parts inserted in InMemory format.") \
M(MergedIntoWideParts, "Number of parts merged into Wide format.") \
M(MergedIntoCompactParts, "Number of parts merged into Compact format.") \
M(MergedIntoInMemoryParts, "Number of parts in merged into InMemory format.") \
\
M(MergeTreeDataProjectionWriterRows, "Number of rows INSERTed to MergeTree tables projection.") \
M(MergeTreeDataProjectionWriterUncompressedBytes, "Uncompressed bytes (for columns as they stored in memory) INSERTed to MergeTree tables projection.") \
Expand Down
56 changes: 4 additions & 52 deletions src/Storages/MergeTree/MergeTreeData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,8 @@ namespace ProfileEvents
extern const Event DelayedInsertsMilliseconds;
extern const Event InsertedWideParts;
extern const Event InsertedCompactParts;
extern const Event InsertedInMemoryParts;
extern const Event MergedIntoWideParts;
extern const Event MergedIntoCompactParts;
extern const Event MergedIntoInMemoryParts;
extern const Event RejectedMutations;
extern const Event DelayedMutations;
extern const Event DelayedMutationsMilliseconds;
Expand Down Expand Up @@ -385,8 +383,7 @@ MergeTreeData::MergeTreeData(

String reason;
if (!canUsePolymorphicParts(*settings, &reason) && !reason.empty())
LOG_WARNING(log, "{} Settings 'min_rows_for_wide_part', 'min_bytes_for_wide_part', "
"'min_rows_for_compact_part' and 'min_bytes_for_compact_part' will be ignored.", reason);
LOG_WARNING(log, "{} Settings 'min_rows_for_wide_part'and 'min_bytes_for_wide_part' will be ignored.", reason);

#if !USE_ROCKSDB
if (use_metadata_cache)
Expand Down Expand Up @@ -2354,22 +2351,6 @@ void MergeTreeData::removePartsFinally(const MergeTreeData::DataPartsVector & pa
}
}

void MergeTreeData::flushAllInMemoryPartsIfNeeded()
{
if (getSettings()->in_memory_parts_enable_wal)
return;

auto metadata_snapshot = getInMemoryMetadataPtr();
DataPartsVector parts = getDataPartsVectorForInternalUsage();
for (const auto & part : parts)
{
if (auto part_in_memory = asInMemoryPart(part))
{
part_in_memory->flushToDisk(part_in_memory->getDataPartStorage().getPartDirectory(), metadata_snapshot);
}
}
}

size_t MergeTreeData::clearOldPartsFromFilesystem(bool force)
{
DataPartsVector parts_to_remove = grabOldParts(force);
Expand Down Expand Up @@ -3342,7 +3323,7 @@ void MergeTreeData::checkMutationIsPossible(const MutationCommands & /*commands*
/// Some validation will be added
}

MergeTreeDataPartFormat MergeTreeData::choosePartFormat(size_t bytes_uncompressed, size_t rows_count, bool only_on_disk) const
MergeTreeDataPartFormat MergeTreeData::choosePartFormat(size_t bytes_uncompressed, size_t rows_count) const
{
using PartType = MergeTreeDataPartType;
using PartStorageType = MergeTreeDataPartStorageType;
Expand All @@ -3356,9 +3337,6 @@ MergeTreeDataPartFormat MergeTreeData::choosePartFormat(size_t bytes_uncompresse
return bytes_uncompressed < min_bytes_for || rows_count < min_rows_for;
};

if (!only_on_disk && satisfies(settings->min_bytes_for_compact_part, settings->min_rows_for_compact_part))
return {PartType::InMemory, PartStorageType::Full};

auto part_type = PartType::Wide;
if (satisfies(settings->min_bytes_for_wide_part, settings->min_rows_for_wide_part))
part_type = PartType::Compact;
Expand All @@ -3368,7 +3346,7 @@ MergeTreeDataPartFormat MergeTreeData::choosePartFormat(size_t bytes_uncompresse

MergeTreeDataPartFormat MergeTreeData::choosePartFormatOnDisk(size_t bytes_uncompressed, size_t rows_count) const
{
return choosePartFormat(bytes_uncompressed, rows_count, true);
return choosePartFormat(bytes_uncompressed, rows_count);
}

MergeTreeDataPartBuilder MergeTreeData::getDataPartBuilder(
Expand Down Expand Up @@ -6146,19 +6124,6 @@ MergeTreeData::DataPartsVector MergeTreeData::Transaction::commit(MergeTreeData:
}
}

MergeTreeData::WriteAheadLogPtr wal;
auto get_inited_wal = [&] ()
{
if (!wal)
wal = data.getWriteAheadLog();
return wal;
};

if (settings->in_memory_parts_enable_wal)
for (const auto & part : precommitted_parts)
if (auto part_in_memory = asInMemoryPart(part))
get_inited_wal()->addPart(part_in_memory);

NOEXCEPT_SCOPE({
auto current_time = time(nullptr);

Expand Down Expand Up @@ -6202,10 +6167,6 @@ MergeTreeData::DataPartsVector MergeTreeData::Transaction::commit(MergeTreeData:

data.modifyPartState(covered_part, DataPartState::Outdated);
data.removePartContributionToColumnAndSecondaryIndexSizes(covered_part);

if (settings->in_memory_parts_enable_wal)
if (isInMemoryPart(covered_part))
get_inited_wal()->dropPart(covered_part->name);
}

reduce_parts += covered_parts.size();
Expand Down Expand Up @@ -7886,11 +7847,8 @@ bool MergeTreeData::canUsePolymorphicParts(const MergeTreeSettings & settings, S
"Table can't create parts with adaptive granularity, but settings"
" min_rows_for_wide_part = {}"
", min_bytes_for_wide_part = {}"
", min_rows_for_compact_part = {}"
", min_bytes_for_compact_part = {}"
". Parts with non-adaptive granularity can be stored only in Wide (default) format.",
settings.min_rows_for_wide_part, settings.min_bytes_for_wide_part,
settings.min_rows_for_compact_part, settings.min_bytes_for_compact_part);
settings.min_rows_for_wide_part, settings.min_bytes_for_wide_part);
}

return false;
Expand Down Expand Up @@ -8270,9 +8228,6 @@ void MergeTreeData::incrementInsertedPartsProfileEvent(MergeTreeDataPartType typ
case MergeTreeDataPartType::Compact:
ProfileEvents::increment(ProfileEvents::InsertedCompactParts);
break;
case MergeTreeDataPartType::InMemory:
ProfileEvents::increment(ProfileEvents::InsertedInMemoryParts);
break;
default:
break;
}
Expand All @@ -8288,9 +8243,6 @@ void MergeTreeData::incrementMergedPartsProfileEvent(MergeTreeDataPartType type)
case MergeTreeDataPartType::Compact:
ProfileEvents::increment(ProfileEvents::MergedIntoCompactParts);
break;
case MergeTreeDataPartType::InMemory:
ProfileEvents::increment(ProfileEvents::MergedIntoInMemoryParts);
break;
default:
break;
}
Expand Down
5 changes: 1 addition & 4 deletions src/Storages/MergeTree/MergeTreeData.h
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ class MergeTreeData : public IStorage, public WithMutableContext
using OperationDataPartsLock = std::unique_lock<std::mutex>;
OperationDataPartsLock lockOperationsWithParts() const { return OperationDataPartsLock(operation_with_data_parts_mutex); }

MergeTreeDataPartFormat choosePartFormat(size_t bytes_uncompressed, size_t rows_count, bool only_on_disk = false) const;
MergeTreeDataPartFormat choosePartFormat(size_t bytes_uncompressed, size_t rows_count) const;
MergeTreeDataPartFormat choosePartFormatOnDisk(size_t bytes_uncompressed, size_t rows_count) const;
MergeTreeDataPartBuilder getDataPartBuilder(const String & name, const VolumePtr & volume, const String & part_dir) const;

Expand Down Expand Up @@ -661,9 +661,6 @@ class MergeTreeData : public IStorage, public WithMutableContext
/// Removes parts from data_parts, they should be in Deleting state
void removePartsFinally(const DataPartsVector & parts);

/// When WAL is not enabled, the InMemoryParts need to be persistent.
void flushAllInMemoryPartsIfNeeded();

/// Delete irrelevant parts from memory and disk.
/// If 'force' - don't wait for old_parts_lifetime.
size_t clearOldPartsFromFilesystem(bool force = false);
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/MergeTree/MergeTreeDataPartType.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class MergeTreeDataPartType
/// Data of all columns is stored in one file. Marks are also stored in single file.
Compact,

/// Format with buffering data in RAM.
/// Format with buffering data in RAM. Obsolete - new parts cannot be created in this format.
InMemory,

Unknown,
Expand Down
19 changes: 9 additions & 10 deletions src/Storages/MergeTree/MergeTreeSettings.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,6 @@ struct Settings;
/** Data storing format settings. */ \
M(UInt64, min_bytes_for_wide_part, 10485760, "Minimal uncompressed size in bytes to create part in wide format instead of compact", 0) \
M(UInt64, min_rows_for_wide_part, 0, "Minimal number of rows to create part in wide format instead of compact", 0) \
M(UInt64, min_bytes_for_compact_part, 0, "Experimental. Minimal uncompressed size in bytes to create part in compact format instead of saving it in RAM", 0) \
M(UInt64, min_rows_for_compact_part, 0, "Experimental. Minimal number of rows to create part in compact format instead of saving it in RAM", 0) \
M(Bool, in_memory_parts_enable_wal, true, "Whether to write blocks in Native format to write-ahead-log before creation in-memory part", 0) \
M(UInt64, write_ahead_log_max_bytes, 1024 * 1024 * 1024, "Rotate WAL, if it exceeds that amount of bytes", 0) \
M(Float, ratio_of_defaults_for_sparse_serialization, 1.0, "Minimal ratio of number of default values to number of all values in column to store it in sparse serializations. If >= 1, columns will be always written in full serialization.", 0) \
\
/** Merge settings. */ \
Expand All @@ -59,9 +55,6 @@ struct Settings;
M(UInt64, min_compressed_bytes_to_fsync_after_fetch, 0, "Minimal number of compressed bytes to do fsync for part after fetch (0 - disabled)", 0) \
M(Bool, fsync_after_insert, false, "Do fsync for every inserted part. Significantly decreases performance of inserts, not recommended to use with wide parts.", 0) \
M(Bool, fsync_part_directory, false, "Do fsync for part directory after all part operations (writes, renames, etc.).", 0) \
M(UInt64, write_ahead_log_bytes_to_fsync, 100ULL * 1024 * 1024, "Amount of bytes, accumulated in WAL to do fsync.", 0) \
M(UInt64, write_ahead_log_interval_ms_to_fsync, 100, "Interval in milliseconds after which fsync for WAL is being done.", 0) \
M(Bool, in_memory_parts_insert_sync, false, "If true insert of part with in-memory format will wait for fsync of WAL", 0) \
M(UInt64, non_replicated_deduplication_window, 0, "How many last blocks of hashes should be kept on disk (0 - disabled).", 0) \
M(UInt64, max_parts_to_merge_at_once, 100, "Max amount of parts which can be merged at once (0 - disabled). Doesn't affect OPTIMIZE FINAL query.", 0) \
M(UInt64, merge_selecting_sleep_ms, 5000, "Sleep time for merge selecting when no part selected, a lower setting will trigger selecting tasks in background_schedule_pool frequently which result in large amount of requests to zookeeper in large-scale clusters", 0) \
Expand Down Expand Up @@ -188,7 +181,14 @@ struct Settings;
M(UInt64, replicated_max_parallel_sends_for_table, 0, "Obsolete setting, does nothing.", 0) \
M(UInt64, replicated_max_parallel_fetches, 0, "Obsolete setting, does nothing.", 0) \
M(UInt64, replicated_max_parallel_fetches_for_table, 0, "Obsolete setting, does nothing.", 0) \
M(Bool, write_final_mark, true, "Obsolete setting, does nothing.", 0) \
M(Bool, write_final_mark, true, "Obsolete setting, does nothing.", 0) \
M(UInt64, min_bytes_for_compact_part, 0, "Obsolete setting, does nothing.", 0) \
M(UInt64, min_rows_for_compact_part, 0, "Obsolete setting, does nothing.", 0) \
M(Bool, in_memory_parts_enable_wal, true, "Obsolete setting, does nothing.", 0) \
M(UInt64, write_ahead_log_max_bytes, 1024 * 1024 * 1024, "Obsolete setting, does nothing.", 0) \
M(UInt64, write_ahead_log_bytes_to_fsync, 100ULL * 1024 * 1024, "Obsolete setting, does nothing.", 0) \
M(UInt64, write_ahead_log_interval_ms_to_fsync, 100, "Obsolete setting, does nothing.", 0) \
M(Bool, in_memory_parts_insert_sync, false, "Obsolete setting, does nothing.", 0) \
/// Settings that should not change after the creation of a table.
/// NOLINTNEXTLINE
#define APPLY_FOR_IMMUTABLE_MERGE_TREE_SETTINGS(M) \
Expand Down Expand Up @@ -216,8 +216,7 @@ struct MergeTreeSettings : public BaseSettings<MergeTreeSettingsTraits>

static bool isPartFormatSetting(const String & name)
{
return name == "min_bytes_for_wide_part" || name == "min_rows_for_wide_part"
|| name == "min_bytes_for_compact_part" || name == "min_rows_for_compact_part";
return name == "min_bytes_for_wide_part" || name == "min_rows_for_wide_part";
}

/// Check that the values are sane taking also query-level settings into account.
Expand Down
47 changes: 0 additions & 47 deletions src/Storages/MergeTree/MergeTreeWriteAheadLog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,31 +84,6 @@ void MergeTreeWriteAheadLog::init()
bytes_at_last_sync = 0;
}

void MergeTreeWriteAheadLog::addPart(DataPartInMemoryPtr & part)
{
std::unique_lock lock(write_mutex);

auto part_info = MergeTreePartInfo::fromPartName(part->name, storage.format_version);
min_block_number = std::min(min_block_number, part_info.min_block);
max_block_number = std::max(max_block_number, part_info.max_block);

writeIntBinary(WAL_VERSION, *out);

ActionMetadata metadata{};
metadata.part_uuid = part->uuid;
metadata.write(*out);

writeIntBinary(static_cast<UInt8>(ActionType::ADD_PART), *out);
writeStringBinary(part->name, *out);
block_out->write(part->block);
block_out->flush();
sync(lock);

auto max_wal_bytes = storage.getSettings()->write_ahead_log_max_bytes;
if (out->count() > max_wal_bytes)
rotate(lock);
}

void MergeTreeWriteAheadLog::dropPart(const String & part_name)
{
std::unique_lock lock(write_mutex);
Expand All @@ -121,7 +96,6 @@ void MergeTreeWriteAheadLog::dropPart(const String & part_name)
writeIntBinary(static_cast<UInt8>(ActionType::DROP_PART), *out);
writeStringBinary(part_name, *out);
out->next();
sync(lock);
}

void MergeTreeWriteAheadLog::rotate(const std::unique_lock<std::mutex> &)
Expand Down Expand Up @@ -269,27 +243,6 @@ MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore(
return result;
}

void MergeTreeWriteAheadLog::sync(std::unique_lock<std::mutex> & lock)
{
size_t bytes_to_sync = storage.getSettings()->write_ahead_log_bytes_to_fsync;
time_t time_to_sync = storage.getSettings()->write_ahead_log_interval_ms_to_fsync;
size_t current_bytes = out->count();

if (bytes_to_sync && current_bytes - bytes_at_last_sync > bytes_to_sync)
{
sync_task->schedule();
bytes_at_last_sync = current_bytes;
}
else if (time_to_sync && !sync_scheduled)
{
sync_task->scheduleAfter(time_to_sync);
sync_scheduled = true;
}

if (storage.getSettings()->in_memory_parts_insert_sync)
sync_cv.wait(lock, [this] { return !sync_scheduled; });
}

void MergeTreeWriteAheadLog::shutdown()
{
{
Expand Down
8 changes: 0 additions & 8 deletions src/Storages/StorageMergeTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,14 +168,6 @@ void StorageMergeTree::startup()
}
}

void StorageMergeTree::flush()
{
if (flush_called.exchange(true))
return;

flushAllInMemoryPartsIfNeeded();
}

void StorageMergeTree::shutdown()
{
if (shutdown_called.exchange(true))
Expand Down
1 change: 0 additions & 1 deletion src/Storages/StorageMergeTree.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ class StorageMergeTree final : public MergeTreeData
bool has_force_restore_data_flag);

void startup() override;
void flush() override;
void shutdown() override;

~StorageMergeTree() override;
Expand Down
8 changes: 0 additions & 8 deletions src/Storages/StorageReplicatedMergeTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4455,14 +4455,6 @@ void StorageReplicatedMergeTree::startupImpl(bool from_attach_thread)
}
}

void StorageReplicatedMergeTree::flush()
{
if (flush_called.exchange(true))
return;

flushAllInMemoryPartsIfNeeded();
}


void StorageReplicatedMergeTree::partialShutdown()
{
Expand Down
1 change: 0 additions & 1 deletion src/Storages/StorageReplicatedMergeTree.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@ class StorageReplicatedMergeTree final : public MergeTreeData
void startup() override;
void shutdown() override;
void partialShutdown();
void flush() override;
~StorageReplicatedMergeTree() override;

static String getDefaultZooKeeperPath(const Poco::Util::AbstractConfiguration & config);
Expand Down
36 changes: 0 additions & 36 deletions tests/integration/test_backup_restore/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,39 +171,3 @@ def test_replace_partition(started_cluster):
assert TSV(res) == expected

instance.query("DROP TABLE IF EXISTS test.tbl3")


def test_freeze_in_memory(started_cluster):
instance.query(
"CREATE TABLE test.t_in_memory(a UInt32, s String) ENGINE = MergeTree ORDER BY a SETTINGS min_rows_for_compact_part = 1000"
)
instance.query("INSERT INTO test.t_in_memory VALUES (1, 'a')")
instance.query("ALTER TABLE test.t_in_memory FREEZE")

fp_backup = get_last_backup_path(
started_cluster.instances["node"], "test", "t_in_memory"
)
part_path = fp_backup + "/all_1_1_0/"

assert TSV(
instance.query(
"SELECT part_type, is_frozen FROM system.parts WHERE database = 'test' AND table = 't_in_memory'"
)
) == TSV("InMemory\t1\n")
instance.exec_in_container(["test", "-f", part_path + "/data.bin"])
assert instance.exec_in_container(["cat", part_path + "/count.txt"]).strip() == "1"

instance.query(
"CREATE TABLE test.t_in_memory_2(a UInt32, s String) ENGINE = MergeTree ORDER BY a"
)
copy_backup_to_detached(
started_cluster.instances["node"], "test", "t_in_memory", "t_in_memory_2"
)

instance.query("ALTER TABLE test.t_in_memory_2 ATTACH PARTITION ID 'all'")
assert TSV(
instance.query(
"SELECT part_type FROM system.parts WHERE database = 'test' AND table = 't_in_memory_2'"
)
) == TSV("Compact\n")
assert TSV(instance.query("SELECT a, s FROM test.t_in_memory_2")) == TSV("1\ta\n")
Loading