diff --git a/coolamqp/backends/pyamqp.py b/coolamqp/backends/pyamqp.py index ce5319425179f2b8aa4358ca2b1642767c39ab52..bebc6deb77e8e27bee730334b054ca75329fc384 100644 --- a/coolamqp/backends/pyamqp.py +++ b/coolamqp/backends/pyamqp.py @@ -32,12 +32,6 @@ def translate_exceptions(fun): amqp.exceptions.NotFound, amqp.exceptions.AccessRefused) as e: - try: - e.reply_code - except AttributeError: - - - if e. raise RemoteAMQPError(e.reply_code, e.reply_text) except (IOError, diff --git a/coolamqp/framing/frames/__init__.py b/coolamqp/framing/frames/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..56b1f03091d659efcc8cb8055b7fd3d63d0b01fe --- /dev/null +++ b/coolamqp/framing/frames/__init__.py @@ -0,0 +1,8 @@ +# coding=UTF-8 +from __future__ import absolute_import, division, print_function + +""" +Definitions of frames. + +machine.py is machine-generated from AMQP specification +""" \ No newline at end of file diff --git a/coolamqp/framing/frames/base.py b/coolamqp/framing/frames/base.py new file mode 100644 index 0000000000000000000000000000000000000000..4d4a870f660b92e7aac7db28e072b0de86bd093c --- /dev/null +++ b/coolamqp/framing/frames/base.py @@ -0,0 +1,161 @@ +# coding=UTF-8 +from __future__ import absolute_import, division, print_function +import struct +import logging +import six + +logger = logging.getLogger(__name__) + + +# name => (length|None, struct ID|None, reserved-field-value : for struct if structable, bytes else, length of default) +BASIC_TYPES = {'bit': (None, None, "0", None), # special case + 'octet': (1, 'B', "b'\\x00'", 1), + 'short': (2, 'H', "b'\\x00\\x00'", 2), + 'long': (4, 'I', "b'\\x00\\x00\\x00\\x00'", 4), + 'longlong': (8, 'Q', "b'\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00'", 8), + 'timestamp': (8, 'L', "b'\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00'", 8), + 'table': (None, None, "b'\\x00\\x00\\x00\\x00'", 4), # special case + 'longstr': (None, None, "b'\\x00\\x00\\x00\\x00'", 4), # special case + 'shortstr': (None, None, "b'\\x00'", 1), # special case + } + +DYNAMIC_BASIC_TYPES = ('table', 'longstr', 'shortstr') + + +def dec_to_bytes(buf, v): + dps = 0 + for k in six.moves.xrange(20): + k = v * (10 ** dps) + if abs(k-int(k)) < 0.00001: # epsilon + return buf.write(struct.pack('!BI', dps, k)) + + logger.critical('Cannot convert %s to decimal, substituting with 0', repr(v)) + buf.write(b'\x00\x00\x00\x00') + + +FIELD_TYPES = { + # length, struct, (option)formatter_to_bytes + 't': (1, '!?'), # boolean + 'b': (1, '!b'), + 'B': (1, '!B'), + 'U': (2, '!H'), + 'u': (2, '!h'), + 'I': (4, '!I'), + 'i': (4, '!i'), + 'L': (8, '!Q'), + 'l': (8, '!q'), + 'f': (4, '!f'), + 'd': (8, '!d'), + 'D': (5, None, dec_to_bytes), # decimal-value + 's': (None, None, lambda buf, v: buf.write(struct.pack('B', len(v)) + v)), # short-string + 'S': (None, None, lambda buf, v: buf.write(struct.pack('!I', len(v)) + v)), # long-string + 'A': (None, None), # field-array + 'T': (8, '!Q'), + 'V': (0, ''), +} + + + + +""" +A table is of form: +[ + (name::bytes, value::any, type::bytes(len=1)), + ... + +] +""" + + +def _enframe_table(buf, table): + """ + Write AMQP table to buffer + :param buf: + :param table: + :return: + """ + buf.write(struct.pack('!L', _frame_table_size(table))) + + for name, value, type in table: + buf.write(struct.pack('!B', len(name))) + buf.write(name) + buf.write(type) + + opt = FIELD_TYPES[opt] + + if type == 'F': # nice one + _enframe_table(buf, value) + elif type == 'V': + continue + elif len(opt) == 2: # can autoframe + buf.write(struct.pack(opt[1], value)) + else: + opt[2](buf, value) + + +def _deframe_table(buf, start_offset): # helper - convert bytes to table + """:return: tuple (table, bytes consumed)""" + + +def _frame_table_size(table): + """:return: length of table representation, in bytes, WITHOUT length header""" + + +class AMQPClass(object): + pass + + +class AMQPPayload(object): + """Payload is something that can write itself to bytes, + or at least provide a buffer to do it.""" + + def write_arguments(self, buf): + """ + Emit itself into a buffer + + :param buf: buffer to write to (will be written using .write) + """ + + +class AMQPMethod(object): + RESPONSE_TO = None + REPLY_WITH = [] + FIELDS = [] + + def get_size(self): + """ + Calculate the size of this frame. + + :return: int, size of argument section + """ + raise NotImplementedError() + + def write_arguments(self, buf): + """ + Write the argument portion of this frame into buffer. + + :param buf: buffer to write to + :return: how many bytes written + """ + raise NotImplementedError() + + @staticmethod + def from_buffer(buf, offset): + """ + Construct this frame from a buffer + + :param buf: a buffer to construct the frame from + :type buf: buffer or memoryview + :param offset: offset the argument portion begins at + :type offset: int + :return: tuple of (an instance of %s, amount of bytes consumed as int) + """ + raise NotImplementedError('') + + + + +class AMQPFrame(object): + def __init__(self, channel, payload): + self.channel = channel + self.payload = payload \ No newline at end of file diff --git a/coolamqp/framing/frames/human.py b/coolamqp/framing/frames/human.py new file mode 100644 index 0000000000000000000000000000000000000000..bbb560a4dd2b797e88e4414b883fb19f20993261 --- /dev/null +++ b/coolamqp/framing/frames/human.py @@ -0,0 +1,12 @@ +# coding=UTF-8 +from __future__ import absolute_import, division, print_function + + + +def decode_frame(type_, payload): + """ + + :param type_: + :param payload: + :return: + """ \ No newline at end of file diff --git a/coolamqp/framing/definitions.py b/coolamqp/framing/frames/machine.py similarity index 54% rename from coolamqp/framing/definitions.py rename to coolamqp/framing/frames/machine.py index b9c5c0390cfa9479f2ed56bf8148cf1e90f95c20..8d70a7aa80b023cfccf712748da24a1cab6db21d 100644 --- a/coolamqp/framing/definitions.py +++ b/coolamqp/framing/frames/machine.py @@ -12,7 +12,9 @@ CoolAMQP is copyright (c) 2016 DMS Serwis s.c. import struct -# Core frame types +from coolamqp.framing.frames.base import AMQPClass, AMQPMethod, _enframe_table, _deframe_table, _frame_table_size + +# Core constants FRAME_METHOD = 1 FRAME_HEADER = 2 FRAME_BODY = 3 @@ -59,6 +61,11 @@ NOT_IMPLEMENTED = 540 # The client tried to use functionality that is not implem INTERNAL_ERROR = 541 # The server could not complete the method because of an internal error. # The server may require intervention by an operator in order to resume # normal operations. + +HARD_ERROR = [CONNECTION_FORCED, INVALID_PATH, FRAME_ERROR, SYNTAX_ERROR, COMMAND_INVALID, CHANNEL_ERROR, UNEXPECTED_FRAME, RESOURCE_ERROR, NOT_ALLOWED, NOT_IMPLEMENTED, INTERNAL_ERROR] +SOFT_ERROR = [CONTENT_TOO_LARGE, NO_CONSUMERS, ACCESS_REFUSED, NOT_FOUND, RESOURCE_LOCKED, PRECONDITION_FAILED] + + DOMAIN_TO_BASIC_TYPE = { u'class-id': u'short', u'consumer-tag': u'shortstr', @@ -86,26 +93,6 @@ DOMAIN_TO_BASIC_TYPE = { u'table': None, } - -class AMQPClass(object): - pass - - -class AMQPMethod(object): - RESPONSE_TO = None - REPLY_WITH = [] - FIELDS = [] - - def write_arguments(self, out): - """ - Write the argument portion of this frame into out. - - :param out: a callable that will be invoked (possibly many times) with - parts of the arguments section. - :type out: callable(part_of_frame: binary type) -> nevermind - """ - - class Connection(AMQPClass): """ The connection class provides methods for a client to establish a network connection to @@ -128,13 +115,24 @@ class ConnectionClose(AMQPMethod): CLASS = Connection NAME = u'close' CLASSNAME = u'connection' + FULLNAME = u'connection.close' + CLASS_INDEX = 10 + CLASS_INDEX_BINARY = b'\x0A' METHOD_INDEX = 50 - FULLNAME = u'connection.close' - SYNCHRONOUS = True + METHOD_INDEX_BINARY = b'\x32' + BINARY_HEADER = b'\x0A\x32' # CLASS ID + METHOD ID + + SYNCHRONOUS = True # does this message imply other one? REPLY_WITH = [ConnectionCloseOk] - BINARY_HEADER = b'\x0A\x32' - IS_SIZE_STATIC = False + + SENT_BY_CLIENT = True + SENT_BY_SERVER = True + + MINIMUM_SIZE = 13 # arguments part can never be shorter than this + + IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_CONTENT_STATIC = False # this means that argument part has always the same content FIELDS = [ # tuples of (field name, field domain, basic type used, is_reserved) (u'reply-code', u'reply-code', u'short', False), (u'reply-text', u'reply-text', u'shortstr', False), @@ -161,8 +159,25 @@ class ConnectionClose(AMQPMethod): self.class_id = class_id self.method_id = method_id - def write_arguments(self, out): - out(struct.pack('!HpHH', self.reply_code, self.reply_text, self.class_id, self.method_id)) + def write_arguments(self, buf): + buf.write(struct.pack('!HB', self.reply_code, len(self.reply_text))) + buf.write(self.reply_text) + buf.write(struct.pack('!HH', self.class_id, self.method_id)) + + def get_size(self): + return len(self.reply_text) + 7 + + @staticmethod + def from_buffer(buf, start_offset): + assert (len(buf) - start_offset) >= ConnectionClose.MINIMUM_SIZE, 'Frame too short!' + offset = start_offset # we will use it to count consumed bytes + reply_code, s_len, = struct.unpack_from('!HB', buf, offset) + offset += 3 + reply_text = buf[offset:offset+s_len] + offset += s_len + class_id, method_id, = struct.unpack_from('!HH', buf, offset) + offset += 4 + return ConnectionClose(reply_code, reply_text, class_id, method_id) class ConnectionCloseOk(AMQPMethod): @@ -175,14 +190,25 @@ class ConnectionCloseOk(AMQPMethod): CLASS = Connection NAME = u'close-ok' CLASSNAME = u'connection' + FULLNAME = u'connection.close-ok' + CLASS_INDEX = 10 + CLASS_INDEX_BINARY = b'\x0A' METHOD_INDEX = 51 - FULLNAME = u'connection.close-ok' - SYNCHRONOUS = True + METHOD_INDEX_BINARY = b'\x33' + BINARY_HEADER = b'\x0A\x33' # CLASS ID + METHOD ID + + SYNCHRONOUS = True # does this message imply other one? REPLY_WITH = [] - BINARY_HEADER = b'\x0A\x33' - IS_SIZE_STATIC = True - STATIC_ARGUMENT_SIZE = 0 # length of arguments (as binary) is constant here + + SENT_BY_CLIENT = True + SENT_BY_SERVER = True + + MINIMUM_SIZE = 0 # arguments part can never be shorter than this + + IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_CONTENT_STATIC = True # this means that argument part has always the same content + STATIC_CONTENT = b'\x00\x00\x00\x04\x0A\x33\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END RESPONSE_TO = ConnectionClose # this is sent in response to connection.close def __init__(self): @@ -190,6 +216,13 @@ class ConnectionCloseOk(AMQPMethod): Create frame connection.close-ok """ + # not generating write_arguments - this method has static content! + + @staticmethod + def from_buffer(buf, start_offset): + return ConnectionCloseOk() + + class ConnectionOpen(AMQPMethod): """ Open connection to virtual host @@ -202,13 +235,24 @@ class ConnectionOpen(AMQPMethod): CLASS = Connection NAME = u'open' CLASSNAME = u'connection' + FULLNAME = u'connection.open' + CLASS_INDEX = 10 + CLASS_INDEX_BINARY = b'\x0A' METHOD_INDEX = 40 - FULLNAME = u'connection.open' - SYNCHRONOUS = True + METHOD_INDEX_BINARY = b'\x28' + BINARY_HEADER = b'\x0A\x28' # CLASS ID + METHOD ID + + SYNCHRONOUS = True # does this message imply other one? REPLY_WITH = [ConnectionOpenOk] - BINARY_HEADER = b'\x0A\x28' - IS_SIZE_STATIC = False + + SENT_BY_CLIENT = True + SENT_BY_SERVER = False + + MINIMUM_SIZE = 15 # arguments part can never be shorter than this + + IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_CONTENT_STATIC = False # this means that argument part has always the same content FIELDS = [ # tuples of (field name, field domain, basic type used, is_reserved) (u'virtual-host', u'path', u'shortstr', False), # virtual host name (u'reserved-1', u'shortstr', u'shortstr', True), @@ -225,8 +269,28 @@ class ConnectionOpen(AMQPMethod): """ self.virtual_host = virtual_host - def write_arguments(self, out): - out(struct.pack('!p?', self.virtual_host, 0)) + def write_arguments(self, buf): + buf.write(struct.pack('!B', len(self.virtual_host))) + buf.write(self.virtual_host) + buf.write(b'\x00') + buf.write(struct.pack('!B', )) + + def get_size(self): + return len(self.virtual_host) + 3 + + @staticmethod + def from_buffer(buf, start_offset): + assert (len(buf) - start_offset) >= ConnectionOpen.MINIMUM_SIZE, 'Frame too short!' + offset = start_offset # we will use it to count consumed bytes + s_len, = struct.unpack_from('!B', buf, offset) + offset += 1 + virtual_host = buf[offset:offset+s_len] + offset += s_len + s_len, = struct.unpack_from('!B', buf, offset) + offset += 1 + offset += s_len + offset += 1 + return ConnectionOpen(virtual_host) class ConnectionOpenOk(AMQPMethod): @@ -238,13 +302,25 @@ class ConnectionOpenOk(AMQPMethod): CLASS = Connection NAME = u'open-ok' CLASSNAME = u'connection' + FULLNAME = u'connection.open-ok' + CLASS_INDEX = 10 + CLASS_INDEX_BINARY = b'\x0A' METHOD_INDEX = 41 - FULLNAME = u'connection.open-ok' - SYNCHRONOUS = True + METHOD_INDEX_BINARY = b'\x29' + BINARY_HEADER = b'\x0A\x29' # CLASS ID + METHOD ID + + SYNCHRONOUS = True # does this message imply other one? REPLY_WITH = [] - BINARY_HEADER = b'\x0A\x29' - IS_SIZE_STATIC = False + + SENT_BY_CLIENT = False + SENT_BY_SERVER = True + + MINIMUM_SIZE = 7 # arguments part can never be shorter than this + + IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_CONTENT_STATIC = True # this means that argument part has always the same content + STATIC_CONTENT = b'\x00\x00\x00\x04\x0A\x29\x00\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END RESPONSE_TO = ConnectionOpen # this is sent in response to connection.open FIELDS = [ # tuples of (field name, field domain, basic type used, is_reserved) (u'reserved-1', u'shortstr', u'shortstr', True), @@ -255,8 +331,11 @@ class ConnectionOpenOk(AMQPMethod): Create frame connection.open-ok """ - def write_arguments(self, out): - pass # this has a frame, but its only default shortstrs + # not generating write_arguments - this method has static content! + + @staticmethod + def from_buffer(buf, start_offset): + return ConnectionOpenOk() class ConnectionStart(AMQPMethod): @@ -270,13 +349,24 @@ class ConnectionStart(AMQPMethod): CLASS = Connection NAME = u'start' CLASSNAME = u'connection' + FULLNAME = u'connection.start' + CLASS_INDEX = 10 + CLASS_INDEX_BINARY = b'\x0A' METHOD_INDEX = 10 - FULLNAME = u'connection.start' - SYNCHRONOUS = True + METHOD_INDEX_BINARY = b'\x0A' + BINARY_HEADER = b'\x0A\x0A' # CLASS ID + METHOD ID + + SYNCHRONOUS = True # does this message imply other one? REPLY_WITH = [ConnectionStartOk] - BINARY_HEADER = b'\x0A\x0A' - IS_SIZE_STATIC = False + + SENT_BY_CLIENT = False + SENT_BY_SERVER = True + + MINIMUM_SIZE = 59 # arguments part can never be shorter than this + + IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_CONTENT_STATIC = False # this means that argument part has always the same content FIELDS = [ # tuples of (field name, field domain, basic type used, is_reserved) (u'version-major', u'octet', u'octet', False), # protocol major version (u'version-minor', u'octet', u'octet', False), # protocol minor version @@ -318,13 +408,32 @@ class ConnectionStart(AMQPMethod): self.mechanisms = mechanisms self.locales = locales - def write_arguments(self, out): - out(struct.pack('!BB', self.version_major, self.version_minor)) - out(struct.pack('!L', len(self.mechanisms))) - out(self.mechanisms) - out(struct.pack('!L', len(self.locales))) - out(self.locales) - pass # this has a frame, but its only default shortstrs + def write_arguments(self, buf): + buf.write(struct.pack('!BB', self.version_major, self.version_minor)) + _enframe_table(buf, self.server_properties) + buf.write(struct.pack('!L', len(self.mechanisms))) + buf.write(self.mechanisms) + buf.write(struct.pack('!L', len(self.locales))) + buf.write(self.locales) + + def get_size(self): + return _frame_table_size(self.server_properties) + len(self.mechanisms) + len(self.locales) + 14 + + @staticmethod + def from_buffer(buf, start_offset): + assert (len(buf) - start_offset) >= ConnectionStart.MINIMUM_SIZE, 'Frame too short!' + offset = start_offset # we will use it to count consumed bytes + server_properties, delta = _deframe_table(buf, offset) + offset += delta + version_major, version_minor, s_len, = struct.unpack_from('!BBL', buf, offset) + offset += 6 + mechanisms = buf[offset:offset+s_len] + offset += s_len + s_len, = struct.unpack_from('!L', buf, offset) + offset += 4 + locales = buf[offset:offset+s_len] + offset += s_len + return ConnectionStart(version_major, version_minor, server_properties, mechanisms, locales) class ConnectionSecure(AMQPMethod): @@ -338,13 +447,24 @@ class ConnectionSecure(AMQPMethod): CLASS = Connection NAME = u'secure' CLASSNAME = u'connection' + FULLNAME = u'connection.secure' + CLASS_INDEX = 10 + CLASS_INDEX_BINARY = b'\x0A' METHOD_INDEX = 20 - FULLNAME = u'connection.secure' - SYNCHRONOUS = True + METHOD_INDEX_BINARY = b'\x14' + BINARY_HEADER = b'\x0A\x14' # CLASS ID + METHOD ID + + SYNCHRONOUS = True # does this message imply other one? REPLY_WITH = [ConnectionSecureOk] - BINARY_HEADER = b'\x0A\x14' - IS_SIZE_STATIC = False + + SENT_BY_CLIENT = False + SENT_BY_SERVER = True + + MINIMUM_SIZE = 19 # arguments part can never be shorter than this + + IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_CONTENT_STATIC = False # this means that argument part has always the same content FIELDS = [ # tuples of (field name, field domain, basic type used, is_reserved) (u'challenge', u'longstr', u'longstr', False), # security challenge data ] @@ -360,10 +480,22 @@ class ConnectionSecure(AMQPMethod): """ self.challenge = challenge - def write_arguments(self, out): - out(struct.pack('!L', len(self.challenge))) - out(self.challenge) - pass # this has a frame, but its only default shortstrs + def write_arguments(self, buf): + buf.write(struct.pack('!L', len(self.challenge))) + buf.write(self.challenge) + + def get_size(self): + return len(self.challenge) + 4 + + @staticmethod + def from_buffer(buf, start_offset): + assert (len(buf) - start_offset) >= ConnectionSecure.MINIMUM_SIZE, 'Frame too short!' + offset = start_offset # we will use it to count consumed bytes + s_len, = struct.unpack_from('!L', buf, offset) + offset += 4 + challenge = buf[offset:offset+s_len] + offset += s_len + return ConnectionSecure(challenge) class ConnectionStartOk(AMQPMethod): @@ -375,13 +507,24 @@ class ConnectionStartOk(AMQPMethod): CLASS = Connection NAME = u'start-ok' CLASSNAME = u'connection' + FULLNAME = u'connection.start-ok' + CLASS_INDEX = 10 + CLASS_INDEX_BINARY = b'\x0A' METHOD_INDEX = 11 - FULLNAME = u'connection.start-ok' - SYNCHRONOUS = True + METHOD_INDEX_BINARY = b'\x0B' + BINARY_HEADER = b'\x0A\x0B' # CLASS ID + METHOD ID + + SYNCHRONOUS = True # does this message imply other one? REPLY_WITH = [] - BINARY_HEADER = b'\x0A\x0B' - IS_SIZE_STATIC = False + + SENT_BY_CLIENT = True + SENT_BY_SERVER = False + + MINIMUM_SIZE = 52 # arguments part can never be shorter than this + + IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_CONTENT_STATIC = False # this means that argument part has always the same content RESPONSE_TO = ConnectionStart # this is sent in response to connection.start FIELDS = [ # tuples of (field name, field domain, basic type used, is_reserved) (u'client-properties', u'peer-properties', u'table', False), # client properties @@ -418,10 +561,37 @@ class ConnectionStartOk(AMQPMethod): self.response = response self.locale = locale - def write_arguments(self, out): - out(struct.pack('!pL', self.mechanism, len(self.response))) - out(self.response) - out(struct.pack('!p', self.locale)) + def write_arguments(self, buf): + _enframe_table(buf, self.client_properties) + buf.write(struct.pack('!B', len(self.mechanism))) + buf.write(self.mechanism) + buf.write(struct.pack('!L', len(self.response))) + buf.write(self.response) + buf.write(struct.pack('!B', len(self.locale))) + buf.write(self.locale) + + def get_size(self): + return _frame_table_size(self.client_properties) + len(self.mechanism) + len(self.response) + len(self.locale) + 10 + + @staticmethod + def from_buffer(buf, start_offset): + assert (len(buf) - start_offset) >= ConnectionStartOk.MINIMUM_SIZE, 'Frame too short!' + offset = start_offset # we will use it to count consumed bytes + client_properties, delta = _deframe_table(buf, offset) + offset += delta + s_len, = struct.unpack_from('!B', buf, offset) + offset += 1 + mechanism = buf[offset:offset+s_len] + offset += s_len + s_len, = struct.unpack_from('!L', buf, offset) + offset += 4 + response = buf[offset:offset+s_len] + offset += s_len + s_len, = struct.unpack_from('!B', buf, offset) + offset += 1 + locale = buf[offset:offset+s_len] + offset += s_len + return ConnectionStartOk(client_properties, mechanism, response, locale) class ConnectionSecureOk(AMQPMethod): @@ -434,13 +604,24 @@ class ConnectionSecureOk(AMQPMethod): CLASS = Connection NAME = u'secure-ok' CLASSNAME = u'connection' + FULLNAME = u'connection.secure-ok' + CLASS_INDEX = 10 + CLASS_INDEX_BINARY = b'\x0A' METHOD_INDEX = 21 - FULLNAME = u'connection.secure-ok' - SYNCHRONOUS = True + METHOD_INDEX_BINARY = b'\x15' + BINARY_HEADER = b'\x0A\x15' # CLASS ID + METHOD ID + + SYNCHRONOUS = True # does this message imply other one? REPLY_WITH = [] - BINARY_HEADER = b'\x0A\x15' - IS_SIZE_STATIC = False + + SENT_BY_CLIENT = True + SENT_BY_SERVER = False + + MINIMUM_SIZE = 19 # arguments part can never be shorter than this + + IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_CONTENT_STATIC = False # this means that argument part has always the same content RESPONSE_TO = ConnectionSecure # this is sent in response to connection.secure FIELDS = [ # tuples of (field name, field domain, basic type used, is_reserved) (u'response', u'longstr', u'longstr', False), # security response data @@ -457,10 +638,22 @@ class ConnectionSecureOk(AMQPMethod): """ self.response = response - def write_arguments(self, out): - out(struct.pack('!L', len(self.response))) - out(self.response) - pass # this has a frame, but its only default shortstrs + def write_arguments(self, buf): + buf.write(struct.pack('!L', len(self.response))) + buf.write(self.response) + + def get_size(self): + return len(self.response) + 4 + + @staticmethod + def from_buffer(buf, start_offset): + assert (len(buf) - start_offset) >= ConnectionSecureOk.MINIMUM_SIZE, 'Frame too short!' + offset = start_offset # we will use it to count consumed bytes + s_len, = struct.unpack_from('!L', buf, offset) + offset += 4 + response = buf[offset:offset+s_len] + offset += s_len + return ConnectionSecureOk(response) class ConnectionTune(AMQPMethod): @@ -473,13 +666,24 @@ class ConnectionTune(AMQPMethod): CLASS = Connection NAME = u'tune' CLASSNAME = u'connection' + FULLNAME = u'connection.tune' + CLASS_INDEX = 10 + CLASS_INDEX_BINARY = b'\x0A' METHOD_INDEX = 30 - FULLNAME = u'connection.tune' - SYNCHRONOUS = True + METHOD_INDEX_BINARY = b'\x1E' + BINARY_HEADER = b'\x0A\x1E' # CLASS ID + METHOD ID + + SYNCHRONOUS = True # does this message imply other one? REPLY_WITH = [ConnectionTuneOk] - BINARY_HEADER = b'\x0A\x1E' - IS_SIZE_STATIC = False + + SENT_BY_CLIENT = False + SENT_BY_SERVER = True + + MINIMUM_SIZE = 8 # arguments part can never be shorter than this + + IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_CONTENT_STATIC = False # this means that argument part has always the same content FIELDS = [ # tuples of (field name, field domain, basic type used, is_reserved) (u'channel-max', u'short', u'short', False), # proposed maximum channels (u'frame-max', u'long', u'long', False), # proposed maximum frame size @@ -509,8 +713,18 @@ class ConnectionTune(AMQPMethod): self.frame_max = frame_max self.heartbeat = heartbeat - def write_arguments(self, out): - out(struct.pack('!HIH', self.channel_max, self.frame_max, self.heartbeat)) + def write_arguments(self, buf): + buf.write(struct.pack('!HIH', self.channel_max, self.frame_max, self.heartbeat)) + + def get_size(self): + return 8 + + @staticmethod + def from_buffer(buf, start_offset): + assert (len(buf) - start_offset) >= ConnectionTune.MINIMUM_SIZE, 'Frame too short!' + offset = start_offset # we will use it to count consumed bytes + channel_max, frame_max, heartbeat, = struct.unpack_from('!HIH', buf, offset) + return ConnectionTune(channel_max, frame_max, heartbeat) class ConnectionTuneOk(AMQPMethod): @@ -523,13 +737,24 @@ class ConnectionTuneOk(AMQPMethod): CLASS = Connection NAME = u'tune-ok' CLASSNAME = u'connection' + FULLNAME = u'connection.tune-ok' + CLASS_INDEX = 10 + CLASS_INDEX_BINARY = b'\x0A' METHOD_INDEX = 31 - FULLNAME = u'connection.tune-ok' - SYNCHRONOUS = True + METHOD_INDEX_BINARY = b'\x1F' + BINARY_HEADER = b'\x0A\x1F' # CLASS ID + METHOD ID + + SYNCHRONOUS = True # does this message imply other one? REPLY_WITH = [] - BINARY_HEADER = b'\x0A\x1F' - IS_SIZE_STATIC = False + + SENT_BY_CLIENT = True + SENT_BY_SERVER = False + + MINIMUM_SIZE = 8 # arguments part can never be shorter than this + + IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_CONTENT_STATIC = False # this means that argument part has always the same content RESPONSE_TO = ConnectionTune # this is sent in response to connection.tune FIELDS = [ # tuples of (field name, field domain, basic type used, is_reserved) (u'channel-max', u'short', u'short', False), # negotiated maximum channels @@ -560,8 +785,18 @@ class ConnectionTuneOk(AMQPMethod): self.frame_max = frame_max self.heartbeat = heartbeat - def write_arguments(self, out): - out(struct.pack('!HIH', self.channel_max, self.frame_max, self.heartbeat)) + def write_arguments(self, buf): + buf.write(struct.pack('!HIH', self.channel_max, self.frame_max, self.heartbeat)) + + def get_size(self): + return 8 + + @staticmethod + def from_buffer(buf, start_offset): + assert (len(buf) - start_offset) >= ConnectionTuneOk.MINIMUM_SIZE, 'Frame too short!' + offset = start_offset # we will use it to count consumed bytes + channel_max, frame_max, heartbeat, = struct.unpack_from('!HIH', buf, offset) + return ConnectionTuneOk(channel_max, frame_max, heartbeat) class Channel(AMQPClass): @@ -586,13 +821,24 @@ class ChannelClose(AMQPMethod): CLASS = Channel NAME = u'close' CLASSNAME = u'channel' + FULLNAME = u'channel.close' + CLASS_INDEX = 20 + CLASS_INDEX_BINARY = b'\x14' METHOD_INDEX = 40 - FULLNAME = u'channel.close' - SYNCHRONOUS = True + METHOD_INDEX_BINARY = b'\x28' + BINARY_HEADER = b'\x14\x28' # CLASS ID + METHOD ID + + SYNCHRONOUS = True # does this message imply other one? REPLY_WITH = [ChannelCloseOk] - BINARY_HEADER = b'\x14\x28' - IS_SIZE_STATIC = False + + SENT_BY_CLIENT = True + SENT_BY_SERVER = True + + MINIMUM_SIZE = 13 # arguments part can never be shorter than this + + IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_CONTENT_STATIC = False # this means that argument part has always the same content FIELDS = [ # tuples of (field name, field domain, basic type used, is_reserved) (u'reply-code', u'reply-code', u'short', False), (u'reply-text', u'reply-text', u'shortstr', False), @@ -619,8 +865,25 @@ class ChannelClose(AMQPMethod): self.class_id = class_id self.method_id = method_id - def write_arguments(self, out): - out(struct.pack('!HpHH', self.reply_code, self.reply_text, self.class_id, self.method_id)) + def write_arguments(self, buf): + buf.write(struct.pack('!HB', self.reply_code, len(self.reply_text))) + buf.write(self.reply_text) + buf.write(struct.pack('!HH', self.class_id, self.method_id)) + + def get_size(self): + return len(self.reply_text) + 7 + + @staticmethod + def from_buffer(buf, start_offset): + assert (len(buf) - start_offset) >= ChannelClose.MINIMUM_SIZE, 'Frame too short!' + offset = start_offset # we will use it to count consumed bytes + reply_code, s_len, = struct.unpack_from('!HB', buf, offset) + offset += 3 + reply_text = buf[offset:offset+s_len] + offset += s_len + class_id, method_id, = struct.unpack_from('!HH', buf, offset) + offset += 4 + return ChannelClose(reply_code, reply_text, class_id, method_id) class ChannelCloseOk(AMQPMethod): @@ -633,14 +896,25 @@ class ChannelCloseOk(AMQPMethod): CLASS = Channel NAME = u'close-ok' CLASSNAME = u'channel' + FULLNAME = u'channel.close-ok' + CLASS_INDEX = 20 + CLASS_INDEX_BINARY = b'\x14' METHOD_INDEX = 41 - FULLNAME = u'channel.close-ok' - SYNCHRONOUS = True + METHOD_INDEX_BINARY = b'\x29' + BINARY_HEADER = b'\x14\x29' # CLASS ID + METHOD ID + + SYNCHRONOUS = True # does this message imply other one? REPLY_WITH = [] - BINARY_HEADER = b'\x14\x29' - IS_SIZE_STATIC = True - STATIC_ARGUMENT_SIZE = 0 # length of arguments (as binary) is constant here + + SENT_BY_CLIENT = True + SENT_BY_SERVER = True + + MINIMUM_SIZE = 0 # arguments part can never be shorter than this + + IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_CONTENT_STATIC = True # this means that argument part has always the same content + STATIC_CONTENT = b'\x00\x00\x00\x04\x14\x29\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END RESPONSE_TO = ChannelClose # this is sent in response to channel.close def __init__(self): @@ -648,6 +922,13 @@ class ChannelCloseOk(AMQPMethod): Create frame channel.close-ok """ + # not generating write_arguments - this method has static content! + + @staticmethod + def from_buffer(buf, start_offset): + return ChannelCloseOk() + + class ChannelFlow(AMQPMethod): """ Enable/disable flow from peer @@ -661,13 +942,24 @@ class ChannelFlow(AMQPMethod): CLASS = Channel NAME = u'flow' CLASSNAME = u'channel' + FULLNAME = u'channel.flow' + CLASS_INDEX = 20 + CLASS_INDEX_BINARY = b'\x14' METHOD_INDEX = 20 - FULLNAME = u'channel.flow' - SYNCHRONOUS = True + METHOD_INDEX_BINARY = b'\x14' + BINARY_HEADER = b'\x14\x14' # CLASS ID + METHOD ID + + SYNCHRONOUS = True # does this message imply other one? REPLY_WITH = [ChannelFlowOk] - BINARY_HEADER = b'\x14\x14' - IS_SIZE_STATIC = False + + SENT_BY_CLIENT = True + SENT_BY_SERVER = True + + MINIMUM_SIZE = 1 # arguments part can never be shorter than this + + IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_CONTENT_STATIC = False # this means that argument part has always the same content FIELDS = [ # tuples of (field name, field domain, basic type used, is_reserved) (u'active', u'bit', u'bit', False), # start/stop content frames ] @@ -683,8 +975,19 @@ class ChannelFlow(AMQPMethod): """ self.active = active - def write_arguments(self, out): - out(struct.pack('!?', self.active)) + def write_arguments(self, buf): + buf.write(struct.pack('!B', (int(self.active) << 0))) + + def get_size(self): + return 1 + + @staticmethod + def from_buffer(buf, start_offset): + assert (len(buf) - start_offset) >= ChannelFlow.MINIMUM_SIZE, 'Frame too short!' + offset = start_offset # we will use it to count consumed bytes + _bit_0, = struct.unpack_from('!B', buf, offset) + active = bool(_bit_0 & 1) + return ChannelFlow(active) class ChannelFlowOk(AMQPMethod): @@ -696,13 +999,24 @@ class ChannelFlowOk(AMQPMethod): CLASS = Channel NAME = u'flow-ok' CLASSNAME = u'channel' + FULLNAME = u'channel.flow-ok' + CLASS_INDEX = 20 + CLASS_INDEX_BINARY = b'\x14' METHOD_INDEX = 21 - FULLNAME = u'channel.flow-ok' - SYNCHRONOUS = False + METHOD_INDEX_BINARY = b'\x15' + BINARY_HEADER = b'\x14\x15' # CLASS ID + METHOD ID + + SYNCHRONOUS = False # does this message imply other one? REPLY_WITH = [] - BINARY_HEADER = b'\x14\x15' - IS_SIZE_STATIC = False + + SENT_BY_CLIENT = True + SENT_BY_SERVER = True + + MINIMUM_SIZE = 1 # arguments part can never be shorter than this + + IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_CONTENT_STATIC = False # this means that argument part has always the same content RESPONSE_TO = ChannelFlow # this is sent in response to channel.flow FIELDS = [ # tuples of (field name, field domain, basic type used, is_reserved) (u'active', u'bit', u'bit', False), # current flow setting @@ -719,8 +1033,19 @@ class ChannelFlowOk(AMQPMethod): """ self.active = active - def write_arguments(self, out): - out(struct.pack('!?', self.active)) + def write_arguments(self, buf): + buf.write(struct.pack('!B', (int(self.active) << 0))) + + def get_size(self): + return 1 + + @staticmethod + def from_buffer(buf, start_offset): + assert (len(buf) - start_offset) >= ChannelFlowOk.MINIMUM_SIZE, 'Frame too short!' + offset = start_offset # we will use it to count consumed bytes + _bit_0, = struct.unpack_from('!B', buf, offset) + active = bool(_bit_0 & 1) + return ChannelFlowOk(active) class ChannelOpen(AMQPMethod): @@ -732,13 +1057,25 @@ class ChannelOpen(AMQPMethod): CLASS = Channel NAME = u'open' CLASSNAME = u'channel' + FULLNAME = u'channel.open' + CLASS_INDEX = 20 + CLASS_INDEX_BINARY = b'\x14' METHOD_INDEX = 10 - FULLNAME = u'channel.open' - SYNCHRONOUS = True + METHOD_INDEX_BINARY = b'\x0A' + BINARY_HEADER = b'\x14\x0A' # CLASS ID + METHOD ID + + SYNCHRONOUS = True # does this message imply other one? REPLY_WITH = [ChannelOpenOk] - BINARY_HEADER = b'\x14\x0A' - IS_SIZE_STATIC = False + + SENT_BY_CLIENT = True + SENT_BY_SERVER = False + + MINIMUM_SIZE = 7 # arguments part can never be shorter than this + + IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_CONTENT_STATIC = True # this means that argument part has always the same content + STATIC_CONTENT = b'\x00\x00\x00\x05\x14\x0A\x00\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END FIELDS = [ # tuples of (field name, field domain, basic type used, is_reserved) (u'reserved-1', u'shortstr', u'shortstr', True), ] @@ -748,8 +1085,11 @@ class ChannelOpen(AMQPMethod): Create frame channel.open """ - def write_arguments(self, out): - pass # this has a frame, but its only default shortstrs + # not generating write_arguments - this method has static content! + + @staticmethod + def from_buffer(buf, start_offset): + return ChannelOpen() class ChannelOpenOk(AMQPMethod): @@ -761,13 +1101,25 @@ class ChannelOpenOk(AMQPMethod): CLASS = Channel NAME = u'open-ok' CLASSNAME = u'channel' + FULLNAME = u'channel.open-ok' + CLASS_INDEX = 20 + CLASS_INDEX_BINARY = b'\x14' METHOD_INDEX = 11 - FULLNAME = u'channel.open-ok' - SYNCHRONOUS = True + METHOD_INDEX_BINARY = b'\x0B' + BINARY_HEADER = b'\x14\x0B' # CLASS ID + METHOD ID + + SYNCHRONOUS = True # does this message imply other one? REPLY_WITH = [] - BINARY_HEADER = b'\x14\x0B' - IS_SIZE_STATIC = False + + SENT_BY_CLIENT = False + SENT_BY_SERVER = True + + MINIMUM_SIZE = 19 # arguments part can never be shorter than this + + IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_CONTENT_STATIC = True # this means that argument part has always the same content + STATIC_CONTENT = b'\x00\x00\x00\x05\x14\x0B\x00\x00\x00\x00\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END RESPONSE_TO = ChannelOpen # this is sent in response to channel.open FIELDS = [ # tuples of (field name, field domain, basic type used, is_reserved) (u'reserved-1', u'longstr', u'longstr', True), @@ -778,10 +1130,11 @@ class ChannelOpenOk(AMQPMethod): Create frame channel.open-ok """ - def write_arguments(self, out): - out(struct.pack('!L', len(self.reserved_1))) - out(self.reserved_1) - pass # this has a frame, but its only default shortstrs + # not generating write_arguments - this method has static content! + + @staticmethod + def from_buffer(buf, start_offset): + return ChannelOpenOk() class Exchange(AMQPClass): @@ -804,13 +1157,24 @@ class ExchangeDeclare(AMQPMethod): CLASS = Exchange NAME = u'declare' CLASSNAME = u'exchange' + FULLNAME = u'exchange.declare' + CLASS_INDEX = 40 + CLASS_INDEX_BINARY = b'\x28' METHOD_INDEX = 10 - FULLNAME = u'exchange.declare' - SYNCHRONOUS = True + METHOD_INDEX_BINARY = b'\x0A' + BINARY_HEADER = b'\x28\x0A' # CLASS ID + METHOD ID + + SYNCHRONOUS = True # does this message imply other one? REPLY_WITH = [ExchangeDeclareOk] - BINARY_HEADER = b'\x28\x0A' - IS_SIZE_STATIC = False + + SENT_BY_CLIENT = True + SENT_BY_SERVER = False + + MINIMUM_SIZE = 36 # arguments part can never be shorter than this + + IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_CONTENT_STATIC = False # this means that argument part has always the same content FIELDS = [ # tuples of (field name, field domain, basic type used, is_reserved) (u'reserved-1', u'short', u'short', True), (u'exchange', u'exchange-name', u'shortstr', False), @@ -863,9 +1227,38 @@ class ExchangeDeclare(AMQPMethod): self.no_wait = no_wait self.arguments = arguments - def write_arguments(self, out): - out(struct.pack('!Hpp?????', 0, self.exchange, self.type, self.passive, self.durable, 0, 0, self.no_wait)) - pass # this has a frame, but its only default shortstrs + def write_arguments(self, buf): + buf.write(b'\x00\x00') + buf.write(struct.pack('!B', len(self.exchange))) + buf.write(self.exchange) + buf.write(struct.pack('!B', len(self.type))) + buf.write(self.type) + buf.write(struct.pack('!B', (int(self.passive) << 0) | (int(self.durable) << 1) | (int(self.no_wait) << 2))) + _enframe_table(buf, self.arguments) + + def get_size(self): + return len(self.exchange) + len(self.type) + _frame_table_size(self.arguments) + 9 + + @staticmethod + def from_buffer(buf, start_offset): + assert (len(buf) - start_offset) >= ExchangeDeclare.MINIMUM_SIZE, 'Frame too short!' + offset = start_offset # we will use it to count consumed bytes + s_len, = struct.unpack_from('!2xB', buf, offset) + offset += 3 + exchange = buf[offset:offset+s_len] + offset += s_len + s_len, = struct.unpack_from('!B', buf, offset) + offset += 1 + type = buf[offset:offset+s_len] + offset += s_len + _bit, = struct.unpack_from('!B', buf, offset) + offset += 1 + passive = bool(_bit & 1) + durable = bool(_bit & 2) + no_wait = bool(_bit & 16) + arguments, delta = _deframe_table(buf, offset) + offset += delta + return ExchangeDeclare(exchange, type, passive, durable, no_wait, arguments) class ExchangeDelete(AMQPMethod): @@ -878,13 +1271,24 @@ class ExchangeDelete(AMQPMethod): CLASS = Exchange NAME = u'delete' CLASSNAME = u'exchange' + FULLNAME = u'exchange.delete' + CLASS_INDEX = 40 + CLASS_INDEX_BINARY = b'\x28' METHOD_INDEX = 20 - FULLNAME = u'exchange.delete' - SYNCHRONOUS = True + METHOD_INDEX_BINARY = b'\x14' + BINARY_HEADER = b'\x28\x14' # CLASS ID + METHOD ID + + SYNCHRONOUS = True # does this message imply other one? REPLY_WITH = [ExchangeDeleteOk] - BINARY_HEADER = b'\x28\x14' - IS_SIZE_STATIC = False + + SENT_BY_CLIENT = True + SENT_BY_SERVER = False + + MINIMUM_SIZE = 10 # arguments part can never be shorter than this + + IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_CONTENT_STATIC = False # this means that argument part has always the same content FIELDS = [ # tuples of (field name, field domain, basic type used, is_reserved) (u'reserved-1', u'short', u'short', True), (u'exchange', u'exchange-name', u'shortstr', False), @@ -909,8 +1313,28 @@ class ExchangeDelete(AMQPMethod): self.if_unused = if_unused self.no_wait = no_wait - def write_arguments(self, out): - out(struct.pack('!Hp??', 0, self.exchange, self.if_unused, self.no_wait)) + def write_arguments(self, buf): + buf.write(b'\x00\x00') + buf.write(struct.pack('!B', len(self.exchange))) + buf.write(self.exchange) + buf.write(struct.pack('!B', (int(self.if_unused) << 0) | (int(self.no_wait) << 1))) + + def get_size(self): + return len(self.exchange) + 4 + + @staticmethod + def from_buffer(buf, start_offset): + assert (len(buf) - start_offset) >= ExchangeDelete.MINIMUM_SIZE, 'Frame too short!' + offset = start_offset # we will use it to count consumed bytes + s_len, = struct.unpack_from('!2xB', buf, offset) + offset += 3 + exchange = buf[offset:offset+s_len] + offset += s_len + _bit, = struct.unpack_from('!B', buf, offset) + offset += 1 + if_unused = bool(_bit & 1) + no_wait = bool(_bit & 2) + return ExchangeDelete(exchange, if_unused, no_wait) class ExchangeDeclareOk(AMQPMethod): @@ -923,14 +1347,25 @@ class ExchangeDeclareOk(AMQPMethod): CLASS = Exchange NAME = u'declare-ok' CLASSNAME = u'exchange' + FULLNAME = u'exchange.declare-ok' + CLASS_INDEX = 40 + CLASS_INDEX_BINARY = b'\x28' METHOD_INDEX = 11 - FULLNAME = u'exchange.declare-ok' - SYNCHRONOUS = True + METHOD_INDEX_BINARY = b'\x0B' + BINARY_HEADER = b'\x28\x0B' # CLASS ID + METHOD ID + + SYNCHRONOUS = True # does this message imply other one? REPLY_WITH = [] - BINARY_HEADER = b'\x28\x0B' - IS_SIZE_STATIC = True - STATIC_ARGUMENT_SIZE = 0 # length of arguments (as binary) is constant here + + SENT_BY_CLIENT = False + SENT_BY_SERVER = True + + MINIMUM_SIZE = 0 # arguments part can never be shorter than this + + IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_CONTENT_STATIC = True # this means that argument part has always the same content + STATIC_CONTENT = b'\x00\x00\x00\x04\x28\x0B\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END RESPONSE_TO = ExchangeDeclare # this is sent in response to exchange.declare def __init__(self): @@ -938,6 +1373,13 @@ class ExchangeDeclareOk(AMQPMethod): Create frame exchange.declare-ok """ + # not generating write_arguments - this method has static content! + + @staticmethod + def from_buffer(buf, start_offset): + return ExchangeDeclareOk() + + class ExchangeDeleteOk(AMQPMethod): """ Confirm deletion of an exchange @@ -947,14 +1389,25 @@ class ExchangeDeleteOk(AMQPMethod): CLASS = Exchange NAME = u'delete-ok' CLASSNAME = u'exchange' + FULLNAME = u'exchange.delete-ok' + CLASS_INDEX = 40 + CLASS_INDEX_BINARY = b'\x28' METHOD_INDEX = 21 - FULLNAME = u'exchange.delete-ok' - SYNCHRONOUS = True + METHOD_INDEX_BINARY = b'\x15' + BINARY_HEADER = b'\x28\x15' # CLASS ID + METHOD ID + + SYNCHRONOUS = True # does this message imply other one? REPLY_WITH = [] - BINARY_HEADER = b'\x28\x15' - IS_SIZE_STATIC = True - STATIC_ARGUMENT_SIZE = 0 # length of arguments (as binary) is constant here + + SENT_BY_CLIENT = False + SENT_BY_SERVER = True + + MINIMUM_SIZE = 0 # arguments part can never be shorter than this + + IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_CONTENT_STATIC = True # this means that argument part has always the same content + STATIC_CONTENT = b'\x00\x00\x00\x04\x28\x15\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END RESPONSE_TO = ExchangeDelete # this is sent in response to exchange.delete def __init__(self): @@ -962,6 +1415,13 @@ class ExchangeDeleteOk(AMQPMethod): Create frame exchange.delete-ok """ + # not generating write_arguments - this method has static content! + + @staticmethod + def from_buffer(buf, start_offset): + return ExchangeDeleteOk() + + class Queue(AMQPClass): """ Queues store and forward messages. queues can be configured in the server or created at @@ -985,13 +1445,24 @@ class QueueBind(AMQPMethod): CLASS = Queue NAME = u'bind' CLASSNAME = u'queue' + FULLNAME = u'queue.bind' + CLASS_INDEX = 50 + CLASS_INDEX_BINARY = b'\x32' METHOD_INDEX = 20 - FULLNAME = u'queue.bind' - SYNCHRONOUS = True + METHOD_INDEX_BINARY = b'\x14' + BINARY_HEADER = b'\x32\x14' # CLASS ID + METHOD ID + + SYNCHRONOUS = True # does this message imply other one? REPLY_WITH = [QueueBindOk] - BINARY_HEADER = b'\x32\x14' - IS_SIZE_STATIC = False + + SENT_BY_CLIENT = True + SENT_BY_SERVER = False + + MINIMUM_SIZE = 43 # arguments part can never be shorter than this + + IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_CONTENT_STATIC = False # this means that argument part has always the same content FIELDS = [ # tuples of (field name, field domain, basic type used, is_reserved) (u'reserved-1', u'short', u'short', True), (u'queue', u'queue-name', u'shortstr', False), @@ -1032,9 +1503,42 @@ class QueueBind(AMQPMethod): self.no_wait = no_wait self.arguments = arguments - def write_arguments(self, out): - out(struct.pack('!Hppp?', 0, self.queue, self.exchange, self.routing_key, self.no_wait)) - pass # this has a frame, but its only default shortstrs + def write_arguments(self, buf): + buf.write(b'\x00\x00') + buf.write(struct.pack('!B', len(self.queue))) + buf.write(self.queue) + buf.write(struct.pack('!B', len(self.exchange))) + buf.write(self.exchange) + buf.write(struct.pack('!B', len(self.routing_key))) + buf.write(self.routing_key) + buf.write(struct.pack('!B', (int(self.no_wait) << 0))) + _enframe_table(buf, self.arguments) + + def get_size(self): + return len(self.queue) + len(self.exchange) + len(self.routing_key) + _frame_table_size(self.arguments) + 10 + + @staticmethod + def from_buffer(buf, start_offset): + assert (len(buf) - start_offset) >= QueueBind.MINIMUM_SIZE, 'Frame too short!' + offset = start_offset # we will use it to count consumed bytes + s_len, = struct.unpack_from('!2xB', buf, offset) + offset += 3 + queue = buf[offset:offset+s_len] + offset += s_len + s_len, = struct.unpack_from('!B', buf, offset) + offset += 1 + exchange = buf[offset:offset+s_len] + offset += s_len + s_len, = struct.unpack_from('!B', buf, offset) + offset += 1 + routing_key = buf[offset:offset+s_len] + offset += s_len + _bit, = struct.unpack_from('!B', buf, offset) + offset += 1 + no_wait = bool(_bit & 1) + arguments, delta = _deframe_table(buf, offset) + offset += delta + return QueueBind(queue, exchange, routing_key, no_wait, arguments) class QueueBindOk(AMQPMethod): @@ -1046,14 +1550,25 @@ class QueueBindOk(AMQPMethod): CLASS = Queue NAME = u'bind-ok' CLASSNAME = u'queue' + FULLNAME = u'queue.bind-ok' + CLASS_INDEX = 50 + CLASS_INDEX_BINARY = b'\x32' METHOD_INDEX = 21 - FULLNAME = u'queue.bind-ok' - SYNCHRONOUS = True + METHOD_INDEX_BINARY = b'\x15' + BINARY_HEADER = b'\x32\x15' # CLASS ID + METHOD ID + + SYNCHRONOUS = True # does this message imply other one? REPLY_WITH = [] - BINARY_HEADER = b'\x32\x15' - IS_SIZE_STATIC = True - STATIC_ARGUMENT_SIZE = 0 # length of arguments (as binary) is constant here + + SENT_BY_CLIENT = False + SENT_BY_SERVER = True + + MINIMUM_SIZE = 0 # arguments part can never be shorter than this + + IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_CONTENT_STATIC = True # this means that argument part has always the same content + STATIC_CONTENT = b'\x00\x00\x00\x04\x32\x15\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END RESPONSE_TO = QueueBind # this is sent in response to queue.bind def __init__(self): @@ -1061,6 +1576,13 @@ class QueueBindOk(AMQPMethod): Create frame queue.bind-ok """ + # not generating write_arguments - this method has static content! + + @staticmethod + def from_buffer(buf, start_offset): + return QueueBindOk() + + class QueueDeclare(AMQPMethod): """ Declare queue, create if needed @@ -1072,13 +1594,24 @@ class QueueDeclare(AMQPMethod): CLASS = Queue NAME = u'declare' CLASSNAME = u'queue' + FULLNAME = u'queue.declare' + CLASS_INDEX = 50 + CLASS_INDEX_BINARY = b'\x32' METHOD_INDEX = 10 - FULLNAME = u'queue.declare' - SYNCHRONOUS = True + METHOD_INDEX_BINARY = b'\x0A' + BINARY_HEADER = b'\x32\x0A' # CLASS ID + METHOD ID + + SYNCHRONOUS = True # does this message imply other one? REPLY_WITH = [QueueDeclareOk] - BINARY_HEADER = b'\x32\x0A' - IS_SIZE_STATIC = False + + SENT_BY_CLIENT = True + SENT_BY_SERVER = False + + MINIMUM_SIZE = 29 # arguments part can never be shorter than this + + IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_CONTENT_STATIC = False # this means that argument part has always the same content FIELDS = [ # tuples of (field name, field domain, basic type used, is_reserved) (u'reserved-1', u'short', u'short', True), (u'queue', u'queue-name', u'shortstr', False), @@ -1138,9 +1671,34 @@ class QueueDeclare(AMQPMethod): self.no_wait = no_wait self.arguments = arguments - def write_arguments(self, out): - out(struct.pack('!Hp?????', 0, self.queue, self.passive, self.durable, self.exclusive, self.auto_delete, self.no_wait)) - pass # this has a frame, but its only default shortstrs + def write_arguments(self, buf): + buf.write(b'\x00\x00') + buf.write(struct.pack('!B', len(self.queue))) + buf.write(self.queue) + buf.write(struct.pack('!B', (int(self.passive) << 0) | (int(self.durable) << 1) | (int(self.exclusive) << 2) | (int(self.auto_delete) << 3) | (int(self.no_wait) << 4))) + _enframe_table(buf, self.arguments) + + def get_size(self): + return len(self.queue) + _frame_table_size(self.arguments) + 8 + + @staticmethod + def from_buffer(buf, start_offset): + assert (len(buf) - start_offset) >= QueueDeclare.MINIMUM_SIZE, 'Frame too short!' + offset = start_offset # we will use it to count consumed bytes + s_len, = struct.unpack_from('!2xB', buf, offset) + offset += 3 + queue = buf[offset:offset+s_len] + offset += s_len + _bit, = struct.unpack_from('!B', buf, offset) + offset += 1 + passive = bool(_bit & 1) + durable = bool(_bit & 2) + exclusive = bool(_bit & 4) + auto_delete = bool(_bit & 8) + no_wait = bool(_bit & 16) + arguments, delta = _deframe_table(buf, offset) + offset += delta + return QueueDeclare(queue, passive, durable, exclusive, auto_delete, no_wait, arguments) class QueueDelete(AMQPMethod): @@ -1154,13 +1712,24 @@ class QueueDelete(AMQPMethod): CLASS = Queue NAME = u'delete' CLASSNAME = u'queue' + FULLNAME = u'queue.delete' + CLASS_INDEX = 50 + CLASS_INDEX_BINARY = b'\x32' METHOD_INDEX = 40 - FULLNAME = u'queue.delete' - SYNCHRONOUS = True + METHOD_INDEX_BINARY = b'\x28' + BINARY_HEADER = b'\x32\x28' # CLASS ID + METHOD ID + + SYNCHRONOUS = True # does this message imply other one? REPLY_WITH = [QueueDeleteOk] - BINARY_HEADER = b'\x32\x28' - IS_SIZE_STATIC = False + + SENT_BY_CLIENT = True + SENT_BY_SERVER = False + + MINIMUM_SIZE = 10 # arguments part can never be shorter than this + + IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_CONTENT_STATIC = False # this means that argument part has always the same content FIELDS = [ # tuples of (field name, field domain, basic type used, is_reserved) (u'reserved-1', u'short', u'short', True), (u'queue', u'queue-name', u'shortstr', False), @@ -1190,8 +1759,29 @@ class QueueDelete(AMQPMethod): self.if_empty = if_empty self.no_wait = no_wait - def write_arguments(self, out): - out(struct.pack('!Hp???', 0, self.queue, self.if_unused, self.if_empty, self.no_wait)) + def write_arguments(self, buf): + buf.write(b'\x00\x00') + buf.write(struct.pack('!B', len(self.queue))) + buf.write(self.queue) + buf.write(struct.pack('!B', (int(self.if_unused) << 0) | (int(self.if_empty) << 1) | (int(self.no_wait) << 2))) + + def get_size(self): + return len(self.queue) + 4 + + @staticmethod + def from_buffer(buf, start_offset): + assert (len(buf) - start_offset) >= QueueDelete.MINIMUM_SIZE, 'Frame too short!' + offset = start_offset # we will use it to count consumed bytes + s_len, = struct.unpack_from('!2xB', buf, offset) + offset += 3 + queue = buf[offset:offset+s_len] + offset += s_len + _bit, = struct.unpack_from('!B', buf, offset) + offset += 1 + if_unused = bool(_bit & 1) + if_empty = bool(_bit & 2) + no_wait = bool(_bit & 4) + return QueueDelete(queue, if_unused, if_empty, no_wait) class QueueDeclareOk(AMQPMethod): @@ -1204,13 +1794,24 @@ class QueueDeclareOk(AMQPMethod): CLASS = Queue NAME = u'declare-ok' CLASSNAME = u'queue' + FULLNAME = u'queue.declare-ok' + CLASS_INDEX = 50 + CLASS_INDEX_BINARY = b'\x32' METHOD_INDEX = 11 - FULLNAME = u'queue.declare-ok' - SYNCHRONOUS = True + METHOD_INDEX_BINARY = b'\x0B' + BINARY_HEADER = b'\x32\x0B' # CLASS ID + METHOD ID + + SYNCHRONOUS = True # does this message imply other one? REPLY_WITH = [] - BINARY_HEADER = b'\x32\x0B' - IS_SIZE_STATIC = False + + SENT_BY_CLIENT = False + SENT_BY_SERVER = True + + MINIMUM_SIZE = 15 # arguments part can never be shorter than this + + IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_CONTENT_STATIC = False # this means that argument part has always the same content RESPONSE_TO = QueueDeclare # this is sent in response to queue.declare FIELDS = [ # tuples of (field name, field domain, basic type used, is_reserved) (u'queue', u'queue-name', u'shortstr', False), @@ -1235,8 +1836,25 @@ class QueueDeclareOk(AMQPMethod): self.message_count = message_count self.consumer_count = consumer_count - def write_arguments(self, out): - out(struct.pack('!pII', self.queue, self.message_count, self.consumer_count)) + def write_arguments(self, buf): + buf.write(struct.pack('!B', len(self.queue))) + buf.write(self.queue) + buf.write(struct.pack('!II', self.message_count, self.consumer_count)) + + def get_size(self): + return len(self.queue) + 9 + + @staticmethod + def from_buffer(buf, start_offset): + assert (len(buf) - start_offset) >= QueueDeclareOk.MINIMUM_SIZE, 'Frame too short!' + offset = start_offset # we will use it to count consumed bytes + s_len, = struct.unpack_from('!B', buf, offset) + offset += 1 + queue = buf[offset:offset+s_len] + offset += s_len + message_count, consumer_count, = struct.unpack_from('!II', buf, offset) + offset += 8 + return QueueDeclareOk(queue, message_count, consumer_count) class QueueDeleteOk(AMQPMethod): @@ -1248,13 +1866,24 @@ class QueueDeleteOk(AMQPMethod): CLASS = Queue NAME = u'delete-ok' CLASSNAME = u'queue' + FULLNAME = u'queue.delete-ok' + CLASS_INDEX = 50 + CLASS_INDEX_BINARY = b'\x32' METHOD_INDEX = 41 - FULLNAME = u'queue.delete-ok' - SYNCHRONOUS = True + METHOD_INDEX_BINARY = b'\x29' + BINARY_HEADER = b'\x32\x29' # CLASS ID + METHOD ID + + SYNCHRONOUS = True # does this message imply other one? REPLY_WITH = [] - BINARY_HEADER = b'\x32\x29' - IS_SIZE_STATIC = False + + SENT_BY_CLIENT = False + SENT_BY_SERVER = True + + MINIMUM_SIZE = 4 # arguments part can never be shorter than this + + IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_CONTENT_STATIC = False # this means that argument part has always the same content RESPONSE_TO = QueueDelete # this is sent in response to queue.delete FIELDS = [ # tuples of (field name, field domain, basic type used, is_reserved) (u'message-count', u'message-count', u'long', False), @@ -1269,8 +1898,18 @@ class QueueDeleteOk(AMQPMethod): """ self.message_count = message_count - def write_arguments(self, out): - out(struct.pack('!I', self.message_count)) + def write_arguments(self, buf): + buf.write(struct.pack('!I', self.message_count)) + + def get_size(self): + return 4 + + @staticmethod + def from_buffer(buf, start_offset): + assert (len(buf) - start_offset) >= QueueDeleteOk.MINIMUM_SIZE, 'Frame too short!' + offset = start_offset # we will use it to count consumed bytes + message_count, = struct.unpack_from('!I', buf, offset) + return QueueDeleteOk(message_count) class QueuePurge(AMQPMethod): @@ -1283,13 +1922,24 @@ class QueuePurge(AMQPMethod): CLASS = Queue NAME = u'purge' CLASSNAME = u'queue' + FULLNAME = u'queue.purge' + CLASS_INDEX = 50 + CLASS_INDEX_BINARY = b'\x32' METHOD_INDEX = 30 - FULLNAME = u'queue.purge' - SYNCHRONOUS = True + METHOD_INDEX_BINARY = b'\x1E' + BINARY_HEADER = b'\x32\x1E' # CLASS ID + METHOD ID + + SYNCHRONOUS = True # does this message imply other one? REPLY_WITH = [QueuePurgeOk] - BINARY_HEADER = b'\x32\x1E' - IS_SIZE_STATIC = False + + SENT_BY_CLIENT = True + SENT_BY_SERVER = False + + MINIMUM_SIZE = 10 # arguments part can never be shorter than this + + IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_CONTENT_STATIC = False # this means that argument part has always the same content FIELDS = [ # tuples of (field name, field domain, basic type used, is_reserved) (u'reserved-1', u'short', u'short', True), (u'queue', u'queue-name', u'shortstr', False), @@ -1307,8 +1957,27 @@ class QueuePurge(AMQPMethod): self.queue = queue self.no_wait = no_wait - def write_arguments(self, out): - out(struct.pack('!Hp?', 0, self.queue, self.no_wait)) + def write_arguments(self, buf): + buf.write(b'\x00\x00') + buf.write(struct.pack('!B', len(self.queue))) + buf.write(self.queue) + buf.write(struct.pack('!B', (int(self.no_wait) << 0))) + + def get_size(self): + return len(self.queue) + 4 + + @staticmethod + def from_buffer(buf, start_offset): + assert (len(buf) - start_offset) >= QueuePurge.MINIMUM_SIZE, 'Frame too short!' + offset = start_offset # we will use it to count consumed bytes + s_len, = struct.unpack_from('!2xB', buf, offset) + offset += 3 + queue = buf[offset:offset+s_len] + offset += s_len + _bit, = struct.unpack_from('!B', buf, offset) + offset += 1 + no_wait = bool(_bit & 1) + return QueuePurge(queue, no_wait) class QueuePurgeOk(AMQPMethod): @@ -1320,13 +1989,24 @@ class QueuePurgeOk(AMQPMethod): CLASS = Queue NAME = u'purge-ok' CLASSNAME = u'queue' + FULLNAME = u'queue.purge-ok' + CLASS_INDEX = 50 + CLASS_INDEX_BINARY = b'\x32' METHOD_INDEX = 31 - FULLNAME = u'queue.purge-ok' - SYNCHRONOUS = True + METHOD_INDEX_BINARY = b'\x1F' + BINARY_HEADER = b'\x32\x1F' # CLASS ID + METHOD ID + + SYNCHRONOUS = True # does this message imply other one? REPLY_WITH = [] - BINARY_HEADER = b'\x32\x1F' - IS_SIZE_STATIC = False + + SENT_BY_CLIENT = False + SENT_BY_SERVER = True + + MINIMUM_SIZE = 4 # arguments part can never be shorter than this + + IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_CONTENT_STATIC = False # this means that argument part has always the same content RESPONSE_TO = QueuePurge # this is sent in response to queue.purge FIELDS = [ # tuples of (field name, field domain, basic type used, is_reserved) (u'message-count', u'message-count', u'long', False), @@ -1341,8 +2021,18 @@ class QueuePurgeOk(AMQPMethod): """ self.message_count = message_count - def write_arguments(self, out): - out(struct.pack('!I', self.message_count)) + def write_arguments(self, buf): + buf.write(struct.pack('!I', self.message_count)) + + def get_size(self): + return 4 + + @staticmethod + def from_buffer(buf, start_offset): + assert (len(buf) - start_offset) >= QueuePurgeOk.MINIMUM_SIZE, 'Frame too short!' + offset = start_offset # we will use it to count consumed bytes + message_count, = struct.unpack_from('!I', buf, offset) + return QueuePurgeOk(message_count) class QueueUnbind(AMQPMethod): @@ -1354,13 +2044,24 @@ class QueueUnbind(AMQPMethod): CLASS = Queue NAME = u'unbind' CLASSNAME = u'queue' + FULLNAME = u'queue.unbind' + CLASS_INDEX = 50 + CLASS_INDEX_BINARY = b'\x32' METHOD_INDEX = 50 - FULLNAME = u'queue.unbind' - SYNCHRONOUS = True + METHOD_INDEX_BINARY = b'\x32' + BINARY_HEADER = b'\x32\x32' # CLASS ID + METHOD ID + + SYNCHRONOUS = True # does this message imply other one? REPLY_WITH = [QueueUnbindOk] - BINARY_HEADER = b'\x32\x32' - IS_SIZE_STATIC = False + + SENT_BY_CLIENT = True + SENT_BY_SERVER = False + + MINIMUM_SIZE = 42 # arguments part can never be shorter than this + + IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_CONTENT_STATIC = False # this means that argument part has always the same content FIELDS = [ # tuples of (field name, field domain, basic type used, is_reserved) (u'reserved-1', u'short', u'short', True), (u'queue', u'queue-name', u'shortstr', False), @@ -1389,9 +2090,38 @@ class QueueUnbind(AMQPMethod): self.routing_key = routing_key self.arguments = arguments - def write_arguments(self, out): - out(struct.pack('!Hppp', 0, self.queue, self.exchange, self.routing_key)) - pass # this has a frame, but its only default shortstrs + def write_arguments(self, buf): + buf.write(b'\x00\x00') + buf.write(struct.pack('!B', len(self.queue))) + buf.write(self.queue) + buf.write(struct.pack('!B', len(self.exchange))) + buf.write(self.exchange) + buf.write(struct.pack('!B', len(self.routing_key))) + buf.write(self.routing_key) + _enframe_table(buf, self.arguments) + + def get_size(self): + return len(self.queue) + len(self.exchange) + len(self.routing_key) + _frame_table_size(self.arguments) + 9 + + @staticmethod + def from_buffer(buf, start_offset): + assert (len(buf) - start_offset) >= QueueUnbind.MINIMUM_SIZE, 'Frame too short!' + offset = start_offset # we will use it to count consumed bytes + s_len, = struct.unpack_from('!2xB', buf, offset) + offset += 3 + queue = buf[offset:offset+s_len] + offset += s_len + s_len, = struct.unpack_from('!B', buf, offset) + offset += 1 + exchange = buf[offset:offset+s_len] + offset += s_len + s_len, = struct.unpack_from('!B', buf, offset) + offset += 1 + routing_key = buf[offset:offset+s_len] + offset += s_len + arguments, delta = _deframe_table(buf, offset) + offset += delta + return QueueUnbind(queue, exchange, routing_key, arguments) class QueueUnbindOk(AMQPMethod): @@ -1403,14 +2133,25 @@ class QueueUnbindOk(AMQPMethod): CLASS = Queue NAME = u'unbind-ok' CLASSNAME = u'queue' + FULLNAME = u'queue.unbind-ok' + CLASS_INDEX = 50 + CLASS_INDEX_BINARY = b'\x32' METHOD_INDEX = 51 - FULLNAME = u'queue.unbind-ok' - SYNCHRONOUS = True + METHOD_INDEX_BINARY = b'\x33' + BINARY_HEADER = b'\x32\x33' # CLASS ID + METHOD ID + + SYNCHRONOUS = True # does this message imply other one? REPLY_WITH = [] - BINARY_HEADER = b'\x32\x33' - IS_SIZE_STATIC = True - STATIC_ARGUMENT_SIZE = 0 # length of arguments (as binary) is constant here + + SENT_BY_CLIENT = False + SENT_BY_SERVER = True + + MINIMUM_SIZE = 0 # arguments part can never be shorter than this + + IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_CONTENT_STATIC = True # this means that argument part has always the same content + STATIC_CONTENT = b'\x00\x00\x00\x04\x32\x33\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END RESPONSE_TO = QueueUnbind # this is sent in response to queue.unbind def __init__(self): @@ -1418,6 +2159,13 @@ class QueueUnbindOk(AMQPMethod): Create frame queue.unbind-ok """ + # not generating write_arguments - this method has static content! + + @staticmethod + def from_buffer(buf, start_offset): + return QueueUnbindOk() + + class Basic(AMQPClass): """ The basic class provides methods that support an industry-standard messaging model. @@ -1437,13 +2185,24 @@ class BasicAck(AMQPMethod): CLASS = Basic NAME = u'ack' CLASSNAME = u'basic' + FULLNAME = u'basic.ack' + CLASS_INDEX = 60 + CLASS_INDEX_BINARY = b'\x3C' METHOD_INDEX = 80 - FULLNAME = u'basic.ack' - SYNCHRONOUS = False + METHOD_INDEX_BINARY = b'\x50' + BINARY_HEADER = b'\x3C\x50' # CLASS ID + METHOD ID + + SYNCHRONOUS = False # does this message imply other one? REPLY_WITH = [] - BINARY_HEADER = b'\x3C\x50' - IS_SIZE_STATIC = False + + SENT_BY_CLIENT = True + SENT_BY_SERVER = False + + MINIMUM_SIZE = 9 # arguments part can never be shorter than this + + IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_CONTENT_STATIC = False # this means that argument part has always the same content FIELDS = [ # tuples of (field name, field domain, basic type used, is_reserved) (u'delivery-tag', u'delivery-tag', u'longlong', False), (u'multiple', u'bit', u'bit', False), # acknowledge multiple messages @@ -1464,8 +2223,20 @@ class BasicAck(AMQPMethod): self.delivery_tag = delivery_tag self.multiple = multiple - def write_arguments(self, out): - out(struct.pack('!L?', self.delivery_tag, self.multiple)) + def write_arguments(self, buf): + buf.write(struct.pack('!Q', self.delivery_tag)) + buf.write(struct.pack('!B', (int(self.multiple) << 0))) + + def get_size(self): + return 9 + + @staticmethod + def from_buffer(buf, start_offset): + assert (len(buf) - start_offset) >= BasicAck.MINIMUM_SIZE, 'Frame too short!' + offset = start_offset # we will use it to count consumed bytes + delivery_tag, _bit_0, = struct.unpack_from('!QB', buf, offset) + multiple = bool(_bit_0 & 1) + return BasicAck(delivery_tag, multiple) class BasicConsume(AMQPMethod): @@ -1479,13 +2250,24 @@ class BasicConsume(AMQPMethod): CLASS = Basic NAME = u'consume' CLASSNAME = u'basic' + FULLNAME = u'basic.consume' + CLASS_INDEX = 60 + CLASS_INDEX_BINARY = b'\x3C' METHOD_INDEX = 20 - FULLNAME = u'basic.consume' - SYNCHRONOUS = True + METHOD_INDEX_BINARY = b'\x14' + BINARY_HEADER = b'\x3C\x14' # CLASS ID + METHOD ID + + SYNCHRONOUS = True # does this message imply other one? REPLY_WITH = [BasicConsumeOk] - BINARY_HEADER = b'\x3C\x14' - IS_SIZE_STATIC = False + + SENT_BY_CLIENT = True + SENT_BY_SERVER = False + + MINIMUM_SIZE = 36 # arguments part can never be shorter than this + + IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_CONTENT_STATIC = False # this means that argument part has always the same content FIELDS = [ # tuples of (field name, field domain, basic type used, is_reserved) (u'reserved-1', u'short', u'short', True), (u'queue', u'queue-name', u'shortstr', False), @@ -1527,9 +2309,39 @@ class BasicConsume(AMQPMethod): self.no_wait = no_wait self.arguments = arguments - def write_arguments(self, out): - out(struct.pack('!Hpp????', 0, self.queue, self.consumer_tag, self.no_local, self.no_ack, self.exclusive, self.no_wait)) - pass # this has a frame, but its only default shortstrs + def write_arguments(self, buf): + buf.write(b'\x00\x00') + buf.write(struct.pack('!B', len(self.queue))) + buf.write(self.queue) + buf.write(struct.pack('!B', len(self.consumer_tag))) + buf.write(self.consumer_tag) + buf.write(struct.pack('!B', (int(self.no_local) << 0) | (int(self.no_ack) << 1) | (int(self.exclusive) << 2) | (int(self.no_wait) << 3))) + _enframe_table(buf, self.arguments) + + def get_size(self): + return len(self.queue) + len(self.consumer_tag) + _frame_table_size(self.arguments) + 9 + + @staticmethod + def from_buffer(buf, start_offset): + assert (len(buf) - start_offset) >= BasicConsume.MINIMUM_SIZE, 'Frame too short!' + offset = start_offset # we will use it to count consumed bytes + s_len, = struct.unpack_from('!2xB', buf, offset) + offset += 3 + queue = buf[offset:offset+s_len] + offset += s_len + s_len, = struct.unpack_from('!B', buf, offset) + offset += 1 + consumer_tag = buf[offset:offset+s_len] + offset += s_len + _bit, = struct.unpack_from('!B', buf, offset) + offset += 1 + no_local = bool(_bit & 1) + no_ack = bool(_bit & 2) + exclusive = bool(_bit & 4) + no_wait = bool(_bit & 8) + arguments, delta = _deframe_table(buf, offset) + offset += delta + return BasicConsume(queue, consumer_tag, no_local, no_ack, exclusive, no_wait, arguments) class BasicCancel(AMQPMethod): @@ -1544,13 +2356,24 @@ class BasicCancel(AMQPMethod): CLASS = Basic NAME = u'cancel' CLASSNAME = u'basic' + FULLNAME = u'basic.cancel' + CLASS_INDEX = 60 + CLASS_INDEX_BINARY = b'\x3C' METHOD_INDEX = 30 - FULLNAME = u'basic.cancel' - SYNCHRONOUS = True + METHOD_INDEX_BINARY = b'\x1E' + BINARY_HEADER = b'\x3C\x1E' # CLASS ID + METHOD ID + + SYNCHRONOUS = True # does this message imply other one? REPLY_WITH = [BasicCancelOk] - BINARY_HEADER = b'\x3C\x1E' - IS_SIZE_STATIC = False + + SENT_BY_CLIENT = True + SENT_BY_SERVER = False + + MINIMUM_SIZE = 8 # arguments part can never be shorter than this + + IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_CONTENT_STATIC = False # this means that argument part has always the same content FIELDS = [ # tuples of (field name, field domain, basic type used, is_reserved) (u'consumer-tag', u'consumer-tag', u'shortstr', False), (u'no-wait', u'no-wait', u'bit', False), @@ -1566,8 +2389,26 @@ class BasicCancel(AMQPMethod): self.consumer_tag = consumer_tag self.no_wait = no_wait - def write_arguments(self, out): - out(struct.pack('!p?', self.consumer_tag, self.no_wait)) + def write_arguments(self, buf): + buf.write(struct.pack('!B', len(self.consumer_tag))) + buf.write(self.consumer_tag) + buf.write(struct.pack('!B', (int(self.no_wait) << 0))) + + def get_size(self): + return len(self.consumer_tag) + 2 + + @staticmethod + def from_buffer(buf, start_offset): + assert (len(buf) - start_offset) >= BasicCancel.MINIMUM_SIZE, 'Frame too short!' + offset = start_offset # we will use it to count consumed bytes + s_len, = struct.unpack_from('!B', buf, offset) + offset += 1 + consumer_tag = buf[offset:offset+s_len] + offset += s_len + _bit, = struct.unpack_from('!B', buf, offset) + offset += 1 + no_wait = bool(_bit & 1) + return BasicCancel(consumer_tag, no_wait) class BasicConsumeOk(AMQPMethod): @@ -1580,13 +2421,24 @@ class BasicConsumeOk(AMQPMethod): CLASS = Basic NAME = u'consume-ok' CLASSNAME = u'basic' + FULLNAME = u'basic.consume-ok' + CLASS_INDEX = 60 + CLASS_INDEX_BINARY = b'\x3C' METHOD_INDEX = 21 - FULLNAME = u'basic.consume-ok' - SYNCHRONOUS = True + METHOD_INDEX_BINARY = b'\x15' + BINARY_HEADER = b'\x3C\x15' # CLASS ID + METHOD ID + + SYNCHRONOUS = True # does this message imply other one? REPLY_WITH = [] - BINARY_HEADER = b'\x3C\x15' - IS_SIZE_STATIC = False + + SENT_BY_CLIENT = False + SENT_BY_SERVER = True + + MINIMUM_SIZE = 7 # arguments part can never be shorter than this + + IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_CONTENT_STATIC = False # this means that argument part has always the same content RESPONSE_TO = BasicConsume # this is sent in response to basic.consume FIELDS = [ # tuples of (field name, field domain, basic type used, is_reserved) (u'consumer-tag', u'consumer-tag', u'shortstr', False), @@ -1601,8 +2453,22 @@ class BasicConsumeOk(AMQPMethod): """ self.consumer_tag = consumer_tag - def write_arguments(self, out): - out(struct.pack('!p', self.consumer_tag)) + def write_arguments(self, buf): + buf.write(struct.pack('!B', len(self.consumer_tag))) + buf.write(self.consumer_tag) + + def get_size(self): + return len(self.consumer_tag) + 1 + + @staticmethod + def from_buffer(buf, start_offset): + assert (len(buf) - start_offset) >= BasicConsumeOk.MINIMUM_SIZE, 'Frame too short!' + offset = start_offset # we will use it to count consumed bytes + s_len, = struct.unpack_from('!B', buf, offset) + offset += 1 + consumer_tag = buf[offset:offset+s_len] + offset += s_len + return BasicConsumeOk(consumer_tag) class BasicCancelOk(AMQPMethod): @@ -1614,13 +2480,24 @@ class BasicCancelOk(AMQPMethod): CLASS = Basic NAME = u'cancel-ok' CLASSNAME = u'basic' + FULLNAME = u'basic.cancel-ok' + CLASS_INDEX = 60 + CLASS_INDEX_BINARY = b'\x3C' METHOD_INDEX = 31 - FULLNAME = u'basic.cancel-ok' - SYNCHRONOUS = True + METHOD_INDEX_BINARY = b'\x1F' + BINARY_HEADER = b'\x3C\x1F' # CLASS ID + METHOD ID + + SYNCHRONOUS = True # does this message imply other one? REPLY_WITH = [] - BINARY_HEADER = b'\x3C\x1F' - IS_SIZE_STATIC = False + + SENT_BY_CLIENT = False + SENT_BY_SERVER = True + + MINIMUM_SIZE = 7 # arguments part can never be shorter than this + + IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_CONTENT_STATIC = False # this means that argument part has always the same content RESPONSE_TO = BasicCancel # this is sent in response to basic.cancel FIELDS = [ # tuples of (field name, field domain, basic type used, is_reserved) (u'consumer-tag', u'consumer-tag', u'shortstr', False), @@ -1634,8 +2511,22 @@ class BasicCancelOk(AMQPMethod): """ self.consumer_tag = consumer_tag - def write_arguments(self, out): - out(struct.pack('!p', self.consumer_tag)) + def write_arguments(self, buf): + buf.write(struct.pack('!B', len(self.consumer_tag))) + buf.write(self.consumer_tag) + + def get_size(self): + return len(self.consumer_tag) + 1 + + @staticmethod + def from_buffer(buf, start_offset): + assert (len(buf) - start_offset) >= BasicCancelOk.MINIMUM_SIZE, 'Frame too short!' + offset = start_offset # we will use it to count consumed bytes + s_len, = struct.unpack_from('!B', buf, offset) + offset += 1 + consumer_tag = buf[offset:offset+s_len] + offset += s_len + return BasicCancelOk(consumer_tag) class BasicDeliver(AMQPMethod): @@ -1650,13 +2541,24 @@ class BasicDeliver(AMQPMethod): CLASS = Basic NAME = u'deliver' CLASSNAME = u'basic' + FULLNAME = u'basic.deliver' + CLASS_INDEX = 60 + CLASS_INDEX_BINARY = b'\x3C' METHOD_INDEX = 60 - FULLNAME = u'basic.deliver' - SYNCHRONOUS = False + METHOD_INDEX_BINARY = b'\x3C' + BINARY_HEADER = b'\x3C\x3C' # CLASS ID + METHOD ID + + SYNCHRONOUS = False # does this message imply other one? REPLY_WITH = [] - BINARY_HEADER = b'\x3C\x3C' - IS_SIZE_STATIC = False + + SENT_BY_CLIENT = False + SENT_BY_SERVER = True + + MINIMUM_SIZE = 30 # arguments part can never be shorter than this + + IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_CONTENT_STATIC = False # this means that argument part has always the same content FIELDS = [ # tuples of (field name, field domain, basic type used, is_reserved) (u'consumer-tag', u'consumer-tag', u'shortstr', False), (u'delivery-tag', u'delivery-tag', u'longlong', False), @@ -1685,8 +2587,38 @@ class BasicDeliver(AMQPMethod): self.exchange = exchange self.routing_key = routing_key - def write_arguments(self, out): - out(struct.pack('!pL?pp', self.consumer_tag, self.delivery_tag, self.redelivered, self.exchange, self.routing_key)) + def write_arguments(self, buf): + buf.write(struct.pack('!B', len(self.consumer_tag))) + buf.write(self.consumer_tag) + buf.write(struct.pack('!B', (int(self.redelivered) << 0))) + buf.write(struct.pack('!QB', self.delivery_tag, len(self.exchange))) + buf.write(self.exchange) + buf.write(struct.pack('!B', len(self.routing_key))) + buf.write(self.routing_key) + + def get_size(self): + return len(self.consumer_tag) + len(self.exchange) + len(self.routing_key) + 12 + + @staticmethod + def from_buffer(buf, start_offset): + assert (len(buf) - start_offset) >= BasicDeliver.MINIMUM_SIZE, 'Frame too short!' + offset = start_offset # we will use it to count consumed bytes + s_len, = struct.unpack_from('!B', buf, offset) + offset += 1 + consumer_tag = buf[offset:offset+s_len] + offset += s_len + _bit, = struct.unpack_from('!B', buf, offset) + offset += 1 + redelivered = bool(_bit & 1) + delivery_tag, s_len, = struct.unpack_from('!QB', buf, offset) + offset += 9 + exchange = buf[offset:offset+s_len] + offset += s_len + s_len, = struct.unpack_from('!B', buf, offset) + offset += 1 + routing_key = buf[offset:offset+s_len] + offset += s_len + return BasicDeliver(consumer_tag, delivery_tag, redelivered, exchange, routing_key) class BasicGet(AMQPMethod): @@ -1700,13 +2632,24 @@ class BasicGet(AMQPMethod): CLASS = Basic NAME = u'get' CLASSNAME = u'basic' + FULLNAME = u'basic.get' + CLASS_INDEX = 60 + CLASS_INDEX_BINARY = b'\x3C' METHOD_INDEX = 70 - FULLNAME = u'basic.get' - SYNCHRONOUS = True + METHOD_INDEX_BINARY = b'\x46' + BINARY_HEADER = b'\x3C\x46' # CLASS ID + METHOD ID + + SYNCHRONOUS = True # does this message imply other one? REPLY_WITH = [BasicGetOk, BasicGetEmpty] - BINARY_HEADER = b'\x3C\x46' - IS_SIZE_STATIC = False + + SENT_BY_CLIENT = True + SENT_BY_SERVER = False + + MINIMUM_SIZE = 10 # arguments part can never be shorter than this + + IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_CONTENT_STATIC = False # this means that argument part has always the same content FIELDS = [ # tuples of (field name, field domain, basic type used, is_reserved) (u'reserved-1', u'short', u'short', True), (u'queue', u'queue-name', u'shortstr', False), @@ -1724,8 +2667,27 @@ class BasicGet(AMQPMethod): self.queue = queue self.no_ack = no_ack - def write_arguments(self, out): - out(struct.pack('!Hp?', 0, self.queue, self.no_ack)) + def write_arguments(self, buf): + buf.write(b'\x00\x00') + buf.write(struct.pack('!B', len(self.queue))) + buf.write(self.queue) + buf.write(struct.pack('!B', (int(self.no_ack) << 0))) + + def get_size(self): + return len(self.queue) + 4 + + @staticmethod + def from_buffer(buf, start_offset): + assert (len(buf) - start_offset) >= BasicGet.MINIMUM_SIZE, 'Frame too short!' + offset = start_offset # we will use it to count consumed bytes + s_len, = struct.unpack_from('!2xB', buf, offset) + offset += 3 + queue = buf[offset:offset+s_len] + offset += s_len + _bit, = struct.unpack_from('!B', buf, offset) + offset += 1 + no_ack = bool(_bit & 1) + return BasicGet(queue, no_ack) class BasicGetOk(AMQPMethod): @@ -1739,13 +2701,24 @@ class BasicGetOk(AMQPMethod): CLASS = Basic NAME = u'get-ok' CLASSNAME = u'basic' + FULLNAME = u'basic.get-ok' + CLASS_INDEX = 60 + CLASS_INDEX_BINARY = b'\x3C' METHOD_INDEX = 71 - FULLNAME = u'basic.get-ok' - SYNCHRONOUS = True + METHOD_INDEX_BINARY = b'\x47' + BINARY_HEADER = b'\x3C\x47' # CLASS ID + METHOD ID + + SYNCHRONOUS = True # does this message imply other one? REPLY_WITH = [] - BINARY_HEADER = b'\x3C\x47' - IS_SIZE_STATIC = False + + SENT_BY_CLIENT = False + SENT_BY_SERVER = True + + MINIMUM_SIZE = 27 # arguments part can never be shorter than this + + IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_CONTENT_STATIC = False # this means that argument part has always the same content RESPONSE_TO = BasicGet # this is sent in response to basic.get FIELDS = [ # tuples of (field name, field domain, basic type used, is_reserved) (u'delivery-tag', u'delivery-tag', u'longlong', False), @@ -1775,8 +2748,35 @@ class BasicGetOk(AMQPMethod): self.routing_key = routing_key self.message_count = message_count - def write_arguments(self, out): - out(struct.pack('!L?ppI', self.delivery_tag, self.redelivered, self.exchange, self.routing_key, self.message_count)) + def write_arguments(self, buf): + buf.write(struct.pack('!B', (int(self.redelivered) << 0))) + buf.write(struct.pack('!QB', self.delivery_tag, len(self.exchange))) + buf.write(self.exchange) + buf.write(struct.pack('!B', len(self.routing_key))) + buf.write(self.routing_key) + buf.write(struct.pack('!I', self.message_count)) + + def get_size(self): + return len(self.exchange) + len(self.routing_key) + 15 + + @staticmethod + def from_buffer(buf, start_offset): + assert (len(buf) - start_offset) >= BasicGetOk.MINIMUM_SIZE, 'Frame too short!' + offset = start_offset # we will use it to count consumed bytes + _bit, = struct.unpack_from('!B', buf, offset) + offset += 1 + redelivered = bool(_bit & 1) + delivery_tag, s_len, = struct.unpack_from('!QB', buf, offset) + offset += 9 + exchange = buf[offset:offset+s_len] + offset += s_len + s_len, = struct.unpack_from('!B', buf, offset) + offset += 1 + routing_key = buf[offset:offset+s_len] + offset += s_len + message_count, = struct.unpack_from('!I', buf, offset) + offset += 4 + return BasicGetOk(delivery_tag, redelivered, exchange, routing_key, message_count) class BasicGetEmpty(AMQPMethod): @@ -1789,13 +2789,25 @@ class BasicGetEmpty(AMQPMethod): CLASS = Basic NAME = u'get-empty' CLASSNAME = u'basic' + FULLNAME = u'basic.get-empty' + CLASS_INDEX = 60 + CLASS_INDEX_BINARY = b'\x3C' METHOD_INDEX = 72 - FULLNAME = u'basic.get-empty' - SYNCHRONOUS = True + METHOD_INDEX_BINARY = b'\x48' + BINARY_HEADER = b'\x3C\x48' # CLASS ID + METHOD ID + + SYNCHRONOUS = True # does this message imply other one? REPLY_WITH = [] - BINARY_HEADER = b'\x3C\x48' - IS_SIZE_STATIC = False + + SENT_BY_CLIENT = False + SENT_BY_SERVER = True + + MINIMUM_SIZE = 7 # arguments part can never be shorter than this + + IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_CONTENT_STATIC = True # this means that argument part has always the same content + STATIC_CONTENT = b'\x00\x00\x00\x0D\x3C\x48\x00\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END RESPONSE_TO = BasicGet # this is sent in response to basic.get FIELDS = [ # tuples of (field name, field domain, basic type used, is_reserved) (u'reserved-1', u'shortstr', u'shortstr', True), @@ -1806,8 +2818,11 @@ class BasicGetEmpty(AMQPMethod): Create frame basic.get-empty """ - def write_arguments(self, out): - pass # this has a frame, but its only default shortstrs + # not generating write_arguments - this method has static content! + + @staticmethod + def from_buffer(buf, start_offset): + return BasicGetEmpty() class BasicPublish(AMQPMethod): @@ -1821,13 +2836,24 @@ class BasicPublish(AMQPMethod): CLASS = Basic NAME = u'publish' CLASSNAME = u'basic' + FULLNAME = u'basic.publish' + CLASS_INDEX = 60 + CLASS_INDEX_BINARY = b'\x3C' METHOD_INDEX = 40 - FULLNAME = u'basic.publish' - SYNCHRONOUS = False + METHOD_INDEX_BINARY = b'\x28' + BINARY_HEADER = b'\x3C\x28' # CLASS ID + METHOD ID + + SYNCHRONOUS = False # does this message imply other one? REPLY_WITH = [] - BINARY_HEADER = b'\x3C\x28' - IS_SIZE_STATIC = False + + SENT_BY_CLIENT = True + SENT_BY_SERVER = False + + MINIMUM_SIZE = 17 # arguments part can never be shorter than this + + IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_CONTENT_STATIC = False # this means that argument part has always the same content FIELDS = [ # tuples of (field name, field domain, basic type used, is_reserved) (u'reserved-1', u'short', u'short', True), (u'exchange', u'exchange-name', u'shortstr', False), @@ -1865,8 +2891,34 @@ class BasicPublish(AMQPMethod): self.mandatory = mandatory self.immediate = immediate - def write_arguments(self, out): - out(struct.pack('!Hpp??', 0, self.exchange, self.routing_key, self.mandatory, self.immediate)) + def write_arguments(self, buf): + buf.write(b'\x00\x00') + buf.write(struct.pack('!B', len(self.exchange))) + buf.write(self.exchange) + buf.write(struct.pack('!B', len(self.routing_key))) + buf.write(self.routing_key) + buf.write(struct.pack('!B', (int(self.mandatory) << 0) | (int(self.immediate) << 1))) + + def get_size(self): + return len(self.exchange) + len(self.routing_key) + 5 + + @staticmethod + def from_buffer(buf, start_offset): + assert (len(buf) - start_offset) >= BasicPublish.MINIMUM_SIZE, 'Frame too short!' + offset = start_offset # we will use it to count consumed bytes + s_len, = struct.unpack_from('!2xB', buf, offset) + offset += 3 + exchange = buf[offset:offset+s_len] + offset += s_len + s_len, = struct.unpack_from('!B', buf, offset) + offset += 1 + routing_key = buf[offset:offset+s_len] + offset += s_len + _bit, = struct.unpack_from('!B', buf, offset) + offset += 1 + mandatory = bool(_bit & 1) + immediate = bool(_bit & 2) + return BasicPublish(exchange, routing_key, mandatory, immediate) class BasicQos(AMQPMethod): @@ -1882,13 +2934,24 @@ class BasicQos(AMQPMethod): CLASS = Basic NAME = u'qos' CLASSNAME = u'basic' + FULLNAME = u'basic.qos' + CLASS_INDEX = 60 + CLASS_INDEX_BINARY = b'\x3C' METHOD_INDEX = 10 - FULLNAME = u'basic.qos' - SYNCHRONOUS = True + METHOD_INDEX_BINARY = b'\x0A' + BINARY_HEADER = b'\x3C\x0A' # CLASS ID + METHOD ID + + SYNCHRONOUS = True # does this message imply other one? REPLY_WITH = [BasicQosOk] - BINARY_HEADER = b'\x3C\x0A' - IS_SIZE_STATIC = False + + SENT_BY_CLIENT = True + SENT_BY_SERVER = False + + MINIMUM_SIZE = 7 # arguments part can never be shorter than this + + IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_CONTENT_STATIC = False # this means that argument part has always the same content FIELDS = [ # tuples of (field name, field domain, basic type used, is_reserved) (u'prefetch-size', u'long', u'long', False), # prefetch window in octets (u'prefetch-count', u'short', u'short', False), # prefetch window in messages @@ -1924,8 +2987,20 @@ class BasicQos(AMQPMethod): self.prefetch_count = prefetch_count self.global_ = global_ - def write_arguments(self, out): - out(struct.pack('!IH?', self.prefetch_size, self.prefetch_count, self.global_)) + def write_arguments(self, buf): + buf.write(struct.pack('!IH', self.prefetch_size, self.prefetch_count)) + buf.write(struct.pack('!B', (int(self.global_) << 0))) + + def get_size(self): + return 7 + + @staticmethod + def from_buffer(buf, start_offset): + assert (len(buf) - start_offset) >= BasicQos.MINIMUM_SIZE, 'Frame too short!' + offset = start_offset # we will use it to count consumed bytes + prefetch_size, prefetch_count, _bit_0, = struct.unpack_from('!IHB', buf, offset) + global_ = bool(_bit_0 & 1) + return BasicQos(prefetch_size, prefetch_count, global_) class BasicQosOk(AMQPMethod): @@ -1939,14 +3014,25 @@ class BasicQosOk(AMQPMethod): CLASS = Basic NAME = u'qos-ok' CLASSNAME = u'basic' + FULLNAME = u'basic.qos-ok' + CLASS_INDEX = 60 + CLASS_INDEX_BINARY = b'\x3C' METHOD_INDEX = 11 - FULLNAME = u'basic.qos-ok' - SYNCHRONOUS = True + METHOD_INDEX_BINARY = b'\x0B' + BINARY_HEADER = b'\x3C\x0B' # CLASS ID + METHOD ID + + SYNCHRONOUS = True # does this message imply other one? REPLY_WITH = [] - BINARY_HEADER = b'\x3C\x0B' - IS_SIZE_STATIC = True - STATIC_ARGUMENT_SIZE = 0 # length of arguments (as binary) is constant here + + SENT_BY_CLIENT = False + SENT_BY_SERVER = True + + MINIMUM_SIZE = 0 # arguments part can never be shorter than this + + IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_CONTENT_STATIC = True # this means that argument part has always the same content + STATIC_CONTENT = b'\x00\x00\x00\x04\x3C\x0B\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END RESPONSE_TO = BasicQos # this is sent in response to basic.qos def __init__(self): @@ -1954,6 +3040,13 @@ class BasicQosOk(AMQPMethod): Create frame basic.qos-ok """ + # not generating write_arguments - this method has static content! + + @staticmethod + def from_buffer(buf, start_offset): + return BasicQosOk() + + class BasicReturn(AMQPMethod): """ Return a failed message @@ -1966,13 +3059,24 @@ class BasicReturn(AMQPMethod): CLASS = Basic NAME = u'return' CLASSNAME = u'basic' + FULLNAME = u'basic.return' + CLASS_INDEX = 60 + CLASS_INDEX_BINARY = b'\x3C' METHOD_INDEX = 50 - FULLNAME = u'basic.return' - SYNCHRONOUS = False + METHOD_INDEX_BINARY = b'\x32' + BINARY_HEADER = b'\x3C\x32' # CLASS ID + METHOD ID + + SYNCHRONOUS = False # does this message imply other one? REPLY_WITH = [] - BINARY_HEADER = b'\x3C\x32' - IS_SIZE_STATIC = False + + SENT_BY_CLIENT = False + SENT_BY_SERVER = True + + MINIMUM_SIZE = 23 # arguments part can never be shorter than this + + IS_SIZE_STATIC = False # this means that argument part has always the same length + IS_CONTENT_STATIC = False # this means that argument part has always the same content FIELDS = [ # tuples of (field name, field domain, basic type used, is_reserved) (u'reply-code', u'reply-code', u'short', False), (u'reply-text', u'reply-text', u'shortstr', False), @@ -1998,8 +3102,34 @@ class BasicReturn(AMQPMethod): self.exchange = exchange self.routing_key = routing_key - def write_arguments(self, out): - out(struct.pack('!Hppp', self.reply_code, self.reply_text, self.exchange, self.routing_key)) + def write_arguments(self, buf): + buf.write(struct.pack('!HB', self.reply_code, len(self.reply_text))) + buf.write(self.reply_text) + buf.write(struct.pack('!B', len(self.exchange))) + buf.write(self.exchange) + buf.write(struct.pack('!B', len(self.routing_key))) + buf.write(self.routing_key) + + def get_size(self): + return len(self.reply_text) + len(self.exchange) + len(self.routing_key) + 5 + + @staticmethod + def from_buffer(buf, start_offset): + assert (len(buf) - start_offset) >= BasicReturn.MINIMUM_SIZE, 'Frame too short!' + offset = start_offset # we will use it to count consumed bytes + reply_code, s_len, = struct.unpack_from('!HB', buf, offset) + offset += 3 + reply_text = buf[offset:offset+s_len] + offset += s_len + s_len, = struct.unpack_from('!B', buf, offset) + offset += 1 + exchange = buf[offset:offset+s_len] + offset += s_len + s_len, = struct.unpack_from('!B', buf, offset) + offset += 1 + routing_key = buf[offset:offset+s_len] + offset += s_len + return BasicReturn(reply_code, reply_text, exchange, routing_key) class BasicReject(AMQPMethod): @@ -2013,13 +3143,24 @@ class BasicReject(AMQPMethod): CLASS = Basic NAME = u'reject' CLASSNAME = u'basic' + FULLNAME = u'basic.reject' + CLASS_INDEX = 60 + CLASS_INDEX_BINARY = b'\x3C' METHOD_INDEX = 90 - FULLNAME = u'basic.reject' - SYNCHRONOUS = False + METHOD_INDEX_BINARY = b'\x5A' + BINARY_HEADER = b'\x3C\x5A' # CLASS ID + METHOD ID + + SYNCHRONOUS = False # does this message imply other one? REPLY_WITH = [] - BINARY_HEADER = b'\x3C\x5A' - IS_SIZE_STATIC = False + + SENT_BY_CLIENT = True + SENT_BY_SERVER = False + + MINIMUM_SIZE = 9 # arguments part can never be shorter than this + + IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_CONTENT_STATIC = False # this means that argument part has always the same content FIELDS = [ # tuples of (field name, field domain, basic type used, is_reserved) (u'delivery-tag', u'delivery-tag', u'longlong', False), (u'requeue', u'bit', u'bit', False), # requeue the message @@ -2038,8 +3179,20 @@ class BasicReject(AMQPMethod): self.delivery_tag = delivery_tag self.requeue = requeue - def write_arguments(self, out): - out(struct.pack('!L?', self.delivery_tag, self.requeue)) + def write_arguments(self, buf): + buf.write(struct.pack('!Q', self.delivery_tag)) + buf.write(struct.pack('!B', (int(self.requeue) << 0))) + + def get_size(self): + return 9 + + @staticmethod + def from_buffer(buf, start_offset): + assert (len(buf) - start_offset) >= BasicReject.MINIMUM_SIZE, 'Frame too short!' + offset = start_offset # we will use it to count consumed bytes + delivery_tag, _bit_0, = struct.unpack_from('!QB', buf, offset) + requeue = bool(_bit_0 & 1) + return BasicReject(delivery_tag, requeue) class BasicRecoverAsync(AMQPMethod): @@ -2053,13 +3206,24 @@ class BasicRecoverAsync(AMQPMethod): CLASS = Basic NAME = u'recover-async' CLASSNAME = u'basic' + FULLNAME = u'basic.recover-async' + CLASS_INDEX = 60 + CLASS_INDEX_BINARY = b'\x3C' METHOD_INDEX = 100 - FULLNAME = u'basic.recover-async' - SYNCHRONOUS = False + METHOD_INDEX_BINARY = b'\x64' + BINARY_HEADER = b'\x3C\x64' # CLASS ID + METHOD ID + + SYNCHRONOUS = False # does this message imply other one? REPLY_WITH = [] - BINARY_HEADER = b'\x3C\x64' - IS_SIZE_STATIC = False + + SENT_BY_CLIENT = True + SENT_BY_SERVER = False + + MINIMUM_SIZE = 1 # arguments part can never be shorter than this + + IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_CONTENT_STATIC = False # this means that argument part has always the same content FIELDS = [ # tuples of (field name, field domain, basic type used, is_reserved) (u'requeue', u'bit', u'bit', False), # requeue the message ] @@ -2076,8 +3240,19 @@ class BasicRecoverAsync(AMQPMethod): """ self.requeue = requeue - def write_arguments(self, out): - out(struct.pack('!?', self.requeue)) + def write_arguments(self, buf): + buf.write(struct.pack('!B', (int(self.requeue) << 0))) + + def get_size(self): + return 1 + + @staticmethod + def from_buffer(buf, start_offset): + assert (len(buf) - start_offset) >= BasicRecoverAsync.MINIMUM_SIZE, 'Frame too short!' + offset = start_offset # we will use it to count consumed bytes + _bit_0, = struct.unpack_from('!B', buf, offset) + requeue = bool(_bit_0 & 1) + return BasicRecoverAsync(requeue) class BasicRecover(AMQPMethod): @@ -2091,13 +3266,24 @@ class BasicRecover(AMQPMethod): CLASS = Basic NAME = u'recover' CLASSNAME = u'basic' + FULLNAME = u'basic.recover' + CLASS_INDEX = 60 + CLASS_INDEX_BINARY = b'\x3C' METHOD_INDEX = 110 - FULLNAME = u'basic.recover' - SYNCHRONOUS = False + METHOD_INDEX_BINARY = b'\x6E' + BINARY_HEADER = b'\x3C\x6E' # CLASS ID + METHOD ID + + SYNCHRONOUS = False # does this message imply other one? REPLY_WITH = [] - BINARY_HEADER = b'\x3C\x6E' - IS_SIZE_STATIC = False + + SENT_BY_CLIENT = True + SENT_BY_SERVER = False + + MINIMUM_SIZE = 1 # arguments part can never be shorter than this + + IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_CONTENT_STATIC = False # this means that argument part has always the same content FIELDS = [ # tuples of (field name, field domain, basic type used, is_reserved) (u'requeue', u'bit', u'bit', False), # requeue the message ] @@ -2114,8 +3300,19 @@ class BasicRecover(AMQPMethod): """ self.requeue = requeue - def write_arguments(self, out): - out(struct.pack('!?', self.requeue)) + def write_arguments(self, buf): + buf.write(struct.pack('!B', (int(self.requeue) << 0))) + + def get_size(self): + return 1 + + @staticmethod + def from_buffer(buf, start_offset): + assert (len(buf) - start_offset) >= BasicRecover.MINIMUM_SIZE, 'Frame too short!' + offset = start_offset # we will use it to count consumed bytes + _bit_0, = struct.unpack_from('!B', buf, offset) + requeue = bool(_bit_0 & 1) + return BasicRecover(requeue) class BasicRecoverOk(AMQPMethod): @@ -2127,20 +3324,38 @@ class BasicRecoverOk(AMQPMethod): CLASS = Basic NAME = u'recover-ok' CLASSNAME = u'basic' + FULLNAME = u'basic.recover-ok' + CLASS_INDEX = 60 + CLASS_INDEX_BINARY = b'\x3C' METHOD_INDEX = 111 - FULLNAME = u'basic.recover-ok' - SYNCHRONOUS = True + METHOD_INDEX_BINARY = b'\x6F' + BINARY_HEADER = b'\x3C\x6F' # CLASS ID + METHOD ID + + SYNCHRONOUS = True # does this message imply other one? REPLY_WITH = [] - BINARY_HEADER = b'\x3C\x6F' - IS_SIZE_STATIC = True - STATIC_ARGUMENT_SIZE = 0 # length of arguments (as binary) is constant here + + SENT_BY_CLIENT = False + SENT_BY_SERVER = True + + MINIMUM_SIZE = 0 # arguments part can never be shorter than this + + IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_CONTENT_STATIC = True # this means that argument part has always the same content + STATIC_CONTENT = b'\x00\x00\x00\x04\x3C\x6F\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END def __init__(self): """ Create frame basic.recover-ok """ + # not generating write_arguments - this method has static content! + + @staticmethod + def from_buffer(buf, start_offset): + return BasicRecoverOk() + + class Tx(AMQPClass): """ The tx class allows publish and ack operations to be batched into atomic @@ -2168,20 +3383,38 @@ class TxCommit(AMQPMethod): CLASS = Tx NAME = u'commit' CLASSNAME = u'tx' + FULLNAME = u'tx.commit' + CLASS_INDEX = 90 + CLASS_INDEX_BINARY = b'\x5A' METHOD_INDEX = 20 - FULLNAME = u'tx.commit' - SYNCHRONOUS = True + METHOD_INDEX_BINARY = b'\x14' + BINARY_HEADER = b'\x5A\x14' # CLASS ID + METHOD ID + + SYNCHRONOUS = True # does this message imply other one? REPLY_WITH = [TxCommitOk] - BINARY_HEADER = b'\x5A\x14' - IS_SIZE_STATIC = True - STATIC_ARGUMENT_SIZE = 0 # length of arguments (as binary) is constant here + + SENT_BY_CLIENT = True + SENT_BY_SERVER = False + + MINIMUM_SIZE = 0 # arguments part can never be shorter than this + + IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_CONTENT_STATIC = True # this means that argument part has always the same content + STATIC_CONTENT = b'\x00\x00\x00\x04\x5A\x14\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END def __init__(self): """ Create frame tx.commit """ + # not generating write_arguments - this method has static content! + + @staticmethod + def from_buffer(buf, start_offset): + return TxCommit() + + class TxCommitOk(AMQPMethod): """ Confirm a successful commit @@ -2192,14 +3425,25 @@ class TxCommitOk(AMQPMethod): CLASS = Tx NAME = u'commit-ok' CLASSNAME = u'tx' + FULLNAME = u'tx.commit-ok' + CLASS_INDEX = 90 + CLASS_INDEX_BINARY = b'\x5A' METHOD_INDEX = 21 - FULLNAME = u'tx.commit-ok' - SYNCHRONOUS = True + METHOD_INDEX_BINARY = b'\x15' + BINARY_HEADER = b'\x5A\x15' # CLASS ID + METHOD ID + + SYNCHRONOUS = True # does this message imply other one? REPLY_WITH = [] - BINARY_HEADER = b'\x5A\x15' - IS_SIZE_STATIC = True - STATIC_ARGUMENT_SIZE = 0 # length of arguments (as binary) is constant here + + SENT_BY_CLIENT = False + SENT_BY_SERVER = True + + MINIMUM_SIZE = 0 # arguments part can never be shorter than this + + IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_CONTENT_STATIC = True # this means that argument part has always the same content + STATIC_CONTENT = b'\x00\x00\x00\x04\x5A\x15\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END RESPONSE_TO = TxCommit # this is sent in response to tx.commit def __init__(self): @@ -2207,6 +3451,13 @@ class TxCommitOk(AMQPMethod): Create frame tx.commit-ok """ + # not generating write_arguments - this method has static content! + + @staticmethod + def from_buffer(buf, start_offset): + return TxCommitOk() + + class TxRollback(AMQPMethod): """ Abandon the current transaction @@ -2219,20 +3470,38 @@ class TxRollback(AMQPMethod): CLASS = Tx NAME = u'rollback' CLASSNAME = u'tx' + FULLNAME = u'tx.rollback' + CLASS_INDEX = 90 + CLASS_INDEX_BINARY = b'\x5A' METHOD_INDEX = 30 - FULLNAME = u'tx.rollback' - SYNCHRONOUS = True + METHOD_INDEX_BINARY = b'\x1E' + BINARY_HEADER = b'\x5A\x1E' # CLASS ID + METHOD ID + + SYNCHRONOUS = True # does this message imply other one? REPLY_WITH = [TxRollbackOk] - BINARY_HEADER = b'\x5A\x1E' - IS_SIZE_STATIC = True - STATIC_ARGUMENT_SIZE = 0 # length of arguments (as binary) is constant here + + SENT_BY_CLIENT = True + SENT_BY_SERVER = False + + MINIMUM_SIZE = 0 # arguments part can never be shorter than this + + IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_CONTENT_STATIC = True # this means that argument part has always the same content + STATIC_CONTENT = b'\x00\x00\x00\x04\x5A\x1E\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END def __init__(self): """ Create frame tx.rollback """ + # not generating write_arguments - this method has static content! + + @staticmethod + def from_buffer(buf, start_offset): + return TxRollback() + + class TxRollbackOk(AMQPMethod): """ Confirm successful rollback @@ -2243,14 +3512,25 @@ class TxRollbackOk(AMQPMethod): CLASS = Tx NAME = u'rollback-ok' CLASSNAME = u'tx' + FULLNAME = u'tx.rollback-ok' + CLASS_INDEX = 90 + CLASS_INDEX_BINARY = b'\x5A' METHOD_INDEX = 31 - FULLNAME = u'tx.rollback-ok' - SYNCHRONOUS = True + METHOD_INDEX_BINARY = b'\x1F' + BINARY_HEADER = b'\x5A\x1F' # CLASS ID + METHOD ID + + SYNCHRONOUS = True # does this message imply other one? REPLY_WITH = [] - BINARY_HEADER = b'\x5A\x1F' - IS_SIZE_STATIC = True - STATIC_ARGUMENT_SIZE = 0 # length of arguments (as binary) is constant here + + SENT_BY_CLIENT = False + SENT_BY_SERVER = True + + MINIMUM_SIZE = 0 # arguments part can never be shorter than this + + IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_CONTENT_STATIC = True # this means that argument part has always the same content + STATIC_CONTENT = b'\x00\x00\x00\x04\x5A\x1F\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END RESPONSE_TO = TxRollback # this is sent in response to tx.rollback def __init__(self): @@ -2258,6 +3538,13 @@ class TxRollbackOk(AMQPMethod): Create frame tx.rollback-ok """ + # not generating write_arguments - this method has static content! + + @staticmethod + def from_buffer(buf, start_offset): + return TxRollbackOk() + + class TxSelect(AMQPMethod): """ Select standard transaction mode @@ -2268,20 +3555,38 @@ class TxSelect(AMQPMethod): CLASS = Tx NAME = u'select' CLASSNAME = u'tx' + FULLNAME = u'tx.select' + CLASS_INDEX = 90 + CLASS_INDEX_BINARY = b'\x5A' METHOD_INDEX = 10 - FULLNAME = u'tx.select' - SYNCHRONOUS = True + METHOD_INDEX_BINARY = b'\x0A' + BINARY_HEADER = b'\x5A\x0A' # CLASS ID + METHOD ID + + SYNCHRONOUS = True # does this message imply other one? REPLY_WITH = [TxSelectOk] - BINARY_HEADER = b'\x5A\x0A' - IS_SIZE_STATIC = True - STATIC_ARGUMENT_SIZE = 0 # length of arguments (as binary) is constant here + + SENT_BY_CLIENT = True + SENT_BY_SERVER = False + + MINIMUM_SIZE = 0 # arguments part can never be shorter than this + + IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_CONTENT_STATIC = True # this means that argument part has always the same content + STATIC_CONTENT = b'\x00\x00\x00\x04\x5A\x0A\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END def __init__(self): """ Create frame tx.select """ + # not generating write_arguments - this method has static content! + + @staticmethod + def from_buffer(buf, start_offset): + return TxSelect() + + class TxSelectOk(AMQPMethod): """ Confirm transaction mode @@ -2292,14 +3597,25 @@ class TxSelectOk(AMQPMethod): CLASS = Tx NAME = u'select-ok' CLASSNAME = u'tx' + FULLNAME = u'tx.select-ok' + CLASS_INDEX = 90 + CLASS_INDEX_BINARY = b'\x5A' METHOD_INDEX = 11 - FULLNAME = u'tx.select-ok' - SYNCHRONOUS = True + METHOD_INDEX_BINARY = b'\x0B' + BINARY_HEADER = b'\x5A\x0B' # CLASS ID + METHOD ID + + SYNCHRONOUS = True # does this message imply other one? REPLY_WITH = [] - BINARY_HEADER = b'\x5A\x0B' - IS_SIZE_STATIC = True - STATIC_ARGUMENT_SIZE = 0 # length of arguments (as binary) is constant here + + SENT_BY_CLIENT = False + SENT_BY_SERVER = True + + MINIMUM_SIZE = 0 # arguments part can never be shorter than this + + IS_SIZE_STATIC = True # this means that argument part has always the same length + IS_CONTENT_STATIC = True # this means that argument part has always the same content + STATIC_CONTENT = b'\x00\x00\x00\x04\x5A\x0B\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END RESPONSE_TO = TxSelect # this is sent in response to tx.select def __init__(self): @@ -2307,6 +3623,13 @@ class TxSelectOk(AMQPMethod): Create frame tx.select-ok """ + # not generating write_arguments - this method has static content! + + @staticmethod + def from_buffer(buf, start_offset): + return TxSelectOk() + + IDENT_TO_METHOD = { (90, 21): TxCommitOk, (60, 100): BasicRecoverAsync, @@ -2363,3 +3686,60 @@ IDENT_TO_METHOD = { (50, 10): QueueDeclare, } + +BINARY_HEADER_TO_METHOD = { + b'\x5A\x15': TxCommitOk, + b'\x3C\x64': BasicRecoverAsync, + b'\x0A\x0B': ConnectionStartOk, + b'\x3C\x28': BasicPublish, + b'\x3C\x32': BasicReturn, + b'\x0A\x33': ConnectionCloseOk, + b'\x14\x14': ChannelFlow, + b'\x3C\x15': BasicConsumeOk, + b'\x0A\x15': ConnectionSecureOk, + b'\x5A\x1E': TxRollback, + b'\x5A\x0A': TxSelect, + b'\x32\x0B': QueueDeclareOk, + b'\x3C\x46': BasicGet, + b'\x5A\x0B': TxSelectOk, + b'\x0A\x1E': ConnectionTune, + b'\x3C\x0B': BasicQosOk, + b'\x3C\x50': BasicAck, + b'\x14\x15': ChannelFlowOk, + b'\x3C\x3C': BasicDeliver, + b'\x5A\x1F': TxRollbackOk, + b'\x14\x28': ChannelClose, + b'\x3C\x47': BasicGetOk, + b'\x32\x1E': QueuePurge, + b'\x0A\x1F': ConnectionTuneOk, + b'\x0A\x28': ConnectionOpen, + b'\x3C\x1E': BasicCancel, + b'\x32\x32': QueueUnbind, + b'\x28\x0A': ExchangeDeclare, + b'\x0A\x32': ConnectionClose, + b'\x14\x0A': ChannelOpen, + b'\x14\x29': ChannelCloseOk, + b'\x3C\x6E': BasicRecover, + b'\x3C\x5A': BasicReject, + b'\x32\x1F': QueuePurgeOk, + b'\x32\x28': QueueDelete, + b'\x28\x14': ExchangeDelete, + b'\x32\x14': QueueBind, + b'\x0A\x29': ConnectionOpenOk, + b'\x3C\x1F': BasicCancelOk, + b'\x5A\x14': TxCommit, + b'\x0A\x0A': ConnectionStart, + b'\x3C\x0A': BasicQos, + b'\x28\x0B': ExchangeDeclareOk, + b'\x28\x15': ExchangeDeleteOk, + b'\x14\x0B': ChannelOpenOk, + b'\x3C\x48': BasicGetEmpty, + b'\x3C\x6F': BasicRecoverOk, + b'\x3C\x14': BasicConsume, + b'\x0A\x14': ConnectionSecure, + b'\x32\x29': QueueDeleteOk, + b'\x32\x33': QueueUnbindOk, + b'\x32\x15': QueueBindOk, + b'\x32\x0A': QueueDeclare, +} + diff --git a/coolamqp/framing/serialization.py b/coolamqp/framing/serialization.py index 1aacd3177cfc98a5e36aa5f6ec7332d9b6550c71..db550b153237a389e65f94a1cdff811e1efb2e20 100644 --- a/coolamqp/framing/serialization.py +++ b/coolamqp/framing/serialization.py @@ -3,11 +3,31 @@ from __future__ import absolute_import, division, print_function import struct """bytes <-> Frame's""" -from coolamqp.framing.definitions import FRAME_END +from coolamqp.framing.frames import IDENT_TO_METHOD, FRAME_END, FRAME_METHOD -HDR = b'AMQP\x01\x01\x00\x09' +HDR = b'AMQP\x00\x00\x00\x01' +class AMQPFrame(object): + def __init__(self, type_, channel, payload): + + + +def serialize_method(channel, method): + """Return an AMQP frame""" + strs = [] + method.write(lambda s: strs.append(s)) + payload_len = sum(len(x) for x in strs) + return b''.join([struct.pack('!BHI', FRAME_METHOD, channel, payload_len)] + strs + [chr(FRAME_END)]) + +def try_frame(buf, offset): + """ + Try to make sense out of a buffer and decode a frame. + :param buf: + :param offset: + :return: + """ + def to_packet(type_, channel, payload, size=None, frame_end=None): """ diff --git a/coolamqp/framing/streams/__init__.py b/coolamqp/framing/streams/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..c71857bcd11230186abcb534f8128eaf15053c2f --- /dev/null +++ b/coolamqp/framing/streams/__init__.py @@ -0,0 +1,11 @@ +# coding=UTF-8 +""" +Classes that allow to receive and send frames in a rapid way +""" +from __future__ import absolute_import, division, print_function +import socket +import collections + +from coolamqp.framing.streams.exceptions import StreamIsDead + + diff --git a/coolamqp/framing/streams/exceptions.py b/coolamqp/framing/streams/exceptions.py new file mode 100644 index 0000000000000000000000000000000000000000..ec40452acd255c243e9374ebeb870c4d12bae07f --- /dev/null +++ b/coolamqp/framing/streams/exceptions.py @@ -0,0 +1,12 @@ +# coding=UTF-8 +from __future__ import absolute_import, division, print_function + + + + +class FormatterError(Exception): + """Exceptions for formatters""" + + +class InvalidDataError(Exception): + """Formatter encountered an invalid data""" \ No newline at end of file diff --git a/coolamqp/framing/streams/receiver.py b/coolamqp/framing/streams/receiver.py new file mode 100644 index 0000000000000000000000000000000000000000..fe005676f671871dc8316b30bc53069a049fe6dc --- /dev/null +++ b/coolamqp/framing/streams/receiver.py @@ -0,0 +1,91 @@ +# coding=UTF-8 +from __future__ import absolute_import, division, print_function +import struct +import io +import six +import collections +import socket + + +from coolamqp.framing.streams.exceptions import InvalidDataError + + +class ReceivingFormatter(object): + """ + Assembles AMQP frames from received data. + + Just call with .put(data) and get frames by + iterator .frames(). + + Not thread safe. + """ + def __init__(self, sock): + self.chunks = collections.deque() # all received data + self.total_data_len = 0 + self.header = None # or type, channel, size + self.frames = collections.deque() # for + + def receive_from_socket(self, data): + self.total_data_len += len(data) + self.chunks.append(buffer(data)) + + def get_frames(self): + """ + An iterator to return frames pending for read. + + :raises ValueError: + :return: + """ + + def __got_frame(self, type_, channel, payload, frame_end): + """ + New frame! + + Try decoding + """ + + + def __statemachine(self): + if self.header is None and self.total_data_len > 7: + a = bytearray() + while len(a) < 7: + if len(self.chunks[0]) <= (len(a) - 7): # we will need a next one + a.extend(self.chunks.popleft()) + else: + a.extend(self.chunks[0:len(a-7)]) + self.chunks[0] = buffer(self.chunks[0], len(a)-7) + self.header = struct.unpack('!BHI', a) + + if (self.header is not None) and self.total_data_len >= (self.header[2]+1): + if len(self.chunks[0]) > self.header[2]+1: + # We can subslice it - it's very fast + payload = buffer(self.chunks[0], self.header[2]) + frame_end = self.chunks[self.header[2]] + self.chunks[0] = buffer(self.chunks[0], self.header[2]+1) + + else: + # Construct a separate buffer :( + payload = io.BytesIO() + while payload.tell() < self.header[2]: + remaining = self.header[2] - payload.tell() + + if remaining >= self.chunks[0]: + chunk = self.chunks.popleft() + payload.write(self.chunks.popleft()) + self.total_data_len -= len(chunk) + else: + self.total_data_len -= remaining + payload.write(buffer(self.chunks[0], 0, remaining)) + self.chunks[0] = buffer(self.chunks[0], remaining) + + # Get last byte + if len(self.chunks[0]) == 1: + frame_end = self.chunks.pop()[0] + else: + frame_end = self.chunks[0][0] + self.chunks[0] = buffer(self.chunks[0], 1) + + self.__got_frame(self.header[0], self.header[1], payload.getvalue(), frame_end) + self.header = None + + diff --git a/coolamqp/orders.py b/coolamqp/orders.py index 2c97858213765d5d217c37277d50ec8c467b7e61..7375569ed16127918e573cf1204aa1befb062305 100644 --- a/coolamqp/orders.py +++ b/coolamqp/orders.py @@ -87,7 +87,7 @@ class Order(object): """Return whether the operation failed, ie. completed but with an error code. Cancelled and discarded ops are considered failed. This assumes that this order has been .wait()ed upon""" - return self._result is True + return self._result is not True def result(self): """Wait until this is completed and return a response""" diff --git a/utils/compdefs.py b/utils/compdefs.py index 357fc58fea41913e389dd06716667bf05d84b6cf..13c5a0a527c3757fcf8192021dbfbc39f7b6886a 100644 --- a/utils/compdefs.py +++ b/utils/compdefs.py @@ -1,7 +1,9 @@ +from __future__ import division from xml.etree import ElementTree import collections import struct import six +import math from getp import get_constants, get_classes, get_domains, byname, name_class, name_method, name_field, \ BASIC_TYPES @@ -16,6 +18,16 @@ def frepr(p, sop=six.text_type): else: return s +def as_nice_escaped_string(p): + body = [] + for q in p: + z = (hex(ord(q))[2:].upper()) + if len(z) == 1: + z = u'0' + z + body.append(u'\\x' + z) + return u"b'"+(u''.join(body))+u"'" + + def normname(p): return p.strip().replace('-', '_').upper() @@ -54,14 +66,13 @@ def ffmt(data, *args, **kwargs): data = data.replace('%s', op(arg), 1) return data -def compile_definitions(xml_file='resources/amqp0-9-1.xml', out_file='coolamqp/framing/definitions.py'): +def compile_definitions(xml_file='../resources/amqp0-9-1.xml', out_file='../coolamqp/framing/frames/machine.py'): """parse resources/amqp-0-9-1.xml into """ xml = ElementTree.parse(xml_file) + out = open(out_file, 'wb') - with open(out_file, 'wb') as out: - - out.write('''# coding=UTF-8 + out.write('''# coding=UTF-8 from __future__ import print_function, absolute_import """ A Python version of the AMQP machine-readable specification. @@ -75,61 +86,50 @@ CoolAMQP is copyright (c) 2016 DMS Serwis s.c. import struct -''') - - def line(data, *args, **kwargs): - out.write(ffmt(data, *args, sane=True)) - - # Output core ones - line('# Core frame types\n') - for constant in get_constants(xml): - g = ffmt('%s = %s', constant.name.strip().replace('-', '_').upper(), constant.value) - line(g) - if constant.docs: - lines = constant.docs.split('\n') - line(' # %s\n', lines[0]) - if len(lines) > 1: - for ln in lines[1:]: - line(u' '*len(g)) - line(u' # %s\n', ln) - else: - line('\n') - - # get domains - domain_to_basic_type = {} - line('DOMAIN_TO_BASIC_TYPE = {\n') - for domain in get_domains(xml): - line(u' %s: %s,\n', frepr(domain.name), frepr(None if domain.elementary else domain.type)) - if not domain.elementary: - domain_to_basic_type[domain.name] = domain.type - - line('}\n') - - line(''' - -class AMQPClass(object): - pass - - -class AMQPMethod(object): - RESPONSE_TO = None - REPLY_WITH = [] - FIELDS = [] - - def write_arguments(self, out): - """ - Write the argument portion of this frame into out. - - :param out: a callable that will be invoked (possibly many times) with - parts of the arguments section. - :type out: callable(part_of_frame: binary type) -> nevermind - """ +from coolamqp.framing.frames.base import AMQPClass, AMQPMethod, _enframe_table, _deframe_table, _frame_table_size ''') - # Output classes - for cls in get_classes(xml): - line('''\nclass %s(AMQPClass): + def line(data, *args, **kwargs): + out.write(ffmt(data, *args, sane=True)) + + # Output core ones + FRAME_END = None + con_classes = collections.defaultdict(list) + line('# Core constants\n') + for constant in get_constants(xml): + if normname(constant.name) == 'FRAME_END': + FRAME_END = constant.value + g = ffmt('%s = %s', normname(constant.name), constant.value) + line(g) + if constant.docs: + lines = constant.docs.split('\n') + line(' # %s\n', lines[0]) + if len(lines) > 1: + for ln in lines[1:]: + line(u' '*len(g)) + line(u' # %s\n', ln) + else: + line('\n') + + if constant.kind: + con_classes[constant.kind].append(normname(constant.name)) + + for constant_kind, constants in con_classes.items(): + line('\n%s = [%s]', normname(constant_kind), u', '.join(constants)) + + # get domains + domain_to_basic_type = {} + line('\n\n\nDOMAIN_TO_BASIC_TYPE = {\n') + for domain in get_domains(xml): + line(u' %s: %s,\n', frepr(domain.name), frepr(None if domain.elementary else domain.type)) + domain_to_basic_type[domain.name] = domain.type + + line('}\n') + + # Output classes + for cls in get_classes(xml): + line('''\nclass %s(AMQPClass): """ %s """ @@ -138,161 +138,407 @@ class AMQPMethod(object): ''', name_class(cls.name), doxify(None, cls.docs), frepr(cls.name), cls.index) - for method in cls.methods: - is_static = method.is_static(domain_to_basic_type) - if is_static: - static_size = method.get_size(domain_to_basic_type) + for method in cls.methods: + full_class_name = '%s%s' % (name_class(cls.name), name_method(method.name)) + + # annotate types + method.fields = [field._replace(basic_type=domain_to_basic_type[field.type]) for field in method.fields] + + is_static = method.is_static() + if is_static: + static_size = method.get_size() + + is_content_static = len([f for f in method.fields if not f.reserved]) == 0 - line('''\nclass %s%s(AMQPMethod): + line('''\nclass %s(AMQPMethod): """ %s """ CLASS = %s NAME = %s CLASSNAME = %s + FULLNAME = %s + CLASS_INDEX = %s + CLASS_INDEX_BINARY = %s METHOD_INDEX = %s - FULLNAME = %s - SYNCHRONOUS = %s + METHOD_INDEX_BINARY = %s + BINARY_HEADER = %s # CLASS ID + METHOD ID + + SYNCHRONOUS = %s # does this message imply other one? REPLY_WITH = [%s] - BINARY_HEADER = b'%s' - IS_SIZE_STATIC = %s + + SENT_BY_CLIENT = %s + SENT_BY_SERVER = %s + + MINIMUM_SIZE = %s # arguments part can never be shorter than this + + IS_SIZE_STATIC = %s # this means that argument part has always the same length + IS_CONTENT_STATIC = %s # this means that argument part has always the same content ''', - name_class(cls.name), name_method(method.name), - doxify(method.label, method.docs), - name_class(cls.name), - frepr(method.name), - frepr(cls.name), - frepr(cls.index), - frepr(method.index), - frepr(cls.name + '.' + method.name), - repr(method.synchronous), - u', '.join([name_class(cls.name)+name_method(kidname) for kidname in method.response]), - u''.join(map(lambda x: u'\\x'+(('0'+hex(x)[2:] if x < 16 else hex(x)[2:]).upper()), - [cls.index, method.index])), - repr(is_static) - ) - - # Static size - if is_static: - line(' STATIC_ARGUMENT_SIZE = %s # length of arguments (as binary) is constant here\n', static_size) - - # Am I a response somewhere? - for paren in cls.methods: - if method.name in paren.response: - line(' RESPONSE_TO = %s%s # this is sent in response to %s\n', name_class(cls.name), name_method(paren.name), - cls.name+'.'+paren.name - ) + full_class_name, + doxify(method.label, method.docs), + name_class(cls.name), + frepr(method.name), + frepr(cls.name), + frepr(cls.name + '.' + method.name), + frepr(cls.index), + as_nice_escaped_string(chr(cls.index)), + frepr(method.index), + as_nice_escaped_string(chr(method.index)), + as_nice_escaped_string(chr(cls.index)+chr(method.index)), + repr(method.synchronous), + u', '.join([name_class(cls.name)+name_method(kidname) for kidname in method.response]), + repr(method.sent_by_client), + repr(method.sent_by_server), + repr(method.get_minimum_size(domain_to_basic_type)), + repr(is_static), + repr(is_content_static) + ) + + + if is_content_static: + + line(''' STATIC_CONTENT = %s # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END +''', + as_nice_escaped_string(struct.pack('!LBB', static_size+4, cls.index, method.index)+\ + method.get_static_body()+\ + struct.pack('!B', FRAME_END))) - # fields - if len(method.fields) > 0: - line(' FIELDS = [ # tuples of (field name, field domain, basic type used, is_reserved)') + # Am I a response somewhere? + for paren in cls.methods: + if method.name in paren.response: + line(' RESPONSE_TO = %s%s # this is sent in response to %s\n', name_class(cls.name), name_method(paren.name), + cls.name+'.'+paren.name + ) - for field in method.fields: - tp = field.type - while tp in domain_to_basic_type: - tp = domain_to_basic_type[tp] + # fields + if len(method.fields) > 0: + line(' FIELDS = [ # tuples of (field name, field domain, basic type used, is_reserved)') - line('\n (%s, %s, %s, %s), ', frepr(field.name), frepr(field.type), frepr(tp), repr(field.reserved)) - if field.label: - line(' # '+field.label) + for field in method.fields: + line('\n (%s, %s, %s, %s), ', frepr(field.name), frepr(field.type), + frepr(field.basic_type), repr(field.reserved)) + if field.label: + line(' # '+field.label) - line('\n ]\n') + line('\n ]\n') - non_reserved_fields = [field for field in method.fields if not field.reserved] + non_reserved_fields = [field for field in method.fields if not field.reserved] - # constructor - line('''\n def __init__(%s): + # constructor + line('''\n def __init__(%s): """ Create frame %s ''', - u', '.join(['self'] + [name_field(field.name) for field in non_reserved_fields]), - cls.name + '.' + method.name, - ) + u', '.join(['self'] + [name_field(field.name) for field in non_reserved_fields]), + cls.name + '.' + method.name, + ) - if len(non_reserved_fields) > 0: - line('\n') - for field in non_reserved_fields: - tp = field.type - while tp in domain_to_basic_type: - tp = domain_to_basic_type[tp] - - if (field.label is not None) or (field.docs is not None): - line(' :param %s: %s\n', name_field(field.name), - doxify(field.label, field.docs, prefix=12, blank=False)) - line(' :type %s: %s (as %s)\n', name_field(field.name), field.type, tp) + if len(non_reserved_fields) > 0: + line('\n') + for field in non_reserved_fields: + if (field.label is not None) or (field.docs is not None): + line(' :param %s: %s\n', name_field(field.name), + doxify(field.label, field.docs, prefix=12, blank=False)) + line(' :type %s: %s (as %s)\n', name_field(field.name), field.type, field.basic_type) - line(' """\n') + line(' """\n') - for field in non_reserved_fields: - line(' self.%s = %s\n', name_field(field.name), name_field(field.name)) + for field in non_reserved_fields: + line(' self.%s = %s\n', name_field(field.name), name_field(field.name)) + if len(non_reserved_fields) == 0: + line('\n') - # end - if len(method.fields) > 0: - line('''\n def write_arguments(self, out): + # end + if not is_content_static: + line('''\n def write_arguments(self, buf): ''') - def emit_structs(su): - if len(su) == 0: - return - line(" out(struct.pack('!") - line(''.join(a for a, b in su)) - line("', ") - line(', '.join(b for a, b in su)) - line('))\n') - - good_structs = [] - written = False + def emit_structs(su): + if len(su) == 0: + return + line(" buf.write(struct.pack('!") + line(''.join(a for a, b in su)) + line("', ") + line(', '.join(b for a, b in su)) + line('))\n') + + def emit_bits(bits): + bits = [b for b in bits if b != '0'] # reserved values are out :> + + line(" buf.write(struct.pack('!B', %s))\n", + u' | '.join((u'(int(%s) << %s)' % (bit, position)) for position, bit in enumerate(bits)) + ) + + good_structs = [] + written = False + bits = [] + for field in method.fields: + val = 'self.' + name_field(field.name) if not field.reserved else BASIC_TYPES[field.basic_type][2] + + if (len(bits) == 8) or ((field.basic_type != 'bit') and len(bits) > 0): + emit_bits(bits) + bits = [] + written = True + + if field.basic_type == 'bit': + bits.append(val) + elif field.reserved: + line(" buf.write("+BASIC_TYPES[field.basic_type][2]+")\n") + written = True + continue + elif BASIC_TYPES[field.basic_type][1] is None: + # struct can't do it + + if field.basic_type == 'longstr': + good_structs.append(('L', 'len(%s)' % (val, ))) + + elif field.basic_type == 'shortstr': + good_structs.append(('B', 'len(%s)' % (val, ))) + + emit_structs(good_structs) + good_structs = [] + + if field.basic_type == 'table': + line(' _enframe_table(buf, %s)\n' % (val, )) + written = True + else: + # emit ours + line(' buf.write('+val+')\n') + written = True + else: + # special case - empty string + if field.basic_type == 'shortstr' and field.reserved: + continue # just skip :) + + val = ('self.'+name_field(field.name)) if not field.reserved else frepr(BASIC_TYPES[field.basic_type][2], sop=six.binary_type) + + good_structs.append((BASIC_TYPES[field.basic_type][1], val)) + written = True + written = written or len(good_structs) > 0 + emit_structs(good_structs) + if len(bits) > 0: + emit_bits(bits) + written = True + bits = [] + + if not written: + line(' pass # this has a frame, but it''s only default shortstrs\n') + line('\n') + + line(' def get_size(self):\n return ') + parts = [] + accumulator = 0 + bits = 0 + for field in method.fields: + bt = field.basic_type + + if (bits > 0) and (bt != 'bit'): # sync bits if not + accumulator += int(math.ceil(bits / 8)) + bits = 0 + + if field.basic_type == 'bit': + bits += 1 + elif field.reserved: + accumulator += BASIC_TYPES[field.basic_type][3] + elif BASIC_TYPES[bt][0] is not None: + accumulator += BASIC_TYPES[field.basic_type][0] + elif bt == 'shortstr': + parts.append('len(self.'+name_field(field.name)+')') + accumulator += 1 + elif bt == 'longstr': + parts.append('len(self.'+name_field(field.name)+')') + accumulator += 4 + elif bt == 'table': + parts.append('_frame_table_size(self.'+name_field(field.name)+')') + accumulator += 4 + else: + raise Exception() + + if bits > 0: # sync bits + accumulator += int(math.ceil(bits / 8)) + bits = 0 + + parts.append(repr(accumulator)) + line(u' + '.join(parts)) + line('\n') + else: + line(' # not generating write_arguments - this method has static content!\n') + line('\n') + + line(''' @staticmethod + def from_buffer(buf, start_offset): +''', + full_class_name) + + if is_content_static: + line(" return %s()\n\n", full_class_name) + else: + line(""" assert (len(buf) - start_offset) >= %s.MINIMUM_SIZE, 'Frame too short!' + offset = start_offset # we will use it to count consumed bytes +""", + full_class_name) + # The simple, or the painful way? + has_nonstruct_fields = False + for field in method.fields: + if BASIC_TYPES[field.basic_type][1] is None: + has_nonstruct_fields = True + + if len(method.fields) == 0: + line(' return %s(), 0\n', full_class_name) + elif is_static: + fieldnames = [] + formats = [] + + bits = [] + bit_id = 0 + bits_to_sync_later = {} # bit_0 => [fLSB, fMSB] + for field in method.fields: - if field.type not in BASIC_TYPES: - tp = domain_to_basic_type[field.type] + if field.basic_type == 'bit': + bits.append(None if field.reserved else name_field(field.name)) + + if len(bits) == 8: + fieldnames.append('_bit_%s' % (bit_id, )) + formats.append('B') + bits_to_sync_later['_bit_%s' % (bit_id, )] = bits + bits = [] + bit_id += 1 + + elif field.reserved: + formats.append('%sx' % (BASIC_TYPES[field.basic_type][0],)) else: - tp = field.type + fieldnames.append(name_field(field.name)) + formats.append(BASIC_TYPES[field.basic_type][1]) - if BASIC_TYPES[tp][1] is None: - # struct can't do it + # sync bits + if len(bits) > 0: + fieldnames.append('_bit_%s' % (bit_id,)) + formats.append('B') + bits_to_sync_later['_bit_%s' % (bit_id,)] = bits - if tp == 'longstr': - good_structs.append(('L', 'len(self.'+name_field(field.name)+')')) - written = True + line(" %s, = struct.unpack_from('!%s', buf, offset)\n", + u', '.join(fieldnames), + u''.join(formats) + ) - emit_structs(good_structs) - good_structs = [] + # If there were any bits, unpack them now + for var_name, bits in bits_to_sync_later.items(): + for bitname, multiplier in zip(bits, (1, 2, 4, 8, 16, 32, 64, 128)): + line(" %s = bool(%s & %s)\n", bitname, var_name, multiplier) - # emit ours - if tp == 'longstr': - line(' out(self.'+name_field(field.name)+')\n') - written = True + + line(" return %s(%s)", full_class_name, u', '.join([ + name_field(field.name) for field in method.fields if not field.reserved + ])) + + else: + def emit_bits(bits): + + if all(n == '_' for n in bits): + # everything is reserved, lol + line(""" offset += 1 +""") + return + + line(""" _bit, = struct.unpack_from('!B', buf, offset) + offset += 1 +""") + for bit, multiplier in zip(bits, (1,2,4,8,16,32,64,128)): + if bit != '_': + line(""" %s = bool(_bit & %s) +""", + bit, multiplier) + + def emit_structures(ss, ln): + line(""" %s, = struct.unpack_from('!%s', buf, offset) + offset += %s +""", + u', '.join([a[0] for a in ss if not (a[0] == '_' and a[1][-1] == 'x')]), + ''.join([a[1] for a in ss]), + ln + ) + + # we'll be counting bytes + to_struct = [] # accumulate static field, (var name, struct_code) + cur_struct_len = 0 # length of current struct + + bits = [] + bit_id = 0 + + for field in method.fields: + fieldname = '_' if field.reserved else name_field(field.name) + + if (len(bits) > 0) and (field.basic_type != 'bit'): + emit_bits(bits) + bits = [] + + # offset is current start + # length is length to read + if BASIC_TYPES[field.basic_type][0] is not None: + if field.reserved: + to_struct.append(('_', '%sx' % (BASIC_TYPES[field.basic_type][0],))) + else: + to_struct.append((fieldname, BASIC_TYPES[field.basic_type][1])) + cur_struct_len += BASIC_TYPES[field.basic_type][0] + elif field.basic_type == 'bit': + bits.append(fieldname) else: - # special case - empty string - if tp == 'shortstr' and field.reserved: - continue # just skip :) + if field.basic_type == 'table': # oh my god + line(""" %s, delta = _deframe_table(buf, offset) + offset += delta +""", name_field(field.name)) + else: # longstr or shortstr + f_q, f_l = ('L', 4) if field.basic_type == 'longstr' else ('B', 1) + to_struct.append(('s_len', f_q)) + cur_struct_len += f_l + emit_structures(to_struct, cur_struct_len) + to_struct, cur_struct_len = [], 0 + if field.reserved: + line(" offset += s_len\n") + else: + line(" %s = buf[offset:offset+s_len]\n offset += s_len\n", + fieldname) + + # check bits for overflow + if len(bits) == 8: + emit_bits(bits) + bits = [] + + if len(bits) > 0: + emit_bits(bits) + elif len(to_struct) > 0: + emit_structures(to_struct, cur_struct_len) - val = 'self.'+name_field(field.name) if not field.reserved else frepr(BASIC_TYPES[tp][2], sop=six.binary_type) - good_structs.append((BASIC_TYPES[tp][1], val)) - written = True - written = written and len(good_structs) > 0 - emit_structs(good_structs) - if not written: - line(' pass # this has a frame, but it''s only default shortstrs\n') - line('\n') + line(" return %s(%s)", + full_class_name, + u', '.join(name_field(field.name) for field in method.fields if not field.reserved)) - # Get me a dict - (classid, methodid) => class of method - dct = {} - for cls in get_classes(xml): - for method in cls.methods: - dct[((cls.index, method.index))] = '%s%s' % (name_class(cls.name), name_method(method.name)) + line('\n\n') + + + # Get me a dict - (classid, methodid) => class of method + dct = {} + for cls in get_classes(xml): + for method in cls.methods: + dct[((cls.index, method.index))] = '%s%s' % (name_class(cls.name), name_method(method.name)) - line('\nIDENT_TO_METHOD = {\n') - for k, v in dct.items(): - line(' %s: %s,\n', repr(k), v) - line('}\n\n') + line('\nIDENT_TO_METHOD = {\n') + for k, v in dct.items(): + line(' %s: %s,\n', repr(k), v) + line('}\n\n') + line('\nBINARY_HEADER_TO_METHOD = {\n') + for k, v in dct.items(): + line(' %s: %s,\n', as_nice_escaped_string(struct.pack('!BB', *k)), v) + line('}\n\n') + out.close() if __name__ == '__main__': compile_definitions() diff --git a/utils/getp.py b/utils/getp.py index cafc64868e3290732c94d459f7730e98bd206f11..d1f0613a8e84de15725cade7b2dce83491b8e050 100644 --- a/utils/getp.py +++ b/utils/getp.py @@ -2,32 +2,25 @@ from __future__ import absolute_import, division, print_function from collections import namedtuple import six +import math + +from coolamqp.framing.frames.base import BASIC_TYPES, DYNAMIC_BASIC_TYPES # docs may be None Constant = namedtuple('Constant', ('name', 'value', 'kind', 'docs')) # kind is AMQP constant class # value is int -Field = namedtuple('Field', ('name', 'type', 'label', 'docs', 'reserved')) # reserved is bool -Method = namedtuple('Method', ('name', 'synchronous', 'index', 'label', 'docs', 'fields', 'response')) - # synchronous is bool +Field = namedtuple('Field', ('name', 'type', 'label', 'docs', 'reserved', 'basic_type')) # reserved is bool +Method = namedtuple('Method', ('name', 'synchronous', 'index', 'label', 'docs', 'fields', 'response', + 'sent_by_client', 'sent_by_server', 'constant')) + # synchronous is bool, constant is bool # repponse is a list of method.name Class_ = namedtuple('Class_', ('name', 'index', 'docs', 'methods')) # label is int Domain = namedtuple('Domain', ('name', 'type', 'elementary')) # elementary is bool - # name => (length|None, struct ID|None, reserved-field-value : for struct if structable, bytes else) -BASIC_TYPES = {'bit': (1, '?', 0), - 'octet': (1, 'B', 0), - 'short': (2, 'H', 0), - 'long': (4, 'I', 0), - 'longlong': (8, 'L', 0), - 'timestamp': (8, 'L', 0), - 'table': (None, None, b'\x00\x00\x00\x00'), - 'longstr': (None, None, b'\x00\x00\x00\x00'), - 'shortstr': (None, 'p', '') - } class Method(object): - def __init__(self, name, synchronous, index, label, docs, fields, response): + def __init__(self, name, synchronous, index, label, docs, fields, response, sent_by_client, sent_by_server): self.name = name self.synchronous = synchronous self.index = index @@ -35,25 +28,56 @@ class Method(object): self.response = response self.label = label self.docs = docs + self.sent_by_client = sent_by_client + self.sent_by_server = sent_by_server + + self.constant = len([f for f in self.fields if not f.reserved]) == 0 + + def get_static_body(self): # only arguments part + body = [] + bits = 0 + for field in self.fields: + + if bits > 0 and field.basic_type != 'bit': + body.append(b'\x00' * math.ceil(bits / 8)) + bits = 0 + + if field.basic_type == 'bit': + bits += 1 + else: + body.append(eval(BASIC_TYPES[field.basic_type][2])) + return b''.join(body) - def get_size(self, domain_to_type): # for static methods + def get_size(self, domain_to_type=None): # for static methods size = 0 + bits = 0 for field in self.fields: - tp = field.type - while tp in domain_to_type: - tp = domain_to_type[tp] - if BASIC_TYPES[tp] is None: - raise TypeError() - size += BASIC_TYPES[tp] + + if (bits > 0) and (field.basic_type != 'bit'): # sync bits + size += int(math.ceil(bits / 8)) + bits = 0 + + if BASIC_TYPES[field.basic_type][0] is None: + if field.basic_type == 'bit': + bits += 1 + else: + size += len(BASIC_TYPES[field.basic_type][2]) # default minimum entry + else: + size += BASIC_TYPES[field.basic_type][0] + + if bits > 0: # sync bits + size += int(math.ceil(bits / 8)) + return size - def is_static(self, domain_to_type): # is size constant? - try: - self.get_size(domain_to_type) - except TypeError: - return False + def is_static(self, domain_to_type=None): # is size constant? + for field in self.fields: + if field.basic_type in DYNAMIC_BASIC_TYPES: + return False return True + def get_minimum_size(self, domain_to_type=None): + return self.get_size() def get_docs(elem): @@ -78,14 +102,18 @@ def for_method_field(elem): # for <field> in <method> return Field(six.text_type(a['name']), a['domain'] if 'domain' in a else a['type'], a.get('label', None), get_docs(elem), - a.get('reserved', '0') == '1') + a.get('reserved', '0') == '1', + None) def for_method(elem): # for <method> a = elem.attrib return Method(six.text_type(a['name']), bool(int(a.get('synchronous', '0'))), int(a['index']), a['label'], get_docs(elem), [for_method_field(fie) for fie in elem.getchildren() if fie.tag == 'field'], - [e.attrib['name'] for e in elem.findall('response')] + [e.attrib['name'] for e in elem.findall('response')], + # if chassis=server that means server has to accept it + any([e.attrib.get('name', '') == 'server' for e in elem.getchildren() if e.tag == 'chassis']), + any([e.attrib.get('name', '') == 'client' for e in elem.getchildren() if e.tag == 'chassis']) ) def for_class(elem): # for <class>