Skip to content
Snippets Groups Projects
Commit 90cbb552 authored by Piotr Maślanka's avatar Piotr Maślanka
Browse files

changed from select to poll

parent 5a94b933
No related branches found
No related tags found
No related merge requests found
......@@ -267,11 +267,17 @@ class ServerSocket(FileDescriptorChannel):
def fileno(self):
return self.socket.fileno()
_BASELEVEL = select.POLLIN | select.POLLOUT | select.POLLERR | select.POLLHUP | select.POLLNVAL
class SelectHandlingLayer(HandlingLayer):
"""A select-based handling layer"""
def __init__(self):
HandlingLayer.__init__(self)
self.poll = select.poll()
self.fdmap = {}
def register_channel(self, channel):
"""
......@@ -288,6 +294,9 @@ class SelectHandlingLayer(HandlingLayer):
raise ValueError, 'is_write_pending() method lacking'
self.channels.append(channel)
self.poll.register(channel, select.POLLIN | select.POLLERR | select.POLLHUP | select.POLLNVAL)
self.fdmap[channel.fileno()] = channel
def unregister_channel(self, channel):
"""
......@@ -298,7 +307,9 @@ class SelectHandlingLayer(HandlingLayer):
"""
try:
self.channels.remove(channel)
except ValueError:
self.poll.unregister(channel)
del self.fdmap[channel.fileno()]
except (ValueError, KeyError, select.error):
raise ValueError, 'channel not found'
def close_channel(self, channel):
......@@ -306,47 +317,50 @@ class SelectHandlingLayer(HandlingLayer):
Channel unregister + channel close
"""
self.unregister_channel(channel)
try:
self.poll.unregister(channel)
del self.fdmap[channel.fileno()]
except KeyError:
pass
channel.on_closed() # this should close the channel
self.on_closed(channel)
def select(self, timeout=5):
self.on_iteration()
writables = [x for x in self.channels if x.is_write_pending()]
try:
rs, ws, xs = select.select(self.channels, writables, (), timeout)
except select.error:
# we need to trace over each channel to determine who has failed
for channel in self.channels:
for x in self.channels:
if x.is_write_pending():
self.poll.modify(x, _BASELEVEL | select.POLLOUT)
else:
self.poll.modify(x, _BASELEVEL)
events = self.poll.poll(timeout)
# Now, for each event...
for fdinfo, event in events:
channel = self.fdmap[fdinfo]
if event & (select.EPOLLHUP | select.POLLERR | select.POLLNVAL):
self.close_channel(channel)
return
if event & select.POLLIN:
try:
select.select((channel, ), (), (), 0)
except select.error:
# we found the one
channel.on_readable()
except (ChannelFailure, ChannelClosed):
self.close_channel(channel)
return
except socket.error:
raise RuntimeError, 'ABEND: socket error in select loop'
# Now, for each writeable channel...
for writable in ws:
try:
if not writable.connected:
writable.on_writable()
self.on_connected(writable)
else:
writable.on_writable()
except ChannelFailure:
self.close_channel(writable)
return
self.on_writable(writable)
# For each readable channel...
for readable in rs:
try:
readable.on_readable()
except (ChannelFailure, ChannelClosed):
self.close_channel(readable)
return
self.on_readable(readable)
self.on_readable(channel)
if event & select.POLLOUT:
try:
if not channel.connected:
channel.on_writable()
self.on_connected(channel)
else:
channel.on_writable()
except ChannelFailure:
self.close_channel(channel)
return
self.on_writable(channel)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment