-
Notifications
You must be signed in to change notification settings - Fork 9
Expand file tree
/
Copy pathtable_functions.py
More file actions
208 lines (174 loc) · 7.76 KB
/
table_functions.py
File metadata and controls
208 lines (174 loc) · 7.76 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
from testflows.core import *
from testflows.asserts import error
from helpers.common import getuid
from s3.tests.export_part.steps import *
from helpers.create import *
from helpers.queries import *
from s3.requirements.export_part import *
from s3.tests.common import named_s3_credentials
from alter.stress.tests.tc_netem import *
@TestScenario
@Requirements(RQ_ClickHouse_ExportPart_TableFunction_ExplicitSchema("1.0"))
def explicit_schema(self):
    """Test exporting parts to a table function with explicit schema (structure parameter).

    Creates a partitioned MergeTree source table, derives its column structure,
    exports all parts to an S3 table function passing that structure explicitly,
    and verifies the exported data matches the source.
    """
    with Given("I create a populated source table"):
        source_table = f"source_{getuid()}"
        partitioned_merge_tree_table(
            table_name=source_table,
            partition_by="p",
            columns=default_columns(),
            stop_merges=True,
        )
        create_temp_bucket()

    with And("I get the source table structure"):
        source_columns = get_column_info(table_name=source_table)
        # Build the "name Type, name Type, ..." string the structure parameter expects.
        structure_str = ", ".join(
            f"{col['name']} {col['type']}" for col in source_columns
        )

    with When("I export parts to a table function with explicit structure"):
        filename = f"export_{getuid()}"
        export_parts_to_table_function(
            source_table=source_table,
            filename=filename,
            structure=structure_str,
        )

    with And("I wait for exports to complete"):
        wait_for_all_exports_to_complete(table_name=source_table)

    with Then("I verify exported data matches source by reading from table function"):
        # FIX: the URL must point at the location we just exported to
        # ({filename}); previously it contained a bogus '(unknown)' path
        # segment, so the table function would not read the exported files.
        table_function = f"s3(s3_credentials, url='{self.context.uri}{filename}/**.parquet', format='Parquet')"
        source_data = select_all_ordered(table_name=source_table)
        dest_data = select_all_ordered(table_name=table_function, identifier="*")
        assert source_data == dest_data, error()
@TestScenario
@Requirements(RQ_ClickHouse_ExportPart_TableFunction_SchemaInheritance("1.0"))
def schema_inheritance(self):
    """Test exporting parts to a table function with schema inheritance (no structure parameter).

    Exports parts without passing an explicit structure so the destination
    schema must be inherited from the source table, then verifies both the
    data and the inferred column names match the source.
    """
    with Given("I create a populated source table"):
        source_table = f"source_{getuid()}"
        partitioned_merge_tree_table(
            table_name=source_table,
            partition_by="p",
            columns=default_columns(),
            stop_merges=True,
        )
        create_temp_bucket()

    with When(
        "I export parts to a table function without explicit structure (schema inheritance)"
    ):
        filename = f"export_{getuid()}"
        export_parts_to_table_function(
            source_table=source_table,
            filename=filename,
        )

    with And("I wait for exports to complete"):
        wait_for_all_exports_to_complete(table_name=source_table)

    with Then("I verify exported data matches source by reading from table function"):
        # FIX: the URL must point at the location we just exported to
        # ({filename}); previously it contained a bogus '(unknown)' path
        # segment, so the table function would not read the exported files.
        table_function = f"s3(s3_credentials, url='{self.context.uri}{filename}/**.parquet', format='Parquet')"
        source_data = select_all_ordered(table_name=source_table)
        dest_data = select_all_ordered(table_name=table_function, identifier="*")
        assert source_data == dest_data, error()

    with And("I verify the table function has the correct column names"):
        source_column_names = [
            col["name"] for col in get_column_info(table_name=source_table)
        ]
        dest_result = self.context.node.query(
            f"DESCRIBE TABLE {table_function} FORMAT TabSeparated",
            exitcode=0,
            steps=True,
        )
        # First tab-separated field of each DESCRIBE row is the column name.
        dest_column_names = [
            line.split("\t")[0] for line in dest_result.output.strip().splitlines()
        ]
        assert source_column_names == dest_column_names, error(
            "Column names should match"
        )
@TestScenario
@Requirements(RQ_ClickHouse_ExportPart_Restrictions_SimultaneousExport("1.0"))
def same_part_simultaneous_export_error(self):
    """Test that the same part cannot be exported simultaneously to different locations."""
    with Given("I create a populated source table"):
        source_table = f"source_{getuid()}"
        partitioned_merge_tree_table(
            table_name=source_table,
            partition_by="p",
            columns=default_columns(),
            stop_merges=True,
        )
        create_temp_bucket()

    with And("I get a single part to export"):
        available_parts = get_parts(table_name=source_table)
        assert len(available_parts) > 0, error("No parts found in source table")
        target_part = available_parts[0]

    with And("I slow the network to make export take longer"):
        # Throttle traffic so the first export is still in flight when the
        # second one is issued.
        network_packet_rate_limit(node=self.context.node, rate_mbit=0.05)

    with When(
        "I try to export the same part to two different table functions simultaneously"
    ):
        filename1 = f"export1_{getuid()}"
        filename2 = f"export2_{getuid()}"
        # First export of the part is expected to be accepted...
        result1 = export_parts_to_table_function(
            source_table=source_table,
            filename=filename1,
            parts=[target_part],
            exitcode=0,
        )
        # ...while a concurrent export of the same part must be rejected.
        result2 = export_parts_to_table_function(
            source_table=source_table,
            filename=filename2,
            parts=[target_part],
            exitcode=1,
        )

    with Then("I verify the first export succeeded"):
        assert result1[0].exitcode == 0, error("First export should have succeeded")
        wait_for_all_exports_to_complete(table_name=source_table)
        table_function1 = f"s3(s3_credentials, url='{self.context.uri}{filename1}/**.parquet', format='Parquet')"
        count1 = get_row_count(node=self.context.node, table_name=table_function1)
        assert count1 > 0, error("First export should have data")

    with And("I verify the second export failed"):
        assert result2[0].exitcode != 0, error("Second export should have failed")
@TestScenario
@Requirements(RQ_ClickHouse_ExportPart_TableFunction_Destination("1.0"))
def multiple_parts(self):
    """Test exporting multiple parts to a table function.

    Creates a source table with several partitions and parts per partition,
    exports everything to an S3 table function, and verifies both data
    equality and row counts against the source.
    """
    with Given("I create a source table with multiple parts"):
        source_table = f"source_{getuid()}"
        partitioned_merge_tree_table(
            table_name=source_table,
            partition_by="p",
            columns=default_columns(),
            stop_merges=True,
            number_of_partitions=3,
            number_of_parts=2,
        )
        create_temp_bucket()

    with When("I export all parts to a table function"):
        filename = f"export_{getuid()}"
        export_parts_to_table_function(
            source_table=source_table,
            filename=filename,
        )

    with And("I wait for exports to complete"):
        wait_for_all_exports_to_complete(table_name=source_table)

    with Then("I verify exported data matches source"):
        # FIX: the URL must point at the location we just exported to
        # ({filename}); previously it contained a bogus '(unknown)' path
        # segment, so the table function would not read the exported files.
        table_function = f"s3(s3_credentials, url='{self.context.uri}{filename}/**.parquet', format='Parquet')"
        source_data = select_all_ordered(table_name=source_table)
        dest_data = select_all_ordered(table_name=table_function, identifier="*")
        assert source_data == dest_data, error()

    with And("I verify row counts match"):
        source_count = get_row_count(node=self.context.node, table_name=source_table)
        dest_count = get_row_count(node=self.context.node, table_name=table_function)
        assert source_count == dest_count, error()
@TestFeature
@Name("table functions")
def feature(self):
    """Check export part functionality with table functions as destinations."""
    with Given("I set up s3_credentials named collection"):
        named_s3_credentials(
            access_key_id=self.context.access_key_id,
            secret_access_key=self.context.secret_access_key,
            restart=False,
        )

    # Run each scenario in order; all of them reference the shared
    # s3_credentials named collection set up above.
    for scenario in (
        explicit_schema,
        schema_inheritance,
        same_part_simultaneous_export_error,
        multiple_parts,
    ):
        Scenario(run=scenario)