diff --git a/compile_definitions.py b/compile_definitions.py index 4a78a02b1077ce30d624eb67d813de6d6e45b9c0..54a53e238f743f46e362fd43ab2a9d96769eef94 100644 --- a/compile_definitions.py +++ b/compile_definitions.py @@ -428,6 +428,16 @@ Field = collections.namedtuple('Field', ('name', 'type', 'basic_type', 'reserved line(' ]\n') + # __repr__ + line('''\n def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return '%s(%S)' % (', '.join(map(repr, [%s])))\n''', + full_class_name, + u", ".join(['self.'+format_field_name(field.name) for field in non_reserved_fields])) + # constructor line('''\n def __init__(%s): """ diff --git a/coolamqp/attaches/consumer.py b/coolamqp/attaches/consumer.py index 50270d9c6b050a656b26f857296a32d6ac9f0dc7..3857ca8ce8e04d1de1445523765fa079d3527778 100644 --- a/coolamqp/attaches/consumer.py +++ b/coolamqp/attaches/consumer.py @@ -141,8 +141,7 @@ class Consumer(Channeler): self.future_to_notify = future_to_notify self.future_to_notify_on_dead = None # .cancel - self.fail_on_first_time_resource_locked = \ - fail_on_first_time_resource_locked + self.fail_on_first_time_resource_locked = fail_on_first_time_resource_locked self.cancel_on_failure = cancel_on_failure self.body_receive_mode = body_receive_mode @@ -230,7 +229,6 @@ class Consumer(Channeler): Note, this can be called multiple times, and eventually with None. """ - if self.cancel_on_failure and (not self.cancelled): logger.debug( 'Consumer is cancel_on_failure and failure seen, True->cancelled') @@ -288,6 +286,7 @@ class Consumer(Channeler): if self.future_to_notify: self.future_to_notify.set_exception(AMQPError(payload)) self.future_to_notify = None + logger.debug('Notifying connection closed with %s', payload) # We might not want to throw the connection away. should_retry = should_retry and (not self.cancelled) diff --git a/coolamqp/framing/compilation/utilities.py b/coolamqp/framing/compilation/utilities.py index fb0d3acdcd2da8da2e6bcc10575247ab8405624a..c450b2c275191e54f11537ee548b463a2f66fa05 100644 --- a/coolamqp/framing/compilation/utilities.py +++ b/coolamqp/framing/compilation/utilities.py @@ -139,4 +139,5 @@ def ffmt(data, *args, **kwargs): for arg in args: op = str if kwargs.get('sane', True) else frepr data = data.replace('%s', op(arg), 1) + data = data.replace('%S', '%s') return data diff --git a/coolamqp/framing/definitions.py b/coolamqp/framing/definitions.py index e59f19138f7e75d53c46dc23d246218b1505fbd4..5fc8c9cf82cd6f687c5552fbb3baf1cae53177a1 100644 --- a/coolamqp/framing/definitions.py +++ b/coolamqp/framing/definitions.py @@ -1,6 +1,5 @@ # coding=UTF-8 from __future__ import print_function, absolute_import - """ A Python version of the AMQP machine-readable specification. @@ -8,7 +7,7 @@ Generated automatically by CoolAMQP from AMQP machine-readable specification. See coolamqp.uplink.framing.compilation for the tool AMQP is copyright (c) 2016 OASIS -CoolAMQP is copyright (c) 2016 DMS Serwis s.c. +CoolAMQP is copyright (c) 2016-2018 DMS Serwis s.c., 2018-2019 SMOK sp. z o.o. ########################################################### @@ -26,17 +25,13 @@ Only thing that isn't are field names in tables. import struct, collections, logging, six -from coolamqp.framing.base import AMQPClass, AMQPMethodPayload, \ - AMQPContentPropertyList -from coolamqp.framing.field_table import enframe_table, deframe_table, \ - frame_table_size -from coolamqp.framing.compilation.content_property import \ - compile_particular_content_property_list_class +from coolamqp.framing.base import AMQPClass, AMQPMethodPayload, AMQPContentPropertyList +from coolamqp.framing.field_table import enframe_table, deframe_table, frame_table_size +from coolamqp.framing.compilation.content_property import compile_particular_content_property_list_class logger = logging.getLogger(__name__) -Field = collections.namedtuple('Field', - ('name', 'type', 'basic_type', 'reserved')) +Field = collections.namedtuple('Field', ('name', 'type', 'basic_type', 'reserved')) # Core constants FRAME_METHOD = 1 @@ -58,88 +53,86 @@ FRAME_END_BYTE = b'\xce' REPLY_SUCCESS = 200 REPLY_SUCCESS_BYTE = b'\xc8' -# Indicates that the method completed successfully. This reply code is -# reserved for future use - the current protocol design does not use -# positive -# confirmation and reply codes are sent only in case of an error. + # Indicates that the method completed successfully. This reply code is + # reserved for future use - the current protocol design does not use + # positive + # confirmation and reply codes are sent only in case of an error. CONTENT_TOO_LARGE = 311 -# The client attempted to transfer content larger than the server -# could accept -# at the present time. The client may retry at a later time. + # The client attempted to transfer content larger than the server + # could accept + # at the present time. The client may retry at a later time. NO_CONSUMERS = 313 -# When the exchange cannot deliver to a consumer when the immediate -# flag is -# set. As a result of pending data on the queue or the absence of any -# consumers of the queue. + # When the exchange cannot deliver to a consumer when the immediate + # flag is + # set. As a result of pending data on the queue or the absence of any + # consumers of the queue. CONNECTION_FORCED = 320 -# An operator intervened to close the connection for some reason. The -# client -# may retry at some later date. + # An operator intervened to close the connection for some reason. The + # client + # may retry at some later date. INVALID_PATH = 402 -# The client tried to work with an unknown virtual host. + # The client tried to work with an unknown virtual host. ACCESS_REFUSED = 403 -# The client attempted to work with a server entity to which it has no -# access due to security settings. + # The client attempted to work with a server entity to which it has no + # access due to security settings. NOT_FOUND = 404 -# The client attempted to work with a server entity that does not -# exist. + # The client attempted to work with a server entity that does not + # exist. RESOURCE_LOCKED = 405 -# The client attempted to work with a server entity to which it has no -# access because another client is working with it. + # The client attempted to work with a server entity to which it has no + # access because another client is working with it. PRECONDITION_FAILED = 406 -# The client requested a method that was not allowed because some -# precondition -# failed. + # The client requested a method that was not allowed because some + # precondition + # failed. FRAME_ERROR = 501 -# The sender sent a malformed frame that the recipient could not -# decode. -# This strongly implies a programming error in the sending peer. + # The sender sent a malformed frame that the recipient could not + # decode. + # This strongly implies a programming error in the sending peer. SYNTAX_ERROR = 502 -# The sender sent a frame that contained illegal values for one or -# more -# fields. This strongly implies a programming error in the sending -# peer. + # The sender sent a frame that contained illegal values for one or + # more + # fields. This strongly implies a programming error in the sending + # peer. COMMAND_INVALID = 503 -# The client sent an invalid sequence of frames, attempting to perform -# an -# operation that was considered invalid by the server. This usually -# implies -# a programming error in the client. + # The client sent an invalid sequence of frames, attempting to perform + # an + # operation that was considered invalid by the server. This usually + # implies + # a programming error in the client. CHANNEL_ERROR = 504 -# The client attempted to work with a channel that had not been -# correctly -# opened. This most likely indicates a fault in the client layer. + # The client attempted to work with a channel that had not been + # correctly + # opened. This most likely indicates a fault in the client layer. UNEXPECTED_FRAME = 505 -# The peer sent a frame that was not expected, usually in the context -# of -# a content header and body. This strongly indicates a fault in the -# peer's -# content processing. + # The peer sent a frame that was not expected, usually in the context + # of + # a content header and body. This strongly indicates a fault in the + # peer's + # content processing. RESOURCE_ERROR = 506 -# The server could not complete the method because it lacked -# sufficient -# resources. This may be due to the client creating too many of some -# type -# of entity. + # The server could not complete the method because it lacked + # sufficient + # resources. This may be due to the client creating too many of some + # type + # of entity. NOT_ALLOWED = 530 -# The client tried to work with some entity in a manner that is -# prohibited -# by the server, due to security settings or by some other criteria. + # The client tried to work with some entity in a manner that is + # prohibited + # by the server, due to security settings or by some other criteria. NOT_IMPLEMENTED = 540 -# The client tried to use functionality that is not implemented in the -# server. + # The client tried to use functionality that is not implemented in the + # server. INTERNAL_ERROR = 541 -# The server could not complete the method because of an internal -# error. -# The server may require intervention by an operator in order to -# resume -# normal operations. - -HARD_ERRORS = [CONNECTION_FORCED, INVALID_PATH, FRAME_ERROR, SYNTAX_ERROR, - COMMAND_INVALID, CHANNEL_ERROR, UNEXPECTED_FRAME, - RESOURCE_ERROR, NOT_ALLOWED, NOT_IMPLEMENTED, INTERNAL_ERROR] -SOFT_ERRORS = [CONTENT_TOO_LARGE, NO_CONSUMERS, ACCESS_REFUSED, NOT_FOUND, - RESOURCE_LOCKED, PRECONDITION_FAILED] + # The server could not complete the method because of an internal + # error. + # The server may require intervention by an operator in order to + # resume + # normal operations. + +HARD_ERRORS = [CONNECTION_FORCED, INVALID_PATH, FRAME_ERROR, SYNTAX_ERROR, COMMAND_INVALID, CHANNEL_ERROR, UNEXPECTED_FRAME, RESOURCE_ERROR, NOT_ALLOWED, NOT_IMPLEMENTED, INTERNAL_ERROR] +SOFT_ERRORS = [CONTENT_TOO_LARGE, NO_CONSUMERS, ACCESS_REFUSED, NOT_FOUND, RESOURCE_LOCKED, PRECONDITION_FAILED] + DOMAIN_TO_BASIC_TYPE = { u'class-id': u'short', @@ -168,7 +161,6 @@ DOMAIN_TO_BASIC_TYPE = { u'table': None, } - class Connection(AMQPClass): """ The connection class provides methods for a client to establish a @@ -186,23 +178,30 @@ class ConnectionBlocked(AMQPMethodPayload): and does not accept new publishes. """ - __slots__ = (u'reason',) + __slots__ = (u'reason', ) NAME = u'connection.blocked' - INDEX = (10, 60) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x0A\x00\x3C' # CLASS ID + METHOD ID + INDEX = (10, 60) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x0A\x00\x3C' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, True - IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'reason', u'shortstr', u'shortstr', reserved=False), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'ConnectionBlocked(%s)' % (', '.join(map(repr, [self.reason]))) + def __init__(self, reason): """ Create frame connection.blocked @@ -214,7 +213,7 @@ class ConnectionBlocked(AMQPMethodPayload): def write_arguments(self, buf): buf.write(struct.pack('!B', len(self.reason))) buf.write(self.reason) - + def get_size(self): return 1 + len(self.reason) @@ -223,7 +222,7 @@ class ConnectionBlocked(AMQPMethodPayload): offset = start_offset s_len, = struct.unpack_from('!B', buf, offset) offset += 1 - reason = buf[offset:offset + s_len] + reason = buf[offset:offset+s_len] offset += s_len return ConnectionBlocked(reason) @@ -241,26 +240,33 @@ class ConnectionClose(AMQPMethodPayload): sender provides the class and method id of the method which caused the exception. """ - __slots__ = (u'reply_code', u'reply_text', u'class_id', u'method_id',) + __slots__ = (u'reply_code', u'reply_text', u'class_id', u'method_id', ) NAME = u'connection.close' - INDEX = (10, 50) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x0A\x00\x32' # CLASS ID + METHOD ID + INDEX = (10, 50) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x0A\x00\x32' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, True - IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'reply-code', u'reply-code', u'short', reserved=False), Field(u'reply-text', u'reply-text', u'shortstr', reserved=False), Field(u'class-id', u'class-id', u'short', reserved=False), Field(u'method-id', u'method-id', u'short', reserved=False), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'ConnectionClose(%s)' % (', '.join(map(repr, [self.reply_code, self.reply_text, self.class_id, self.method_id]))) + def __init__(self, reply_code, reply_text, class_id, method_id): """ Create frame connection.close @@ -286,7 +292,7 @@ class ConnectionClose(AMQPMethodPayload): buf.write(struct.pack('!HB', self.reply_code, len(self.reply_text))) buf.write(self.reply_text) buf.write(struct.pack('!HH', self.class_id, self.method_id)) - + def get_size(self): return 7 + len(self.reply_text) @@ -295,7 +301,7 @@ class ConnectionClose(AMQPMethodPayload): offset = start_offset reply_code, s_len, = struct.unpack_from('!HB', buf, offset) offset += 3 - reply_text = buf[offset:offset + s_len] + reply_text = buf[offset:offset+s_len] offset += s_len class_id, method_id, = struct.unpack_from('!HH', buf, offset) offset += 4 @@ -315,20 +321,28 @@ class ConnectionCloseOk(AMQPMethodPayload): NAME = u'connection.close-ok' - INDEX = (10, 51) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x0A\x00\x33' # CLASS ID + METHOD ID + INDEX = (10, 51) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x0A\x00\x33' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, True - IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x0A\x00\x33\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'ConnectionCloseOk(%s)' % (', '.join(map(repr, []))) + def __init__(self): """ Create frame connection.close-ok """ + @staticmethod def from_buffer(buf, start_offset): offset = start_offset @@ -348,25 +362,32 @@ class ConnectionOpen(AMQPMethodPayload): of each type of entity that may be used, per connection and/or in total. """ - __slots__ = (u'virtual_host',) + __slots__ = (u'virtual_host', ) NAME = u'connection.open' - INDEX = (10, 40) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x0A\x00\x28' # CLASS ID + METHOD ID + INDEX = (10, 40) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x0A\x00\x28' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = True, False + SENT_BY_CLIENT, SENT_BY_SERVER = False, True - IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'virtual-host', u'path', u'shortstr', reserved=False), Field(u'reserved-1', u'shortstr', u'shortstr', reserved=True), Field(u'reserved-2', u'bit', u'bit', reserved=True), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'ConnectionOpen(%s)' % (', '.join(map(repr, [self.virtual_host]))) + def __init__(self, virtual_host): """ Create frame connection.open @@ -382,7 +403,7 @@ class ConnectionOpen(AMQPMethodPayload): buf.write(self.virtual_host) buf.write(b'\x00') buf.write(struct.pack('!B', 0)) - + def get_size(self): return 3 + len(self.virtual_host) @@ -391,11 +412,11 @@ class ConnectionOpen(AMQPMethodPayload): offset = start_offset s_len, = struct.unpack_from('!B', buf, offset) offset += 1 - virtual_host = buf[offset:offset + s_len] + virtual_host = buf[offset:offset+s_len] offset += s_len s_len, = struct.unpack_from('!B', buf, offset) offset += 1 - offset += s_len # reserved field! + offset += s_len # reserved field! offset += 1 return ConnectionOpen(virtual_host) @@ -411,31 +432,39 @@ class ConnectionOpenOk(AMQPMethodPayload): NAME = u'connection.open-ok' - INDEX = (10, 41) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x0A\x00\x29' # CLASS ID + METHOD ID + INDEX = (10, 41) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x0A\x00\x29' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = False, True + SENT_BY_CLIENT, SENT_BY_SERVER = True, False - IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x0A\x00\x29\x00\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'reserved-1', u'shortstr', u'shortstr', reserved=True), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'ConnectionOpenOk(%s)' % (', '.join(map(repr, []))) + def __init__(self): """ Create frame connection.open-ok """ + @staticmethod def from_buffer(buf, start_offset): offset = start_offset s_len, = struct.unpack_from('!B', buf, offset) offset += 1 - offset += s_len # reserved field! + offset += s_len # reserved field! return ConnectionOpenOk() @@ -449,32 +478,35 @@ class ConnectionStart(AMQPMethodPayload): security mechanisms which the client can use for authentication. """ - __slots__ = ( - u'version_major', u'version_minor', u'server_properties', u'mechanisms', - u'locales',) + __slots__ = (u'version_major', u'version_minor', u'server_properties', u'mechanisms', u'locales', ) NAME = u'connection.start' - INDEX = (10, 10) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x0A\x00\x0A' # CLASS ID + METHOD ID + INDEX = (10, 10) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x0A\x00\x0A' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = False, True + SENT_BY_CLIENT, SENT_BY_SERVER = True, False - IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'version-major', u'octet', u'octet', reserved=False), Field(u'version-minor', u'octet', u'octet', reserved=False), - Field(u'server-properties', u'peer-properties', u'table', - reserved=False), + Field(u'server-properties', u'peer-properties', u'table', reserved=False), Field(u'mechanisms', u'longstr', u'longstr', reserved=False), Field(u'locales', u'longstr', u'longstr', reserved=False), ] - def __init__(self, version_major, version_minor, server_properties, - mechanisms, locales): + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'ConnectionStart(%s)' % (', '.join(map(repr, [self.version_major, self.version_minor, self.server_properties, self.mechanisms, self.locales]))) + + def __init__(self, version_major, version_minor, server_properties, mechanisms, locales): """ Create frame connection.start @@ -523,10 +555,9 @@ class ConnectionStart(AMQPMethodPayload): buf.write(self.mechanisms) buf.write(struct.pack('!I', len(self.locales))) buf.write(self.locales) - + def get_size(self): - return 10 + frame_table_size(self.server_properties) + len( - self.mechanisms) + len(self.locales) + return 10 + frame_table_size(self.server_properties) + len(self.mechanisms) + len(self.locales) @staticmethod def from_buffer(buf, start_offset): @@ -537,14 +568,13 @@ class ConnectionStart(AMQPMethodPayload): offset += delta s_len, = struct.unpack_from('!L', buf, offset) offset += 4 - mechanisms = buf[offset:offset + s_len] + mechanisms = buf[offset:offset+s_len] offset += s_len s_len, = struct.unpack_from('!L', buf, offset) offset += 4 - locales = buf[offset:offset + s_len] + locales = buf[offset:offset+s_len] offset += s_len - return ConnectionStart(version_major, version_minor, server_properties, - mechanisms, locales) + return ConnectionStart(version_major, version_minor, server_properties, mechanisms, locales) class ConnectionSecure(AMQPMethodPayload): @@ -557,23 +587,30 @@ class ConnectionSecure(AMQPMethodPayload): method challenges the client to provide more information. """ - __slots__ = (u'challenge',) + __slots__ = (u'challenge', ) NAME = u'connection.secure' - INDEX = (10, 20) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x0A\x00\x14' # CLASS ID + METHOD ID + INDEX = (10, 20) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x0A\x00\x14' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = False, True + SENT_BY_CLIENT, SENT_BY_SERVER = True, False - IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'challenge', u'longstr', u'longstr', reserved=False), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'ConnectionSecure(%s)' % (', '.join(map(repr, [self.challenge]))) + def __init__(self, challenge): """ Create frame connection.secure @@ -589,7 +626,7 @@ class ConnectionSecure(AMQPMethodPayload): def write_arguments(self, buf): buf.write(struct.pack('!I', len(self.challenge))) buf.write(self.challenge) - + def get_size(self): return 4 + len(self.challenge) @@ -598,7 +635,7 @@ class ConnectionSecure(AMQPMethodPayload): offset = start_offset s_len, = struct.unpack_from('!L', buf, offset) offset += 4 - challenge = buf[offset:offset + s_len] + challenge = buf[offset:offset+s_len] offset += s_len return ConnectionSecure(challenge) @@ -609,27 +646,33 @@ class ConnectionStartOk(AMQPMethodPayload): This method selects a SASL security mechanism. """ - __slots__ = (u'client_properties', u'mechanism', u'response', u'locale',) + __slots__ = (u'client_properties', u'mechanism', u'response', u'locale', ) NAME = u'connection.start-ok' - INDEX = (10, 11) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x0A\x00\x0B' # CLASS ID + METHOD ID + INDEX = (10, 11) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x0A\x00\x0B' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = True, False + SENT_BY_CLIENT, SENT_BY_SERVER = False, True - IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ - Field(u'client-properties', u'peer-properties', u'table', - reserved=False), + FIELDS = [ + Field(u'client-properties', u'peer-properties', u'table', reserved=False), Field(u'mechanism', u'shortstr', u'shortstr', reserved=False), Field(u'response', u'longstr', u'longstr', reserved=False), Field(u'locale', u'shortstr', u'shortstr', reserved=False), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'ConnectionStartOk(%s)' % (', '.join(map(repr, [self.client_properties, self.mechanism, self.response, self.locale]))) + def __init__(self, client_properties, mechanism, response, locale): """ Create frame connection.start-ok @@ -672,10 +715,9 @@ class ConnectionStartOk(AMQPMethodPayload): buf.write(self.response) buf.write(struct.pack('!B', len(self.locale))) buf.write(self.locale) - + def get_size(self): - return 6 + frame_table_size(self.client_properties) + len( - self.mechanism) + len(self.response) + len(self.locale) + return 6 + frame_table_size(self.client_properties) + len(self.mechanism) + len(self.response) + len(self.locale) @staticmethod def from_buffer(buf, start_offset): @@ -684,18 +726,17 @@ class ConnectionStartOk(AMQPMethodPayload): offset += delta s_len, = struct.unpack_from('!B', buf, offset) offset += 1 - mechanism = buf[offset:offset + s_len] + mechanism = buf[offset:offset+s_len] offset += s_len s_len, = struct.unpack_from('!L', buf, offset) offset += 4 - response = buf[offset:offset + s_len] + response = buf[offset:offset+s_len] offset += s_len s_len, = struct.unpack_from('!B', buf, offset) offset += 1 - locale = buf[offset:offset + s_len] + locale = buf[offset:offset+s_len] offset += s_len - return ConnectionStartOk(client_properties, mechanism, response, - locale) + return ConnectionStartOk(client_properties, mechanism, response, locale) class ConnectionSecureOk(AMQPMethodPayload): @@ -706,23 +747,30 @@ class ConnectionSecureOk(AMQPMethodPayload): data for the security mechanism at the server side. """ - __slots__ = (u'response',) + __slots__ = (u'response', ) NAME = u'connection.secure-ok' - INDEX = (10, 21) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x0A\x00\x15' # CLASS ID + METHOD ID + INDEX = (10, 21) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x0A\x00\x15' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = True, False + SENT_BY_CLIENT, SENT_BY_SERVER = False, True - IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'response', u'longstr', u'longstr', reserved=False), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'ConnectionSecureOk(%s)' % (', '.join(map(repr, [self.response]))) + def __init__(self, response): """ Create frame connection.secure-ok @@ -738,7 +786,7 @@ class ConnectionSecureOk(AMQPMethodPayload): def write_arguments(self, buf): buf.write(struct.pack('!I', len(self.response))) buf.write(self.response) - + def get_size(self): return 4 + len(self.response) @@ -747,7 +795,7 @@ class ConnectionSecureOk(AMQPMethodPayload): offset = start_offset s_len, = struct.unpack_from('!L', buf, offset) offset += 4 - response = buf[offset:offset + s_len] + response = buf[offset:offset+s_len] offset += s_len return ConnectionSecureOk(response) @@ -760,25 +808,32 @@ class ConnectionTune(AMQPMethodPayload): the client. The client can accept and/or adjust these. """ - __slots__ = (u'channel_max', u'frame_max', u'heartbeat',) + __slots__ = (u'channel_max', u'frame_max', u'heartbeat', ) NAME = u'connection.tune' - INDEX = (10, 30) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x0A\x00\x1E' # CLASS ID + METHOD ID + INDEX = (10, 30) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x0A\x00\x1E' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = False, True + SENT_BY_CLIENT, SENT_BY_SERVER = True, False - IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'channel-max', u'short', u'short', reserved=False), Field(u'frame-max', u'long', u'long', reserved=False), Field(u'heartbeat', u'short', u'short', reserved=False), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'ConnectionTune(%s)' % (', '.join(map(repr, [self.channel_max, self.frame_max, self.heartbeat]))) + def __init__(self, channel_max, frame_max, heartbeat): """ Create frame connection.tune @@ -809,17 +864,15 @@ class ConnectionTune(AMQPMethodPayload): self.heartbeat = heartbeat def write_arguments(self, buf): - buf.write(struct.pack('!HIH', self.channel_max, self.frame_max, - self.heartbeat)) - + buf.write(struct.pack('!HIH', self.channel_max, self.frame_max, self.heartbeat)) + def get_size(self): return 8 @staticmethod def from_buffer(buf, start_offset): offset = start_offset - channel_max, frame_max, heartbeat, = struct.unpack_from('!HIH', buf, - offset) + channel_max, frame_max, heartbeat, = struct.unpack_from('!HIH', buf, offset) offset += 8 return ConnectionTune(channel_max, frame_max, heartbeat) @@ -833,25 +886,32 @@ class ConnectionTuneOk(AMQPMethodPayload): Certain fields are negotiated, others provide capability information. """ - __slots__ = (u'channel_max', u'frame_max', u'heartbeat',) + __slots__ = (u'channel_max', u'frame_max', u'heartbeat', ) NAME = u'connection.tune-ok' - INDEX = (10, 31) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x0A\x00\x1F' # CLASS ID + METHOD ID + INDEX = (10, 31) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x0A\x00\x1F' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = True, False + SENT_BY_CLIENT, SENT_BY_SERVER = False, True - IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'channel-max', u'short', u'short', reserved=False), Field(u'frame-max', u'long', u'long', reserved=False), Field(u'heartbeat', u'short', u'short', reserved=False), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'ConnectionTuneOk(%s)' % (', '.join(map(repr, [self.channel_max, self.frame_max, self.heartbeat]))) + def __init__(self, channel_max, frame_max, heartbeat): """ Create frame connection.tune-ok @@ -882,17 +942,15 @@ class ConnectionTuneOk(AMQPMethodPayload): self.heartbeat = heartbeat def write_arguments(self, buf): - buf.write(struct.pack('!HIH', self.channel_max, self.frame_max, - self.heartbeat)) - + buf.write(struct.pack('!HIH', self.channel_max, self.frame_max, self.heartbeat)) + def get_size(self): return 8 @staticmethod def from_buffer(buf, start_offset): offset = start_offset - channel_max, frame_max, heartbeat, = struct.unpack_from('!HIH', buf, - offset) + channel_max, frame_max, heartbeat, = struct.unpack_from('!HIH', buf, offset) offset += 8 return ConnectionTuneOk(channel_max, frame_max, heartbeat) @@ -907,20 +965,28 @@ class ConnectionUnblocked(AMQPMethodPayload): NAME = u'connection.unblocked' - INDEX = (10, 61) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x0A\x00\x3D' # CLASS ID + METHOD ID + INDEX = (10, 61) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x0A\x00\x3D' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, True - IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x0A\x00\x3D\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'ConnectionUnblocked(%s)' % (', '.join(map(repr, []))) + def __init__(self): """ Create frame connection.unblocked """ + @staticmethod def from_buffer(buf, start_offset): offset = start_offset @@ -951,26 +1017,33 @@ class ChannelClose(AMQPMethodPayload): the class and method id of the method which caused the exception. """ - __slots__ = (u'reply_code', u'reply_text', u'class_id', u'method_id',) + __slots__ = (u'reply_code', u'reply_text', u'class_id', u'method_id', ) NAME = u'channel.close' - INDEX = (20, 40) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x14\x00\x28' # CLASS ID + METHOD ID + INDEX = (20, 40) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x14\x00\x28' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, True - IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'reply-code', u'reply-code', u'short', reserved=False), Field(u'reply-text', u'reply-text', u'shortstr', reserved=False), Field(u'class-id', u'class-id', u'short', reserved=False), Field(u'method-id', u'method-id', u'short', reserved=False), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'ChannelClose(%s)' % (', '.join(map(repr, [self.reply_code, self.reply_text, self.class_id, self.method_id]))) + def __init__(self, reply_code, reply_text, class_id, method_id): """ Create frame channel.close @@ -996,7 +1069,7 @@ class ChannelClose(AMQPMethodPayload): buf.write(struct.pack('!HB', self.reply_code, len(self.reply_text))) buf.write(self.reply_text) buf.write(struct.pack('!HH', self.class_id, self.method_id)) - + def get_size(self): return 7 + len(self.reply_text) @@ -1005,7 +1078,7 @@ class ChannelClose(AMQPMethodPayload): offset = start_offset reply_code, s_len, = struct.unpack_from('!HB', buf, offset) offset += 3 - reply_text = buf[offset:offset + s_len] + reply_text = buf[offset:offset+s_len] offset += s_len class_id, method_id, = struct.unpack_from('!HH', buf, offset) offset += 4 @@ -1024,20 +1097,28 @@ class ChannelCloseOk(AMQPMethodPayload): NAME = u'channel.close-ok' - INDEX = (20, 41) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x14\x00\x29' # CLASS ID + METHOD ID + INDEX = (20, 41) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x14\x00\x29' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, True - IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x14\x00\x29\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'ChannelCloseOk(%s)' % (', '.join(map(repr, []))) + def __init__(self): """ Create frame channel.close-ok """ + @staticmethod def from_buffer(buf, start_offset): offset = start_offset @@ -1058,23 +1139,30 @@ class ChannelFlow(AMQPMethodPayload): control. It does not affect contents returned by Basic.Get-Ok methods. """ - __slots__ = (u'active',) + __slots__ = (u'active', ) NAME = u'channel.flow' - INDEX = (20, 20) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x14\x00\x14' # CLASS ID + METHOD ID + INDEX = (20, 20) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x14\x00\x14' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, True - IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'active', u'bit', u'bit', reserved=False), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'ChannelFlow(%s)' % (', '.join(map(repr, [self.active]))) + def __init__(self, active): """ Create frame channel.flow @@ -1089,7 +1177,7 @@ class ChannelFlow(AMQPMethodPayload): def write_arguments(self, buf): buf.write(struct.pack('!B', (self.active << 0))) - + def get_size(self): return 1 @@ -1110,23 +1198,30 @@ class ChannelFlowOk(AMQPMethodPayload): Confirms to the peer that a flow command was received and processed. """ - __slots__ = (u'active',) + __slots__ = (u'active', ) NAME = u'channel.flow-ok' - INDEX = (20, 21) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x14\x00\x15' # CLASS ID + METHOD ID + INDEX = (20, 21) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x14\x00\x15' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, True - IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'active', u'bit', u'bit', reserved=False), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'ChannelFlowOk(%s)' % (', '.join(map(repr, [self.active]))) + def __init__(self, active): """ Create frame channel.flow-ok @@ -1142,7 +1237,7 @@ class ChannelFlowOk(AMQPMethodPayload): def write_arguments(self, buf): buf.write(struct.pack('!B', (self.active << 0))) - + def get_size(self): return 1 @@ -1166,31 +1261,39 @@ class ChannelOpen(AMQPMethodPayload): NAME = u'channel.open' - INDEX = (20, 10) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x14\x00\x0A' # CLASS ID + METHOD ID + INDEX = (20, 10) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x14\x00\x0A' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = True, False + SENT_BY_CLIENT, SENT_BY_SERVER = False, True - IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content STATIC_CONTENT = b'\x00\x00\x00\x05\x00\x14\x00\x0A\x00\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'reserved-1', u'shortstr', u'shortstr', reserved=True), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'ChannelOpen(%s)' % (', '.join(map(repr, []))) + def __init__(self): """ Create frame channel.open """ + @staticmethod def from_buffer(buf, start_offset): offset = start_offset s_len, = struct.unpack_from('!B', buf, offset) offset += 1 - offset += s_len # reserved field! + offset += s_len # reserved field! return ChannelOpen() @@ -1205,31 +1308,39 @@ class ChannelOpenOk(AMQPMethodPayload): NAME = u'channel.open-ok' - INDEX = (20, 11) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x14\x00\x0B' # CLASS ID + METHOD ID + INDEX = (20, 11) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x14\x00\x0B' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = False, True + SENT_BY_CLIENT, SENT_BY_SERVER = True, False - IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content STATIC_CONTENT = b'\x00\x00\x00\x05\x00\x14\x00\x0B\x00\x00\x00\x00\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'reserved-1', u'longstr', u'longstr', reserved=True), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'ChannelOpenOk(%s)' % (', '.join(map(repr, []))) + def __init__(self): """ Create frame channel.open-ok """ + @staticmethod def from_buffer(buf, start_offset): offset = start_offset s_len, = struct.unpack_from('!L', buf, offset) offset += 4 - offset += s_len # reserved field! + offset += s_len # reserved field! return ChannelOpenOk() @@ -1250,21 +1361,20 @@ class ExchangeBind(AMQPMethodPayload): This method binds an exchange to an exchange. """ - __slots__ = ( - u'destination', u'source', u'routing_key', u'no_wait', u'arguments',) + __slots__ = (u'destination', u'source', u'routing_key', u'no_wait', u'arguments', ) NAME = u'exchange.bind' - INDEX = (40, 30) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x28\x00\x1E' # CLASS ID + METHOD ID + INDEX = (40, 30) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x28\x00\x1E' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = True, False + SENT_BY_CLIENT, SENT_BY_SERVER = False, True - IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'reserved-1', u'short', u'short', reserved=True), Field(u'destination', u'exchange-name', u'shortstr', reserved=False), Field(u'source', u'exchange-name', u'shortstr', reserved=False), @@ -1273,6 +1383,13 @@ class ExchangeBind(AMQPMethodPayload): Field(u'arguments', u'table', u'table', reserved=False), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'ExchangeBind(%s)' % (', '.join(map(repr, [self.destination, self.source, self.routing_key, self.no_wait, self.arguments]))) + def __init__(self, destination, source, routing_key, no_wait, arguments): """ Create frame exchange.bind @@ -1311,25 +1428,24 @@ class ExchangeBind(AMQPMethodPayload): buf.write(self.routing_key) buf.write(struct.pack('!B', (self.no_wait << 0))) enframe_table(buf, self.arguments) - + def get_size(self): - return 6 + len(self.destination) + len(self.source) + len( - self.routing_key) + frame_table_size(self.arguments) + return 6 + len(self.destination) + len(self.source) + len(self.routing_key) + frame_table_size(self.arguments) @staticmethod def from_buffer(buf, start_offset): offset = start_offset s_len, = struct.unpack_from('!2xB', buf, offset) offset += 3 - destination = buf[offset:offset + s_len] + destination = buf[offset:offset+s_len] offset += s_len s_len, = struct.unpack_from('!B', buf, offset) offset += 1 - source = buf[offset:offset + s_len] + source = buf[offset:offset+s_len] offset += s_len s_len, = struct.unpack_from('!B', buf, offset) offset += 1 - routing_key = buf[offset:offset + s_len] + routing_key = buf[offset:offset+s_len] offset += s_len _bit, = struct.unpack_from('!B', buf, offset) offset += 0 @@ -1337,8 +1453,7 @@ class ExchangeBind(AMQPMethodPayload): offset += 1 arguments, delta = deframe_table(buf, offset) offset += delta - return ExchangeBind(destination, source, routing_key, no_wait, - arguments) + return ExchangeBind(destination, source, routing_key, no_wait, arguments) class ExchangeBindOk(AMQPMethodPayload): @@ -1351,20 +1466,28 @@ class ExchangeBindOk(AMQPMethodPayload): NAME = u'exchange.bind-ok' - INDEX = (40, 31) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x28\x00\x1F' # CLASS ID + METHOD ID + INDEX = (40, 31) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x28\x00\x1F' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = False, True + SENT_BY_CLIENT, SENT_BY_SERVER = True, False - IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x28\x00\x1F\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'ExchangeBindOk(%s)' % (', '.join(map(repr, []))) + def __init__(self): """ Create frame exchange.bind-ok """ + @staticmethod def from_buffer(buf, start_offset): offset = start_offset @@ -1379,22 +1502,20 @@ class ExchangeDeclare(AMQPMethodPayload): and if the exchange exists, verifies that it is of the correct and expected class. """ - __slots__ = ( - u'exchange', u'type_', u'passive', u'durable', u'auto_delete', u'internal', - u'no_wait', u'arguments',) + __slots__ = (u'exchange', u'type_', u'passive', u'durable', u'auto_delete', u'internal', u'no_wait', u'arguments', ) NAME = u'exchange.declare' - INDEX = (40, 10) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x28\x00\x0A' # CLASS ID + METHOD ID + INDEX = (40, 10) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x28\x00\x0A' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = True, False + SENT_BY_CLIENT, SENT_BY_SERVER = False, True - IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'reserved-1', u'short', u'short', reserved=True), Field(u'exchange', u'exchange-name', u'shortstr', reserved=False), Field(u'type', u'shortstr', u'shortstr', reserved=False), @@ -1406,8 +1527,14 @@ class ExchangeDeclare(AMQPMethodPayload): Field(u'arguments', u'table', u'table', reserved=False), ] - def __init__(self, exchange, type_, passive, durable, auto_delete, - internal, no_wait, arguments): + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'ExchangeDeclare(%s)' % (', '.join(map(repr, [self.exchange, self.type_, self.passive, self.durable, self.auto_delete, self.internal, self.no_wait, self.arguments]))) + + def __init__(self, exchange, type_, passive, durable, auto_delete, internal, no_wait, arguments): """ Create frame exchange.declare @@ -1479,26 +1606,22 @@ class ExchangeDeclare(AMQPMethodPayload): buf.write(self.exchange) buf.write(struct.pack('!B', len(self.type_))) buf.write(self.type_) - buf.write(struct.pack('!B', - (self.passive << 0) | (self.durable << 1) | ( - self.auto_delete << 2) | (self.internal << 3) | ( - self.no_wait << 4))) + buf.write(struct.pack('!B', (self.passive << 0) | (self.durable << 1) | (self.auto_delete << 2) | (self.internal << 3) | (self.no_wait << 4))) enframe_table(buf, self.arguments) - + def get_size(self): - return 5 + len(self.exchange) + len(self.type_) + frame_table_size( - self.arguments) + return 5 + len(self.exchange) + len(self.type_) + frame_table_size(self.arguments) @staticmethod def from_buffer(buf, start_offset): offset = start_offset s_len, = struct.unpack_from('!2xB', buf, offset) offset += 3 - exchange = buf[offset:offset + s_len] + exchange = buf[offset:offset+s_len] offset += s_len s_len, = struct.unpack_from('!B', buf, offset) offset += 1 - type_ = buf[offset:offset + s_len] + type_ = buf[offset:offset+s_len] offset += s_len _bit, = struct.unpack_from('!B', buf, offset) offset += 0 @@ -1510,8 +1633,7 @@ class ExchangeDeclare(AMQPMethodPayload): offset += 1 arguments, delta = deframe_table(buf, offset) offset += delta - return ExchangeDeclare(exchange, type_, passive, durable, auto_delete, - internal, no_wait, arguments) + return ExchangeDeclare(exchange, type_, passive, durable, auto_delete, internal, no_wait, arguments) class ExchangeDelete(AMQPMethodPayload): @@ -1522,26 +1644,33 @@ class ExchangeDelete(AMQPMethodPayload): queue bindings on the exchange are cancelled. """ - __slots__ = (u'exchange', u'if_unused', u'no_wait',) + __slots__ = (u'exchange', u'if_unused', u'no_wait', ) NAME = u'exchange.delete' - INDEX = (40, 20) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x28\x00\x14' # CLASS ID + METHOD ID + INDEX = (40, 20) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x28\x00\x14' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = True, False + SENT_BY_CLIENT, SENT_BY_SERVER = False, True - IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'reserved-1', u'short', u'short', reserved=True), Field(u'exchange', u'exchange-name', u'shortstr', reserved=False), Field(u'if-unused', u'bit', u'bit', reserved=False), Field(u'no-wait', u'no-wait', u'bit', reserved=False), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'ExchangeDelete(%s)' % (', '.join(map(repr, [self.exchange, self.if_unused, self.no_wait]))) + def __init__(self, exchange, if_unused, no_wait): """ Create frame exchange.delete @@ -1566,9 +1695,8 @@ class ExchangeDelete(AMQPMethodPayload): buf.write(b'\x00\x00') buf.write(struct.pack('!B', len(self.exchange))) buf.write(self.exchange) - buf.write( - struct.pack('!B', (self.if_unused << 0) | (self.no_wait << 1))) - + buf.write(struct.pack('!B', (self.if_unused << 0) | (self.no_wait << 1))) + def get_size(self): return 4 + len(self.exchange) @@ -1577,7 +1705,7 @@ class ExchangeDelete(AMQPMethodPayload): offset = start_offset s_len, = struct.unpack_from('!2xB', buf, offset) offset += 3 - exchange = buf[offset:offset + s_len] + exchange = buf[offset:offset+s_len] offset += s_len _bit, = struct.unpack_from('!B', buf, offset) offset += 0 @@ -1599,20 +1727,28 @@ class ExchangeDeclareOk(AMQPMethodPayload): NAME = u'exchange.declare-ok' - INDEX = (40, 11) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x28\x00\x0B' # CLASS ID + METHOD ID + INDEX = (40, 11) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x28\x00\x0B' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = False, True + SENT_BY_CLIENT, SENT_BY_SERVER = True, False - IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x28\x00\x0B\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'ExchangeDeclareOk(%s)' % (', '.join(map(repr, []))) + def __init__(self): """ Create frame exchange.declare-ok """ + @staticmethod def from_buffer(buf, start_offset): offset = start_offset @@ -1629,20 +1765,28 @@ class ExchangeDeleteOk(AMQPMethodPayload): NAME = u'exchange.delete-ok' - INDEX = (40, 21) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x28\x00\x15' # CLASS ID + METHOD ID + INDEX = (40, 21) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x28\x00\x15' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = False, True + SENT_BY_CLIENT, SENT_BY_SERVER = True, False - IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x28\x00\x15\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'ExchangeDeleteOk(%s)' % (', '.join(map(repr, []))) + def __init__(self): """ Create frame exchange.delete-ok """ + @staticmethod def from_buffer(buf, start_offset): offset = start_offset @@ -1655,21 +1799,20 @@ class ExchangeUnbind(AMQPMethodPayload): This method unbinds an exchange from an exchange. """ - __slots__ = ( - u'destination', u'source', u'routing_key', u'no_wait', u'arguments',) + __slots__ = (u'destination', u'source', u'routing_key', u'no_wait', u'arguments', ) NAME = u'exchange.unbind' - INDEX = (40, 40) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x28\x00\x28' # CLASS ID + METHOD ID + INDEX = (40, 40) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x28\x00\x28' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = True, False + SENT_BY_CLIENT, SENT_BY_SERVER = False, True - IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'reserved-1', u'short', u'short', reserved=True), Field(u'destination', u'exchange-name', u'shortstr', reserved=False), Field(u'source', u'exchange-name', u'shortstr', reserved=False), @@ -1678,6 +1821,13 @@ class ExchangeUnbind(AMQPMethodPayload): Field(u'arguments', u'table', u'table', reserved=False), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'ExchangeUnbind(%s)' % (', '.join(map(repr, [self.destination, self.source, self.routing_key, self.no_wait, self.arguments]))) + def __init__(self, destination, source, routing_key, no_wait, arguments): """ Create frame exchange.unbind @@ -1710,25 +1860,24 @@ class ExchangeUnbind(AMQPMethodPayload): buf.write(self.routing_key) buf.write(struct.pack('!B', (self.no_wait << 0))) enframe_table(buf, self.arguments) - + def get_size(self): - return 6 + len(self.destination) + len(self.source) + len( - self.routing_key) + frame_table_size(self.arguments) + return 6 + len(self.destination) + len(self.source) + len(self.routing_key) + frame_table_size(self.arguments) @staticmethod def from_buffer(buf, start_offset): offset = start_offset s_len, = struct.unpack_from('!2xB', buf, offset) offset += 3 - destination = buf[offset:offset + s_len] + destination = buf[offset:offset+s_len] offset += s_len s_len, = struct.unpack_from('!B', buf, offset) offset += 1 - source = buf[offset:offset + s_len] + source = buf[offset:offset+s_len] offset += s_len s_len, = struct.unpack_from('!B', buf, offset) offset += 1 - routing_key = buf[offset:offset + s_len] + routing_key = buf[offset:offset+s_len] offset += s_len _bit, = struct.unpack_from('!B', buf, offset) offset += 0 @@ -1736,8 +1885,7 @@ class ExchangeUnbind(AMQPMethodPayload): offset += 1 arguments, delta = deframe_table(buf, offset) offset += delta - return ExchangeUnbind(destination, source, routing_key, no_wait, - arguments) + return ExchangeUnbind(destination, source, routing_key, no_wait, arguments) class ExchangeUnbindOk(AMQPMethodPayload): @@ -1750,20 +1898,28 @@ class ExchangeUnbindOk(AMQPMethodPayload): NAME = u'exchange.unbind-ok' - INDEX = (40, 51) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x28\x00\x33' # CLASS ID + METHOD ID + INDEX = (40, 51) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x28\x00\x33' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = False, True + SENT_BY_CLIENT, SENT_BY_SERVER = True, False - IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x28\x00\x33\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'ExchangeUnbindOk(%s)' % (', '.join(map(repr, []))) + def __init__(self): """ Create frame exchange.unbind-ok """ + @staticmethod def from_buffer(buf, start_offset): offset = start_offset @@ -1795,21 +1951,20 @@ class QueueBind(AMQPMethodPayload): to a topic exchange. """ - __slots__ = ( - u'queue', u'exchange', u'routing_key', u'no_wait', u'arguments',) + __slots__ = (u'queue', u'exchange', u'routing_key', u'no_wait', u'arguments', ) NAME = u'queue.bind' - INDEX = (50, 20) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x32\x00\x14' # CLASS ID + METHOD ID + INDEX = (50, 20) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x32\x00\x14' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = True, False + SENT_BY_CLIENT, SENT_BY_SERVER = False, True - IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'reserved-1', u'short', u'short', reserved=True), Field(u'queue', u'queue-name', u'shortstr', reserved=False), Field(u'exchange', u'exchange-name', u'shortstr', reserved=False), @@ -1818,6 +1973,13 @@ class QueueBind(AMQPMethodPayload): Field(u'arguments', u'table', u'table', reserved=False), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'QueueBind(%s)' % (', '.join(map(repr, [self.queue, self.exchange, self.routing_key, self.no_wait, self.arguments]))) + def __init__(self, queue, exchange, routing_key, no_wait, arguments): """ Create frame queue.bind @@ -1868,25 +2030,24 @@ class QueueBind(AMQPMethodPayload): buf.write(self.routing_key) buf.write(struct.pack('!B', (self.no_wait << 0))) enframe_table(buf, self.arguments) - + def get_size(self): - return 6 + len(self.queue) + len(self.exchange) + len( - self.routing_key) + frame_table_size(self.arguments) + return 6 + len(self.queue) + len(self.exchange) + len(self.routing_key) + frame_table_size(self.arguments) @staticmethod def from_buffer(buf, start_offset): offset = start_offset s_len, = struct.unpack_from('!2xB', buf, offset) offset += 3 - queue = buf[offset:offset + s_len] + queue = buf[offset:offset+s_len] offset += s_len s_len, = struct.unpack_from('!B', buf, offset) offset += 1 - exchange = buf[offset:offset + s_len] + exchange = buf[offset:offset+s_len] offset += s_len s_len, = struct.unpack_from('!B', buf, offset) offset += 1 - routing_key = buf[offset:offset + s_len] + routing_key = buf[offset:offset+s_len] offset += s_len _bit, = struct.unpack_from('!B', buf, offset) offset += 0 @@ -1907,20 +2068,28 @@ class QueueBindOk(AMQPMethodPayload): NAME = u'queue.bind-ok' - INDEX = (50, 21) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x32\x00\x15' # CLASS ID + METHOD ID + INDEX = (50, 21) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x32\x00\x15' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = False, True + SENT_BY_CLIENT, SENT_BY_SERVER = True, False - IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x32\x00\x15\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'QueueBindOk(%s)' % (', '.join(map(repr, []))) + def __init__(self): """ Create frame queue.bind-ok """ + @staticmethod def from_buffer(buf, start_offset): offset = start_offset @@ -1937,22 +2106,20 @@ class QueueDeclare(AMQPMethodPayload): queue and its contents, and the level of sharing for the queue. """ - __slots__ = ( - u'queue', u'passive', u'durable', u'exclusive', u'auto_delete', u'no_wait', - u'arguments',) + __slots__ = (u'queue', u'passive', u'durable', u'exclusive', u'auto_delete', u'no_wait', u'arguments', ) NAME = u'queue.declare' - INDEX = (50, 10) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x32\x00\x0A' # CLASS ID + METHOD ID + INDEX = (50, 10) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x32\x00\x0A' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = True, False + SENT_BY_CLIENT, SENT_BY_SERVER = False, True - IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'reserved-1', u'short', u'short', reserved=True), Field(u'queue', u'queue-name', u'shortstr', reserved=False), Field(u'passive', u'bit', u'bit', reserved=False), @@ -1963,8 +2130,14 @@ class QueueDeclare(AMQPMethodPayload): Field(u'arguments', u'table', u'table', reserved=False), ] - def __init__(self, queue, passive, durable, exclusive, auto_delete, - no_wait, arguments): + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'QueueDeclare(%s)' % (', '.join(map(repr, [self.queue, self.passive, self.durable, self.exclusive, self.auto_delete, self.no_wait, self.arguments]))) + + def __init__(self, queue, passive, durable, exclusive, auto_delete, no_wait, arguments): """ Create frame queue.declare @@ -2034,12 +2207,9 @@ class QueueDeclare(AMQPMethodPayload): buf.write(b'\x00\x00') buf.write(struct.pack('!B', len(self.queue))) buf.write(self.queue) - buf.write(struct.pack('!B', - (self.passive << 0) | (self.durable << 1) | ( - self.exclusive << 2) | ( - self.auto_delete << 3) | (self.no_wait << 4))) + buf.write(struct.pack('!B', (self.passive << 0) | (self.durable << 1) | (self.exclusive << 2) | (self.auto_delete << 3) | (self.no_wait << 4))) enframe_table(buf, self.arguments) - + def get_size(self): return 4 + len(self.queue) + frame_table_size(self.arguments) @@ -2048,7 +2218,7 @@ class QueueDeclare(AMQPMethodPayload): offset = start_offset s_len, = struct.unpack_from('!2xB', buf, offset) offset += 3 - queue = buf[offset:offset + s_len] + queue = buf[offset:offset+s_len] offset += s_len _bit, = struct.unpack_from('!B', buf, offset) offset += 0 @@ -2060,8 +2230,7 @@ class QueueDeclare(AMQPMethodPayload): offset += 1 arguments, delta = deframe_table(buf, offset) offset += delta - return QueueDeclare(queue, passive, durable, exclusive, auto_delete, - no_wait, arguments) + return QueueDeclare(queue, passive, durable, exclusive, auto_delete, no_wait, arguments) class QueueDelete(AMQPMethodPayload): @@ -2074,20 +2243,20 @@ class QueueDelete(AMQPMethodPayload): configuration, and all consumers on the queue are cancelled. """ - __slots__ = (u'queue', u'if_unused', u'if_empty', u'no_wait',) + __slots__ = (u'queue', u'if_unused', u'if_empty', u'no_wait', ) NAME = u'queue.delete' - INDEX = (50, 40) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x32\x00\x28' # CLASS ID + METHOD ID + INDEX = (50, 40) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x32\x00\x28' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = True, False + SENT_BY_CLIENT, SENT_BY_SERVER = False, True - IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'reserved-1', u'short', u'short', reserved=True), Field(u'queue', u'queue-name', u'shortstr', reserved=False), Field(u'if-unused', u'bit', u'bit', reserved=False), @@ -2095,6 +2264,13 @@ class QueueDelete(AMQPMethodPayload): Field(u'no-wait', u'no-wait', u'bit', reserved=False), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'QueueDelete(%s)' % (', '.join(map(repr, [self.queue, self.if_unused, self.if_empty, self.no_wait]))) + def __init__(self, queue, if_unused, if_empty, no_wait): """ Create frame queue.delete @@ -2123,10 +2299,8 @@ class QueueDelete(AMQPMethodPayload): buf.write(b'\x00\x00') buf.write(struct.pack('!B', len(self.queue))) buf.write(self.queue) - buf.write(struct.pack('!B', - (self.if_unused << 0) | (self.if_empty << 1) | ( - self.no_wait << 2))) - + buf.write(struct.pack('!B', (self.if_unused << 0) | (self.if_empty << 1) | (self.no_wait << 2))) + def get_size(self): return 4 + len(self.queue) @@ -2135,7 +2309,7 @@ class QueueDelete(AMQPMethodPayload): offset = start_offset s_len, = struct.unpack_from('!2xB', buf, offset) offset += 3 - queue = buf[offset:offset + s_len] + queue = buf[offset:offset+s_len] offset += s_len _bit, = struct.unpack_from('!B', buf, offset) offset += 0 @@ -2154,25 +2328,32 @@ class QueueDeclareOk(AMQPMethodPayload): the queue, essential for automatically-named queues. """ - __slots__ = (u'queue', u'message_count', u'consumer_count',) + __slots__ = (u'queue', u'message_count', u'consumer_count', ) NAME = u'queue.declare-ok' - INDEX = (50, 11) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x32\x00\x0B' # CLASS ID + METHOD ID + INDEX = (50, 11) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x32\x00\x0B' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = False, True + SENT_BY_CLIENT, SENT_BY_SERVER = True, False - IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'queue', u'queue-name', u'shortstr', reserved=False), Field(u'message-count', u'message-count', u'long', reserved=False), Field(u'consumer-count', u'long', u'long', reserved=False), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'QueueDeclareOk(%s)' % (', '.join(map(repr, [self.queue, self.message_count, self.consumer_count]))) + def __init__(self, queue, message_count, consumer_count): """ Create frame queue.declare-ok @@ -2197,7 +2378,7 @@ class QueueDeclareOk(AMQPMethodPayload): buf.write(struct.pack('!B', len(self.queue))) buf.write(self.queue) buf.write(struct.pack('!II', self.message_count, self.consumer_count)) - + def get_size(self): return 9 + len(self.queue) @@ -2206,7 +2387,7 @@ class QueueDeclareOk(AMQPMethodPayload): offset = start_offset s_len, = struct.unpack_from('!B', buf, offset) offset += 1 - queue = buf[offset:offset + s_len] + queue = buf[offset:offset+s_len] offset += s_len message_count, consumer_count, = struct.unpack_from('!II', buf, offset) offset += 8 @@ -2219,23 +2400,30 @@ class QueueDeleteOk(AMQPMethodPayload): This method confirms the deletion of a queue. """ - __slots__ = (u'message_count',) + __slots__ = (u'message_count', ) NAME = u'queue.delete-ok' - INDEX = (50, 41) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x32\x00\x29' # CLASS ID + METHOD ID + INDEX = (50, 41) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x32\x00\x29' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = False, True + SENT_BY_CLIENT, SENT_BY_SERVER = True, False - IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'message-count', u'message-count', u'long', reserved=False), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'QueueDeleteOk(%s)' % (', '.join(map(repr, [self.message_count]))) + def __init__(self, message_count): """ Create frame queue.delete-ok @@ -2247,7 +2435,7 @@ class QueueDeleteOk(AMQPMethodPayload): def write_arguments(self, buf): buf.write(struct.pack('!I', self.message_count)) - + def get_size(self): return 4 @@ -2267,25 +2455,32 @@ class QueuePurge(AMQPMethodPayload): awaiting acknowledgment. """ - __slots__ = (u'queue', u'no_wait',) + __slots__ = (u'queue', u'no_wait', ) NAME = u'queue.purge' - INDEX = (50, 30) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x32\x00\x1E' # CLASS ID + METHOD ID + INDEX = (50, 30) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x32\x00\x1E' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = True, False + SENT_BY_CLIENT, SENT_BY_SERVER = False, True - IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'reserved-1', u'short', u'short', reserved=True), Field(u'queue', u'queue-name', u'shortstr', reserved=False), Field(u'no-wait', u'no-wait', u'bit', reserved=False), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'QueuePurge(%s)' % (', '.join(map(repr, [self.queue, self.no_wait]))) + def __init__(self, queue, no_wait): """ Create frame queue.purge @@ -2302,7 +2497,7 @@ class QueuePurge(AMQPMethodPayload): buf.write(struct.pack('!B', len(self.queue))) buf.write(self.queue) buf.write(struct.pack('!B', (self.no_wait << 0))) - + def get_size(self): return 4 + len(self.queue) @@ -2311,7 +2506,7 @@ class QueuePurge(AMQPMethodPayload): offset = start_offset s_len, = struct.unpack_from('!2xB', buf, offset) offset += 3 - queue = buf[offset:offset + s_len] + queue = buf[offset:offset+s_len] offset += s_len _bit, = struct.unpack_from('!B', buf, offset) offset += 0 @@ -2326,23 +2521,30 @@ class QueuePurgeOk(AMQPMethodPayload): This method confirms the purge of a queue. """ - __slots__ = (u'message_count',) + __slots__ = (u'message_count', ) NAME = u'queue.purge-ok' - INDEX = (50, 31) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x32\x00\x1F' # CLASS ID + METHOD ID + INDEX = (50, 31) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x32\x00\x1F' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = False, True + SENT_BY_CLIENT, SENT_BY_SERVER = True, False - IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'message-count', u'message-count', u'long', reserved=False), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'QueuePurgeOk(%s)' % (', '.join(map(repr, [self.message_count]))) + def __init__(self, message_count): """ Create frame queue.purge-ok @@ -2354,7 +2556,7 @@ class QueuePurgeOk(AMQPMethodPayload): def write_arguments(self, buf): buf.write(struct.pack('!I', self.message_count)) - + def get_size(self): return 4 @@ -2372,20 +2574,20 @@ class QueueUnbind(AMQPMethodPayload): This method unbinds a queue from an exchange. """ - __slots__ = (u'queue', u'exchange', u'routing_key', u'arguments',) + __slots__ = (u'queue', u'exchange', u'routing_key', u'arguments', ) NAME = u'queue.unbind' - INDEX = (50, 50) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x32\x00\x32' # CLASS ID + METHOD ID + INDEX = (50, 50) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x32\x00\x32' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = True, False + SENT_BY_CLIENT, SENT_BY_SERVER = False, True - IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'reserved-1', u'short', u'short', reserved=True), Field(u'queue', u'queue-name', u'shortstr', reserved=False), Field(u'exchange', u'exchange-name', u'shortstr', reserved=False), @@ -2393,6 +2595,13 @@ class QueueUnbind(AMQPMethodPayload): Field(u'arguments', u'table', u'table', reserved=False), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'QueueUnbind(%s)' % (', '.join(map(repr, [self.queue, self.exchange, self.routing_key, self.arguments]))) + def __init__(self, queue, exchange, routing_key, arguments): """ Create frame queue.unbind @@ -2422,25 +2631,24 @@ class QueueUnbind(AMQPMethodPayload): buf.write(struct.pack('!B', len(self.routing_key))) buf.write(self.routing_key) enframe_table(buf, self.arguments) - + def get_size(self): - return 5 + len(self.queue) + len(self.exchange) + len( - self.routing_key) + frame_table_size(self.arguments) + return 5 + len(self.queue) + len(self.exchange) + len(self.routing_key) + frame_table_size(self.arguments) @staticmethod def from_buffer(buf, start_offset): offset = start_offset s_len, = struct.unpack_from('!2xB', buf, offset) offset += 3 - queue = buf[offset:offset + s_len] + queue = buf[offset:offset+s_len] offset += s_len s_len, = struct.unpack_from('!B', buf, offset) offset += 1 - exchange = buf[offset:offset + s_len] + exchange = buf[offset:offset+s_len] offset += s_len s_len, = struct.unpack_from('!B', buf, offset) offset += 1 - routing_key = buf[offset:offset + s_len] + routing_key = buf[offset:offset+s_len] offset += s_len arguments, delta = deframe_table(buf, offset) offset += delta @@ -2457,20 +2665,28 @@ class QueueUnbindOk(AMQPMethodPayload): NAME = u'queue.unbind-ok' - INDEX = (50, 51) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x32\x00\x33' # CLASS ID + METHOD ID + INDEX = (50, 51) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x32\x00\x33' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = False, True + SENT_BY_CLIENT, SENT_BY_SERVER = True, False - IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x32\x00\x33\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'QueueUnbindOk(%s)' % (', '.join(map(repr, []))) + def __init__(self): """ Create frame queue.unbind-ok """ + @staticmethod def from_buffer(buf, start_offset): offset = start_offset @@ -2546,72 +2762,48 @@ class BasicContentPropertyList(AMQPContentPropertyList): :type reserved: binary type (max length 255) (AMQP as shortstr) """ zpf = bytearray([ - (('content_type' in kwargs) << 7) | ( - ('content_encoding' in kwargs) << 6) | ( - ('headers' in kwargs) << 5) | ( - ('delivery_mode' in kwargs) << 4) | ( - ('priority' in kwargs) << 3) | ( - ('correlation_id' in kwargs) << 2) | ( - ('reply_to' in kwargs) << 1) | int('expiration' in kwargs), - (('message_id' in kwargs) << 7) | ( - ('timestamp' in kwargs) << 6) | (('type_' in kwargs) << 5) | ( - ('user_id' in kwargs) << 4) | (('app_id' in kwargs) << 3) | ( - ('reserved' in kwargs) << 2) + (('content_type' in kwargs) << 7) | (('content_encoding' in kwargs) << 6) | (('headers' in kwargs) << 5) | (('delivery_mode' in kwargs) << 4) | (('priority' in kwargs) << 3) | (('correlation_id' in kwargs) << 2) | (('reply_to' in kwargs) << 1) | int('expiration' in kwargs), + (('message_id' in kwargs) << 7) | (('timestamp' in kwargs) << 6) | (('type_' in kwargs) << 5) | (('user_id' in kwargs) << 4) | (('app_id' in kwargs) << 3) | (('reserved' in kwargs) << 2) ]) zpf = six.binary_type(zpf) - # If you know in advance what properties you will be using, use typized constructors like - # - # runs once - # my_type = BasicContentPropertyList.typize('content_type', 'content_encoding') - # - # runs many times - # props = my_type('text/plain', 'utf8') - # - # instead of - # - # # runs many times - # props = BasicContentPropertyList(content_type='text/plain', content_encoding='utf8') - # - # This way you will be faster. - # - # If you do not know in advance what properties you will be using, it is correct to use - # this constructor. +# If you know in advance what properties you will be using, use typized constructors like +# +# runs once +# my_type = BasicContentPropertyList.typize('content_type', 'content_encoding') +# +# runs many times +# props = my_type('text/plain', 'utf8') +# +# instead of +# +# # runs many times +# props = BasicContentPropertyList(content_type='text/plain', content_encoding='utf8') +# +# This way you will be faster. +# +# If you do not know in advance what properties you will be using, it is correct to use +# this constructor. if zpf in BasicContentPropertyList.PARTICULAR_CLASSES: return BasicContentPropertyList.PARTICULAR_CLASSES[zpf](**kwargs) else: - logger.debug( - 'Property field (BasicContentPropertyList:%s) not seen yet, compiling', - repr(zpf)) - c = compile_particular_content_property_list_class(zpf, - BasicContentPropertyList.FIELDS) + logger.debug('Property field (BasicContentPropertyList:%s) not seen yet, compiling', repr(zpf)) + c = compile_particular_content_property_list_class(zpf, BasicContentPropertyList.FIELDS) BasicContentPropertyList.PARTICULAR_CLASSES[zpf] = c return c(**kwargs) @staticmethod def typize(*fields): zpf = bytearray([ - (('content_type' in fields) << 7) | ( - ('content_encoding' in fields) << 6) | ( - ('headers' in fields) << 5) | ( - ('delivery_mode' in fields) << 4) | ( - ('priority' in fields) << 3) | ( - ('correlation_id' in fields) << 2) | ( - ('reply_to' in fields) << 1) | int('expiration' in kwargs), - (('message_id' in fields) << 7) | ( - ('timestamp' in fields) << 6) | (('type_' in fields) << 5) | ( - ('user_id' in fields) << 4) | (('app_id' in fields) << 3) | ( - ('reserved' in fields) << 2) + (('content_type' in fields) << 7) | (('content_encoding' in fields) << 6) | (('headers' in fields) << 5) | (('delivery_mode' in fields) << 4) | (('priority' in fields) << 3) | (('correlation_id' in fields) << 2) | (('reply_to' in fields) << 1) | int('expiration' in kwargs), + (('message_id' in fields) << 7) | (('timestamp' in fields) << 6) | (('type_' in fields) << 5) | (('user_id' in fields) << 4) | (('app_id' in fields) << 3) | (('reserved' in fields) << 2) ]) zpf = six.binary_type(zpf) if zpf in BasicContentPropertyList.PARTICULAR_CLASSES: return BasicContentPropertyList.PARTICULAR_CLASSES[zpf] else: - logger.debug( - 'Property field (BasicContentPropertyList:%s) not seen yet, compiling', - repr(zpf)) - c = compile_particular_content_property_list_class(zpf, - BasicContentPropertyList.FIELDS) + logger.debug('Property field (BasicContentPropertyList:%s) not seen yet, compiling', repr(zpf)) + c = compile_particular_content_property_list_class(zpf, BasicContentPropertyList.FIELDS) BasicContentPropertyList.PARTICULAR_CLASSES[zpf] = c return c @@ -2629,17 +2821,12 @@ class BasicContentPropertyList(AMQPContentPropertyList): else: while buf[offset + pfl - 1] & 1: pfl += 2 - zpf = BasicContentPropertyList.zero_property_flags( - buf[offset:offset + pfl]).tobytes() + zpf = BasicContentPropertyList.zero_property_flags(buf[offset:offset+pfl]).tobytes() if zpf in BasicContentPropertyList.PARTICULAR_CLASSES: - return BasicContentPropertyList.PARTICULAR_CLASSES[ - zpf].from_buffer(buf, offset) + return BasicContentPropertyList.PARTICULAR_CLASSES[zpf].from_buffer(buf, offset) else: - logger.debug( - 'Property field (BasicContentPropertyList:%s) not seen yet, compiling', - repr(zpf)) - c = compile_particular_content_property_list_class(zpf, - BasicContentPropertyList.FIELDS) + logger.debug('Property field (BasicContentPropertyList:%s) not seen yet, compiling', repr(zpf)) + c = compile_particular_content_property_list_class(zpf, BasicContentPropertyList.FIELDS) BasicContentPropertyList.PARTICULAR_CLASSES[zpf] = c return c.from_buffer(buf, offset) @@ -2656,24 +2843,31 @@ class BasicAck(AMQPMethodPayload): The acknowledgement can be for a single message or a set of messages up to and including a specific message. """ - __slots__ = (u'delivery_tag', u'multiple',) + __slots__ = (u'delivery_tag', u'multiple', ) NAME = u'basic.ack' - INDEX = (60, 80) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x3C\x00\x50' # CLASS ID + METHOD ID + INDEX = (60, 80) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x3C\x00\x50' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, True - IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'delivery-tag', u'delivery-tag', u'longlong', reserved=False), Field(u'multiple', u'bit', u'bit', reserved=False), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'BasicAck(%s)' % (', '.join(map(repr, [self.delivery_tag, self.multiple]))) + def __init__(self, delivery_tag, multiple): """ Create frame basic.ack @@ -2693,7 +2887,7 @@ class BasicAck(AMQPMethodPayload): def write_arguments(self, buf): buf.write(struct.pack('!QB', self.delivery_tag, (self.multiple << 0))) - + def get_size(self): return 9 @@ -2717,22 +2911,20 @@ class BasicConsume(AMQPMethodPayload): channel they were declared on, or until the client cancels them. """ - __slots__ = ( - u'queue', u'consumer_tag', u'no_local', u'no_ack', u'exclusive', - u'no_wait', u'arguments',) + __slots__ = (u'queue', u'consumer_tag', u'no_local', u'no_ack', u'exclusive', u'no_wait', u'arguments', ) NAME = u'basic.consume' - INDEX = (60, 20) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x3C\x00\x14' # CLASS ID + METHOD ID + INDEX = (60, 20) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x3C\x00\x14' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = True, False + SENT_BY_CLIENT, SENT_BY_SERVER = False, True - IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'reserved-1', u'short', u'short', reserved=True), Field(u'queue', u'queue-name', u'shortstr', reserved=False), Field(u'consumer-tag', u'consumer-tag', u'shortstr', reserved=False), @@ -2743,8 +2935,14 @@ class BasicConsume(AMQPMethodPayload): Field(u'arguments', u'table', u'table', reserved=False), ] - def __init__(self, queue, consumer_tag, no_local, no_ack, exclusive, - no_wait, arguments): + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'BasicConsume(%s)' % (', '.join(map(repr, [self.queue, self.consumer_tag, self.no_local, self.no_ack, self.exclusive, self.no_wait, self.arguments]))) + + def __init__(self, queue, consumer_tag, no_local, no_ack, exclusive, no_wait, arguments): """ Create frame basic.consume @@ -2784,25 +2982,22 @@ class BasicConsume(AMQPMethodPayload): buf.write(self.queue) buf.write(struct.pack('!B', len(self.consumer_tag))) buf.write(self.consumer_tag) - buf.write(struct.pack('!B', - (self.no_local << 0) | (self.no_ack << 1) | ( - self.exclusive << 2) | (self.no_wait << 3))) + buf.write(struct.pack('!B', (self.no_local << 0) | (self.no_ack << 1) | (self.exclusive << 2) | (self.no_wait << 3))) enframe_table(buf, self.arguments) - + def get_size(self): - return 5 + len(self.queue) + len(self.consumer_tag) + frame_table_size( - self.arguments) + return 5 + len(self.queue) + len(self.consumer_tag) + frame_table_size(self.arguments) @staticmethod def from_buffer(buf, start_offset): offset = start_offset s_len, = struct.unpack_from('!2xB', buf, offset) offset += 3 - queue = buf[offset:offset + s_len] + queue = buf[offset:offset+s_len] offset += s_len s_len, = struct.unpack_from('!B', buf, offset) offset += 1 - consumer_tag = buf[offset:offset + s_len] + consumer_tag = buf[offset:offset+s_len] offset += s_len _bit, = struct.unpack_from('!B', buf, offset) offset += 0 @@ -2813,8 +3008,7 @@ class BasicConsume(AMQPMethodPayload): offset += 1 arguments, delta = deframe_table(buf, offset) offset += delta - return BasicConsume(queue, consumer_tag, no_local, no_ack, exclusive, - no_wait, arguments) + return BasicConsume(queue, consumer_tag, no_local, no_ack, exclusive, no_wait, arguments) class BasicCancel(AMQPMethodPayload): @@ -2840,24 +3034,31 @@ class BasicCancel(AMQPMethodPayload): capable of accepting the method, through some means of capability negotiation. """ - __slots__ = (u'consumer_tag', u'no_wait',) + __slots__ = (u'consumer_tag', u'no_wait', ) NAME = u'basic.cancel' - INDEX = (60, 30) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x3C\x00\x1E' # CLASS ID + METHOD ID + INDEX = (60, 30) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x3C\x00\x1E' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, True - IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'consumer-tag', u'consumer-tag', u'shortstr', reserved=False), Field(u'no-wait', u'no-wait', u'bit', reserved=False), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'BasicCancel(%s)' % (', '.join(map(repr, [self.consumer_tag, self.no_wait]))) + def __init__(self, consumer_tag, no_wait): """ Create frame basic.cancel @@ -2872,7 +3073,7 @@ class BasicCancel(AMQPMethodPayload): buf.write(struct.pack('!B', len(self.consumer_tag))) buf.write(self.consumer_tag) buf.write(struct.pack('!B', (self.no_wait << 0))) - + def get_size(self): return 2 + len(self.consumer_tag) @@ -2881,7 +3082,7 @@ class BasicCancel(AMQPMethodPayload): offset = start_offset s_len, = struct.unpack_from('!B', buf, offset) offset += 1 - consumer_tag = buf[offset:offset + s_len] + consumer_tag = buf[offset:offset+s_len] offset += s_len _bit, = struct.unpack_from('!B', buf, offset) offset += 0 @@ -2898,23 +3099,30 @@ class BasicConsumeOk(AMQPMethodPayload): used by the client for methods called on the consumer at a later stage. """ - __slots__ = (u'consumer_tag',) + __slots__ = (u'consumer_tag', ) NAME = u'basic.consume-ok' - INDEX = (60, 21) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x3C\x00\x15' # CLASS ID + METHOD ID + INDEX = (60, 21) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x3C\x00\x15' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = False, True + SENT_BY_CLIENT, SENT_BY_SERVER = True, False - IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'consumer-tag', u'consumer-tag', u'shortstr', reserved=False), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'BasicConsumeOk(%s)' % (', '.join(map(repr, [self.consumer_tag]))) + def __init__(self, consumer_tag): """ Create frame basic.consume-ok @@ -2928,7 +3136,7 @@ class BasicConsumeOk(AMQPMethodPayload): def write_arguments(self, buf): buf.write(struct.pack('!B', len(self.consumer_tag))) buf.write(self.consumer_tag) - + def get_size(self): return 1 + len(self.consumer_tag) @@ -2937,7 +3145,7 @@ class BasicConsumeOk(AMQPMethodPayload): offset = start_offset s_len, = struct.unpack_from('!B', buf, offset) offset += 1 - consumer_tag = buf[offset:offset + s_len] + consumer_tag = buf[offset:offset+s_len] offset += s_len return BasicConsumeOk(consumer_tag) @@ -2948,23 +3156,30 @@ class BasicCancelOk(AMQPMethodPayload): This method confirms that the cancellation was completed. """ - __slots__ = (u'consumer_tag',) + __slots__ = (u'consumer_tag', ) NAME = u'basic.cancel-ok' - INDEX = (60, 31) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x3C\x00\x1F' # CLASS ID + METHOD ID + INDEX = (60, 31) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x3C\x00\x1F' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, True - IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'consumer-tag', u'consumer-tag', u'shortstr', reserved=False), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'BasicCancelOk(%s)' % (', '.join(map(repr, [self.consumer_tag]))) + def __init__(self, consumer_tag): """ Create frame basic.cancel-ok @@ -2976,7 +3191,7 @@ class BasicCancelOk(AMQPMethodPayload): def write_arguments(self, buf): buf.write(struct.pack('!B', len(self.consumer_tag))) buf.write(self.consumer_tag) - + def get_size(self): return 1 + len(self.consumer_tag) @@ -2985,7 +3200,7 @@ class BasicCancelOk(AMQPMethodPayload): offset = start_offset s_len, = struct.unpack_from('!B', buf, offset) offset += 1 - consumer_tag = buf[offset:offset + s_len] + consumer_tag = buf[offset:offset+s_len] offset += s_len return BasicCancelOk(consumer_tag) @@ -3002,21 +3217,20 @@ class BasicDeliver(AMQPMethodPayload): arrive for that consumer. """ - __slots__ = (u'consumer_tag', u'delivery_tag', u'redelivered', u'exchange', - u'routing_key',) + __slots__ = (u'consumer_tag', u'delivery_tag', u'redelivered', u'exchange', u'routing_key', ) NAME = u'basic.deliver' - INDEX = (60, 60) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x3C\x00\x3C' # CLASS ID + METHOD ID + INDEX = (60, 60) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x3C\x00\x3C' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = False, True + SENT_BY_CLIENT, SENT_BY_SERVER = True, False - IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'consumer-tag', u'consumer-tag', u'shortstr', reserved=False), Field(u'delivery-tag', u'delivery-tag', u'longlong', reserved=False), Field(u'redelivered', u'redelivered', u'bit', reserved=False), @@ -3024,8 +3238,14 @@ class BasicDeliver(AMQPMethodPayload): Field(u'routing-key', u'shortstr', u'shortstr', reserved=False), ] - def __init__(self, consumer_tag, delivery_tag, redelivered, exchange, - routing_key): + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'BasicDeliver(%s)' % (', '.join(map(repr, [self.consumer_tag, self.delivery_tag, self.redelivered, self.exchange, self.routing_key]))) + + def __init__(self, consumer_tag, delivery_tag, redelivered, exchange, routing_key): """ Create frame basic.deliver @@ -3050,23 +3270,20 @@ class BasicDeliver(AMQPMethodPayload): def write_arguments(self, buf): buf.write(struct.pack('!B', len(self.consumer_tag))) buf.write(self.consumer_tag) - buf.write( - struct.pack('!QBB', self.delivery_tag, (self.redelivered << 0), - len(self.exchange))) + buf.write(struct.pack('!QBB', self.delivery_tag, (self.redelivered << 0), len(self.exchange))) buf.write(self.exchange) buf.write(struct.pack('!B', len(self.routing_key))) buf.write(self.routing_key) - + def get_size(self): - return 12 + len(self.consumer_tag) + len(self.exchange) + len( - self.routing_key) + return 12 + len(self.consumer_tag) + len(self.exchange) + len(self.routing_key) @staticmethod def from_buffer(buf, start_offset): offset = start_offset s_len, = struct.unpack_from('!B', buf, offset) offset += 1 - consumer_tag = buf[offset:offset + s_len] + consumer_tag = buf[offset:offset+s_len] offset += s_len delivery_tag, _bit, = struct.unpack_from('!QB', buf, offset) offset += 8 @@ -3074,14 +3291,13 @@ class BasicDeliver(AMQPMethodPayload): offset += 1 s_len, = struct.unpack_from('!B', buf, offset) offset += 1 - exchange = buf[offset:offset + s_len] + exchange = buf[offset:offset+s_len] offset += s_len s_len, = struct.unpack_from('!B', buf, offset) offset += 1 - routing_key = buf[offset:offset + s_len] + routing_key = buf[offset:offset+s_len] offset += s_len - return BasicDeliver(consumer_tag, delivery_tag, redelivered, exchange, - routing_key) + return BasicDeliver(consumer_tag, delivery_tag, redelivered, exchange, routing_key) class BasicGet(AMQPMethodPayload): @@ -3094,25 +3310,32 @@ class BasicGet(AMQPMethodPayload): where synchronous functionality is more important than performance. """ - __slots__ = (u'queue', u'no_ack',) + __slots__ = (u'queue', u'no_ack', ) NAME = u'basic.get' - INDEX = (60, 70) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x3C\x00\x46' # CLASS ID + METHOD ID + INDEX = (60, 70) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x3C\x00\x46' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = True, False + SENT_BY_CLIENT, SENT_BY_SERVER = False, True - IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'reserved-1', u'short', u'short', reserved=True), Field(u'queue', u'queue-name', u'shortstr', reserved=False), Field(u'no-ack', u'no-ack', u'bit', reserved=False), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'BasicGet(%s)' % (', '.join(map(repr, [self.queue, self.no_ack]))) + def __init__(self, queue, no_ack): """ Create frame basic.get @@ -3129,7 +3352,7 @@ class BasicGet(AMQPMethodPayload): buf.write(struct.pack('!B', len(self.queue))) buf.write(self.queue) buf.write(struct.pack('!B', (self.no_ack << 0))) - + def get_size(self): return 4 + len(self.queue) @@ -3138,7 +3361,7 @@ class BasicGet(AMQPMethodPayload): offset = start_offset s_len, = struct.unpack_from('!2xB', buf, offset) offset += 3 - queue = buf[offset:offset + s_len] + queue = buf[offset:offset+s_len] offset += s_len _bit, = struct.unpack_from('!B', buf, offset) offset += 0 @@ -3157,21 +3380,20 @@ class BasicGetOk(AMQPMethodPayload): option was set in the get method. """ - __slots__ = (u'delivery_tag', u'redelivered', u'exchange', u'routing_key', - u'message_count',) + __slots__ = (u'delivery_tag', u'redelivered', u'exchange', u'routing_key', u'message_count', ) NAME = u'basic.get-ok' - INDEX = (60, 71) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x3C\x00\x47' # CLASS ID + METHOD ID + INDEX = (60, 71) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x3C\x00\x47' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = False, True + SENT_BY_CLIENT, SENT_BY_SERVER = True, False - IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'delivery-tag', u'delivery-tag', u'longlong', reserved=False), Field(u'redelivered', u'redelivered', u'bit', reserved=False), Field(u'exchange', u'exchange-name', u'shortstr', reserved=False), @@ -3179,8 +3401,14 @@ class BasicGetOk(AMQPMethodPayload): Field(u'message-count', u'message-count', u'long', reserved=False), ] - def __init__(self, delivery_tag, redelivered, exchange, routing_key, - message_count): + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'BasicGetOk(%s)' % (', '.join(map(repr, [self.delivery_tag, self.redelivered, self.exchange, self.routing_key, self.message_count]))) + + def __init__(self, delivery_tag, redelivered, exchange, routing_key, message_count): """ Create frame basic.get-ok @@ -3203,14 +3431,12 @@ class BasicGetOk(AMQPMethodPayload): self.message_count = message_count def write_arguments(self, buf): - buf.write( - struct.pack('!QBB', self.delivery_tag, (self.redelivered << 0), - len(self.exchange))) + buf.write(struct.pack('!QBB', self.delivery_tag, (self.redelivered << 0), len(self.exchange))) buf.write(self.exchange) buf.write(struct.pack('!B', len(self.routing_key))) buf.write(self.routing_key) buf.write(struct.pack('!I', self.message_count)) - + def get_size(self): return 15 + len(self.exchange) + len(self.routing_key) @@ -3223,16 +3449,15 @@ class BasicGetOk(AMQPMethodPayload): offset += 1 s_len, = struct.unpack_from('!B', buf, offset) offset += 1 - exchange = buf[offset:offset + s_len] + exchange = buf[offset:offset+s_len] offset += s_len s_len, = struct.unpack_from('!B', buf, offset) offset += 1 - routing_key = buf[offset:offset + s_len] + routing_key = buf[offset:offset+s_len] offset += s_len message_count, = struct.unpack_from('!I', buf, offset) offset += 4 - return BasicGetOk(delivery_tag, redelivered, exchange, routing_key, - message_count) + return BasicGetOk(delivery_tag, redelivered, exchange, routing_key, message_count) class BasicGetEmpty(AMQPMethodPayload): @@ -3247,31 +3472,39 @@ class BasicGetEmpty(AMQPMethodPayload): NAME = u'basic.get-empty' - INDEX = (60, 72) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x3C\x00\x48' # CLASS ID + METHOD ID + INDEX = (60, 72) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x3C\x00\x48' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = False, True + SENT_BY_CLIENT, SENT_BY_SERVER = True, False - IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content STATIC_CONTENT = b'\x00\x00\x00\x0D\x00\x3C\x00\x48\x00\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'reserved-1', u'shortstr', u'shortstr', reserved=True), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'BasicGetEmpty(%s)' % (', '.join(map(repr, []))) + def __init__(self): """ Create frame basic.get-empty """ + @staticmethod def from_buffer(buf, start_offset): offset = start_offset s_len, = struct.unpack_from('!B', buf, offset) offset += 1 - offset += s_len # reserved field! + offset += s_len # reserved field! return BasicGetEmpty() @@ -3290,25 +3523,32 @@ class BasicNack(AMQPMethodPayload): method, it probably needs to republish the offending messages. """ - __slots__ = (u'delivery_tag', u'multiple', u'requeue',) + __slots__ = (u'delivery_tag', u'multiple', u'requeue', ) NAME = u'basic.nack' - INDEX = (60, 120) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x3C\x00\x78' # CLASS ID + METHOD ID + INDEX = (60, 120) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x3C\x00\x78' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, True - IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'delivery-tag', u'delivery-tag', u'longlong', reserved=False), Field(u'multiple', u'bit', u'bit', reserved=False), Field(u'requeue', u'bit', u'bit', reserved=False), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'BasicNack(%s)' % (', '.join(map(repr, [self.delivery_tag, self.multiple, self.requeue]))) + def __init__(self, delivery_tag, multiple, requeue): """ Create frame basic.nack @@ -3335,9 +3575,8 @@ class BasicNack(AMQPMethodPayload): self.requeue = requeue def write_arguments(self, buf): - buf.write(struct.pack('!QB', self.delivery_tag, - (self.multiple << 0) | (self.requeue << 1))) - + buf.write(struct.pack('!QB', self.delivery_tag, (self.multiple << 0) | (self.requeue << 1))) + def get_size(self): return 9 @@ -3362,20 +3601,20 @@ class BasicPublish(AMQPMethodPayload): distributed to any active consumers when the transaction, if any, is committed. """ - __slots__ = (u'exchange', u'routing_key', u'mandatory', u'immediate',) + __slots__ = (u'exchange', u'routing_key', u'mandatory', u'immediate', ) NAME = u'basic.publish' - INDEX = (60, 40) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x3C\x00\x28' # CLASS ID + METHOD ID + INDEX = (60, 40) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x3C\x00\x28' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = True, False + SENT_BY_CLIENT, SENT_BY_SERVER = False, True - IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'reserved-1', u'short', u'short', reserved=True), Field(u'exchange', u'exchange-name', u'shortstr', reserved=False), Field(u'routing-key', u'shortstr', u'shortstr', reserved=False), @@ -3383,6 +3622,13 @@ class BasicPublish(AMQPMethodPayload): Field(u'immediate', u'bit', u'bit', reserved=False), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'BasicPublish(%s)' % (', '.join(map(repr, [self.exchange, self.routing_key, self.mandatory, self.immediate]))) + def __init__(self, exchange, routing_key, mandatory, immediate): """ Create frame basic.publish @@ -3429,9 +3675,8 @@ class BasicPublish(AMQPMethodPayload): buf.write(self.exchange) buf.write(struct.pack('!B', len(self.routing_key))) buf.write(self.routing_key) - buf.write( - struct.pack('!B', (self.mandatory << 0) | (self.immediate << 1))) - + buf.write(struct.pack('!B', (self.mandatory << 0) | (self.immediate << 1))) + def get_size(self): return 5 + len(self.exchange) + len(self.routing_key) @@ -3440,11 +3685,11 @@ class BasicPublish(AMQPMethodPayload): offset = start_offset s_len, = struct.unpack_from('!2xB', buf, offset) offset += 3 - exchange = buf[offset:offset + s_len] + exchange = buf[offset:offset+s_len] offset += s_len s_len, = struct.unpack_from('!B', buf, offset) offset += 1 - routing_key = buf[offset:offset + s_len] + routing_key = buf[offset:offset+s_len] offset += s_len _bit, = struct.unpack_from('!B', buf, offset) offset += 0 @@ -3468,25 +3713,32 @@ class BasicQos(AMQPMethodPayload): currently meaningful only for the server. """ - __slots__ = (u'prefetch_size', u'prefetch_count', u'global_',) + __slots__ = (u'prefetch_size', u'prefetch_count', u'global_', ) NAME = u'basic.qos' - INDEX = (60, 10) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x3C\x00\x0A' # CLASS ID + METHOD ID + INDEX = (60, 10) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x3C\x00\x0A' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = True, False + SENT_BY_CLIENT, SENT_BY_SERVER = False, True - IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'prefetch-size', u'long', u'long', reserved=False), Field(u'prefetch-count', u'short', u'short', reserved=False), Field(u'global', u'bit', u'bit', reserved=False), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'BasicQos(%s)' % (', '.join(map(repr, [self.prefetch_size, self.prefetch_count, self.global_]))) + def __init__(self, prefetch_size, prefetch_count, global_): """ Create frame basic.qos @@ -3535,17 +3787,15 @@ class BasicQos(AMQPMethodPayload): self.global_ = global_ def write_arguments(self, buf): - buf.write(struct.pack('!IHB', self.prefetch_size, self.prefetch_count, - (self.global_ << 0))) - + buf.write(struct.pack('!IHB', self.prefetch_size, self.prefetch_count, (self.global_ << 0))) + def get_size(self): return 7 @staticmethod def from_buffer(buf, start_offset): offset = start_offset - prefetch_size, prefetch_count, _bit, = struct.unpack_from('!IHB', buf, - offset) + prefetch_size, prefetch_count, _bit, = struct.unpack_from('!IHB', buf, offset) offset += 6 global_ = bool(_bit >> 0) offset += 1 @@ -3566,20 +3816,28 @@ class BasicQosOk(AMQPMethodPayload): NAME = u'basic.qos-ok' - INDEX = (60, 11) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x3C\x00\x0B' # CLASS ID + METHOD ID + INDEX = (60, 11) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x3C\x00\x0B' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = False, True + SENT_BY_CLIENT, SENT_BY_SERVER = True, False - IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x3C\x00\x0B\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'BasicQosOk(%s)' % (', '.join(map(repr, []))) + def __init__(self): """ Create frame basic.qos-ok """ + @staticmethod def from_buffer(buf, start_offset): offset = start_offset @@ -3598,26 +3856,33 @@ class BasicReturn(AMQPMethodPayload): the message was undeliverable. """ - __slots__ = (u'reply_code', u'reply_text', u'exchange', u'routing_key',) + __slots__ = (u'reply_code', u'reply_text', u'exchange', u'routing_key', ) NAME = u'basic.return' - INDEX = (60, 50) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x3C\x00\x32' # CLASS ID + METHOD ID + INDEX = (60, 50) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x3C\x00\x32' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = False, True + SENT_BY_CLIENT, SENT_BY_SERVER = True, False - IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'reply-code', u'reply-code', u'short', reserved=False), Field(u'reply-text', u'reply-text', u'shortstr', reserved=False), Field(u'exchange', u'exchange-name', u'shortstr', reserved=False), Field(u'routing-key', u'shortstr', u'shortstr', reserved=False), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'BasicReturn(%s)' % (', '.join(map(repr, [self.reply_code, self.reply_text, self.exchange, self.routing_key]))) + def __init__(self, reply_code, reply_text, exchange, routing_key): """ Create frame basic.return @@ -3645,25 +3910,24 @@ class BasicReturn(AMQPMethodPayload): buf.write(self.exchange) buf.write(struct.pack('!B', len(self.routing_key))) buf.write(self.routing_key) - + def get_size(self): - return 5 + len(self.reply_text) + len(self.exchange) + len( - self.routing_key) + return 5 + len(self.reply_text) + len(self.exchange) + len(self.routing_key) @staticmethod def from_buffer(buf, start_offset): offset = start_offset reply_code, s_len, = struct.unpack_from('!HB', buf, offset) offset += 3 - reply_text = buf[offset:offset + s_len] + reply_text = buf[offset:offset+s_len] offset += s_len s_len, = struct.unpack_from('!B', buf, offset) offset += 1 - exchange = buf[offset:offset + s_len] + exchange = buf[offset:offset+s_len] offset += s_len s_len, = struct.unpack_from('!B', buf, offset) offset += 1 - routing_key = buf[offset:offset + s_len] + routing_key = buf[offset:offset+s_len] offset += s_len return BasicReturn(reply_code, reply_text, exchange, routing_key) @@ -3678,24 +3942,31 @@ class BasicReject(AMQPMethodPayload): to their original queue. """ - __slots__ = (u'delivery_tag', u'requeue',) + __slots__ = (u'delivery_tag', u'requeue', ) NAME = u'basic.reject' - INDEX = (60, 90) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x3C\x00\x5A' # CLASS ID + METHOD ID + INDEX = (60, 90) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x3C\x00\x5A' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = True, False + SENT_BY_CLIENT, SENT_BY_SERVER = False, True - IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'delivery-tag', u'delivery-tag', u'longlong', reserved=False), Field(u'requeue', u'bit', u'bit', reserved=False), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'BasicReject(%s)' % (', '.join(map(repr, [self.delivery_tag, self.requeue]))) + def __init__(self, delivery_tag, requeue): """ Create frame basic.reject @@ -3713,7 +3984,7 @@ class BasicReject(AMQPMethodPayload): def write_arguments(self, buf): buf.write(struct.pack('!QB', self.delivery_tag, (self.requeue << 0))) - + def get_size(self): return 9 @@ -3737,23 +4008,30 @@ class BasicRecoverAsync(AMQPMethodPayload): This method is deprecated in favour of the synchronous Recover/Recover-Ok. """ - __slots__ = (u'requeue',) + __slots__ = (u'requeue', ) NAME = u'basic.recover-async' - INDEX = (60, 100) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x3C\x00\x64' # CLASS ID + METHOD ID + INDEX = (60, 100) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x3C\x00\x64' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = True, False + SENT_BY_CLIENT, SENT_BY_SERVER = False, True - IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'requeue', u'bit', u'bit', reserved=False), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'BasicRecoverAsync(%s)' % (', '.join(map(repr, [self.requeue]))) + def __init__(self, requeue): """ Create frame basic.recover-async @@ -3770,7 +4048,7 @@ class BasicRecoverAsync(AMQPMethodPayload): def write_arguments(self, buf): buf.write(struct.pack('!B', (self.requeue << 0))) - + def get_size(self): return 1 @@ -3794,23 +4072,30 @@ class BasicRecover(AMQPMethodPayload): This method replaces the asynchronous Recover. """ - __slots__ = (u'requeue',) + __slots__ = (u'requeue', ) NAME = u'basic.recover' - INDEX = (60, 110) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x3C\x00\x6E' # CLASS ID + METHOD ID + INDEX = (60, 110) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x3C\x00\x6E' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = True, False + SENT_BY_CLIENT, SENT_BY_SERVER = False, True - IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'requeue', u'bit', u'bit', reserved=False), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'BasicRecover(%s)' % (', '.join(map(repr, [self.requeue]))) + def __init__(self, requeue): """ Create frame basic.recover @@ -3827,7 +4112,7 @@ class BasicRecover(AMQPMethodPayload): def write_arguments(self, buf): buf.write(struct.pack('!B', (self.requeue << 0))) - + def get_size(self): return 1 @@ -3851,20 +4136,28 @@ class BasicRecoverOk(AMQPMethodPayload): NAME = u'basic.recover-ok' - INDEX = (60, 111) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x3C\x00\x6F' # CLASS ID + METHOD ID + INDEX = (60, 111) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x3C\x00\x6F' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = False, True + SENT_BY_CLIENT, SENT_BY_SERVER = True, False - IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x3C\x00\x6F\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'BasicRecoverOk(%s)' % (', '.join(map(repr, []))) + def __init__(self): """ Create frame basic.recover-ok """ + @staticmethod def from_buffer(buf, start_offset): offset = start_offset @@ -3908,20 +4201,28 @@ class TxCommit(AMQPMethodPayload): NAME = u'tx.commit' - INDEX = (90, 20) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x5A\x00\x14' # CLASS ID + METHOD ID + INDEX = (90, 20) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x5A\x00\x14' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = True, False + SENT_BY_CLIENT, SENT_BY_SERVER = False, True - IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x5A\x00\x14\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'TxCommit(%s)' % (', '.join(map(repr, []))) + def __init__(self): """ Create frame tx.commit """ + @staticmethod def from_buffer(buf, start_offset): offset = start_offset @@ -3940,20 +4241,28 @@ class TxCommitOk(AMQPMethodPayload): NAME = u'tx.commit-ok' - INDEX = (90, 21) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x5A\x00\x15' # CLASS ID + METHOD ID + INDEX = (90, 21) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x5A\x00\x15' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = False, True + SENT_BY_CLIENT, SENT_BY_SERVER = True, False - IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x5A\x00\x15\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'TxCommitOk(%s)' % (', '.join(map(repr, []))) + def __init__(self): """ Create frame tx.commit-ok """ + @staticmethod def from_buffer(buf, start_offset): offset = start_offset @@ -3976,20 +4285,28 @@ class TxRollback(AMQPMethodPayload): NAME = u'tx.rollback' - INDEX = (90, 30) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x5A\x00\x1E' # CLASS ID + METHOD ID + INDEX = (90, 30) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x5A\x00\x1E' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = True, False + SENT_BY_CLIENT, SENT_BY_SERVER = False, True - IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x5A\x00\x1E\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'TxRollback(%s)' % (', '.join(map(repr, []))) + def __init__(self): """ Create frame tx.rollback """ + @staticmethod def from_buffer(buf, start_offset): offset = start_offset @@ -4008,20 +4325,28 @@ class TxRollbackOk(AMQPMethodPayload): NAME = u'tx.rollback-ok' - INDEX = (90, 31) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x5A\x00\x1F' # CLASS ID + METHOD ID + INDEX = (90, 31) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x5A\x00\x1F' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = False, True + SENT_BY_CLIENT, SENT_BY_SERVER = True, False - IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x5A\x00\x1F\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'TxRollbackOk(%s)' % (', '.join(map(repr, []))) + def __init__(self): """ Create frame tx.rollback-ok """ + @staticmethod def from_buffer(buf, start_offset): offset = start_offset @@ -4041,20 +4366,28 @@ class TxSelect(AMQPMethodPayload): NAME = u'tx.select' - INDEX = (90, 10) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x5A\x00\x0A' # CLASS ID + METHOD ID + INDEX = (90, 10) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x5A\x00\x0A' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = True, False + SENT_BY_CLIENT, SENT_BY_SERVER = False, True - IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x5A\x00\x0A\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'TxSelect(%s)' % (', '.join(map(repr, []))) + def __init__(self): """ Create frame tx.select """ + @staticmethod def from_buffer(buf, start_offset): offset = start_offset @@ -4073,20 +4406,28 @@ class TxSelectOk(AMQPMethodPayload): NAME = u'tx.select-ok' - INDEX = (90, 11) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x5A\x00\x0B' # CLASS ID + METHOD ID + INDEX = (90, 11) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x5A\x00\x0B' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = False, True + SENT_BY_CLIENT, SENT_BY_SERVER = True, False - IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x5A\x00\x0B\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'TxSelectOk(%s)' % (', '.join(map(repr, []))) + def __init__(self): """ Create frame tx.select-ok """ + @staticmethod def from_buffer(buf, start_offset): offset = start_offset @@ -4125,23 +4466,30 @@ class ConfirmSelect(AMQPMethodPayload): The client can only use this method on a non-transactional channel. """ - __slots__ = (u'nowait',) + __slots__ = (u'nowait', ) NAME = u'confirm.select' - INDEX = (85, 10) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x55\x00\x0A' # CLASS ID + METHOD ID + INDEX = (85, 10) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x55\x00\x0A' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = True, False + SENT_BY_CLIENT, SENT_BY_SERVER = False, True - IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'nowait', u'bit', u'bit', reserved=False), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'ConfirmSelect(%s)' % (', '.join(map(repr, [self.nowait]))) + def __init__(self, nowait): """ Create frame confirm.select @@ -4157,7 +4505,7 @@ class ConfirmSelect(AMQPMethodPayload): def write_arguments(self, buf): buf.write(struct.pack('!B', (self.nowait << 0))) - + def get_size(self): return 1 @@ -4182,20 +4530,28 @@ class ConfirmSelectOk(AMQPMethodPayload): NAME = u'confirm.select-ok' - INDEX = (85, 11) # (Class ID, Method ID) - BINARY_HEADER = b'\x00\x55\x00\x0B' # CLASS ID + METHOD ID + INDEX = (85, 11) # (Class ID, Method ID) + BINARY_HEADER = b'\x00\x55\x00\x0B' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = False, True + SENT_BY_CLIENT, SENT_BY_SERVER = True, False - IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x55\x00\x0B\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'ConfirmSelectOk(%s)' % (', '.join(map(repr, []))) + def __init__(self): """ Create frame confirm.select-ok """ + @staticmethod def from_buffer(buf, start_offset): offset = start_offset @@ -4267,6 +4623,7 @@ IDENT_TO_METHOD = { (50, 10): QueueDeclare, } + BINARY_HEADER_TO_METHOD = { b'\x00\x5A\x00\x15': TxCommitOk, b'\x00\x3C\x00\x64': BasicRecoverAsync, @@ -4332,6 +4689,7 @@ BINARY_HEADER_TO_METHOD = { b'\x00\x32\x00\x0A': QueueDeclare, } + CLASS_ID_TO_CONTENT_PROPERTY_LIST = { 60: BasicContentPropertyList, } @@ -4371,7 +4729,7 @@ REPLY_REASONS_FOR = { # Methods that are replies for other, ie. ConnectionOpenOk: ConnectionOpen # a method may be a reply for ONE or NONE other methods # if a method has no replies, it will have an empty list as value here -REPLIES_FOR = { +REPLIES_FOR= { BasicGetEmpty: [], BasicRecoverOk: [], BasicReturn: [], diff --git a/coolamqp/uplink/connection/connection.py b/coolamqp/uplink/connection/connection.py index fb35aa44359dc49245a9c4db688522286d334ae1..fbecd1e2700d82f890eff2fe532d44798986fa5d 100644 --- a/coolamqp/uplink/connection/connection.py +++ b/coolamqp/uplink/connection/connection.py @@ -3,6 +3,7 @@ from __future__ import absolute_import, division, print_function import logging import collections import monotonic +import uuid import time import socket import six @@ -44,8 +45,16 @@ def alert_watches(watches, trigger): continue if not any((watch_triggered, watch.oneshot, watch.cancelled)): - # Watch remains alive if it was NOT triggered, or it's NOT a oneshot + # Watch remains alive if it was NOT triggered, or it's NOT a oneshot or it's not cancelled alive_watches.append(watch) + elif not watch.oneshot and not watch.cancelled: + alive_watches.append(watch) + elif watch.oneshot and not watch_triggered: + alive_watches.append(watch) + + if set(alive_watches) != set(watches): + for removed_watch in set(watches)-set(alive_watches): + logger.debug('Removing watch %s', repr(removed_watch)) return alive_watches, watch_handled @@ -80,7 +89,7 @@ class Connection(object): """ self.listener_thread = listener_thread self.node_definition = node_definition - + self.uuid = uuid.uuid4().hex[:5] self.recvf = ReceivingFramer(self.on_frame) # todo a list doesn't seem like a very strong atomicity guarantee @@ -100,6 +109,10 @@ class Connection(object): self.heartbeat = None self.extensions = [] + # To be filled in later + self.listener_socket = None + self.sendf = None + def call_on_connected(self, callable): """ Register a callable to be called when this links to the server. @@ -117,7 +130,7 @@ class Connection(object): def on_connected(self): """Called by handshaker upon reception of final connection.open-ok""" - logger.info('Connection ready.') + logger.info('[%s] Connection ready.', self.uuid) self.state = ST_ONLINE @@ -145,7 +158,7 @@ class Connection(object): else: break - logger.debug('TCP connection established, authentication in progress') + logger.debug('[%s] TCP connection established, authentication in progress', self.uuid) sock.settimeout(0) sock.send(b'AMQP\x00\x00\x09\x01') @@ -204,7 +217,8 @@ class Connection(object): if isinstance(payload, ConnectionClose): self.send([AMQPMethodFrame(0, ConnectionCloseOk())]) - logger.info(u'Broker closed our connection - code %s reason %s', + logger.info(u'[%s] Broker closed our connection - code %s reason %s', + self.uuid, payload.reply_code, payload.reply_text.tobytes().decode('utf8')) @@ -244,7 +258,7 @@ class Connection(object): watch_handled = False # True if ANY watch handled this if isinstance(frame, AMQPMethodFrame): - logger.debug('Received %s', frame.payload.NAME) + logger.debug('[%s] Received %s', self.uuid, frame.payload.NAME) # ==================== process per-channel watches # @@ -273,7 +287,10 @@ class Connection(object): self.any_watches.append(watch) if not watch_handled: - logger.warn('Unhandled frame %s', frame) + if isinstance(frame, AMQPMethodFrame): + logger.warning('[%s] Unhandled method frame %s', self.uuid, repr(frame.payload)) + else: + logger.warning('[%s] Unhandled frame %s', self.uuid, frame) def watchdog(self, delay, callback): """ diff --git a/coolamqp/uplink/connection/watches.py b/coolamqp/uplink/connection/watches.py index 3f7893f344a44c7516f13291b808da93868945ae..33f6eae2ffce2214ff145b8c3652e7ec9bac32f4 100644 --- a/coolamqp/uplink/connection/watches.py +++ b/coolamqp/uplink/connection/watches.py @@ -1,8 +1,12 @@ # coding=UTF-8 from __future__ import absolute_import, division, print_function +import logging from coolamqp.framing.frames import AMQPMethodFrame, AMQPHeaderFrame, \ AMQPBodyFrame +from coolamqp.framing.base import AMQPMethodPayload + +logger = logging.getLogger(__name__) class Watch(object): @@ -123,10 +127,13 @@ class MethodWatch(Watch): self.callable = callable if isinstance(method_or_methods, (list, tuple)): self.methods = tuple(method_or_methods) - else: - self.methods = method_or_methods + elif issubclass(method_or_methods, AMQPMethodPayload): + self.methods = (method_or_methods, ) self.on_end = on_end + def __repr__(self): + return '<MethodWatch %s, %s, %s, on_end=%s>' % (self.channel, self.methods, self.callable, self.on_end) + def failed(self): if self.on_end is not None: self.on_end() diff --git a/tests/test_clustering/test_double.py b/tests/test_clustering/test_double.py index c6ae6beaf8c9d5886fef2c2969b8a03365a86b1a..b5dd6d11aef06b998940be6dcb42b786746cbcee 100644 --- a/tests/test_clustering/test_double.py +++ b/tests/test_clustering/test_double.py @@ -60,7 +60,7 @@ class TestDouble(unittest.TestCase): try: con2, fut2 = self.c2.consume(q, fail_on_first_time_resource_locked=True) - fut2.result() + fut2.result(timeout=20) except AMQPError as e: self.assertEquals(e.reply_code, RESOURCE_LOCKED) self.assertFalse(e.is_hard_error())