Skip to content

Commit cc8695a

Browse files
Backport #87020 to 25.8: Fix table name encoding in data lake rest catalog
1 parent cc191d0 commit cc8695a

File tree

2 files changed

+77
-3
lines changed

2 files changed

+77
-3
lines changed

src/Databases/DataLake/RestCatalog.cpp

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,8 @@ DB::ReadWriteBufferFromHTTPPtr RestCatalog::createReadBuffer(
302302
{
303303
const auto & context = getContext();
304304

305-
Poco::URI url(base_url / endpoint);
305+
/// enable_url_encoding=false to allow using tables with encoded sequences in their names, like 'foo%2Fbar'
306+
Poco::URI url(base_url / endpoint, /* enable_url_encoding */ false);
306307
if (!params.empty())
307308
url.setQueryParameters(params);
308309

@@ -542,7 +543,12 @@ DB::Names RestCatalog::parseTables(DB::ReadBuffer & buf, const std::string & bas
542543
for (size_t i = 0; i < identifiers_object->size(); ++i)
543544
{
544545
const auto current_table_json = identifiers_object->get(static_cast<int>(i)).extract<Poco::JSON::Object::Ptr>();
545-
const auto table_name = current_table_json->get("name").extract<String>();
546+
/// If the table name contains an encoded sequence (like 'foo%2Fbar'),
547+
/// the catalog returns the decoded character instead of the sequence ('foo/bar').
548+
/// Here the name is encoded back to the 'foo%2Fbar' format.
549+
const auto table_name_raw = current_table_json->get("name").extract<String>();
550+
std::string table_name;
551+
Poco::URI::encode(table_name_raw, "/", table_name);
546552

547553
tables.push_back(base_namespace + "." + table_name);
548554
if (limit && tables.size() >= limit)
@@ -731,7 +737,8 @@ void RestCatalog::sendRequest(const String & endpoint, Poco::JSON::Object::Ptr r
731737
};
732738
}
733739

734-
Poco::URI url(endpoint);
740+
/// enable_url_encoding=false to allow using tables with encoded sequences in their names, like 'foo%2Fbar'
741+
Poco::URI url(endpoint, /* enable_url_encoding */ false);
735742
auto wb = DB::BuilderRWBufferFromHTTP(url)
736743
.withConnectionGroup(DB::HTTPConnectionGroupType::HTTP)
737744
.withMethod(method)

tests/integration/test_database_iceberg/test.py

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -560,6 +560,7 @@ def test_create(started_cluster):
560560
node.query(f"INSERT INTO {CATALOG_NAME}.`{root_namespace}.{table_name}` VALUES ('AAPL');", settings={"allow_experimental_insert_into_iceberg": 1, 'write_full_path_in_iceberg_metadata': 1})
561561
assert node.query(f"SELECT * FROM {CATALOG_NAME}.`{root_namespace}.{table_name}`") == "AAPL\n"
562562

563+
563564
def test_drop_table(started_cluster):
564565
node = started_cluster.instances["node1"]
565566

@@ -575,3 +576,69 @@ def test_drop_table(started_cluster):
575576

576577
drop_clickhouse_iceberg_table(node, root_namespace, table_name)
577578
assert len(catalog.list_tables(root_namespace)) == 0
579+
580+
581+
def test_table_with_slash(started_cluster):
582+
node = started_cluster.instances["node1"]
583+
584+
# pyiceberg at the moment (version 0.9.1) has a bug with table names containing slashes,
585+
# see https://github.com/apache/iceberg-python/issues/2462
586+
# so we need to encode it manually
587+
table_raw_suffix = "table/foo"
588+
table_encoded_suffix = "table%2Ffoo"
589+
590+
test_ref = f"test_list_tables_{uuid.uuid4()}"
591+
table_name = f"{test_ref}_{table_raw_suffix}"
592+
table_encoded_name = f"{test_ref}_{table_encoded_suffix}"
593+
root_namespace = f"{test_ref}_namespace"
594+
595+
catalog = load_catalog_impl(started_cluster)
596+
catalog.create_namespace(root_namespace)
597+
598+
create_table(catalog, root_namespace, table_name, DEFAULT_SCHEMA, PartitionSpec(), DEFAULT_SORT_ORDER)
599+
600+
create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME)
601+
node.query(f"INSERT INTO {CATALOG_NAME}.`{root_namespace}.{table_encoded_name}` VALUES (NULL, 'AAPL', 193.24, 193.31, tuple('bot'));", settings={"allow_experimental_insert_into_iceberg": 1, 'write_full_path_in_iceberg_metadata': 1})
602+
assert node.query(f"SELECT * FROM {CATALOG_NAME}.`{root_namespace}.{table_encoded_name}`") == "\\N\tAAPL\t193.24\t193.31\t('bot')\n"
603+
604+
605+
def test_cluster_select(started_cluster):
606+
node1 = started_cluster.instances["node1"]
607+
node2 = started_cluster.instances["node2"]
608+
609+
test_ref = f"test_list_tables_{uuid.uuid4()}"
610+
table_name = f"{test_ref}_table"
611+
root_namespace = f"{test_ref}_namespace"
612+
613+
catalog = load_catalog_impl(started_cluster)
614+
create_clickhouse_iceberg_database(started_cluster, node1, CATALOG_NAME)
615+
create_clickhouse_iceberg_database(started_cluster, node2, CATALOG_NAME)
616+
create_clickhouse_iceberg_table(started_cluster, node1, root_namespace, table_name, "(x String)")
617+
node1.query(f"INSERT INTO {CATALOG_NAME}.`{root_namespace}.{table_name}` VALUES ('pablo');", settings={"allow_experimental_insert_into_iceberg": 1, 'write_full_path_in_iceberg_metadata': 1})
618+
619+
query_id = uuid.uuid4().hex
620+
assert node1.query(f"SELECT * FROM {CATALOG_NAME}.`{root_namespace}.{table_name}` SETTINGS parallel_replicas_for_cluster_engines=1, enable_parallel_replicas=2, cluster_for_parallel_replicas='cluster_simple'", query_id=query_id) == 'pablo\n'
621+
622+
node1.query("SYSTEM FLUSH LOGS system.query_log")
623+
node2.query("SYSTEM FLUSH LOGS system.query_log")
624+
625+
assert node1.query(f"SELECT Settings['parallel_replicas_for_cluster_engines'] AS parallel_replicas_for_cluster_engines FROM system.query_log WHERE query_id = '{query_id}' LIMIT 1;") == '1\n'
626+
627+
for replica in [node1, node2]:
628+
cluster_secondary_queries = (
629+
replica.query(
630+
f"""
631+
SELECT query, type, is_initial_query, read_rows, read_bytes FROM system.query_log
632+
WHERE
633+
type = 'QueryStart' AND
634+
positionCaseInsensitive(query, 's3Cluster') != 0 AND
635+
position(query, 'system.query_log') = 0 AND
636+
NOT is_initial_query
637+
"""
638+
)
639+
.strip()
640+
.split("\n")
641+
)
642+
assert len(cluster_secondary_queries) == 1
643+
644+
assert node2.query(f"SELECT * FROM {CATALOG_NAME}.`{root_namespace}.{table_name}`", settings={"parallel_replicas_for_cluster_engines":1, 'enable_parallel_replicas': 2, 'cluster_for_parallel_replicas': 'cluster_simple', 'parallel_replicas_for_cluster_engines' : 1}) == 'pablo\n'

0 commit comments

Comments
 (0)