-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Description
Long story short
When you initialize ClientSession with connector and connector_owner=True and expect that connector will be closed when ClientSession closed sometimes it hangs. Example usage:
class HttpClient:
"""A wrapper around aiohttp's ClientSession to match requests.Session API"""
def __init__(self, loop, *, timeout=None, cache=None, cache_ttl=None, **kwargs):
self._cache = cache
self._cache_ttl = cache_ttl
self.cache_key = None
self.cache_invalidate = kwargs.get('invalidate_cache', False)
self.timeout = timeout
self.headers = {}
self.proxy = None
self.loop = loop
self.connector = TCPConnector(verify_ssl=False, loop=loop)
async def send(self, request, timeout, **kwargs):
request['headers'] = dict(self.headers, **request.get('headers', {}))
if self.proxy:
request['proxy'] = self.proxy
async with ClientSession(loop=self.loop, connector=self.connector, connector_owner=True) as client:
with async_timeout.timeout(timeout, loop=self.loop):
if self._cache and self._cache_ttl:
cache_key = self.get_cache_key(request)
cache_block_key = '{0}_block'.format(cache_key)
response = self._cache.get(cache_key)
if self.cache_invalidate:
self._cache.delete(cache_block_key)
if self._cache.add(cache_block_key, True, self._cache_ttl) or not response:
response = await client.request(**request)
self._cache.set(cache_key, response, self._cache_ttl)
else:
response = await client.request(**request)
return RequestsResponseAdaptor(self.loop, response)
def prepare_request(self, request):
"""No-op to match requests.Session API"""
return request
def _get_request_key(self, request):
keys = [
str(request.get(p)) for p in ('url', 'method', 'headers', 'cookies', 'data')
if request.get(p) is not None
]
return '-'.join(keys)
def get_cache_key(self, request):
key = self.cache_key or self._get_request_key(request)
return hash_cache_key(key)
Expected behaviour
When ClientSession closing it should not hang.
Actual behaviour
From time to time close hangs. Our workaround is set connector_owner=False then it will not hang.
Steps to reproduce
It is hardest part. We catch it in tests with pytest-repeat. Because some time it is not happening:
...
# part of service
class Service(BaseHTTPService):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.timeout = 0.2
# Disable proxy for requests, since the sync tests fail on Jenkins when
# attempting to connect to the local test HTTP server when proxy is enabled.
self.proxy = settings.PROXY_FOR_TESTING
@decorators.service
def service_call(self, method, resource, ttl=None, cache_backend=None, *args, **kwargs):
client = self.get_client(ttl=ttl, cache_backend=cache_backend)
return self.call(client, method, resource=resource, *args, **kwargs)
...
@pytest.fixture
def http_service(request, monkeypatch, loop, httpserver):
"""Start test HTTP server and yield instance of bound test HTTP service"""
kwargs = getattr(request, 'param', {})
content = kwargs.get('content', RESPONSE_CONTENT)
status_code = kwargs.get('status_code', 200)
headers = kwargs.get('headers', {})
response_delay = kwargs.get('response_delay', 0)
service_kwargs = {}
async_ = bool('Async' in request.cls.__name__)
if async_:
service_kwargs['loop'] = loop
httpserver.serve_content(content)
httpserver.code = status_code
httpserver.headers = headers
if response_delay:
# pytest-localserver has no API to dynamically delay the response, so
# we need to monkeypatch it :-/
@property
def content_delay(self):
import time
time.sleep(response_delay)
return content
monkeypatch.setattr('pytest_localserver.http.ContentServer.content',
content_delay, raising=False)
service = Service(**service_kwargs)
service.location = httpserver.url
service.mock = MagicMock()
yield service
@pytest.fixture
def patch_time_execution(request, monkeypatch):
fqn = 'de.services.decorators.time_execution{}'.format(
'_async' if 'Async' in request.cls.__name__ else ''
)
m = Mock(side_effect=_MockDecorator)
monkeypatch.setattr(fqn, m)
return m
@pytest.fixture
def patch_raise_for_maintenance(request, monkeypatch):
fqn = 'de.services.decorators.raise_for_maintenance{}'.format(
'_async' if 'Async' in request.cls.__name__ else ''
)
m = Mock(side_effect=_MockDecorator)
monkeypatch.setattr(fqn, m)
return m
class TestBaseHTTPServiceAsync:
async def test_call_simple(self, http_service, patch_time_execution,
patch_raise_for_maintenance):
resp = await http_service.service_call('GET', '/')
text = await resp.text()
assert text == RESPONSE_CONTENT
assert patch_time_execution.call_count == 1
assert patch_raise_for_maintenance.call_count == 1
Your environment
os: macOS 10.13.6
python: 3.6
aiohttp: 3.0.9
pytest: 3.7.1
pytest-aiohttp: 0.1.3
Debug / Analyze
I spent hours to find out where exactly hang happens. Here is the flow:
1 - Our wrapper with ClientSession in context manager:
async with ClientSession(loop=self.loop, connector=self.connector, connector_owner=False) as client:
with async_timeout.timeout(timeout, loop=self.loop):
if self._cache and self._cache_ttl:
cache_key = self.get_cache_key(request)
cache_block_key = '{0}_block'.format(cache_key)
response = self._cache.get(cache_key)
if self.cache_invalidate:
self._cache.delete(cache_block_key)
if self._cache.add(cache_block_key, True, self._cache_ttl) or not response:
response = await client.request(**request)
self._cache.set(cache_key, response, self._cache_ttl)
else:
response = await client.request(**request)
return RequestsResponseAdaptor(self.loop, response)
2 - Session close
async def close(self):
"""Close underlying connector.
Release all acquired resources.
"""
if not self.closed:
if self._connector_owner:
self._connector.close()
self._connector = None
3 - Connector close (TcpConnector:BaseConnector)
... close of tcp connector ...
def close(self):
"""Close all ongoing DNS calls."""
for ev in self._throttle_dns_events.values():
ev.cancel()
super().close()
... close of base connector ...
def close(self):
"""Close all opened transports."""
if self._closed:
return
self._closed = True
try:
if self._loop.is_closed():
return noop()
# cancel cleanup task
if self._cleanup_handle:
self._cleanup_handle.cancel()
# cancel cleanup close task
if self._cleanup_closed_handle:
self._cleanup_closed_handle.cancel()
for data in self._conns.values():
for proto, t0 in data:
proto.close()
for proto in self._acquired:
proto.close()
for transport in self._cleanup_closed_transports:
if transport is not None:
transport.abort()
finally:
self._conns.clear()
self._acquired.clear()
self._waiters.clear()
self._cleanup_handle = None
self._cleanup_closed_transports.clear()
self._cleanup_closed_handle = None
4 - Actual hang happens in this part:
...
for proto in self._acquired:
proto.close()
...
5 - Which is basically is asyncio.selector_events._SelectorSocketTransport(_SelectorTransport) and actual hangs happens here _SelectorTransport.close():
...
def close(self):
if self._closing:
return
self._closing = True
self._loop._remove_reader(self._sock_fd)
if not self._buffer:
self._conn_lost += 1
self._loop._remove_writer(self._sock_fd)
self._loop.call_soon(self._call_connection_lost, None)
...
My conclusion is when you use connector_owner=True connector try to close connection and waits for it. And at the moment connection is not ready to be closed which leads to hangs.