Skip to content

Commit 8a7eb4d

Browse files
committed
Split test
1 parent 27bf90d commit 8a7eb4d

File tree

1 file changed

+104
-96
lines changed
  • tests/integration/test_storage_s3_queue

1 file changed

+104
-96
lines changed

tests/integration/test_storage_s3_queue/test_6.py

Lines changed: 104 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,8 @@ def started_cluster():
9595
@pytest.mark.parametrize("engine_name", ["S3Queue",
9696
"AzureQueue",
9797
])
98-
def test_ordered_mode_with_hive(started_cluster, engine_name, processing_threads_num, buckets, hosts):
98+
@pytest.mark.parametrize("test_stage", [1, 2, 3, 4])
99+
def test_ordered_mode_with_hive(started_cluster, engine_name, processing_threads_num, buckets, hosts, test_stage):
99100
instances = [started_cluster.instances["instance"]]
100101
if hosts == 2:
101102
instances.append(started_cluster.instances["instance2"])
@@ -152,20 +153,22 @@ def wait_for_data(node, dst_table_name, expected_count):
152153
break
153154
time.sleep(1)
154155

155-
expected_data = [
156-
'1,1,1,"2025-01-01","Amsterdam"',
157-
'1,1,3,"2025-01-01","Amsterdam"',
158-
'1,3,1,"2025-01-01","Copenhagen"',
159-
'1,3,3,"2025-01-01","Copenhagen"',
160-
'3,1,1,"2025-01-03","Amsterdam"',
161-
'3,1,3,"2025-01-03","Amsterdam"',
162-
]
163-
wait_for_data(instances[0], dst_table_name, len(expected_data))
164-
165-
data = ""
166-
for node in instances:
167-
data += node.query(f"SELECT column1, column2, column3, date, city FROM {dst_table_name} ORDER BY column1, column2, column3 FORMAT CSV")
168-
compare_data(data, expected_data, buckets)
156+
if test_stage == 1:
157+
expected_data = [
158+
'1,1,1,"2025-01-01","Amsterdam"',
159+
'1,1,3,"2025-01-01","Amsterdam"',
160+
'1,3,1,"2025-01-01","Copenhagen"',
161+
'1,3,3,"2025-01-01","Copenhagen"',
162+
'3,1,1,"2025-01-03","Amsterdam"',
163+
'3,1,3,"2025-01-03","Amsterdam"',
164+
]
165+
wait_for_data(instances[0], dst_table_name, len(expected_data))
166+
167+
data = ""
168+
for node in instances:
169+
data += node.query(f"SELECT column1, column2, column3, date, city FROM {dst_table_name} ORDER BY column1, column2, column3 FORMAT CSV")
170+
compare_data(data, expected_data, buckets)
171+
return
169172

170173
# Add new files to same partitions
171174
# One in the middle and one in the end
@@ -176,59 +179,63 @@ def wait_for_data(node, dst_table_name, expected_count):
176179
put_file_content(started_cluster, engine_name, f"{files_path}/date=2025-01-03/city=Amsterdam/file2.csv", b"3,1,2\n")
177180
put_file_content(started_cluster, engine_name, f"{files_path}/date=2025-01-03/city=Amsterdam/file4.csv", b"3,1,4\n")
178181

179-
# Only new files on the end (file4.csv) should be visible
180-
expected_data = [
181-
"1,1,1",
182-
"1,1,3",
183-
"1,1,4",
184-
"1,3,1",
185-
"1,3,3",
186-
"1,3,4",
187-
"3,1,1",
188-
"3,1,3",
189-
"3,1,4",
190-
]
191-
wait_for_data(instances[0], dst_table_name, len(expected_data))
192-
193-
# With buckets we can get some files from the middle, if those files are last in bucket, but not global last.
194-
# It depends of hashes of file paths.
195-
# This sleep is for additional time for processing.
196-
if buckets > 1:
197-
time.sleep(10)
198-
199-
data = ""
200-
for node in instances:
201-
data += node.query(f"SELECT column1, column2, column3 FROM {dst_table_name} ORDER BY column1, column2, column3 FORMAT CSV")
202-
compare_data(data, expected_data, buckets)
182+
if test_stage == 2:
183+
# Only new files at the end (file4.csv) should be visible
184+
expected_data = [
185+
"1,1,1",
186+
"1,1,3",
187+
"1,1,4",
188+
"1,3,1",
189+
"1,3,3",
190+
"1,3,4",
191+
"3,1,1",
192+
"3,1,3",
193+
"3,1,4",
194+
]
195+
wait_for_data(instances[0], dst_table_name, len(expected_data))
196+
197+
# With buckets we can get some files from the middle if those files are the last in their bucket, even though they are not the last globally.
198+
# It depends on the hashes of the file paths.
199+
# This sleep gives additional time for processing.
200+
if buckets > 1:
201+
time.sleep(10)
202+
203+
data = ""
204+
for node in instances:
205+
data += node.query(f"SELECT column1, column2, column3 FROM {dst_table_name} ORDER BY column1, column2, column3 FORMAT CSV")
206+
compare_data(data, expected_data, buckets)
207+
return
203208

204209
# Add new city and new date
205210
# All should be visible
206211
put_file_content(started_cluster, engine_name, f"{files_path}/date=2025-01-01/city=Berlin/file2.csv", b"1,2,2\n")
207212
put_file_content(started_cluster, engine_name, f"{files_path}/date=2025-01-01/city=Berlin/file4.csv", b"1,2,4\n")
208213
put_file_content(started_cluster, engine_name, f"{files_path}/date=2025-01-02/city=Amsterdam/file2.csv", b"2,1,2\n")
209214

210-
expected_data += [
211-
"1,2,2",
212-
"1,2,4",
213-
"2,1,2",
214-
]
215-
expected_data.sort()
216-
wait_for_data(instances[0], dst_table_name, len(expected_data))
215+
if test_stage == 3:
216+
expected_data += [
217+
"1,2,2",
218+
"1,2,4",
219+
"2,1,2",
220+
]
221+
expected_data.sort()
222+
wait_for_data(instances[0], dst_table_name, len(expected_data))
217223

218-
if buckets > 1:
219-
time.sleep(10)
224+
if buckets > 1:
225+
time.sleep(10)
220226

221-
data = ""
222-
for node in instances:
223-
data += node.query(f"SELECT column1, column2, column3 FROM {dst_table_name} ORDER BY column1, column2, column3 FORMAT CSV")
224-
compare_data(data, expected_data, buckets)
227+
data = ""
228+
for node in instances:
229+
data += node.query(f"SELECT column1, column2, column3 FROM {dst_table_name} ORDER BY column1, column2, column3 FORMAT CSV")
230+
compare_data(data, expected_data, buckets)
225231

226-
instances[0].restart_clickhouse()
232+
instances[0].restart_clickhouse()
227233

228-
data = ""
229-
for node in instances:
230-
data += node.query(f"SELECT column1, column2, column3 FROM {dst_table_name} ORDER BY column1, column2, column3 FORMAT CSV")
231-
compare_data(data, expected_data, buckets)
234+
data = ""
235+
for node in instances:
236+
data += node.query(f"SELECT column1, column2, column3 FROM {dst_table_name} ORDER BY column1, column2, column3 FORMAT CSV")
237+
compare_data(data, expected_data, buckets)
238+
return
232239

233240
# Add some records
234241
# Only a few should be visible
@@ -238,42 +245,43 @@ def wait_for_data(node, dst_table_name, expected_count):
238245
put_file_content(started_cluster, engine_name, f"{files_path}/date=2025-01-01/city=Berlin/file3.csv", b"1,2,3\n") # Skipped
239246
put_file_content(started_cluster, engine_name, f"{files_path}/date=2025-01-01/city=Berlin/file5.csv", b"1,2,5\n") # Added
240247

241-
expected_data += [
242-
"1,2,5",
243-
"2,1,3",
244-
]
245-
expected_data.sort()
246-
wait_for_data(instances[0], dst_table_name, len(expected_data))
247-
248-
if buckets > 1:
249-
time.sleep(10)
250-
251-
data = ""
252-
for node in instances:
253-
data += node.query(f"SELECT column1, column2, column3 FROM {dst_table_name} ORDER BY column1, column2, column3 FORMAT CSV")
254-
compare_data(data, expected_data, buckets)
255-
256-
zk = started_cluster.get_kazoo_client("zoo1")
257-
processed_nodes = []
258-
if (buckets == 1):
259-
processed_nodes = zk.get_children(f"{keeper_path}/processed")
260-
else:
261-
for i in range(buckets):
262-
# Files are linked to buckets by hash of file path.
263-
# Path contains random table name, so distributing files to buckets is not predictable.
264-
# In rare case bucket can have zero processed files.
265-
if not zk.exists(f"{keeper_path}/buckets/{i}/processed"):
266-
continue
267-
bucket_nodes = zk.get_children(f"{keeper_path}/buckets/{i}/processed")
268-
for node in bucket_nodes:
269-
if node not in processed_nodes:
270-
processed_nodes.append(node)
271-
processed_nodes.sort()
272-
expected_nodes = [
273-
"date=2025-01-01_city=Amsterdam",
274-
"date=2025-01-01_city=Berlin",
275-
"date=2025-01-01_city=Copenhagen",
276-
"date=2025-01-02_city=Amsterdam",
277-
"date=2025-01-03_city=Amsterdam",
278-
]
279-
assert processed_nodes == expected_nodes
248+
if test_stage == 4:
249+
expected_data += [
250+
"1,2,5",
251+
"2,1,3",
252+
]
253+
expected_data.sort()
254+
wait_for_data(instances[0], dst_table_name, len(expected_data))
255+
256+
if buckets > 1:
257+
time.sleep(10)
258+
259+
data = ""
260+
for node in instances:
261+
data += node.query(f"SELECT column1, column2, column3 FROM {dst_table_name} ORDER BY column1, column2, column3 FORMAT CSV")
262+
compare_data(data, expected_data, buckets)
263+
264+
zk = started_cluster.get_kazoo_client("zoo1")
265+
processed_nodes = []
266+
if (buckets == 1):
267+
processed_nodes = zk.get_children(f"{keeper_path}/processed")
268+
else:
269+
for i in range(buckets):
270+
# Files are linked to buckets by hash of file path.
271+
# The path contains a random table name, so the distribution of files across buckets is not predictable.
272+
# In rare cases a bucket can have zero processed files.
273+
if not zk.exists(f"{keeper_path}/buckets/{i}/processed"):
274+
continue
275+
bucket_nodes = zk.get_children(f"{keeper_path}/buckets/{i}/processed")
276+
for node in bucket_nodes:
277+
if node not in processed_nodes:
278+
processed_nodes.append(node)
279+
processed_nodes.sort()
280+
expected_nodes = [
281+
"date=2025-01-01_city=Amsterdam",
282+
"date=2025-01-01_city=Berlin",
283+
"date=2025-01-01_city=Copenhagen",
284+
"date=2025-01-02_city=Amsterdam",
285+
"date=2025-01-03_city=Amsterdam",
286+
]
287+
assert processed_nodes == expected_nodes

0 commit comments

Comments
 (0)