Skip to content

Commit 6971379

Browse files
committed
add cluster tests
1 parent 44faec6 commit 6971379

File tree

3 files changed

+89
-0
lines changed

3 files changed

+89
-0
lines changed

tests/integration/test_parquet_drop_metadata_cache/__init__.py

Whitespace-only changes.
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
<clickhouse>
    <!-- Cluster definition used by the parquet metadata-cache drop tests
         (tests/integration/test_parquet_drop_metadata_cache). -->
    <remote_servers>
        <parquet_clear_cache_cluster>
            <!-- Single shard with three replicas: node1..node3 on the native TCP port. -->
            <shard>
                <replica>
                    <host>node1</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>node2</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>node3</host>
                    <port>9000</port>
                </replica>
            </shard>
        </parquet_clear_cache_cluster>
    </remote_servers>
</clickhouse>
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
import pytest

from helpers.cluster import ClickHouseCluster
# Shared module-level cluster: three identical instances, each loading the
# <parquet_clear_cache_cluster> remote-servers layout from cluster.xml.
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance("node1", main_configs=["configs/config.d/cluster.xml"], with_zookeeper=True)
node2 = cluster.add_instance("node2", main_configs=["configs/config.d/cluster.xml"], with_zookeeper=True)
node3 = cluster.add_instance("node3", main_configs=["configs/config.d/cluster.xml"], with_zookeeper=True)
10+
@pytest.fixture(scope="module")
def started_cluster():
    """Start the shared cluster once per module and always shut it down.

    Renamed from ``start_cluster``: the test below requests the fixture as
    ``started_cluster``, so the old name made pytest fail with
    "fixture 'started_cluster' not found".
    """
    try:
        cluster.start()
        yield cluster
    finally:
        # Guarantee teardown even if cluster.start() or a test raises.
        cluster.shutdown()
17+
18+
19+
def _last_profile_event(node, event, comment):
    """Return the given ProfileEvents counter (as a string) from the most
    recent finished query tagged with *comment* in ``system.query_log``.

    The result is stripped because ``node.query`` returns the value with a
    trailing newline, which broke the original ``== '0'`` comparisons.
    """
    return node.query(
        f"SELECT ProfileEvents['{event}'] FROM system.query_log "
        f"WHERE log_comment = '{comment}' AND type = 'QueryFinish' "
        "ORDER BY event_time DESC LIMIT 1"
    ).strip()


def test_clear_cache_on_cluster(started_cluster):
    """Verify SYSTEM DROP PARQUET METADATA CACHE ON CLUSTER empties the
    parquet metadata cache on every node.

    Flow: write three parquet files to S3, read one per node (cold cache),
    read again (hot cache -> hits), drop the cache on the whole cluster,
    read again (misses, no hits).
    """
    nodes = [node1, node2, node3]

    node1.query(
        "INSERT INTO TABLE FUNCTION s3(s3_conn, filename='test_clear_cache/{_partition_id}.parquet', format=Parquet) "
        "PARTITION BY number SELECT number FROM numbers(3)"
    )

    def read_all(comment):
        # Each node reads its "own" file so every node populates its local cache.
        for i, node in enumerate(nodes, start=1):
            node.query(
                f"SELECT * FROM s3(s3_conn, filename='test_clear_cache/{i}.parquet', format=Parquet) "
                f"SETTINGS log_comment='{comment}'"
            )
        # Flush query_log everywhere before inspecting ProfileEvents.
        node1.query("SYSTEM FLUSH LOGS ON CLUSTER parquet_clear_cache_cluster")

    # First read: cache is cold, no hits anywhere.
    read_all("cold_cache")
    cold_hits = [_last_profile_event(n, "ParquetMetaDataCacheHits", "cold_cache") for n in nodes]
    assert cold_hits == ["0", "0", "0"]

    # Second read: metadata is cached, each node records one hit.
    # Fixed: the original queried log_comment='cold_cache' here, so the
    # warm-cache assertions were re-checking the cold-cache rows.
    read_all("hot_cache")
    warm_hits = [_last_profile_event(n, "ParquetMetaDataCacheHits", "hot_cache") for n in nodes]
    assert warm_hits == ["1", "1", "1"]

    # Drop the metadata cache on every node of the cluster.
    node1.query("SYSTEM DROP PARQUET METADATA CACHE ON CLUSTER parquet_clear_cache_cluster")

    # Third read: cache was dropped, so no hits and exactly one miss per node.
    read_all("cache_after_drop")
    hits_after_drop = [
        _last_profile_event(n, "ParquetMetaDataCacheHits", "cache_after_drop") for n in nodes
    ]
    assert hits_after_drop == ["0", "0", "0"]

    misses_after_drop = [
        _last_profile_event(n, "ParquetMetaDataCacheMisses", "cache_after_drop") for n in nodes
    ]
    assert misses_after_drop == ["1", "1", "1"]

0 commit comments

Comments
 (0)