diff --git a/coolamqp/uplink/handshake.py b/coolamqp/uplink/handshake.py index a9ba9ee4ca3bc204f573601b8548a4d4a73e15de..5c5d3e3447dbff44615936d8904950964e75fb26 100644 --- a/coolamqp/uplink/handshake.py +++ b/coolamqp/uplink/handshake.py @@ -12,6 +12,7 @@ from coolamqp.framing.definitions import ConnectionStart, ConnectionStartOk, \ ConnectionTune, ConnectionTuneOk, ConnectionOpen, ConnectionOpenOk from coolamqp.framing.frames import AMQPMethodFrame from coolamqp.uplink.connection.states import ST_ONLINE +from coolamqp.uplink.heartbeat import Heartbeater from coolamqp import __version__ PUBLISHER_CONFIRMS = b'publisher_confirms' @@ -118,9 +119,8 @@ class Handshaker(object): logger.debug('Responding with ConnectionTuneOk') self.connection.frame_max = payload.frame_max self.connection.heartbeat = min(payload.heartbeat, self.heartbeat) - for channel in six.moves.xrange(1, ( - 65535 if payload.channel_max == 0 else payload.channel_max) + 1): - self.connection.free_channels.append(channel) + self.connection.free_channels.extend(six.moves.xrange(1, ( + 65535 if payload.channel_max == 0 else payload.channel_max) + 1)) self.connection.watch_for_method(0, ConnectionOpenOk, self.on_connection_open_ok) @@ -133,7 +133,6 @@ class Handshaker(object): # Install heartbeat handlers NOW, if necessary if self.connection.heartbeat > 0: - from coolamqp.uplink.heartbeat import Heartbeater Heartbeater(self.connection, self.connection.heartbeat) def on_connection_open_ok(self, payload # type: coolamqp.framing.base.AMQPPayload diff --git a/coolamqp/uplink/listener/epoll_listener.py b/coolamqp/uplink/listener/epoll_listener.py index 9a37d4c4fee2798213ba812d78beade12afd3fb1..6258fe5793f51ecf79ca0f1f04dad5ca5b1c8055 100644 --- a/coolamqp/uplink/listener/epoll_listener.py +++ b/coolamqp/uplink/listener/epoll_listener.py @@ -113,6 +113,11 @@ class EpollListener(object): self.noshot(sock) sock.close() + # Do any of the sockets want to send data Re-register them + for socket in self.fd_to_sock.values(): + if socket.wants_to_send_data(): + self.epoll.modify(socket.fileno(), RW) + def noshot(self, sock): """ Clear all one-shots for a socket diff --git a/coolamqp/uplink/listener/socket.py b/coolamqp/uplink/listener/socket.py index 29b7000644c5902698eb6e751c27fa826eae0ebb..6a864215c5d75cfa060db2e47bf571d6c2e8e0e9 100644 --- a/coolamqp/uplink/listener/socket.py +++ b/coolamqp/uplink/listener/socket.py @@ -104,6 +104,9 @@ class BaseSocket(object): except ValueError as e: raise SocketFailed(repr(e)) + def wants_to_send_data(self): # type: () -> bool + return not (len(self.data_to_send) == 0 and len(self.priority_queue) == 0) + def on_write(self): """ Socket is writable, called by Listener