Skip to content

Commit cc2cadb

Browse files
authored
Merge pull request #11893 from excitoon-favorites/autostartmoves
In *MergeTree: Parts moving task shall be started if new storage policy needs them
2 parents c7d8bf6 + 288b407 commit cc2cadb

File tree

8 files changed

+123
-34
lines changed

8 files changed

+123
-34
lines changed

src/Storages/MergeTree/MergeTreeData.cpp

Lines changed: 30 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1495,6 +1495,8 @@ void MergeTreeData::changeSettings(
14951495
{
14961496
if (new_settings)
14971497
{
1498+
bool has_storage_policy_changed = false;
1499+
14981500
const auto & new_changes = new_settings->as<const ASTSetQuery &>().changes;
14991501

15001502
for (const auto & change : new_changes)
@@ -1503,28 +1505,34 @@ void MergeTreeData::changeSettings(
15031505
StoragePolicyPtr new_storage_policy = global_context.getStoragePolicy(change.value.safeGet<String>());
15041506
StoragePolicyPtr old_storage_policy = getStoragePolicy();
15051507

1506-
checkStoragePolicy(new_storage_policy);
1508+
/// StoragePolicy of different version or name is guaranteed to have different pointer
1509+
if (new_storage_policy != old_storage_policy)
1510+
{
1511+
checkStoragePolicy(new_storage_policy);
15071512

1508-
std::unordered_set<String> all_diff_disk_names;
1509-
for (const auto & disk : new_storage_policy->getDisks())
1510-
all_diff_disk_names.insert(disk->getName());
1511-
for (const auto & disk : old_storage_policy->getDisks())
1512-
all_diff_disk_names.erase(disk->getName());
1513+
std::unordered_set<String> all_diff_disk_names;
1514+
for (const auto & disk : new_storage_policy->getDisks())
1515+
all_diff_disk_names.insert(disk->getName());
1516+
for (const auto & disk : old_storage_policy->getDisks())
1517+
all_diff_disk_names.erase(disk->getName());
15131518

1514-
for (const String & disk_name : all_diff_disk_names)
1515-
{
1516-
auto disk = new_storage_policy->getDiskByName(disk_name);
1517-
if (disk->exists(relative_data_path))
1518-
throw Exception("New storage policy contain disks which already contain data of a table with the same name", ErrorCodes::LOGICAL_ERROR);
1519-
}
1519+
for (const String & disk_name : all_diff_disk_names)
1520+
{
1521+
auto disk = new_storage_policy->getDiskByName(disk_name);
1522+
if (disk->exists(relative_data_path))
1523+
throw Exception("New storage policy contain disks which already contain data of a table with the same name", ErrorCodes::LOGICAL_ERROR);
1524+
}
15201525

1521-
for (const String & disk_name : all_diff_disk_names)
1522-
{
1523-
auto disk = new_storage_policy->getDiskByName(disk_name);
1524-
disk->createDirectories(relative_data_path);
1525-
disk->createDirectories(relative_data_path + "detached");
1526+
for (const String & disk_name : all_diff_disk_names)
1527+
{
1528+
auto disk = new_storage_policy->getDiskByName(disk_name);
1529+
disk->createDirectories(relative_data_path);
1530+
disk->createDirectories(relative_data_path + "detached");
1531+
}
1532+
/// FIXME how would that be done while reloading configuration???
1533+
1534+
has_storage_policy_changed = true;
15261535
}
1527-
/// FIXME how would that be done while reloading configuration???
15281536
}
15291537

15301538
MergeTreeSettings copy = *getSettings();
@@ -1533,6 +1541,9 @@ void MergeTreeData::changeSettings(
15331541
StorageInMemoryMetadata new_metadata = getInMemoryMetadata();
15341542
new_metadata.setSettingsChanges(new_settings);
15351543
setInMemoryMetadata(new_metadata);
1544+
1545+
if (has_storage_policy_changed)
1546+
startBackgroundMovesIfNeeded();
15361547
}
15371548
}
15381549

@@ -3291,12 +3302,11 @@ bool MergeTreeData::selectPartsAndMove()
32913302
/// Reports whether this table needs a background parts-moving task under its current storage policy.
/// Returns true when the policy has more than one volume, or when it has exactly one volume
/// that holds more than one disk.
/// Diff note: the removed lines additionally required the table metadata to have a move TTL
/// (hasAnyMoveTTL()) in the single-volume case; the added return line drops that requirement,
/// so the metadata snapshot is no longer read here.
bool MergeTreeData::areBackgroundMovesNeeded() const
32923303
{
32933304
auto policy = getStoragePolicy();
3294-
auto metadata_snapshot = getInMemoryMetadataPtr();
32953305

32963306
if (policy->getVolumes().size() > 1)
32973307
return true;
32983308

3299-
return policy->getVolumes().size() == 1 && policy->getVolumes()[0]->getDisks().size() > 1 && metadata_snapshot->hasAnyMoveTTL();
3309+
return policy->getVolumes().size() == 1 && policy->getVolumes()[0]->getDisks().size() > 1;
33003310
}
33013311

33023312
bool MergeTreeData::movePartsToSpace(const DataPartsVector & parts, SpacePtr space)

src/Storages/MergeTree/MergeTreeData.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -794,8 +794,6 @@ class MergeTreeData : public IStorage
794794

795795
void checkStoragePolicy(const StoragePolicyPtr & new_storage_policy) const;
796796

797-
void setStoragePolicy(const String & new_storage_policy_name, bool only_check = false);
798-
799797
/// Calculates column sizes in compressed form for the current state of data_parts. Call with data_parts mutex locked.
800798
void calculateColumnSizesImpl();
801799
/// Adds or subtracts the contribution of the part to compressed column sizes.
@@ -873,6 +871,8 @@ class MergeTreeData : public IStorage
873871
CurrentlyMovingPartsTagger checkPartsForMove(const DataPartsVector & parts, SpacePtr space);
874872

875873
bool canUsePolymorphicParts(const MergeTreeSettings & settings, String * out_reason) const;
874+
875+
/// Starts the background parts-moving task when the storage needs one.
/// Pure virtual: StorageMergeTree and StorageReplicatedMergeTree each provide the implementation.
/// Called from their startup() and from changeSettings() after the storage policy has changed.
virtual void startBackgroundMovesIfNeeded() = 0;
876876
};
877877

878878
}

src/Storages/StorageMergeTree.cpp

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -102,12 +102,7 @@ void StorageMergeTree::startup()
102102
/// Ensure that thread started only after assignment to 'merging_mutating_task_handle' is done.
103103
merge_pool.startTask(merging_mutating_task_handle);
104104

105-
if (areBackgroundMovesNeeded())
106-
{
107-
auto & move_pool = global_context.getBackgroundMovePool();
108-
moving_task_handle = move_pool.createTask([this] { return movePartsTask(); });
109-
move_pool.startTask(moving_task_handle);
110-
}
105+
startBackgroundMovesIfNeeded();
111106
}
112107
catch (...)
113108
{
@@ -464,6 +459,18 @@ bool StorageMergeTree::isMutationDone(Int64 mutation_version) const
464459
return true;
465460
}
466461

462+
463+
/// Creates the parts-moving task in the global background move pool and starts it.
/// Does nothing unless areBackgroundMovesNeeded() is true and moving_task_handle is still empty,
/// so a second call does not create another task once the handle has been assigned.
/// NOTE(review): the handle is checked and assigned without any locking visible in this function;
/// confirm that the callers (startup() and changeSettings()) cannot run it concurrently.
void StorageMergeTree::startBackgroundMovesIfNeeded()
464+
{
465+
if (areBackgroundMovesNeeded() && !moving_task_handle)
466+
{
467+
auto & move_pool = global_context.getBackgroundMovePool();
468+
moving_task_handle = move_pool.createTask([this] { return movePartsTask(); });
469+
move_pool.startTask(moving_task_handle);
470+
}
471+
}
472+
473+
467474
std::vector<MergeTreeMutationStatus> StorageMergeTree::getMutationsStatus() const
468475
{
469476
std::lock_guard lock(currently_processing_in_background_mutex);

src/Storages/StorageMergeTree.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,8 @@ class StorageMergeTree final : public ext::shared_ptr_helper<StorageMergeTree>,
159159
/// Just checks versions of each active data part
160160
bool isMutationDone(Int64 mutation_version) const;
161161

162+
/// Creates and starts the moves task in the background move pool when areBackgroundMovesNeeded()
/// holds and moving_task_handle has not been assigned yet.
void startBackgroundMovesIfNeeded() override;
163+
162164
friend class MergeTreeBlockOutputStream;
163165
friend class MergeTreeData;
164166
friend struct CurrentlyMergingPartsTagger;

src/Storages/StorageReplicatedMergeTree.cpp

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3263,12 +3263,7 @@ void StorageReplicatedMergeTree::startup()
32633263
pool.startTask(queue_task_handle);
32643264
}
32653265

3266-
if (areBackgroundMovesNeeded())
3267-
{
3268-
auto & pool = global_context.getBackgroundMovePool();
3269-
move_parts_task_handle = pool.createTask([this] { return movePartsTask(); });
3270-
pool.startTask(move_parts_task_handle);
3271-
}
3266+
startBackgroundMovesIfNeeded();
32723267
}
32733268
catch (...)
32743269
{
@@ -5702,4 +5697,16 @@ MutationCommands StorageReplicatedMergeTree::getFirtsAlterMutationCommandsForPar
57025697
{
57035698
return queue.getFirstAlterMutationCommandsForPart(part);
57045699
}
5700+
5701+
5702+
/// Creates the parts-moving task in the global background move pool and starts it.
/// Does nothing unless areBackgroundMovesNeeded() is true and move_parts_task_handle is still empty,
/// so a second call does not create another task once the handle has been assigned.
/// NOTE(review): the handle is checked and assigned without any locking visible in this function;
/// confirm that the callers (startup() and changeSettings()) cannot run it concurrently.
void StorageReplicatedMergeTree::startBackgroundMovesIfNeeded()
5703+
{
5704+
if (areBackgroundMovesNeeded() && !move_parts_task_handle)
5705+
{
5706+
auto & pool = global_context.getBackgroundMovePool();
5707+
move_parts_task_handle = pool.createTask([this] { return movePartsTask(); });
5708+
pool.startTask(move_parts_task_handle);
5709+
}
5710+
}
5711+
57055712
}

src/Storages/StorageReplicatedMergeTree.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -551,6 +551,8 @@ class StorageReplicatedMergeTree final : public ext::shared_ptr_helper<StorageRe
551551

552552
MutationCommands getFirtsAlterMutationCommandsForPart(const DataPartPtr & part) const override;
553553

554+
/// Creates and starts the moves task in the background move pool when areBackgroundMovesNeeded()
/// holds and move_parts_task_handle has not been assigned yet.
void startBackgroundMovesIfNeeded() override;
555+
554556
protected:
555557
/** If not 'attach', either creates a new table in ZK, or adds a replica to an existing table.
556558
*/

tests/integration/test_ttl_move/configs/config.d/storage_configuration.xml

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,20 @@
4040
</volumes>
4141
</jbods_with_external>
4242

43+
<!-- Storage policy with three volumes, each listing a single disk:
     "default" (disk default), "main" (disk jbod1) and "external" (disk external).
     test_moves_work_after_storage_policy_change switches a table to this policy by name. -->
<default_with_small_jbod_with_external>
44+
<volumes>
45+
<default>
46+
<disk>default</disk>
47+
</default>
48+
<main>
49+
<disk>jbod1</disk>
50+
</main>
51+
<external>
52+
<disk>external</disk>
53+
</external>
54+
</volumes>
55+
</default_with_small_jbod_with_external>
56+
4357
<small_jbod_with_external>
4458
<volumes>
4559
<main>

tests/integration/test_ttl_move/test.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,53 @@ def test_inserts_to_disk_work(started_cluster, name, engine, positive):
160160
pass
161161

162162

163+
# Checks that TTL moves happen on a table whose storage policy was changed after creation.
# Steps, for both a MergeTree and a ReplicatedMergeTree table:
#   1. create the table without naming a storage policy;
#   2. ALTER its storage_policy setting to 'default_with_small_jbod_with_external';
#   3. set a TTL that moves data to disk 'jbod1' at now()-3600 and to disk 'external' at d1;
#   4. insert 10 rows of about 1MB each with d1 = time_1 (12 seconds ahead) and assert they are on jbod1;
#   5. wait for the 12-second sleep thread plus wait_expire_2/2 seconds, then assert the data
#      is on 'external' and that all 10 rows are still readable.
# The table is dropped in the finally block regardless of the outcome.
# NOTE(review): time_2 is assigned below but never read afterwards.
@pytest.mark.parametrize("name,engine", [
164+
("mt_test_moves_work_after_storage_policy_change","MergeTree()"),
165+
("replicated_mt_test_moves_work_after_storage_policy_change","ReplicatedMergeTree('/clickhouse/test_moves_work_after_storage_policy_change', '1')"),
166+
])
167+
def test_moves_work_after_storage_policy_change(started_cluster, name, engine):
168+
try:
169+
node1.query("""
170+
CREATE TABLE {name} (
171+
s1 String,
172+
d1 DateTime
173+
) ENGINE = {engine}
174+
ORDER BY tuple()
175+
""".format(name=name, engine=engine))
176+
177+
node1.query("""ALTER TABLE {name} MODIFY SETTING storage_policy='default_with_small_jbod_with_external'""".format(name=name))
178+
179+
# Second expression is preferred because d1 > now()-3600.
180+
node1.query("""ALTER TABLE {name} MODIFY TTL now()-3600 TO DISK 'jbod1', d1 TO DISK 'external'""".format(name=name))
181+
182+
wait_expire_1 = 12
183+
wait_expire_2 = 4
184+
time_1 = time.time() + wait_expire_1
185+
time_2 = time.time() + wait_expire_1 + wait_expire_2
186+
187+
wait_expire_1_thread = threading.Thread(target=time.sleep, args=(wait_expire_1,))
188+
wait_expire_1_thread.start()
189+
190+
data = [] # 10MB in total
191+
for i in range(10):
192+
data.append(("'{}'".format(get_random_string(1024 * 1024)), "toDateTime({})".format(time_1))) # 1MB row
193+
194+
node1.query("INSERT INTO {} (s1, d1) VALUES {}".format(name, ",".join(["(" + ",".join(x) + ")" for x in data])))
195+
used_disks = get_used_disks_for_table(node1, name)
196+
assert set(used_disks) == {"jbod1"}
197+
198+
wait_expire_1_thread.join()
199+
time.sleep(wait_expire_2/2)
200+
201+
used_disks = get_used_disks_for_table(node1, name)
202+
assert set(used_disks) == {"external"}
203+
204+
assert node1.query("SELECT count() FROM {name}".format(name=name)).strip() == "10"
205+
206+
finally:
207+
node1.query("DROP TABLE IF EXISTS {}".format(name))
208+
209+
163210
@pytest.mark.parametrize("name,engine,positive", [
164211
("mt_test_moves_to_disk_do_not_work","MergeTree()",0),
165212
("replicated_mt_test_moves_to_disk_do_not_work","ReplicatedMergeTree('/clickhouse/replicated_test_moves_to_disk_do_not_work', '1')",0),

0 commit comments

Comments
 (0)