self = <ClientResponse(http://127.199.150.60:10000/task_manager/wait_task/8474b7c0-9e1c-11f0-b7f6-d40f83acbf6e) [None None]>
None
connection = Connection<ConnectionKey(host='127.199.150.60', port=10000, is_ssl=False, ssl=True, proxy=None, proxy_auth=None, proxy_headers_hash=None)>
async def start(self, connection: "Connection") -> "ClientResponse":
"""Start response processing."""
self._closed = False
self._protocol = connection.protocol
self._connection = connection
with self._timer:
while True:
# read response
try:
protocol = self._protocol
> message, payload = await protocol.read() # type: ignore[union-attr]
/usr/lib64/python3.13/site-packages/aiohttp/client_reqrep.py:1059:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <aiohttp.client_proto.ResponseHandler object at 0x7f963c5ce090>
async def read(self) -> _T:
if not self._buffer and not self._eof:
assert not self._waiter
self._waiter = self._loop.create_future()
try:
> await self._waiter
E asyncio.exceptions.CancelledError
/usr/lib64/python3.13/site-packages/aiohttp/streams.py:644: CancelledError
The above exception was the direct cause of the following exception:
manager = <test.pylib.manager_client.ManagerClient object at 0x7f963f2cb230>
volumes_factory = <function volumes_factory.<locals>.wrapper at 0x7f963c7c9440>
@pytest.mark.asyncio
async def test_tablet_repair(manager: ManagerClient, volumes_factory: Callable) -> None:
cfg = {
'tablet_load_stats_refresh_interval_in_seconds': 1,
}
async with space_limited_servers(manager, volumes_factory, ["100M"]*3, cmdline=global_cmdline, config=cfg) as servers:
cql, _ = await manager.get_ready_cql(servers)
workdir = await manager.server_get_workdir(servers[0].server_id)
log = await manager.server_open_log(servers[0].server_id)
host = await manager.get_host_id(servers[0].server_id)
mark = await log.mark()
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3} AND tablets = {'initial': 4}") as ks:
async with new_test_table(manager, ks, "pk int PRIMARY KEY, t text") as cf:
table = cf.split('.')[-1]
for _ in range(2):
await asyncio.gather(*[cql.run_async(query) for query in write_generator(cf, 100)])
await manager.api.flush_keyspace(servers[0].ip_addr, ks)
await manager.server_stop_gracefully(servers[0].server_id)
await manager.server_wipe_sstables(servers[0].server_id, ks, table)
await manager.server_start(servers[0].server_id)
logger.info("Create a big file on the target node to reach critical disk utilization level")
disk_info = psutil.disk_usage(workdir)
with random_content_file(workdir, int(disk_info.total*0.85) - disk_info.used):
for _ in range(2):
mark, _ = await log.wait_for("repair - Drained", from_mark=mark)
coord = await get_topology_coordinator(manager)
coord_serv = await find_server_by_host_id(manager, servers, coord)
coord_log = await manager.server_open_log(coord_serv.server_id)
coord_mark = await coord_log.mark()
logger.info("Schedule tablet repair")
response = await manager.api.tablet_repair(servers[0].ip_addr, ks, table, "all", await_completion=False)
task_id = response['tablet_task_id']
for _ in range(await get_tablet_count(manager, servers[1], ks, table)):
coord_mark, matches = await coord_log.wait_for("Initiating tablet repair host=(?P<host>.*) tablet=(?P<tablet>.*)", from_mark=coord_mark)
dst_host, tablet = matches[0][1].group("host"), matches[0][1].group("tablet")
if host == dst_host:
# Tablet repair is triggered on the node with disk utilization above the critical level.
# A local tablet repair task is refused to be created and the tablet repair fails.
error = "Repair service is disabled. No repairs will be started until it's re-enabled"
else:
# Tablet repair is triggered on the node with disk utilization below the critical level.
# A local tablet repair task is created and the row-level repair is executed. It will try
# to send missing rows to the node with critical disk utilization that are rejected.
error = f"put_row_diff: Repair follower={host} failed in put_row_diff handler"
await coord_log.wait_for(f"repair for tablet {tablet} failed: seastar::rpc::remote_verb_error.*{error}", from_mark=coord_mark)
logger.info("Restart the node")
mark = await log.mark()
await manager.server_restart(servers[0].server_id, wait_others=2)
await manager.driver_connect()
for _ in range(2):
mark, _ = await log.wait_for("repair - Drained", from_mark=mark)
logger.info("With blob file removed, wait for the tablet repair to succeed")
> await manager.api.wait_task(servers[0].ip_addr, task_id)
test/storage/test_out_of_space_prevention.py:275:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
test/pylib/rest_client.py:510: in wait_task
return await self.client.get_json(f'/task_manager/wait_task/{task_id}', host=node_ip)
test/pylib/rest_client.py:99: in get_json
ret = await self._fetch("GET", resource_uri, response_type = "json", host = host,
test/pylib/rest_client.py:70: in _fetch
async with request(method, uri,
/usr/lib64/python3.13/site-packages/aiohttp/client.py:1391: in __aenter__
self._resp = await self._coro
/usr/lib64/python3.13/site-packages/aiohttp/client.py:690: in _request
await resp.start(conn)
/usr/lib64/python3.13/site-packages/aiohttp/client_reqrep.py:1054: in start
with self._timer:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <aiohttp.helpers.TimerContext object at 0x7f963f17fc40>
exc_type = <class 'asyncio.exceptions.CancelledError'>
exc_val = CancelledError(), exc_tb = <traceback object at 0x7f963c816dc0>
def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType],
) -> Optional[bool]:
enter_task: Optional[asyncio.Task[Any]] = None
if self._tasks:
enter_task = self._tasks.pop()
if exc_type is asyncio.CancelledError and self._cancelled:
assert enter_task is not None
# The timeout was hit, and the task was cancelled
# so we need to uncancel the last task that entered the context manager
# since the cancellation should not leak out of the context manager
if sys.version_info >= (3, 11):
# If the task was already cancelling don't raise
# asyncio.TimeoutError and instead return None
# to allow the cancellation to propagate
if enter_task.uncancel() > self._cancelling:
return None
> raise asyncio.TimeoutError from exc_val
E TimeoutError
/usr/lib64/python3.13/site-packages/aiohttp/helpers.py:749: TimeoutError
based on master: 8c99f80
https://jenkins.scylladb.com/job/scylla-master/job/scylla-ci/20613/testReport/junit/storage/test_out_of_space_prevention/release___test_py___release___test_tablet_repair_release_2/