self = <nodetool_additional_test.TestNodetool object at 0x7f777fe79dd0>
def test_disablebinary_and_disablegossip(self):
def run_stress(node, num_keys, mode, consistency, limited_rows_per_second=None):
# mode = read or write
logger.debug('Start stress command')
rate_string = 'threads=10' + (f' throttle={limited_rows_per_second}/s' if limited_rows_per_second else '')
result = node.stress([mode,
f'n={num_keys}',
f'cl={consistency}',
'-rate', rate_string,
'-schema', 'replication(factor=3)'],
capture_output=True)
logger.info('Stress results:\n' + format_cs_output(result))
assert_cs_success(result)
self.run_cluster(nodes=3)
node1, node2, _ = self.cluster.nodelist()
logger.info('Writing data')
number_of_keys = 100000
run_stress(node2, number_of_keys, mode="write", consistency="ALL")
# Start stress in thread
executor = ThreadPoolExecutor(max_workers=1)
read_stress_run = executor.submit(run_stress,
node=node2,
num_keys=number_of_keys,
mode="read",
consistency="QUORUM",
limited_rows_per_second=5000)
for node in self.cluster.nodelist():
node.flush()
with nodetool_context(node=node1, start_command="disablebinary", end_command="enablebinary"):
time.sleep(5)
with nodetool_context(node=node1, start_command="disablegossip", end_command="enablegossip"):
time.sleep(30)
node1.compact()
time.sleep(20)
log_position = node1.mark_log()
time.sleep(30)
assert not node1.grep_log(expr="gate closed", from_mark=log_position), \
"After executing enablebinary, the node still prints 'gate closed' messages"
status = nodetool_status(node2)
node1_status = None
for s in status["nodes"]:
if s['address'] == node1.address():
node1_status = s['status']
break
assert node1_status, "{} not found in {}".format(node1.address(), status["nodes"])
logger.info("Verifying node 1's status is UN")
assert node1_status == "UN", "Node 1's status is expected to be UN, but instead it's {}".format(node1_status)
> read_stress_run.result()
nodetool_additional_test.py:2687:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
/usr/lib64/python3.11/concurrent/futures/_base.py:451: in result
self._condition.wait(timeout)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Condition(<unlocked _thread.RLock object owner=0 count=0 at 0x7f777ecd94c0>, 0)>
timeout = None
def wait(self, timeout=None):
"""Wait until notified or until a timeout occurs.
If the calling thread has not acquired the lock when this method is
called, a RuntimeError is raised.
This method releases the underlying lock, and then blocks until it is
awakened by a notify() or notify_all() call for the same condition
variable in another thread, or until the optional timeout occurs. Once
awakened or timed out, it re-acquires the lock and returns.
When the timeout argument is present and not None, it should be a
floating point number specifying a timeout for the operation in seconds
(or fractions thereof).
When the underlying lock is an RLock, it is not released using its
release() method, since this may not actually unlock the lock when it
was acquired multiple times recursively. Instead, an internal interface
of the RLock class is used, which really unlocks it even when it has
been recursively acquired several times. Another internal interface is
then used to restore the recursion level when the lock is reacquired.
"""
if not self._is_owned():
raise RuntimeError("cannot wait on un-acquired lock")
waiter = _allocate_lock()
waiter.acquire()
self._waiters.append(waiter)
saved_state = self._release_save()
gotit = False
try: # restore state no matter what (e.g., KeyboardInterrupt)
if timeout is None:
> waiter.acquire()
E Failed: Timeout >2700.0s
/usr/lib64/python3.11/threading.py:320: Failed
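This failure was seen during the gating process: the test blocked in `read_stress_run.result()` (the background read-stress run never completed) until the 2700 s test timeout fired. Job: https://jenkins.scylladb.com/job/scylla-master/job/next/6874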