Skip to content

Commit 27cd800

Browse files
committed
Fix flaky test_keeper_disks
1 parent 29e817e commit 27cd800

File tree

2 files changed

+40
-13
lines changed

2 files changed

+40
-13
lines changed

tests/integration/test_keeper_disks/configs/enable_keeper.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
<operation_timeout_ms>5000</operation_timeout_ms>
3838
<session_timeout_ms>10000</session_timeout_ms>
3939
<raft_logs_level>trace</raft_logs_level>
40-
<snapshot_distance>10</snapshot_distance>
40+
<snapshot_distance>100000</snapshot_distance>
4141
<stale_log_gap>10</stale_log_gap>
4242
<reserved_log_items>1</reserved_log_items>
4343
<rotate_log_storage_interval>3</rotate_log_storage_interval>

tests/integration/test_keeper_disks/test.py

Lines changed: 39 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
import helpers.keeper_utils as keeper_utils
88
from helpers.cluster import ClickHouseCluster, is_arm
9-
9+
import re
1010
import time
1111

1212
import logging
@@ -81,7 +81,7 @@ def stop_clickhouse(cluster, node, cleanup_disks):
8181
)
8282

8383

84-
def setup_storage(cluster, node, storage_config, cleanup_disks):
84+
def setup_storage(cluster, node, storage_config, cleanup_disks, before_start=None):
8585
stop_clickhouse(cluster, node, cleanup_disks)
8686
node.copy_file_to_container(
8787
os.path.join(CURRENT_TEST_DIR, "configs/enable_keeper.xml"),
@@ -92,6 +92,10 @@ def setup_storage(cluster, node, storage_config, cleanup_disks):
9292
"<!-- DISK DEFINITION PLACEHOLDER -->",
9393
storage_config,
9494
)
95+
96+
if before_start:
97+
before_start(node)
98+
9599
node.start_clickhouse()
96100
# complete readiness checks that the sessions can be established,
97101
# but it creates sesssion for this, which will create one more record in log,
@@ -144,11 +148,21 @@ def test_logs_with_disks(started_cluster):
144148
for _ in range(30):
145149
node_zk.create("/test/somenode", b"somedata", sequence=True)
146150

147-
node_logs.wait_for_log_line("Removed changelog changelog_25_27.bin because of compaction")
148-
149151
stop_zk(node_zk)
150152

151-
previous_log_files = get_local_logs(node_logs)
153+
previous_log_files = []
154+
155+
def collect_local_logs(node):
156+
nonlocal previous_log_files
157+
previous_log_files = get_local_logs(node)
158+
# assert all filenames match the expected pattern
159+
for name in previous_log_files:
160+
assert re.match(
161+
r"^changelog_\d+_\d+\.bin$", name
162+
), f"Filename {name} does not match the expected pattern"
163+
164+
previous_log_files.sort(key=lambda name: int(name.split("_")[1]))
165+
logging.info(f"Previous log files: {previous_log_files}")
152166

153167
setup_storage(
154168
started_cluster,
@@ -157,10 +171,9 @@ def test_logs_with_disks(started_cluster):
157171
"<latest_log_storage_disk>log_local<\\/latest_log_storage_disk>"
158172
"<snapshot_storage_disk>snapshot_local<\\/snapshot_storage_disk>",
159173
cleanup_disks=False,
174+
before_start=collect_local_logs,
160175
)
161176

162-
node_logs.wait_for_log_line("KeeperLogStore: Continue to write into changelog_34_36.bin")
163-
164177
def get_single_local_log_file():
165178
local_log_files = get_local_logs(node_logs)
166179
start_time = time.time()
@@ -188,13 +201,16 @@ def get_single_local_log_file():
188201

189202
stop_zk(node_zk)
190203

191-
local_log_files = get_single_local_log_file()
192-
log_files = list_s3_objects(started_cluster, "logs/")
204+
def collect_all_logs(node):
205+
nonlocal previous_log_files
206+
local_log_files = get_single_local_log_file()
207+
log_files = list_s3_objects(started_cluster, "logs/")
193208

194-
log_files.extend(local_log_files)
195-
assert set(log_files) != previous_log_files
209+
log_files.extend(local_log_files)
210+
assert set(log_files) != previous_log_files
196211

197-
previous_log_files = log_files
212+
previous_log_files = log_files
213+
logging.info(f"Previous log files: {previous_log_files}")
198214

199215
setup_storage(
200216
started_cluster,
@@ -203,8 +219,19 @@ def get_single_local_log_file():
203219
"<log_storage_disk>log_local<\\/log_storage_disk>"
204220
"<snapshot_storage_disk>snapshot_local<\\/snapshot_storage_disk>",
205221
cleanup_disks=False,
222+
before_start=collect_all_logs,
206223
)
207224

225+
s3_files = list_s3_objects(started_cluster, "logs/")
226+
start_time = time.time()
227+
while len(s3_files) != 0:
228+
logging.debug(f"S3 log files: {s3_files}")
229+
assert (
230+
time.time() - start_time < 60
231+
), "s3_files size is not equal to 0 after 60s"
232+
time.sleep(1)
233+
s3_files = list_s3_objects(started_cluster, "logs/")
234+
208235
local_log_files = get_local_logs(node_logs)
209236
assert set(local_log_files) == set(previous_log_files)
210237

0 commit comments

Comments
 (0)