-
Notifications
You must be signed in to change notification settings - Fork 8.3k
test_scheduler_cpu_preemptive is flaky #87655
Copy link
Copy link
Closed
Labels
flaky test; flaky test found by CI
Description
____________________________ test_independent_pools ____________________________
[gw1] linux -- Python 3.10.12 /usr/bin/python3
def test_independent_pools():
node.query(
f"""
create resource cpu (master thread, worker thread);
create workload all;
create workload production in all settings max_concurrent_threads=15;
create workload development in all settings max_concurrent_threads=10;
create workload admin in all settings max_concurrent_threads=5;
"""
)
def query_thread(workload):
node.query(
f"select count(*) from numbers_mt(10000000000) settings workload='{workload}', max_threads=100",
query_id=f"test_{workload}",
)
threads = [
threading.Thread(target=query_thread, args=('production',)),
threading.Thread(target=query_thread, args=('development',)),
threading.Thread(target=query_thread, args=('admin',)),
]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
def assert_query(node, query_id, slots):
node.query("SYSTEM FLUSH LOGS")
# Note that we cannot guarantee that all slots that should be (a) granted, (b) acquired and (c) passed to a thread, will actually undergo even the first stage before query finishes
# So any attempt to make stricter checks here leads to flakiness due to described race condition. Workaround would require failpoint.
# We assume threads will never be preempted and downscaled and upscaled again, so we can check ConcurrencyControlSlotsAcquired against limit
assert_profile_event(
node,
query_id,
"ConcurrencyControlSlotsAcquired",
lambda x: x <= slots,
)
# Short preemptions may happen due to lags in the scheduler thread, but downscales should not
assert_profile_event(
node,
query_id,
"ConcurrencyControlDownscales",
lambda x: x == 0,
)
# NOTE: checking thread_ids length is pointless, because query could downscale and then upscale again, gaining more threads than slots
assert_query(node, 'test_production', 15)
> assert_query(node, 'test_development', 10)
test_scheduler_cpu_preemptive/test.py:188:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
test_scheduler_cpu_preemptive/test.py:179: in assert_query
assert_profile_event(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
node = <helpers.cluster.ClickHouseInstance object at 0xffdc7c38ef20>
query_id = 'test_development', profile_event = 'ConcurrencyControlDownscales'
check = <function test_independent_pools.<locals>.assert_query.<locals>.<lambda> at 0xffdc783d5cf0>
def assert_profile_event(node, query_id, profile_event, check):
> assert check(
int(
node.query(
f"select ProfileEvents['{profile_event}'] from system.query_log where current_database = currentDatabase() and query_id = '{query_id}' and type = 'QueryFinish' order by query_start_time_microseconds desc limit 1"
)
)
)
E assert False
E + where False = <function test_independent_pools.<locals>.assert_query.<locals>.<lambda> at 0xffdc783d5cf0>(2)
E + where 2 = int('2\n')
E + where '2\n' = <bound method ClickHouseInstance.query of <helpers.cluster.ClickHouseInstance object at 0xffdc7c38ef20>>("select ProfileEvents['ConcurrencyControlDownscales'] from system.query_log where current_database = currentDatabase() and query_id = 'test_development' and type = 'QueryFinish' order by query_start_time_microseconds desc limit 1")
E + where <bound method ClickHouseInstance.query of <helpers.cluster.ClickHouseInstance object at 0xffdc7c38ef20>> = <helpers.cluster.ClickHouseInstance object at 0xffdc7c38ef20>.query
CC @serxa
Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
flaky test; flaky test found by CI