From 589e91a4caf6eec06a0a9e97b0a7c8d96567f84d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ma=C5=9Blanka?= <piotr.maslanka@henrietta.com.pl> Date: Sun, 5 Jan 2020 15:47:35 +0100 Subject: [PATCH] modify epoll behaviour --- coolamqp/uplink/handshake.py | 7 +++---- coolamqp/uplink/listener/epoll_listener.py | 5 +++++ coolamqp/uplink/listener/socket.py | 3 +++ 3 files changed, 11 insertions(+), 4 deletions(-) diff --git a/coolamqp/uplink/handshake.py b/coolamqp/uplink/handshake.py index a9ba9ee..5c5d3e3 100644 --- a/coolamqp/uplink/handshake.py +++ b/coolamqp/uplink/handshake.py @@ -12,6 +12,7 @@ from coolamqp.framing.definitions import ConnectionStart, ConnectionStartOk, \ ConnectionTune, ConnectionTuneOk, ConnectionOpen, ConnectionOpenOk from coolamqp.framing.frames import AMQPMethodFrame from coolamqp.uplink.connection.states import ST_ONLINE +from coolamqp.uplink.heartbeat import Heartbeater from coolamqp import __version__ PUBLISHER_CONFIRMS = b'publisher_confirms' @@ -118,9 +119,8 @@ class Handshaker(object): logger.debug('Responding with ConnectionTuneOk') self.connection.frame_max = payload.frame_max self.connection.heartbeat = min(payload.heartbeat, self.heartbeat) - for channel in six.moves.xrange(1, ( - 65535 if payload.channel_max == 0 else payload.channel_max) + 1): - self.connection.free_channels.append(channel) + self.connection.free_channels.extend(six.moves.xrange(1, ( + 65535 if payload.channel_max == 0 else payload.channel_max) + 1)) self.connection.watch_for_method(0, ConnectionOpenOk, self.on_connection_open_ok) @@ -133,7 +133,6 @@ class Handshaker(object): # Install heartbeat handlers NOW, if necessary if self.connection.heartbeat > 0: - from coolamqp.uplink.heartbeat import Heartbeater Heartbeater(self.connection, self.connection.heartbeat) def on_connection_open_ok(self, payload # type: coolamqp.framing.base.AMQPPayload diff --git a/coolamqp/uplink/listener/epoll_listener.py b/coolamqp/uplink/listener/epoll_listener.py index 9a37d4c..6258fe5 100644 --- a/coolamqp/uplink/listener/epoll_listener.py +++ b/coolamqp/uplink/listener/epoll_listener.py @@ -113,6 +113,11 @@ class EpollListener(object): self.noshot(sock) sock.close() + # Do any of the sockets want to send data Re-register them + for socket in self.fd_to_sock.values(): + if socket.wants_to_send_data(): + self.epoll.modify(socket.fileno(), RW) + def noshot(self, sock): """ Clear all one-shots for a socket diff --git a/coolamqp/uplink/listener/socket.py b/coolamqp/uplink/listener/socket.py index 29b7000..6a86421 100644 --- a/coolamqp/uplink/listener/socket.py +++ b/coolamqp/uplink/listener/socket.py @@ -104,6 +104,9 @@ class BaseSocket(object): except ValueError as e: raise SocketFailed(repr(e)) + def wants_to_send_data(self): # type: () -> bool + return not (len(self.data_to_send) == 0 and len(self.priority_queue) == 0) + def on_write(self): """ Socket is writable, called by Listener -- GitLab