@@ -95,7 +95,8 @@ def started_cluster():
9595@pytest .mark .parametrize ("engine_name" , ["S3Queue" ,
9696 "AzureQueue" ,
9797 ])
98- def test_ordered_mode_with_hive (started_cluster , engine_name , processing_threads_num , buckets , hosts ):
98+ @pytest .mark .parametrize ("test_stage" , [1 , 2 , 3 , 4 ])
99+ def test_ordered_mode_with_hive (started_cluster , engine_name , processing_threads_num , buckets , hosts , test_stage ):
99100 instances = [started_cluster .instances ["instance" ]]
100101 if hosts == 2 :
101102 instances .append (started_cluster .instances ["instance2" ])
@@ -152,20 +153,22 @@ def wait_for_data(node, dst_table_name, expected_count):
152153 break
153154 time .sleep (1 )
154155
155- expected_data = [
156- '1,1,1,"2025-01-01","Amsterdam"' ,
157- '1,1,3,"2025-01-01","Amsterdam"' ,
158- '1,3,1,"2025-01-01","Copenhagen"' ,
159- '1,3,3,"2025-01-01","Copenhagen"' ,
160- '3,1,1,"2025-01-03","Amsterdam"' ,
161- '3,1,3,"2025-01-03","Amsterdam"' ,
162- ]
163- wait_for_data (instances [0 ], dst_table_name , len (expected_data ))
164-
165- data = ""
166- for node in instances :
167- data += node .query (f"SELECT column1, column2, column3, date, city FROM { dst_table_name } ORDER BY column1, column2, column3 FORMAT CSV" )
168- compare_data (data , expected_data , buckets )
156+ if test_stage == 1 :
157+ expected_data = [
158+ '1,1,1,"2025-01-01","Amsterdam"' ,
159+ '1,1,3,"2025-01-01","Amsterdam"' ,
160+ '1,3,1,"2025-01-01","Copenhagen"' ,
161+ '1,3,3,"2025-01-01","Copenhagen"' ,
162+ '3,1,1,"2025-01-03","Amsterdam"' ,
163+ '3,1,3,"2025-01-03","Amsterdam"' ,
164+ ]
165+ wait_for_data (instances [0 ], dst_table_name , len (expected_data ))
166+
167+ data = ""
168+ for node in instances :
169+ data += node .query (f"SELECT column1, column2, column3, date, city FROM { dst_table_name } ORDER BY column1, column2, column3 FORMAT CSV" )
170+ compare_data (data , expected_data , buckets )
171+ return
169172
170173 # Add new files to same partitions
171174 # One in the middle and one in the end
@@ -176,59 +179,63 @@ def wait_for_data(node, dst_table_name, expected_count):
176179 put_file_content (started_cluster , engine_name , f"{ files_path } /date=2025-01-03/city=Amsterdam/file2.csv" , b"3,1,2\n " )
177180 put_file_content (started_cluster , engine_name , f"{ files_path } /date=2025-01-03/city=Amsterdam/file4.csv" , b"3,1,4\n " )
178181
179- # Only new files on the end (file4.csv) should be visible
180- expected_data = [
181- "1,1,1" ,
182- "1,1,3" ,
183- "1,1,4" ,
184- "1,3,1" ,
185- "1,3,3" ,
186- "1,3,4" ,
187- "3,1,1" ,
188- "3,1,3" ,
189- "3,1,4" ,
190- ]
191- wait_for_data (instances [0 ], dst_table_name , len (expected_data ))
192-
193- # With buckets we can get some files from the middle, if those files are last in bucket, but not global last.
194- # It depends of hashes of file paths.
195- # This sleep is for additional time for processing.
196- if buckets > 1 :
197- time .sleep (10 )
198-
199- data = ""
200- for node in instances :
201- data += node .query (f"SELECT column1, column2, column3 FROM { dst_table_name } ORDER BY column1, column2, column3 FORMAT CSV" )
202- compare_data (data , expected_data , buckets )
182+ if test_stage == 2 :
183+ # Only new files on the end (file4.csv) should be visible
184+ expected_data = [
185+ "1,1,1" ,
186+ "1,1,3" ,
187+ "1,1,4" ,
188+ "1,3,1" ,
189+ "1,3,3" ,
190+ "1,3,4" ,
191+ "3,1,1" ,
192+ "3,1,3" ,
193+ "3,1,4" ,
194+ ]
195+ wait_for_data (instances [0 ], dst_table_name , len (expected_data ))
196+
197+ # With buckets we can get some files from the middle, if those files are last in bucket, but not global last.
198+            # It depends on hashes of file paths.
199+            # This sleep gives additional time for processing.
200+ if buckets > 1 :
201+ time .sleep (10 )
202+
203+ data = ""
204+ for node in instances :
205+ data += node .query (f"SELECT column1, column2, column3 FROM { dst_table_name } ORDER BY column1, column2, column3 FORMAT CSV" )
206+ compare_data (data , expected_data , buckets )
207+ return
203208
204209 # Add new city and new date
205210 # All should be visible
206211 put_file_content (started_cluster , engine_name , f"{ files_path } /date=2025-01-01/city=Berlin/file2.csv" , b"1,2,2\n " )
207212 put_file_content (started_cluster , engine_name , f"{ files_path } /date=2025-01-01/city=Berlin/file4.csv" , b"1,2,4\n " )
208213 put_file_content (started_cluster , engine_name , f"{ files_path } /date=2025-01-02/city=Amsterdam/file2.csv" , b"2,1,2\n " )
209214
210- expected_data += [
211- "1,2,2" ,
212- "1,2,4" ,
213- "2,1,2" ,
214- ]
215- expected_data .sort ()
216- wait_for_data (instances [0 ], dst_table_name , len (expected_data ))
215+ if test_stage == 3 :
216+ expected_data += [
217+ "1,2,2" ,
218+ "1,2,4" ,
219+ "2,1,2" ,
220+ ]
221+ expected_data .sort ()
222+ wait_for_data (instances [0 ], dst_table_name , len (expected_data ))
217223
218- if buckets > 1 :
219- time .sleep (10 )
224+ if buckets > 1 :
225+ time .sleep (10 )
220226
221- data = ""
222- for node in instances :
223- data += node .query (f"SELECT column1, column2, column3 FROM { dst_table_name } ORDER BY column1, column2, column3 FORMAT CSV" )
224- compare_data (data , expected_data , buckets )
227+ data = ""
228+ for node in instances :
229+ data += node .query (f"SELECT column1, column2, column3 FROM { dst_table_name } ORDER BY column1, column2, column3 FORMAT CSV" )
230+ compare_data (data , expected_data , buckets )
225231
226- instances [0 ].restart_clickhouse ()
232+ instances [0 ].restart_clickhouse ()
227233
228- data = ""
229- for node in instances :
230- data += node .query (f"SELECT column1, column2, column3 FROM { dst_table_name } ORDER BY column1, column2, column3 FORMAT CSV" )
231- compare_data (data , expected_data , buckets )
234+ data = ""
235+ for node in instances :
236+ data += node .query (f"SELECT column1, column2, column3 FROM { dst_table_name } ORDER BY column1, column2, column3 FORMAT CSV" )
237+ compare_data (data , expected_data , buckets )
238+ return
232239
233240 # Add some records
234241 # Only few should be visible
@@ -238,42 +245,43 @@ def wait_for_data(node, dst_table_name, expected_count):
238245 put_file_content (started_cluster , engine_name , f"{ files_path } /date=2025-01-01/city=Berlin/file3.csv" , b"1,2,3\n " ) # Skipped
239246 put_file_content (started_cluster , engine_name , f"{ files_path } /date=2025-01-01/city=Berlin/file5.csv" , b"1,2,5\n " ) # Added
240247
241- expected_data += [
242- "1,2,5" ,
243- "2,1,3" ,
244- ]
245- expected_data .sort ()
246- wait_for_data (instances [0 ], dst_table_name , len (expected_data ))
247-
248- if buckets > 1 :
249- time .sleep (10 )
250-
251- data = ""
252- for node in instances :
253- data += node .query (f"SELECT column1, column2, column3 FROM { dst_table_name } ORDER BY column1, column2, column3 FORMAT CSV" )
254- compare_data (data , expected_data , buckets )
255-
256- zk = started_cluster .get_kazoo_client ("zoo1" )
257- processed_nodes = []
258- if (buckets == 1 ):
259- processed_nodes = zk .get_children (f"{ keeper_path } /processed" )
260- else :
261- for i in range (buckets ):
262- # Files are linked to buckets by hash of file path.
263- # Path contains random table name, so distributing files to buckets is not predictable.
264- # In rare case bucket can have zero processed files.
265- if not zk .exists (f"{ keeper_path } /buckets/{ i } /processed" ):
266- continue
267- bucket_nodes = zk .get_children (f"{ keeper_path } /buckets/{ i } /processed" )
268- for node in bucket_nodes :
269- if node not in processed_nodes :
270- processed_nodes .append (node )
271- processed_nodes .sort ()
272- expected_nodes = [
273- "date=2025-01-01_city=Amsterdam" ,
274- "date=2025-01-01_city=Berlin" ,
275- "date=2025-01-01_city=Copenhagen" ,
276- "date=2025-01-02_city=Amsterdam" ,
277- "date=2025-01-03_city=Amsterdam" ,
278- ]
279- assert processed_nodes == expected_nodes
248+ if test_stage == 4 :
249+ expected_data += [
250+ "1,2,5" ,
251+ "2,1,3" ,
252+ ]
253+ expected_data .sort ()
254+ wait_for_data (instances [0 ], dst_table_name , len (expected_data ))
255+
256+ if buckets > 1 :
257+ time .sleep (10 )
258+
259+ data = ""
260+ for node in instances :
261+ data += node .query (f"SELECT column1, column2, column3 FROM { dst_table_name } ORDER BY column1, column2, column3 FORMAT CSV" )
262+ compare_data (data , expected_data , buckets )
263+
264+ zk = started_cluster .get_kazoo_client ("zoo1" )
265+ processed_nodes = []
266+ if (buckets == 1 ):
267+ processed_nodes = zk .get_children (f"{ keeper_path } /processed" )
268+ else :
269+ for i in range (buckets ):
270+ # Files are linked to buckets by hash of file path.
271+ # Path contains random table name, so distributing files to buckets is not predictable.
272+ # In rare case bucket can have zero processed files.
273+ if not zk .exists (f"{ keeper_path } /buckets/{ i } /processed" ):
274+ continue
275+ bucket_nodes = zk .get_children (f"{ keeper_path } /buckets/{ i } /processed" )
276+ for node in bucket_nodes :
277+ if node not in processed_nodes :
278+ processed_nodes .append (node )
279+ processed_nodes .sort ()
280+ expected_nodes = [
281+ "date=2025-01-01_city=Amsterdam" ,
282+ "date=2025-01-01_city=Berlin" ,
283+ "date=2025-01-01_city=Copenhagen" ,
284+ "date=2025-01-02_city=Amsterdam" ,
285+ "date=2025-01-03_city=Amsterdam" ,
286+ ]
287+ assert processed_nodes == expected_nodes
0 commit comments