Skip to content

Commit 502df28

Browse files
committed
Fix remote initiator host name
1 parent b806ac5 commit 502df28

File tree

3 files changed

+129
-1
lines changed

3 files changed

+129
-1
lines changed

src/Storages/IStorageCluster.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
#include <Analyzer/JoinNode.h>
3535
#include <Analyzer/InDepthQueryTreeVisitor.h>
3636
#include <Analyzer/Utils.h>
37+
#include <Poco/URI.h>
3738

3839
#include <algorithm>
3940
#include <memory>
@@ -405,7 +406,8 @@ IStorageCluster::RemoteCallVariables IStorageCluster::convertToRemote(
405406
/// After getClusterImpl each shard must have exactly 1 replica
406407
if (shard_addresses.size() != 1)
407408
throw Exception(ErrorCodes::LOGICAL_ERROR, "Size of shard {} in cluster {} is not equal 1", shard_num, cluster_name_from_settings);
408-
auto host_name = shard_addresses[0].toString();
409+
std::string host_name;
410+
Poco::URI::decode(shard_addresses[0].toString(), host_name);
409411

410412
LOG_INFO(log, "Choose remote initiator '{}'", host_name);
411413

tests/integration/test_s3_cluster/configs/cluster.xml

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,20 @@
2020
</shard>
2121
</cluster_simple>
2222

23+
<!-- Cluster without s0_0_0-->
24+
<cluster_remote>
25+
<shard>
26+
<replica>
27+
<host>s0_0_1</host>
28+
<port>9000</port>
29+
</replica>
30+
<replica>
31+
<host>s0_1_0</host>
32+
<port>9000</port>
33+
</replica>
34+
</shard>
35+
</cluster_remote>
36+
2337
<!-- A part of the cluster above, represents only one shard-->
2438
<first_shard>
2539
<shard>
@@ -49,6 +63,44 @@
4963
</shard>
5064
</cluster_non_existent_port>
5165

66+
<cluster_with_dots>
67+
<shard>
68+
<replica>
69+
<host>c2.s0_0_0</host>
70+
<port>9000</port>
71+
</replica>
72+
<replica>
73+
<host>c2.s0_0_1</host>
74+
<port>9000</port>
75+
</replica>
76+
</shard>
77+
</cluster_with_dots>
78+
79+
<cluster_all>
80+
<shard>
81+
<replica>
82+
<host>s0_0_0</host>
83+
<port>9000</port>
84+
</replica>
85+
<replica>
86+
<host>s0_0_1</host>
87+
<port>9000</port>
88+
</replica>
89+
<replica>
90+
<host>s0_1_0</host>
91+
<port>9000</port>
92+
</replica>
93+
<replica>
94+
<host>c2.s0_0_0</host>
95+
<port>9000</port>
96+
</replica>
97+
<replica>
98+
<host>c2.s0_0_1</host>
99+
<port>9000</port>
100+
</replica>
101+
</shard>
102+
</cluster_all>
103+
52104
</remote_servers>
53105
<macros>
54106
<default_cluster_macro>cluster_simple</default_cluster_macro>

tests/integration/test_s3_cluster/test.py

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,22 @@ def started_cluster():
114114
with_zookeeper=True,
115115
stay_alive=True,
116116
)
117+
cluster.add_instance(
118+
"c2.s0_0_0",
119+
main_configs=["configs/cluster.xml", "configs/named_collections.xml"],
120+
user_configs=["configs/users.xml"],
121+
macros={"replica": "replica1", "shard": "shard1"},
122+
with_zookeeper=True,
123+
stay_alive=True,
124+
)
125+
cluster.add_instance(
126+
"c2.s0_0_1",
127+
main_configs=["configs/cluster.xml", "configs/named_collections.xml"],
128+
user_configs=["configs/users.xml"],
129+
macros={"replica": "replica2", "shard": "shard1"},
130+
with_zookeeper=True,
131+
stay_alive=True,
132+
)
117133

118134
logging.info("Starting cluster...")
119135
cluster.start()
@@ -1197,3 +1213,61 @@ def test_joins(started_cluster):
11971213
)
11981214
res = list(map(str.split, result8.splitlines()))
11991215
assert len(res) == 25
1216+
1217+
1218+
def test_object_storage_remote_initiator(started_cluster):
1219+
node = started_cluster.instances["s0_0_0"]
1220+
1221+
query_id = uuid.uuid4().hex
1222+
result = node.query(
1223+
f"""
1224+
SELECT * from s3Cluster(
1225+
'cluster_remote',
1226+
'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV',
1227+
'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') ORDER BY (name, value, polygon)
1228+
SETTINGS object_storage_remote_initiator=1
1229+
""",
1230+
query_id = query_id,
1231+
)
1232+
1233+
assert result is not None
1234+
1235+
node.query("SYSTEM FLUSH LOGS ON CLUSTER 'cluster_all'")
1236+
queries = node.query(
1237+
f"""
1238+
SELECT count()
1239+
FROM clusterAllReplicas('cluster_all', system.query_log)
1240+
WHERE type='QueryFinish' AND initial_query_id='{query_id}'
1241+
FORMAT TSV
1242+
"""
1243+
).splitlines()
1244+
1245+
# initial node + describe table + remote initiator + 2 subqueries on replicas
1246+
assert queries == ["5"]
1247+
1248+
query_id = uuid.uuid4().hex
1249+
result = node.query(
1250+
f"""
1251+
SELECT * from s3Cluster(
1252+
'cluster_with_dots',
1253+
'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV',
1254+
'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') ORDER BY (name, value, polygon)
1255+
SETTINGS object_storage_remote_initiator=1
1256+
""",
1257+
query_id = query_id,
1258+
)
1259+
1260+
assert result is not None
1261+
1262+
node.query("SYSTEM FLUSH LOGS ON CLUSTER 'cluster_all'")
1263+
queries = node.query(
1264+
f"""
1265+
SELECT count()
1266+
FROM clusterAllReplicas('cluster_all', system.query_log)
1267+
WHERE type='QueryFinish' AND initial_query_id='{query_id}'
1268+
FORMAT TSV
1269+
"""
1270+
).splitlines()
1271+
1272+
# initial node + describe table + remote initiator + 2 subqueries on replicas
1273+
assert queries == ["5"]

0 commit comments

Comments
 (0)