From 13485ae01400927de2601ea42892bbb2fe62fa60 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ma=C5=9Blanka?= <piotr.maslanka@henrietta.com.pl> Date: Fri, 3 Jan 2020 18:03:31 +0100 Subject: [PATCH] fix tests --- compile_definitions.py | 9 +- coolamqp/framing/base.py | 6 +- coolamqp/framing/definitions.py | 817 +++++++++++++++++++++----------- tests/test_clustering/test_a.py | 63 +-- 4 files changed, 592 insertions(+), 303 deletions(-) diff --git a/compile_definitions.py b/compile_definitions.py index 57ff6e9..e8074c5 100644 --- a/compile_definitions.py +++ b/compile_definitions.py @@ -2,6 +2,7 @@ from __future__ import division import collections import struct +import subprocess from xml.etree import ElementTree import math @@ -42,7 +43,7 @@ Generated automatically by CoolAMQP from AMQP machine-readable specification. See coolamqp.uplink.framing.compilation for the tool AMQP is copyright (c) 2016 OASIS -CoolAMQP is copyright (c) 2016-2018 DMS Serwis s.c., 2018-2019 SMOK sp. z o.o. +CoolAMQP is copyright (c) 2016-2018 DMS Serwis s.c., 2018-2020 SMOK sp. z o.o. ########################################################### @@ -553,6 +554,6 @@ REPLIES_FOR = {\n''') if __name__ == '__main__': compile_definitions() - # proc = subprocess.run(['yapf', 'coolamqp/framing/definitions.py'], stdout=subprocess.PIPE) - # with open('coolamqp/framing/definitions.py', 'wb') as f_out: - # f_out.write(proc.stdout) + proc = subprocess.run(['yapf', 'coolamqp/framing/definitions.py'], stdout=subprocess.PIPE) + with open('coolamqp/framing/definitions.py', 'wb') as f_out: + f_out.write(proc.stdout) diff --git a/coolamqp/framing/base.py b/coolamqp/framing/base.py index d9dbc34..e41b69f 100644 --- a/coolamqp/framing/base.py +++ b/coolamqp/framing/base.py @@ -175,8 +175,8 @@ class AMQPMethodPayload(AMQPPayload): """ raise NotImplementedError() - @staticmethod - def from_buffer(buf, offset): # type: (buffer, int) -> AMQPMethodPayload + @classmethod + def from_buffer(cls, buf, offset): # type: (buffer, int) -> AMQPMethodPayload """ Construct this frame from a buffer @@ -184,7 +184,7 @@ class AMQPMethodPayload(AMQPPayload): :type buf: buffer or memoryview :param offset: offset the argument portion begins at :type offset: int - :return: tuple of (an instance of %s, amount of bytes consumed as int) + :return: tuple of (an instance of this class, amount of bytes consumed as int) :raise ValueError: invalid data """ raise NotImplementedError('') diff --git a/coolamqp/framing/definitions.py b/coolamqp/framing/definitions.py index 9049e2d..f1adcef 100644 --- a/coolamqp/framing/definitions.py +++ b/coolamqp/framing/definitions.py @@ -7,7 +7,7 @@ Generated automatically by CoolAMQP from AMQP machine-readable specification. See coolamqp.uplink.framing.compilation for the tool AMQP is copyright (c) 2016 OASIS -CoolAMQP is copyright (c) 2016-2018 DMS Serwis s.c., 2018-2019 SMOK sp. z o.o. +CoolAMQP is copyright (c) 2016-2018 DMS Serwis s.c., 2018-2020 SMOK sp. z o.o. ########################################################### @@ -35,7 +35,8 @@ from coolamqp.framing.compilation.content_property import compile_particular_con logger = logging.getLogger(__name__) -Field = collections.namedtuple('Field', ('name', 'type', 'basic_type', 'reserved')) +Field = collections.namedtuple('Field', + ('name', 'type', 'basic_type', 'reserved')) # Core constants FRAME_METHOD = 1 @@ -151,12 +152,15 @@ NOT_IMPLEMENTED = 540 # normal operations. INTERNAL_ERROR = 541 -SOFT_ERRORS = [CONTENT_TOO_LARGE, NO_CONSUMERS, ACCESS_REFUSED, NOT_FOUND, RESOURCE_LOCKED, - PRECONDITION_FAILED] -HARD_ERRORS = [CONNECTION_FORCED, INVALID_PATH, FRAME_ERROR, SYNTAX_ERROR, COMMAND_INVALID, - CHANNEL_ERROR, UNEXPECTED_FRAME, RESOURCE_ERROR, NOT_ALLOWED, NOT_IMPLEMENTED, - INTERNAL_ERROR] - +SOFT_ERRORS = [ + CONTENT_TOO_LARGE, NO_CONSUMERS, ACCESS_REFUSED, NOT_FOUND, + RESOURCE_LOCKED, PRECONDITION_FAILED +] +HARD_ERRORS = [ + CONNECTION_FORCED, INVALID_PATH, FRAME_ERROR, SYNTAX_ERROR, + COMMAND_INVALID, CHANNEL_ERROR, UNEXPECTED_FRAME, RESOURCE_ERROR, + NOT_ALLOWED, NOT_IMPLEMENTED, INTERNAL_ERROR +] DOMAIN_TO_BASIC_TYPE = { u'class-id': u'short', @@ -185,6 +189,7 @@ DOMAIN_TO_BASIC_TYPE = { u'table': None, } + class Connection(AMQPClass): """ The connection class provides methods for a client to establish a @@ -216,7 +221,7 @@ class ConnectionBlocked(AMQPMethodPayload): IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'reason', u'shortstr', u'shortstr', reserved=False), ] @@ -242,7 +247,8 @@ class ConnectionBlocked(AMQPMethodPayload): return 1 + len(self.reason) @classmethod - def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> ConnectionBlocked + def from_buffer(cls, buf, + start_offset): # type: (buffer, int) -> ConnectionBlocked offset = start_offset s_len, = struct.unpack_from('!B', buf, offset) offset += 1 @@ -275,7 +281,12 @@ class ConnectionClose(AMQPMethodPayload): the ID of the method. :type method_id: int, 16 bit unsigned (method-id in AMQP) """ - __slots__ = (u'reply_code', u'reply_text', u'class_id', u'method_id',) + __slots__ = ( + u'reply_code', + u'reply_text', + u'class_id', + u'method_id', + ) NAME = u'connection.close' @@ -288,7 +299,7 @@ class ConnectionClose(AMQPMethodPayload): IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'reply-code', u'reply-code', u'short', reserved=False), Field(u'reply-text', u'reply-text', u'shortstr', reserved=False), Field(u'class-id', u'class-id', u'short', reserved=False), @@ -301,8 +312,10 @@ class ConnectionClose(AMQPMethodPayload): :return: Python string representation """ - return 'ConnectionClose(%s)' % ( - ', '.join(map(repr, [self.reply_code, self.reply_text, self.class_id, self.method_id]))) + return 'ConnectionClose(%s)' % (', '.join( + map(repr, [ + self.reply_code, self.reply_text, self.class_id, self.method_id + ]))) def __init__(self, reply_code, reply_text, class_id, method_id): """ @@ -322,7 +335,8 @@ class ConnectionClose(AMQPMethodPayload): return 7 + len(self.reply_text) @classmethod - def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> ConnectionClose + def from_buffer(cls, buf, + start_offset): # type: (buffer, int) -> ConnectionClose offset = start_offset reply_code, s_len, = struct.unpack_from('!HB', buf, offset) offset += 3 @@ -364,7 +378,8 @@ class ConnectionCloseOk(AMQPMethodPayload): return 'ConnectionCloseOk(%s)' % (', '.join(map(repr, []))) @classmethod - def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> ConnectionCloseOk + def from_buffer(cls, buf, + start_offset): # type: (buffer, int) -> ConnectionCloseOk offset = start_offset return cls() @@ -398,7 +413,7 @@ class ConnectionOpen(AMQPMethodPayload): IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'virtual-host', u'path', u'shortstr', reserved=False), Field(u'reserved-1', u'shortstr', u'shortstr', reserved=True), Field(u'reserved-2', u'bit', u'bit', reserved=True), @@ -410,7 +425,8 @@ class ConnectionOpen(AMQPMethodPayload): :return: Python string representation """ - return 'ConnectionOpen(%s)' % (', '.join(map(repr, [self.virtual_host]))) + return 'ConnectionOpen(%s)' % (', '.join(map(repr, + [self.virtual_host]))) def __init__(self, virtual_host): """ @@ -428,7 +444,8 @@ class ConnectionOpen(AMQPMethodPayload): return 3 + len(self.virtual_host) @classmethod - def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> ConnectionOpen + def from_buffer(cls, buf, + start_offset): # type: (buffer, int) -> ConnectionOpen offset = start_offset s_len, = struct.unpack_from('!B', buf, offset) offset += 1 @@ -462,7 +479,7 @@ class ConnectionOpenOk(AMQPMethodPayload): STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x0A\x00\x29\x00\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'reserved-1', u'shortstr', u'shortstr', reserved=True), ] @@ -475,7 +492,8 @@ class ConnectionOpenOk(AMQPMethodPayload): return 'ConnectionOpenOk(%s)' % (', '.join(map(repr, []))) @classmethod - def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> ConnectionOpenOk + def from_buffer(cls, buf, + start_offset): # type: (buffer, int) -> ConnectionOpenOk offset = start_offset s_len, = struct.unpack_from('!B', buf, offset) offset += 1 @@ -525,7 +543,12 @@ class ConnectionStart(AMQPMethodPayload): :type locales: binary type (longstr in AMQP) """ __slots__ = ( - u'version_major', u'version_minor', u'server_properties', u'mechanisms', u'locales',) + u'version_major', + u'version_minor', + u'server_properties', + u'mechanisms', + u'locales', + ) NAME = u'connection.start' @@ -541,7 +564,10 @@ class ConnectionStart(AMQPMethodPayload): FIELDS = [ Field(u'version-major', u'octet', u'octet', reserved=False), Field(u'version-minor', u'octet', u'octet', reserved=False), - Field(u'server-properties', u'peer-properties', u'table', reserved=False), + Field(u'server-properties', + u'peer-properties', + u'table', + reserved=False), Field(u'mechanisms', u'longstr', u'longstr', reserved=False), Field(u'locales', u'longstr', u'longstr', reserved=False), ] @@ -552,11 +578,14 @@ class ConnectionStart(AMQPMethodPayload): :return: Python string representation """ - return 'ConnectionStart(%s)' % (', '.join(map(repr, [self.version_major, self.version_minor, - self.server_properties, - self.mechanisms, self.locales]))) + return 'ConnectionStart(%s)' % (', '.join( + map(repr, [ + self.version_major, self.version_minor, self.server_properties, + self.mechanisms, self.locales + ]))) - def __init__(self, version_major, version_minor, server_properties, mechanisms, locales): + def __init__(self, version_major, version_minor, server_properties, + mechanisms, locales): """ Create frame connection.start """ @@ -575,11 +604,12 @@ class ConnectionStart(AMQPMethodPayload): buf.write(self.locales) def get_size(self): # type: () -> int - return 10 + frame_table_size(self.server_properties) + len(self.mechanisms) + len( - self.locales) + return 10 + frame_table_size(self.server_properties) + len( + self.mechanisms) + len(self.locales) @classmethod - def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> ConnectionStart + def from_buffer(cls, buf, + start_offset): # type: (buffer, int) -> ConnectionStart offset = start_offset version_major, version_minor, = struct.unpack_from('!BB', buf, offset) offset += 2 @@ -593,7 +623,8 @@ class ConnectionStart(AMQPMethodPayload): offset += 4 locales = buf[offset:offset + s_len] offset += s_len - return cls(version_major, version_minor, server_properties, mechanisms, locales) + return cls(version_major, version_minor, server_properties, mechanisms, + locales) class ConnectionSecure(AMQPMethodPayload): @@ -624,7 +655,7 @@ class ConnectionSecure(AMQPMethodPayload): IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'challenge', u'longstr', u'longstr', reserved=False), ] @@ -634,7 +665,8 @@ class ConnectionSecure(AMQPMethodPayload): :return: Python string representation """ - return 'ConnectionSecure(%s)' % (', '.join(map(repr, [self.challenge]))) + return 'ConnectionSecure(%s)' % (', '.join(map(repr, + [self.challenge]))) def __init__(self, challenge): """ @@ -650,7 +682,8 @@ class ConnectionSecure(AMQPMethodPayload): return 4 + len(self.challenge) @classmethod - def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> ConnectionSecure + def from_buffer(cls, buf, + start_offset): # type: (buffer, int) -> ConnectionSecure offset = start_offset s_len, = struct.unpack_from('!L', buf, offset) offset += 4 @@ -689,7 +722,12 @@ class ConnectionStartOk(AMQPMethodPayload): specified by the server. :type locale: binary type (max length 255) (shortstr in AMQP) """ - __slots__ = (u'client_properties', u'mechanism', u'response', u'locale',) + __slots__ = ( + u'client_properties', + u'mechanism', + u'response', + u'locale', + ) NAME = u'connection.start-ok' @@ -703,7 +741,10 @@ class ConnectionStartOk(AMQPMethodPayload): # See constructor pydoc for details FIELDS = [ - Field(u'client-properties', u'peer-properties', u'table', reserved=False), + Field(u'client-properties', + u'peer-properties', + u'table', + reserved=False), Field(u'mechanism', u'shortstr', u'shortstr', reserved=False), Field(u'response', u'longstr', u'longstr', reserved=False), Field(u'locale', u'shortstr', u'shortstr', reserved=False), @@ -716,7 +757,10 @@ class ConnectionStartOk(AMQPMethodPayload): :return: Python string representation """ return 'ConnectionStartOk(%s)' % (', '.join( - map(repr, [self.client_properties, self.mechanism, self.response, self.locale]))) + map(repr, [ + self.client_properties, self.mechanism, self.response, + self.locale + ]))) def __init__(self, client_properties, mechanism, response, locale): """ @@ -737,11 +781,12 @@ class ConnectionStartOk(AMQPMethodPayload): buf.write(self.locale) def get_size(self): # type: () -> int - return 6 + frame_table_size(self.client_properties) + len(self.mechanism) + len( - self.response) + len(self.locale) + return 6 + frame_table_size(self.client_properties) + len( + self.mechanism) + len(self.response) + len(self.locale) @classmethod - def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> ConnectionStartOk + def from_buffer(cls, buf, + start_offset): # type: (buffer, int) -> ConnectionStartOk offset = start_offset client_properties, delta = deframe_table(buf, offset) offset += delta @@ -786,7 +831,7 @@ class ConnectionSecureOk(AMQPMethodPayload): IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'response', u'longstr', u'longstr', reserved=False), ] @@ -796,7 +841,8 @@ class ConnectionSecureOk(AMQPMethodPayload): :return: Python string representation """ - return 'ConnectionSecureOk(%s)' % (', '.join(map(repr, [self.response]))) + return 'ConnectionSecureOk(%s)' % (', '.join(map( + repr, [self.response]))) def __init__(self, response): """ @@ -812,7 +858,8 @@ class ConnectionSecureOk(AMQPMethodPayload): return 4 + len(self.response) @classmethod - def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> ConnectionSecureOk + def from_buffer(cls, buf, + start_offset): # type: (buffer, int) -> ConnectionSecureOk offset = start_offset s_len, = struct.unpack_from('!L', buf, offset) offset += 4 @@ -849,7 +896,11 @@ class ConnectionTune(AMQPMethodPayload): Zero means the server does not want a heartbeat. :type heartbeat: int, 16 bit unsigned (short in AMQP) """ - __slots__ = (u'channel_max', u'frame_max', u'heartbeat',) + __slots__ = ( + u'channel_max', + u'frame_max', + u'heartbeat', + ) NAME = u'connection.tune' @@ -874,8 +925,8 @@ class ConnectionTune(AMQPMethodPayload): :return: Python string representation """ - return 'ConnectionTune(%s)' % ( - ', '.join(map(repr, [self.channel_max, self.frame_max, self.heartbeat]))) + return 'ConnectionTune(%s)' % (', '.join( + map(repr, [self.channel_max, self.frame_max, self.heartbeat]))) def __init__(self, channel_max, frame_max, heartbeat): """ @@ -886,15 +937,19 @@ class ConnectionTune(AMQPMethodPayload): self.heartbeat = heartbeat def write_arguments(self, buf): # type: (tp.BinaryIO) -> None - buf.write(struct.pack('!HIH', self.channel_max, self.frame_max, self.heartbeat)) + buf.write( + struct.pack('!HIH', self.channel_max, self.frame_max, + self.heartbeat)) def get_size(self): # type: () -> int return 8 @classmethod - def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> ConnectionTune + def from_buffer(cls, buf, + start_offset): # type: (buffer, int) -> ConnectionTune offset = start_offset - channel_max, frame_max, heartbeat, = struct.unpack_from('!HIH', buf, offset) + channel_max, frame_max, heartbeat, = struct.unpack_from( + '!HIH', buf, offset) offset += 8 return cls(channel_max, frame_max, heartbeat) @@ -928,7 +983,11 @@ class ConnectionTuneOk(AMQPMethodPayload): means the client does not want a heartbeat. :type heartbeat: int, 16 bit unsigned (short in AMQP) """ - __slots__ = (u'channel_max', u'frame_max', u'heartbeat',) + __slots__ = ( + u'channel_max', + u'frame_max', + u'heartbeat', + ) NAME = u'connection.tune-ok' @@ -953,8 +1012,8 @@ class ConnectionTuneOk(AMQPMethodPayload): :return: Python string representation """ - return 'ConnectionTuneOk(%s)' % ( - ', '.join(map(repr, [self.channel_max, self.frame_max, self.heartbeat]))) + return 'ConnectionTuneOk(%s)' % (', '.join( + map(repr, [self.channel_max, self.frame_max, self.heartbeat]))) def __init__(self, channel_max, frame_max, heartbeat): """ @@ -965,15 +1024,19 @@ class ConnectionTuneOk(AMQPMethodPayload): self.heartbeat = heartbeat def write_arguments(self, buf): # type: (tp.BinaryIO) -> None - buf.write(struct.pack('!HIH', self.channel_max, self.frame_max, self.heartbeat)) + buf.write( + struct.pack('!HIH', self.channel_max, self.frame_max, + self.heartbeat)) def get_size(self): # type: () -> int return 8 @classmethod - def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> ConnectionTuneOk + def from_buffer(cls, buf, + start_offset): # type: (buffer, int) -> ConnectionTuneOk offset = start_offset - channel_max, frame_max, heartbeat, = struct.unpack_from('!HIH', buf, offset) + channel_max, frame_max, heartbeat, = struct.unpack_from( + '!HIH', buf, offset) offset += 8 return cls(channel_max, frame_max, heartbeat) @@ -1006,7 +1069,8 @@ class ConnectionUnblocked(AMQPMethodPayload): return 'ConnectionUnblocked(%s)' % (', '.join(map(repr, []))) @classmethod - def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> ConnectionUnblocked + def from_buffer( + cls, buf, start_offset): # type: (buffer, int) -> ConnectionUnblocked offset = start_offset return cls() @@ -1046,7 +1110,12 @@ class ChannelClose(AMQPMethodPayload): the ID of the method. :type method_id: int, 16 bit unsigned (method-id in AMQP) """ - __slots__ = (u'reply_code', u'reply_text', u'class_id', u'method_id',) + __slots__ = ( + u'reply_code', + u'reply_text', + u'class_id', + u'method_id', + ) NAME = u'channel.close' @@ -1059,7 +1128,7 @@ class ChannelClose(AMQPMethodPayload): IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'reply-code', u'reply-code', u'short', reserved=False), Field(u'reply-text', u'reply-text', u'shortstr', reserved=False), Field(u'class-id', u'class-id', u'short', reserved=False), @@ -1072,8 +1141,10 @@ class ChannelClose(AMQPMethodPayload): :return: Python string representation """ - return 'ChannelClose(%s)' % ( - ', '.join(map(repr, [self.reply_code, self.reply_text, self.class_id, self.method_id]))) + return 'ChannelClose(%s)' % (', '.join( + map(repr, [ + self.reply_code, self.reply_text, self.class_id, self.method_id + ]))) def __init__(self, reply_code, reply_text, class_id, method_id): """ @@ -1093,7 +1164,8 @@ class ChannelClose(AMQPMethodPayload): return 7 + len(self.reply_text) @classmethod - def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> ChannelClose + def from_buffer(cls, buf, + start_offset): # type: (buffer, int) -> ChannelClose offset = start_offset reply_code, s_len, = struct.unpack_from('!HB', buf, offset) offset += 3 @@ -1134,7 +1206,8 @@ class ChannelCloseOk(AMQPMethodPayload): return 'ChannelCloseOk(%s)' % (', '.join(map(repr, []))) @classmethod - def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> ChannelCloseOk + def from_buffer(cls, buf, + start_offset): # type: (buffer, int) -> ChannelCloseOk offset = start_offset return cls() @@ -1171,7 +1244,7 @@ class ChannelFlow(AMQPMethodPayload): IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'active', u'bit', u'bit', reserved=False), ] @@ -1196,7 +1269,8 @@ class ChannelFlow(AMQPMethodPayload): return 1 @classmethod - def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> ChannelFlow + def from_buffer(cls, buf, + start_offset): # type: (buffer, int) -> ChannelFlow offset = start_offset _bit, = struct.unpack_from('!B', buf, offset) offset += 0 @@ -1231,7 +1305,7 @@ class ChannelFlowOk(AMQPMethodPayload): IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'active', u'bit', u'bit', reserved=False), ] @@ -1256,7 +1330,8 @@ class ChannelFlowOk(AMQPMethodPayload): return 1 @classmethod - def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> ChannelFlowOk + def from_buffer(cls, buf, + start_offset): # type: (buffer, int) -> ChannelFlowOk offset = start_offset _bit, = struct.unpack_from('!B', buf, offset) offset += 0 @@ -1285,7 +1360,7 @@ class ChannelOpen(AMQPMethodPayload): STATIC_CONTENT = b'\x00\x00\x00\x05\x00\x14\x00\x0A\x00\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'reserved-1', u'shortstr', u'shortstr', reserved=True), ] @@ -1298,7 +1373,8 @@ class ChannelOpen(AMQPMethodPayload): return 'ChannelOpen(%s)' % (', '.join(map(repr, []))) @classmethod - def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> ChannelOpen + def from_buffer(cls, buf, + start_offset): # type: (buffer, int) -> ChannelOpen offset = start_offset s_len, = struct.unpack_from('!B', buf, offset) offset += 1 @@ -1327,7 +1403,7 @@ class ChannelOpenOk(AMQPMethodPayload): STATIC_CONTENT = b'\x00\x00\x00\x05\x00\x14\x00\x0B\x00\x00\x00\x00\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'reserved-1', u'longstr', u'longstr', reserved=True), ] @@ -1340,7 +1416,8 @@ class ChannelOpenOk(AMQPMethodPayload): return 'ChannelOpenOk(%s)' % (', '.join(map(repr, []))) @classmethod - def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> ChannelOpenOk + def from_buffer(cls, buf, + start_offset): # type: (buffer, int) -> ChannelOpenOk offset = start_offset s_len, = struct.unpack_from('!L', buf, offset) offset += 4 @@ -1382,7 +1459,13 @@ class ExchangeBind(AMQPMethodPayload): of these arguments depends on the exchange class. :type arguments: table. See coolamqp.uplink.framing.field_table (table in AMQP) """ - __slots__ = (u'destination', u'source', u'routing_key', u'no_wait', u'arguments',) + __slots__ = ( + u'destination', + u'source', + u'routing_key', + u'no_wait', + u'arguments', + ) NAME = u'exchange.bind' @@ -1395,7 +1478,7 @@ class ExchangeBind(AMQPMethodPayload): IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'reserved-1', u'short', u'short', reserved=True), Field(u'destination', u'exchange-name', u'shortstr', reserved=False), Field(u'source', u'exchange-name', u'shortstr', reserved=False), @@ -1410,9 +1493,11 @@ class ExchangeBind(AMQPMethodPayload): :return: Python string representation """ - return 'ExchangeBind(%s)' % (', '.join(map(repr, - [self.destination, self.source, self.routing_key, - self.no_wait, self.arguments]))) + return 'ExchangeBind(%s)' % (', '.join( + map(repr, [ + self.destination, self.source, self.routing_key, self.no_wait, + self.arguments + ]))) def __init__(self, destination, source, routing_key, no_wait, arguments): """ @@ -1440,7 +1525,8 @@ class ExchangeBind(AMQPMethodPayload): self.routing_key) + frame_table_size(self.arguments) @classmethod - def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> ExchangeBind + def from_buffer(cls, buf, + start_offset): # type: (buffer, int) -> ExchangeBind offset = start_offset s_len, = struct.unpack_from('!2xB', buf, offset) offset += 3 @@ -1491,7 +1577,8 @@ class ExchangeBindOk(AMQPMethodPayload): return 'ExchangeBindOk(%s)' % (', '.join(map(repr, []))) @classmethod - def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> ExchangeBindOk + def from_buffer(cls, buf, + start_offset): # type: (buffer, int) -> ExchangeBindOk offset = start_offset return cls() @@ -1557,8 +1644,15 @@ class ExchangeDeclare(AMQPMethodPayload): :type arguments: table. See coolamqp.uplink.framing.field_table (table in AMQP) """ __slots__ = ( - u'exchange', u'type_', u'passive', u'durable', u'auto_delete', u'internal', u'no_wait', - u'arguments',) + u'exchange', + u'type_', + u'passive', + u'durable', + u'auto_delete', + u'internal', + u'no_wait', + u'arguments', + ) NAME = u'exchange.declare' @@ -1589,14 +1683,14 @@ class ExchangeDeclare(AMQPMethodPayload): :return: Python string representation """ - return 'ExchangeDeclare(%s)' % (', '.join(map(repr, - [self.exchange, self.type_, self.passive, - self.durable, self.auto_delete, - self.internal, self.no_wait, - self.arguments]))) + return 'ExchangeDeclare(%s)' % (', '.join( + map(repr, [ + self.exchange, self.type_, self.passive, self.durable, + self.auto_delete, self.internal, self.no_wait, self.arguments + ]))) - def __init__(self, exchange, type_, passive, durable, auto_delete, internal, no_wait, - arguments): + def __init__(self, exchange, type_, passive, durable, auto_delete, + internal, no_wait, arguments): """ Create frame exchange.declare """ @@ -1615,15 +1709,19 @@ class ExchangeDeclare(AMQPMethodPayload): buf.write(self.exchange) buf.write(struct.pack('!B', len(self.type_))) buf.write(self.type_) - buf.write(struct.pack('!B', (self.passive << 0) | (self.durable << 1) | ( - self.auto_delete << 2) | (self.internal << 3) | (self.no_wait << 4))) + buf.write( + struct.pack('!B', (self.passive << 0) | (self.durable << 1) | + (self.auto_delete << 2) | (self.internal << 3) | + (self.no_wait << 4))) enframe_table(buf, self.arguments) def get_size(self): # type: () -> int - return 5 + len(self.exchange) + len(self.type_) + frame_table_size(self.arguments) + return 5 + len(self.exchange) + len(self.type_) + frame_table_size( + self.arguments) @classmethod - def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> ExchangeDeclare + def from_buffer(cls, buf, + start_offset): # type: (buffer, int) -> ExchangeDeclare offset = start_offset s_len, = struct.unpack_from('!2xB', buf, offset) offset += 3 @@ -1643,7 +1741,8 @@ class ExchangeDeclare(AMQPMethodPayload): offset += 1 arguments, delta = deframe_table(buf, offset) offset += delta - return cls(exchange, type_, passive, durable, auto_delete, internal, no_wait, arguments) + return cls(exchange, type_, passive, durable, auto_delete, internal, + no_wait, arguments) class ExchangeDelete(AMQPMethodPayload): @@ -1665,7 +1764,11 @@ class ExchangeDelete(AMQPMethodPayload): :type if_unused: bool (bit in AMQP) :type no_wait: bool (no-wait in AMQP) """ - __slots__ = (u'exchange', u'if_unused', u'no_wait',) + __slots__ = ( + u'exchange', + u'if_unused', + u'no_wait', + ) NAME = u'exchange.delete' @@ -1678,7 +1781,7 @@ class ExchangeDelete(AMQPMethodPayload): IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'reserved-1', u'short', u'short', reserved=True), Field(u'exchange', u'exchange-name', u'shortstr', reserved=False), Field(u'if-unused', u'bit', u'bit', reserved=False), @@ -1691,8 +1794,8 @@ class ExchangeDelete(AMQPMethodPayload): :return: Python string representation """ - return 'ExchangeDelete(%s)' % ( - ', '.join(map(repr, [self.exchange, self.if_unused, self.no_wait]))) + return 'ExchangeDelete(%s)' % (', '.join( + map(repr, [self.exchange, self.if_unused, self.no_wait]))) def __init__(self, exchange, if_unused, no_wait): """ @@ -1706,13 +1809,15 @@ class ExchangeDelete(AMQPMethodPayload): buf.write(b'\x00\x00') buf.write(struct.pack('!B', len(self.exchange))) buf.write(self.exchange) - buf.write(struct.pack('!B', (self.if_unused << 0) | (self.no_wait << 1))) + buf.write( + struct.pack('!B', (self.if_unused << 0) | (self.no_wait << 1))) def get_size(self): # type: () -> int return 4 + len(self.exchange) @classmethod - def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> ExchangeDelete + def from_buffer(cls, buf, + start_offset): # type: (buffer, int) -> ExchangeDelete offset = start_offset s_len, = struct.unpack_from('!2xB', buf, offset) offset += 3 @@ -1756,7 +1861,8 @@ class ExchangeDeclareOk(AMQPMethodPayload): return 'ExchangeDeclareOk(%s)' % (', '.join(map(repr, []))) @classmethod - def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> ExchangeDeclareOk + def from_buffer(cls, buf, + start_offset): # type: (buffer, int) -> ExchangeDeclareOk offset = start_offset return cls() @@ -1789,7 +1895,8 @@ class ExchangeDeleteOk(AMQPMethodPayload): return 'ExchangeDeleteOk(%s)' % (', '.join(map(repr, []))) @classmethod - def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> ExchangeDeleteOk + def from_buffer(cls, buf, + start_offset): # type: (buffer, int) -> ExchangeDeleteOk offset = start_offset return cls() @@ -1811,7 +1918,13 @@ class ExchangeUnbind(AMQPMethodPayload): Specifies the arguments of the binding to unbind. :type arguments: table. See coolamqp.uplink.framing.field_table (table in AMQP) """ - __slots__ = (u'destination', u'source', u'routing_key', u'no_wait', u'arguments',) + __slots__ = ( + u'destination', + u'source', + u'routing_key', + u'no_wait', + u'arguments', + ) NAME = u'exchange.unbind' @@ -1824,7 +1937,7 @@ class ExchangeUnbind(AMQPMethodPayload): IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'reserved-1', u'short', u'short', reserved=True), Field(u'destination', u'exchange-name', u'shortstr', reserved=False), Field(u'source', u'exchange-name', u'shortstr', reserved=False), @@ -1839,9 +1952,11 @@ class ExchangeUnbind(AMQPMethodPayload): :return: Python string representation """ - return 'ExchangeUnbind(%s)' % (', '.join(map(repr, [self.destination, self.source, - self.routing_key, self.no_wait, - self.arguments]))) + return 'ExchangeUnbind(%s)' % (', '.join( + map(repr, [ + self.destination, self.source, self.routing_key, self.no_wait, + self.arguments + ]))) def __init__(self, destination, source, routing_key, no_wait, arguments): """ @@ -1869,7 +1984,8 @@ class ExchangeUnbind(AMQPMethodPayload): self.routing_key) + frame_table_size(self.arguments) @classmethod - def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> ExchangeUnbind + def from_buffer(cls, buf, + start_offset): # type: (buffer, int) -> ExchangeUnbind offset = start_offset s_len, = struct.unpack_from('!2xB', buf, offset) offset += 3 @@ -1920,7 +2036,8 @@ class ExchangeUnbindOk(AMQPMethodPayload): return 'ExchangeUnbindOk(%s)' % (', '.join(map(repr, []))) @classmethod - def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> ExchangeUnbindOk + def from_buffer(cls, buf, + start_offset): # type: (buffer, int) -> ExchangeUnbindOk offset = start_offset return cls() @@ -1979,7 +2096,13 @@ class QueueBind(AMQPMethodPayload): depends on the exchange class. :type arguments: table. See coolamqp.uplink.framing.field_table (table in AMQP) """ - __slots__ = (u'queue', u'exchange', u'routing_key', u'no_wait', u'arguments',) + __slots__ = ( + u'queue', + u'exchange', + u'routing_key', + u'no_wait', + u'arguments', + ) NAME = u'queue.bind' @@ -2008,7 +2131,10 @@ class QueueBind(AMQPMethodPayload): :return: Python string representation """ return 'QueueBind(%s)' % (', '.join( - map(repr, [self.queue, self.exchange, self.routing_key, self.no_wait, self.arguments]))) + map(repr, [ + self.queue, self.exchange, self.routing_key, self.no_wait, + self.arguments + ]))) def __init__(self, queue, exchange, routing_key, no_wait, arguments): """ @@ -2032,11 +2158,12 @@ class QueueBind(AMQPMethodPayload): enframe_table(buf, self.arguments) def get_size(self): # type: () -> int - return 6 + len(self.queue) + len(self.exchange) + len(self.routing_key) + frame_table_size( - self.arguments) + return 6 + len(self.queue) + len(self.exchange) + len( + self.routing_key) + frame_table_size(self.arguments) @classmethod - def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> QueueBind + def from_buffer(cls, buf, + start_offset): # type: (buffer, int) -> QueueBind offset = start_offset s_len, = struct.unpack_from('!2xB', buf, offset) offset += 3 @@ -2087,7 +2214,8 @@ class QueueBindOk(AMQPMethodPayload): return 'QueueBindOk(%s)' % (', '.join(map(repr, []))) @classmethod - def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> QueueBindOk + def from_buffer(cls, buf, + start_offset): # type: (buffer, int) -> QueueBindOk offset = start_offset return cls() @@ -2156,7 +2284,14 @@ class QueueDeclare(AMQPMethodPayload): :type arguments: table. See coolamqp.uplink.framing.field_table (table in AMQP) """ __slots__ = ( - u'queue', u'passive', u'durable', u'exclusive', u'auto_delete', u'no_wait', u'arguments',) + u'queue', + u'passive', + u'durable', + u'exclusive', + u'auto_delete', + u'no_wait', + u'arguments', + ) NAME = u'queue.declare' @@ -2186,11 +2321,14 @@ class QueueDeclare(AMQPMethodPayload): :return: Python string representation """ - return 'QueueDeclare(%s)' % (', '.join(map(repr, [self.queue, self.passive, self.durable, - self.exclusive, self.auto_delete, - self.no_wait, self.arguments]))) + return 'QueueDeclare(%s)' % (', '.join( + map(repr, [ + self.queue, self.passive, self.durable, self.exclusive, + self.auto_delete, self.no_wait, self.arguments + ]))) - def __init__(self, queue, passive, durable, exclusive, auto_delete, no_wait, arguments): + def __init__(self, queue, passive, durable, exclusive, auto_delete, + no_wait, arguments): """ Create frame queue.declare """ @@ -2206,16 +2344,18 @@ class QueueDeclare(AMQPMethodPayload): buf.write(b'\x00\x00') buf.write(struct.pack('!B', len(self.queue))) buf.write(self.queue) - buf.write(struct.pack('!B', - (self.passive << 0) | (self.durable << 1) | (self.exclusive << 2) | ( - self.auto_delete << 3) | (self.no_wait << 4))) + buf.write( + struct.pack('!B', (self.passive << 0) | (self.durable << 1) | + (self.exclusive << 2) | (self.auto_delete << 3) | + (self.no_wait << 4))) enframe_table(buf, self.arguments) def get_size(self): # type: () -> int return 4 + len(self.queue) + frame_table_size(self.arguments) @classmethod - def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> QueueDeclare + def from_buffer(cls, buf, + start_offset): # type: (buffer, int) -> QueueDeclare offset = start_offset s_len, = struct.unpack_from('!2xB', buf, offset) offset += 3 @@ -2231,7 +2371,8 @@ class QueueDeclare(AMQPMethodPayload): offset += 1 arguments, delta = deframe_table(buf, offset) offset += delta - return cls(queue, passive, durable, exclusive, auto_delete, no_wait, arguments) + return cls(queue, passive, durable, exclusive, auto_delete, no_wait, + arguments) class QueueDelete(AMQPMethodPayload): @@ -2258,7 +2399,12 @@ class QueueDelete(AMQPMethodPayload): :type if_empty: bool (bit in AMQP) :type no_wait: bool (no-wait in AMQP) """ - __slots__ = (u'queue', u'if_unused', u'if_empty', u'no_wait',) + __slots__ = ( + u'queue', + u'if_unused', + u'if_empty', + u'no_wait', + ) NAME = u'queue.delete' @@ -2271,7 +2417,7 @@ class QueueDelete(AMQPMethodPayload): IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'reserved-1', u'short', u'short', reserved=True), Field(u'queue', u'queue-name', u'shortstr', reserved=False), Field(u'if-unused', u'bit', u'bit', reserved=False), @@ -2285,8 +2431,9 @@ class QueueDelete(AMQPMethodPayload): :return: Python string representation """ - return 'QueueDelete(%s)' % ( - ', '.join(map(repr, [self.queue, self.if_unused, self.if_empty, self.no_wait]))) + return 'QueueDelete(%s)' % (', '.join( + map(repr, + [self.queue, self.if_unused, self.if_empty, self.no_wait]))) def __init__(self, queue, if_unused, if_empty, no_wait): """ @@ -2302,13 +2449,15 @@ class QueueDelete(AMQPMethodPayload): buf.write(struct.pack('!B', len(self.queue))) buf.write(self.queue) buf.write( - struct.pack('!B', (self.if_unused << 0) | (self.if_empty << 1) | (self.no_wait << 2))) + struct.pack('!B', (self.if_unused << 0) | (self.if_empty << 1) | + (self.no_wait << 2))) def get_size(self): # type: () -> int return 4 + len(self.queue) @classmethod - def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> QueueDelete + def from_buffer(cls, buf, + start_offset): # type: (buffer, int) -> QueueDelete offset = start_offset s_len, = struct.unpack_from('!2xB', buf, offset) offset += 3 @@ -2342,7 +2491,11 @@ class QueueDeclareOk(AMQPMethodPayload): appear in this count. :type consumer_count: int, 32 bit unsigned (long in AMQP) """ - __slots__ = (u'queue', u'message_count', u'consumer_count',) + __slots__ = ( + u'queue', + u'message_count', + u'consumer_count', + ) NAME = u'queue.declare-ok' @@ -2355,7 +2508,7 @@ class QueueDeclareOk(AMQPMethodPayload): IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'queue', u'queue-name', u'shortstr', reserved=False), Field(u'message-count', u'message-count', u'long', reserved=False), Field(u'consumer-count', u'long', u'long', reserved=False), @@ -2367,8 +2520,8 @@ class QueueDeclareOk(AMQPMethodPayload): :return: Python string representation """ - return 'QueueDeclareOk(%s)' % ( - ', '.join(map(repr, [self.queue, self.message_count, self.consumer_count]))) + return 'QueueDeclareOk(%s)' % (', '.join( + map(repr, [self.queue, self.message_count, self.consumer_count]))) def __init__(self, queue, message_count, consumer_count): """ @@ -2387,7 +2540,8 @@ class QueueDeclareOk(AMQPMethodPayload): return 9 + len(self.queue) @classmethod - def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> QueueDeclareOk + def from_buffer(cls, buf, + start_offset): # type: (buffer, int) -> QueueDeclareOk offset = start_offset s_len, = struct.unpack_from('!B', buf, offset) offset += 1 @@ -2419,7 +2573,7 @@ class QueueDeleteOk(AMQPMethodPayload): IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'message-count', u'message-count', u'long', reserved=False), ] @@ -2429,7 +2583,8 @@ class QueueDeleteOk(AMQPMethodPayload): :return: Python string representation """ - return 'QueueDeleteOk(%s)' % (', '.join(map(repr, [self.message_count]))) + return 'QueueDeleteOk(%s)' % (', '.join(map(repr, + [self.message_count]))) def __init__(self, message_count): """ @@ -2444,7 +2599,8 @@ class QueueDeleteOk(AMQPMethodPayload): return 4 @classmethod - def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> QueueDeleteOk + def from_buffer(cls, buf, + start_offset): # type: (buffer, int) -> QueueDeleteOk offset = start_offset message_count, = struct.unpack_from('!I', buf, offset) offset += 4 @@ -2462,7 +2618,10 @@ class QueuePurge(AMQPMethodPayload): :type queue: binary type (max length 255) (queue-name in AMQP) :type no_wait: bool (no-wait in AMQP) """ - __slots__ = (u'queue', u'no_wait',) + __slots__ = ( + u'queue', + u'no_wait', + ) NAME = u'queue.purge' @@ -2475,7 +2634,7 @@ class QueuePurge(AMQPMethodPayload): IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'reserved-1', u'short', u'short', reserved=True), Field(u'queue', u'queue-name', u'shortstr', reserved=False), Field(u'no-wait', u'no-wait', u'bit', reserved=False), @@ -2487,7 +2646,8 @@ class QueuePurge(AMQPMethodPayload): :return: Python string representation """ - return 'QueuePurge(%s)' % (', '.join(map(repr, [self.queue, self.no_wait]))) + return 'QueuePurge(%s)' % (', '.join( + map(repr, [self.queue, self.no_wait]))) def __init__(self, queue, no_wait): """ @@ -2506,7 +2666,8 @@ class QueuePurge(AMQPMethodPayload): return 4 + len(self.queue) @classmethod - def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> QueuePurge + def from_buffer(cls, buf, + start_offset): # type: (buffer, int) -> QueuePurge offset = start_offset s_len, = struct.unpack_from('!2xB', buf, offset) offset += 3 @@ -2540,7 +2701,7 @@ class QueuePurgeOk(AMQPMethodPayload): IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'message-count', u'message-count', u'long', reserved=False), ] @@ -2550,7 +2711,8 @@ class QueuePurgeOk(AMQPMethodPayload): :return: Python string representation """ - return 'QueuePurgeOk(%s)' % (', '.join(map(repr, [self.message_count]))) + return 'QueuePurgeOk(%s)' % (', '.join(map(repr, + [self.message_count]))) def __init__(self, message_count): """ @@ -2565,7 +2727,8 @@ class QueuePurgeOk(AMQPMethodPayload): return 4 @classmethod - def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> QueuePurgeOk + def from_buffer(cls, buf, + start_offset): # type: (buffer, int) -> QueuePurgeOk offset = start_offset message_count, = struct.unpack_from('!I', buf, offset) offset += 4 @@ -2588,7 +2751,12 @@ class QueueUnbind(AMQPMethodPayload): Specifies the arguments of the binding to unbind. :type arguments: table. See coolamqp.uplink.framing.field_table (table in AMQP) """ - __slots__ = (u'queue', u'exchange', u'routing_key', u'arguments',) + __slots__ = ( + u'queue', + u'exchange', + u'routing_key', + u'arguments', + ) NAME = u'queue.unbind' @@ -2601,7 +2769,7 @@ class QueueUnbind(AMQPMethodPayload): IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'reserved-1', u'short', u'short', reserved=True), Field(u'queue', u'queue-name', u'shortstr', reserved=False), Field(u'exchange', u'exchange-name', u'shortstr', reserved=False), @@ -2615,8 +2783,10 @@ class QueueUnbind(AMQPMethodPayload): :return: Python string representation """ - return 'QueueUnbind(%s)' % ( - ', '.join(map(repr, [self.queue, self.exchange, self.routing_key, self.arguments]))) + return 'QueueUnbind(%s)' % (', '.join( + map(repr, + [self.queue, self.exchange, self.routing_key, self.arguments + ]))) def __init__(self, queue, exchange, routing_key, arguments): """ @@ -2638,11 +2808,12 @@ class QueueUnbind(AMQPMethodPayload): enframe_table(buf, self.arguments) def get_size(self): # type: () -> int - return 5 + len(self.queue) + len(self.exchange) + len(self.routing_key) + frame_table_size( - self.arguments) + return 5 + len(self.queue) + len(self.exchange) + len( + self.routing_key) + frame_table_size(self.arguments) @classmethod - def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> QueueUnbind + def from_buffer(cls, buf, + start_offset): # type: (buffer, int) -> QueueUnbind offset = start_offset s_len, = struct.unpack_from('!2xB', buf, offset) offset += 3 @@ -2689,7 +2860,8 @@ class QueueUnbindOk(AMQPMethodPayload): return 'QueueUnbindOk(%s)' % (', '.join(map(repr, []))) @classmethod - def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> QueueUnbindOk + def from_buffer(cls, buf, + start_offset): # type: (buffer, int) -> QueueUnbindOk offset = start_offset return cls() @@ -2763,13 +2935,14 @@ class BasicContentPropertyList(AMQPContentPropertyList): :type reserved: binary type (max length 255) (AMQP as shortstr) """ zpf = bytearray([ - (('content_type' in kwargs) << 7) | (('content_encoding' in kwargs) << 6) | ( - ('headers' in kwargs) << 5) | (('delivery_mode' in kwargs) << 4) | ( - ('priority' in kwargs) << 3) | (('correlation_id' in kwargs) << 2) | ( - ('reply_to' in kwargs) << 1) | int('expiration' in kwargs), - (('message_id' in kwargs) << 7) | (('timestamp' in kwargs) << 6) | ( - ('type_' in kwargs) << 5) | (('user_id' in kwargs) << 4) | ( - ('app_id' in kwargs) << 3) | (('reserved' in kwargs) << 2) + (('content_type' in kwargs) << 7) | + (('content_encoding' in kwargs) << 6) | + (('headers' in kwargs) << 5) | (('delivery_mode' in kwargs) << 4) | + (('priority' in kwargs) << 3) | (('correlation_id' in kwargs) << 2) + | (('reply_to' in kwargs) << 1) | int('expiration' in kwargs), + (('message_id' in kwargs) << 7) | (('timestamp' in kwargs) << 6) | + (('type_' in kwargs) << 5) | (('user_id' in kwargs) << 4) | + (('app_id' in kwargs) << 3) | (('reserved' in kwargs) << 2) ]) zpf = six.binary_type(zpf) @@ -2793,35 +2966,41 @@ class BasicContentPropertyList(AMQPContentPropertyList): if zpf in BasicContentPropertyList.PARTICULAR_CLASSES: return BasicContentPropertyList.PARTICULAR_CLASSES[zpf](**kwargs) else: - logger.debug('Property field (BasicContentPropertyList:%s) not seen yet, compiling', - repr(zpf)) - c = compile_particular_content_property_list_class(zpf, BasicContentPropertyList.FIELDS) + logger.debug( + 'Property field (BasicContentPropertyList:%s) not seen yet, compiling', + repr(zpf)) + c = compile_particular_content_property_list_class( + zpf, BasicContentPropertyList.FIELDS) BasicContentPropertyList.PARTICULAR_CLASSES[zpf] = c return c(**kwargs) @staticmethod def typize(*fields): # type: (*str) -> type zpf = bytearray([ - (('content_type' in fields) << 7) | (('content_encoding' in fields) << 6) | ( - ('headers' in fields) << 5) | (('delivery_mode' in fields) << 4) | ( - ('priority' in fields) << 3) | (('correlation_id' in fields) << 2) | ( - ('reply_to' in fields) << 1) | int('expiration' in kwargs), - (('message_id' in fields) << 7) | (('timestamp' in fields) << 6) | ( - ('type_' in fields) << 5) | (('user_id' in fields) << 4) | ( - ('app_id' in fields) << 3) | (('reserved' in fields) << 2) + (('content_type' in fields) << 7) | + (('content_encoding' in fields) << 6) | + (('headers' in fields) << 5) | (('delivery_mode' in fields) << 4) | + (('priority' in fields) << 3) | (('correlation_id' in fields) << 2) + | (('reply_to' in fields) << 1) | int('expiration' in kwargs), + (('message_id' in fields) << 7) | (('timestamp' in fields) << 6) | + (('type_' in fields) << 5) | (('user_id' in fields) << 4) | + (('app_id' in fields) << 3) | (('reserved' in fields) << 2) ]) zpf = six.binary_type(zpf) if zpf in BasicContentPropertyList.PARTICULAR_CLASSES: return BasicContentPropertyList.PARTICULAR_CLASSES[zpf] else: - logger.debug('Property field (BasicContentPropertyList:%s) not seen yet, compiling', - repr(zpf)) - c = compile_particular_content_property_list_class(zpf, BasicContentPropertyList.FIELDS) + logger.debug( + 'Property field (BasicContentPropertyList:%s) not seen yet, compiling', + repr(zpf)) + c = compile_particular_content_property_list_class( + zpf, BasicContentPropertyList.FIELDS) BasicContentPropertyList.PARTICULAR_CLASSES[zpf] = c return c @staticmethod - def from_buffer(buf, offset): # type: (buffer, int) -> BasicContentPropertyList + def from_buffer(buf, + offset): # type: (buffer, int) -> BasicContentPropertyList """ Return a content property list instance unserialized from buffer, so that buf[offset] marks the start of property flags @@ -2834,13 +3013,17 @@ class BasicContentPropertyList(AMQPContentPropertyList): else: while buf[offset + pfl - 1] & 1: pfl += 2 - zpf = BasicContentPropertyList.zero_property_flags(buf[offset:offset + pfl]).tobytes() + zpf = BasicContentPropertyList.zero_property_flags(buf[offset:offset + + pfl]).tobytes() if zpf in BasicContentPropertyList.PARTICULAR_CLASSES: - return BasicContentPropertyList.PARTICULAR_CLASSES[zpf].from_buffer(buf, offset) + return BasicContentPropertyList.PARTICULAR_CLASSES[ + zpf].from_buffer(buf, offset) else: - logger.debug('Property field (BasicContentPropertyList:%s) not seen yet, compiling', - repr(zpf)) - c = compile_particular_content_property_list_class(zpf, BasicContentPropertyList.FIELDS) + logger.debug( + 'Property field (BasicContentPropertyList:%s) not seen yet, compiling', + repr(zpf)) + c = compile_particular_content_property_list_class( + zpf, BasicContentPropertyList.FIELDS) BasicContentPropertyList.PARTICULAR_CLASSES[zpf] = c return c.from_buffer(buf, offset) @@ -2866,7 +3049,10 @@ class BasicAck(AMQPMethodPayload): all outstanding messages. :type multiple: bool (bit in AMQP) """ - __slots__ = (u'delivery_tag', u'multiple',) + __slots__ = ( + u'delivery_tag', + u'multiple', + ) NAME = u'basic.ack' @@ -2879,7 +3065,7 @@ class BasicAck(AMQPMethodPayload): IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'delivery-tag', u'delivery-tag', u'longlong', reserved=False), Field(u'multiple', u'bit', u'bit', reserved=False), ] @@ -2890,7 +3076,8 @@ class BasicAck(AMQPMethodPayload): :return: Python string representation """ - return 'BasicAck(%s)' % (', '.join(map(repr, [self.delivery_tag, self.multiple]))) + return 'BasicAck(%s)' % (', '.join( + map(repr, [self.delivery_tag, self.multiple]))) def __init__(self, delivery_tag, multiple): """ @@ -2947,7 +3134,14 @@ class BasicConsume(AMQPMethodPayload): :type arguments: table. See coolamqp.uplink.framing.field_table (table in AMQP) """ __slots__ = ( - u'queue', u'consumer_tag', u'no_local', u'no_ack', u'exclusive', u'no_wait', u'arguments',) + u'queue', + u'consumer_tag', + u'no_local', + u'no_ack', + u'exclusive', + u'no_wait', + u'arguments', + ) NAME = u'basic.consume' @@ -2977,12 +3171,14 @@ class BasicConsume(AMQPMethodPayload): :return: Python string representation """ - return 'BasicConsume(%s)' % (', '.join(map(repr, - [self.queue, self.consumer_tag, self.no_local, - self.no_ack, self.exclusive, self.no_wait, - self.arguments]))) + return 'BasicConsume(%s)' % (', '.join( + map(repr, [ + self.queue, self.consumer_tag, self.no_local, self.no_ack, + self.exclusive, self.no_wait, self.arguments + ]))) - def __init__(self, queue, consumer_tag, no_local, no_ack, exclusive, no_wait, arguments): + def __init__(self, queue, consumer_tag, no_local, no_ack, exclusive, + no_wait, arguments): """ Create frame basic.consume """ @@ -3000,16 +3196,18 @@ class BasicConsume(AMQPMethodPayload): buf.write(self.queue) buf.write(struct.pack('!B', len(self.consumer_tag))) buf.write(self.consumer_tag) - buf.write(struct.pack('!B', - (self.no_local << 0) | (self.no_ack << 1) | (self.exclusive << 2) | ( - self.no_wait << 3))) + buf.write( + struct.pack('!B', (self.no_local << 0) | (self.no_ack << 1) | + (self.exclusive << 2) | (self.no_wait << 3))) enframe_table(buf, self.arguments) def get_size(self): # type: () -> int - return 5 + len(self.queue) + len(self.consumer_tag) + frame_table_size(self.arguments) + return 5 + len(self.queue) + len(self.consumer_tag) + frame_table_size( + self.arguments) @classmethod - def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> BasicConsume + def from_buffer(cls, buf, + start_offset): # type: (buffer, int) -> BasicConsume offset = start_offset s_len, = struct.unpack_from('!2xB', buf, offset) offset += 3 @@ -3028,7 +3226,8 @@ class BasicConsume(AMQPMethodPayload): offset += 1 arguments, delta = deframe_table(buf, offset) offset += delta - return cls(queue, consumer_tag, no_local, no_ack, exclusive, no_wait, arguments) + return cls(queue, consumer_tag, no_local, no_ack, exclusive, no_wait, + arguments) class BasicCancel(AMQPMethodPayload): @@ -3056,7 +3255,10 @@ class BasicCancel(AMQPMethodPayload): :type consumer_tag: binary type (max length 255) (consumer-tag in AMQP) :type no_wait: bool (no-wait in AMQP) """ - __slots__ = (u'consumer_tag', u'no_wait',) + __slots__ = ( + u'consumer_tag', + u'no_wait', + ) NAME = u'basic.cancel' @@ -3069,7 +3271,7 @@ class BasicCancel(AMQPMethodPayload): IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'consumer-tag', u'consumer-tag', u'shortstr', reserved=False), Field(u'no-wait', u'no-wait', u'bit', reserved=False), ] @@ -3080,7 +3282,8 @@ class BasicCancel(AMQPMethodPayload): :return: Python string representation """ - return 'BasicCancel(%s)' % (', '.join(map(repr, [self.consumer_tag, self.no_wait]))) + return 'BasicCancel(%s)' % (', '.join( + map(repr, [self.consumer_tag, self.no_wait]))) def __init__(self, consumer_tag, no_wait): """ @@ -3098,7 +3301,8 @@ class BasicCancel(AMQPMethodPayload): return 2 + len(self.consumer_tag) @classmethod - def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> BasicCancel + def from_buffer(cls, buf, + start_offset): # type: (buffer, int) -> BasicCancel offset = start_offset s_len, = struct.unpack_from('!B', buf, offset) offset += 1 @@ -3135,7 +3339,7 @@ class BasicConsumeOk(AMQPMethodPayload): IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'consumer-tag', u'consumer-tag', u'shortstr', reserved=False), ] @@ -3145,7 +3349,8 @@ class BasicConsumeOk(AMQPMethodPayload): :return: Python string representation """ - return 'BasicConsumeOk(%s)' % (', '.join(map(repr, [self.consumer_tag]))) + return 'BasicConsumeOk(%s)' % (', '.join(map(repr, + [self.consumer_tag]))) def __init__(self, consumer_tag): """ @@ -3161,7 +3366,8 @@ class BasicConsumeOk(AMQPMethodPayload): return 1 + len(self.consumer_tag) @classmethod - def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> BasicConsumeOk + def from_buffer(cls, buf, + start_offset): # type: (buffer, int) -> BasicConsumeOk offset = start_offset s_len, = struct.unpack_from('!B', buf, offset) offset += 1 @@ -3190,7 +3396,7 @@ class BasicCancelOk(AMQPMethodPayload): IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'consumer-tag', u'consumer-tag', u'shortstr', reserved=False), ] @@ -3200,7 +3406,8 @@ class BasicCancelOk(AMQPMethodPayload): :return: Python string representation """ - return 'BasicCancelOk(%s)' % (', '.join(map(repr, [self.consumer_tag]))) + return 'BasicCancelOk(%s)' % (', '.join(map(repr, + [self.consumer_tag]))) def __init__(self, consumer_tag): """ @@ -3216,7 +3423,8 @@ class BasicCancelOk(AMQPMethodPayload): return 1 + len(self.consumer_tag) @classmethod - def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> BasicCancelOk + def from_buffer(cls, buf, + start_offset): # type: (buffer, int) -> BasicCancelOk offset = start_offset s_len, = struct.unpack_from('!B', buf, offset) offset += 1 @@ -3248,7 +3456,13 @@ class BasicDeliver(AMQPMethodPayload): was published. :type routing_key: binary type (max length 255) (shortstr in AMQP) """ - __slots__ = (u'consumer_tag', u'delivery_tag', u'redelivered', u'exchange', u'routing_key',) + __slots__ = ( + u'consumer_tag', + u'delivery_tag', + u'redelivered', + u'exchange', + u'routing_key', + ) NAME = u'basic.deliver' @@ -3261,7 +3475,7 @@ class BasicDeliver(AMQPMethodPayload): IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'consumer-tag', u'consumer-tag', u'shortstr', reserved=False), Field(u'delivery-tag', u'delivery-tag', u'longlong', reserved=False), Field(u'redelivered', u'redelivered', u'bit', reserved=False), @@ -3275,11 +3489,14 @@ class BasicDeliver(AMQPMethodPayload): :return: Python string representation """ - return 'BasicDeliver(%s)' % (', '.join(map(repr, [self.consumer_tag, self.delivery_tag, - self.redelivered, self.exchange, - self.routing_key]))) + return 'BasicDeliver(%s)' % (', '.join( + map(repr, [ + self.consumer_tag, self.delivery_tag, self.redelivered, + self.exchange, self.routing_key + ]))) - def __init__(self, consumer_tag, delivery_tag, redelivered, exchange, routing_key): + def __init__(self, consumer_tag, delivery_tag, redelivered, exchange, + routing_key): """ Create frame basic.deliver """ @@ -3293,16 +3510,19 @@ class BasicDeliver(AMQPMethodPayload): buf.write(struct.pack('!B', len(self.consumer_tag))) buf.write(self.consumer_tag) buf.write( - struct.pack('!QBB', self.delivery_tag, (self.redelivered << 0), len(self.exchange))) + struct.pack('!QBB', self.delivery_tag, (self.redelivered << 0), + len(self.exchange))) buf.write(self.exchange) buf.write(struct.pack('!B', len(self.routing_key))) buf.write(self.routing_key) def get_size(self): # type: () -> int - return 12 + len(self.consumer_tag) + len(self.exchange) + len(self.routing_key) + return 12 + len(self.consumer_tag) + len(self.exchange) + len( + self.routing_key) @classmethod - def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> BasicDeliver + def from_buffer(cls, buf, + start_offset): # type: (buffer, int) -> BasicDeliver offset = start_offset s_len, = struct.unpack_from('!B', buf, offset) offset += 1 @@ -3320,7 +3540,8 @@ class BasicDeliver(AMQPMethodPayload): offset += 1 routing_key = buf[offset:offset + s_len] offset += s_len - return cls(consumer_tag, delivery_tag, redelivered, exchange, routing_key) + return cls(consumer_tag, delivery_tag, redelivered, exchange, + routing_key) class BasicGet(AMQPMethodPayload): @@ -3336,7 +3557,10 @@ class BasicGet(AMQPMethodPayload): :type queue: binary type (max length 255) (queue-name in AMQP) :type no_ack: bool (no-ack in AMQP) """ - __slots__ = (u'queue', u'no_ack',) + __slots__ = ( + u'queue', + u'no_ack', + ) NAME = u'basic.get' @@ -3349,7 +3573,7 @@ class BasicGet(AMQPMethodPayload): IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'reserved-1', u'short', u'short', reserved=True), Field(u'queue', u'queue-name', u'shortstr', reserved=False), Field(u'no-ack', u'no-ack', u'bit', reserved=False), @@ -3361,7 +3585,8 @@ class BasicGet(AMQPMethodPayload): :return: Python string representation """ - return 'BasicGet(%s)' % (', '.join(map(repr, [self.queue, self.no_ack]))) + return 'BasicGet(%s)' % (', '.join(map(repr, + [self.queue, self.no_ack]))) def __init__(self, queue, no_ack): """ @@ -3414,7 +3639,13 @@ class BasicGetOk(AMQPMethodPayload): :type routing_key: binary type (max length 255) (shortstr in AMQP) :type message_count: int, 32 bit unsigned (message-count in AMQP) """ - __slots__ = (u'delivery_tag', u'redelivered', u'exchange', u'routing_key', u'message_count',) + __slots__ = ( + u'delivery_tag', + u'redelivered', + u'exchange', + u'routing_key', + u'message_count', + ) NAME = u'basic.get-ok' @@ -3427,7 +3658,7 @@ class BasicGetOk(AMQPMethodPayload): IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'delivery-tag', u'delivery-tag', u'longlong', reserved=False), Field(u'redelivered', u'redelivered', u'bit', reserved=False), Field(u'exchange', u'exchange-name', u'shortstr', reserved=False), @@ -3441,11 +3672,14 @@ class BasicGetOk(AMQPMethodPayload): :return: Python string representation """ - return 'BasicGetOk(%s)' % (', '.join(map(repr, [self.delivery_tag, self.redelivered, - self.exchange, self.routing_key, - self.message_count]))) + return 'BasicGetOk(%s)' % (', '.join( + map(repr, [ + self.delivery_tag, self.redelivered, self.exchange, + self.routing_key, self.message_count + ]))) - def __init__(self, delivery_tag, redelivered, exchange, routing_key, message_count): + def __init__(self, delivery_tag, redelivered, exchange, routing_key, + message_count): """ Create frame basic.get-ok """ @@ -3457,7 +3691,8 @@ class BasicGetOk(AMQPMethodPayload): def write_arguments(self, buf): # type: (tp.BinaryIO) -> None buf.write( - struct.pack('!QBB', self.delivery_tag, (self.redelivered << 0), len(self.exchange))) + struct.pack('!QBB', self.delivery_tag, (self.redelivered << 0), + len(self.exchange))) buf.write(self.exchange) buf.write(struct.pack('!B', len(self.routing_key))) buf.write(self.routing_key) @@ -3467,7 +3702,8 @@ class BasicGetOk(AMQPMethodPayload): return 15 + len(self.exchange) + len(self.routing_key) @classmethod - def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> BasicGetOk + def from_buffer(cls, buf, + start_offset): # type: (buffer, int) -> BasicGetOk offset = start_offset delivery_tag, _bit, = struct.unpack_from('!QB', buf, offset) offset += 8 @@ -3483,7 +3719,8 @@ class BasicGetOk(AMQPMethodPayload): offset += s_len message_count, = struct.unpack_from('!I', buf, offset) offset += 4 - return cls(delivery_tag, redelivered, exchange, routing_key, message_count) + return cls(delivery_tag, redelivered, exchange, routing_key, + message_count) class BasicGetEmpty(AMQPMethodPayload): @@ -3508,7 +3745,7 @@ class BasicGetEmpty(AMQPMethodPayload): STATIC_CONTENT = b'\x00\x00\x00\x0D\x00\x3C\x00\x48\x00\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'reserved-1', u'shortstr', u'shortstr', reserved=True), ] @@ -3521,7 +3758,8 @@ class BasicGetEmpty(AMQPMethodPayload): return 'BasicGetEmpty(%s)' % (', '.join(map(repr, []))) @classmethod - def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> BasicGetEmpty + def from_buffer(cls, buf, + start_offset): # type: (buffer, int) -> BasicGetEmpty offset = start_offset s_len, = struct.unpack_from('!B', buf, offset) offset += 1 @@ -3560,7 +3798,11 @@ class BasicNack(AMQPMethodPayload): Clients receiving the Nack methods should ignore this flag. :type requeue: bool (bit in AMQP) """ - __slots__ = (u'delivery_tag', u'multiple', u'requeue',) + __slots__ = ( + u'delivery_tag', + u'multiple', + u'requeue', + ) NAME = u'basic.nack' @@ -3573,7 +3815,7 @@ class BasicNack(AMQPMethodPayload): IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'delivery-tag', u'delivery-tag', u'longlong', reserved=False), Field(u'multiple', u'bit', u'bit', reserved=False), Field(u'requeue', u'bit', u'bit', reserved=False), @@ -3585,8 +3827,8 @@ class BasicNack(AMQPMethodPayload): :return: Python string representation """ - return 'BasicNack(%s)' % ( - ', '.join(map(repr, [self.delivery_tag, self.multiple, self.requeue]))) + return 'BasicNack(%s)' % (', '.join( + map(repr, [self.delivery_tag, self.multiple, self.requeue]))) def __init__(self, delivery_tag, multiple, requeue): """ @@ -3597,13 +3839,16 @@ class BasicNack(AMQPMethodPayload): self.requeue = requeue def write_arguments(self, buf): # type: (tp.BinaryIO) -> None - buf.write(struct.pack('!QB', self.delivery_tag, (self.multiple << 0) | (self.requeue << 1))) + buf.write( + struct.pack('!QB', self.delivery_tag, + (self.multiple << 0) | (self.requeue << 1))) def get_size(self): # type: () -> int return 9 @classmethod - def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> BasicNack + def from_buffer(cls, buf, + start_offset): # type: (buffer, int) -> BasicNack offset = start_offset delivery_tag, _bit, = struct.unpack_from('!QB', buf, offset) offset += 8 @@ -3653,7 +3898,12 @@ class BasicPublish(AMQPMethodPayload): ever be consumed. :type immediate: bool (bit in AMQP) """ - __slots__ = (u'exchange', u'routing_key', u'mandatory', u'immediate',) + __slots__ = ( + u'exchange', + u'routing_key', + u'mandatory', + u'immediate', + ) NAME = u'basic.publish' @@ -3680,8 +3930,10 @@ class BasicPublish(AMQPMethodPayload): :return: Python string representation """ - return 'BasicPublish(%s)' % ( - ', '.join(map(repr, [self.exchange, self.routing_key, self.mandatory, self.immediate]))) + return 'BasicPublish(%s)' % (', '.join( + map(repr, [ + self.exchange, self.routing_key, self.mandatory, self.immediate + ]))) def __init__(self, exchange, routing_key, mandatory, immediate): """ @@ -3698,13 +3950,15 @@ class BasicPublish(AMQPMethodPayload): buf.write(self.exchange) buf.write(struct.pack('!B', len(self.routing_key))) buf.write(self.routing_key) - buf.write(struct.pack('!B', (self.mandatory << 0) | (self.immediate << 1))) + buf.write( + struct.pack('!B', (self.mandatory << 0) | (self.immediate << 1))) def get_size(self): # type: () -> int return 5 + len(self.exchange) + len(self.routing_key) @classmethod - def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> BasicPublish + def from_buffer(cls, buf, + start_offset): # type: (buffer, int) -> BasicPublish offset = start_offset s_len, = struct.unpack_from('!2xB', buf, offset) offset += 3 @@ -3774,7 +4028,11 @@ class BasicQos(AMQPMethodPayload): settings should apply per-channel. :type global_: bool (bit in AMQP) """ - __slots__ = (u'prefetch_size', u'prefetch_count', u'global_',) + __slots__ = ( + u'prefetch_size', + u'prefetch_count', + u'global_', + ) NAME = u'basic.qos' @@ -3799,8 +4057,9 @@ class BasicQos(AMQPMethodPayload): :return: Python string representation """ - return 'BasicQos(%s)' % ( - ', '.join(map(repr, [self.prefetch_size, self.prefetch_count, self.global_]))) + return 'BasicQos(%s)' % (', '.join( + map(repr, + [self.prefetch_size, self.prefetch_count, self.global_]))) def __init__(self, prefetch_size, prefetch_count, global_): """ @@ -3811,7 +4070,9 @@ class BasicQos(AMQPMethodPayload): self.global_ = global_ def write_arguments(self, buf): # type: (tp.BinaryIO) -> None - buf.write(struct.pack('!IHB', self.prefetch_size, self.prefetch_count, (self.global_ << 0))) + buf.write( + struct.pack('!IHB', self.prefetch_size, self.prefetch_count, + (self.global_ << 0))) def get_size(self): # type: () -> int return 7 @@ -3819,7 +4080,8 @@ class BasicQos(AMQPMethodPayload): @classmethod def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> BasicQos offset = start_offset - prefetch_size, prefetch_count, _bit, = struct.unpack_from('!IHB', buf, offset) + prefetch_size, prefetch_count, _bit, = struct.unpack_from( + '!IHB', buf, offset) offset += 6 global_ = bool(_bit >> 0) offset += 1 @@ -3858,7 +4120,8 @@ class BasicQosOk(AMQPMethodPayload): return 'BasicQosOk(%s)' % (', '.join(map(repr, []))) @classmethod - def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> BasicQosOk + def from_buffer(cls, buf, + start_offset): # type: (buffer, int) -> BasicQosOk offset = start_offset return cls() @@ -3885,7 +4148,12 @@ class BasicReturn(AMQPMethodPayload): was published. :type routing_key: binary type (max length 255) (shortstr in AMQP) """ - __slots__ = (u'reply_code', u'reply_text', u'exchange', u'routing_key',) + __slots__ = ( + u'reply_code', + u'reply_text', + u'exchange', + u'routing_key', + ) NAME = u'basic.return' @@ -3898,7 +4166,7 @@ class BasicReturn(AMQPMethodPayload): IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'reply-code', u'reply-code', u'short', reserved=False), Field(u'reply-text', u'reply-text', u'shortstr', reserved=False), Field(u'exchange', u'exchange-name', u'shortstr', reserved=False), @@ -3912,7 +4180,10 @@ class BasicReturn(AMQPMethodPayload): :return: Python string representation """ return 'BasicReturn(%s)' % (', '.join( - map(repr, [self.reply_code, self.reply_text, self.exchange, self.routing_key]))) + map(repr, [ + self.reply_code, self.reply_text, self.exchange, + self.routing_key + ]))) def __init__(self, reply_code, reply_text, exchange, routing_key): """ @@ -3932,10 +4203,12 @@ class BasicReturn(AMQPMethodPayload): buf.write(self.routing_key) def get_size(self): # type: () -> int - return 5 + len(self.reply_text) + len(self.exchange) + len(self.routing_key) + return 5 + len(self.reply_text) + len(self.exchange) + len( + self.routing_key) @classmethod - def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> BasicReturn + def from_buffer(cls, buf, + start_offset): # type: (buffer, int) -> BasicReturn offset = start_offset reply_code, s_len, = struct.unpack_from('!HB', buf, offset) offset += 3 @@ -3969,7 +4242,10 @@ class BasicReject(AMQPMethodPayload): discarded or dead-lettered. :type requeue: bool (bit in AMQP) """ - __slots__ = (u'delivery_tag', u'requeue',) + __slots__ = ( + u'delivery_tag', + u'requeue', + ) NAME = u'basic.reject' @@ -3982,7 +4258,7 @@ class BasicReject(AMQPMethodPayload): IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'delivery-tag', u'delivery-tag', u'longlong', reserved=False), Field(u'requeue', u'bit', u'bit', reserved=False), ] @@ -3993,7 +4269,8 @@ class BasicReject(AMQPMethodPayload): :return: Python string representation """ - return 'BasicReject(%s)' % (', '.join(map(repr, [self.delivery_tag, self.requeue]))) + return 'BasicReject(%s)' % (', '.join( + map(repr, [self.delivery_tag, self.requeue]))) def __init__(self, delivery_tag, requeue): """ @@ -4009,7 +4286,8 @@ class BasicReject(AMQPMethodPayload): return 9 @classmethod - def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> BasicReject + def from_buffer(cls, buf, + start_offset): # type: (buffer, int) -> BasicReject offset = start_offset delivery_tag, _bit, = struct.unpack_from('!QB', buf, offset) offset += 8 @@ -4048,7 +4326,7 @@ class BasicRecoverAsync(AMQPMethodPayload): IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'requeue', u'bit', u'bit', reserved=False), ] @@ -4073,7 +4351,8 @@ class BasicRecoverAsync(AMQPMethodPayload): return 1 @classmethod - def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> BasicRecoverAsync + def from_buffer(cls, buf, + start_offset): # type: (buffer, int) -> BasicRecoverAsync offset = start_offset _bit, = struct.unpack_from('!B', buf, offset) offset += 0 @@ -4112,7 +4391,7 @@ class BasicRecover(AMQPMethodPayload): IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'requeue', u'bit', u'bit', reserved=False), ] @@ -4137,7 +4416,8 @@ class BasicRecover(AMQPMethodPayload): return 1 @classmethod - def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> BasicRecover + def from_buffer(cls, buf, + start_offset): # type: (buffer, int) -> BasicRecover offset = start_offset _bit, = struct.unpack_from('!B', buf, offset) offset += 0 @@ -4174,7 +4454,8 @@ class BasicRecoverOk(AMQPMethodPayload): return 'BasicRecoverOk(%s)' % (', '.join(map(repr, []))) @classmethod - def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> BasicRecoverOk + def from_buffer(cls, buf, + start_offset): # type: (buffer, int) -> BasicRecoverOk offset = start_offset return cls() @@ -4269,7 +4550,8 @@ class TxCommitOk(AMQPMethodPayload): return 'TxCommitOk(%s)' % (', '.join(map(repr, []))) @classmethod - def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> TxCommitOk + def from_buffer(cls, buf, + start_offset): # type: (buffer, int) -> TxCommitOk offset = start_offset return cls() @@ -4308,7 +4590,8 @@ class TxRollback(AMQPMethodPayload): return 'TxRollback(%s)' % (', '.join(map(repr, []))) @classmethod - def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> TxRollback + def from_buffer(cls, buf, + start_offset): # type: (buffer, int) -> TxRollback offset = start_offset return cls() @@ -4343,7 +4626,8 @@ class TxRollbackOk(AMQPMethodPayload): return 'TxRollbackOk(%s)' % (', '.join(map(repr, []))) @classmethod - def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> TxRollbackOk + def from_buffer(cls, buf, + start_offset): # type: (buffer, int) -> TxRollbackOk offset = start_offset return cls() @@ -4414,7 +4698,8 @@ class TxSelectOk(AMQPMethodPayload): return 'TxSelectOk(%s)' % (', '.join(map(repr, []))) @classmethod - def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> TxSelectOk + def from_buffer(cls, buf, + start_offset): # type: (buffer, int) -> TxSelectOk offset = start_offset return cls() @@ -4470,7 +4755,7 @@ class ConfirmSelect(AMQPMethodPayload): IS_CONTENT_STATIC = False # this means that argument part has always the same content # See constructor pydoc for details - FIELDS = [ + FIELDS = [ Field(u'nowait', u'bit', u'bit', reserved=False), ] @@ -4495,7 +4780,8 @@ class ConfirmSelect(AMQPMethodPayload): return 1 @classmethod - def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> ConfirmSelect + def from_buffer(cls, buf, + start_offset): # type: (buffer, int) -> ConfirmSelect offset = start_offset _bit, = struct.unpack_from('!B', buf, offset) offset += 0 @@ -4533,7 +4819,8 @@ class ConfirmSelectOk(AMQPMethodPayload): return 'ConfirmSelectOk(%s)' % (', '.join(map(repr, []))) @classmethod - def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> ConfirmSelectOk + def from_buffer(cls, buf, + start_offset): # type: (buffer, int) -> ConfirmSelectOk offset = start_offset return cls() @@ -4603,7 +4890,6 @@ IDENT_TO_METHOD = { (85, 11): ConfirmSelectOk, } - BINARY_HEADER_TO_METHOD = { b'\x00\x0A\x00\x3C': ConnectionBlocked, b'\x00\x0A\x00\x32': ConnectionClose, @@ -4669,7 +4955,6 @@ BINARY_HEADER_TO_METHOD = { b'\x00\x55\x00\x0B': ConfirmSelectOk, } - CLASS_ID_TO_CONTENT_PROPERTY_LIST = { 60: BasicContentPropertyList, } diff --git a/tests/test_clustering/test_a.py b/tests/test_clustering/test_a.py index 2f4fd62..7b15d22 100644 --- a/tests/test_clustering/test_a.py +++ b/tests/test_clustering/test_a.py @@ -3,15 +3,18 @@ Test things """ from __future__ import print_function, absolute_import, division -import six + +import logging +import monotonic import os +import time import unittest -import time, logging, threading, monotonic -from coolamqp.objects import Message, MessageProperties, NodeDefinition, Queue, \ - ReceivedMessage, Exchange -from coolamqp.clustering import Cluster, MessageReceived, NothingMuch -import time +import six + +from coolamqp.clustering import Cluster, MessageReceived, NothingMuch +from coolamqp.objects import Message, NodeDefinition, Queue, \ + ReceivedMessage, Exchange NODE = NodeDefinition('127.0.0.1', 'guest', 'guest', heartbeat=20) logging.basicConfig(level=logging.DEBUG) @@ -56,7 +59,7 @@ class TestA(unittest.TestCase): self.assertTrue(monotonic.monotonic() - a >= 4) def test_set_qos_but_later(self): - con, fut = self.c.consume(Queue(u'hello', exclusive=True)) + con, fut = self.c.consume(Queue(u'hello2', exclusive=True)) fut.result() @@ -83,10 +86,10 @@ class TestA(unittest.TestCase): self.assertIsInstance(e, ReceivedMessage) P['q'] = True - con, fut = self.c.consume(Queue(u'hello', exclusive=True), + con, fut = self.c.consume(Queue(u'hello3', exclusive=True), on_message=ok, no_ack=True) fut.result() - self.c.publish(Message(b''), routing_key=u'hello', tx=True).result() + self.c.publish(Message(b''), routing_key=u'hello3', tx=True).result() time.sleep(1) @@ -97,22 +100,22 @@ class TestA(unittest.TestCase): def ok(e): self.assertIsInstance(e, ReceivedMessage) - self.assertEquals(e.body, b'hello') + self.assertEquals(e.body, b'hello4') # bcoz u can compare memoryviews to their providers :D self.assertEquals(e.properties.content_type, b'text/plain') self.assertEquals(e.properties.content_encoding, b'utf8') P['q'] = True - con, fut = self.c.consume(Queue(u'hello', exclusive=True), + con, fut = self.c.consume(Queue(u'hello4', exclusive=True), on_message=ok, no_ack=True) fut.result() - self.c.publish(Message(b'hello', properties={ + self.c.publish(Message(b'hello4', properties={ 'content_type': b'text/plain', 'content_encoding': b'utf8' }), routing_key=u'hello', confirm=True).result() self.assertRaises(RuntimeError, - lambda: self.c.publish(Message(b'hello', properties={ + lambda: self.c.publish(Message(b'hello4', properties={ 'content_type': b'text/plain', 'content_encoding': b'utf8' }), routing_key=u'hello', confirm=True, @@ -127,16 +130,16 @@ class TestA(unittest.TestCase): def ok(e): self.assertIsInstance(e, ReceivedMessage) - self.assertEquals(e.body, b'hello') + self.assertEquals(e.body, b'hello5') # bcoz u can compare memoryviews to their providers :D self.assertEquals(e.properties.content_type, b'text/plain') self.assertEquals(e.properties.content_encoding, b'utf8') P['q'] = True - con, fut = self.c.consume(Queue(u'hello', exclusive=True), + con, fut = self.c.consume(Queue(u'hello5', exclusive=True), on_message=ok, no_ack=True) fut.result() - self.c.publish(Message(b'hello', properties={ + self.c.publish(Message(b'hello5', properties={ 'content_type': b'text/plain', 'content_encoding': b'utf8' }), routing_key=u'hello', tx=True).result() @@ -152,13 +155,13 @@ class TestA(unittest.TestCase): def ok(e): self.assertIsInstance(e, ReceivedMessage) - self.assertEquals(e.body, b'hello') + self.assertEquals(e.body, b'hello6') P['q'] = True - con, fut = self.c.consume(Queue(u'hello', exclusive=True), + con, fut = self.c.consume(Queue(u'hello6', exclusive=True), on_message=ok, no_ack=True) fut.result() - self.c.publish(Message(b'hello'), routing_key=u'hello', + self.c.publish(Message(b'hello6'), routing_key=u'hello', tx=True).result() time.sleep(1) @@ -169,19 +172,19 @@ class TestA(unittest.TestCase): """single and multi frame""" from coolamqp.attaches import BodyReceiveMode - con, fut = self.c.consume(Queue(u'hello', exclusive=True), no_ack=True, + con, fut = self.c.consume(Queue(u'hello7', exclusive=True), no_ack=True, body_receive_mode=BodyReceiveMode.MEMORYVIEW) fut.result() - data = b'hello' - self.c.publish(Message(data), routing_key=u'hello', confirm=True) + data = b'hello7' + self.c.publish(Message(data), routing_key=u'hello7', confirm=True) m = self.c.drain(2) self.assertIsInstance(m, MessageReceived) self.assertIsInstance(m.body, memoryview) self.assertEquals(m.body, data) data = six.binary_type(os.urandom(512 * 1024)) - self.c.publish(Message(data), routing_key=u'hello', confirm=True) + self.c.publish(Message(data), routing_key=u'hello7', confirm=True) m = self.c.drain(9) self.assertIsInstance(m, MessageReceived) self.assertIsInstance(m.body, memoryview) @@ -192,19 +195,19 @@ class TestA(unittest.TestCase): """single and multi frame""" from coolamqp.attaches import BodyReceiveMode - con, fut = self.c.consume(Queue(u'hello', exclusive=True), no_ack=True, + con, fut = self.c.consume(Queue(u'hello8', exclusive=True), no_ack=True, body_receive_mode=BodyReceiveMode.LIST_OF_MEMORYVIEW) fut.result() - data = b'hello' - self.c.publish(Message(data), routing_key=u'hello', confirm=True) + data = b'hello8' + self.c.publish(Message(data), routing_key=u'hello8', confirm=True) m = self.c.drain(1) self.assertIsInstance(m, MessageReceived) self.assertIsInstance(m.body[0], memoryview) self.assertEquals(m.body[0], data) data = six.binary_type(os.urandom(512 * 1024)) - self.c.publish(Message(data), routing_key=u'hello', confirm=True) + self.c.publish(Message(data), routing_key=u'hello7', confirm=True) m = self.c.drain(5) self.assertIsInstance(m, MessageReceived) self.assertTrue(all([isinstance(x, memoryview) for x in m.body])) @@ -212,16 +215,16 @@ class TestA(unittest.TestCase): def test_consumer_cancel(self): con, fut = self.c.consume( - Queue(u'hello', exclusive=True, auto_delete=True)) + Queue(u'hello9', exclusive=True, auto_delete=True)) fut.result() con.cancel().result() def test_drain_1(self): con, fut = self.c.consume( - Queue(u'hello', exclusive=True, auto_delete=True)) + Queue(u'helloA', exclusive=True, auto_delete=True)) fut.result() - self.c.publish(Message(b'ioi'), routing_key=u'hello') + self.c.publish(Message(b'ioi'), routing_key=u'helloA') self.assertIsInstance(self.c.drain(2), MessageReceived) self.assertIsInstance(self.c.drain(1), NothingMuch) -- GitLab