Skip to content
This repository was archived by the owner on Nov 23, 2017. It is now read-only.

Commit 83d1ab4

Browse files
committed
transports: Make _ProactorBasePipeTransport use _FlowControlMixin
1 parent 0c43056 commit 83d1ab4

File tree

4 files changed

+75
-125
lines changed

4 files changed

+75
-125
lines changed

asyncio/proactor_events.py

Lines changed: 2 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@
1515
from .log import logger
1616

1717

18-
class _ProactorBasePipeTransport(transports.BaseTransport):
18+
class _ProactorBasePipeTransport(transports._FlowControlMixin,
19+
transports.BaseTransport):
1920
"""Base class for pipe and socket transports."""
2021

2122
def __init__(self, loop, sock, protocol, waiter=None,
@@ -33,8 +34,6 @@ def __init__(self, loop, sock, protocol, waiter=None,
3334
self._conn_lost = 0
3435
self._closing = False # Set when close() called.
3536
self._eof_written = False
36-
self._protocol_paused = False
37-
self.set_write_buffer_limits()
3837
if self._server is not None:
3938
self._server.attach(self)
4039
self._loop.call_soon(self._protocol.connection_made, self)
@@ -94,56 +93,6 @@ def _call_connection_lost(self, exc):
9493
server.detach(self)
9594
self._server = None
9695

97-
# XXX The next four methods are nearly identical to corresponding
98-
# ones in _SelectorTransport. Maybe refactor buffer management to
99-
# share the implementations? (Also these are really only needed
100-
# by _ProactorWritePipeTransport but since _buffer is defined on
101-
# the base class I am putting it here for now.)
102-
103-
def _maybe_pause_protocol(self):
104-
size = self.get_write_buffer_size()
105-
if size <= self._high_water:
106-
return
107-
if not self._protocol_paused:
108-
self._protocol_paused = True
109-
try:
110-
self._protocol.pause_writing()
111-
except Exception as exc:
112-
self._loop.call_exception_handler({
113-
'message': 'protocol.pause_writing() failed',
114-
'exception': exc,
115-
'transport': self,
116-
'protocol': self._protocol,
117-
})
118-
119-
def _maybe_resume_protocol(self):
120-
if (self._protocol_paused and
121-
self.get_write_buffer_size() <= self._low_water):
122-
self._protocol_paused = False
123-
try:
124-
self._protocol.resume_writing()
125-
except Exception as exc:
126-
self._loop.call_exception_handler({
127-
'message': 'protocol.resume_writing() failed',
128-
'exception': exc,
129-
'transport': self,
130-
'protocol': self._protocol,
131-
})
132-
133-
def set_write_buffer_limits(self, high=None, low=None):
134-
if high is None:
135-
if low is None:
136-
high = 64*1024
137-
else:
138-
high = 4*low
139-
if low is None:
140-
low = high // 4
141-
if not high >= low >= 0:
142-
raise ValueError('high (%r) must be >= low (%r) must be >= 0' %
143-
(high, low))
144-
self._high_water = high
145-
self._low_water = low
146-
14796
def get_write_buffer_size(self):
14897
size = self._pending_write
14998
if self._buffer is not None:

asyncio/selector_events.py

Lines changed: 2 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -338,77 +338,8 @@ def _stop_serving(self, sock):
338338
sock.close()
339339

340340

341-
class _FlowControlMixin(transports.Transport):
342-
"""All the logic for (write) flow control in a mix-in base class.
343-
344-
The subclass must implement get_write_buffer_size(). It must call
345-
_maybe_pause_protocol() whenever the write buffer size increases,
346-
and _maybe_resume_protocol() whenever it decreases. It may also
347-
override set_write_buffer_limits() (e.g. to specify different
348-
defaults).
349-
350-
The subclass constructor must call super().__init__(extra). This
351-
will call set_write_buffer_limits().
352-
353-
The user may call set_write_buffer_limits() and
354-
get_write_buffer_size(), and their protocol's pause_writing() and
355-
resume_writing() may be called.
356-
"""
357-
358-
def __init__(self, extra=None):
359-
super().__init__(extra)
360-
self._protocol_paused = False
361-
self.set_write_buffer_limits()
362-
363-
def _maybe_pause_protocol(self):
364-
size = self.get_write_buffer_size()
365-
if size <= self._high_water:
366-
return
367-
if not self._protocol_paused:
368-
self._protocol_paused = True
369-
try:
370-
self._protocol.pause_writing()
371-
except Exception as exc:
372-
self._loop.call_exception_handler({
373-
'message': 'protocol.pause_writing() failed',
374-
'exception': exc,
375-
'transport': self,
376-
'protocol': self._protocol,
377-
})
378-
379-
def _maybe_resume_protocol(self):
380-
if (self._protocol_paused and
381-
self.get_write_buffer_size() <= self._low_water):
382-
self._protocol_paused = False
383-
try:
384-
self._protocol.resume_writing()
385-
except Exception as exc:
386-
self._loop.call_exception_handler({
387-
'message': 'protocol.resume_writing() failed',
388-
'exception': exc,
389-
'transport': self,
390-
'protocol': self._protocol,
391-
})
392-
393-
def set_write_buffer_limits(self, high=None, low=None):
394-
if high is None:
395-
if low is None:
396-
high = 64*1024
397-
else:
398-
high = 4*low
399-
if low is None:
400-
low = high // 4
401-
if not high >= low >= 0:
402-
raise ValueError('high (%r) must be >= low (%r) must be >= 0' %
403-
(high, low))
404-
self._high_water = high
405-
self._low_water = low
406-
407-
def get_write_buffer_size(self):
408-
raise NotImplementedError
409-
410-
411-
class _SelectorTransport(_FlowControlMixin, transports.Transport):
341+
class _SelectorTransport(transports._FlowControlMixin,
342+
transports.Transport):
412343

413344
max_size = 256 * 1024 # Buffer size passed to recv().
414345

asyncio/transports.py

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,3 +219,73 @@ def kill(self):
219219
http://docs.python.org/3/library/subprocess#subprocess.Popen.kill
220220
"""
221221
raise NotImplementedError
222+
223+
224+
class _FlowControlMixin(Transport):
225+
"""All the logic for (write) flow control in a mix-in base class.
226+
227+
The subclass must implement get_write_buffer_size(). It must call
228+
_maybe_pause_protocol() whenever the write buffer size increases,
229+
and _maybe_resume_protocol() whenever it decreases. It may also
230+
override set_write_buffer_limits() (e.g. to specify different
231+
defaults).
232+
233+
The subclass constructor must call super().__init__(extra). This
234+
will call set_write_buffer_limits().
235+
236+
The user may call set_write_buffer_limits() and
237+
get_write_buffer_size(), and their protocol's pause_writing() and
238+
resume_writing() may be called.
239+
"""
240+
241+
def __init__(self, extra=None):
242+
super().__init__(extra)
243+
self._protocol_paused = False
244+
self.set_write_buffer_limits()
245+
246+
def _maybe_pause_protocol(self):
247+
size = self.get_write_buffer_size()
248+
if size <= self._high_water:
249+
return
250+
if not self._protocol_paused:
251+
self._protocol_paused = True
252+
try:
253+
self._protocol.pause_writing()
254+
except Exception as exc:
255+
self._loop.call_exception_handler({
256+
'message': 'protocol.pause_writing() failed',
257+
'exception': exc,
258+
'transport': self,
259+
'protocol': self._protocol,
260+
})
261+
262+
def _maybe_resume_protocol(self):
263+
if (self._protocol_paused and
264+
self.get_write_buffer_size() <= self._low_water):
265+
self._protocol_paused = False
266+
try:
267+
self._protocol.resume_writing()
268+
except Exception as exc:
269+
self._loop.call_exception_handler({
270+
'message': 'protocol.resume_writing() failed',
271+
'exception': exc,
272+
'transport': self,
273+
'protocol': self._protocol,
274+
})
275+
276+
def set_write_buffer_limits(self, high=None, low=None):
277+
if high is None:
278+
if low is None:
279+
high = 64*1024
280+
else:
281+
high = 4*low
282+
if low is None:
283+
low = high // 4
284+
if not high >= low >= 0:
285+
raise ValueError('high (%r) must be >= low (%r) must be >= 0' %
286+
(high, low))
287+
self._high_water = high
288+
self._low_water = low
289+
290+
def get_write_buffer_size(self):
291+
raise NotImplementedError

asyncio/unix_events.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,7 @@ def _call_connection_lost(self, exc):
317317
self._loop = None
318318

319319

320-
class _UnixWritePipeTransport(selector_events._FlowControlMixin,
320+
class _UnixWritePipeTransport(transports._FlowControlMixin,
321321
transports.WriteTransport):
322322

323323
def __init__(self, loop, pipe, protocol, waiter=None, extra=None):

0 commit comments

Comments
 (0)