Skip to content

Commit de63cf0

Browse files
Avogarmkmkme
authored and committed
Merge pull request ClickHouse#94335 from mkmkme/dot-issue
Fix reading columns with dot-separated names from Iceberg
1 parent cb2aef9 commit de63cf0

File tree

3 files changed

+238
-7
lines changed

3 files changed

+238
-7
lines changed

src/Processors/Formats/Impl/Parquet/SchemaConverter.cpp

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ NamesAndTypesList SchemaConverter::inferSchema()
135135
return res;
136136
}
137137

138-
std::string_view SchemaConverter::useColumnMapperIfNeeded(const parq::SchemaElement & element) const
138+
std::string_view SchemaConverter::useColumnMapperIfNeeded(const parq::SchemaElement & element, const String & current_path) const
139139
{
140140
if (!column_mapper)
141141
return element.name;
@@ -150,8 +150,19 @@ std::string_view SchemaConverter::useColumnMapperIfNeeded(const parq::SchemaElem
150150
auto it = map.find(element.field_id);
151151
if (it == map.end())
152152
throw Exception(ErrorCodes::ICEBERG_SPECIFICATION_VIOLATION, "Parquet file has column {} with field_id {} that is not in datalake metadata", element.name, element.field_id);
153-
auto split = Nested::splitName(std::string_view(it->second), /*reverse=*/ true);
154-
return split.second.empty() ? split.first : split.second;
153+
154+
/// At top level (empty path), return the full mapped name. For nested
155+
/// elements, strip the parent path prefix to get the child name.
156+
if (current_path.empty())
157+
return it->second;
158+
159+
/// Strip "current_path." prefix to get the child name (preserves dots in child names)
160+
std::string_view mapped = it->second;
161+
if (mapped.starts_with(current_path) && mapped.size() > current_path.size()
162+
&& mapped[current_path.size()] == '.')
163+
return mapped.substr(current_path.size() + 1);
164+
165+
return it->second;
155166
}
156167

157168
void SchemaConverter::processSubtree(TraversalNode & node)
@@ -169,7 +180,7 @@ void SchemaConverter::processSubtree(TraversalNode & node)
169180

170181
if (node.schema_context == SchemaContext::None)
171182
{
172-
node.appendNameComponent(node.element->name, useColumnMapperIfNeeded(*node.element));
183+
node.appendNameComponent(node.element->name, useColumnMapperIfNeeded(*node.element, node.name));
173184

174185
if (sample_block)
175186
{
@@ -617,7 +628,7 @@ void SchemaConverter::processSubtreeTuple(TraversalNode & node)
617628
std::vector<String> element_names_in_file;
618629
for (size_t i = 0; i < size_t(node.element->num_children); ++i)
619630
{
620-
const String & element_name = element_names_in_file.emplace_back(useColumnMapperIfNeeded(file_metadata.schema.at(schema_idx)));
631+
const String & element_name = element_names_in_file.emplace_back(useColumnMapperIfNeeded(file_metadata.schema.at(schema_idx), node.name));
621632
std::optional<size_t> idx_in_output_tuple = i - skipped_unsupported_columns;
622633
if (lookup_by_name)
623634
{

src/Processors/Formats/Impl/Parquet/SchemaConverter.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,8 +137,10 @@ struct SchemaConverter
137137
DataTypePtr & out_inferred_type, std::optional<GeoColumnMetadata> geo_metadata) const;
138138

139139
/// Returns element.name or a corresponding name from ColumnMapper.
140-
/// For tuple elements, that's just the element name like `x`, not the whole path like `t.x`.
141-
std::string_view useColumnMapperIfNeeded(const parq::SchemaElement & element) const;
140+
/// For nested tuple elements, returns just the element name like `x`, not the whole path like `t.x`.
141+
/// For top-level columns (when current_path is empty), returns the full mapped name to support
142+
/// column names with dots (e.g. `integer.col` in Iceberg).
143+
std::string_view useColumnMapperIfNeeded(const parq::SchemaElement & element, const String & current_path) const;
142144
};
143145

144146
}
Lines changed: 218 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,218 @@
1+
import pytest
2+
3+
from pyspark.sql.types import (
4+
IntegerType,
5+
StringType,
6+
StructField,
7+
StructType,
8+
)
9+
10+
from helpers.iceberg_utils import (
11+
default_upload_directory,
12+
write_iceberg_from_df,
13+
create_iceberg_table,
14+
get_creation_expression,
15+
get_uuid_str,
16+
)
17+
18+
19+
@pytest.mark.parametrize("storage_type", ["s3", "azure", "local"])
20+
def test_column_names_with_dots(started_cluster_iceberg_with_spark, storage_type):
21+
"""
22+
Test that Iceberg tables with dot-separated column names are read correctly.
23+
This tests the fix for field ID-based column name mapping in Parquet V3 reader.
24+
"""
25+
instance = started_cluster_iceberg_with_spark.instances["node1"]
26+
spark = started_cluster_iceberg_with_spark.spark_session
27+
TABLE_NAME = "test_column_names_with_dots_" + storage_type + "_" + get_uuid_str()
28+
29+
# Create DataFrame with column names containing dots
30+
data = [
31+
(1, "value1", "multi_dot_value1"),
32+
(2, "value2", "multi_dot_value2"),
33+
(3, "value3", "multi_dot_value3"),
34+
]
35+
schema = StructType([
36+
StructField("id", IntegerType()),
37+
StructField("name.column", StringType()),
38+
StructField("double.column.dot", StringType()),
39+
])
40+
df = spark.createDataFrame(data=data, schema=schema)
41+
42+
write_iceberg_from_df(spark, df, TABLE_NAME, mode="overwrite", format_version="2")
43+
44+
default_upload_directory(
45+
started_cluster_iceberg_with_spark,
46+
storage_type,
47+
f"/iceberg_data/default/{TABLE_NAME}/",
48+
f"/iceberg_data/default/{TABLE_NAME}/",
49+
)
50+
51+
# Test via table function
52+
table_function_expr = get_creation_expression(
53+
storage_type, TABLE_NAME, started_cluster_iceberg_with_spark, table_function=True
54+
)
55+
56+
# Verify single-dot column name
57+
result = instance.query(
58+
f"SELECT `name.column` FROM {table_function_expr} ORDER BY id"
59+
).strip()
60+
assert result == "value1\nvalue2\nvalue3", f"Expected values, got: {result}"
61+
62+
# Verify multi-dot column name
63+
result = instance.query(
64+
f"SELECT `double.column.dot` FROM {table_function_expr} ORDER BY id"
65+
).strip()
66+
assert result == "multi_dot_value1\nmulti_dot_value2\nmulti_dot_value3", f"Expected values, got: {result}"
67+
68+
# Verify all columns together
69+
result = instance.query(
70+
f"SELECT id, `name.column`, `double.column.dot` FROM {table_function_expr} ORDER BY id"
71+
).strip()
72+
expected = "1\tvalue1\tmulti_dot_value1\n2\tvalue2\tmulti_dot_value2\n3\tvalue3\tmulti_dot_value3"
73+
assert result == expected, f"Expected:\n{expected}\nGot:\n{result}"
74+
75+
# Test via table engine
76+
create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster_iceberg_with_spark)
77+
78+
result = instance.query(
79+
f"SELECT `name.column`, `double.column.dot` FROM {TABLE_NAME} ORDER BY id"
80+
).strip()
81+
expected = "value1\tmulti_dot_value1\nvalue2\tmulti_dot_value2\nvalue3\tmulti_dot_value3"
82+
assert result == expected, f"Expected:\n{expected}\nGot:\n{result}"
83+
84+
85+
@pytest.mark.parametrize("storage_type", ["s3", "azure", "local"])
86+
def test_nested_struct_with_dotted_field(started_cluster_iceberg_with_spark, storage_type):
87+
"""
88+
Test that nested struct fields with dot-separated names are read correctly.
89+
This tests the fix for prefix stripping in useColumnMapperIfNeeded.
90+
E.g., for my_struct.weird.field we should return "weird.field", not just "field".
91+
"""
92+
instance = started_cluster_iceberg_with_spark.instances["node1"]
93+
spark = started_cluster_iceberg_with_spark.spark_session
94+
TABLE_NAME = "test_nested_struct_with_dotted_field_" + storage_type + "_" + get_uuid_str()
95+
96+
# Create DataFrame with nested struct containing a dotted field
97+
data = [
98+
(1, (100, "nested_dot_value1")),
99+
(2, (200, "nested_dot_value2")),
100+
(3, (300, "nested_dot_value3")),
101+
]
102+
schema = StructType(
103+
[
104+
StructField("id", IntegerType()),
105+
StructField(
106+
"my_struct",
107+
StructType(
108+
[
109+
StructField("normal_field", IntegerType()),
110+
StructField("weird.field", StringType()),
111+
]
112+
)
113+
)
114+
]
115+
)
116+
df = spark.createDataFrame(data=data, schema=schema)
117+
118+
write_iceberg_from_df(spark, df, TABLE_NAME, mode="overwrite", format_version="2")
119+
120+
default_upload_directory(
121+
started_cluster_iceberg_with_spark,
122+
storage_type,
123+
f"/iceberg_data/default/{TABLE_NAME}/",
124+
f"/iceberg_data/default/{TABLE_NAME}/",
125+
)
126+
127+
# Test via table function
128+
table_function_expr = get_creation_expression(
129+
storage_type, TABLE_NAME, started_cluster_iceberg_with_spark, table_function=True
130+
)
131+
132+
# Verify nested struct with dotted field via table function
133+
result = instance.query(
134+
f"SELECT my_struct.normal_field, `my_struct.weird.field` FROM {table_function_expr} ORDER BY id"
135+
).strip()
136+
expected = "100\tnested_dot_value1\n200\tnested_dot_value2\n300\tnested_dot_value3"
137+
assert result == expected, f"Expected:\n{expected}\nGot:\n{result}"
138+
139+
# Test via table engine
140+
create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster_iceberg_with_spark)
141+
142+
result = instance.query(
143+
f"SELECT my_struct.normal_field, `my_struct.weird.field` FROM {TABLE_NAME} ORDER BY id"
144+
).strip()
145+
expected = "100\tnested_dot_value1\n200\tnested_dot_value2\n300\tnested_dot_value3"
146+
assert result == expected, f"Expected:\n{expected}\nGot:\n{result}"
147+
148+
149+
@pytest.mark.parametrize("storage_type", ["s3", "azure", "local"])
150+
def test_deeply_nested_struct_with_dotted_names(started_cluster_iceberg_with_spark, storage_type):
151+
"""
152+
Test deeply nested structs where EVERY level has dots in the name.
153+
Structure: my.struct -> some_dot.separated_parent -> weird.field
154+
Full path: my.struct.some_dot.separated_parent.weird.field
155+
156+
This verifies that prefix stripping works correctly at all nesting depths.
157+
"""
158+
instance = started_cluster_iceberg_with_spark.instances["node1"]
159+
spark = started_cluster_iceberg_with_spark.spark_session
160+
TABLE_NAME = "test_deeply_nested_struct_with_dotted_names_" + storage_type + "_" + get_uuid_str()
161+
162+
# Create DataFrame with deeply nested struct containing dotted names
163+
data = [
164+
(1, (("deep_value1",),)),
165+
(2, (("deep_value2",),)),
166+
(3, (("deep_value3",),)),
167+
]
168+
schema = StructType(
169+
[
170+
StructField("id", IntegerType()),
171+
StructField(
172+
"my.struct",
173+
StructType(
174+
[
175+
StructField(
176+
"some_dot.separated_parent",
177+
StructType(
178+
[
179+
StructField("weird.field", StringType()),
180+
]
181+
),
182+
),
183+
]
184+
),
185+
),
186+
]
187+
)
188+
df = spark.createDataFrame(data=data, schema=schema)
189+
190+
write_iceberg_from_df(spark, df, TABLE_NAME, mode="overwrite", format_version="2")
191+
192+
default_upload_directory(
193+
started_cluster_iceberg_with_spark,
194+
storage_type,
195+
f"/iceberg_data/default/{TABLE_NAME}/",
196+
f"/iceberg_data/default/{TABLE_NAME}/",
197+
)
198+
199+
# Test via table function
200+
table_function_expr = get_creation_expression(
201+
storage_type, TABLE_NAME, started_cluster_iceberg_with_spark, table_function=True
202+
)
203+
204+
# Query the deeply nested dotted field
205+
result = instance.query(
206+
f"SELECT `my.struct.some_dot.separated_parent.weird.field` FROM {table_function_expr} ORDER BY id"
207+
).strip()
208+
expected = "deep_value1\ndeep_value2\ndeep_value3"
209+
assert result == expected, f"Expected:\n{expected}\nGot:\n{result}"
210+
211+
# Test via table engine
212+
create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster_iceberg_with_spark)
213+
214+
result = instance.query(
215+
f"SELECT `my.struct.some_dot.separated_parent.weird.field` FROM {TABLE_NAME} ORDER BY id"
216+
).strip()
217+
expected = "deep_value1\ndeep_value2\ndeep_value3"
218+
assert result == expected, f"Expected:\n{expected}\nGot:\n{result}"

0 commit comments

Comments
 (0)