diff --git a/.coveragerc b/.coveragerc index ed2e4d866871594e4d0f191d4ff4e67351b65725..f24350cc6d75e2246462bc73bdf1e64e9bcb0e6d 100644 --- a/.coveragerc +++ b/.coveragerc @@ -1,4 +1,7 @@ [run] branch=1 include=coolamqp/* -omit=tests/* +omit= + tests/* + coolamqp/framing/definitions.py + diff --git a/.gitignore b/.gitignore index 5f7e7f26b6fafb8f247e7a158f87386472cfc443..59d7707e0826078e2efb60980c3ef641397df0dd 100644 --- a/.gitignore +++ b/.gitignore @@ -6,6 +6,7 @@ __pycache__/ # C extensions *.so +coolamqp/framing/definitions.py .pycharm_helpers/ # Distribution / packaging .Python diff --git a/coolamqp/attaches/__init__.py b/coolamqp/attaches/__init__.py index f13c492ef7a368f9ef406e99430f838306200aa6..7c8a8a569499f9c85156f85a466f0faca47bc984 100644 --- a/coolamqp/attaches/__init__.py +++ b/coolamqp/attaches/__init__.py @@ -1,5 +1,6 @@ # coding=UTF-8 from __future__ import absolute_import, division, print_function + """ Attaches are components that attach to an coolamqp.uplink.Connection and perform some duties These duties almost always require allocating a channel. A base class - Channeler - is provided to faciliate that. @@ -15,4 +16,4 @@ EVERYTHING HERE IS CALLED BY LISTENER THREAD UNLESS STATED OTHERWISE. from coolamqp.attaches.consumer import Consumer, BodyReceiveMode from coolamqp.attaches.publisher import Publisher from coolamqp.attaches.agroup import AttacheGroup -from coolamqp.attaches.declarer import Declarer \ No newline at end of file +from coolamqp.attaches.declarer import Declarer diff --git a/coolamqp/attaches/agroup.py b/coolamqp/attaches/agroup.py index 97838abbe35d1b0d079852c3c4ed9613639a1b05..1433f7f5363b29f1f9043670763c61499b58e8e6 100644 --- a/coolamqp/attaches/agroup.py +++ b/coolamqp/attaches/agroup.py @@ -5,9 +5,8 @@ This is an attache that attaches multiple attaches. It evicts cancelled attaches. """ from __future__ import print_function, absolute_import, division -import six + import logging -import weakref logger = logging.getLogger(__name__) diff --git a/coolamqp/attaches/channeler.py b/coolamqp/attaches/channeler.py index 4c8f30c63696925b332356b38144fdf08131b75a..bf712755743c3da5c09e780b9a79219ef718624b 100644 --- a/coolamqp/attaches/channeler.py +++ b/coolamqp/attaches/channeler.py @@ -4,15 +4,14 @@ Base class for consumer or publisher with the capabiility to set up and tear down channels """ from __future__ import print_function, absolute_import, division -import six -from coolamqp.framing.frames import AMQPMethodFrame, AMQPBodyFrame, AMQPHeaderFrame -from coolamqp.framing.definitions import ChannelOpen, ChannelOpenOk, BasicConsume, \ - BasicConsumeOk, QueueDeclare, QueueDeclareOk, ExchangeDeclare, ExchangeDeclareOk, \ - QueueBind, QueueBindOk, ChannelClose, ChannelCloseOk, BasicCancel, BasicDeliver, \ - BasicAck, BasicReject, ACCESS_REFUSED, RESOURCE_LOCKED, BasicCancelOk -from coolamqp.uplink import HeaderOrBodyWatch, MethodWatch + import logging +from coolamqp.framing.definitions import ChannelOpen, ChannelOpenOk, \ + ChannelClose, ChannelCloseOk, BasicCancel, \ + BasicCancelOk +from coolamqp.framing.frames import AMQPMethodFrame + ST_OFFLINE = 0 # Consumer is *not* consuming, no setup attempts are being made ST_SYNCING = 1 # A process targeted at consuming has been started ST_ONLINE = 2 # Consumer is declared all right @@ -136,7 +135,8 @@ class Channeler(Attache): self.on_operational(False) self.state = ST_OFFLINE - if not isinstance(payload, (ChannelClose, ChannelCloseOk)) and (payload is not None): + if not isinstance(payload, (ChannelClose, ChannelCloseOk)) and ( + payload is not None): # I do not know how to handle that! return @@ -157,7 +157,8 @@ class Channeler(Attache): self.channel_id = None if isinstance(payload, ChannelClose): - logger.debug('Channel closed: %s %s', payload.reply_code, payload.reply_text.tobytes()) + logger.debug('Channel closed: %s %s', payload.reply_code, + payload.reply_text.tobytes()) def methods(self, payloads): """ @@ -171,7 +172,8 @@ class Channeler(Attache): if self.channel_id is None: return # advanced teardown xD - frames = [AMQPMethodFrame(self.channel_id, payload) for payload in payloads] + frames = [AMQPMethodFrame(self.channel_id, payload) for payload in + payloads] self.connection.send(frames) def method(self, payload): @@ -182,7 +184,8 @@ class Channeler(Attache): """ self.methods([payload]) - def method_and_watch(self, method_payload, method_classes_to_watch, callable): + def method_and_watch(self, method_payload, method_classes_to_watch, + callable): """ Syntactic sugar for @@ -192,7 +195,8 @@ class Channeler(Attache): callable) """ assert self.channel_id is not None - self.connection.method_and_watch(self.channel_id, method_payload, method_classes_to_watch, callable) + self.connection.method_and_watch(self.channel_id, method_payload, + method_classes_to_watch, callable) def on_setup(self, payload): """ @@ -212,7 +216,8 @@ class Channeler(Attache): To be called by on_close, when it needs to be notified just one more time. """ - self.connection.watch_for_method(self.channel_id, (ChannelClose, ChannelCloseOk, BasicCancel, BasicCancelOk), + self.connection.watch_for_method(self.channel_id, ( + ChannelClose, ChannelCloseOk, BasicCancel, BasicCancelOk), self.on_close, on_fail=self.on_close) diff --git a/coolamqp/attaches/consumer.py b/coolamqp/attaches/consumer.py index 32085bdfb53955dda7ae34e29f72752b2b4b84d7..5deb6bb99cae36061922644f2c7b2e5f31f78eb6 100644 --- a/coolamqp/attaches/consumer.py +++ b/coolamqp/attaches/consumer.py @@ -1,20 +1,22 @@ # coding=UTF-8 from __future__ import absolute_import, division, print_function -import six + import io import logging import uuid -import warnings -from coolamqp.framing.frames import AMQPBodyFrame, AMQPHeaderFrame + +from concurrent.futures import Future + +from coolamqp.attaches.channeler import Channeler, ST_ONLINE, ST_OFFLINE +from coolamqp.exceptions import AMQPError from coolamqp.framing.definitions import ChannelOpenOk, BasicConsume, \ - BasicConsumeOk, QueueDeclare, QueueDeclareOk, ExchangeDeclare, ExchangeDeclareOk, \ + BasicConsumeOk, QueueDeclare, QueueDeclareOk, ExchangeDeclare, \ + ExchangeDeclareOk, \ QueueBind, QueueBindOk, ChannelClose, BasicDeliver, BasicCancel, \ BasicAck, BasicReject, RESOURCE_LOCKED, BasicCancelOk, BasicQos, BasicQosOk -from coolamqp.uplink import HeaderOrBodyWatch, MethodWatch -from concurrent.futures import Future +from coolamqp.framing.frames import AMQPBodyFrame, AMQPHeaderFrame from coolamqp.objects import Callable -from coolamqp.attaches.channeler import Channeler, ST_ONLINE, ST_OFFLINE -from coolamqp.exceptions import AMQPError +from coolamqp.uplink import HeaderOrBodyWatch, MethodWatch logger = logging.getLogger(__name__) @@ -73,7 +75,8 @@ class Consumer(Channeler): """ - def __init__(self, queue, on_message, no_ack=True, qos=None, cancel_on_failure=False, + def __init__(self, queue, on_message, no_ack=True, qos=None, + cancel_on_failure=False, future_to_notify=None, fail_on_first_time_resource_locked=False, body_receive_mode=BodyReceiveMode.BYTES @@ -137,8 +140,10 @@ class Consumer(Channeler): self.consumer_tag = None - self.on_cancel = Callable(oneshots=True) #: public, called on cancel for any reason - self.on_broker_cancel = Callable(oneshots=True) #: public, called on Customer Cancel Notification (RabbitMQ) + self.on_cancel = Callable( + oneshots=True) #: public, called on cancel for any reason + self.on_broker_cancel = Callable( + oneshots=True) #: public, called on Customer Cancel Notification (RabbitMQ) def set_qos(self, prefetch_size, prefetch_count): """ @@ -212,7 +217,8 @@ class Consumer(Channeler): """ if self.cancel_on_failure and (not self.cancelled): - logger.debug('Consumer is cancel_on_failure and failure seen, True->cancelled') + logger.debug( + 'Consumer is cancel_on_failure and failure seen, True->cancelled') self.cancelled = True self.on_cancel() @@ -231,7 +237,8 @@ class Consumer(Channeler): # on_close is a one_shot watch. We need to re-register it now. self.register_on_close_watch() - self.methods([BasicCancelOk(payload.consumer_tag), ChannelClose(0, b'Received basic.cancel', 0, 0)]) + self.methods([BasicCancelOk(payload.consumer_tag), + ChannelClose(0, b'Received basic.cancel', 0, 0)]) self.cancelled = True # wasn't I? self.on_cancel() self.on_broker_cancel() @@ -268,7 +275,8 @@ class Consumer(Channeler): old_con = self.connection - super(Consumer, self).on_close(payload) # this None's self.connection and returns port + super(Consumer, self).on_close( + payload) # this None's self.connection and returns port self.fail_on_first_time_resource_locked = False if self.future_to_notify_on_dead: # notify it was cancelled @@ -351,7 +359,8 @@ class Consumer(Channeler): if self.queue.exchange is not None: self.method_and_watch( QueueBind( - self.queue.name, self.queue.exchange.name.encode('utf8'), + self.queue.name, + self.queue.exchange.name.encode('utf8'), b'', False, []), QueueBindOk, self.on_setup @@ -369,10 +378,12 @@ class Consumer(Channeler): else: self.on_setup(BasicQosOk()) # pretend QoS went ok elif isinstance(payload, BasicQosOk): - self.consumer_tag = uuid.uuid4().hex.encode('utf8') # str in py2, unicode in py3 + self.consumer_tag = uuid.uuid4().hex.encode( + 'utf8') # str in py2, unicode in py3 self.method_and_watch( BasicConsume(self.queue.name, self.consumer_tag, - False, self.no_ack, self.queue.exclusive, False, []), + False, self.no_ack, self.queue.exclusive, False, + []), BasicConsumeOk, self.on_setup ) @@ -387,7 +398,8 @@ class Consumer(Channeler): self.connection.watch(self.hb_watch) # multi-shot watches need manual cleanup! - self.deliver_watch = MethodWatch(self.channel_id, BasicDeliver, self.on_delivery) + self.deliver_watch = MethodWatch(self.channel_id, BasicDeliver, + self.on_delivery) self.deliver_watch.oneshot = False self.connection.watch(self.deliver_watch) @@ -544,8 +556,10 @@ class MessageReceiver(object): self.bdeliver.routing_key, self.header.properties, self.bdeliver.delivery_tag, - None if self.consumer.no_ack else self.confirm(self.bdeliver.delivery_tag, True), - None if self.consumer.no_ack else self.confirm(self.bdeliver.delivery_tag, False), + None if self.consumer.no_ack else self.confirm( + self.bdeliver.delivery_tag, True), + None if self.consumer.no_ack else self.confirm( + self.bdeliver.delivery_tag, False), ) self.consumer.on_message(rm) diff --git a/coolamqp/attaches/declarer.py b/coolamqp/attaches/declarer.py index ba254448a71d4f129dcc5481bbf45e98f6d5f8f7..112a402da59e53623d83bc95e64d4d1cdae8b638 100644 --- a/coolamqp/attaches/declarer.py +++ b/coolamqp/attaches/declarer.py @@ -3,16 +3,19 @@ queue.declare, exchange.declare and that shit """ from __future__ import print_function, absolute_import, division -import six + import collections import logging -from coolamqp.framing.definitions import ChannelOpenOk, ExchangeDeclare, ExchangeDeclareOk, QueueDeclare, \ - QueueDeclareOk, ChannelClose, QueueDelete, QueueDeleteOk -from coolamqp.attaches.channeler import Channeler, ST_ONLINE, ST_OFFLINE -from coolamqp.attaches.utils import AtomicTagger, FutureConfirmableRejectable, Synchronized + from concurrent.futures import Future -from coolamqp.objects import Exchange, Queue, Callable + +from coolamqp.attaches.channeler import Channeler, ST_ONLINE +from coolamqp.attaches.utils import Synchronized from coolamqp.exceptions import AMQPError, ConnectionDead +from coolamqp.framing.definitions import ChannelOpenOk, ExchangeDeclare, \ + ExchangeDeclareOk, QueueDeclare, \ + QueueDeclareOk, ChannelClose, QueueDelete, QueueDeleteOk +from coolamqp.objects import Exchange, Queue, Callable logger = logging.getLogger(__name__) @@ -46,13 +49,16 @@ class Operation(object): """Attempt to perform this op.""" obj = self.obj if isinstance(obj, Exchange): - self.declarer.method_and_watch(ExchangeDeclare(self.obj.name.encode('utf8'), obj.type, False, obj.durable, - obj.auto_delete, False, False, []), - (ExchangeDeclareOk, ChannelClose), - self._callback) + self.declarer.method_and_watch( + ExchangeDeclare(self.obj.name.encode('utf8'), obj.type, False, + obj.durable, + obj.auto_delete, False, False, []), + (ExchangeDeclareOk, ChannelClose), + self._callback) elif isinstance(obj, Queue): self.declarer.method_and_watch( - QueueDeclare(obj.name, False, obj.durable, obj.exclusive, obj.auto_delete, False, []), + QueueDeclare(obj.name, False, obj.durable, obj.exclusive, + obj.auto_delete, False, []), (QueueDeclareOk, ChannelClose), self._callback) @@ -66,7 +72,8 @@ class Operation(object): else: # something that had no Future failed. Is it in declared? if self.obj in self.declarer.declared: - self.declarer.declared.remove(self.obj) # todo access not threadsafe + self.declarer.declared.remove( + self.obj) # todo access not threadsafe self.declarer.on_discard(self.obj) else: if self.fut is not None: @@ -83,9 +90,10 @@ class DeleteQueue(Operation): queue = self.obj print('bang') - self.declarer.method_and_watch(QueueDelete(queue.name, False, False, False), - (QueueDeleteOk, ChannelClose), - self._callback) + self.declarer.method_and_watch( + QueueDelete(queue.name, False, False, False), + (QueueDeleteOk, ChannelClose), + self._callback) def _callback(self, payload): print('got', payload) @@ -230,7 +238,8 @@ class Declarer(Channeler, Synchronized): To be called when it's possible that something can be done """ - if (self.state != ST_ONLINE) or len(self.left_to_declare) == 0 or (self.in_process is not None): + if (self.state != ST_ONLINE) or len(self.left_to_declare) == 0 or ( + self.in_process is not None): return self.in_process = self.left_to_declare.popleft() diff --git a/coolamqp/attaches/publisher.py b/coolamqp/attaches/publisher.py index 82d0390e207a0a506e5a3032711d2b367efa0013..9915dbaf381c711cf8395f303d22b44accc46aa9 100644 --- a/coolamqp/attaches/publisher.py +++ b/coolamqp/attaches/publisher.py @@ -12,22 +12,26 @@ from __future__ import absolute_import, division, print_function import collections import logging -import struct -import six import warnings -from coolamqp.framing.definitions import ChannelOpenOk, BasicPublish, Basic, BasicAck -from coolamqp.framing.frames import AMQPMethodFrame, AMQPBodyFrame, AMQPHeaderFrame +import six + +from coolamqp.framing.definitions import ChannelOpenOk, BasicPublish, Basic, \ + BasicAck +from coolamqp.framing.frames import AMQPMethodFrame, AMQPBodyFrame, \ + AMQPHeaderFrame try: # these extensions will be available - from coolamqp.framing.definitions import ConfirmSelect, ConfirmSelectOk, BasicNack + from coolamqp.framing.definitions import ConfirmSelect, ConfirmSelectOk, \ + BasicNack except ImportError: pass from coolamqp.attaches.channeler import Channeler, ST_ONLINE, ST_OFFLINE from coolamqp.uplink import PUBLISHER_CONFIRMS, MethodWatch, FailWatch -from coolamqp.attaches.utils import AtomicTagger, FutureConfirmableRejectable, Synchronized +from coolamqp.attaches.utils import AtomicTagger, FutureConfirmableRejectable, \ + Synchronized from concurrent.futures import Future from coolamqp.objects import Exchange @@ -35,8 +39,9 @@ from coolamqp.objects import Exchange logger = logging.getLogger(__name__) # for holding messages when MODE_CNPUB and link is down -CnpubMessageSendOrder = collections.namedtuple('CnpubMessageSendOrder', ('message', 'exchange_name', - 'routing_key', 'future')) +CnpubMessageSendOrder = collections.namedtuple('CnpubMessageSendOrder', + ('message', 'exchange_name', + 'routing_key', 'future')) # todo what if publisher in MODE_CNPUB fails mid message? they dont seem to be recovered @@ -126,8 +131,11 @@ class Publisher(Channeler, Synchronized): body = body[max_body_size:] self.connection.send([ - AMQPMethodFrame(self.channel_id, BasicPublish(exchange_name, routing_key, False, False)), - AMQPHeaderFrame(self.channel_id, Basic.INDEX, 0, len(message.body), message.properties) + AMQPMethodFrame(self.channel_id, + BasicPublish(exchange_name, routing_key, False, + False)), + AMQPHeaderFrame(self.channel_id, Basic.INDEX, 0, len(message.body), + message.properties) ]) # todo optimize it - if there's only one frame it can with previous send @@ -148,13 +156,14 @@ class Publisher(Channeler, Synchronized): try: msg, xchg, rk, fut = self.messages.popleft() except IndexError: - #todo see docs/casefile-0001 + # todo see docs/casefile-0001 break if not fut.set_running_or_notify_cancel(): continue # cancelled - self.tagger.deposit(self.tagger.get_key(), FutureConfirmableRejectable(fut)) + self.tagger.deposit(self.tagger.get_key(), + FutureConfirmableRejectable(fut)) assert isinstance(xchg, (six.binary_type, six.text_type)) self._pub(msg, xchg, rk) @@ -203,7 +212,8 @@ class Publisher(Channeler, Synchronized): if self.mode == Publisher.MODE_NOACK: # If we are not connected right now, drop the message on the floor and log it with DEBUG if self.state != ST_ONLINE: - logger.debug(u'Publish request, but not connected - dropping the message') + logger.debug( + u'Publish request, but not connected - dropping the message') else: self._pub(message, exchange, routing_key) @@ -223,7 +233,8 @@ class Publisher(Channeler, Synchronized): def on_operational(self, operational): state = {True: u'up', False: u'down'}[operational] - mode = {Publisher.MODE_NOACK: u'noack', Publisher.MODE_CNPUB: u'cnpub'}[self.mode] + mode = {Publisher.MODE_NOACK: u'noack', Publisher.MODE_CNPUB: u'cnpub'}[ + self.mode] logger.info('Publisher %s is %s', mode, state) @@ -232,8 +243,9 @@ class Publisher(Channeler, Synchronized): # Assert that mode is OK if self.mode == Publisher.MODE_CNPUB: if PUBLISHER_CONFIRMS not in self.connection.extensions: - warnings.warn(u'Broker does not support publisher_confirms, refusing to start publisher', - RuntimeWarning) + warnings.warn( + u'Broker does not support publisher_confirms, refusing to start publisher', + RuntimeWarning) self.state = ST_OFFLINE self.critically_failed = True return @@ -245,7 +257,8 @@ class Publisher(Channeler, Synchronized): # the functionality. if self.mode == Publisher.MODE_CNPUB: - self.method_and_watch(ConfirmSelect(False), ConfirmSelectOk, self.on_setup) + self.method_and_watch(ConfirmSelect(False), ConfirmSelectOk, + self.on_setup) elif self.mode == Publisher.MODE_NOACK: # A-OK! Boot it. self.state = ST_ONLINE @@ -263,7 +276,8 @@ class Publisher(Channeler, Synchronized): # now we need to listen for BasicAck and BasicNack - mw = MethodWatch(self.channel_id, (BasicAck, BasicNack), self._on_cnpub_delivery) + mw = MethodWatch(self.channel_id, (BasicAck, BasicNack), + self._on_cnpub_delivery) mw.oneshot = False self.connection.watch(mw) self._mode_cnpub_process_deliveries() diff --git a/coolamqp/attaches/utils.py b/coolamqp/attaches/utils.py index a7e1bee88ee9eb7165ae84ad70d20e520565aa0c..35c915f72d6c7f3290cb43fdf9a8b47c9440ba99 100644 --- a/coolamqp/attaches/utils.py +++ b/coolamqp/attaches/utils.py @@ -1,9 +1,9 @@ # coding=UTF-8 from __future__ import print_function, absolute_import, division -import six + +import functools import logging import threading -import functools logger = logging.getLogger(__name__) @@ -112,7 +112,8 @@ class AtomicTagger(object): i = len(self.tags) - 1 # start index while i > 0: # this will terminate at i=0 - if self.tags[i][0] > tag: # this means we should insert it here... + if self.tags[i][ + 0] > tag: # this means we should insert it here... break i -= 1 # previousl index diff --git a/coolamqp/clustering/__init__.py b/coolamqp/clustering/__init__.py index ca17e3262079602596775eb6f515e5c50bbab2ed..3258579bd1f57b9093aa28746155b5e2b07ea0f0 100644 --- a/coolamqp/clustering/__init__.py +++ b/coolamqp/clustering/__init__.py @@ -12,4 +12,5 @@ logger = logging.getLogger(__name__) __all__ = ('Cluster') from coolamqp.clustering.cluster import Cluster -from coolamqp.clustering.events import MessageReceived, NothingMuch, ConnectionLost +from coolamqp.clustering.events import MessageReceived, NothingMuch, \ + ConnectionLost diff --git a/coolamqp/clustering/events.py b/coolamqp/clustering/events.py index d5fa9ef62ed0e4f612317a256a42e5437132abfa..0e3894251f578b8c221094e6967bef9897358cec 100644 --- a/coolamqp/clustering/events.py +++ b/coolamqp/clustering/events.py @@ -5,10 +5,9 @@ Cluster will emit Events. They mean that something, like, happened. """ from __future__ import print_function, absolute_import, division -import six -import time + import logging -import monotonic + from coolamqp.objects import ReceivedMessage logger = logging.getLogger(__name__) @@ -44,6 +43,8 @@ class MessageReceived(ReceivedMessage, Event): def __init__(self, msg): """:type msg: ReceivedMessage""" - ReceivedMessage.__init__(self, msg.body, msg.exchange_name, msg.routing_key, - properties=msg.properties, delivery_tag=msg.delivery_tag, + ReceivedMessage.__init__(self, msg.body, msg.exchange_name, + msg.routing_key, + properties=msg.properties, + delivery_tag=msg.delivery_tag, ack=msg.ack, nack=msg.nack) diff --git a/coolamqp/exceptions.py b/coolamqp/exceptions.py index be3bdedda8e39d40ec6ada06f5451bea82e5d1f8..18388eefba4b3c0df7b6df0065c73993606bf8c5 100644 --- a/coolamqp/exceptions.py +++ b/coolamqp/exceptions.py @@ -1,10 +1,7 @@ # coding=UTF-8 from __future__ import absolute_import, division, print_function -import six -from coolamqp.framing.definitions import HARD_ERRORS, SOFT_ERRORS, CONNECTION_FORCED, INVALID_PATH, FRAME_ERROR, \ - SYNTAX_ERROR, COMMAND_INVALID, CHANNEL_ERROR, UNEXPECTED_FRAME, RESOURCE_ERROR, NOT_ALLOWED, NOT_IMPLEMENTED, \ - INTERNAL_ERROR, CONTENT_TOO_LARGE, NO_CONSUMERS, ACCESS_REFUSED, NOT_FOUND, RESOURCE_LOCKED, PRECONDITION_FAILED +from coolamqp.framing.definitions import HARD_ERRORS class CoolAMQPError(Exception): diff --git a/coolamqp/framing/base.py b/coolamqp/framing/base.py index 31753c93c5e9e1bc99f6792c1709a492ff626b61..75f74c34614123d9ad76a4daec7043244f17edba 100644 --- a/coolamqp/framing/base.py +++ b/coolamqp/framing/base.py @@ -2,8 +2,6 @@ from __future__ import absolute_import, division, print_function import logging -import six -import struct logger = logging.getLogger(__name__) @@ -14,10 +12,14 @@ BASIC_TYPES = {u'bit': (None, None, "0", None), # special case u'octet': (1, 'B', "b'\\x00'", 1), u'short': (2, 'H', "b'\\x00\\x00'", 2), u'long': (4, 'I', "b'\\x00\\x00\\x00\\x00'", 4), - u'longlong': (8, 'Q', "b'\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00'", 8), - u'timestamp': (8, 'Q', "b'\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00'", 8), - u'table': (None, None, "b'\\x00\\x00\\x00\\x00'", 4), # special case - u'longstr': (None, None, "b'\\x00\\x00\\x00\\x00'", 4), # special case + u'longlong': ( + 8, 'Q', "b'\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00'", 8), + u'timestamp': ( + 8, 'Q', "b'\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00'", 8), + u'table': (None, None, "b'\\x00\\x00\\x00\\x00'", 4), + # special case + u'longstr': (None, None, "b'\\x00\\x00\\x00\\x00'", 4), + # special case u'shortstr': (None, None, "b'\\x00'", 1), # special case } @@ -37,7 +39,8 @@ class AMQPFrame(object): # base class for framing This writes type and channel ID. """ # DO NOT UNCOMMENT buf.write(struct.pack('!BH', self.FRAME_TYPE, self.channel)) - raise NotImplementedError('Please write the frame type and channel in child classes, its faster that way ') + raise NotImplementedError( + 'Please write the frame type and channel in child classes, its faster that way ') @staticmethod def unserialize(channel, payload_as_buffer): @@ -151,7 +154,8 @@ class AMQPMethodPayload(AMQPPayload): :return: int, size of argument section """ if self.IS_CONTENT_STATIC: - return len(self.STATIC_CONTENT) - 4 - 4 - 1 # minus length, class, method, frame_end + return len( + self.STATIC_CONTENT) - 4 - 4 - 1 # minus length, class, method, frame_end raise NotImplementedError() diff --git a/coolamqp/framing/compilation/compile_definitions.py b/coolamqp/framing/compilation/compile_definitions.py index 62a957d53a2ba88897478d87fae438cd1cd6f008..426c5775217432a33f022e0031cf05a047eaa2ec 100644 --- a/coolamqp/framing/compilation/compile_definitions.py +++ b/coolamqp/framing/compilation/compile_definitions.py @@ -7,8 +7,10 @@ from xml.etree import ElementTree import six -from coolamqp.framing.compilation.utilities import get_constants, get_classes, get_domains, \ - name_class, format_method_class_name, format_field_name, ffmt, to_docstring, pythonify_name, to_code_binary, \ +from coolamqp.framing.compilation.utilities import get_constants, get_classes, \ + get_domains, \ + name_class, format_method_class_name, format_field_name, ffmt, to_docstring, \ + pythonify_name, to_code_binary, \ frepr, get_size TYPE_TRANSLATOR = { @@ -24,7 +26,8 @@ TYPE_TRANSLATOR = { } -def compile_definitions(xml_file='resources/amqp0-9-1.extended.xml', out_file='coolamqp/framing/definitions.py'): +def compile_definitions(xml_file='resources/amqp0-9-1.extended.xml', + out_file='coolamqp/framing/definitions.py'): """parse resources/amqp-0-9-1.xml into """ xml = ElementTree.parse(xml_file) @@ -99,13 +102,15 @@ Field = collections.namedtuple('Field', ('name', 'type', 'basic_type', 'reserved con_classes[constant.kind].append(pythonify_name(constant.name)) for constant_kind, constants in con_classes.items(): - line('\n%sS = [%s]', pythonify_name(constant_kind), u', '.join(constants)) + line('\n%sS = [%s]', pythonify_name(constant_kind), + u', '.join(constants)) # get domains domain_to_basic_type = {} line('\n\n\nDOMAIN_TO_BASIC_TYPE = {\n') for domain in get_domains(xml): - line(u' %s: %s,\n', frepr(domain.name), frepr(None if domain.elementary else domain.type)) + line(u' %s: %s,\n', frepr(domain.name), + frepr(None if domain.elementary else domain.type)) domain_to_basic_type[domain.name] = domain.type line('}\n') @@ -119,7 +124,9 @@ Field = collections.namedtuple('Field', ('name', 'type', 'basic_type', 'reserved # Output classes for cls in get_classes(xml): - cls = cls._replace(properties=[p._replace(basic_type=domain_to_basic_type[p.type]) for p in cls.properties]) + cls = cls._replace( + properties=[p._replace(basic_type=domain_to_basic_type[p.type]) for + p in cls.properties]) line('''\nclass %s(AMQPClass): """ @@ -129,10 +136,12 @@ Field = collections.namedtuple('Field', ('name', 'type', 'basic_type', 'reserved INDEX = %s ''', - name_class(cls.name), to_docstring(None, cls.docs), frepr(cls.name), cls.index) + name_class(cls.name), to_docstring(None, cls.docs), + frepr(cls.name), cls.index) if len(cls.properties) > 0: - class_id_to_contentpropertylist[cls.index] = name_class(cls.name) + 'ContentPropertyList' + class_id_to_contentpropertylist[cls.index] = name_class( + cls.name) + 'ContentPropertyList' line('''\nclass %sContentPropertyList(AMQPContentPropertyList): """ @@ -141,15 +150,19 @@ Field = collections.namedtuple('Field', ('name', 'type', 'basic_type', 'reserved FIELDS = [ ''', - name_class(cls.name), to_docstring(None, cls.docs), frepr(cls.name), cls.index, name_class(cls.name)) + name_class(cls.name), to_docstring(None, cls.docs), + frepr(cls.name), cls.index, name_class(cls.name)) - is_static = all(property.basic_type not in ('table', 'longstr', 'shortstr') for property in cls.properties) + is_static = all( + property.basic_type not in ('table', 'longstr', 'shortstr') for + property in cls.properties) for property in cls.properties: if property.basic_type == 'bit': raise ValueError('bit properties are not supported!' ) - line(' Field(%s, %s, %s, %s),\n', frepr(property.name), frepr(property.type), + line(' Field(%s, %s, %s, %s),\n', frepr(property.name), + frepr(property.type), frepr(property.basic_type), repr(property.reserved)) line(''' ] # A dictionary from a zero property list to a class typized with @@ -159,7 +172,8 @@ Field = collections.namedtuple('Field', ('name', 'type', 'basic_type', 'reserved name_class(cls.name)) if any(prop.basic_type == 'bit' for prop in cls.properties): - raise NotImplementedError('I should emit a custom zero_property_list staticmethod :(') + raise NotImplementedError( + 'I should emit a custom zero_property_list staticmethod :(') line(u''' def __new__(self, **kwargs): """ Return a property list. @@ -168,8 +182,10 @@ Field = collections.namedtuple('Field', ('name', 'type', 'basic_type', 'reserved my_props = [prop for prop in cls.properties if (not prop.reserved)] for property in my_props: - line(' :param %s: %s\n', format_field_name(property.name), property.label) - line(' :type %s: %s (AMQP as %s)\n', format_field_name(property.name), + line(' :param %s: %s\n', + format_field_name(property.name), property.label) + line(' :type %s: %s (AMQP as %s)\n', + format_field_name(property.name), TYPE_TRANSLATOR[property.basic_type], property.basic_type) line(' """\n') zpf_len = int(math.ceil(len(cls.properties) // 15)) @@ -187,14 +203,16 @@ Field = collections.namedtuple('Field', ('name', 'type', 'basic_type', 'reserved if field.reserved or field.basic_type == 'bit': pass # zero anyway else: - byte_chunk.append(u"(('%s' in kwargs) << %s)" % (format_field_name(field.name), piece_index)) + byte_chunk.append(u"(('%s' in kwargs) << %s)" % ( + format_field_name(field.name), piece_index)) piece_index -= 1 else: if first_byte: if field.reserved or field.basic_type == 'bit': pass # zero anyway else: - byte_chunk.append(u"int('%s' in kwargs)" % (format_field_name(field.name),)) + byte_chunk.append(u"int('%s' in kwargs)" % ( + format_field_name(field.name),)) else: # this is the "do we need moar flags" section byte_chunk.append(u"kwargs['%s']" % ( @@ -209,7 +227,8 @@ Field = collections.namedtuple('Field', ('name', 'type', 'basic_type', 'reserved fields_remaining -= 1 if len(byte_chunk) > 0: - line(u' %s\n', u' | '.join(byte_chunk)) # We did not finish + line(u' %s\n', + u' | '.join(byte_chunk)) # We did not finish line(u' ])\n zpf = six.binary_type(zpf)\n') line(u''' @@ -237,7 +256,8 @@ Field = collections.namedtuple('Field', ('name', 'type', 'basic_type', 'reserved c = compile_particular_content_property_list_class(zpf, %s.FIELDS) %s.PARTICULAR_CLASSES[zpf] = c return c(**kwargs) -'''.replace('%s', name_class(cls.name) + 'ContentPropertyList').replace('%d', '%s')) +'''.replace('%s', name_class(cls.name) + 'ContentPropertyList').replace('%d', + '%s')) line(u''' @staticmethod @@ -256,14 +276,16 @@ Field = collections.namedtuple('Field', ('name', 'type', 'basic_type', 'reserved if field.reserved or field.basic_type == 'bit': pass # zero else: - byte_chunk.append(u"(('%s' in fields) << %s)" % (format_field_name(field.name), piece_index)) + byte_chunk.append(u"(('%s' in fields) << %s)" % ( + format_field_name(field.name), piece_index)) piece_index -= 1 else: if first_byte: if field.reserved or field.basic_type == 'bit': pass # zero else: - byte_chunk.append(u"int('%s' in kwargs)" % (format_field_name(field.name),)) + byte_chunk.append(u"int('%s' in kwargs)" % ( + format_field_name(field.name),)) else: # this is the "do we need moar flags" section byte_chunk.append(u"kwargs['%s']" % ( @@ -278,7 +300,8 @@ Field = collections.namedtuple('Field', ('name', 'type', 'basic_type', 'reserved fields_remaining -= 1 if len(byte_chunk) > 0: - line(u' %s\n', u' | '.join(byte_chunk)) # We did not finish + line(u' %s\n', + u' | '.join(byte_chunk)) # We did not finish line(u''' ]) zpf = six.binary_type(zpf) @@ -289,7 +312,8 @@ Field = collections.namedtuple('Field', ('name', 'type', 'basic_type', 'reserved c = compile_particular_content_property_list_class(zpf, %s.FIELDS) %s.PARTICULAR_CLASSES[zpf] = c return c -'''.replace("%s", name_class(cls.name) + 'ContentPropertyList').replace('%d', '%s')) +'''.replace("%s", name_class(cls.name) + 'ContentPropertyList').replace('%d', + '%s')) line(u''' @staticmethod @@ -315,27 +339,35 @@ Field = collections.namedtuple('Field', ('name', 'type', 'basic_type', 'reserved %s.PARTICULAR_CLASSES[zpf] = c return c.from_buffer(buf, offset) -'''.replace('%s', name_class(cls.name) + 'ContentPropertyList').replace("%d", "%s")) +'''.replace('%s', name_class(cls.name) + 'ContentPropertyList').replace("%d", + "%s")) # ============================================ Do methods for this class for method in cls.methods: - full_class_name = u'%s%s' % (name_class(cls.name), format_method_class_name(method.name)) + full_class_name = u'%s%s' % ( + name_class(cls.name), format_method_class_name(method.name)) # annotate types - method.fields = [field._replace(basic_type=domain_to_basic_type[field.type]) for field in method.fields] + method.fields = [ + field._replace(basic_type=domain_to_basic_type[field.type]) for + field in method.fields] - non_reserved_fields = [field for field in method.fields if not field.reserved] + non_reserved_fields = [field for field in method.fields if + not field.reserved] is_static = method.is_static() if is_static: static_size = get_size(method.fields) - is_content_static = len([f for f in method.fields if not f.reserved]) == 0 + is_content_static = len( + [f for f in method.fields if not f.reserved]) == 0 if len(non_reserved_fields) == 0: slots = u'' else: - slots = (u', '.join(map(lambda f: frepr(format_field_name(f.name)), non_reserved_fields))) + u', ' + slots = (u', '.join( + map(lambda f: frepr(format_field_name(f.name)), + non_reserved_fields))) + u', ' line('''\nclass %s(AMQPMethodPayload): """ @@ -366,19 +398,24 @@ Field = collections.namedtuple('Field', ('name', 'type', 'basic_type', 'reserved repr(is_content_static) ) - _namify = lambda x: name_class(cls.name) + format_method_class_name(x) + _namify = lambda x: name_class(cls.name) + format_method_class_name( + x) methods_that_are_replies_for[full_class_name] = [] for response in method.response: - methods_that_are_reply_reasons_for[_namify(response)] = full_class_name - methods_that_are_replies_for[full_class_name].append(_namify(response)) + methods_that_are_reply_reasons_for[ + _namify(response)] = full_class_name + methods_that_are_replies_for[full_class_name].append( + _namify(response)) if is_content_static: line(''' STATIC_CONTENT = %s # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END ''', - to_code_binary(struct.pack('!LHH', static_size + 4, cls.index, method.index) + \ - method.get_static_body() + \ - struct.pack('!B', FRAME_END))) + to_code_binary( + struct.pack('!LHH', static_size + 4, cls.index, + method.index) + \ + method.get_static_body() + \ + struct.pack('!B', FRAME_END))) # fields if len(method.fields) > 0: @@ -386,7 +423,8 @@ Field = collections.namedtuple('Field', ('name', 'type', 'basic_type', 'reserved line(' FIELDS = [ \n') for field in method.fields: - line(' Field(%s, %s, %s, reserved=%s),\n', frepr(field.name), frepr(field.type), + line(' Field(%s, %s, %s, reserved=%s),\n', + frepr(field.name), frepr(field.type), frepr(field.basic_type), repr(field.reserved)) line(' ]\n') @@ -396,7 +434,9 @@ Field = collections.namedtuple('Field', ('name', 'type', 'basic_type', 'reserved """ Create frame %s ''', - u', '.join(['self'] + [format_field_name(field.name) for field in non_reserved_fields]), + u', '.join( + ['self'] + [format_field_name(field.name) for field in + non_reserved_fields]), cls.name + '.' + method.name, ) @@ -405,23 +445,28 @@ Field = collections.namedtuple('Field', ('name', 'type', 'basic_type', 'reserved for field in non_reserved_fields: if (field.label is not None) or (field.docs is not None): - line(' :param %s: %s\n', format_field_name(field.name), - to_docstring(field.label, field.docs, prefix=12, blank=False)) + line(' :param %s: %s\n', + format_field_name(field.name), + to_docstring(field.label, field.docs, prefix=12, + blank=False)) - line(' :type %s: %s (%s in AMQP)\n', format_field_name(field.name), + line(' :type %s: %s (%s in AMQP)\n', + format_field_name(field.name), TYPE_TRANSLATOR[field.basic_type], field.type) line(' """\n') for field in non_reserved_fields: - line(' self.%s = %s\n', format_field_name(field.name), format_field_name(field.name)) + line(' self.%s = %s\n', format_field_name(field.name), + format_field_name(field.name)) if len(non_reserved_fields) == 0: line('\n') # end if not is_content_static: - from coolamqp.framing.compilation.textcode_fields import get_serializer, get_counter, get_from_buffer + from coolamqp.framing.compilation.textcode_fields import \ + get_serializer, get_counter, get_from_buffer line('\n def write_arguments(self, buf):\n') line(get_serializer(method.fields, 'self.', 2)) @@ -433,11 +478,14 @@ Field = collections.namedtuple('Field', ('name', 'type', 'basic_type', 'reserved offset = start_offset ''') - line(get_from_buffer(method.fields, '', 2, remark=(method.name == 'deliver'))) + line(get_from_buffer(method.fields, '', 2, + remark=(method.name == 'deliver'))) line(" return %s(%s)", full_class_name, - u', '.join(format_field_name(field.name) for field in method.fields if not field.reserved)) + u', '.join( + format_field_name(field.name) for field in method.fields if + not field.reserved)) line('\n\n') diff --git a/coolamqp/framing/compilation/content_property.py b/coolamqp/framing/compilation/content_property.py index d533a9c60bdc71b5d0e77fdff64fc12053d44891..93a8cfdaca3d6e82f60fc2e90709e68174d41089 100644 --- a/coolamqp/framing/compilation/content_property.py +++ b/coolamqp/framing/compilation/content_property.py @@ -3,11 +3,9 @@ from __future__ import absolute_import, division, print_function """Generate serializers/unserializers/length getters for given property_flags""" import six -import struct import logging -from coolamqp.framing.compilation.textcode_fields import get_counter, get_from_buffer, get_serializer -from coolamqp.framing.base import AMQPContentPropertyList -from coolamqp.framing.field_table import enframe_table, deframe_table, frame_table_size +from coolamqp.framing.compilation.textcode_fields import get_counter, \ + get_from_buffer, get_serializer logger = logging.getLogger(__name__) @@ -40,7 +38,8 @@ def _compile_particular_content_property_list_class(zpf, fields): zpf_length = len(zpf) # 1 here does not mean that field is present. All bit fields are present, but 0 in a ZPF. Fix this. - zpf_bits = [zpf_bit or field.type == 'bit' for zpf_bit, field in zip(zpf_bits, fields)] + zpf_bits = [zpf_bit or field.type == 'bit' for zpf_bit, field in + zip(zpf_bits, fields)] mod = [u'''class ParticularContentTypeList(AMQPContentPropertyList): """ @@ -48,7 +47,8 @@ def _compile_particular_content_property_list_class(zpf, fields): '''] for field in fields: - mod.append(u' * %s::%s' % (format_field_name(field.name), field.type)) + mod.append( + u' * %s::%s' % (format_field_name(field.name), field.type)) if field.reserved: mod.append(u' (reserved)') mod.append(u'\n') @@ -57,7 +57,8 @@ def _compile_particular_content_property_list_class(zpf, fields): if not x.startswith('b'): x = 'b' + x - present_fields = [field for field, present in zip(fields, zpf_bits) if present] + present_fields = [field for field, present in zip(fields, zpf_bits) if + present] mod.append(u''' """ @@ -66,7 +67,9 @@ def _compile_particular_content_property_list_class(zpf, fields): if len(present_fields) == 0: slots = u'' else: - slots = (u', '.join((u"u'%s'" % format_field_name(field.name) for field in present_fields))) + u', ' + slots = (u', '.join( + (u"u'%s'" % format_field_name(field.name) for field in + present_fields))) + u', ' mod.append(u''' __slots__ = (%s) @@ -83,7 +86,8 @@ def _compile_particular_content_property_list_class(zpf, fields): ''' % (u', '.join(format_field_name(field.name) for field in present_fields))) for field in present_fields: - mod.append(u' self.%s = %s\n'.replace(u'%s', format_field_name(field.name))) + mod.append(u' self.%s = %s\n'.replace(u'%s', format_field_name( + field.name))) # Let's do write_to mod.append(u'\n def write_to(self, buf):\n') @@ -99,16 +103,20 @@ def _compile_particular_content_property_list_class(zpf, fields): # from_buffer # note that non-bit values mod.append(u' @classmethod\n') - mod.append(u' def from_buffer(cls, buf, start_offset):\n offset = start_offset + %s\n' % (zpf_length,)) + mod.append( + u' def from_buffer(cls, buf, start_offset):\n offset = start_offset + %s\n' % ( + zpf_length,)) mod.append(get_from_buffer( present_fields , prefix='', indent_level=2)) mod.append(u' return cls(%s)\n' % - u', '.join(format_field_name(field.name) for field in present_fields)) + u', '.join( + format_field_name(field.name) for field in present_fields)) # get_size mod.append(u'\n def get_size(self):\n') - mod.append(get_counter(present_fields, prefix=u'self.', indent_level=2)[:-1]) # skip eol + mod.append(get_counter(present_fields, prefix=u'self.', indent_level=2)[ + :-1]) # skip eol mod.append(u' + %s\n' % (zpf_length,)) # account for pf length return u''.join(mod) diff --git a/coolamqp/framing/compilation/textcode_fields.py b/coolamqp/framing/compilation/textcode_fields.py index a20969c3ec812dccad2fcb876c8085b5e1d079f3..88d99d9ebe53ec6e88bfd3bff437d34ee32b570b 100644 --- a/coolamqp/framing/compilation/textcode_fields.py +++ b/coolamqp/framing/compilation/textcode_fields.py @@ -16,10 +16,11 @@ a module that has following ok: """ from __future__ import absolute_import, division, print_function + import math -from coolamqp.framing.base import BASIC_TYPES, DYNAMIC_BASIC_TYPES -from coolamqp.framing.compilation.utilities import format_field_name, get_size +from coolamqp.framing.base import BASIC_TYPES +from coolamqp.framing.compilation.utilities import format_field_name def get_counter(fields, prefix=u'', indent_level=2): @@ -64,7 +65,8 @@ def get_counter(fields, prefix=u'', indent_level=2): if bits > 0: # sync bits accumulator += int(math.ceil(bits / 8)) - return (u' ' * indent_level) + u'return ' + (u' + '.join([str(accumulator)] + parts)) + u'\n' + return (u' ' * indent_level) + u'return ' + ( + u' + '.join([str(accumulator)] + parts)) + u'\n' def get_from_buffer(fields, prefix='', indent_level=2, remark=False): @@ -115,7 +117,8 @@ def get_from_buffer(fields, prefix='', indent_level=2, remark=False): return fffnames = [a for a, b in to_struct if a != u'_'] # skip reserved ffffmts = [b for a, b in to_struct] - emit("%s, = struct.unpack_from('!%s', buf, offset)", u', '.join(fffnames), u''.join(ffffmts)) + emit("%s, = struct.unpack_from('!%s', buf, offset)", + u', '.join(fffnames), u''.join(ffffmts)) emit("offset += %s", ln['ln']) ln['ln'] = 0 del to_struct[:] @@ -137,7 +140,8 @@ def get_from_buffer(fields, prefix='', indent_level=2, remark=False): assert len(bits) == 0 if field.reserved: - to_struct.append((u'_', '%sx' % (BASIC_TYPES[field.basic_type][0],))) + to_struct.append( + (u'_', '%sx' % (BASIC_TYPES[field.basic_type][0],))) else: to_struct.append((fieldname, BASIC_TYPES[field.basic_type][1])) @@ -203,12 +207,14 @@ def get_serializer(fields, prefix='', indent_level=2): else: for bit_name, modif in zip(bits, range(8)): if bit_name != 'False': - p.append('(' + bit_name + ' << %s)' % (modif,)) # yes you can << bools + p.append('(' + bit_name + ' << %s)' % ( + modif,)) # yes you can << bools format_args.append(u' | '.join(p)) del bits[:] def emit_single_struct_pack(): - emit("buf.write(struct.pack('!%s', %s))", u''.join(formats), u', '.join(format_args)) + emit("buf.write(struct.pack('!%s', %s))", u''.join(formats), + u', '.join(format_args)) del formats[:] del format_args[:] diff --git a/coolamqp/framing/compilation/utilities.py b/coolamqp/framing/compilation/utilities.py index d332441c90b8f86f9fc81aa0d3f610bd70c29655..dd93515cd22ab58f317ad93ba164210bfc0967a5 100644 --- a/coolamqp/framing/compilation/utilities.py +++ b/coolamqp/framing/compilation/utilities.py @@ -10,16 +10,21 @@ from coolamqp.framing.base import BASIC_TYPES, DYNAMIC_BASIC_TYPES # docs may be None -Constant = namedtuple('Constant', ('name', 'value', 'kind', 'docs')) # kind is AMQP constant class # value is int -Field = namedtuple('Field', ('name', 'type', 'label', 'docs', 'reserved', 'basic_type')) # reserved is bool +Constant = namedtuple('Constant', ( +'name', 'value', 'kind', 'docs')) # kind is AMQP constant class # value is int +Field = namedtuple('Field', ( +'name', 'type', 'label', 'docs', 'reserved', 'basic_type')) # reserved is bool # synchronous is bool, constant is bool # repponse is a list of method.name -Class_ = namedtuple('Class_', ('name', 'index', 'docs', 'methods', 'properties')) # label is int -Domain = namedtuple('Domain', ('name', 'type', 'elementary')) # elementary is bool +Class_ = namedtuple('Class_', ( +'name', 'index', 'docs', 'methods', 'properties')) # label is int +Domain = namedtuple('Domain', + ('name', 'type', 'elementary')) # elementary is bool class Method(object): - def __init__(self, name, synchronous, index, label, docs, fields, response, sent_by_client, sent_by_server): + def __init__(self, name, synchronous, index, label, docs, fields, response, + sent_by_client, sent_by_server): self.name = name self.synchronous = synchronous self.index = index @@ -68,7 +73,8 @@ def get_size(fields): # assume all fields have static length if field.basic_type == 'bit': bits += 1 else: - size += len(BASIC_TYPES[field.basic_type][2]) # default minimum entry + size += len( + BASIC_TYPES[field.basic_type][2]) # default minimum entry else: size += BASIC_TYPES[field.basic_type][0] @@ -100,7 +106,8 @@ def for_domain(elem): def for_field(elem): # for <field> in <method> """Parse method. Return fields""" a = elem.attrib - return Field(six.text_type(a['name']), a['domain'] if 'domain' in a else a['type'], + return Field(six.text_type(a['name']), + a['domain'] if 'domain' in a else a['type'], a.get('label', None), get_docs(elem), a.get('reserved', '0') == '1', @@ -110,29 +117,38 @@ def for_field(elem): # for <field> in <method> def for_method(elem): # for <method> """Parse class, return methods""" a = elem.attrib - return Method(six.text_type(a['name']), bool(int(a.get('synchronous', '0'))), int(a['index']), a.get('label', None), + return Method(six.text_type(a['name']), + bool(int(a.get('synchronous', '0'))), int(a['index']), + a.get('label', None), get_docs(elem), - [for_field(fie) for fie in elem.getchildren() if fie.tag == 'field'], + [for_field(fie) for fie in elem.getchildren() if + fie.tag == 'field'], [e.attrib['name'] for e in elem.findall('response')], # if chassis=server that means server has to accept it - any([e.attrib.get('name', '') == 'server' for e in elem.getchildren() if e.tag == 'chassis']), - any([e.attrib.get('name', '') == 'client' for e in elem.getchildren() if e.tag == 'chassis']) + any([e.attrib.get('name', '') == 'server' for e in + elem.getchildren() if e.tag == 'chassis']), + any([e.attrib.get('name', '') == 'client' for e in + elem.getchildren() if e.tag == 'chassis']) ) def for_class(elem): # for <class> """Parse XML, return classes""" a = elem.attrib - methods = sorted([for_method(me) for me in elem.getchildren() if me.tag == 'method'], - key=lambda m: (m.name.strip('-')[0], -len(m.response))) - return Class_(six.text_type(a['name']), int(a['index']), get_docs(elem) or a['label'], methods, - [for_field(e) for e in elem.getchildren() if e.tag == 'field']) + methods = sorted( + [for_method(me) for me in elem.getchildren() if me.tag == 'method'], + key=lambda m: (m.name.strip('-')[0], -len(m.response))) + return Class_(six.text_type(a['name']), int(a['index']), + get_docs(elem) or a['label'], methods, + [for_field(e) for e in elem.getchildren() if + e.tag == 'field']) def for_constant(elem): # for <constant> """Parse XML, return constants""" a = elem.attrib - return Constant(a['name'], int(a['value']), a.get('class', ''), get_docs(elem)) + return Constant(a['name'], int(a['value']), a.get('class', ''), + get_docs(elem)) def get_constants(xml): @@ -168,7 +184,8 @@ def name_class(classname): def format_method_class_name(methodname): if '-' in methodname: i = methodname.find('-') - return methodname[0:i].capitalize() + methodname[i + 1].upper() + methodname[i + 2:] + return methodname[0:i].capitalize() + methodname[ + i + 1].upper() + methodname[i + 2:] else: return methodname.capitalize() @@ -185,7 +202,8 @@ def frepr(p, sop=six.text_type): p = sop(p) s = repr(p) - if isinstance(p, (six.binary_type, six.text_type)) and not s.startswith('u'): + if isinstance(p, (six.binary_type, six.text_type)) and not s.startswith( + 'u'): return ('u' if sop == six.text_type else 'b') + s else: return s @@ -214,9 +232,11 @@ def try_to_int(p): return p -def to_docstring(label, doc, prefix=4, blank=True): # output a full docstring section +def to_docstring(label, doc, prefix=4, + blank=True): # output a full docstring section label = [] if label is None else [label] - doc = [] if doc is None else [q.strip() for q in doc.split(u'\n') if len(q.strip()) > 0] + doc = [] if doc is None else [q.strip() for q in doc.split(u'\n') if + len(q.strip()) > 0] pre = u' ' * prefix doc = label + doc diff --git a/coolamqp/framing/field_table.py b/coolamqp/framing/field_table.py index e03d144ea4fc5eb09b46826143726d82d8ac9176..2c31fdec30163b7d7da9aabdd84b546625930ce9 100644 --- a/coolamqp/framing/field_table.py +++ b/coolamqp/framing/field_table.py @@ -12,7 +12,9 @@ A table is of form ( (name1::bytes, fv1), (name2::bytes, fv2), ...) NOTE: it's not buffers, it's memoryview all along """ from __future__ import absolute_import, division, print_function + import struct + import six @@ -67,10 +69,15 @@ FIELD_TYPES = { 'f': (4, '!f'), 'd': (8, '!d'), 'D': (5, None, enframe_decimal, deframe_decimal), # decimal-value - 's': (None, None, enframe_shortstr, deframe_shortstr, lambda val: len(val) + 1), # shortstr - 'S': (None, None, enframe_longstr, deframe_longstr, lambda val: len(val) + 4), # longstr + 's': ( + None, None, enframe_shortstr, deframe_shortstr, lambda val: len(val) + 1), +# shortstr + 'S': ( + None, None, enframe_longstr, deframe_longstr, lambda val: len(val) + 4), +# longstr 'T': (8, '!Q'), - 'V': (0, None, lambda buf, v: None, lambda buf, ofs: None, 0), # rendered as None + 'V': (0, None, lambda buf, v: None, lambda buf, ofs: None, 0), +# rendered as None } @@ -120,8 +127,9 @@ def deframe_array(buf, offset): values.append((v, t)) if offset != start_offset + 4 + ln: - raise ValueError('Array longer than expected, took %s, expected %s bytes', - (offset - (start_offset + ln + 4), ln + 4)) + raise ValueError( + 'Array longer than expected, took %s, expected %s bytes', + (offset - (start_offset + ln + 4), ln + 4)) return values, ln + 4 @@ -164,8 +172,9 @@ def deframe_table(buf, start_offset): # -> (table, bytes_consumed) fields.append((field_name.tobytes(), fv)) if offset > (start_offset + table_length + 4): - raise ValueError('Table turned out longer than expected! Found %s bytes expected %s', - (offset - start_offset, table_length)) + raise ValueError( + 'Table turned out longer than expected! Found %s bytes expected %s', + (offset - start_offset, table_length)) return fields, table_length + 4 diff --git a/coolamqp/framing/frames.py b/coolamqp/framing/frames.py index 3f36f911d37b58b324603159f758083149848bb1..0401e314eb5d51f763dc22980b5d6e01b87bf885 100644 --- a/coolamqp/framing/frames.py +++ b/coolamqp/framing/frames.py @@ -5,12 +5,13 @@ Concrete frame definitions from __future__ import absolute_import, division, print_function import struct + import six from coolamqp.framing.base import AMQPFrame -from coolamqp.framing.definitions import FRAME_METHOD, FRAME_HEARTBEAT, FRAME_BODY, FRAME_HEADER, FRAME_END, \ - IDENT_TO_METHOD, CLASS_ID_TO_CONTENT_PROPERTY_LIST, FRAME_METHOD_BYTE, FRAME_BODY_BYTE, FRAME_HEADER_BYTE, \ - FRAME_END_BYTE +from coolamqp.framing.definitions import FRAME_METHOD, FRAME_HEARTBEAT, \ + FRAME_BODY, FRAME_HEADER, FRAME_END, \ + IDENT_TO_METHOD, CLASS_ID_TO_CONTENT_PROPERTY_LIST, FRAME_END_BYTE class AMQPMethodFrame(AMQPFrame): @@ -71,15 +72,18 @@ class AMQPHeaderFrame(AMQPFrame): def write_to(self, buf): buf.write(struct.pack('!BHLHHQ', FRAME_HEADER, self.channel, - 12 + self.properties.get_size(), self.class_id, 0, self.body_size)) + 12 + self.properties.get_size(), self.class_id, 0, + self.body_size)) self.properties.write_to(buf) buf.write(FRAME_END_BYTE) @staticmethod def unserialize(channel, payload_as_buffer): # payload starts with class ID - class_id, weight, body_size = struct.unpack_from('!HHQ', payload_as_buffer, 0) - properties = CLASS_ID_TO_CONTENT_PROPERTY_LIST[class_id].from_buffer(payload_as_buffer, 12) + class_id, weight, body_size = struct.unpack_from('!HHQ', + payload_as_buffer, 0) + properties = CLASS_ID_TO_CONTENT_PROPERTY_LIST[class_id].from_buffer( + payload_as_buffer, 12) return AMQPHeaderFrame(channel, class_id, weight, body_size, properties) def get_size(self): diff --git a/coolamqp/objects.py b/coolamqp/objects.py index 96f620a121f1f4487e0672ff2f5b8683e84b3332..45e176d037400a6be399f33825b8a642bd2d679b 100644 --- a/coolamqp/objects.py +++ b/coolamqp/objects.py @@ -2,12 +2,13 @@ """ Core objects used in CoolAMQP """ +import logging import uuid + import six -import logging -import warnings -from coolamqp.framing.definitions import BasicContentPropertyList as MessageProperties +from coolamqp.framing.definitions import \ + BasicContentPropertyList as MessageProperties logger = logging.getLogger(__name__) @@ -126,13 +127,16 @@ class Exchange(object): direct = None # the direct exchange - def __init__(self, name=u'', type=b'direct', durable=True, auto_delete=False): + def __init__(self, name=u'', type=b'direct', durable=True, + auto_delete=False): """ :type name: unicode is preferred, binary type will get decoded to unicode with utf8 :param type: exchange type. binary/unicode """ - self.name = name.decode('utf8') if isinstance(name, six.binary_type) else name # must be unicode - self.type = type.encode('utf8') if isinstance(type, six.text_type) else type # must be bytes + self.name = name.decode('utf8') if isinstance(name, + six.binary_type) else name # must be unicode + self.type = type.encode('utf8') if isinstance(type, + six.text_type) else type # must be bytes self.durable = durable self.auto_delete = auto_delete @@ -141,7 +145,8 @@ class Exchange(object): def __repr__(self): return u'Exchange(%s, %s, %s, %s)' % ( - repr(self.name), repr(self.type), repr(self.durable), repr(self.auto_delete)) + repr(self.name), repr(self.type), repr(self.durable), + repr(self.auto_delete)) def __hash__(self): return self.name.__hash__() @@ -158,7 +163,8 @@ class Queue(object): This object represents a Queue that applications consume from or publish to. """ - def __init__(self, name=b'', durable=False, exchange=None, exclusive=False, auto_delete=False): + def __init__(self, name=b'', durable=False, exchange=None, exclusive=False, + auto_delete=False): """ Create a queue definition. @@ -173,14 +179,16 @@ class Queue(object): :param exclusive: Is this queue exclusive? :param auto_delete: Is this queue auto_delete ? """ - self.name = name.encode('utf8') if isinstance(name, six.text_type) else name #: public, must be bytes + self.name = name.encode('utf8') if isinstance(name, + six.text_type) else name #: public, must be bytes # if name is '', this will be filled in with broker-generated name upon declaration self.durable = durable self.exchange = exchange self.auto_delete = auto_delete self.exclusive = exclusive - self.anonymous = len(self.name) == 0 # if this queue is anonymous, it must be regenerated upon reconnect + self.anonymous = len( + self.name) == 0 # if this queue is anonymous, it must be regenerated upon reconnect self.consumer_tag = self.name if not self.anonymous else uuid.uuid4().hex.encode( 'utf8') # bytes, consumer tag to use in AMQP comms @@ -245,8 +253,11 @@ class NodeDefinition(object): self.virtual_host = '/' elif len(args) == 4: self.host, self.user, self.password, self.virtual_host = args - elif len(args) == 1 and isinstance(args[0], (six.text_type, six.binary_type)): - connstr = args[0].decode('utf8') if isinstance(args[0], six.binary_type) else args[0] + elif len(args) == 1 and isinstance(args[0], + (six.text_type, six.binary_type)): + connstr = args[0].decode('utf8') if isinstance(args[0], + six.binary_type) else \ + args[0] # AMQP connstring if not connstr.startswith(u'amqp://'): raise ValueError(u'should begin with amqp://') @@ -269,4 +280,5 @@ class NodeDefinition(object): def __str__(self): return six.text_type( - b'amqp://%s:%s@%s/%s'.encode('utf8') % (self.host, self.port, self.user, self.virtual_host)) + b'amqp://%s:%s@%s/%s'.encode('utf8') % ( + self.host, self.port, self.user, self.virtual_host)) diff --git a/coolamqp/uplink/__init__.py b/coolamqp/uplink/__init__.py index cef792f6a97f9813867f7ca9741d5f1783ee03bd..92f6f33a19cdf9c34c37003dea7b209378c12d0c 100644 --- a/coolamqp/uplink/__init__.py +++ b/coolamqp/uplink/__init__.py @@ -13,6 +13,7 @@ EVERYTHING HERE IS CALLED BY LISTENER THREAD UNLESS STATED OTHERWISE. """ from __future__ import absolute_import, division, print_function -from coolamqp.uplink.connection import Connection, HeaderOrBodyWatch, MethodWatch, AnyWatch, FailWatch -from coolamqp.uplink.listener import ListenerThread +from coolamqp.uplink.connection import Connection, HeaderOrBodyWatch, \ + MethodWatch, AnyWatch, FailWatch from coolamqp.uplink.handshake import PUBLISHER_CONFIRMS, CONSUMER_CANCEL_NOTIFY +from coolamqp.uplink.listener import ListenerThread diff --git a/coolamqp/uplink/connection/__init__.py b/coolamqp/uplink/connection/__init__.py index 7ec0d01c5042462dcf11aa52740f21c667e1f0ae..8a07f01dcff33f918aa859559d29255aec4a4e9f 100644 --- a/coolamqp/uplink/connection/__init__.py +++ b/coolamqp/uplink/connection/__init__.py @@ -12,5 +12,7 @@ Connection is something that can: from __future__ import absolute_import, division, print_function from coolamqp.uplink.connection.connection import Connection -from coolamqp.uplink.connection.watches import FailWatch, Watch, HeaderOrBodyWatch, MethodWatch, AnyWatch -from coolamqp.uplink.connection.states import ST_OFFLINE, ST_CONNECTING, ST_ONLINE +from coolamqp.uplink.connection.states import ST_OFFLINE, ST_CONNECTING, \ + ST_ONLINE +from coolamqp.uplink.connection.watches import FailWatch, Watch, \ + HeaderOrBodyWatch, MethodWatch, AnyWatch diff --git a/coolamqp/uplink/connection/recv_framer.py b/coolamqp/uplink/connection/recv_framer.py index 6c347ffc8ad9ce8f119f62183d57b8d3a0ade638..5d41cf4ae581f079cd2628792d6b6abca396939b 100644 --- a/coolamqp/uplink/connection/recv_framer.py +++ b/coolamqp/uplink/connection/recv_framer.py @@ -3,11 +3,14 @@ from __future__ import absolute_import, division, print_function import collections import io -import six import struct -from coolamqp.framing.frames import AMQPBodyFrame, AMQPHeaderFrame, AMQPHeartbeatFrame, AMQPMethodFrame -from coolamqp.framing.definitions import FRAME_HEADER, FRAME_HEARTBEAT, FRAME_END, FRAME_METHOD, FRAME_BODY +import six + +from coolamqp.framing.definitions import FRAME_HEADER, FRAME_HEARTBEAT, \ + FRAME_END, FRAME_METHOD, FRAME_BODY +from coolamqp.framing.frames import AMQPBodyFrame, AMQPHeaderFrame, \ + AMQPHeartbeatFrame, AMQPMethodFrame FRAME_TYPES = { FRAME_HEADER: AMQPHeaderFrame, @@ -61,8 +64,10 @@ class ReceivingFramer(object): while self._statemachine(): pass - def _extract(self, up_to): # return up to up_to bytes from current chunk, switch if necessary - assert self.total_data_len >= up_to, 'Tried to extract %s but %s remaining' % (up_to, self.total_data_len) + def _extract(self, + up_to): # return up to up_to bytes from current chunk, switch if necessary + assert self.total_data_len >= up_to, 'Tried to extract %s but %s remaining' % ( + up_to, self.total_data_len) if up_to >= len(self.chunks[0]): q = self.chunks.popleft() else: @@ -70,7 +75,8 @@ class ReceivingFramer(object): self.chunks[0] = self.chunks[0][up_to:] self.total_data_len -= len(q) - assert len(q) <= up_to, 'extracted %s but %s was requested' % (len(q), up_to) + assert len(q) <= up_to, 'extracted %s but %s was requested' % ( + len(q), up_to) return q def _statemachine(self): @@ -81,16 +87,19 @@ class ReceivingFramer(object): else: self.frame_type = ord(self._extract(1)[0]) - if self.frame_type not in (FRAME_HEARTBEAT, FRAME_HEADER, FRAME_METHOD, FRAME_BODY): + if self.frame_type not in ( + FRAME_HEARTBEAT, FRAME_HEADER, FRAME_METHOD, FRAME_BODY): raise ValueError('Invalid frame') return True # state rule 2 - elif (self.frame_type == FRAME_HEARTBEAT) and (self.total_data_len >= AMQPHeartbeatFrame.LENGTH - 1): + elif (self.frame_type == FRAME_HEARTBEAT) and ( + self.total_data_len >= AMQPHeartbeatFrame.LENGTH - 1): data = b'' while len(data) < AMQPHeartbeatFrame.LENGTH - 1: - data = data + self._extract(AMQPHeartbeatFrame.LENGTH - 1 - len(data)).tobytes() + data = data + self._extract( + AMQPHeartbeatFrame.LENGTH - 1 - len(data)).tobytes() if data != AMQPHeartbeatFrame.DATA[1:]: # Invalid heartbeat frame! @@ -102,8 +111,9 @@ class ReceivingFramer(object): return True # state rule 3 - elif (self.frame_type != FRAME_HEARTBEAT) and (self.frame_type is not None) and (self.frame_size is None) and ( - self.total_data_len > 6): + elif (self.frame_type != FRAME_HEARTBEAT) and ( + self.frame_type is not None) and (self.frame_size is None) and ( + self.total_data_len > 6): hdr = b'' while len(hdr) < 6: hdr = hdr + self._extract(6 - len(hdr)).tobytes() @@ -113,7 +123,8 @@ class ReceivingFramer(object): return True # state rule 4 - elif (self.frame_size is not None) and (self.total_data_len >= (self.frame_size + 1)): + elif (self.frame_size is not None) and ( + self.total_data_len >= (self.frame_size + 1)): if len(self.chunks[0]) >= self.frame_size: # We can subslice it - it's very fast @@ -122,7 +133,8 @@ class ReceivingFramer(object): # Construct a separate buffer :( payload = io.BytesIO() while payload.tell() < self.frame_size: - payload.write(self._extract(self.frame_size - payload.tell())) + payload.write( + self._extract(self.frame_size - payload.tell())) assert payload.tell() <= self.frame_size @@ -136,7 +148,8 @@ class ReceivingFramer(object): raise ValueError('Invalid frame end') try: - frame = FRAME_TYPES[self.frame_type].unserialize(self.frame_channel, payload) + frame = FRAME_TYPES[self.frame_type].unserialize( + self.frame_channel, payload) except ValueError: raise diff --git a/coolamqp/uplink/connection/send_framer.py b/coolamqp/uplink/connection/send_framer.py index 06e7abc2064e693f02fafdaffdbe5fd002937f45..b8d83e6cb918a864d706aa1951a8350cdf6ea7c0 100644 --- a/coolamqp/uplink/connection/send_framer.py +++ b/coolamqp/uplink/connection/send_framer.py @@ -1,10 +1,7 @@ # coding=UTF-8 from __future__ import absolute_import, division, print_function -import collections -import threading import io -import socket class SendingFramer(object): diff --git a/coolamqp/uplink/connection/watches.py b/coolamqp/uplink/connection/watches.py index 1518a346518deb97295c68a96ed0367a37e5e368..3f7893f344a44c7516f13291b808da93868945ae 100644 --- a/coolamqp/uplink/connection/watches.py +++ b/coolamqp/uplink/connection/watches.py @@ -1,7 +1,8 @@ # coding=UTF-8 from __future__ import absolute_import, division, print_function -from coolamqp.framing.frames import AMQPMethodFrame, AMQPHeartbeatFrame, AMQPHeaderFrame, AMQPBodyFrame +from coolamqp.framing.frames import AMQPMethodFrame, AMQPHeaderFrame, \ + AMQPBodyFrame class Watch(object): diff --git a/coolamqp/uplink/handshake.py b/coolamqp/uplink/handshake.py index aecc0bdeae111ff74b2814e1f992588b72a75db8..f2e0d693cd26220584190d604ad745512a196c59 100644 --- a/coolamqp/uplink/handshake.py +++ b/coolamqp/uplink/handshake.py @@ -6,7 +6,7 @@ Provides reactors that can authenticate an AQMP session """ import six from coolamqp.framing.definitions import ConnectionStart, ConnectionStartOk, \ - ConnectionTune, ConnectionTuneOk, ConnectionOpen, ConnectionOpenOk, ConnectionClose + ConnectionTune, ConnectionTuneOk, ConnectionOpen, ConnectionOpenOk from coolamqp.framing.frames import AMQPMethodFrame from coolamqp.uplink.connection.states import ST_ONLINE @@ -15,7 +15,8 @@ CONSUMER_CANCEL_NOTIFY = b'consumer_cancel_notify' SUPPORTED_EXTENSIONS = [ PUBLISHER_CONFIRMS, - CONSUMER_CANCEL_NOTIFY # half assed support - we just .cancel the consumer, see #12 + CONSUMER_CANCEL_NOTIFY + # half assed support - we just .cancel the consumer, see #12 ] CLIENT_DATA = [ @@ -25,8 +26,11 @@ CLIENT_DATA = [ (b'version', (b'0.91', 'S')), (b'copyright', (b'Copyright (C) 2016-2017 DMS Serwis', 'S')), ( - b'information', (b'Licensed under the MIT License.\nSee https://github.com/smok-serwis/coolamqp for details', 'S')), - (b'capabilities', ([(capa, (True, 't')) for capa in SUPPORTED_EXTENSIONS], 'F')), + b'information', ( + b'Licensed under the MIT License.\nSee https://github.com/smok-serwis/coolamqp for details', + 'S')), + (b'capabilities', + ([(capa, (True, 't')) for capa in SUPPORTED_EXTENSIONS], 'F')), ] WATCHDOG_TIMEOUT = 10 @@ -48,7 +52,8 @@ class Handshaker(object): self.password = node_definition.password.encode('utf8') self.virtual_host = node_definition.virtual_host.encode('utf8') self.heartbeat = node_definition.heartbeat or 0 - self.connection.watch_for_method(0, ConnectionStart, self.on_connection_start) + self.connection.watch_for_method(0, ConnectionStart, + self.on_connection_start) # Callbacks self.on_success = on_success @@ -83,7 +88,8 @@ class Handshaker(object): self.connection.extensions.append(label) self.connection.watchdog(WATCHDOG_TIMEOUT, self.on_watchdog) - self.connection.watch_for_method(0, ConnectionTune, self.on_connection_tune) + self.connection.watch_for_method(0, ConnectionTune, + self.on_connection_tune) self.connection.send([ AMQPMethodFrame(0, ConnectionStartOk(CLIENT_DATA, b'PLAIN', @@ -95,12 +101,16 @@ class Handshaker(object): def on_connection_tune(self, payload): self.connection.frame_max = payload.frame_max self.connection.heartbeat = min(payload.heartbeat, self.heartbeat) - for channel in six.moves.xrange(1, (65535 if payload.channel_max == 0 else payload.channel_max) + 1): + for channel in six.moves.xrange(1, ( + 65535 if payload.channel_max == 0 else payload.channel_max) + 1): self.connection.free_channels.append(channel) - self.connection.watch_for_method(0, ConnectionOpenOk, self.on_connection_open_ok) + self.connection.watch_for_method(0, ConnectionOpenOk, + self.on_connection_open_ok) self.connection.send([ - AMQPMethodFrame(0, ConnectionTuneOk(payload.channel_max, payload.frame_max, self.connection.heartbeat)), + AMQPMethodFrame(0, ConnectionTuneOk(payload.channel_max, + payload.frame_max, + self.connection.heartbeat)), AMQPMethodFrame(0, ConnectionOpen(self.virtual_host)) ]) diff --git a/coolamqp/uplink/heartbeat.py b/coolamqp/uplink/heartbeat.py index 54d4b9225f91d99249ad87d1ae7d50709569b0d0..4cb5a7aa0592104d732579ff9a8f72f6050764e8 100644 --- a/coolamqp/uplink/heartbeat.py +++ b/coolamqp/uplink/heartbeat.py @@ -1,5 +1,6 @@ # coding=UTF-8 from __future__ import absolute_import, division, print_function + import monotonic from coolamqp.framing.frames import AMQPHeartbeatFrame @@ -42,7 +43,8 @@ class Heartbeater(object): """Timer says we should send a heartbeat""" self.connection.send([AMQPHeartbeatFrame()], priority=True) - if (monotonic.monotonic() - self.last_heartbeat_on) > 2 * self.heartbeat_interval: + if ( + monotonic.monotonic() - self.last_heartbeat_on) > 2 * self.heartbeat_interval: # closing because of heartbeat self.connection.send(None) diff --git a/coolamqp/uplink/listener/epoll_listener.py b/coolamqp/uplink/listener/epoll_listener.py index a92033a9780aabbb55c12a701cb74f4cf109fc63..1a42b7129bc47499c9c647a028caeb5859a0a33b 100644 --- a/coolamqp/uplink/listener/epoll_listener.py +++ b/coolamqp/uplink/listener/epoll_listener.py @@ -1,12 +1,14 @@ # coding=UTF-8 from __future__ import absolute_import, division, print_function -import six + +import collections +import heapq import logging import select -import monotonic import socket -import collections -import heapq + +import monotonic +import six from coolamqp.uplink.listener.socket import SocketFailed, BaseSocket @@ -88,7 +90,8 @@ class EpollListener(object): sock.on_write() # I'm done with sending for now - if len(sock.data_to_send) == 0 and len(sock.priority_queue) == 0: + if len(sock.data_to_send) == 0 and len( + sock.priority_queue) == 0: self.epoll.modify(sock.fileno(), RO) except SocketFailed: diff --git a/coolamqp/uplink/listener/socket.py b/coolamqp/uplink/listener/socket.py index 34420c1eae411a4b7234e2b1c8c515cb1daee687..89a52dfc21be5e21e92fb5074bd4d533c4f0ac71 100644 --- a/coolamqp/uplink/listener/socket.py +++ b/coolamqp/uplink/listener/socket.py @@ -1,8 +1,8 @@ # coding=UTF-8 from __future__ import absolute_import, division, print_function + import collections import socket -import six class SocketFailed(IOError): diff --git a/coolamqp/uplink/listener/thread.py b/coolamqp/uplink/listener/thread.py index 4d82d8ce5b291384539569e693775c376b16d2e0..7fda4fe1b8dbecd8952d0b71f037e0970c3ef77a 100644 --- a/coolamqp/uplink/listener/thread.py +++ b/coolamqp/uplink/listener/thread.py @@ -3,8 +3,8 @@ from __future__ import absolute_import, division, print_function import threading -from coolamqp.uplink.listener.epoll_listener import EpollListener from coolamqp.objects import Callable +from coolamqp.uplink.listener.epoll_listener import EpollListener class ListenerThread(threading.Thread): diff --git a/setup.cfg b/setup.cfg index 81e69759c0dae482e4b49885492fbda43dd5f0e9..3ab7d192521aa0088dcc024eed2d945cc4af88bc 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,7 +1,7 @@ [metadata] description-file = README.md name = CoolAMQP -version = 0.92rc1 +version = 0.92rc2 license = MIT License classifiers = Programming Language :: Python