diff --git a/CHANGELOG.md b/CHANGELOG.md index 117fca25c3579d4c1e6249f3230cc33dcfdd6fdd..a43e4e8f9f9c5e2a0d886d0c19d76a0c57b9d518 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,8 @@ +# v0.101: + +* bugfix: a race condition during connection setup itself +* moved __version__ to coolamqp root + # v0.100: * significant docs update diff --git a/coolamqp/__init__.py b/coolamqp/__init__.py index 95995627150479d0550211c778e6688909beca06..95c64479c65c594529694e6372316a52de2fbb32 100644 --- a/coolamqp/__init__.py +++ b/coolamqp/__init__.py @@ -1 +1,2 @@ # coding=UTF-8 +__version__ = '0.100a1' diff --git a/coolamqp/uplink/connection/connection.py b/coolamqp/uplink/connection/connection.py index 0083a3cbc5730497fe7aaf3773f2fb9edf1b79ee..c4b05d781c327da29da4536fbe9ec8d5f8dd6e3c 100644 --- a/coolamqp/uplink/connection/connection.py +++ b/coolamqp/uplink/connection/connection.py @@ -91,6 +91,7 @@ class Connection(object): :param node_definition: NodeDefinition instance to use :param listener_thread: ListenerThread to use as async engine + :type listener_thread: coolamqp.uplink.listener.ListenerThread :param extra_properties: extra properties to send to the target server must conform to the syntax given in (/coolamqp/uplink/handshake.py)'s CLIENT_PROPERTIES """ @@ -171,7 +172,12 @@ class Connection(object): logger.debug('[%s] TCP connection established, authentication in progress', self.uuid) sock.settimeout(0) - sock.send(b'AMQP\x00\x00\x09\x01') + header = bytearray(b'AMQP\x00\x00\x09\x01') + rest = sock.send(header) + while rest < len(header): + time.sleep(0.1) + header = header[rest:] + rest = sock.send(header) self.watch_for_method(0, (ConnectionClose, ConnectionCloseOk), self.on_connection_close) @@ -254,9 +260,6 @@ class Connection(object): self.log_frames.on_frame(time.monotonic(), frame, 'to_server') if frames is not None: - # for frame in frames: - # if isinstance(frame, AMQPMethodFrame): - # print('Sending ', frame.payload) self.sendf.send(frames, priority=priority) else: # Listener socket will kill us when time is right diff --git a/coolamqp/uplink/handshake.py b/coolamqp/uplink/handshake.py index 7724202c29b5bf32ff8af0f120298f433436a507..2842d14eeb2dfdc5db1f1d61baf781d8b92b4337 100644 --- a/coolamqp/uplink/handshake.py +++ b/coolamqp/uplink/handshake.py @@ -11,7 +11,7 @@ from coolamqp.framing.definitions import ConnectionStart, ConnectionStartOk, \ ConnectionTune, ConnectionTuneOk, ConnectionOpen, ConnectionOpenOk from coolamqp.framing.frames import AMQPMethodFrame from coolamqp.uplink.connection.states import ST_ONLINE - +from coolamqp import __version__ PUBLISHER_CONFIRMS = b'publisher_confirms' CONSUMER_CANCEL_NOTIFY = b'consumer_cancel_notify' @@ -25,7 +25,7 @@ CLIENT_DATA = [ # because RabbitMQ is some kind of a fascist and does not allow # these fields to be of type short-string (b'product', (b'CoolAMQP', 'S')), - (b'version', (b'0.100', 'S')), + (b'version', (__version__.encode('utf8'), 'S')), (b'copyright', (b'Copyright (C) 2016-2020 SMOK sp. z o.o.', 'S')), ( b'information', ( diff --git a/coolamqp/uplink/listener/epoll_listener.py b/coolamqp/uplink/listener/epoll_listener.py index 796adc3e25d3f05763b1467091308deef3a1c1a2..bbb68af8327ea8d5b60a3b6d3b530f242fb0c3df 100644 --- a/coolamqp/uplink/listener/epoll_listener.py +++ b/coolamqp/uplink/listener/epoll_listener.py @@ -21,6 +21,10 @@ RW = RO | select.EPOLLOUT class EpollSocket(BaseSocket): """ EpollListener substitutes your BaseSockets with this + :type sock: socket.socket + :type on_read: tp.Callable[[bytes], None] + :type on_fail: tp.Callable[[], None] + :type listener: coolamqp.uplink.listener.ListenerThread """ def __init__(self, sock, on_read, on_fail, listener): @@ -65,8 +69,14 @@ class EpollListener(object): self.epoll = select.epoll() self.fd_to_sock = {} self.time_events = [] + self.sockets_to_activate = [] def wait(self, timeout=1): + for socket_to_activate in self.sockets_to_activate: + logger.debug('Activating fd %s', (socket_to_activate.fileno(),)) + self.epoll.register(socket_to_activate.fileno(), RW) + self.sockets_to_activate = [] + events = self.epoll.poll(timeout=timeout) # Timer events @@ -138,7 +148,7 @@ class EpollListener(object): )) def activate(self, sock): # type: (coolamqp.uplink.listener.epoll_listener.EpollSocket) -> None - self.epoll.register(sock.sock, RW) + self.sockets_to_activate.append(sock) def register(self, sock, on_read=lambda data: None, on_fail=lambda: None): diff --git a/coolamqp/uplink/listener/socket.py b/coolamqp/uplink/listener/socket.py index 89a52dfc21be5e21e92fb5074bd4d533c4f0ac71..ee44ee2eaa78939a764e0a7c270da7d735b7a166 100644 --- a/coolamqp/uplink/listener/socket.py +++ b/coolamqp/uplink/listener/socket.py @@ -136,7 +136,7 @@ class BaseSocket(object): # We can send a priority pack self.data_to_send.appendleft(self.priority_queue.popleft()) - def fileno(self): + def fileno(self): # type: () -> int """Return descriptor number""" return self.sock.fileno() diff --git a/docs/conf.py b/docs/conf.py index a087dcf22e631395ec3acc81d6e989fa098f31ce..87fca9e173c266898bf805a7ed8295fa570b70fd 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -55,9 +55,11 @@ author = u'DMS Serwis s.c.' # built documents. # # The short X.Y version. -version = '0.100' +from coolamqp import __version__ + +version = __version__ # The full version, including alpha/beta/rc tags. -release = u'0.100' +release = __version__ # The language for content autogenerated by Sphinx. Refer to documentation # for a list of supported languages. diff --git a/setup.cfg b/setup.cfg index caac9505a533961f5d1510be7d39003fa949b740..34988c5d85bb6775b2e56933a244adfcd86ada2d 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,7 +1,6 @@ [metadata] description-file = README.md name = CoolAMQP -version = 0.100 license = MIT License classifiers = Programming Language :: Python diff --git a/setup.py b/setup.py index 6837717dcdab4101f3dcf8fab9c2134905f18ca8..a9b0eee30c2fcd9557119c97aa1b77f7539643ec 100644 --- a/setup.py +++ b/setup.py @@ -2,8 +2,10 @@ # coding=UTF-8 from setuptools import setup, find_packages +from coolamqp import __version__ setup(keywords=['amqp', 'rabbitmq', 'client', 'network', 'ha', 'high availability'], + version=__version__, packages=find_packages(include=['coolamqp', 'coolamqp.*']), long_description=u'''Pure Python AMQP client, but with dynamic class generation and memoryviews FOR THE GODSPEED.