diff --git a/coolamqp/attaches/channeler.py b/coolamqp/attaches/channeler.py index bf712755743c3da5c09e780b9a79219ef718624b..87f733907555e49b6a5b7b63901744c27af2e5d8 100644 --- a/coolamqp/attaches/channeler.py +++ b/coolamqp/attaches/channeler.py @@ -136,7 +136,7 @@ class Channeler(Attache): self.state = ST_OFFLINE if not isinstance(payload, (ChannelClose, ChannelCloseOk)) and ( - payload is not None): + payload is not None): # I do not know how to handle that! return @@ -217,7 +217,7 @@ class Channeler(Attache): To be called by on_close, when it needs to be notified just one more time. """ self.connection.watch_for_method(self.channel_id, ( - ChannelClose, ChannelCloseOk, BasicCancel, BasicCancelOk), + ChannelClose, ChannelCloseOk, BasicCancel, BasicCancelOk), self.on_close, on_fail=self.on_close) diff --git a/coolamqp/attaches/consumer.py b/coolamqp/attaches/consumer.py index 12ee09104980b27a4d90951c2dde0f048fb2af2b..d908b12aa091a95f3916a1977245269216655ad4 100644 --- a/coolamqp/attaches/consumer.py +++ b/coolamqp/attaches/consumer.py @@ -40,7 +40,6 @@ class BodyReceiveMode(object): # these constitute received pieces. this is always ZC - class Consumer(Channeler): """ This object represents a consumer in the system. @@ -442,14 +441,14 @@ class MessageReceiver(object): self.header = None # AMQPHeaderFrame if consumer.body_receive_mode == BodyReceiveMode.MEMORYVIEW: self.body = None # None is an important sign - first piece of - # message + # message else: self.body = [] # list of payloads self.data_to_go = None # set on receiving header, how much bytes we - # need yet + # need yet self.message_size = None # in bytes, of currently received message self.offset = 0 # used only in MEMORYVIEW mode - pointer to self.body - # (which would be a buffer) + # (which would be a buffer) self.acks_pending = set() # list of things to ack/reject diff --git a/coolamqp/attaches/declarer.py b/coolamqp/attaches/declarer.py index 112a402da59e53623d83bc95e64d4d1cdae8b638..9e07baff9c2c56529ae415ef9b65d2b0177220d6 100644 --- a/coolamqp/attaches/declarer.py +++ b/coolamqp/attaches/declarer.py @@ -239,7 +239,7 @@ class Declarer(Channeler, Synchronized): To be called when it's possible that something can be done """ if (self.state != ST_ONLINE) or len(self.left_to_declare) == 0 or ( - self.in_process is not None): + self.in_process is not None): return self.in_process = self.left_to_declare.popleft() diff --git a/coolamqp/attaches/publisher.py b/coolamqp/attaches/publisher.py index ef2cdeece638ec9d820c9220f0f687bfe7146328..6d480f5b1df3a0d8daef9436b00f949959bc3c14 100644 --- a/coolamqp/attaches/publisher.py +++ b/coolamqp/attaches/publisher.py @@ -233,7 +233,8 @@ class Publisher(Channeler, Synchronized): def on_operational(self, operational): state = {True: u'up', False: u'down'}[operational] - mode = {Publisher.MODE_NOACK: u'noack', Publisher.MODE_CNPUB: u'cnpub'}[ + mode = \ + {Publisher.MODE_NOACK: u'noack', Publisher.MODE_CNPUB: u'cnpub'}[ self.mode] logger.info('Publisher %s is %s', mode, state) diff --git a/coolamqp/clustering/cluster.py b/coolamqp/clustering/cluster.py index 33d1bbeaeadc1a8f32cc20f9cdb813b83726d245..9f31d190bf9bbc4c01ecaf6c6f76ed7eb2e962e4 100644 --- a/coolamqp/clustering/cluster.py +++ b/coolamqp/clustering/cluster.py @@ -105,7 +105,8 @@ class Cluster(object): fut.set_running_or_notify_cancel() # it's running right now on_message = on_message or ( lambda rmsg: self.events.put_nowait(MessageReceived(rmsg))) - con = Consumer(queue, on_message, future_to_notify=fut, *args, **kwargs) + con = Consumer(queue, on_message, future_to_notify=fut, *args, + **kwargs) self.attache_group.add(con) return con, fut diff --git a/coolamqp/exceptions.py b/coolamqp/exceptions.py index 0047981684595dda8633c0dfc205da1f300a3713..34956c1e65c6d10f4fe3f3994b1d8c817ee98dd8 100644 --- a/coolamqp/exceptions.py +++ b/coolamqp/exceptions.py @@ -33,7 +33,7 @@ class AMQPError(CoolAMQPError): self.reply_text, self.class_id, self.method_id) - ) + ) def __init__(self, *args): """ diff --git a/coolamqp/framing/base.py b/coolamqp/framing/base.py index 75f74c34614123d9ad76a4daec7043244f17edba..89f057568963c32c49f0bc38a7b2f22fa54fcae5 100644 --- a/coolamqp/framing/base.py +++ b/coolamqp/framing/base.py @@ -13,9 +13,9 @@ BASIC_TYPES = {u'bit': (None, None, "0", None), # special case u'short': (2, 'H', "b'\\x00\\x00'", 2), u'long': (4, 'I', "b'\\x00\\x00\\x00\\x00'", 4), u'longlong': ( - 8, 'Q', "b'\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00'", 8), + 8, 'Q', "b'\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00'", 8), u'timestamp': ( - 8, 'Q', "b'\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00'", 8), + 8, 'Q', "b'\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00'", 8), u'table': (None, None, "b'\\x00\\x00\\x00\\x00'", 4), # special case u'longstr': (None, None, "b'\\x00\\x00\\x00\\x00'", 4), diff --git a/coolamqp/framing/compilation/content_property.py b/coolamqp/framing/compilation/content_property.py index e011d76ef41590c72b33b8ab91972109f31d3f7a..3b94bc83cf5f50d3cd09daf8a047e7245e417cac 100644 --- a/coolamqp/framing/compilation/content_property.py +++ b/coolamqp/framing/compilation/content_property.py @@ -9,7 +9,6 @@ from coolamqp.framing.compilation.textcode_fields import get_counter, \ logger = logging.getLogger(__name__) - INIT_I = u'\n def __init__(self, %s):\n' SLOTS_I = u'\n __slots__ = (%s)\n' FROM_BUFFER_1 = u' def from_buffer(cls, buf, start_offset):\n ' \ @@ -36,6 +35,7 @@ SPACER = u''' ''' GET_SIZE_HEADER = u'\n def get_size(self):\n' + def _compile_particular_content_property_list_class(zpf, fields): """ Compile a particular content property list. @@ -101,7 +101,7 @@ def _compile_particular_content_property_list_class(zpf, fields): FFN = u', '.join(format_field_name(field.name) for field in present_fields) if len(present_fields) > 0: - mod.append(INIT_I % (FFN, )) + mod.append(INIT_I % (FFN,)) for field in present_fields: mod.append(ASSIGN_A.replace(u'%s', format_field_name( @@ -122,11 +122,11 @@ def _compile_particular_content_property_list_class(zpf, fields): mod.append(u' @classmethod\n') mod.append( FROM_BUFFER_1 % ( - zpf_length,)) + zpf_length,)) mod.append(get_from_buffer( present_fields , prefix='', indent_level=2)) - mod.append(u' return cls(%s)\n' % (FFN, )) + mod.append(u' return cls(%s)\n' % (FFN,)) # get_size mod.append(GET_SIZE_HEADER) diff --git a/coolamqp/framing/compilation/textcode_fields.py b/coolamqp/framing/compilation/textcode_fields.py index 88d99d9ebe53ec6e88bfd3bff437d34ee32b570b..3a27ee44e6fdb94c9f1d24b3d3b65da6c6bd9094 100644 --- a/coolamqp/framing/compilation/textcode_fields.py +++ b/coolamqp/framing/compilation/textcode_fields.py @@ -66,7 +66,7 @@ def get_counter(fields, prefix=u'', indent_level=2): accumulator += int(math.ceil(bits / 8)) return (u' ' * indent_level) + u'return ' + ( - u' + '.join([str(accumulator)] + parts)) + u'\n' + u' + '.join([str(accumulator)] + parts)) + u'\n' def get_from_buffer(fields, prefix='', indent_level=2, remark=False): @@ -208,7 +208,7 @@ def get_serializer(fields, prefix='', indent_level=2): for bit_name, modif in zip(bits, range(8)): if bit_name != 'False': p.append('(' + bit_name + ' << %s)' % ( - modif,)) # yes you can << bools + modif,)) # yes you can << bools format_args.append(u' | '.join(p)) del bits[:] diff --git a/coolamqp/framing/compilation/utilities.py b/coolamqp/framing/compilation/utilities.py index dd93515cd22ab58f317ad93ba164210bfc0967a5..f647043dbe6885d8897df30377b692ed5a5b5726 100644 --- a/coolamqp/framing/compilation/utilities.py +++ b/coolamqp/framing/compilation/utilities.py @@ -11,13 +11,15 @@ from coolamqp.framing.base import BASIC_TYPES, DYNAMIC_BASIC_TYPES # docs may be None Constant = namedtuple('Constant', ( -'name', 'value', 'kind', 'docs')) # kind is AMQP constant class # value is int + 'name', 'value', 'kind', + 'docs')) # kind is AMQP constant class # value is int Field = namedtuple('Field', ( -'name', 'type', 'label', 'docs', 'reserved', 'basic_type')) # reserved is bool + 'name', 'type', 'label', 'docs', 'reserved', + 'basic_type')) # reserved is bool # synchronous is bool, constant is bool # repponse is a list of method.name Class_ = namedtuple('Class_', ( -'name', 'index', 'docs', 'methods', 'properties')) # label is int + 'name', 'index', 'docs', 'methods', 'properties')) # label is int Domain = namedtuple('Domain', ('name', 'type', 'elementary')) # elementary is bool diff --git a/coolamqp/framing/definitions.py b/coolamqp/framing/definitions.py index e472c3d376f2de66819fb6592677c6cfe8b5dd92..79be651e1d9bdea9225f6933ce8fd3e9d8bf207f 100644 --- a/coolamqp/framing/definitions.py +++ b/coolamqp/framing/definitions.py @@ -1,5 +1,6 @@ # coding=UTF-8 from __future__ import print_function, absolute_import + """ A Python version of the AMQP machine-readable specification. @@ -25,13 +26,17 @@ Only thing that isn't are field names in tables. import struct, collections, logging, six -from coolamqp.framing.base import AMQPClass, AMQPMethodPayload, AMQPContentPropertyList -from coolamqp.framing.field_table import enframe_table, deframe_table, frame_table_size -from coolamqp.framing.compilation.content_property import compile_particular_content_property_list_class +from coolamqp.framing.base import AMQPClass, AMQPMethodPayload, \ + AMQPContentPropertyList +from coolamqp.framing.field_table import enframe_table, deframe_table, \ + frame_table_size +from coolamqp.framing.compilation.content_property import \ + compile_particular_content_property_list_class logger = logging.getLogger(__name__) -Field = collections.namedtuple('Field', ('name', 'type', 'basic_type', 'reserved')) +Field = collections.namedtuple('Field', + ('name', 'type', 'basic_type', 'reserved')) # Core constants FRAME_METHOD = 1 @@ -53,67 +58,69 @@ FRAME_END_BYTE = b'\xce' REPLY_SUCCESS = 200 REPLY_SUCCESS_BYTE = b'\xc8' - # Indicates that the method completed successfully. This reply code is - # reserved for future use - the current protocol design does not use positive - # confirmation and reply codes are sent only in case of an error. +# Indicates that the method completed successfully. This reply code is +# reserved for future use - the current protocol design does not use positive +# confirmation and reply codes are sent only in case of an error. CONTENT_TOO_LARGE = 311 - # The client attempted to transfer content larger than the server could accept - # at the present time. The client may retry at a later time. +# The client attempted to transfer content larger than the server could accept +# at the present time. The client may retry at a later time. NO_CONSUMERS = 313 - # When the exchange cannot deliver to a consumer when the immediate flag is - # set. As a result of pending data on the queue or the absence of any - # consumers of the queue. +# When the exchange cannot deliver to a consumer when the immediate flag is +# set. As a result of pending data on the queue or the absence of any +# consumers of the queue. CONNECTION_FORCED = 320 - # An operator intervened to close the connection for some reason. The client - # may retry at some later date. +# An operator intervened to close the connection for some reason. The client +# may retry at some later date. INVALID_PATH = 402 - # The client tried to work with an unknown virtual host. +# The client tried to work with an unknown virtual host. ACCESS_REFUSED = 403 - # The client attempted to work with a server entity to which it has no - # access due to security settings. +# The client attempted to work with a server entity to which it has no +# access due to security settings. NOT_FOUND = 404 - # The client attempted to work with a server entity that does not exist. +# The client attempted to work with a server entity that does not exist. RESOURCE_LOCKED = 405 - # The client attempted to work with a server entity to which it has no - # access because another client is working with it. +# The client attempted to work with a server entity to which it has no +# access because another client is working with it. PRECONDITION_FAILED = 406 - # The client requested a method that was not allowed because some precondition - # failed. +# The client requested a method that was not allowed because some precondition +# failed. FRAME_ERROR = 501 - # The sender sent a malformed frame that the recipient could not decode. - # This strongly implies a programming error in the sending peer. +# The sender sent a malformed frame that the recipient could not decode. +# This strongly implies a programming error in the sending peer. SYNTAX_ERROR = 502 - # The sender sent a frame that contained illegal values for one or more - # fields. This strongly implies a programming error in the sending peer. +# The sender sent a frame that contained illegal values for one or more +# fields. This strongly implies a programming error in the sending peer. COMMAND_INVALID = 503 - # The client sent an invalid sequence of frames, attempting to perform an - # operation that was considered invalid by the server. This usually implies - # a programming error in the client. +# The client sent an invalid sequence of frames, attempting to perform an +# operation that was considered invalid by the server. This usually implies +# a programming error in the client. CHANNEL_ERROR = 504 - # The client attempted to work with a channel that had not been correctly - # opened. This most likely indicates a fault in the client layer. +# The client attempted to work with a channel that had not been correctly +# opened. This most likely indicates a fault in the client layer. UNEXPECTED_FRAME = 505 - # The peer sent a frame that was not expected, usually in the context of - # a content header and body. This strongly indicates a fault in the peer's - # content processing. +# The peer sent a frame that was not expected, usually in the context of +# a content header and body. This strongly indicates a fault in the peer's +# content processing. RESOURCE_ERROR = 506 - # The server could not complete the method because it lacked sufficient - # resources. This may be due to the client creating too many of some type - # of entity. +# The server could not complete the method because it lacked sufficient +# resources. This may be due to the client creating too many of some type +# of entity. NOT_ALLOWED = 530 - # The client tried to work with some entity in a manner that is prohibited - # by the server, due to security settings or by some other criteria. +# The client tried to work with some entity in a manner that is prohibited +# by the server, due to security settings or by some other criteria. NOT_IMPLEMENTED = 540 - # The client tried to use functionality that is not implemented in the - # server. +# The client tried to use functionality that is not implemented in the +# server. INTERNAL_ERROR = 541 - # The server could not complete the method because of an internal error. - # The server may require intervention by an operator in order to resume - # normal operations. - -HARD_ERRORS = [CONNECTION_FORCED, INVALID_PATH, FRAME_ERROR, SYNTAX_ERROR, COMMAND_INVALID, CHANNEL_ERROR, UNEXPECTED_FRAME, RESOURCE_ERROR, NOT_ALLOWED, NOT_IMPLEMENTED, INTERNAL_ERROR] -SOFT_ERRORS = [CONTENT_TOO_LARGE, NO_CONSUMERS, ACCESS_REFUSED, NOT_FOUND, RESOURCE_LOCKED, PRECONDITION_FAILED] +# The server could not complete the method because of an internal error. +# The server may require intervention by an operator in order to resume +# normal operations. +HARD_ERRORS = [CONNECTION_FORCED, INVALID_PATH, FRAME_ERROR, SYNTAX_ERROR, + COMMAND_INVALID, CHANNEL_ERROR, UNEXPECTED_FRAME, + RESOURCE_ERROR, NOT_ALLOWED, NOT_IMPLEMENTED, INTERNAL_ERROR] +SOFT_ERRORS = [CONTENT_TOO_LARGE, NO_CONSUMERS, ACCESS_REFUSED, NOT_FOUND, + RESOURCE_LOCKED, PRECONDITION_FAILED] DOMAIN_TO_BASIC_TYPE = { u'class-id': u'short', @@ -142,6 +149,7 @@ DOMAIN_TO_BASIC_TYPE = { u'table': None, } + class Connection(AMQPClass): """ The connection class provides methods for a client to establish a network connection to @@ -158,20 +166,20 @@ class ConnectionBlocked(AMQPMethodPayload): and does not accept new publishes. """ - __slots__ = (u'reason', ) + __slots__ = (u'reason',) NAME = u'connection.blocked' - INDEX = (10, 60) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x0A\x00\x3C' # CLASS ID + METHOD ID + INDEX = (10, 60) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x0A\x00\x3C' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, True - IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'reason', u'shortstr', u'shortstr', reserved=False), ] @@ -186,7 +194,7 @@ class ConnectionBlocked(AMQPMethodPayload): def write_arguments(self, buf): buf.write(struct.pack('!B', len(self.reason))) buf.write(self.reason) - + def get_size(self): return 1 + len(self.reason) @@ -195,7 +203,7 @@ class ConnectionBlocked(AMQPMethodPayload): offset = start_offset s_len, = struct.unpack_from('!B', buf, offset) offset += 1 - reason = buf[offset:offset+s_len] + reason = buf[offset:offset + s_len] offset += s_len return ConnectionBlocked(reason) @@ -209,20 +217,20 @@ class ConnectionClose(AMQPMethodPayload): a specific method, i.e. an exception. When a close is due to an exception, the sender provides the class and method id of the method which caused the exception. """ - __slots__ = (u'reply_code', u'reply_text', u'class_id', u'method_id', ) + __slots__ = (u'reply_code', u'reply_text', u'class_id', u'method_id',) NAME = u'connection.close' - INDEX = (10, 50) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x0A\x00\x32' # CLASS ID + METHOD ID + INDEX = (10, 50) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x0A\x00\x32' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, True - IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'reply-code', u'reply-code', u'short', reserved=False), Field(u'reply-text', u'reply-text', u'shortstr', reserved=False), Field(u'class-id', u'class-id', u'short', reserved=False), @@ -252,7 +260,7 @@ class ConnectionClose(AMQPMethodPayload): buf.write(struct.pack('!HB', self.reply_code, len(self.reply_text))) buf.write(self.reply_text) buf.write(struct.pack('!HH', self.class_id, self.method_id)) - + def get_size(self): return 7 + len(self.reply_text) @@ -261,7 +269,7 @@ class ConnectionClose(AMQPMethodPayload): offset = start_offset reply_code, s_len, = struct.unpack_from('!HB', buf, offset) offset += 3 - reply_text = buf[offset:offset+s_len] + reply_text = buf[offset:offset + s_len] offset += s_len class_id, method_id, = struct.unpack_from('!HH', buf, offset) offset += 4 @@ -279,12 +287,12 @@ class ConnectionCloseOk(AMQPMethodPayload): NAME = u'connection.close-ok' - INDEX = (10, 51) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x0A\x00\x33' # CLASS ID + METHOD ID + INDEX = (10, 51) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x0A\x00\x33' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, True - IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x0A\x00\x33\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END @@ -293,7 +301,6 @@ class ConnectionCloseOk(AMQPMethodPayload): Create frame connection.close-ok """ - @staticmethod def from_buffer(buf, start_offset): offset = start_offset @@ -309,20 +316,20 @@ class ConnectionOpen(AMQPMethodPayload): The server may apply arbitrary limits per virtual host, such as the number of each type of entity that may be used, per connection and/or in total. """ - __slots__ = (u'virtual_host', ) + __slots__ = (u'virtual_host',) NAME = u'connection.open' - INDEX = (10, 40) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x0A\x00\x28' # CLASS ID + METHOD ID + INDEX = (10, 40) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x0A\x00\x28' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, False - IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'virtual-host', u'path', u'shortstr', reserved=False), Field(u'reserved-1', u'shortstr', u'shortstr', reserved=True), Field(u'reserved-2', u'bit', u'bit', reserved=True), @@ -343,7 +350,7 @@ class ConnectionOpen(AMQPMethodPayload): buf.write(self.virtual_host) buf.write(b'\x00') buf.write(struct.pack('!B', 0)) - + def get_size(self): return 3 + len(self.virtual_host) @@ -352,11 +359,11 @@ class ConnectionOpen(AMQPMethodPayload): offset = start_offset s_len, = struct.unpack_from('!B', buf, offset) offset += 1 - virtual_host = buf[offset:offset+s_len] + virtual_host = buf[offset:offset + s_len] offset += s_len s_len, = struct.unpack_from('!B', buf, offset) offset += 1 - offset += s_len # reserved field! + offset += s_len # reserved field! offset += 1 return ConnectionOpen(virtual_host) @@ -371,17 +378,17 @@ class ConnectionOpenOk(AMQPMethodPayload): NAME = u'connection.open-ok' - INDEX = (10, 41) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x0A\x00\x29' # CLASS ID + METHOD ID + INDEX = (10, 41) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x0A\x00\x29' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = False, True - IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x0A\x00\x29\x00\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'reserved-1', u'shortstr', u'shortstr', reserved=True), ] @@ -390,13 +397,12 @@ class ConnectionOpenOk(AMQPMethodPayload): Create frame connection.open-ok """ - @staticmethod def from_buffer(buf, start_offset): offset = start_offset s_len, = struct.unpack_from('!B', buf, offset) offset += 1 - offset += s_len # reserved field! + offset += s_len # reserved field! return ConnectionOpenOk() @@ -408,28 +414,32 @@ class ConnectionStart(AMQPMethodPayload): protocol version that the server proposes, along with a list of security mechanisms which the client can use for authentication. """ - __slots__ = (u'version_major', u'version_minor', u'server_properties', u'mechanisms', u'locales', ) + __slots__ = ( + u'version_major', u'version_minor', u'server_properties', u'mechanisms', + u'locales',) NAME = u'connection.start' - INDEX = (10, 10) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x0A\x00\x0A' # CLASS ID + METHOD ID + INDEX = (10, 10) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x0A\x00\x0A' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = False, True - IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'version-major', u'octet', u'octet', reserved=False), Field(u'version-minor', u'octet', u'octet', reserved=False), - Field(u'server-properties', u'peer-properties', u'table', reserved=False), + Field(u'server-properties', u'peer-properties', u'table', + reserved=False), Field(u'mechanisms', u'longstr', u'longstr', reserved=False), Field(u'locales', u'longstr', u'longstr', reserved=False), ] - def __init__(self, version_major, version_minor, server_properties, mechanisms, locales): + def __init__(self, version_major, version_minor, server_properties, + mechanisms, locales): """ Create frame connection.start @@ -469,9 +479,10 @@ class ConnectionStart(AMQPMethodPayload): buf.write(self.mechanisms) buf.write(struct.pack('!I', len(self.locales))) buf.write(self.locales) - + def get_size(self): - return 10 + frame_table_size(self.server_properties) + len(self.mechanisms) + len(self.locales) + return 10 + frame_table_size(self.server_properties) + len( + self.mechanisms) + len(self.locales) @staticmethod def from_buffer(buf, start_offset): @@ -482,13 +493,14 @@ class ConnectionStart(AMQPMethodPayload): offset += delta s_len, = struct.unpack_from('!L', buf, offset) offset += 4 - mechanisms = buf[offset:offset+s_len] + mechanisms = buf[offset:offset + s_len] offset += s_len s_len, = struct.unpack_from('!L', buf, offset) offset += 4 - locales = buf[offset:offset+s_len] + locales = buf[offset:offset + s_len] offset += s_len - return ConnectionStart(version_major, version_minor, server_properties, mechanisms, locales) + return ConnectionStart(version_major, version_minor, server_properties, + mechanisms, locales) class ConnectionSecure(AMQPMethodPayload): @@ -499,20 +511,20 @@ class ConnectionSecure(AMQPMethodPayload): received sufficient information to authenticate each other. This method challenges the client to provide more information. """ - __slots__ = (u'challenge', ) + __slots__ = (u'challenge',) NAME = u'connection.secure' - INDEX = (10, 20) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x0A\x00\x14' # CLASS ID + METHOD ID + INDEX = (10, 20) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x0A\x00\x14' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = False, True - IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'challenge', u'longstr', u'longstr', reserved=False), ] @@ -530,7 +542,7 @@ class ConnectionSecure(AMQPMethodPayload): def write_arguments(self, buf): buf.write(struct.pack('!I', len(self.challenge))) buf.write(self.challenge) - + def get_size(self): return 4 + len(self.challenge) @@ -539,7 +551,7 @@ class ConnectionSecure(AMQPMethodPayload): offset = start_offset s_len, = struct.unpack_from('!L', buf, offset) offset += 4 - challenge = buf[offset:offset+s_len] + challenge = buf[offset:offset + s_len] offset += s_len return ConnectionSecure(challenge) @@ -550,21 +562,22 @@ class ConnectionStartOk(AMQPMethodPayload): This method selects a SASL security mechanism. """ - __slots__ = (u'client_properties', u'mechanism', u'response', u'locale', ) + __slots__ = (u'client_properties', u'mechanism', u'response', u'locale',) NAME = u'connection.start-ok' - INDEX = (10, 11) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x0A\x00\x0B' # CLASS ID + METHOD ID + INDEX = (10, 11) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x0A\x00\x0B' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, False - IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ - Field(u'client-properties', u'peer-properties', u'table', reserved=False), + FIELDS = [ + Field(u'client-properties', u'peer-properties', u'table', + reserved=False), Field(u'mechanism', u'shortstr', u'shortstr', reserved=False), Field(u'response', u'longstr', u'longstr', reserved=False), Field(u'locale', u'shortstr', u'shortstr', reserved=False), @@ -606,9 +619,10 @@ class ConnectionStartOk(AMQPMethodPayload): buf.write(self.response) buf.write(struct.pack('!B', len(self.locale))) buf.write(self.locale) - + def get_size(self): - return 6 + frame_table_size(self.client_properties) + len(self.mechanism) + len(self.response) + len(self.locale) + return 6 + frame_table_size(self.client_properties) + len( + self.mechanism) + len(self.response) + len(self.locale) @staticmethod def from_buffer(buf, start_offset): @@ -617,17 +631,18 @@ class ConnectionStartOk(AMQPMethodPayload): offset += delta s_len, = struct.unpack_from('!B', buf, offset) offset += 1 - mechanism = buf[offset:offset+s_len] + mechanism = buf[offset:offset + s_len] offset += s_len s_len, = struct.unpack_from('!L', buf, offset) offset += 4 - response = buf[offset:offset+s_len] + response = buf[offset:offset + s_len] offset += s_len s_len, = struct.unpack_from('!B', buf, offset) offset += 1 - locale = buf[offset:offset+s_len] + locale = buf[offset:offset + s_len] offset += s_len - return ConnectionStartOk(client_properties, mechanism, response, locale) + return ConnectionStartOk(client_properties, mechanism, response, + locale) class ConnectionSecureOk(AMQPMethodPayload): @@ -637,20 +652,20 @@ class ConnectionSecureOk(AMQPMethodPayload): This method attempts to authenticate, passing a block of SASL data for the security mechanism at the server side. """ - __slots__ = (u'response', ) + __slots__ = (u'response',) NAME = u'connection.secure-ok' - INDEX = (10, 21) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x0A\x00\x15' # CLASS ID + METHOD ID + INDEX = (10, 21) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x0A\x00\x15' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, False - IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'response', u'longstr', u'longstr', reserved=False), ] @@ -668,7 +683,7 @@ class ConnectionSecureOk(AMQPMethodPayload): def write_arguments(self, buf): buf.write(struct.pack('!I', len(self.response))) buf.write(self.response) - + def get_size(self): return 4 + len(self.response) @@ -677,7 +692,7 @@ class ConnectionSecureOk(AMQPMethodPayload): offset = start_offset s_len, = struct.unpack_from('!L', buf, offset) offset += 4 - response = buf[offset:offset+s_len] + response = buf[offset:offset + s_len] offset += s_len return ConnectionSecureOk(response) @@ -689,20 +704,20 @@ class ConnectionTune(AMQPMethodPayload): This method proposes a set of connection configuration values to the client. The client can accept and/or adjust these. """ - __slots__ = (u'channel_max', u'frame_max', u'heartbeat', ) + __slots__ = (u'channel_max', u'frame_max', u'heartbeat',) NAME = u'connection.tune' - INDEX = (10, 30) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x0A\x00\x1E' # CLASS ID + METHOD ID + INDEX = (10, 30) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x0A\x00\x1E' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = False, True - IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'channel-max', u'short', u'short', reserved=False), Field(u'frame-max', u'long', u'long', reserved=False), Field(u'heartbeat', u'short', u'short', reserved=False), @@ -732,15 +747,17 @@ class ConnectionTune(AMQPMethodPayload): self.heartbeat = heartbeat def write_arguments(self, buf): - buf.write(struct.pack('!HIH', self.channel_max, self.frame_max, self.heartbeat)) - + buf.write(struct.pack('!HIH', self.channel_max, self.frame_max, + self.heartbeat)) + def get_size(self): return 8 @staticmethod def from_buffer(buf, start_offset): offset = start_offset - channel_max, frame_max, heartbeat, = struct.unpack_from('!HIH', buf, offset) + channel_max, frame_max, heartbeat, = struct.unpack_from('!HIH', buf, + offset) offset += 8 return ConnectionTune(channel_max, frame_max, heartbeat) @@ -752,20 +769,20 @@ class ConnectionTuneOk(AMQPMethodPayload): This method sends the client's connection tuning parameters to the server. Certain fields are negotiated, others provide capability information. """ - __slots__ = (u'channel_max', u'frame_max', u'heartbeat', ) + __slots__ = (u'channel_max', u'frame_max', u'heartbeat',) NAME = u'connection.tune-ok' - INDEX = (10, 31) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x0A\x00\x1F' # CLASS ID + METHOD ID + INDEX = (10, 31) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x0A\x00\x1F' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, False - IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'channel-max', u'short', u'short', reserved=False), Field(u'frame-max', u'long', u'long', reserved=False), Field(u'heartbeat', u'short', u'short', reserved=False), @@ -795,15 +812,17 @@ class ConnectionTuneOk(AMQPMethodPayload): self.heartbeat = heartbeat def write_arguments(self, buf): - buf.write(struct.pack('!HIH', self.channel_max, self.frame_max, self.heartbeat)) - + buf.write(struct.pack('!HIH', self.channel_max, self.frame_max, + self.heartbeat)) + def get_size(self): return 8 @staticmethod def from_buffer(buf, start_offset): offset = start_offset - channel_max, frame_max, heartbeat, = struct.unpack_from('!HIH', buf, offset) + channel_max, frame_max, heartbeat, = struct.unpack_from('!HIH', buf, + offset) offset += 8 return ConnectionTuneOk(channel_max, frame_max, heartbeat) @@ -818,12 +837,12 @@ class ConnectionUnblocked(AMQPMethodPayload): NAME = u'connection.unblocked' - INDEX = (10, 61) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x0A\x00\x3D' # CLASS ID + METHOD ID + INDEX = (10, 61) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x0A\x00\x3D' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, True - IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x0A\x00\x3D\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END @@ -832,7 +851,6 @@ class ConnectionUnblocked(AMQPMethodPayload): Create frame connection.unblocked """ - @staticmethod def from_buffer(buf, start_offset): offset = start_offset @@ -858,20 +876,20 @@ class ChannelClose(AMQPMethodPayload): method, i.e. an exception. When a close is due to an exception, the sender provides the class and method id of the method which caused the exception. """ - __slots__ = (u'reply_code', u'reply_text', u'class_id', u'method_id', ) + __slots__ = (u'reply_code', u'reply_text', u'class_id', u'method_id',) NAME = u'channel.close' - INDEX = (20, 40) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x14\x00\x28' # CLASS ID + METHOD ID + INDEX = (20, 40) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x14\x00\x28' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, True - IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'reply-code', u'reply-code', u'short', reserved=False), Field(u'reply-text', u'reply-text', u'shortstr', reserved=False), Field(u'class-id', u'class-id', u'short', reserved=False), @@ -901,7 +919,7 @@ class ChannelClose(AMQPMethodPayload): buf.write(struct.pack('!HB', self.reply_code, len(self.reply_text))) buf.write(self.reply_text) buf.write(struct.pack('!HH', self.class_id, self.method_id)) - + def get_size(self): return 7 + len(self.reply_text) @@ -910,7 +928,7 @@ class ChannelClose(AMQPMethodPayload): offset = start_offset reply_code, s_len, = struct.unpack_from('!HB', buf, offset) offset += 3 - reply_text = buf[offset:offset+s_len] + reply_text = buf[offset:offset + s_len] offset += s_len class_id, method_id, = struct.unpack_from('!HH', buf, offset) offset += 4 @@ -928,12 +946,12 @@ class ChannelCloseOk(AMQPMethodPayload): NAME = u'channel.close-ok' - INDEX = (20, 41) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x14\x00\x29' # CLASS ID + METHOD ID + INDEX = (20, 41) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x14\x00\x29' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, True - IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x14\x00\x29\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END @@ -942,7 +960,6 @@ class ChannelCloseOk(AMQPMethodPayload): Create frame channel.close-ok """ - @staticmethod def from_buffer(buf, start_offset): offset = start_offset @@ -959,20 +976,20 @@ class ChannelFlow(AMQPMethodPayload): it can process. Note that this method is not intended for window control. It does not affect contents returned by Basic.Get-Ok methods. """ - __slots__ = (u'active', ) + __slots__ = (u'active',) NAME = u'channel.flow' - INDEX = (20, 20) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x14\x00\x14' # CLASS ID + METHOD ID + INDEX = (20, 20) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x14\x00\x14' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, True - IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'active', u'bit', u'bit', reserved=False), ] @@ -989,7 +1006,7 @@ class ChannelFlow(AMQPMethodPayload): def write_arguments(self, buf): buf.write(struct.pack('!B', (self.active << 0))) - + def get_size(self): return 1 @@ -1009,20 +1026,20 @@ class ChannelFlowOk(AMQPMethodPayload): Confirms to the peer that a flow command was received and processed. """ - __slots__ = (u'active', ) + __slots__ = (u'active',) NAME = u'channel.flow-ok' - INDEX = (20, 21) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x14\x00\x15' # CLASS ID + METHOD ID + INDEX = (20, 21) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x14\x00\x15' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, True - IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'active', u'bit', u'bit', reserved=False), ] @@ -1039,7 +1056,7 @@ class ChannelFlowOk(AMQPMethodPayload): def write_arguments(self, buf): buf.write(struct.pack('!B', (self.active << 0))) - + def get_size(self): return 1 @@ -1063,17 +1080,17 @@ class ChannelOpen(AMQPMethodPayload): NAME = u'channel.open' - INDEX = (20, 10) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x14\x00\x0A' # CLASS ID + METHOD ID + INDEX = (20, 10) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x14\x00\x0A' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, False - IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content STATIC_CONTENT = b'\x00\x00\x00\x05\x00\x14\x00\x0A\x00\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'reserved-1', u'shortstr', u'shortstr', reserved=True), ] @@ -1082,13 +1099,12 @@ class ChannelOpen(AMQPMethodPayload): Create frame channel.open """ - @staticmethod def from_buffer(buf, start_offset): offset = start_offset s_len, = struct.unpack_from('!B', buf, offset) offset += 1 - offset += s_len # reserved field! + offset += s_len # reserved field! return ChannelOpen() @@ -1102,17 +1118,17 @@ class ChannelOpenOk(AMQPMethodPayload): NAME = u'channel.open-ok' - INDEX = (20, 11) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x14\x00\x0B' # CLASS ID + METHOD ID + INDEX = (20, 11) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x14\x00\x0B' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = False, True - IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content STATIC_CONTENT = b'\x00\x00\x00\x05\x00\x14\x00\x0B\x00\x00\x00\x00\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'reserved-1', u'longstr', u'longstr', reserved=True), ] @@ -1121,13 +1137,12 @@ class ChannelOpenOk(AMQPMethodPayload): Create frame channel.open-ok """ - @staticmethod def from_buffer(buf, start_offset): offset = start_offset s_len, = struct.unpack_from('!L', buf, offset) offset += 4 - offset += s_len # reserved field! + offset += s_len # reserved field! return ChannelOpenOk() @@ -1147,20 +1162,21 @@ class ExchangeBind(AMQPMethodPayload): This method binds an exchange to an exchange. """ - __slots__ = (u'destination', u'source', u'routing_key', u'no_wait', u'arguments', ) + __slots__ = ( + u'destination', u'source', u'routing_key', u'no_wait', u'arguments',) NAME = u'exchange.bind' - INDEX = (40, 30) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x28\x00\x1E' # CLASS ID + METHOD ID + INDEX = (40, 30) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x28\x00\x1E' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, False - IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'reserved-1', u'short', u'short', reserved=True), Field(u'destination', u'exchange-name', u'shortstr', reserved=False), Field(u'source', u'exchange-name', u'shortstr', reserved=False), @@ -1207,24 +1223,25 @@ class ExchangeBind(AMQPMethodPayload): buf.write(self.routing_key) buf.write(struct.pack('!B', (self.no_wait << 0))) enframe_table(buf, self.arguments) - + def get_size(self): - return 6 + len(self.destination) + len(self.source) + len(self.routing_key) + frame_table_size(self.arguments) + return 6 + len(self.destination) + len(self.source) + len( + self.routing_key) + frame_table_size(self.arguments) @staticmethod def from_buffer(buf, start_offset): offset = start_offset s_len, = struct.unpack_from('!2xB', buf, offset) offset += 3 - destination = buf[offset:offset+s_len] + destination = buf[offset:offset + s_len] offset += s_len s_len, = struct.unpack_from('!B', buf, offset) offset += 1 - source = buf[offset:offset+s_len] + source = buf[offset:offset + s_len] offset += s_len s_len, = struct.unpack_from('!B', buf, offset) offset += 1 - routing_key = buf[offset:offset+s_len] + routing_key = buf[offset:offset + s_len] offset += s_len _bit, = struct.unpack_from('!B', buf, offset) offset += 0 @@ -1232,7 +1249,8 @@ class ExchangeBind(AMQPMethodPayload): offset += 1 arguments, delta = deframe_table(buf, offset) offset += delta - return ExchangeBind(destination, source, routing_key, no_wait, arguments) + return ExchangeBind(destination, source, routing_key, no_wait, + arguments) class ExchangeBindOk(AMQPMethodPayload): @@ -1245,12 +1263,12 @@ class ExchangeBindOk(AMQPMethodPayload): NAME = u'exchange.bind-ok' - INDEX = (40, 31) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x28\x00\x1F' # CLASS ID + METHOD ID + INDEX = (40, 31) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x28\x00\x1F' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = False, True - IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x28\x00\x1F\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END @@ -1259,7 +1277,6 @@ class ExchangeBindOk(AMQPMethodPayload): Create frame exchange.bind-ok """ - @staticmethod def from_buffer(buf, start_offset): offset = start_offset @@ -1273,20 +1290,22 @@ class ExchangeDeclare(AMQPMethodPayload): This method creates an exchange if it does not already exist, and if the exchange exists, verifies that it is of the correct and expected class. """ - __slots__ = (u'exchange', u'type_', u'passive', u'durable', u'auto_delete', u'internal', u'no_wait', u'arguments', ) + __slots__ = ( + u'exchange', u'type_', u'passive', u'durable', u'auto_delete', u'internal', + u'no_wait', u'arguments',) NAME = u'exchange.declare' - INDEX = (40, 10) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x28\x00\x0A' # CLASS ID + METHOD ID + INDEX = (40, 10) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x28\x00\x0A' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, False - IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'reserved-1', u'short', u'short', reserved=True), Field(u'exchange', u'exchange-name', u'shortstr', reserved=False), Field(u'type', u'shortstr', u'shortstr', reserved=False), @@ -1298,7 +1317,8 @@ class ExchangeDeclare(AMQPMethodPayload): Field(u'arguments', u'table', u'table', reserved=False), ] - def __init__(self, exchange, type_, passive, durable, auto_delete, internal, no_wait, arguments): + def __init__(self, exchange, type_, passive, durable, auto_delete, + internal, no_wait, arguments): """ Create frame exchange.declare @@ -1356,22 +1376,26 @@ class ExchangeDeclare(AMQPMethodPayload): buf.write(self.exchange) buf.write(struct.pack('!B', len(self.type_))) buf.write(self.type_) - buf.write(struct.pack('!B', (self.passive << 0) | (self.durable << 1) | (self.auto_delete << 2) | (self.internal << 3) | (self.no_wait << 4))) + buf.write(struct.pack('!B', + (self.passive << 0) | (self.durable << 1) | ( + self.auto_delete << 2) | (self.internal << 3) | ( + self.no_wait << 4))) enframe_table(buf, self.arguments) - + def get_size(self): - return 5 + len(self.exchange) + len(self.type_) + frame_table_size(self.arguments) + return 5 + len(self.exchange) + len(self.type_) + frame_table_size( + self.arguments) @staticmethod def from_buffer(buf, start_offset): offset = start_offset s_len, = struct.unpack_from('!2xB', buf, offset) offset += 3 - exchange = buf[offset:offset+s_len] + exchange = buf[offset:offset + s_len] offset += s_len s_len, = struct.unpack_from('!B', buf, offset) offset += 1 - type_ = buf[offset:offset+s_len] + type_ = buf[offset:offset + s_len] offset += s_len _bit, = struct.unpack_from('!B', buf, offset) offset += 0 @@ -1383,7 +1407,8 @@ class ExchangeDeclare(AMQPMethodPayload): offset += 1 arguments, delta = deframe_table(buf, offset) offset += delta - return ExchangeDeclare(exchange, type_, passive, durable, auto_delete, internal, no_wait, arguments) + return ExchangeDeclare(exchange, type_, passive, durable, auto_delete, + internal, no_wait, arguments) class ExchangeDelete(AMQPMethodPayload): @@ -1393,20 +1418,20 @@ class ExchangeDelete(AMQPMethodPayload): This method deletes an exchange. When an exchange is deleted all queue bindings on the exchange are cancelled. """ - __slots__ = (u'exchange', u'if_unused', u'no_wait', ) + __slots__ = (u'exchange', u'if_unused', u'no_wait',) NAME = u'exchange.delete' - INDEX = (40, 20) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x28\x00\x14' # CLASS ID + METHOD ID + INDEX = (40, 20) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x28\x00\x14' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, False - IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'reserved-1', u'short', u'short', reserved=True), Field(u'exchange', u'exchange-name', u'shortstr', reserved=False), Field(u'if-unused', u'bit', u'bit', reserved=False), @@ -1434,8 +1459,9 @@ class ExchangeDelete(AMQPMethodPayload): buf.write(b'\x00\x00') buf.write(struct.pack('!B', len(self.exchange))) buf.write(self.exchange) - buf.write(struct.pack('!B', (self.if_unused << 0) | (self.no_wait << 1))) - + buf.write( + struct.pack('!B', (self.if_unused << 0) | (self.no_wait << 1))) + def get_size(self): return 4 + len(self.exchange) @@ -1444,7 +1470,7 @@ class ExchangeDelete(AMQPMethodPayload): offset = start_offset s_len, = struct.unpack_from('!2xB', buf, offset) offset += 3 - exchange = buf[offset:offset+s_len] + exchange = buf[offset:offset + s_len] offset += s_len _bit, = struct.unpack_from('!B', buf, offset) offset += 0 @@ -1465,12 +1491,12 @@ class ExchangeDeclareOk(AMQPMethodPayload): NAME = u'exchange.declare-ok' - INDEX = (40, 11) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x28\x00\x0B' # CLASS ID + METHOD ID + INDEX = (40, 11) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x28\x00\x0B' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = False, True - IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x28\x00\x0B\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END @@ -1479,7 +1505,6 @@ class ExchangeDeclareOk(AMQPMethodPayload): Create frame exchange.declare-ok """ - @staticmethod def from_buffer(buf, start_offset): offset = start_offset @@ -1496,12 +1521,12 @@ class ExchangeDeleteOk(AMQPMethodPayload): NAME = u'exchange.delete-ok' - INDEX = (40, 21) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x28\x00\x15' # CLASS ID + METHOD ID + INDEX = (40, 21) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x28\x00\x15' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = False, True - IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x28\x00\x15\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END @@ -1510,7 +1535,6 @@ class ExchangeDeleteOk(AMQPMethodPayload): Create frame exchange.delete-ok """ - @staticmethod def from_buffer(buf, start_offset): offset = start_offset @@ -1523,20 +1547,21 @@ class ExchangeUnbind(AMQPMethodPayload): This method unbinds an exchange from an exchange. """ - __slots__ = (u'destination', u'source', u'routing_key', u'no_wait', u'arguments', ) + __slots__ = ( + u'destination', u'source', u'routing_key', u'no_wait', u'arguments',) NAME = u'exchange.unbind' - INDEX = (40, 40) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x28\x00\x28' # CLASS ID + METHOD ID + INDEX = (40, 40) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x28\x00\x28' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, False - IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'reserved-1', u'short', u'short', reserved=True), Field(u'destination', u'exchange-name', u'shortstr', reserved=False), Field(u'source', u'exchange-name', u'shortstr', reserved=False), @@ -1577,24 +1602,25 @@ class ExchangeUnbind(AMQPMethodPayload): buf.write(self.routing_key) buf.write(struct.pack('!B', (self.no_wait << 0))) enframe_table(buf, self.arguments) - + def get_size(self): - return 6 + len(self.destination) + len(self.source) + len(self.routing_key) + frame_table_size(self.arguments) + return 6 + len(self.destination) + len(self.source) + len( + self.routing_key) + frame_table_size(self.arguments) @staticmethod def from_buffer(buf, start_offset): offset = start_offset s_len, = struct.unpack_from('!2xB', buf, offset) offset += 3 - destination = buf[offset:offset+s_len] + destination = buf[offset:offset + s_len] offset += s_len s_len, = struct.unpack_from('!B', buf, offset) offset += 1 - source = buf[offset:offset+s_len] + source = buf[offset:offset + s_len] offset += s_len s_len, = struct.unpack_from('!B', buf, offset) offset += 1 - routing_key = buf[offset:offset+s_len] + routing_key = buf[offset:offset + s_len] offset += s_len _bit, = struct.unpack_from('!B', buf, offset) offset += 0 @@ -1602,7 +1628,8 @@ class ExchangeUnbind(AMQPMethodPayload): offset += 1 arguments, delta = deframe_table(buf, offset) offset += delta - return ExchangeUnbind(destination, source, routing_key, no_wait, arguments) + return ExchangeUnbind(destination, source, routing_key, no_wait, + arguments) class ExchangeUnbindOk(AMQPMethodPayload): @@ -1615,12 +1642,12 @@ class ExchangeUnbindOk(AMQPMethodPayload): NAME = u'exchange.unbind-ok' - INDEX = (40, 51) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x28\x00\x33' # CLASS ID + METHOD ID + INDEX = (40, 51) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x28\x00\x33' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = False, True - IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x28\x00\x33\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END @@ -1629,7 +1656,6 @@ class ExchangeUnbindOk(AMQPMethodPayload): Create frame exchange.unbind-ok """ - @staticmethod def from_buffer(buf, start_offset): offset = start_offset @@ -1656,20 +1682,21 @@ class QueueBind(AMQPMethodPayload): are bound to a direct exchange and subscription queues are bound to a topic exchange. """ - __slots__ = (u'queue', u'exchange', u'routing_key', u'no_wait', u'arguments', ) + __slots__ = ( + u'queue', u'exchange', u'routing_key', u'no_wait', u'arguments',) NAME = u'queue.bind' - INDEX = (50, 20) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x32\x00\x14' # CLASS ID + METHOD ID + INDEX = (50, 20) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x32\x00\x14' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, False - IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'reserved-1', u'short', u'short', reserved=True), Field(u'queue', u'queue-name', u'shortstr', reserved=False), Field(u'exchange', u'exchange-name', u'shortstr', reserved=False), @@ -1719,24 +1746,25 @@ class QueueBind(AMQPMethodPayload): buf.write(self.routing_key) buf.write(struct.pack('!B', (self.no_wait << 0))) enframe_table(buf, self.arguments) - + def get_size(self): - return 6 + len(self.queue) + len(self.exchange) + len(self.routing_key) + frame_table_size(self.arguments) + return 6 + len(self.queue) + len(self.exchange) + len( + self.routing_key) + frame_table_size(self.arguments) @staticmethod def from_buffer(buf, start_offset): offset = start_offset s_len, = struct.unpack_from('!2xB', buf, offset) offset += 3 - queue = buf[offset:offset+s_len] + queue = buf[offset:offset + s_len] offset += s_len s_len, = struct.unpack_from('!B', buf, offset) offset += 1 - exchange = buf[offset:offset+s_len] + exchange = buf[offset:offset + s_len] offset += s_len s_len, = struct.unpack_from('!B', buf, offset) offset += 1 - routing_key = buf[offset:offset+s_len] + routing_key = buf[offset:offset + s_len] offset += s_len _bit, = struct.unpack_from('!B', buf, offset) offset += 0 @@ -1757,12 +1785,12 @@ class QueueBindOk(AMQPMethodPayload): NAME = u'queue.bind-ok' - INDEX = (50, 21) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x32\x00\x15' # CLASS ID + METHOD ID + INDEX = (50, 21) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x32\x00\x15' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = False, True - IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x32\x00\x15\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END @@ -1771,7 +1799,6 @@ class QueueBindOk(AMQPMethodPayload): Create frame queue.bind-ok """ - @staticmethod def from_buffer(buf, start_offset): offset = start_offset @@ -1786,20 +1813,22 @@ class QueueDeclare(AMQPMethodPayload): specify various properties that control the durability of the queue and its contents, and the level of sharing for the queue. """ - __slots__ = (u'queue', u'passive', u'durable', u'exclusive', u'auto_delete', u'no_wait', u'arguments', ) + __slots__ = ( + u'queue', u'passive', u'durable', u'exclusive', u'auto_delete', u'no_wait', + u'arguments',) NAME = u'queue.declare' - INDEX = (50, 10) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x32\x00\x0A' # CLASS ID + METHOD ID + INDEX = (50, 10) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x32\x00\x0A' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, False - IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'reserved-1', u'short', u'short', reserved=True), Field(u'queue', u'queue-name', u'shortstr', reserved=False), Field(u'passive', u'bit', u'bit', reserved=False), @@ -1810,7 +1839,8 @@ class QueueDeclare(AMQPMethodPayload): Field(u'arguments', u'table', u'table', reserved=False), ] - def __init__(self, queue, passive, durable, exclusive, auto_delete, no_wait, arguments): + def __init__(self, queue, passive, durable, exclusive, auto_delete, + no_wait, arguments): """ Create frame queue.declare @@ -1862,9 +1892,12 @@ class QueueDeclare(AMQPMethodPayload): buf.write(b'\x00\x00') buf.write(struct.pack('!B', len(self.queue))) buf.write(self.queue) - buf.write(struct.pack('!B', (self.passive << 0) | (self.durable << 1) | (self.exclusive << 2) | (self.auto_delete << 3) | (self.no_wait << 4))) + buf.write(struct.pack('!B', + (self.passive << 0) | (self.durable << 1) | ( + self.exclusive << 2) | ( + self.auto_delete << 3) | (self.no_wait << 4))) enframe_table(buf, self.arguments) - + def get_size(self): return 4 + len(self.queue) + frame_table_size(self.arguments) @@ -1873,7 +1906,7 @@ class QueueDeclare(AMQPMethodPayload): offset = start_offset s_len, = struct.unpack_from('!2xB', buf, offset) offset += 3 - queue = buf[offset:offset+s_len] + queue = buf[offset:offset + s_len] offset += s_len _bit, = struct.unpack_from('!B', buf, offset) offset += 0 @@ -1885,7 +1918,8 @@ class QueueDeclare(AMQPMethodPayload): offset += 1 arguments, delta = deframe_table(buf, offset) offset += delta - return QueueDeclare(queue, passive, durable, exclusive, auto_delete, no_wait, arguments) + return QueueDeclare(queue, passive, durable, exclusive, auto_delete, + no_wait, arguments) class QueueDelete(AMQPMethodPayload): @@ -1896,20 +1930,20 @@ class QueueDelete(AMQPMethodPayload): to a dead-letter queue if this is defined in the server configuration, and all consumers on the queue are cancelled. """ - __slots__ = (u'queue', u'if_unused', u'if_empty', u'no_wait', ) + __slots__ = (u'queue', u'if_unused', u'if_empty', u'no_wait',) NAME = u'queue.delete' - INDEX = (50, 40) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x32\x00\x28' # CLASS ID + METHOD ID + INDEX = (50, 40) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x32\x00\x28' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, False - IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'reserved-1', u'short', u'short', reserved=True), Field(u'queue', u'queue-name', u'shortstr', reserved=False), Field(u'if-unused', u'bit', u'bit', reserved=False), @@ -1942,8 +1976,10 @@ class QueueDelete(AMQPMethodPayload): buf.write(b'\x00\x00') buf.write(struct.pack('!B', len(self.queue))) buf.write(self.queue) - buf.write(struct.pack('!B', (self.if_unused << 0) | (self.if_empty << 1) | (self.no_wait << 2))) - + buf.write(struct.pack('!B', + (self.if_unused << 0) | (self.if_empty << 1) | ( + self.no_wait << 2))) + def get_size(self): return 4 + len(self.queue) @@ -1952,7 +1988,7 @@ class QueueDelete(AMQPMethodPayload): offset = start_offset s_len, = struct.unpack_from('!2xB', buf, offset) offset += 3 - queue = buf[offset:offset+s_len] + queue = buf[offset:offset + s_len] offset += s_len _bit, = struct.unpack_from('!B', buf, offset) offset += 0 @@ -1970,20 +2006,20 @@ class QueueDeclareOk(AMQPMethodPayload): This method confirms a Declare method and confirms the name of the queue, essential for automatically-named queues. """ - __slots__ = (u'queue', u'message_count', u'consumer_count', ) + __slots__ = (u'queue', u'message_count', u'consumer_count',) NAME = u'queue.declare-ok' - INDEX = (50, 11) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x32\x00\x0B' # CLASS ID + METHOD ID + INDEX = (50, 11) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x32\x00\x0B' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = False, True - IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'queue', u'queue-name', u'shortstr', reserved=False), Field(u'message-count', u'message-count', u'long', reserved=False), Field(u'consumer-count', u'long', u'long', reserved=False), @@ -2010,7 +2046,7 @@ class QueueDeclareOk(AMQPMethodPayload): buf.write(struct.pack('!B', len(self.queue))) buf.write(self.queue) buf.write(struct.pack('!II', self.message_count, self.consumer_count)) - + def get_size(self): return 9 + len(self.queue) @@ -2019,7 +2055,7 @@ class QueueDeclareOk(AMQPMethodPayload): offset = start_offset s_len, = struct.unpack_from('!B', buf, offset) offset += 1 - queue = buf[offset:offset+s_len] + queue = buf[offset:offset + s_len] offset += s_len message_count, consumer_count, = struct.unpack_from('!II', buf, offset) offset += 8 @@ -2032,20 +2068,20 @@ class QueueDeleteOk(AMQPMethodPayload): This method confirms the deletion of a queue. """ - __slots__ = (u'message_count', ) + __slots__ = (u'message_count',) NAME = u'queue.delete-ok' - INDEX = (50, 41) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x32\x00\x29' # CLASS ID + METHOD ID + INDEX = (50, 41) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x32\x00\x29' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = False, True - IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'message-count', u'message-count', u'long', reserved=False), ] @@ -2060,7 +2096,7 @@ class QueueDeleteOk(AMQPMethodPayload): def write_arguments(self, buf): buf.write(struct.pack('!I', self.message_count)) - + def get_size(self): return 4 @@ -2079,20 +2115,20 @@ class QueuePurge(AMQPMethodPayload): This method removes all messages from a queue which are not awaiting acknowledgment. """ - __slots__ = (u'queue', u'no_wait', ) + __slots__ = (u'queue', u'no_wait',) NAME = u'queue.purge' - INDEX = (50, 30) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x32\x00\x1E' # CLASS ID + METHOD ID + INDEX = (50, 30) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x32\x00\x1E' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, False - IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'reserved-1', u'short', u'short', reserved=True), Field(u'queue', u'queue-name', u'shortstr', reserved=False), Field(u'no-wait', u'no-wait', u'bit', reserved=False), @@ -2114,7 +2150,7 @@ class QueuePurge(AMQPMethodPayload): buf.write(struct.pack('!B', len(self.queue))) buf.write(self.queue) buf.write(struct.pack('!B', (self.no_wait << 0))) - + def get_size(self): return 4 + len(self.queue) @@ -2123,7 +2159,7 @@ class QueuePurge(AMQPMethodPayload): offset = start_offset s_len, = struct.unpack_from('!2xB', buf, offset) offset += 3 - queue = buf[offset:offset+s_len] + queue = buf[offset:offset + s_len] offset += s_len _bit, = struct.unpack_from('!B', buf, offset) offset += 0 @@ -2138,20 +2174,20 @@ class QueuePurgeOk(AMQPMethodPayload): This method confirms the purge of a queue. """ - __slots__ = (u'message_count', ) + __slots__ = (u'message_count',) NAME = u'queue.purge-ok' - INDEX = (50, 31) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x32\x00\x1F' # CLASS ID + METHOD ID + INDEX = (50, 31) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x32\x00\x1F' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = False, True - IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'message-count', u'message-count', u'long', reserved=False), ] @@ -2166,7 +2202,7 @@ class QueuePurgeOk(AMQPMethodPayload): def write_arguments(self, buf): buf.write(struct.pack('!I', self.message_count)) - + def get_size(self): return 4 @@ -2184,20 +2220,20 @@ class QueueUnbind(AMQPMethodPayload): This method unbinds a queue from an exchange. """ - __slots__ = (u'queue', u'exchange', u'routing_key', u'arguments', ) + __slots__ = (u'queue', u'exchange', u'routing_key', u'arguments',) NAME = u'queue.unbind' - INDEX = (50, 50) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x32\x00\x32' # CLASS ID + METHOD ID + INDEX = (50, 50) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x32\x00\x32' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, False - IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'reserved-1', u'short', u'short', reserved=True), Field(u'queue', u'queue-name', u'shortstr', reserved=False), Field(u'exchange', u'exchange-name', u'shortstr', reserved=False), @@ -2234,24 +2270,25 @@ class QueueUnbind(AMQPMethodPayload): buf.write(struct.pack('!B', len(self.routing_key))) buf.write(self.routing_key) enframe_table(buf, self.arguments) - + def get_size(self): - return 5 + len(self.queue) + len(self.exchange) + len(self.routing_key) + frame_table_size(self.arguments) + return 5 + len(self.queue) + len(self.exchange) + len( + self.routing_key) + frame_table_size(self.arguments) @staticmethod def from_buffer(buf, start_offset): offset = start_offset s_len, = struct.unpack_from('!2xB', buf, offset) offset += 3 - queue = buf[offset:offset+s_len] + queue = buf[offset:offset + s_len] offset += s_len s_len, = struct.unpack_from('!B', buf, offset) offset += 1 - exchange = buf[offset:offset+s_len] + exchange = buf[offset:offset + s_len] offset += s_len s_len, = struct.unpack_from('!B', buf, offset) offset += 1 - routing_key = buf[offset:offset+s_len] + routing_key = buf[offset:offset + s_len] offset += s_len arguments, delta = deframe_table(buf, offset) offset += delta @@ -2268,12 +2305,12 @@ class QueueUnbindOk(AMQPMethodPayload): NAME = u'queue.unbind-ok' - INDEX = (50, 51) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x32\x00\x33' # CLASS ID + METHOD ID + INDEX = (50, 51) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x32\x00\x33' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = False, True - IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x32\x00\x33\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END @@ -2282,7 +2319,6 @@ class QueueUnbindOk(AMQPMethodPayload): Create frame queue.unbind-ok """ - @staticmethod def from_buffer(buf, start_offset): offset = start_offset @@ -2354,48 +2390,72 @@ class BasicContentPropertyList(AMQPContentPropertyList): :type reserved: binary type (max length 255) (AMQP as shortstr) """ zpf = bytearray([ - (('content_type' in kwargs) << 7) | (('content_encoding' in kwargs) << 6) | (('headers' in kwargs) << 5) | (('delivery_mode' in kwargs) << 4) | (('priority' in kwargs) << 3) | (('correlation_id' in kwargs) << 2) | (('reply_to' in kwargs) << 1) | int('expiration' in kwargs), - (('message_id' in kwargs) << 7) | (('timestamp' in kwargs) << 6) | (('type_' in kwargs) << 5) | (('user_id' in kwargs) << 4) | (('app_id' in kwargs) << 3) | (('reserved' in kwargs) << 2) + (('content_type' in kwargs) << 7) | ( + ('content_encoding' in kwargs) << 6) | ( + ('headers' in kwargs) << 5) | ( + ('delivery_mode' in kwargs) << 4) | ( + ('priority' in kwargs) << 3) | ( + ('correlation_id' in kwargs) << 2) | ( + ('reply_to' in kwargs) << 1) | int('expiration' in kwargs), + (('message_id' in kwargs) << 7) | ( + ('timestamp' in kwargs) << 6) | (('type_' in kwargs) << 5) | ( + ('user_id' in kwargs) << 4) | (('app_id' in kwargs) << 3) | ( + ('reserved' in kwargs) << 2) ]) zpf = six.binary_type(zpf) -# If you know in advance what properties you will be using, use typized constructors like -# -# runs once -# my_type = BasicContentPropertyList.typize('content_type', 'content_encoding') -# -# runs many times -# props = my_type('text/plain', 'utf8') -# -# instead of -# -# # runs many times -# props = BasicContentPropertyList(content_type='text/plain', content_encoding='utf8') -# -# This way you will be faster. -# -# If you do not know in advance what properties you will be using, it is correct to use -# this constructor. + # If you know in advance what properties you will be using, use typized constructors like + # + # runs once + # my_type = BasicContentPropertyList.typize('content_type', 'content_encoding') + # + # runs many times + # props = my_type('text/plain', 'utf8') + # + # instead of + # + # # runs many times + # props = BasicContentPropertyList(content_type='text/plain', content_encoding='utf8') + # + # This way you will be faster. + # + # If you do not know in advance what properties you will be using, it is correct to use + # this constructor. if zpf in BasicContentPropertyList.PARTICULAR_CLASSES: return BasicContentPropertyList.PARTICULAR_CLASSES[zpf](**kwargs) else: - logger.debug('Property field (BasicContentPropertyList:%s) not seen yet, compiling', repr(zpf)) - c = compile_particular_content_property_list_class(zpf, BasicContentPropertyList.FIELDS) + logger.debug( + 'Property field (BasicContentPropertyList:%s) not seen yet, compiling', + repr(zpf)) + c = compile_particular_content_property_list_class(zpf, + BasicContentPropertyList.FIELDS) BasicContentPropertyList.PARTICULAR_CLASSES[zpf] = c return c(**kwargs) @staticmethod def typize(*fields): zpf = bytearray([ - (('content_type' in fields) << 7) | (('content_encoding' in fields) << 6) | (('headers' in fields) << 5) | (('delivery_mode' in fields) << 4) | (('priority' in fields) << 3) | (('correlation_id' in fields) << 2) | (('reply_to' in fields) << 1) | int('expiration' in kwargs), - (('message_id' in fields) << 7) | (('timestamp' in fields) << 6) | (('type_' in fields) << 5) | (('user_id' in fields) << 4) | (('app_id' in fields) << 3) | (('reserved' in fields) << 2) + (('content_type' in fields) << 7) | ( + ('content_encoding' in fields) << 6) | ( + ('headers' in fields) << 5) | ( + ('delivery_mode' in fields) << 4) | ( + ('priority' in fields) << 3) | ( + ('correlation_id' in fields) << 2) | ( + ('reply_to' in fields) << 1) | int('expiration' in kwargs), + (('message_id' in fields) << 7) | ( + ('timestamp' in fields) << 6) | (('type_' in fields) << 5) | ( + ('user_id' in fields) << 4) | (('app_id' in fields) << 3) | ( + ('reserved' in fields) << 2) ]) zpf = six.binary_type(zpf) if zpf in BasicContentPropertyList.PARTICULAR_CLASSES: return BasicContentPropertyList.PARTICULAR_CLASSES[zpf] else: - logger.debug('Property field (BasicContentPropertyList:%s) not seen yet, compiling', repr(zpf)) - c = compile_particular_content_property_list_class(zpf, BasicContentPropertyList.FIELDS) + logger.debug( + 'Property field (BasicContentPropertyList:%s) not seen yet, compiling', + repr(zpf)) + c = compile_particular_content_property_list_class(zpf, + BasicContentPropertyList.FIELDS) BasicContentPropertyList.PARTICULAR_CLASSES[zpf] = c return c @@ -2413,12 +2473,17 @@ class BasicContentPropertyList(AMQPContentPropertyList): else: while buf[offset + pfl - 1] & 1: pfl += 2 - zpf = BasicContentPropertyList.zero_property_flags(buf[offset:offset+pfl]).tobytes() + zpf = BasicContentPropertyList.zero_property_flags( + buf[offset:offset + pfl]).tobytes() if zpf in BasicContentPropertyList.PARTICULAR_CLASSES: - return BasicContentPropertyList.PARTICULAR_CLASSES[zpf].from_buffer(buf, offset) + return BasicContentPropertyList.PARTICULAR_CLASSES[ + zpf].from_buffer(buf, offset) else: - logger.debug('Property field (BasicContentPropertyList:%s) not seen yet, compiling', repr(zpf)) - c = compile_particular_content_property_list_class(zpf, BasicContentPropertyList.FIELDS) + logger.debug( + 'Property field (BasicContentPropertyList:%s) not seen yet, compiling', + repr(zpf)) + c = compile_particular_content_property_list_class(zpf, + BasicContentPropertyList.FIELDS) BasicContentPropertyList.PARTICULAR_CLASSES[zpf] = c return c.from_buffer(buf, offset) @@ -2435,20 +2500,20 @@ class BasicAck(AMQPMethodPayload): The acknowledgement can be for a single message or a set of messages up to and including a specific message. """ - __slots__ = (u'delivery_tag', u'multiple', ) + __slots__ = (u'delivery_tag', u'multiple',) NAME = u'basic.ack' - INDEX = (60, 80) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x3C\x00\x50' # CLASS ID + METHOD ID + INDEX = (60, 80) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x3C\x00\x50' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, True - IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'delivery-tag', u'delivery-tag', u'longlong', reserved=False), Field(u'multiple', u'bit', u'bit', reserved=False), ] @@ -2472,7 +2537,7 @@ class BasicAck(AMQPMethodPayload): def write_arguments(self, buf): buf.write(struct.pack('!QB', self.delivery_tag, (self.multiple << 0))) - + def get_size(self): return 9 @@ -2494,20 +2559,22 @@ class BasicConsume(AMQPMethodPayload): messages from a specific queue. Consumers last as long as the channel they were declared on, or until the client cancels them. """ - __slots__ = (u'queue', u'consumer_tag', u'no_local', u'no_ack', u'exclusive', u'no_wait', u'arguments', ) + __slots__ = ( + u'queue', u'consumer_tag', u'no_local', u'no_ack', u'exclusive', + u'no_wait', u'arguments',) NAME = u'basic.consume' - INDEX = (60, 20) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x3C\x00\x14' # CLASS ID + METHOD ID + INDEX = (60, 20) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x3C\x00\x14' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, False - IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'reserved-1', u'short', u'short', reserved=True), Field(u'queue', u'queue-name', u'shortstr', reserved=False), Field(u'consumer-tag', u'consumer-tag', u'shortstr', reserved=False), @@ -2518,7 +2585,8 @@ class BasicConsume(AMQPMethodPayload): Field(u'arguments', u'table', u'table', reserved=False), ] - def __init__(self, queue, consumer_tag, no_local, no_ack, exclusive, no_wait, arguments): + def __init__(self, queue, consumer_tag, no_local, no_ack, exclusive, + no_wait, arguments): """ Create frame basic.consume @@ -2554,22 +2622,25 @@ class BasicConsume(AMQPMethodPayload): buf.write(self.queue) buf.write(struct.pack('!B', len(self.consumer_tag))) buf.write(self.consumer_tag) - buf.write(struct.pack('!B', (self.no_local << 0) | (self.no_ack << 1) | (self.exclusive << 2) | (self.no_wait << 3))) + buf.write(struct.pack('!B', + (self.no_local << 0) | (self.no_ack << 1) | ( + self.exclusive << 2) | (self.no_wait << 3))) enframe_table(buf, self.arguments) - + def get_size(self): - return 5 + len(self.queue) + len(self.consumer_tag) + frame_table_size(self.arguments) + return 5 + len(self.queue) + len(self.consumer_tag) + frame_table_size( + self.arguments) @staticmethod def from_buffer(buf, start_offset): offset = start_offset s_len, = struct.unpack_from('!2xB', buf, offset) offset += 3 - queue = buf[offset:offset+s_len] + queue = buf[offset:offset + s_len] offset += s_len s_len, = struct.unpack_from('!B', buf, offset) offset += 1 - consumer_tag = buf[offset:offset+s_len] + consumer_tag = buf[offset:offset + s_len] offset += s_len _bit, = struct.unpack_from('!B', buf, offset) offset += 0 @@ -2580,7 +2651,8 @@ class BasicConsume(AMQPMethodPayload): offset += 1 arguments, delta = deframe_table(buf, offset) offset += delta - return BasicConsume(queue, consumer_tag, no_local, no_ack, exclusive, no_wait, arguments) + return BasicConsume(queue, consumer_tag, no_local, no_ack, exclusive, + no_wait, arguments) class BasicCancel(AMQPMethodPayload): @@ -2602,20 +2674,20 @@ class BasicCancel(AMQPMethodPayload): capable of accepting the method, through some means of capability negotiation. """ - __slots__ = (u'consumer_tag', u'no_wait', ) + __slots__ = (u'consumer_tag', u'no_wait',) NAME = u'basic.cancel' - INDEX = (60, 30) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x3C\x00\x1E' # CLASS ID + METHOD ID + INDEX = (60, 30) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x3C\x00\x1E' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, True - IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'consumer-tag', u'consumer-tag', u'shortstr', reserved=False), Field(u'no-wait', u'no-wait', u'bit', reserved=False), ] @@ -2634,7 +2706,7 @@ class BasicCancel(AMQPMethodPayload): buf.write(struct.pack('!B', len(self.consumer_tag))) buf.write(self.consumer_tag) buf.write(struct.pack('!B', (self.no_wait << 0))) - + def get_size(self): return 2 + len(self.consumer_tag) @@ -2643,7 +2715,7 @@ class BasicCancel(AMQPMethodPayload): offset = start_offset s_len, = struct.unpack_from('!B', buf, offset) offset += 1 - consumer_tag = buf[offset:offset+s_len] + consumer_tag = buf[offset:offset + s_len] offset += s_len _bit, = struct.unpack_from('!B', buf, offset) offset += 0 @@ -2659,20 +2731,20 @@ class BasicConsumeOk(AMQPMethodPayload): The server provides the client with a consumer tag, which is used by the client for methods called on the consumer at a later stage. """ - __slots__ = (u'consumer_tag', ) + __slots__ = (u'consumer_tag',) NAME = u'basic.consume-ok' - INDEX = (60, 21) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x3C\x00\x15' # CLASS ID + METHOD ID + INDEX = (60, 21) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x3C\x00\x15' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = False, True - IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'consumer-tag', u'consumer-tag', u'shortstr', reserved=False), ] @@ -2688,7 +2760,7 @@ class BasicConsumeOk(AMQPMethodPayload): def write_arguments(self, buf): buf.write(struct.pack('!B', len(self.consumer_tag))) buf.write(self.consumer_tag) - + def get_size(self): return 1 + len(self.consumer_tag) @@ -2697,7 +2769,7 @@ class BasicConsumeOk(AMQPMethodPayload): offset = start_offset s_len, = struct.unpack_from('!B', buf, offset) offset += 1 - consumer_tag = buf[offset:offset+s_len] + consumer_tag = buf[offset:offset + s_len] offset += s_len return BasicConsumeOk(consumer_tag) @@ -2708,20 +2780,20 @@ class BasicCancelOk(AMQPMethodPayload): This method confirms that the cancellation was completed. """ - __slots__ = (u'consumer_tag', ) + __slots__ = (u'consumer_tag',) NAME = u'basic.cancel-ok' - INDEX = (60, 31) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x3C\x00\x1F' # CLASS ID + METHOD ID + INDEX = (60, 31) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x3C\x00\x1F' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, True - IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'consumer-tag', u'consumer-tag', u'shortstr', reserved=False), ] @@ -2736,7 +2808,7 @@ class BasicCancelOk(AMQPMethodPayload): def write_arguments(self, buf): buf.write(struct.pack('!B', len(self.consumer_tag))) buf.write(self.consumer_tag) - + def get_size(self): return 1 + len(self.consumer_tag) @@ -2745,7 +2817,7 @@ class BasicCancelOk(AMQPMethodPayload): offset = start_offset s_len, = struct.unpack_from('!B', buf, offset) offset += 1 - consumer_tag = buf[offset:offset+s_len] + consumer_tag = buf[offset:offset + s_len] offset += s_len return BasicCancelOk(consumer_tag) @@ -2759,20 +2831,21 @@ class BasicDeliver(AMQPMethodPayload): the server responds with Deliver methods as and when messages arrive for that consumer. """ - __slots__ = (u'consumer_tag', u'delivery_tag', u'redelivered', u'exchange', u'routing_key', ) + __slots__ = (u'consumer_tag', u'delivery_tag', u'redelivered', u'exchange', + u'routing_key',) NAME = u'basic.deliver' - INDEX = (60, 60) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x3C\x00\x3C' # CLASS ID + METHOD ID + INDEX = (60, 60) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x3C\x00\x3C' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = False, True - IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'consumer-tag', u'consumer-tag', u'shortstr', reserved=False), Field(u'delivery-tag', u'delivery-tag', u'longlong', reserved=False), Field(u'redelivered', u'redelivered', u'bit', reserved=False), @@ -2780,7 +2853,8 @@ class BasicDeliver(AMQPMethodPayload): Field(u'routing-key', u'shortstr', u'shortstr', reserved=False), ] - def __init__(self, consumer_tag, delivery_tag, redelivered, exchange, routing_key): + def __init__(self, consumer_tag, delivery_tag, redelivered, exchange, + routing_key): """ Create frame basic.deliver @@ -2803,20 +2877,23 @@ class BasicDeliver(AMQPMethodPayload): def write_arguments(self, buf): buf.write(struct.pack('!B', len(self.consumer_tag))) buf.write(self.consumer_tag) - buf.write(struct.pack('!QBB', self.delivery_tag, (self.redelivered << 0), len(self.exchange))) + buf.write( + struct.pack('!QBB', self.delivery_tag, (self.redelivered << 0), + len(self.exchange))) buf.write(self.exchange) buf.write(struct.pack('!B', len(self.routing_key))) buf.write(self.routing_key) - + def get_size(self): - return 12 + len(self.consumer_tag) + len(self.exchange) + len(self.routing_key) + return 12 + len(self.consumer_tag) + len(self.exchange) + len( + self.routing_key) @staticmethod def from_buffer(buf, start_offset): offset = start_offset s_len, = struct.unpack_from('!B', buf, offset) offset += 1 - consumer_tag = buf[offset:offset+s_len] + consumer_tag = buf[offset:offset + s_len] offset += s_len delivery_tag, _bit, = struct.unpack_from('!QB', buf, offset) offset += 8 @@ -2824,13 +2901,14 @@ class BasicDeliver(AMQPMethodPayload): offset += 1 s_len, = struct.unpack_from('!B', buf, offset) offset += 1 - exchange = buf[offset:offset+s_len] + exchange = buf[offset:offset + s_len] offset += s_len s_len, = struct.unpack_from('!B', buf, offset) offset += 1 - routing_key = buf[offset:offset+s_len] + routing_key = buf[offset:offset + s_len] offset += s_len - return BasicDeliver(consumer_tag, delivery_tag, redelivered, exchange, routing_key) + return BasicDeliver(consumer_tag, delivery_tag, redelivered, exchange, + routing_key) class BasicGet(AMQPMethodPayload): @@ -2841,20 +2919,20 @@ class BasicGet(AMQPMethodPayload): dialogue that is designed for specific types of application where synchronous functionality is more important than performance. """ - __slots__ = (u'queue', u'no_ack', ) + __slots__ = (u'queue', u'no_ack',) NAME = u'basic.get' - INDEX = (60, 70) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x3C\x00\x46' # CLASS ID + METHOD ID + INDEX = (60, 70) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x3C\x00\x46' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, False - IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'reserved-1', u'short', u'short', reserved=True), Field(u'queue', u'queue-name', u'shortstr', reserved=False), Field(u'no-ack', u'no-ack', u'bit', reserved=False), @@ -2876,7 +2954,7 @@ class BasicGet(AMQPMethodPayload): buf.write(struct.pack('!B', len(self.queue))) buf.write(self.queue) buf.write(struct.pack('!B', (self.no_ack << 0))) - + def get_size(self): return 4 + len(self.queue) @@ -2885,7 +2963,7 @@ class BasicGet(AMQPMethodPayload): offset = start_offset s_len, = struct.unpack_from('!2xB', buf, offset) offset += 3 - queue = buf[offset:offset+s_len] + queue = buf[offset:offset + s_len] offset += s_len _bit, = struct.unpack_from('!B', buf, offset) offset += 0 @@ -2902,20 +2980,21 @@ class BasicGetOk(AMQPMethodPayload): delivered by 'get-ok' must be acknowledged unless the no-ack option was set in the get method. """ - __slots__ = (u'delivery_tag', u'redelivered', u'exchange', u'routing_key', u'message_count', ) + __slots__ = (u'delivery_tag', u'redelivered', u'exchange', u'routing_key', + u'message_count',) NAME = u'basic.get-ok' - INDEX = (60, 71) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x3C\x00\x47' # CLASS ID + METHOD ID + INDEX = (60, 71) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x3C\x00\x47' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = False, True - IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'delivery-tag', u'delivery-tag', u'longlong', reserved=False), Field(u'redelivered', u'redelivered', u'bit', reserved=False), Field(u'exchange', u'exchange-name', u'shortstr', reserved=False), @@ -2923,7 +3002,8 @@ class BasicGetOk(AMQPMethodPayload): Field(u'message-count', u'message-count', u'long', reserved=False), ] - def __init__(self, delivery_tag, redelivered, exchange, routing_key, message_count): + def __init__(self, delivery_tag, redelivered, exchange, routing_key, + message_count): """ Create frame basic.get-ok @@ -2944,12 +3024,14 @@ class BasicGetOk(AMQPMethodPayload): self.message_count = message_count def write_arguments(self, buf): - buf.write(struct.pack('!QBB', self.delivery_tag, (self.redelivered << 0), len(self.exchange))) + buf.write( + struct.pack('!QBB', self.delivery_tag, (self.redelivered << 0), + len(self.exchange))) buf.write(self.exchange) buf.write(struct.pack('!B', len(self.routing_key))) buf.write(self.routing_key) buf.write(struct.pack('!I', self.message_count)) - + def get_size(self): return 15 + len(self.exchange) + len(self.routing_key) @@ -2962,15 +3044,16 @@ class BasicGetOk(AMQPMethodPayload): offset += 1 s_len, = struct.unpack_from('!B', buf, offset) offset += 1 - exchange = buf[offset:offset+s_len] + exchange = buf[offset:offset + s_len] offset += s_len s_len, = struct.unpack_from('!B', buf, offset) offset += 1 - routing_key = buf[offset:offset+s_len] + routing_key = buf[offset:offset + s_len] offset += s_len message_count, = struct.unpack_from('!I', buf, offset) offset += 4 - return BasicGetOk(delivery_tag, redelivered, exchange, routing_key, message_count) + return BasicGetOk(delivery_tag, redelivered, exchange, routing_key, + message_count) class BasicGetEmpty(AMQPMethodPayload): @@ -2984,17 +3067,17 @@ class BasicGetEmpty(AMQPMethodPayload): NAME = u'basic.get-empty' - INDEX = (60, 72) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x3C\x00\x48' # CLASS ID + METHOD ID + INDEX = (60, 72) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x3C\x00\x48' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = False, True - IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content STATIC_CONTENT = b'\x00\x00\x00\x0D\x00\x3C\x00\x48\x00\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'reserved-1', u'shortstr', u'shortstr', reserved=True), ] @@ -3003,13 +3086,12 @@ class BasicGetEmpty(AMQPMethodPayload): Create frame basic.get-empty """ - @staticmethod def from_buffer(buf, start_offset): offset = start_offset s_len, = struct.unpack_from('!B', buf, offset) offset += 1 - offset += s_len # reserved field! + offset += s_len # reserved field! return BasicGetEmpty() @@ -3024,20 +3106,20 @@ class BasicNack(AMQPMethodPayload): confirm mode of unhandled messages. If a publisher receives this method, it probably needs to republish the offending messages. """ - __slots__ = (u'delivery_tag', u'multiple', u'requeue', ) + __slots__ = (u'delivery_tag', u'multiple', u'requeue',) NAME = u'basic.nack' - INDEX = (60, 120) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x3C\x00\x78' # CLASS ID + METHOD ID + INDEX = (60, 120) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x3C\x00\x78' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, True - IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'delivery-tag', u'delivery-tag', u'longlong', reserved=False), Field(u'multiple', u'bit', u'bit', reserved=False), Field(u'requeue', u'bit', u'bit', reserved=False), @@ -3067,8 +3149,9 @@ class BasicNack(AMQPMethodPayload): self.requeue = requeue def write_arguments(self, buf): - buf.write(struct.pack('!QB', self.delivery_tag, (self.multiple << 0) | (self.requeue << 1))) - + buf.write(struct.pack('!QB', self.delivery_tag, + (self.multiple << 0) | (self.requeue << 1))) + def get_size(self): return 9 @@ -3091,20 +3174,20 @@ class BasicPublish(AMQPMethodPayload): to queues as defined by the exchange configuration and distributed to any active consumers when the transaction, if any, is committed. """ - __slots__ = (u'exchange', u'routing_key', u'mandatory', u'immediate', ) + __slots__ = (u'exchange', u'routing_key', u'mandatory', u'immediate',) NAME = u'basic.publish' - INDEX = (60, 40) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x3C\x00\x28' # CLASS ID + METHOD ID + INDEX = (60, 40) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x3C\x00\x28' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, False - IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'reserved-1', u'short', u'short', reserved=True), Field(u'exchange', u'exchange-name', u'shortstr', reserved=False), Field(u'routing-key', u'shortstr', u'shortstr', reserved=False), @@ -3147,8 +3230,9 @@ class BasicPublish(AMQPMethodPayload): buf.write(self.exchange) buf.write(struct.pack('!B', len(self.routing_key))) buf.write(self.routing_key) - buf.write(struct.pack('!B', (self.mandatory << 0) | (self.immediate << 1))) - + buf.write( + struct.pack('!B', (self.mandatory << 0) | (self.immediate << 1))) + def get_size(self): return 5 + len(self.exchange) + len(self.routing_key) @@ -3157,11 +3241,11 @@ class BasicPublish(AMQPMethodPayload): offset = start_offset s_len, = struct.unpack_from('!2xB', buf, offset) offset += 3 - exchange = buf[offset:offset+s_len] + exchange = buf[offset:offset + s_len] offset += s_len s_len, = struct.unpack_from('!B', buf, offset) offset += 1 - routing_key = buf[offset:offset+s_len] + routing_key = buf[offset:offset + s_len] offset += s_len _bit, = struct.unpack_from('!B', buf, offset) offset += 0 @@ -3181,20 +3265,20 @@ class BasicQos(AMQPMethodPayload): qos method could in principle apply to both peers, it is currently meaningful only for the server. """ - __slots__ = (u'prefetch_size', u'prefetch_count', u'global_', ) + __slots__ = (u'prefetch_size', u'prefetch_count', u'global_',) NAME = u'basic.qos' - INDEX = (60, 10) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x3C\x00\x0A' # CLASS ID + METHOD ID + INDEX = (60, 10) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x3C\x00\x0A' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, False - IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'prefetch-size', u'long', u'long', reserved=False), Field(u'prefetch-count', u'short', u'short', reserved=False), Field(u'global', u'bit', u'bit', reserved=False), @@ -3236,15 +3320,17 @@ class BasicQos(AMQPMethodPayload): self.global_ = global_ def write_arguments(self, buf): - buf.write(struct.pack('!IHB', self.prefetch_size, self.prefetch_count, (self.global_ << 0))) - + buf.write(struct.pack('!IHB', self.prefetch_size, self.prefetch_count, + (self.global_ << 0))) + def get_size(self): return 7 @staticmethod def from_buffer(buf, start_offset): offset = start_offset - prefetch_size, prefetch_count, _bit, = struct.unpack_from('!IHB', buf, offset) + prefetch_size, prefetch_count, _bit, = struct.unpack_from('!IHB', buf, + offset) offset += 6 global_ = bool(_bit >> 0) offset += 1 @@ -3263,12 +3349,12 @@ class BasicQosOk(AMQPMethodPayload): NAME = u'basic.qos-ok' - INDEX = (60, 11) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x3C\x00\x0B' # CLASS ID + METHOD ID + INDEX = (60, 11) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x3C\x00\x0B' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = False, True - IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x3C\x00\x0B\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END @@ -3277,7 +3363,6 @@ class BasicQosOk(AMQPMethodPayload): Create frame basic.qos-ok """ - @staticmethod def from_buffer(buf, start_offset): offset = start_offset @@ -3293,20 +3378,20 @@ class BasicReturn(AMQPMethodPayload): reply code and text provide information about the reason that the message was undeliverable. """ - __slots__ = (u'reply_code', u'reply_text', u'exchange', u'routing_key', ) + __slots__ = (u'reply_code', u'reply_text', u'exchange', u'routing_key',) NAME = u'basic.return' - INDEX = (60, 50) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x3C\x00\x32' # CLASS ID + METHOD ID + INDEX = (60, 50) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x3C\x00\x32' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = False, True - IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'reply-code', u'reply-code', u'short', reserved=False), Field(u'reply-text', u'reply-text', u'shortstr', reserved=False), Field(u'exchange', u'exchange-name', u'shortstr', reserved=False), @@ -3338,24 +3423,25 @@ class BasicReturn(AMQPMethodPayload): buf.write(self.exchange) buf.write(struct.pack('!B', len(self.routing_key))) buf.write(self.routing_key) - + def get_size(self): - return 5 + len(self.reply_text) + len(self.exchange) + len(self.routing_key) + return 5 + len(self.reply_text) + len(self.exchange) + len( + self.routing_key) @staticmethod def from_buffer(buf, start_offset): offset = start_offset reply_code, s_len, = struct.unpack_from('!HB', buf, offset) offset += 3 - reply_text = buf[offset:offset+s_len] + reply_text = buf[offset:offset + s_len] offset += s_len s_len, = struct.unpack_from('!B', buf, offset) offset += 1 - exchange = buf[offset:offset+s_len] + exchange = buf[offset:offset + s_len] offset += s_len s_len, = struct.unpack_from('!B', buf, offset) offset += 1 - routing_key = buf[offset:offset+s_len] + routing_key = buf[offset:offset + s_len] offset += s_len return BasicReturn(reply_code, reply_text, exchange, routing_key) @@ -3368,20 +3454,20 @@ class BasicReject(AMQPMethodPayload): cancel large incoming messages, or return untreatable messages to their original queue. """ - __slots__ = (u'delivery_tag', u'requeue', ) + __slots__ = (u'delivery_tag', u'requeue',) NAME = u'basic.reject' - INDEX = (60, 90) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x3C\x00\x5A' # CLASS ID + METHOD ID + INDEX = (60, 90) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x3C\x00\x5A' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, False - IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'delivery-tag', u'delivery-tag', u'longlong', reserved=False), Field(u'requeue', u'bit', u'bit', reserved=False), ] @@ -3401,7 +3487,7 @@ class BasicReject(AMQPMethodPayload): def write_arguments(self, buf): buf.write(struct.pack('!QB', self.delivery_tag, (self.requeue << 0))) - + def get_size(self): return 9 @@ -3423,20 +3509,20 @@ class BasicRecoverAsync(AMQPMethodPayload): specified channel. Zero or more messages may be redelivered. This method is deprecated in favour of the synchronous Recover/Recover-Ok. """ - __slots__ = (u'requeue', ) + __slots__ = (u'requeue',) NAME = u'basic.recover-async' - INDEX = (60, 100) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x3C\x00\x64' # CLASS ID + METHOD ID + INDEX = (60, 100) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x3C\x00\x64' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, False - IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'requeue', u'bit', u'bit', reserved=False), ] @@ -3454,7 +3540,7 @@ class BasicRecoverAsync(AMQPMethodPayload): def write_arguments(self, buf): buf.write(struct.pack('!B', (self.requeue << 0))) - + def get_size(self): return 1 @@ -3476,20 +3562,20 @@ class BasicRecover(AMQPMethodPayload): specified channel. Zero or more messages may be redelivered. This method replaces the asynchronous Recover. """ - __slots__ = (u'requeue', ) + __slots__ = (u'requeue',) NAME = u'basic.recover' - INDEX = (60, 110) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x3C\x00\x6E' # CLASS ID + METHOD ID + INDEX = (60, 110) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x3C\x00\x6E' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, False - IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'requeue', u'bit', u'bit', reserved=False), ] @@ -3507,7 +3593,7 @@ class BasicRecover(AMQPMethodPayload): def write_arguments(self, buf): buf.write(struct.pack('!B', (self.requeue << 0))) - + def get_size(self): return 1 @@ -3531,12 +3617,12 @@ class BasicRecoverOk(AMQPMethodPayload): NAME = u'basic.recover-ok' - INDEX = (60, 111) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x3C\x00\x6F' # CLASS ID + METHOD ID + INDEX = (60, 111) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x3C\x00\x6F' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = False, True - IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x3C\x00\x6F\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END @@ -3545,7 +3631,6 @@ class BasicRecoverOk(AMQPMethodPayload): Create frame basic.recover-ok """ - @staticmethod def from_buffer(buf, start_offset): offset = start_offset @@ -3580,12 +3665,12 @@ class TxCommit(AMQPMethodPayload): NAME = u'tx.commit' - INDEX = (90, 20) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x5A\x00\x14' # CLASS ID + METHOD ID + INDEX = (90, 20) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x5A\x00\x14' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, False - IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x5A\x00\x14\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END @@ -3594,7 +3679,6 @@ class TxCommit(AMQPMethodPayload): Create frame tx.commit """ - @staticmethod def from_buffer(buf, start_offset): offset = start_offset @@ -3612,12 +3696,12 @@ class TxCommitOk(AMQPMethodPayload): NAME = u'tx.commit-ok' - INDEX = (90, 21) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x5A\x00\x15' # CLASS ID + METHOD ID + INDEX = (90, 21) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x5A\x00\x15' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = False, True - IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x5A\x00\x15\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END @@ -3626,7 +3710,6 @@ class TxCommitOk(AMQPMethodPayload): Create frame tx.commit-ok """ - @staticmethod def from_buffer(buf, start_offset): offset = start_offset @@ -3646,12 +3729,12 @@ class TxRollback(AMQPMethodPayload): NAME = u'tx.rollback' - INDEX = (90, 30) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x5A\x00\x1E' # CLASS ID + METHOD ID + INDEX = (90, 30) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x5A\x00\x1E' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, False - IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x5A\x00\x1E\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END @@ -3660,7 +3743,6 @@ class TxRollback(AMQPMethodPayload): Create frame tx.rollback """ - @staticmethod def from_buffer(buf, start_offset): offset = start_offset @@ -3678,12 +3760,12 @@ class TxRollbackOk(AMQPMethodPayload): NAME = u'tx.rollback-ok' - INDEX = (90, 31) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x5A\x00\x1F' # CLASS ID + METHOD ID + INDEX = (90, 31) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x5A\x00\x1F' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = False, True - IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x5A\x00\x1F\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END @@ -3692,7 +3774,6 @@ class TxRollbackOk(AMQPMethodPayload): Create frame tx.rollback-ok """ - @staticmethod def from_buffer(buf, start_offset): offset = start_offset @@ -3710,12 +3791,12 @@ class TxSelect(AMQPMethodPayload): NAME = u'tx.select' - INDEX = (90, 10) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x5A\x00\x0A' # CLASS ID + METHOD ID + INDEX = (90, 10) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x5A\x00\x0A' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, False - IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x5A\x00\x0A\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END @@ -3724,7 +3805,6 @@ class TxSelect(AMQPMethodPayload): Create frame tx.select """ - @staticmethod def from_buffer(buf, start_offset): offset = start_offset @@ -3742,12 +3822,12 @@ class TxSelectOk(AMQPMethodPayload): NAME = u'tx.select-ok' - INDEX = (90, 11) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x5A\x00\x0B' # CLASS ID + METHOD ID + INDEX = (90, 11) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x5A\x00\x0B' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = False, True - IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x5A\x00\x0B\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END @@ -3756,7 +3836,6 @@ class TxSelectOk(AMQPMethodPayload): Create frame tx.select-ok """ - @staticmethod def from_buffer(buf, start_offset): offset = start_offset @@ -3795,20 +3874,20 @@ class ConfirmSelect(AMQPMethodPayload): The client can only use this method on a non-transactional channel. """ - __slots__ = (u'nowait', ) + __slots__ = (u'nowait',) NAME = u'confirm.select' - INDEX = (85, 10) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x55\x00\x0A' # CLASS ID + METHOD ID + INDEX = (85, 10) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x55\x00\x0A' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, False - IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'nowait', u'bit', u'bit', reserved=False), ] @@ -3825,7 +3904,7 @@ class ConfirmSelect(AMQPMethodPayload): def write_arguments(self, buf): buf.write(struct.pack('!B', (self.nowait << 0))) - + def get_size(self): return 1 @@ -3849,12 +3928,12 @@ class ConfirmSelectOk(AMQPMethodPayload): NAME = u'confirm.select-ok' - INDEX = (85, 11) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x55\x00\x0B' # CLASS ID + METHOD ID + INDEX = (85, 11) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x55\x00\x0B' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = False, True - IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x55\x00\x0B\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END @@ -3863,7 +3942,6 @@ class ConfirmSelectOk(AMQPMethodPayload): Create frame confirm.select-ok """ - @staticmethod def from_buffer(buf, start_offset): offset = start_offset @@ -3935,7 +4013,6 @@ IDENT_TO_METHOD = { (50, 10): QueueDeclare, } - BINARY_HEADER_TO_METHOD = { b'\x00\x5A\x00\x15': TxCommitOk, b'\x00\x3C\x00\x64': BasicRecoverAsync, @@ -4001,7 +4078,6 @@ BINARY_HEADER_TO_METHOD = { b'\x00\x32\x00\x0A': QueueDeclare, } - CLASS_ID_TO_CONTENT_PROPERTY_LIST = { 60: BasicContentPropertyList, } @@ -4041,7 +4117,7 @@ REPLY_REASONS_FOR = { # Methods that are replies for other, ie. ConnectionOpenOk: ConnectionOpen # a method may be a reply for ONE or NONE other methods # if a method has no replies, it will have an empty list as value here -REPLIES_FOR= { +REPLIES_FOR = { BasicGetEmpty: [], BasicRecoverOk: [], BasicReturn: [], diff --git a/coolamqp/framing/field_table.py b/coolamqp/framing/field_table.py index 2c31fdec30163b7d7da9aabdd84b546625930ce9..91be67d82ad38c2e144dc783fd1a79c2d69baa7f 100644 --- a/coolamqp/framing/field_table.py +++ b/coolamqp/framing/field_table.py @@ -70,14 +70,16 @@ FIELD_TYPES = { 'd': (8, '!d'), 'D': (5, None, enframe_decimal, deframe_decimal), # decimal-value 's': ( - None, None, enframe_shortstr, deframe_shortstr, lambda val: len(val) + 1), -# shortstr + None, None, enframe_shortstr, deframe_shortstr, + lambda val: len(val) + 1), + # shortstr 'S': ( - None, None, enframe_longstr, deframe_longstr, lambda val: len(val) + 4), -# longstr + None, None, enframe_longstr, deframe_longstr, + lambda val: len(val) + 4), + # longstr 'T': (8, '!Q'), 'V': (0, None, lambda buf, v: None, lambda buf, ofs: None, 0), -# rendered as None + # rendered as None } @@ -106,7 +108,8 @@ def deframe_field_value(buf, offset): # -> (value, type), bytes_consumed opt = FIELD_TYPES[field_type] if opt[1] is not None: - field_val, = struct.unpack_from(FIELD_TYPES[field_type][1], buf, offset) + field_val, = struct.unpack_from(FIELD_TYPES[field_type][1], buf, + offset) offset += opt[0] else: field_val, delta = opt[3](buf, offset) diff --git a/coolamqp/framing/frames.py b/coolamqp/framing/frames.py index 705ee8231f5a4b0b7d9c01fa44cb3e1d325dd703..d5b8736820f25977477bf2aa474a38085583d5b0 100644 --- a/coolamqp/framing/frames.py +++ b/coolamqp/framing/frames.py @@ -71,7 +71,8 @@ class AMQPHeaderFrame(AMQPFrame): def write_to(self, buf): buf.write(struct.pack('!BHLHHQ', FRAME_HEADER, self.channel, - 12 + self.properties.get_size(), self.class_id, 0, + 12 + self.properties.get_size(), self.class_id, + 0, self.body_size)) self.properties.write_to(buf) buf.write(FRAME_END_BYTE) @@ -83,7 +84,8 @@ class AMQPHeaderFrame(AMQPFrame): payload_as_buffer, 0) properties = CLASS_ID_TO_CONTENT_PROPERTY_LIST[class_id].from_buffer( payload_as_buffer, 12) - return AMQPHeaderFrame(channel, class_id, weight, body_size, properties) + return AMQPHeaderFrame(channel, class_id, weight, body_size, + properties) def get_size(self): # frame header is always 7, frame end is 1, content header is 12 + props @@ -104,7 +106,8 @@ class AMQPBodyFrame(AMQPFrame): self.data = data def write_to(self, buf): - buf.write(struct.pack('!BHL', FRAME_BODY, self.channel, len(self.data))) + buf.write( + struct.pack('!BHL', FRAME_BODY, self.channel, len(self.data))) buf.write(self.data) buf.write(FRAME_END_BYTE) diff --git a/coolamqp/objects.py b/coolamqp/objects.py index 947b77739a49364de004564e4acaf67c0c2a1ab2..c6f0f3473db957867f19fe3927e5711e6c55ff87 100644 --- a/coolamqp/objects.py +++ b/coolamqp/objects.py @@ -14,11 +14,13 @@ logger = logging.getLogger(__name__) EMPTY_PROPERTIES = MessageProperties() + def toutf8(q): if isinstance(q, six.binary_type): q = q.decode('utf8') return q + def tobytes(q): if isinstance(q, six.text_type): q = q.encode('utf8') @@ -136,7 +138,6 @@ class ReceivedMessage(Message): self.nack = nack or LAMBDA_NONE - class Exchange(object): """ This represents an Exchange used in AMQP. @@ -273,7 +274,7 @@ class NodeDefinition(object): (six.text_type, six.binary_type)): connstr = args[0].decode('utf8') if isinstance(args[0], six.binary_type) else \ - args[0] + args[0] # AMQP connstring if not connstr.startswith(u'amqp://'): raise ValueError(u'should begin with amqp://') @@ -297,4 +298,4 @@ class NodeDefinition(object): def __str__(self): return six.text_type( b'amqp://%s:%s@%s/%s'.encode('utf8') % ( - self.host, self.port, self.user, self.virtual_host)) + self.host, self.port, self.user, self.virtual_host)) diff --git a/coolamqp/uplink/__init__.py b/coolamqp/uplink/__init__.py index 92f6f33a19cdf9c34c37003dea7b209378c12d0c..3a33a01192f7596f08040c76a9fed3c08f2ed9ec 100644 --- a/coolamqp/uplink/__init__.py +++ b/coolamqp/uplink/__init__.py @@ -15,5 +15,6 @@ from __future__ import absolute_import, division, print_function from coolamqp.uplink.connection import Connection, HeaderOrBodyWatch, \ MethodWatch, AnyWatch, FailWatch -from coolamqp.uplink.handshake import PUBLISHER_CONFIRMS, CONSUMER_CANCEL_NOTIFY +from coolamqp.uplink.handshake import PUBLISHER_CONFIRMS, \ + CONSUMER_CANCEL_NOTIFY from coolamqp.uplink.listener import ListenerThread diff --git a/coolamqp/uplink/connection/connection.py b/coolamqp/uplink/connection/connection.py index 182f2d827e957462c98a5ef7cdb0ab87d91f393d..688c28ffac5825180133b6d1586770ac24972089 100644 --- a/coolamqp/uplink/connection/connection.py +++ b/coolamqp/uplink/connection/connection.py @@ -238,7 +238,7 @@ class Connection(object): continue if ((not watch_triggered) or (not watch.oneshot)) and ( - not watch.cancelled): + not watch.cancelled): # Watch remains alive if it was NOT triggered, or it's NOT a oneshot alive_watches.append(watch) @@ -266,7 +266,7 @@ class Connection(object): continue if ((not watch_triggered) or (not watch.oneshot)) and ( - not watch.cancelled): + not watch.cancelled): # Watch remains alive if it was NOT triggered, or it's NOT a oneshot alive_watches.append(watch) diff --git a/coolamqp/uplink/connection/recv_framer.py b/coolamqp/uplink/connection/recv_framer.py index 5d41cf4ae581f079cd2628792d6b6abca396939b..4cdcb72df60092aa4c3c132392e0d1d1c7fc1dfb 100644 --- a/coolamqp/uplink/connection/recv_framer.py +++ b/coolamqp/uplink/connection/recv_framer.py @@ -67,7 +67,7 @@ class ReceivingFramer(object): def _extract(self, up_to): # return up to up_to bytes from current chunk, switch if necessary assert self.total_data_len >= up_to, 'Tried to extract %s but %s remaining' % ( - up_to, self.total_data_len) + up_to, self.total_data_len) if up_to >= len(self.chunks[0]): q = self.chunks.popleft() else: @@ -76,7 +76,7 @@ class ReceivingFramer(object): self.total_data_len -= len(q) assert len(q) <= up_to, 'extracted %s but %s was requested' % ( - len(q), up_to) + len(q), up_to) return q def _statemachine(self): @@ -88,14 +88,14 @@ class ReceivingFramer(object): self.frame_type = ord(self._extract(1)[0]) if self.frame_type not in ( - FRAME_HEARTBEAT, FRAME_HEADER, FRAME_METHOD, FRAME_BODY): + FRAME_HEARTBEAT, FRAME_HEADER, FRAME_METHOD, FRAME_BODY): raise ValueError('Invalid frame') return True # state rule 2 elif (self.frame_type == FRAME_HEARTBEAT) and ( - self.total_data_len >= AMQPHeartbeatFrame.LENGTH - 1): + self.total_data_len >= AMQPHeartbeatFrame.LENGTH - 1): data = b'' while len(data) < AMQPHeartbeatFrame.LENGTH - 1: data = data + self._extract( @@ -112,7 +112,8 @@ class ReceivingFramer(object): # state rule 3 elif (self.frame_type != FRAME_HEARTBEAT) and ( - self.frame_type is not None) and (self.frame_size is None) and ( + self.frame_type is not None) and ( + self.frame_size is None) and ( self.total_data_len > 6): hdr = b'' while len(hdr) < 6: @@ -124,7 +125,7 @@ class ReceivingFramer(object): # state rule 4 elif (self.frame_size is not None) and ( - self.total_data_len >= (self.frame_size + 1)): + self.total_data_len >= (self.frame_size + 1)): if len(self.chunks[0]) >= self.frame_size: # We can subslice it - it's very fast diff --git a/coolamqp/uplink/handshake.py b/coolamqp/uplink/handshake.py index f2e0d693cd26220584190d604ad745512a196c59..f1829f5bb34f942bb327544b4ae65035615dbac1 100644 --- a/coolamqp/uplink/handshake.py +++ b/coolamqp/uplink/handshake.py @@ -27,8 +27,8 @@ CLIENT_DATA = [ (b'copyright', (b'Copyright (C) 2016-2017 DMS Serwis', 'S')), ( b'information', ( - b'Licensed under the MIT License.\nSee https://github.com/smok-serwis/coolamqp for details', - 'S')), + b'Licensed under the MIT License.\nSee https://github.com/smok-serwis/coolamqp for details', + 'S')), (b'capabilities', ([(capa, (True, 't')) for capa in SUPPORTED_EXTENSIONS], 'F')), ] @@ -102,7 +102,7 @@ class Handshaker(object): self.connection.frame_max = payload.frame_max self.connection.heartbeat = min(payload.heartbeat, self.heartbeat) for channel in six.moves.xrange(1, ( - 65535 if payload.channel_max == 0 else payload.channel_max) + 1): + 65535 if payload.channel_max == 0 else payload.channel_max) + 1): self.connection.free_channels.append(channel) self.connection.watch_for_method(0, ConnectionOpenOk, diff --git a/coolamqp/uplink/heartbeat.py b/coolamqp/uplink/heartbeat.py index 4cb5a7aa0592104d732579ff9a8f72f6050764e8..66b0b7b591da8eeaa39e92b9c5053a4718858312 100644 --- a/coolamqp/uplink/heartbeat.py +++ b/coolamqp/uplink/heartbeat.py @@ -44,7 +44,7 @@ class Heartbeater(object): self.connection.send([AMQPHeartbeatFrame()], priority=True) if ( - monotonic.monotonic() - self.last_heartbeat_on) > 2 * self.heartbeat_interval: + monotonic.monotonic() - self.last_heartbeat_on) > 2 * self.heartbeat_interval: # closing because of heartbeat self.connection.send(None)