Skip to content

Commit 964921d

Browse files
author
Nikolay Kim
committed
refactor connector keep-live timer
1 parent b1c5f47 commit 964921d

File tree

7 files changed

+132
-93
lines changed

7 files changed

+132
-93
lines changed

aiohttp/client.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -107,10 +107,13 @@ def __init__(self, *, connector=None, loop=None, cookies=None,
107107
self._request_class = request_class
108108
self._response_class = response_class
109109
self._ws_response_class = ws_response_class
110-
self._time_service = (
111-
time_service
112-
if time_service is not None
113-
else TimeService(self._loop))
110+
111+
if time_service is not None:
112+
self._time_service_owner = False
113+
self._time_service = time_service
114+
else:
115+
self._time_service_owner = True
116+
self._time_service = TimeService(self._loop)
114117

115118
def __del__(self, _warnings=warnings):
116119
if not self.closed:
@@ -486,6 +489,10 @@ def close(self):
486489
if not self.closed:
487490
self._connector.close()
488491
self._connector = None
492+
493+
if self._time_service_owner:
494+
self._time_service.close()
495+
489496
ret = helpers.create_future(self._loop)
490497
ret.set_result(None)
491498
return ret

aiohttp/client_reqrep.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -790,7 +790,7 @@ def __aenter__(self):
790790

791791
@asyncio.coroutine
792792
def __aexit__(self, exc_type, exc_val, exc_tb):
793-
if exc_type is None:
794-
yield from self.release()
795-
else:
796-
self.close()
793+
# similar to _RequestContextManager, we do not need to check
794+
# for exceptions, response object can closes connection
795+
# is state is broken
796+
yield from self.release()

aiohttp/connector.py

Lines changed: 39 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
from collections import defaultdict
88
from hashlib import md5, sha1, sha256
99
from itertools import chain
10-
from math import ceil
1110
from types import MappingProxyType
1211

1312
from yarl import URL
@@ -112,8 +111,7 @@ class BaseConnector(object):
112111
_source_traceback = None
113112

114113
def __init__(self, *, conn_timeout=None, keepalive_timeout=sentinel,
115-
force_close=False, limit=20,
116-
loop=None):
114+
force_close=False, limit=20, time_service=None, loop=None):
117115

118116
if force_close:
119117
if keepalive_timeout is not None and \
@@ -122,7 +120,7 @@ def __init__(self, *, conn_timeout=None, keepalive_timeout=sentinel,
122120
'be set if force_close is True')
123121
else:
124122
if keepalive_timeout is sentinel:
125-
keepalive_timeout = 30
123+
keepalive_timeout = 15.0
126124

127125
if loop is None:
128126
loop = asyncio.get_event_loop()
@@ -135,18 +133,29 @@ def __init__(self, *, conn_timeout=None, keepalive_timeout=sentinel,
135133
self._acquired = defaultdict(set)
136134
self._conn_timeout = conn_timeout
137135
self._keepalive_timeout = keepalive_timeout
138-
self._cleanup_handle = None
139136
self._force_close = force_close
140137
self._limit = limit
141138
self._waiters = defaultdict(list)
142139

140+
if time_service is not None:
141+
self._time_service_owner = False
142+
self._time_service = time_service
143+
else:
144+
self._time_service_owner = True
145+
self._time_service = helpers.TimeService(loop)
146+
143147
self._loop = loop
144148
self._factory = functools.partial(
145149
aiohttp.StreamProtocol, loop=loop,
146150
disconnect_error=ServerDisconnectedError)
147151

148152
self.cookies = SimpleCookie()
149153

154+
self._cleanup_handle = None
155+
if (keepalive_timeout is not sentinel and
156+
keepalive_timeout is not None):
157+
self._cleanup()
158+
150159
def __del__(self, _warnings=warnings):
151160
if self._closed:
152161
return
@@ -197,43 +206,29 @@ def _cleanup(self):
197206
"""Cleanup unused transports."""
198207
if self._cleanup_handle:
199208
self._cleanup_handle.cancel()
200-
self._cleanup_handle = None
201209

202-
now = self._loop.time()
210+
now = self._time_service.loop_time()
203211

204-
connections = {}
205-
timeout = self._keepalive_timeout
212+
if self._conns:
213+
connections = {}
214+
deadline = now - self._keepalive_timeout
215+
for key, conns in self._conns.items():
216+
alive = []
217+
for transport, proto, use_time in conns:
218+
if transport is not None:
219+
if proto.is_connected():
220+
if use_time - deadline < 0:
221+
transport.close()
222+
else:
223+
alive.append((transport, proto, use_time))
206224

207-
for key, conns in self._conns.items():
208-
alive = []
209-
for transport, proto, t0 in conns:
210-
if transport is not None:
211-
if proto and not proto.is_connected():
212-
transport = None
213-
else:
214-
delta = t0 + self._keepalive_timeout - now
215-
if delta < 0:
216-
transport.close()
217-
transport = None
218-
elif delta < timeout:
219-
timeout = delta
220-
221-
if transport is not None:
222-
alive.append((transport, proto, t0))
223-
if alive:
224-
connections[key] = alive
225-
226-
if connections:
227-
self._cleanup_handle = self._loop.call_at(
228-
ceil(now + timeout), self._cleanup)
229-
230-
self._conns = connections
231-
232-
def _start_cleanup_task(self):
233-
if self._cleanup_handle is None:
234-
now = self._loop.time()
235-
self._cleanup_handle = self._loop.call_at(
236-
ceil(now + self._keepalive_timeout), self._cleanup)
225+
if alive:
226+
connections[key] = alive
227+
228+
self._conns = connections
229+
230+
self._cleanup_handle = self._time_service.call_later(
231+
self._keepalive_timeout / 2.0, self._cleanup)
237232

238233
def close(self):
239234
"""Close all opened transports."""
@@ -247,6 +242,9 @@ def close(self):
247242
if self._loop.is_closed():
248243
return ret
249244

245+
if self._time_service_owner:
246+
self._time_service.close()
247+
250248
for key, data in self._conns.items():
251249
for transport, proto, t0 in data:
252250
transport.close()
@@ -330,6 +328,7 @@ def _get(self, key):
330328
conns = self._conns[key]
331329
except KeyError:
332330
return None, None
331+
333332
t1 = self._loop.time()
334333
while conns:
335334
transport, proto, t0 = conns.pop()
@@ -342,6 +341,7 @@ def _get(self, key):
342341
# The very last connection was reclaimed: drop the key
343342
del self._conns[key]
344343
return transport, proto
344+
345345
# No more connections: drop the key
346346
del self._conns[key]
347347
return None, None
@@ -398,8 +398,6 @@ def _release(self, key, req, transport, protocol, *, should_close=False):
398398
conns.append((transport, protocol, self._loop.time()))
399399
reader.unset_parser()
400400

401-
self._start_cleanup_task()
402-
403401
@asyncio.coroutine
404402
def _create_connection(self, req):
405403
raise NotImplementedError()

aiohttp/helpers.py

Lines changed: 26 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -610,7 +610,7 @@ def __init__(self, loop, *, interval=1.0):
610610
self._cb = loop.call_at(self._loop_time + self._interval, self._on_cb)
611611
self._scheduled = []
612612

613-
def stop(self):
613+
def close(self):
614614
if self._cb:
615615
self._cb.cancel()
616616

@@ -673,6 +673,9 @@ def strtime(self):
673673
self._strtime = s = self._format_date_time()
674674
return self._strtime
675675

676+
def loop_time(self):
677+
return self._loop_time
678+
676679
def call_later(self, delay, callback, *args):
677680
"""Arrange for a callback to be called at a given time.
678681
@@ -703,43 +706,49 @@ def timeout(self, timeout):
703706
704707
timeout - value in seconds or None to disable timeout logic
705708
"""
706-
return LowresTimeout(timeout, self, self._loop)
709+
when = self._loop_time + timeout if timeout is not None else 0.0
710+
711+
ctx = _TimeServiceTimeoutContext(when, self._loop)
712+
713+
if timeout is not None:
714+
heapq.heappush(self._scheduled, ctx)
707715

716+
return ctx
708717

709-
class LowresTimeout:
718+
719+
class _TimeServiceTimeoutContext(TimerHandle):
710720
""" Low resolution timeout context manager """
711721

712-
def __init__(self, timeout, time_service, loop):
713-
self._loop = loop
714-
self._timeout = timeout
715-
self._time_service = time_service
722+
def __init__(self, when, loop):
723+
super().__init__(when, self.cancel, (), loop)
724+
716725
self._task = None
717726
self._cancelled = False
718-
self._cancel_handler = None
719727

720728
def __enter__(self):
721729
self._task = asyncio.Task.current_task(loop=self._loop)
722730
if self._task is None:
731+
self._cancelled = True
723732
raise RuntimeError('Timeout context manager should be used '
724733
'inside a task')
725-
if self._timeout is not None:
726-
self._cancel_handler = self._time_service.call_later(
727-
self._timeout, self._cancel_task)
728734

729735
return self
730736

731737
def __exit__(self, exc_type, exc_val, exc_tb):
732738
self._task = None
733739

734740
if exc_type is asyncio.CancelledError and self._cancelled:
735-
self._cancel_handler = None
736741
raise asyncio.TimeoutError from None
737-
if self._timeout is not None:
738-
self._cancel_handler.cancel()
739-
self._cancel_handler = None
740742

741-
def _cancel_task(self):
742-
self._cancelled = self._task.cancel()
743+
self._cancelled = True
744+
745+
def cancel(self):
746+
if not self._cancelled:
747+
if self._task is not None:
748+
self._task.cancel()
749+
self._task = None
750+
751+
self._cancelled = True
743752

744753

745754
class HeadersMixin:

aiohttp/web_server.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ def shutdown(self, timeout=None):
160160
coros = [conn.shutdown(timeout) for conn in self._connections]
161161
yield from asyncio.gather(*coros, loop=self._loop)
162162
self._connections.clear()
163-
self._time_service.stop()
163+
self._time_service.close()
164164

165165
finish_connections = shutdown
166166

0 commit comments

Comments
 (0)