diff --git a/coolamqp/__init__.py b/coolamqp/__init__.py index a8863d8254d08542d55ff98ca2ceddbf69b76511..95995627150479d0550211c778e6688909beca06 100644 --- a/coolamqp/__init__.py +++ b/coolamqp/__init__.py @@ -1,2 +1 @@ # coding=UTF-8 - diff --git a/coolamqp/attaches/agroup.py b/coolamqp/attaches/agroup.py index 341c7a35a7f6c460eee73ea08cd8f960f209dc35..97838abbe35d1b0d079852c3c4ed9613639a1b05 100644 --- a/coolamqp/attaches/agroup.py +++ b/coolamqp/attaches/agroup.py @@ -11,7 +11,6 @@ import weakref logger = logging.getLogger(__name__) - from coolamqp.attaches.channeler import Attache, ST_OFFLINE from coolamqp.attaches.consumer import Consumer @@ -66,5 +65,3 @@ class AttacheGroup(Attache): for attache in self.attaches: if not attache.cancelled: attache.attach(connection) - - diff --git a/coolamqp/attaches/channeler.py b/coolamqp/attaches/channeler.py index e988f3ef7e866b791f6743670dedc23d77cd0d9e..4c8f30c63696925b332356b38144fdf08131b75a 100644 --- a/coolamqp/attaches/channeler.py +++ b/coolamqp/attaches/channeler.py @@ -15,8 +15,7 @@ import logging ST_OFFLINE = 0 # Consumer is *not* consuming, no setup attempts are being made ST_SYNCING = 1 # A process targeted at consuming has been started -ST_ONLINE = 2 # Consumer is declared all right - +ST_ONLINE = 2 # Consumer is declared all right logger = logging.getLogger(__name__) @@ -25,8 +24,9 @@ class Attache(object): """ Something that can be attached to connection. """ + def __init__(self): - self.cancelled = False #: public, if this is True, it won't be attached to next connection + self.cancelled = False #: public, if this is True, it won't be attached to next connection self.state = ST_OFFLINE self.connection = None @@ -68,7 +68,7 @@ class Channeler(Attache): [EXTEND ME!] """ super(Channeler, self).__init__() - self.channel_id = None # channel obtained from Connection + self.channel_id = None # channel obtained from Connection def attach(self, connection): """ @@ -132,7 +132,7 @@ class Channeler(Attache): # teardown already done return - if self.state == ST_ONLINE: # The channel has just lost operationality! Inform others ASAP. + if self.state == ST_ONLINE: # The channel has just lost operationality! Inform others ASAP. self.on_operational(False) self.state = ST_OFFLINE @@ -204,7 +204,6 @@ class Channeler(Attache): """ raise Exception('Abstract method - override me!') - def register_on_close_watch(self): """ Register a watch for on_close. @@ -231,4 +230,3 @@ class Channeler(Attache): ChannelOpenOk, self.on_setup ) - diff --git a/coolamqp/attaches/consumer.py b/coolamqp/attaches/consumer.py index d8e9df36a89fdb5a05b318ee796ad92f35f87993..27976bb275829568666f618e126a13f4eee92ebc 100644 --- a/coolamqp/attaches/consumer.py +++ b/coolamqp/attaches/consumer.py @@ -16,10 +16,8 @@ from coolamqp.objects import Callable from coolamqp.attaches.channeler import Channeler, ST_ONLINE, ST_OFFLINE from coolamqp.exceptions import AMQPError - logger = logging.getLogger(__name__) - EMPTY_MEMORYVIEW = memoryview(b'') # for empty messages @@ -27,17 +25,18 @@ class BodyReceiveMode(object): # ZC - zero copy # C - copy (copies every byte once) - BYTES = 0 # message.body will be a single bytes object - # this will gather frames as memoryviews, and b''.join() them upon receiving last frame - # this is C + BYTES = 0 # message.body will be a single bytes object + # this will gather frames as memoryviews, and b''.join() them upon receiving last frame + # this is C - MEMORYVIEW = 1 # message.body will be returned as a memoryview object - # this is ZC for small messages, and C for multi-frame ones - # think less than 800B, since 2048 is the buffer for socket recv, and an AMQP - # frame (or many frames!) have to fit there + MEMORYVIEW = 1 # message.body will be returned as a memoryview object + # this is ZC for small messages, and C for multi-frame ones + # think less than 800B, since 2048 is the buffer for socket recv, and an AMQP + # frame (or many frames!) have to fit there + + LIST_OF_MEMORYVIEW = 2 # message.body will be returned as list of memoryview objects + # these constitute received pieces. this is always ZC - LIST_OF_MEMORYVIEW = 2 # message.body will be returned as list of memoryview objects - # these constitute received pieces. this is always ZC class Consumer(Channeler): """ @@ -74,8 +73,6 @@ class Consumer(Channeler): """ - - def __init__(self, queue, on_message, no_ack=True, qos=None, cancel_on_failure=False, future_to_notify=None, fail_on_first_time_resource_locked=False, @@ -121,17 +118,17 @@ class Consumer(Channeler): self.cancelled = False # did the client want to STOP using this consumer? self.receiver = None # MessageReceiver instance - self.attache_group = None # attache group this belongs to. - # if this is not None, then it has an attribute - # on_cancel_customer(Consumer instance) + self.attache_group = None # attache group this belongs to. + # if this is not None, then it has an attribute + # on_cancel_customer(Consumer instance) if qos is not None: if qos[0] is None: - qos = 0, qos[1] # prefetch_size=0=undefined + qos = 0, qos[1] # prefetch_size=0=undefined self.qos = qos - self.qos_update_sent = False # QoS was not sent to server + self.qos_update_sent = False # QoS was not sent to server self.future_to_notify = future_to_notify - self.future_to_notify_on_dead = None # .cancel + self.future_to_notify_on_dead = None # .cancel self.fail_on_first_time_resource_locked = fail_on_first_time_resource_locked self.cancel_on_failure = cancel_on_failure @@ -139,8 +136,8 @@ class Consumer(Channeler): self.consumer_tag = None - self.on_cancel = Callable(oneshots=True) #: public, called on cancel for any reason - self.on_broker_cancel = Callable(oneshots=True) #: public, called on Customer Cancel Notification (RabbitMQ) + self.on_cancel = Callable(oneshots=True) #: public, called on cancel for any reason + self.on_broker_cancel = Callable(oneshots=True) #: public, called on Customer Cancel Notification (RabbitMQ) def set_qos(self, prefetch_size, prefetch_count): """ @@ -182,7 +179,6 @@ class Consumer(Channeler): return self.future_to_notify_on_dead - def on_operational(self, operational): super(Consumer, self).on_operational(operational) @@ -235,7 +231,7 @@ class Consumer(Channeler): # on_close is a one_shot watch. We need to re-register it now. self.register_on_close_watch() self.methods([BasicCancelOk(payload.consumer_tag), ChannelClose(0, b'Received basic.cancel', 0, 0)]) - self.cancelled = True # wasn't I? + self.cancelled = True # wasn't I? self.on_cancel() self.on_broker_cancel() return @@ -271,14 +267,13 @@ class Consumer(Channeler): old_con = self.connection - super(Consumer, self).on_close(payload) # this None's self.connection and returns port + super(Consumer, self).on_close(payload) # this None's self.connection and returns port self.fail_on_first_time_resource_locked = False - if self.future_to_notify_on_dead: # notify it was cancelled + if self.future_to_notify_on_dead: # notify it was cancelled logger.info('Consumer successfully cancelled') self.future_to_notify_on_dead.set_result(None) - if should_retry: if old_con.state == ST_ONLINE: logger.info('Retrying with %s', self.queue.name) @@ -295,13 +290,13 @@ class Consumer(Channeler): return if isinstance(sth, BasicDeliver): - self.receiver.on_basic_deliver(sth) + self.receiver.on_basic_deliver(sth) elif isinstance(sth, AMQPBodyFrame): - self.receiver.on_body(sth.data) + self.receiver.on_body(sth.data) elif isinstance(sth, AMQPHeaderFrame): - self.receiver.on_head(sth) + self.receiver.on_head(sth) - # No point in listening for more stuff, that's all the watches even listen for + # No point in listening for more stuff, that's all the watches even listen for def on_setup(self, payload): """Called with different kinds of frames - during setup""" @@ -371,15 +366,15 @@ class Consumer(Channeler): self.on_setup ) else: - self.on_setup(BasicQosOk()) # pretend QoS went ok + self.on_setup(BasicQosOk()) # pretend QoS went ok elif isinstance(payload, BasicQosOk): - self.consumer_tag = uuid.uuid4().hex.encode('utf8') # str in py2, unicode in py3 - self.method_and_watch( - BasicConsume(self.queue.name, self.consumer_tag, - False, self.no_ack, self.queue.exclusive, False, []), - BasicConsumeOk, - self.on_setup - ) + self.consumer_tag = uuid.uuid4().hex.encode('utf8') # str in py2, unicode in py3 + self.method_and_watch( + BasicConsume(self.queue.name, self.consumer_tag, + False, self.no_ack, self.queue.exclusive, False, []), + BasicConsumeOk, + self.on_setup + ) elif isinstance(payload, BasicConsumeOk): # AWWW RIGHT~!!! We're good. self.on_operational(True) @@ -414,24 +409,25 @@ class MessageReceiver(object): and may opt to kill the connection on bad framing with self.consumer.connection.send(None) """ + def __init__(self, consumer): self.consumer = consumer self.state = 0 # 0 - waiting for Basic-Deliver - # 1 - waiting for Header - # 2 - waiting for Body [all] - # 3 - gone! + # 1 - waiting for Header + # 2 - waiting for Body [all] + # 3 - gone! - self.bdeliver = None # payload of Basic-Deliver - self.header = None # AMQPHeaderFrame + self.bdeliver = None # payload of Basic-Deliver + self.header = None # AMQPHeaderFrame if consumer.body_receive_mode == BodyReceiveMode.MEMORYVIEW: - self.body = None # None is an important sign - first piece of message + self.body = None # None is an important sign - first piece of message else: - self.body = [] # list of payloads + self.body = [] # list of payloads self.data_to_go = None # set on receiving header, how much bytes we need yet - self.message_size = None # in bytes, of currently received message - self.offset = 0 # used only in MEMORYVIEW mode - pointer to self.body (which would be a buffer) + self.message_size = None # in bytes, of currently received message + self.offset = 0 # used only in MEMORYVIEW mode - pointer to self.body (which would be a buffer) - self.acks_pending = set() # list of things to ack/reject + self.acks_pending = set() # list of things to ack/reject self.recv_mode = consumer.body_receive_mode # if BYTES, pieces (as mvs) are received into .body and b''.join()ed at the end @@ -463,7 +459,7 @@ class MessageReceiver(object): return # Gone! if self.consumer.cancelled: - return # cancelled! + return # cancelled! if delivery_tag not in self.acks_pending: return # already confirmed/rejected @@ -475,7 +471,6 @@ class MessageReceiver(object): return callable - def on_head(self, frame): assert self.state == 1 self.header = frame @@ -484,7 +479,7 @@ class MessageReceiver(object): if self.header.body_size == 0: # An empty message is no common guest. It won't have a BODY field though... - self.on_body(EMPTY_MEMORYVIEW) # trigger it manually + self.on_body(EMPTY_MEMORYVIEW) # trigger it manually def on_basic_deliver(self, payload): assert self.state == 0 @@ -504,14 +499,14 @@ class MessageReceiver(object): self.offset += len(payload) else: # new one - if self.data_to_go == 0: # special case - single frame message + if self.data_to_go == 0: # special case - single frame message self.body = payload else: self.body = memoryview(bytearray(self.message_size)) self.body[0:len(payload)] = payload self.offset = len(payload) - else: # BYTES and LIST_OF_MEMORYVIEW + else: # BYTES and LIST_OF_MEMORYVIEW self.body.append(payload) assert self.data_to_go >= 0 diff --git a/coolamqp/attaches/declarer.py b/coolamqp/attaches/declarer.py index 82a5080ad11fa1561cc6948b3a11ee035e8cfafe..ba254448a71d4f129dcc5481bbf45e98f6d5f8f7 100644 --- a/coolamqp/attaches/declarer.py +++ b/coolamqp/attaches/declarer.py @@ -7,7 +7,7 @@ import six import collections import logging from coolamqp.framing.definitions import ChannelOpenOk, ExchangeDeclare, ExchangeDeclareOk, QueueDeclare, \ - QueueDeclareOk, ChannelClose, QueueDelete, QueueDeleteOk + QueueDeclareOk, ChannelClose, QueueDelete, QueueDeleteOk from coolamqp.attaches.channeler import Channeler, ST_ONLINE, ST_OFFLINE from coolamqp.attaches.utils import AtomicTagger, FutureConfirmableRejectable, Synchronized from concurrent.futures import Future @@ -17,7 +17,6 @@ from coolamqp.exceptions import AMQPError, ConnectionDead logger = logging.getLogger(__name__) - class Operation(object): """ An abstract operation. @@ -28,13 +27,14 @@ class Operation(object): This will register it's own callback. Please, call on_connection_dead when connection is broken to fail futures with ConnectionDead, since this object does not watch for Fails """ + def __init__(self, declarer, obj, fut=None): self.done = False self.fut = fut self.declarer = declarer self.obj = obj - self.on_done = Callable() # callable/0 + self.on_done = Callable() # callable/0 def on_connection_dead(self): """To be called by declarer when our link fails""" @@ -47,13 +47,14 @@ class Operation(object): obj = self.obj if isinstance(obj, Exchange): self.declarer.method_and_watch(ExchangeDeclare(self.obj.name.encode('utf8'), obj.type, False, obj.durable, - obj.auto_delete, False, False, []), - (ExchangeDeclareOk, ChannelClose), - self._callback) + obj.auto_delete, False, False, []), + (ExchangeDeclareOk, ChannelClose), + self._callback) elif isinstance(obj, Queue): - self.declarer.method_and_watch(QueueDeclare(obj.name, False, obj.durable, obj.exclusive, obj.auto_delete, False, []), - (QueueDeclareOk, ChannelClose), - self._callback) + self.declarer.method_and_watch( + QueueDeclare(obj.name, False, obj.durable, obj.exclusive, obj.auto_delete, False, []), + (QueueDeclareOk, ChannelClose), + self._callback) def _callback(self, payload): assert not self.done @@ -65,7 +66,7 @@ class Operation(object): else: # something that had no Future failed. Is it in declared? if self.obj in self.declarer.declared: - self.declarer.declared.remove(self.obj) #todo access not threadsafe + self.declarer.declared.remove(self.obj) # todo access not threadsafe self.declarer.on_discard(self.obj) else: if self.fut is not None: @@ -92,7 +93,7 @@ class DeleteQueue(Operation): self.done = True if isinstance(payload, ChannelClose): self.fut.set_exception(AMQPError(payload)) - else: # Queue.DeleteOk + else: # Queue.DeleteOk self.fut.set_result(None) self.declarer.on_operation_done() @@ -103,6 +104,7 @@ class Declarer(Channeler, Synchronized): This also maintains a list of declared queues/exchanges, and redeclares them on each reconnect. """ + def __init__(self): """ Create a new declarer. @@ -110,16 +112,16 @@ class Declarer(Channeler, Synchronized): Channeler.__init__(self) Synchronized.__init__(self) - self.declared = set() # since Queues and Exchanges are hashable... - # anonymous queues aren't, but we reject those - # persistent + self.declared = set() # since Queues and Exchanges are hashable... + # anonymous queues aren't, but we reject those + # persistent self.left_to_declare = collections.deque() # since last disconnect. persistent+transient - # deque of Operation objects + # deque of Operation objects - self.on_discard = Callable() # callable/1, with discarded elements + self.on_discard = Callable() # callable/1, with discarded elements - self.in_process = None # Operation instance that is being progressed right now + self.in_process = None # Operation instance that is being progressed right now def on_close(self, payload=None): @@ -214,7 +216,7 @@ class Declarer(Channeler, Synchronized): if persistent: if obj not in self.declared: - self.declared.add(obj) #todo access not threadsafe + self.declared.add(obj) # todo access not threadsafe self.left_to_declare.append(Operation(self, obj, fut)) self._do_operations() diff --git a/coolamqp/clustering/cluster.py b/coolamqp/clustering/cluster.py index 229f7e1942183e4c1efdc2b275b131caabfbd8c2..459a91e285a37869f329ce34fe5bc19b6d261ae4 100644 --- a/coolamqp/clustering/cluster.py +++ b/coolamqp/clustering/cluster.py @@ -32,9 +32,8 @@ class Cluster(object): """ # Events you can be informed about - ST_LINK_LOST = 0 # Link has been lost - ST_LINK_REGAINED = 1 # Link has been regained - + ST_LINK_LOST = 0 # Link has been lost + ST_LINK_REGAINED = 1 # Link has been regained def __init__(self, nodes): """ @@ -168,7 +167,7 @@ class Cluster(object): self.attache_group = AttacheGroup() - self.events = six.moves.queue.Queue() # for coolamqp.clustering.events.* + self.events = six.moves.queue.Queue() # for coolamqp.clustering.events.* self.snr = SingleNodeReconnector(self.node, self.attache_group, self.listener) self.snr.on_fail.add(lambda: self.events.put_nowait(ConnectionLost())) @@ -182,12 +181,11 @@ class Cluster(object): self.attache_group.add(self.pub_na) self.attache_group.add(self.decl) - self.listener.init() self.listener.start() self.snr.connect() - #todo not really elegant + # todo not really elegant if wait: while not self.snr.is_connected(): time.sleep(0.1) diff --git a/coolamqp/clustering/events.py b/coolamqp/clustering/events.py index d18c3d39be760fdd1e3562c1a17a68d24f2563c3..d5fa9ef62ed0e4f612317a256a42e5437132abfa 100644 --- a/coolamqp/clustering/events.py +++ b/coolamqp/clustering/events.py @@ -41,6 +41,7 @@ class MessageReceived(ReceivedMessage, Event): """ Something that works as an ersatz ReceivedMessage, but is an event """ + def __init__(self, msg): """:type msg: ReceivedMessage""" ReceivedMessage.__init__(self, msg.body, msg.exchange_name, msg.routing_key, diff --git a/coolamqp/clustering/single.py b/coolamqp/clustering/single.py index fe2b97fa9ba1f141078751cacf2afe2c5b3cd835..45ad444aea3c1c45bfa10ab5419631a52605f339 100644 --- a/coolamqp/clustering/single.py +++ b/coolamqp/clustering/single.py @@ -22,7 +22,7 @@ class SingleNodeReconnector(object): self.terminating = False - self.on_fail = Callable() #: public + self.on_fail = Callable() #: public self.on_fail.add(self._on_fail) diff --git a/coolamqp/framing/__init__.py b/coolamqp/framing/__init__.py index 7e413d8fc8f284c7d85e772ebf519058ee1af9b8..ac09b29fdb417239528a3528054de4a0289fdfcf 100644 --- a/coolamqp/framing/__init__.py +++ b/coolamqp/framing/__init__.py @@ -7,4 +7,4 @@ Mechanisms for serialization/deserialization of AMQP framing and other types. definitions.py is machine-generated from AMQP specification. -""" \ No newline at end of file +""" diff --git a/coolamqp/framing/base.py b/coolamqp/framing/base.py index a9c49db1dbbcb42e5ffa457381a7f4eae9408da0..78b92bd34e23481973ca43e7dcaf96297f68ce4b 100644 --- a/coolamqp/framing/base.py +++ b/coolamqp/framing/base.py @@ -7,27 +7,25 @@ import struct logger = logging.getLogger(__name__) - AMQP_HELLO_HEADER = b'AMQP\x00\x00\x09\x01' - # name => (length|None, struct ID|None, reserved-field-value : for struct if structable, bytes else, length of default) -BASIC_TYPES = {u'bit': (None, None, "0", None), # special case +BASIC_TYPES = {u'bit': (None, None, "0", None), # special case u'octet': (1, 'B', "b'\\x00'", 1), u'short': (2, 'H', "b'\\x00\\x00'", 2), u'long': (4, 'I', "b'\\x00\\x00\\x00\\x00'", 4), u'longlong': (8, 'Q', "b'\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00'", 8), u'timestamp': (8, 'Q', "b'\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00'", 8), - u'table': (None, None, "b'\\x00\\x00\\x00\\x00'", 4), # special case - u'longstr': (None, None, "b'\\x00\\x00\\x00\\x00'", 4), # special case - u'shortstr': (None, None, "b'\\x00'", 1), # special case + u'table': (None, None, "b'\\x00\\x00\\x00\\x00'", 4), # special case + u'longstr': (None, None, "b'\\x00\\x00\\x00\\x00'", 4), # special case + u'shortstr': (None, None, "b'\\x00'", 1), # special case } DYNAMIC_BASIC_TYPES = (u'table', u'longstr', u'shortstr') -class AMQPFrame(object): # base class for framing - FRAME_TYPE = None # override me! +class AMQPFrame(object): # base class for framing + FRAME_TYPE = None # override me! def __init__(self, channel): self.channel = channel @@ -92,7 +90,7 @@ class AMQPContentPropertyList(object): """ PROPERTIES = [] - #todo they are immutable, so they could just serialize themselves... + # todo they are immutable, so they could just serialize themselves... @staticmethod def zero_property_flags(property_flags): @@ -147,7 +145,7 @@ class AMQPMethodPayload(AMQPPayload): if self.IS_CONTENT_STATIC: buf.write(self.STATIC_CONTENT) else: - buf.write(struct.pack('!I', self.get_size()+2)) + buf.write(struct.pack('!I', self.get_size() + 2)) buf.write(self.BINARY_HEADER) self.write_arguments(buf) buf.write(FRAME_END_BYTE) @@ -159,7 +157,7 @@ class AMQPMethodPayload(AMQPPayload): :return: int, size of argument section """ if self.IS_CONTENT_STATIC: - return len(self.STATIC_CONTENT)-4-4-1 # minus length, class, method, frame_end + return len(self.STATIC_CONTENT) - 4 - 4 - 1 # minus length, class, method, frame_end raise NotImplementedError() diff --git a/coolamqp/framing/compilation/__init__.py b/coolamqp/framing/compilation/__init__.py index 935f6982dab8906e875c41282d90a84ce549d57a..665f5dde145d7d9aeadefaaac010886ce83523be 100644 --- a/coolamqp/framing/compilation/__init__.py +++ b/coolamqp/framing/compilation/__init__.py @@ -1,6 +1,7 @@ # coding=UTF-8 from __future__ import absolute_import, division, print_function + """ - Compile XML to definitions.py - Dynamically compile classes from property_flags -""" \ No newline at end of file +""" diff --git a/coolamqp/framing/compilation/compile_definitions.py b/coolamqp/framing/compilation/compile_definitions.py index 4aed36c1d9e8600128d9ac1db2cff426c213426c..6ab7f1bf0657799f0a4475f4e22103909e0ad04e 100644 --- a/coolamqp/framing/compilation/compile_definitions.py +++ b/coolamqp/framing/compilation/compile_definitions.py @@ -450,7 +450,7 @@ Field = collections.namedtuple('Field', ('name', 'type', 'basic_type', 'reserved for cls in get_classes(xml): for method in cls.methods: dct[((cls.index, method.index))] = '%s%s' % ( - name_class(cls.name), format_method_class_name(method.name)) + name_class(cls.name), format_method_class_name(method.name)) line('\nIDENT_TO_METHOD = {\n') for k, v in dct.items(): diff --git a/coolamqp/framing/compilation/content_property.py b/coolamqp/framing/compilation/content_property.py index 3ebb60e5d8cdee13ed1990adb707e7ae102fcc49..d533a9c60bdc71b5d0e77fdff64fc12053d44891 100644 --- a/coolamqp/framing/compilation/content_property.py +++ b/coolamqp/framing/compilation/content_property.py @@ -1,5 +1,6 @@ # coding=UTF-8 from __future__ import absolute_import, division, print_function + """Generate serializers/unserializers/length getters for given property_flags""" import six import struct @@ -8,7 +9,6 @@ from coolamqp.framing.compilation.textcode_fields import get_counter, get_from_b from coolamqp.framing.base import AMQPContentPropertyList from coolamqp.framing.field_table import enframe_table, deframe_table, frame_table_size - logger = logging.getLogger(__name__) @@ -55,7 +55,7 @@ def _compile_particular_content_property_list_class(zpf, fields): x = repr(six.binary_type(zpf)) if not x.startswith('b'): - x = 'b'+x + x = 'b' + x present_fields = [field for field, present in zip(fields, zpf_bits) if present] @@ -66,7 +66,7 @@ def _compile_particular_content_property_list_class(zpf, fields): if len(present_fields) == 0: slots = u'' else: - slots = (u', '.join((u"u'%s'" % format_field_name(field.name) for field in present_fields)))+u', ' + slots = (u', '.join((u"u'%s'" % format_field_name(field.name) for field in present_fields))) + u', ' mod.append(u''' __slots__ = (%s) @@ -75,7 +75,7 @@ def _compile_particular_content_property_list_class(zpf, fields): mod.append(u''' # A value for property flags that is used, assuming all bit fields are FALSE (0) ZERO_PROPERTY_FLAGS = %s -''' % (x, )) +''' % (x,)) if len(present_fields) > 0: mod.append(u''' @@ -99,18 +99,17 @@ def _compile_particular_content_property_list_class(zpf, fields): # from_buffer # note that non-bit values mod.append(u' @classmethod\n') - mod.append(u' def from_buffer(cls, buf, start_offset):\n offset = start_offset + %s\n' % (zpf_length, )) + mod.append(u' def from_buffer(cls, buf, start_offset):\n offset = start_offset + %s\n' % (zpf_length,)) mod.append(get_from_buffer( present_fields , prefix='', indent_level=2)) mod.append(u' return cls(%s)\n' % u', '.join(format_field_name(field.name) for field in present_fields)) - # get_size mod.append(u'\n def get_size(self):\n') - mod.append(get_counter(present_fields, prefix=u'self.', indent_level=2)[:-1]) # skip eol - mod.append(u' + %s\n' % (zpf_length, )) # account for pf length + mod.append(get_counter(present_fields, prefix=u'self.', indent_level=2)[:-1]) # skip eol + mod.append(u' + %s\n' % (zpf_length,)) # account for pf length return u''.join(mod) @@ -118,6 +117,5 @@ def _compile_particular_content_property_list_class(zpf, fields): def compile_particular_content_property_list_class(zpf, fields): q = _compile_particular_content_property_list_class(zpf, fields) loc = {} - exec(q, globals(), loc) + exec (q, globals(), loc) return loc['ParticularContentTypeList'] - diff --git a/coolamqp/framing/frames.py b/coolamqp/framing/frames.py index 2f06d6deb95706d4b7f64729901cf4b8a642af7f..3f36f911d37b58b324603159f758083149848bb1 100644 --- a/coolamqp/framing/frames.py +++ b/coolamqp/framing/frames.py @@ -71,7 +71,7 @@ class AMQPHeaderFrame(AMQPFrame): def write_to(self, buf): buf.write(struct.pack('!BHLHHQ', FRAME_HEADER, self.channel, - 12+self.properties.get_size(), self.class_id, 0, self.body_size)) + 12 + self.properties.get_size(), self.class_id, 0, self.body_size)) self.properties.write_to(buf) buf.write(FRAME_END_BYTE) diff --git a/coolamqp/uplink/connection/recv_framer.py b/coolamqp/uplink/connection/recv_framer.py index 7b2f3d95ec1ed8d49f4b6163052ad3ce5a574357..6c347ffc8ad9ce8f119f62183d57b8d3a0ade638 100644 --- a/coolamqp/uplink/connection/recv_framer.py +++ b/coolamqp/uplink/connection/recv_framer.py @@ -36,15 +36,16 @@ class ReceivingFramer(object): frame_type <- None frame_size < None) """ + def __init__(self, on_frame=lambda frame: None): - self.chunks = collections.deque() # all received data + self.chunks = collections.deque() # all received data self.total_data_len = 0 self.frame_type = None self.frame_channel = None self.frame_size = None - self.bytes_needed = None # bytes needed for a new frame + self.bytes_needed = None # bytes needed for a new frame self.on_frame = on_frame def put(self, data): @@ -60,10 +61,10 @@ class ReceivingFramer(object): while self._statemachine(): pass - def _extract(self, up_to): # return up to up_to bytes from current chunk, switch if necessary + def _extract(self, up_to): # return up to up_to bytes from current chunk, switch if necessary assert self.total_data_len >= up_to, 'Tried to extract %s but %s remaining' % (up_to, self.total_data_len) if up_to >= len(self.chunks[0]): - q = self.chunks.popleft() + q = self.chunks.popleft() else: q = self.chunks[0][:up_to] self.chunks[0] = self.chunks[0][up_to:] @@ -86,10 +87,10 @@ class ReceivingFramer(object): return True # state rule 2 - elif (self.frame_type == FRAME_HEARTBEAT) and (self.total_data_len >= AMQPHeartbeatFrame.LENGTH-1): + elif (self.frame_type == FRAME_HEARTBEAT) and (self.total_data_len >= AMQPHeartbeatFrame.LENGTH - 1): data = b'' - while len(data) < AMQPHeartbeatFrame.LENGTH-1: - data = data + self._extract(AMQPHeartbeatFrame.LENGTH-1 - len(data)).tobytes() + while len(data) < AMQPHeartbeatFrame.LENGTH - 1: + data = data + self._extract(AMQPHeartbeatFrame.LENGTH - 1 - len(data)).tobytes() if data != AMQPHeartbeatFrame.DATA[1:]: # Invalid heartbeat frame! @@ -101,17 +102,18 @@ class ReceivingFramer(object): return True # state rule 3 - elif (self.frame_type != FRAME_HEARTBEAT) and (self.frame_type is not None) and (self.frame_size is None) and (self.total_data_len > 6): + elif (self.frame_type != FRAME_HEARTBEAT) and (self.frame_type is not None) and (self.frame_size is None) and ( + self.total_data_len > 6): hdr = b'' while len(hdr) < 6: hdr = hdr + self._extract(6 - len(hdr)).tobytes() - self.frame_channel, self.frame_size = struct.unpack('!HI',hdr) + self.frame_channel, self.frame_size = struct.unpack('!HI', hdr) return True # state rule 4 - elif (self.frame_size is not None) and (self.total_data_len >= (self.frame_size+1)): + elif (self.frame_size is not None) and (self.total_data_len >= (self.frame_size + 1)): if len(self.chunks[0]) >= self.frame_size: # We can subslice it - it's very fast @@ -143,4 +145,4 @@ class ReceivingFramer(object): self.frame_size = None return True - return False \ No newline at end of file + return False diff --git a/coolamqp/uplink/connection/send_framer.py b/coolamqp/uplink/connection/send_framer.py index bb07e45552b6fe213454b344fdd68805c250b41a..06e7abc2064e693f02fafdaffdbe5fd002937f45 100644 --- a/coolamqp/uplink/connection/send_framer.py +++ b/coolamqp/uplink/connection/send_framer.py @@ -26,6 +26,7 @@ class SendingFramer(object): frame_type <- None frame_size < None) """ + def __init__(self, on_send=lambda data: None): """ :param on_send: a callable(data, priority=False) that can be called with some data to send @@ -33,7 +34,6 @@ class SendingFramer(object): """ self.on_send = on_send - def send(self, frames, priority=False): """ Schedule to send some frames. diff --git a/coolamqp/uplink/connection/states.py b/coolamqp/uplink/connection/states.py index 9f340b26b72a615934208b19042aa7c859899d4f..5bcd3c68b9783fd927c720dae5e58bb91fe7aaca 100644 --- a/coolamqp/uplink/connection/states.py +++ b/coolamqp/uplink/connection/states.py @@ -3,4 +3,4 @@ from __future__ import absolute_import, division, print_function ST_OFFLINE = 0 ST_CONNECTING = 1 -ST_ONLINE = 2 \ No newline at end of file +ST_ONLINE = 2 diff --git a/coolamqp/uplink/connection/watches.py b/coolamqp/uplink/connection/watches.py index 5f273d586580e635d877fb563d86b9f12e4d607d..1518a346518deb97295c68a96ed0367a37e5e368 100644 --- a/coolamqp/uplink/connection/watches.py +++ b/coolamqp/uplink/connection/watches.py @@ -58,6 +58,7 @@ class AnyWatch(Watch): Eg. RabbitMQ will happily disconnect you if you don't, but it can get lax with heartbeats as it wants. """ + def __init__(self, callable): super(AnyWatch, self).__init__(None, False) self.callable = callable @@ -74,6 +75,7 @@ class FailWatch(Watch): """ A special kind of watch that fires when connection has died """ + def __init__(self, callable): super(FailWatch, self).__init__(None, True) self.callable = callable @@ -90,6 +92,7 @@ class HeaderOrBodyWatch(Watch): """ A multi-shot watch listening for AMQP header or body frames """ + def __init__(self, channel, callable): Watch.__init__(self, channel, False) self.callable = callable @@ -108,6 +111,7 @@ class MethodWatch(Watch): """ One-shot watch listening for methods. """ + def __init__(self, channel, method_or_methods, callable, on_end=None): """ :param method_or_methods: class, or list of AMQPMethodPayload classes diff --git a/coolamqp/uplink/handshake.py b/coolamqp/uplink/handshake.py index 279f83eb7c50645954bd222757378124108648e8..aecc0bdeae111ff74b2814e1f992588b72a75db8 100644 --- a/coolamqp/uplink/handshake.py +++ b/coolamqp/uplink/handshake.py @@ -1,5 +1,6 @@ # coding=UTF-8 from __future__ import absolute_import, division, print_function + """ Provides reactors that can authenticate an AQMP session """ @@ -9,24 +10,24 @@ from coolamqp.framing.definitions import ConnectionStart, ConnectionStartOk, \ from coolamqp.framing.frames import AMQPMethodFrame from coolamqp.uplink.connection.states import ST_ONLINE - PUBLISHER_CONFIRMS = b'publisher_confirms' CONSUMER_CANCEL_NOTIFY = b'consumer_cancel_notify' SUPPORTED_EXTENSIONS = [ PUBLISHER_CONFIRMS, - CONSUMER_CANCEL_NOTIFY # half assed support - we just .cancel the consumer, see #12 + CONSUMER_CANCEL_NOTIFY # half assed support - we just .cancel the consumer, see #12 ] CLIENT_DATA = [ - # because RabbitMQ is some kind of a fascist and does not allow - # these fields to be of type short-string - (b'product', (b'CoolAMQP', 'S')), - (b'version', (b'0.91', 'S')), - (b'copyright', (b'Copyright (C) 2016-2017 DMS Serwis', 'S')), - (b'information', (b'Licensed under the MIT License.\nSee https://github.com/smok-serwis/coolamqp for details', 'S')), - (b'capabilities', ([(capa, (True, 't')) for capa in SUPPORTED_EXTENSIONS], 'F')), - ] + # because RabbitMQ is some kind of a fascist and does not allow + # these fields to be of type short-string + (b'product', (b'CoolAMQP', 'S')), + (b'version', (b'0.91', 'S')), + (b'copyright', (b'Copyright (C) 2016-2017 DMS Serwis', 'S')), + ( + b'information', (b'Licensed under the MIT License.\nSee https://github.com/smok-serwis/coolamqp for details', 'S')), + (b'capabilities', ([(capa, (True, 't')) for capa in SUPPORTED_EXTENSIONS], 'F')), +] WATCHDOG_TIMEOUT = 10 @@ -94,7 +95,7 @@ class Handshaker(object): def on_connection_tune(self, payload): self.connection.frame_max = payload.frame_max self.connection.heartbeat = min(payload.heartbeat, self.heartbeat) - for channel in six.moves.xrange(1, (65535 if payload.channel_max == 0 else payload.channel_max)+1): + for channel in six.moves.xrange(1, (65535 if payload.channel_max == 0 else payload.channel_max) + 1): self.connection.free_channels.append(channel) self.connection.watch_for_method(0, ConnectionOpenOk, self.on_connection_open_ok) diff --git a/coolamqp/uplink/heartbeat.py b/coolamqp/uplink/heartbeat.py index 3f37ba89a3b1fedc33de7f977b8bed40bf8fde58..54d4b9225f91d99249ad87d1ae7d50709569b0d0 100644 --- a/coolamqp/uplink/heartbeat.py +++ b/coolamqp/uplink/heartbeat.py @@ -42,7 +42,7 @@ class Heartbeater(object): """Timer says we should send a heartbeat""" self.connection.send([AMQPHeartbeatFrame()], priority=True) - if (monotonic.monotonic() - self.last_heartbeat_on) > 2*self.heartbeat_interval: + if (monotonic.monotonic() - self.last_heartbeat_on) > 2 * self.heartbeat_interval: # closing because of heartbeat self.connection.send(None) diff --git a/coolamqp/uplink/listener/__init__.py b/coolamqp/uplink/listener/__init__.py index 6e09bdfd2188ed56b734fcadeee2c4f3879ab522..3f7bc8c82726127347a25c4425902481c45c519a 100644 --- a/coolamqp/uplink/listener/__init__.py +++ b/coolamqp/uplink/listener/__init__.py @@ -14,5 +14,4 @@ immediately be able to do so. With epoll, you can. """ from __future__ import absolute_import, division, print_function - from coolamqp.uplink.listener.thread import ListenerThread diff --git a/coolamqp/uplink/listener/epoll_listener.py b/coolamqp/uplink/listener/epoll_listener.py index 692b2aa455c9c8eebd1bc3fdaa7dbb00ceb148ea..a92033a9780aabbb55c12a701cb74f4cf109fc63 100644 --- a/coolamqp/uplink/listener/epoll_listener.py +++ b/coolamqp/uplink/listener/epoll_listener.py @@ -10,10 +10,8 @@ import heapq from coolamqp.uplink.listener.socket import SocketFailed, BaseSocket - logger = logging.getLogger(__name__) - RO = select.EPOLLIN | select.EPOLLHUP | select.EPOLLERR RW = RO | select.EPOLLOUT @@ -22,6 +20,7 @@ class EpollSocket(BaseSocket): """ EpollListener substitutes your BaseSockets with this """ + def __init__(self, sock, on_read, on_fail, listener): BaseSocket.__init__(self, sock, on_read=on_read, on_fail=on_fail) self.listener = listener @@ -136,7 +135,7 @@ class EpollListener(object): )) def register(self, sock, on_read=lambda data: None, - on_fail=lambda: None): + on_fail=lambda: None): """ Add a socket to be listened for by the loop. @@ -151,4 +150,3 @@ class EpollListener(object): self.epoll.register(sock, RW) return sock - diff --git a/coolamqp/uplink/listener/socket.py b/coolamqp/uplink/listener/socket.py index a24d0c101b8f9fa52dc9f654dbccda222e63c6af..34420c1eae411a4b7234e2b1c8c515cb1daee687 100644 --- a/coolamqp/uplink/listener/socket.py +++ b/coolamqp/uplink/listener/socket.py @@ -19,8 +19,8 @@ class BaseSocket(object): """ def __init__(self, sock, on_read=lambda data: None, - on_time=lambda: None, - on_fail=lambda: None): + on_time=lambda: None, + on_fail=lambda: None): """ :param sock: socketobject @@ -36,7 +36,7 @@ class BaseSocket(object): assert sock is not None self.sock = sock self.data_to_send = collections.deque() - self.priority_queue = collections.deque() # when a piece of data is finished, this queue is checked first + self.priority_queue = collections.deque() # when a piece of data is finished, this queue is checked first self.my_on_read = on_read self._on_fail = on_fail self.on_time = on_time @@ -117,7 +117,7 @@ class BaseSocket(object): assert len(self.data_to_send) > 0 if self.data_to_send[0] is None: - raise SocketFailed() # We should terminate the connection! + raise SocketFailed() # We should terminate the connection! try: sent = self.sock.send(self.data_to_send[0]) @@ -130,7 +130,7 @@ class BaseSocket(object): return False else: # Looks like everything has been sent - self.data_to_send.popleft() # mark as sent + self.data_to_send.popleft() # mark as sent if len(self.priority_queue) > 0: # We can send a priority pack @@ -142,4 +142,4 @@ class BaseSocket(object): def close(self): """Close this socket""" - self.sock.close() \ No newline at end of file + self.sock.close() diff --git a/coolamqp/uplink/listener/thread.py b/coolamqp/uplink/listener/thread.py index 013bd23db0d3fbb168accd51bb7ccc1c574a7202..5af8893e6d9b1be9044c28444ace87b87eeb4f8e 100644 --- a/coolamqp/uplink/listener/thread.py +++ b/coolamqp/uplink/listener/thread.py @@ -19,7 +19,7 @@ class ListenerThread(threading.Thread): self.terminating = False def terminate(self): - self.terminating = True + self.terminating = True def init(self): """Called before start. It is not safe to fork after this"""