From 0493627de15b414adce873bfc0efbffd0933d02d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ma=C5=9Blanka?= <piotr.maslanka@henrietta.com.pl> Date: Fri, 3 Jan 2020 22:58:38 +0100 Subject: [PATCH] bugfix let's hope --- CHANGELOG.md | 5 +++++ coolamqp/__init__.py | 1 + coolamqp/uplink/connection/connection.py | 11 +++++++---- coolamqp/uplink/handshake.py | 4 ++-- coolamqp/uplink/listener/epoll_listener.py | 12 +++++++++++- coolamqp/uplink/listener/socket.py | 2 +- docs/conf.py | 6 ++++-- setup.cfg | 1 - setup.py | 2 ++ 9 files changed, 33 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 117fca2..a43e4e8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,8 @@ +# v0.101: + +* bugfix: a race condition during connection setup itself +* moved __version__ to coolamqp root + # v0.100: * significant docs update diff --git a/coolamqp/__init__.py b/coolamqp/__init__.py index 9599562..95c6447 100644 --- a/coolamqp/__init__.py +++ b/coolamqp/__init__.py @@ -1 +1,2 @@ # coding=UTF-8 +__version__ = '0.100a1' diff --git a/coolamqp/uplink/connection/connection.py b/coolamqp/uplink/connection/connection.py index 0083a3c..c4b05d7 100644 --- a/coolamqp/uplink/connection/connection.py +++ b/coolamqp/uplink/connection/connection.py @@ -91,6 +91,7 @@ class Connection(object): :param node_definition: NodeDefinition instance to use :param listener_thread: ListenerThread to use as async engine + :type listener_thread: coolamqp.uplink.listener.ListenerThread :param extra_properties: extra properties to send to the target server must conform to the syntax given in (/coolamqp/uplink/handshake.py)'s CLIENT_PROPERTIES """ @@ -171,7 +172,12 @@ class Connection(object): logger.debug('[%s] TCP connection established, authentication in progress', self.uuid) sock.settimeout(0) - sock.send(b'AMQP\x00\x00\x09\x01') + header = bytearray(b'AMQP\x00\x00\x09\x01') + rest = sock.send(header) + while rest < len(header): + time.sleep(0.1) + header = header[rest:] + rest = sock.send(header) self.watch_for_method(0, (ConnectionClose, ConnectionCloseOk), self.on_connection_close) @@ -254,9 +260,6 @@ class Connection(object): self.log_frames.on_frame(time.monotonic(), frame, 'to_server') if frames is not None: - # for frame in frames: - # if isinstance(frame, AMQPMethodFrame): - # print('Sending ', frame.payload) self.sendf.send(frames, priority=priority) else: # Listener socket will kill us when time is right diff --git a/coolamqp/uplink/handshake.py b/coolamqp/uplink/handshake.py index 7724202..2842d14 100644 --- a/coolamqp/uplink/handshake.py +++ b/coolamqp/uplink/handshake.py @@ -11,7 +11,7 @@ from coolamqp.framing.definitions import ConnectionStart, ConnectionStartOk, \ ConnectionTune, ConnectionTuneOk, ConnectionOpen, ConnectionOpenOk from coolamqp.framing.frames import AMQPMethodFrame from coolamqp.uplink.connection.states import ST_ONLINE - +from coolamqp import __version__ PUBLISHER_CONFIRMS = b'publisher_confirms' CONSUMER_CANCEL_NOTIFY = b'consumer_cancel_notify' @@ -25,7 +25,7 @@ CLIENT_DATA = [ # because RabbitMQ is some kind of a fascist and does not allow # these fields to be of type short-string (b'product', (b'CoolAMQP', 'S')), - (b'version', (b'0.100', 'S')), + (b'version', (__version__.encode('utf8'), 'S')), (b'copyright', (b'Copyright (C) 2016-2020 SMOK sp. z o.o.', 'S')), ( b'information', ( diff --git a/coolamqp/uplink/listener/epoll_listener.py b/coolamqp/uplink/listener/epoll_listener.py index 796adc3..bbb68af 100644 --- a/coolamqp/uplink/listener/epoll_listener.py +++ b/coolamqp/uplink/listener/epoll_listener.py @@ -21,6 +21,10 @@ RW = RO | select.EPOLLOUT class EpollSocket(BaseSocket): """ EpollListener substitutes your BaseSockets with this + :type sock: socket.socket + :type on_read: tp.Callable[[bytes], None] + :type on_fail: tp.Callable[[], None] + :type listener: coolamqp.uplink.listener.ListenerThread """ def __init__(self, sock, on_read, on_fail, listener): @@ -65,8 +69,14 @@ class EpollListener(object): self.epoll = select.epoll() self.fd_to_sock = {} self.time_events = [] + self.sockets_to_activate = [] def wait(self, timeout=1): + for socket_to_activate in self.sockets_to_activate: + logger.debug('Activating fd %s', (socket_to_activate.fileno(),)) + self.epoll.register(socket_to_activate.fileno(), RW) + self.sockets_to_activate = [] + events = self.epoll.poll(timeout=timeout) # Timer events @@ -138,7 +148,7 @@ class EpollListener(object): )) def activate(self, sock): # type: (coolamqp.uplink.listener.epoll_listener.EpollSocket) -> None - self.epoll.register(sock.sock, RW) + self.sockets_to_activate.append(sock) def register(self, sock, on_read=lambda data: None, on_fail=lambda: None): diff --git a/coolamqp/uplink/listener/socket.py b/coolamqp/uplink/listener/socket.py index 89a52df..ee44ee2 100644 --- a/coolamqp/uplink/listener/socket.py +++ b/coolamqp/uplink/listener/socket.py @@ -136,7 +136,7 @@ class BaseSocket(object): # We can send a priority pack self.data_to_send.appendleft(self.priority_queue.popleft()) - def fileno(self): + def fileno(self): # type: () -> int """Return descriptor number""" return self.sock.fileno() diff --git a/docs/conf.py b/docs/conf.py index a087dcf..87fca9e 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -55,9 +55,11 @@ author = u'DMS Serwis s.c.' # built documents. # # The short X.Y version. -version = '0.100' +from coolamqp import __version__ + +version = __version__ # The full version, including alpha/beta/rc tags. -release = u'0.100' +release = __version__ # The language for content autogenerated by Sphinx. Refer to documentation # for a list of supported languages. diff --git a/setup.cfg b/setup.cfg index caac950..34988c5 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,7 +1,6 @@ [metadata] description-file = README.md name = CoolAMQP -version = 0.100 license = MIT License classifiers = Programming Language :: Python diff --git a/setup.py b/setup.py index 6837717..a9b0eee 100644 --- a/setup.py +++ b/setup.py @@ -2,8 +2,10 @@ # coding=UTF-8 from setuptools import setup, find_packages +from coolamqp import __version__ setup(keywords=['amqp', 'rabbitmq', 'client', 'network', 'ha', 'high availability'], + version=__version__, packages=find_packages(include=['coolamqp', 'coolamqp.*']), long_description=u'''Pure Python AMQP client, but with dynamic class generation and memoryviews FOR THE GODSPEED. -- GitLab