66
77import helpers .keeper_utils as keeper_utils
88from helpers .cluster import ClickHouseCluster , is_arm
9-
9+ import re
1010import time
1111
1212import logging
@@ -81,7 +81,7 @@ def stop_clickhouse(cluster, node, cleanup_disks):
8181 )
8282
8383
84- def setup_storage (cluster , node , storage_config , cleanup_disks ):
84+ def setup_storage (cluster , node , storage_config , cleanup_disks , before_start = None ):
8585 stop_clickhouse (cluster , node , cleanup_disks )
8686 node .copy_file_to_container (
8787 os .path .join (CURRENT_TEST_DIR , "configs/enable_keeper.xml" ),
@@ -92,6 +92,10 @@ def setup_storage(cluster, node, storage_config, cleanup_disks):
9292 "<!-- DISK DEFINITION PLACEHOLDER -->" ,
9393 storage_config ,
9494 )
95+
96+ if before_start :
97+ before_start (node )
98+
9599 node .start_clickhouse ()
96100 # complete readiness checks that the sessions can be established,
97101 # but it creates sesssion for this, which will create one more record in log,
@@ -144,11 +148,21 @@ def test_logs_with_disks(started_cluster):
144148 for _ in range (30 ):
145149 node_zk .create ("/test/somenode" , b"somedata" , sequence = True )
146150
147- node_logs .wait_for_log_line ("Removed changelog changelog_25_27.bin because of compaction" )
148-
149151 stop_zk (node_zk )
150152
151- previous_log_files = get_local_logs (node_logs )
153+ previous_log_files = []
154+
155+ def collect_local_logs (node ):
156+ nonlocal previous_log_files
157+ previous_log_files = get_local_logs (node )
158+ # assert all filenames match the expected pattern
159+ for name in previous_log_files :
160+ assert re .match (
161+ r"^changelog_\d+_\d+\.bin$" , name
162+ ), f"Filename { name } does not match the expected pattern"
163+
164+ previous_log_files .sort (key = lambda name : int (name .split ("_" )[1 ]))
165+ logging .info (f"Previous log files: { previous_log_files } " )
152166
153167 setup_storage (
154168 started_cluster ,
@@ -157,10 +171,9 @@ def test_logs_with_disks(started_cluster):
157171 "<latest_log_storage_disk>log_local<\\ /latest_log_storage_disk>"
158172 "<snapshot_storage_disk>snapshot_local<\\ /snapshot_storage_disk>" ,
159173 cleanup_disks = False ,
174+ before_start = collect_local_logs ,
160175 )
161176
162- node_logs .wait_for_log_line ("KeeperLogStore: Continue to write into changelog_34_36.bin" )
163-
164177 def get_single_local_log_file ():
165178 local_log_files = get_local_logs (node_logs )
166179 start_time = time .time ()
@@ -188,13 +201,16 @@ def get_single_local_log_file():
188201
189202 stop_zk (node_zk )
190203
191- local_log_files = get_single_local_log_file ()
192- log_files = list_s3_objects (started_cluster , "logs/" )
204+ def collect_all_logs (node ):
205+ nonlocal previous_log_files
206+ local_log_files = get_single_local_log_file ()
207+ log_files = list_s3_objects (started_cluster , "logs/" )
193208
194- log_files .extend (local_log_files )
195- assert set (log_files ) != previous_log_files
209+ log_files .extend (local_log_files )
210+ assert set (log_files ) != previous_log_files
196211
197- previous_log_files = log_files
212+ previous_log_files = log_files
213+ logging .info (f"Previous log files: { previous_log_files } " )
198214
199215 setup_storage (
200216 started_cluster ,
@@ -203,8 +219,19 @@ def get_single_local_log_file():
203219 "<log_storage_disk>log_local<\\ /log_storage_disk>"
204220 "<snapshot_storage_disk>snapshot_local<\\ /snapshot_storage_disk>" ,
205221 cleanup_disks = False ,
222+ before_start = collect_all_logs ,
206223 )
207224
225+ s3_files = list_s3_objects (started_cluster , "logs/" )
226+ start_time = time .time ()
227+ while len (s3_files ) != 0 :
228+ logging .debug (f"S3 log files: { s3_files } " )
229+ assert (
230+ time .time () - start_time < 60
231+ ), "s3_files size is not equal to 0 after 60s"
232+ time .sleep (1 )
233+ s3_files = list_s3_objects (started_cluster , "logs/" )
234+
208235 local_log_files = get_local_logs (node_logs )
209236 assert set (local_log_files ) == set (previous_log_files )
210237
0 commit comments