Skip to content

storage.test_out_of_space_prevention.test_tablet_repair is flaky #26346

@tgrabiec

Description

@tgrabiec

based on master: 8c99f80

https://jenkins.scylladb.com/job/scylla-master/job/scylla-ci/20613/testReport/junit/storage/test_out_of_space_prevention/release___test_py___release___test_tablet_repair_release_2/

self = <ClientResponse(http://127.199.150.60:10000/task_manager/wait_task/8474b7c0-9e1c-11f0-b7f6-d40f83acbf6e) [None None]>
None

connection = Connection<ConnectionKey(host='127.199.150.60', port=10000, is_ssl=False, ssl=True, proxy=None, proxy_auth=None, proxy_headers_hash=None)>

    async def start(self, connection: "Connection") -> "ClientResponse":
        """Start response processing."""
        self._closed = False
        self._protocol = connection.protocol
        self._connection = connection
    
        with self._timer:
            while True:
                # read response
                try:
                    protocol = self._protocol
>                   message, payload = await protocol.read()  # type: ignore[union-attr]

/usr/lib64/python3.13/site-packages/aiohttp/client_reqrep.py:1059: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <aiohttp.client_proto.ResponseHandler object at 0x7f963c5ce090>

    async def read(self) -> _T:
        if not self._buffer and not self._eof:
            assert not self._waiter
            self._waiter = self._loop.create_future()
            try:
>               await self._waiter
E               asyncio.exceptions.CancelledError

/usr/lib64/python3.13/site-packages/aiohttp/streams.py:644: CancelledError

The above exception was the direct cause of the following exception:

manager = <test.pylib.manager_client.ManagerClient object at 0x7f963f2cb230>
volumes_factory = <function volumes_factory.<locals>.wrapper at 0x7f963c7c9440>

    @pytest.mark.asyncio
    async def test_tablet_repair(manager: ManagerClient, volumes_factory: Callable) -> None:
        cfg = {
            'tablet_load_stats_refresh_interval_in_seconds': 1,
            }
        async with space_limited_servers(manager, volumes_factory, ["100M"]*3, cmdline=global_cmdline, config=cfg) as servers:
            cql, _ = await manager.get_ready_cql(servers)
    
            workdir = await manager.server_get_workdir(servers[0].server_id)
            log = await manager.server_open_log(servers[0].server_id)
            host = await manager.get_host_id(servers[0].server_id)
            mark = await log.mark()
    
            async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3} AND tablets = {'initial': 4}") as ks:
                async with new_test_table(manager, ks, "pk int PRIMARY KEY, t text") as cf:
                    table = cf.split('.')[-1]
    
                    for _ in range(2):
                        await asyncio.gather(*[cql.run_async(query) for query in write_generator(cf, 100)])
                        await manager.api.flush_keyspace(servers[0].ip_addr, ks)
                    await manager.server_stop_gracefully(servers[0].server_id)
                    await manager.server_wipe_sstables(servers[0].server_id, ks, table)
                    await manager.server_start(servers[0].server_id)
    
                    logger.info("Create a big file on the target node to reach critical disk utilization level")
                    disk_info = psutil.disk_usage(workdir)
                    with random_content_file(workdir, int(disk_info.total*0.85) - disk_info.used):
                        for _ in range(2):
                            mark, _ = await log.wait_for("repair - Drained", from_mark=mark)
    
                        coord = await get_topology_coordinator(manager)
                        coord_serv = await find_server_by_host_id(manager, servers, coord)
                        coord_log = await manager.server_open_log(coord_serv.server_id)
                        coord_mark = await coord_log.mark()
    
                        logger.info("Schedule tablet repair")
                        response = await manager.api.tablet_repair(servers[0].ip_addr, ks, table, "all", await_completion=False)
                        task_id = response['tablet_task_id']
    
                        for _ in range(await get_tablet_count(manager, servers[1], ks, table)):
                            coord_mark, matches = await coord_log.wait_for("Initiating tablet repair host=(?P<host>.*) tablet=(?P<tablet>.*)", from_mark=coord_mark)
                            dst_host, tablet = matches[0][1].group("host"), matches[0][1].group("tablet")
                            if host == dst_host:
                                # Tablet repair is triggered on the node with disk utilization above the critical level.
                                # A local tablet repair task is refused to be created and the tablet repair fails.
                                error = "Repair service is disabled. No repairs will be started until it's re-enabled"
                            else:
                                # Tablet repair is triggered on the node with disk utilization below the critical level.
                                # A local tablet repair task is created and the row-level repair is executed. It will try
                                # to send missing rows to the node with critical disk utilization that are rejected.
                                error = f"put_row_diff: Repair follower={host} failed in put_row_diff handler"
    
                            await coord_log.wait_for(f"repair for tablet {tablet} failed: seastar::rpc::remote_verb_error.*{error}", from_mark=coord_mark)
    
                        logger.info("Restart the node")
                        mark = await log.mark()
                        await manager.server_restart(servers[0].server_id, wait_others=2)
                        await manager.driver_connect()
                        for _ in range(2):
                            mark, _ = await log.wait_for("repair - Drained", from_mark=mark)
    
                    logger.info("With blob file removed, wait for the tablet repair to succeed")
>                   await manager.api.wait_task(servers[0].ip_addr, task_id)

test/storage/test_out_of_space_prevention.py:275: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
test/pylib/rest_client.py:510: in wait_task
    return await self.client.get_json(f'/task_manager/wait_task/{task_id}', host=node_ip)
test/pylib/rest_client.py:99: in get_json
    ret = await self._fetch("GET", resource_uri, response_type = "json", host = host,
test/pylib/rest_client.py:70: in _fetch
    async with request(method, uri,
/usr/lib64/python3.13/site-packages/aiohttp/client.py:1391: in __aenter__
    self._resp = await self._coro
/usr/lib64/python3.13/site-packages/aiohttp/client.py:690: in _request
    await resp.start(conn)
/usr/lib64/python3.13/site-packages/aiohttp/client_reqrep.py:1054: in start
    with self._timer:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <aiohttp.helpers.TimerContext object at 0x7f963f17fc40>
exc_type = <class 'asyncio.exceptions.CancelledError'>
exc_val = CancelledError(), exc_tb = <traceback object at 0x7f963c816dc0>

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> Optional[bool]:
        enter_task: Optional[asyncio.Task[Any]] = None
        if self._tasks:
            enter_task = self._tasks.pop()
    
        if exc_type is asyncio.CancelledError and self._cancelled:
            assert enter_task is not None
            # The timeout was hit, and the task was cancelled
            # so we need to uncancel the last task that entered the context manager
            # since the cancellation should not leak out of the context manager
            if sys.version_info >= (3, 11):
                # If the task was already cancelling don't raise
                # asyncio.TimeoutError and instead return None
                # to allow the cancellation to propagate
                if enter_task.uncancel() > self._cancelling:
                    return None
>           raise asyncio.TimeoutError from exc_val
E           TimeoutError

/usr/lib64/python3.13/site-packages/aiohttp/helpers.py:749: TimeoutError

Metadata

Metadata

Assignees

Labels

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions