Skip to content
Snippets Groups Projects
Commit d21dbff1 authored by Piotr Maślanka's avatar Piotr Maślanka
Browse files

select() fallback added

satella by default uses poll() in event dispatch. Fallback to select()
will be done on platforms that don't support poll()
parent 037c25ef
No related branches found
No related tags found
No related merge requests found
......@@ -268,17 +268,22 @@ class ServerSocket(FileDescriptorChannel):
def fileno(self):
return self.socket.fileno()
try:
_BASELEVEL = select.POLLIN | select.POLLOUT | select.POLLERR | select.POLLHUP | select.POLLNVAL
IS_POLL_SUPPORTED = True
except AttributeError:
IS_POLL_SUPPORTED = False
pass
_BASELEVEL = select.POLLIN | select.POLLOUT | select.POLLERR | select.POLLHUP | select.POLLNVAL
class SelectHandlingLayer(HandlingLayer):
"""A select-based handling layer"""
def __init__(self):
HandlingLayer.__init__(self)
self.poll = select.poll()
self.fdmap = {}
if IS_POLL_SUPPORTED:
self.poll = select.poll()
self.fdmap = {}
def register_channel(self, channel):
"""
......@@ -296,8 +301,9 @@ class SelectHandlingLayer(HandlingLayer):
self.channels.append(channel)
self.poll.register(channel, select.POLLIN | select.POLLERR | select.POLLHUP | select.POLLNVAL)
self.fdmap[channel.fileno()] = channel
if IS_POLL_SUPPORTED:
self.poll.register(channel, select.POLLIN | select.POLLERR | select.POLLHUP | select.POLLNVAL)
self.fdmap[channel.fileno()] = channel
def unregister_channel(self, channel):
"""
......@@ -308,8 +314,9 @@ class SelectHandlingLayer(HandlingLayer):
"""
try:
self.channels.remove(channel)
self.poll.unregister(channel)
del self.fdmap[channel.fileno()]
if IS_POLL_SUPPORTED:
self.poll.unregister(channel)
del self.fdmap[channel.fileno()]
except (ValueError, KeyError, select.error):
raise ValueError, 'channel not found'
......@@ -318,50 +325,92 @@ class SelectHandlingLayer(HandlingLayer):
Channel unregister + channel close
"""
self.unregister_channel(channel)
try:
self.poll.unregister(channel)
del self.fdmap[channel.fileno()]
except KeyError:
pass
if IS_POLL_SUPPORTED:
try:
self.poll.unregister(channel)
del self.fdmap[channel.fileno()]
except KeyError:
pass
channel.on_closed() # this should close the channel
self.on_closed(channel)
def select(self, timeout=5):
self.on_iteration()
for x in self.channels:
if x.is_write_pending():
self.poll.modify(x, _BASELEVEL | select.POLLOUT)
else:
self.poll.modify(x, _BASELEVEL)
events = self.poll.poll(timeout)
if IS_POLL_SUPPORTED: # Perform poll() event loop
# Now, for each event...
for fdinfo, event in events:
channel = self.fdmap[fdinfo]
if event & (select.EPOLLHUP | select.POLLERR | select.POLLNVAL):
self.close_channel(channel)
return
for x in self.channels:
if x.is_write_pending():
self.poll.modify(x, _BASELEVEL | select.POLLOUT)
else:
self.poll.modify(x, _BASELEVEL)
if event & select.POLLIN:
try:
channel.on_readable()
except (ChannelFailure, ChannelClosed):
events = self.poll.poll(timeout)
# Now, for each event...
for fdinfo, event in events:
channel = self.fdmap[fdinfo]
if event & (select.EPOLLHUP | select.POLLERR | select.POLLNVAL):
self.close_channel(channel)
return
self.on_readable(channel)
if event & select.POLLOUT:
if event & select.POLLIN:
try:
channel.on_readable()
except (ChannelFailure, ChannelClosed):
self.close_channel(channel)
return
self.on_readable(channel)
if event & select.POLLOUT:
try:
if not channel.connected:
channel.on_writable()
self.on_connected(channel)
else:
channel.on_writable()
except ChannelFailure:
self.close_channel(channel)
return
self.on_writable(channel)
else: # Fall back to select()
writables = [x for x in self.channels if x.is_write_pending()]
try:
rs, ws, xs = select.select(self.channels, writables, (), timeout)
except select.error:
# we need to trace over each channel to determine who has failed
for channel in self.channels:
try:
select.select((channel, ), (), (), 0)
except select.error:
# we found the one
self.close_channel(channel)
return
except socket.error:
raise RuntimeError, 'ABEND: socket error in select loop'
# Now, for each writeable channel...
for writable in ws:
try:
if not channel.connected:
channel.on_writable()
self.on_connected(channel)
if not writable.connected:
writable.on_writable()
self.on_connected(writable)
else:
channel.on_writable()
writable.on_writable()
except ChannelFailure:
self.close_channel(channel)
self.close_channel(writable)
return
self.on_writable(writable)
# For each readable channel...
for readable in rs:
try:
readable.on_readable()
except (ChannelFailure, ChannelClosed):
self.close_channel(readable)
return
self.on_writable(channel)
self.on_readable(readable)
......@@ -50,7 +50,4 @@ class TQMTest(unittest.TestCase):
self.assertEquals(reader.get(), 10)
self.assertEquals(reader.get(), 12)
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment