Skip to content

ClientSession close hangs when you pass a connector and set connector_owner to True #3181

@KenanBek

Description

@KenanBek

Long story short

When a ClientSession is initialized with a connector and connector_owner=True, so that the connector should be closed together with the session, closing the session sometimes hangs. Example usage:

class HttpClient:
    """A wrapper around aiohttp's ClientSession to match requests.Session API.

    A single TCPConnector is created in ``__init__`` and shared by every
    ``send()`` call; each ``send()`` opens a fresh ClientSession on top of it.

    NOTE(review): ``send()`` passes ``connector_owner=True``, so the first
    session to leave its ``async with`` block closes the shared connector
    (BaseConnector.close() is a no-op on later calls). Subsequent ``send()``
    calls then run on a connector that is already closed, and every response
    is handed back after the session that produced it has been closed. Both
    are plausible causes of the reported hang; ``connector_owner=False`` (the
    workaround described in this issue) keeps the connector alive across calls.
    """

    def __init__(self, loop, *, timeout=None, cache=None, cache_ttl=None, **kwargs):
        """Create the client and the connector shared by all sessions.

        :param loop: event loop given to the connector and to every session.
        :param timeout: stored as ``self.timeout``; ``send()`` uses its own
            ``timeout`` argument instead.
        :param cache: optional cache backend used through ``get``/``add``/
            ``set``/``delete`` (presumably Django-style semantics -- confirm).
        :param cache_ttl: TTL handed to the cache; caching is only active when
            both ``cache`` and ``cache_ttl`` are truthy.
        :param kwargs: only ``invalidate_cache`` is read.
        """
        self._cache = cache
        self._cache_ttl = cache_ttl
        self.cache_key = None
        self.cache_invalidate = kwargs.get('invalidate_cache', False)
        self.timeout = timeout
        self.headers = {}
        self.proxy = None
        self.loop = loop
        # NOTE(review): verify_ssl=False presumably disables certificate
        # checks for every request -- confirm this is intended outside tests.
        self.connector = TCPConnector(verify_ssl=False, loop=loop)

    async def send(self, request, timeout, **kwargs):
        """Send *request* and return a RequestsResponseAdaptor around the response.

        :param request: mapping of keyword arguments for
            ``ClientSession.request``. Its ``headers`` entry is merged over
            ``self.headers`` and ``proxy`` is added when ``self.proxy`` is
            set; the mapping is modified in place.
        :param timeout: overall timeout for the call, enforced by async_timeout.
        :param kwargs: accepted but unused here.
        """
        # Per-request headers take precedence over the client-wide defaults.
        request['headers'] = dict(self.headers, **request.get('headers', {}))

        if self.proxy:
            request['proxy'] = self.proxy

        # NOTE(review): connector_owner=True makes this short-lived session
        # close self.connector on exit, although the connector is reused by
        # every later send() call.
        async with ClientSession(loop=self.loop, connector=self.connector, connector_owner=True) as client:
            with async_timeout.timeout(timeout, loop=self.loop):
                if self._cache and self._cache_ttl:
                    cache_key = self.get_cache_key(request)
                    cache_block_key = '{0}_block'.format(cache_key)

                    response = self._cache.get(cache_key)

                    if self.cache_invalidate:
                        self._cache.delete(cache_block_key)

                    # Presumably add() only succeeds when the block key is
                    # absent, so a fresh request is made at most once per TTL
                    # window, or whenever nothing is cached yet.
                    if self._cache.add(cache_block_key, True, self._cache_ttl) or not response:
                        response = await client.request(**request)
                        # NOTE(review): this caches the live aiohttp response
                        # object, whose session is closed once send() returns.
                        self._cache.set(cache_key, response, self._cache_ttl)
                else:
                    response = await client.request(**request)

                # NOTE(review): returning here exits the ``async with`` block,
                # which closes the session (and, with connector_owner=True,
                # the shared connector) before the caller reads the body.
                return RequestsResponseAdaptor(self.loop, response)

    def prepare_request(self, request):
        """No-op to match requests.Session API"""
        return request

    def _get_request_key(self, request):
        """Join the non-None url/method/headers/cookies/data values with '-'."""
        keys = [
            str(request.get(p)) for p in ('url', 'method', 'headers', 'cookies', 'data')
            if request.get(p) is not None
        ]
        return '-'.join(keys)

    def get_cache_key(self, request):
        """Return the hashed cache key, preferring an explicit ``self.cache_key``."""
        key = self.cache_key or self._get_request_key(request)
        return hash_cache_key(key)

Expected behaviour

Closing the ClientSession should not hang.

Actual behaviour

Closing the session occasionally hangs. Our workaround is to set connector_owner=False; with that setting it does not hang.

Steps to reproduce

This is the hardest part: the hang is intermittent, so we catch it by running the tests repeatedly with pytest-repeat:

...
# part of service
class Service(BaseHTTPService):
    """Test service bound to the local test HTTP server."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.timeout = 0.2
        # Proxy used when talking to the local test HTTP server. The sync
        # tests fail on Jenkins when they go through a proxy, so this setting
        # presumably disables it -- confirm PROXY_FOR_TESTING's value.
        self.proxy = settings.PROXY_FOR_TESTING

    @decorators.service
    def service_call(self, method, resource, ttl=None, cache_backend=None, *args, **kwargs):
        """Build a client for the given cache settings and delegate to call()."""
        http_client = self.get_client(ttl=ttl, cache_backend=cache_backend)
        return self.call(http_client, method, *args, resource=resource, **kwargs)
...

@pytest.fixture
def http_service(request, monkeypatch, loop, httpserver):
    """Serve canned content from the local test server and yield a bound Service.

    When parametrized indirectly, ``request.param`` may override ``content``,
    ``status_code``, ``headers`` and ``response_delay`` (seconds).
    """
    params = getattr(request, 'param', {})
    content = params.get('content', RESPONSE_CONTENT)
    status_code = params.get('status_code', 200)
    headers = params.get('headers', {})
    response_delay = params.get('response_delay', 0)
    # Only test classes whose name contains "Async" hand the loop to Service.
    is_async = 'Async' in request.cls.__name__
    service_kwargs = {'loop': loop} if is_async else {}

    httpserver.serve_content(content)
    httpserver.code = status_code
    httpserver.headers = headers

    if response_delay:
        # pytest-localserver cannot delay a response on demand, so its
        # ContentServer.content attribute is swapped for a property that
        # sleeps before handing back the canned body.
        def _delayed_content(self):
            import time
            time.sleep(response_delay)
            return content

        monkeypatch.setattr('pytest_localserver.http.ContentServer.content',
                            property(_delayed_content), raising=False)

    service = Service(**service_kwargs)
    service.location = httpserver.url
    service.mock = MagicMock()

    yield service

@pytest.fixture
def patch_time_execution(request, monkeypatch):
    """Patch the time_execution decorator with a Mock around _MockDecorator.

    Test classes whose name contains "Async" patch ``time_execution_async``.
    """
    suffix = '_async' if 'Async' in request.cls.__name__ else ''
    target = 'de.services.decorators.time_execution{}'.format(suffix)
    decorator_mock = Mock(side_effect=_MockDecorator)
    monkeypatch.setattr(target, decorator_mock)
    return decorator_mock


@pytest.fixture
def patch_raise_for_maintenance(request, monkeypatch):
    """Patch the raise_for_maintenance decorator with a Mock around _MockDecorator.

    Test classes whose name contains "Async" patch ``raise_for_maintenance_async``.
    """
    suffix = '_async' if 'Async' in request.cls.__name__ else ''
    target = 'de.services.decorators.raise_for_maintenance{}'.format(suffix)
    decorator_mock = Mock(side_effect=_MockDecorator)
    monkeypatch.setattr(target, decorator_mock)
    return decorator_mock

class TestBaseHTTPServiceAsync:
    """Async variant: the fixtures switch behaviour on "Async" in this name."""

    async def test_call_simple(self, http_service, patch_time_execution,
                               patch_raise_for_maintenance):
        """A plain GET yields the canned body; each decorator is applied once."""
        response = await http_service.service_call('GET', '/')
        assert await response.text() == RESPONSE_CONTENT

        for decorator_mock in (patch_time_execution, patch_raise_for_maintenance):
            assert decorator_mock.call_count == 1

Your environment

os: macOS 10.13.6
python: 3.6
aiohttp: 3.0.9
pytest: 3.7.1
pytest-aiohttp: 0.1.3

Debug / Analyze

I spent hours tracking down exactly where the hang happens. Here is the flow:

1 - Our wrapper, using ClientSession as a context manager (shown here with the connector_owner=False workaround):

        # connector_owner=False is the workaround: closing this session only
        # drops its reference to self.connector, which this client reuses
        # across send() calls, instead of closing it.
        async with ClientSession(loop=self.loop, connector=self.connector, connector_owner=False) as client:
            with async_timeout.timeout(timeout, loop=self.loop):
                if self._cache and self._cache_ttl:
                    cache_key = self.get_cache_key(request)
                    cache_block_key = '{0}_block'.format(cache_key)

                    response = self._cache.get(cache_key)

                    if self.cache_invalidate:
                        self._cache.delete(cache_block_key)

                    if self._cache.add(cache_block_key, True, self._cache_ttl) or not response:
                        response = await client.request(**request)
                        self._cache.set(cache_key, response, self._cache_ttl)
                else:
                    response = await client.request(**request)

                # NOTE(review): returning here exits the ``async with`` block,
                # so the session is closed before the caller reads the body.
                return RequestsResponseAdaptor(self.loop, response)

2 - Session close

    async def close(self):
        """Close underlying connector.

        Release all acquired resources.

        The connector is closed only when this session owns it
        (``connector_owner=True``); otherwise the session just drops its
        reference. The connector's ``close()`` is a plain synchronous call --
        nothing is awaited here.
        """
        if not self.closed:
            if self._connector_owner:
                # Closes every connection the connector tracks, presumably
                # including ones still held by responses whose bodies have
                # not been read yet.
                self._connector.close()
            self._connector = None

3 - Connector close (TcpConnector:BaseConnector)

... close of tcp connector ...

    def close(self):
        """Cancel pending DNS throttle events, then close the base connector."""
        for ev in self._throttle_dns_events.values():
            ev.cancel()

        # BaseConnector.close() closes the protocols in ``_conns`` and
        # ``_acquired``.
        super().close()

... close of base connector ...

    def close(self):
        """Close all opened transports.

        Synchronous and idempotent: the ``_closed`` flag makes repeated calls
        return immediately. Returns ``noop()`` when the event loop is already
        closed and ``None`` otherwise. Once a close actually runs, the
        ``finally`` block always clears the connector's bookkeeping.
        """
        if self._closed:
            return

        self._closed = True

        try:
            if self._loop.is_closed():
                # Loop already closed: skip closing connections; the
                # ``finally`` block below still clears all state.
                return noop()

            # cancel cleanup task
            if self._cleanup_handle:
                self._cleanup_handle.cancel()

            # cancel cleanup close task
            if self._cleanup_closed_handle:
                self._cleanup_closed_handle.cancel()

            # Pooled connections, stored as (protocol, t0) pairs.
            for data in self._conns.values():
                for proto, t0 in data:
                    proto.close()

            # Presumably connections currently checked out by requests.
            # NOTE(review): this is where the reporter observes the hang, but
            # proto.close() is a synchronous call -- nothing here waits for
            # the connection to finish closing.
            for proto in self._acquired:
                proto.close()

            for transport in self._cleanup_closed_transports:
                if transport is not None:
                    transport.abort()

        finally:
            self._conns.clear()
            self._acquired.clear()
            self._waiters.clear()
            self._cleanup_handle = None
            self._cleanup_closed_transports.clear()
            self._cleanup_closed_handle = None

4 - The hang actually happens in this part:

...
            # Presumably the connections currently checked out by requests;
            # proto.close() is a synchronous call and nothing here is awaited.
            for proto in self._acquired:
                proto.close()
...

5 - Each proto there is basically an asyncio.selector_events._SelectorSocketTransport (a subclass of _SelectorTransport), and the hang actually happens here, in _SelectorTransport.close():

...
    def close(self):
        # Idempotent: once closing has started, later calls return at once.
        if self._closing:
            return
        self._closing = True
        # Stop watching the socket for incoming data immediately.
        self._loop._remove_reader(self._sock_fd)
        if not self._buffer:
            # Nothing left to write: stop watching for writability and
            # schedule connection_lost for a later loop iteration. Nothing in
            # this method blocks or waits.
            # NOTE(review): when _buffer is non-empty nothing is scheduled
            # here; presumably the write path finishes the close once the
            # buffer drains (not shown in this excerpt).
            self._conn_lost += 1
            self._loop._remove_writer(self._sock_fd)
            self._loop.call_soon(self._call_connection_lost, None)
...

My conclusion: with connector_owner=True, the connector tries to close the connection and waits for it, but at that moment the connection is not ready to be closed, which leads to the hang.

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions