diff --git a/coolamqp/framing/base.py b/coolamqp/framing/base.py index f9c33bb697e90b38cbd80b075314e853d1e90607..5f7599891c14a1190ce760459d5b61e50101577b 100644 --- a/coolamqp/framing/base.py +++ b/coolamqp/framing/base.py @@ -37,7 +37,8 @@ class AMQPFrame(object): # base class for framing This writes type and channel ID. """ - buf.write(struct.pack('!BH', self.FRAME_TYPE, self.channel)) + # DO NOT UNCOMMENT buf.write(struct.pack('!BH', self.FRAME_TYPE, self.channel)) + raise NotImplementedError('Please write the frame type and channel in child classes, its faster that way ') @staticmethod def unserialize(channel, payload_as_buffer): @@ -54,7 +55,7 @@ class AMQPFrame(object): # base class for framing Return size of this frame, in bytes, from frame type to frame_end :return: int """ - raise NotImplementedError() + raise NotImplementedError('Override me') class AMQPPayload(object): @@ -103,7 +104,7 @@ class AMQPContentPropertyList(object): def write_to(self, buf): """Serialize itself (flags + values) to a buffer""" - raise NotImplementedError + raise Exception('This is an abstract method') @staticmethod def from_buffer(self, buf, start_offset): @@ -114,14 +115,14 @@ class AMQPContentPropertyList(object): Buffer HAS TO start at property_flags """ - raise NotImplementedError + raise Exception('This is an abstract method') def get_size(self): """ How long is property_flags + property_values :return: int """ - raise NotImplementedError + raise Exception('This is an abstract method') class AMQPMethodPayload(AMQPPayload): @@ -175,4 +176,3 @@ class AMQPMethodPayload(AMQPPayload): :raise ValueError: invalid data """ raise NotImplementedError('') - diff --git a/coolamqp/framing/compilation/compile_definitions.py b/coolamqp/framing/compilation/compile_definitions.py index 3ded538b3b618d8d2ed2bb0e4e4689f5c4c5239f..3d9282846c24d36f0530b47baf774ad502338bec 100644 --- a/coolamqp/framing/compilation/compile_definitions.py +++ b/coolamqp/framing/compilation/compile_definitions.py @@ -164,14 +164,14 @@ Field = collections.namedtuple('Field', ('name', 'type', 'basic_type', 'reserved # a bit if piece_index > 0: if field.reserved or field.basic_type == 'bit': - byte_chunk.append(u"kwargs['%s']" % (2**piece_index, )) + pass # zero anyway else: byte_chunk.append(u"(('%s' in kwargs) << %s)" % (format_field_name(field.name), piece_index)) piece_index -= 1 else: if first_byte: if field.reserved or field.basic_type == 'bit': - byte_chunk.append(u'0') + pass # zero anyway else: byte_chunk.append(u"int('%s' in kwargs)" % (format_field_name(field.name),)) else: @@ -219,9 +219,81 @@ Field = collections.namedtuple('Field', ('name', 'type', 'basic_type', 'reserved c = compile_particular_content_property_list_class(zpf, %s.FIELDS) %s.PARTICULAR_CLASSES[zpf] = c return c(**kwargs) - '''.replace('%s', name_class(cls.name) + 'ContentPropertyList').replace('%d', '%s')) + line(u''' + @staticmethod + def typize(*fields): +''') + line(u' zpf = bytearray([\n') + + first_byte = True # in 2-byte group + piece_index = 7 # from 7 downto 0 + fields_remaining = len(cls.properties) + byte_chunk = [] + + for field in cls.properties: + # a bit + if piece_index > 0: + if field.reserved or field.basic_type == 'bit': + pass # zero + else: + byte_chunk.append(u"(('%s' in fields) << %s)" % (format_field_name(field.name), piece_index)) + piece_index -= 1 + else: + if first_byte: + if field.reserved or field.basic_type == 'bit': + pass #zero + else: + byte_chunk.append(u"int('%s' in kwargs)" % (format_field_name(field.name),)) + else: + # this is the "do we need moar flags" section + byte_chunk.append(u"kwargs['%s']" % ( + int(fields_remaining > 1) + )) + + # Emit the byte + line(u' %s,\n', u' | '.join(byte_chunk)) + byte_chunk = [] + first_byte = not first_byte + piece_index = 7 + fields_remaining -= 1 + + if len(byte_chunk) > 0: + line(u' %s\n', u' | '.join(byte_chunk)) # We did not finish + + line(u''' ]) + zpf = six.binary_type(zpf) + if zpf in %s.PARTICULAR_CLASSES: + return %s.PARTICULAR_CLASSES[zpf] + else: + logger.debug('Property field (%s:%d) not seen yet, compiling', repr(zpf)) + c = compile_particular_content_property_list_class(zpf, %s.FIELDS) + %s.PARTICULAR_CLASSES[zpf] = c + return c +'''.replace("%s", name_class(cls.name) + 'ContentPropertyList').replace('%d', '%s')) + + line(u''' + @staticmethod + def from_buffer(buf, offset): + """ + Return a content property list instance unserialized from + buffer, so that buf[offset] marks the start of property flags + """ + # extract property flags + pfl = 2 + while ord(buf[offset + pfl]) & 1: + pfl += 2 + zpf = %s.zero_property_flags(buf[offset:offset+pfl]) + if zpf in %s.PARTICULAR_CLASSES: + return %s.PARTICULAR_CLASSES[zpf].from_buffer(buf, offset) + else: + logger.debug('Property field (%s:%d) not seen yet, compiling', repr(zpf)) + c = compile_particular_content_property_list_class(zpf, %s.FIELDS) + %s.PARTICULAR_CLASSES[zpf] = c + return c.from_buffer(buf, offset) + +'''.replace('%s', name_class(cls.name) + 'ContentPropertyList').replace("%d", "%s")) # ============================================ Do methods for this class for method in cls.methods: diff --git a/coolamqp/framing/compilation/content_property.py b/coolamqp/framing/compilation/content_property.py index 4bd50c0d2572c65d35d32543bc3f6dbb363cbd7e..df6b963a7d9f94885b2f9fdeac18cd652bee6e55 100644 --- a/coolamqp/framing/compilation/content_property.py +++ b/coolamqp/framing/compilation/content_property.py @@ -83,11 +83,12 @@ def _compile_particular_content_property_list_class(zpf, fields): # from_buffer # note that non-bit values - mod.append(u' def from_buffer(self, buf, start_offset):\n offset = start_offset + %s\n' % (zpf_length, )) + mod.append(u' @classmethod\n') + mod.append(u' def from_buffer(cls, buf, start_offset):\n offset = start_offset + %s\n' % (zpf_length, )) mod.append(get_from_buffer( present_fields , prefix='', indent_level=2)) - mod.append(u' return ParticularContentTypeList(%s)\n' % + mod.append(u' return cls(%s)\n' % u', '.join(format_field_name(field.name) for field in present_fields)) diff --git a/coolamqp/framing/definitions.py b/coolamqp/framing/definitions.py index 895b13396b77a0dfef6142f921a16f0697cc0827..cd2b13235a9a6fdab4b01c321926bde76262fbab 100644 --- a/coolamqp/framing/definitions.py +++ b/coolamqp/framing/definitions.py @@ -109,6 +109,39 @@ class Connection(AMQPClass): INDEX = 10 + @staticmethod + def typize(*fields): + zpf = bytearray([ + ]) + zpf = six.binary_type(zpf) + if zpf in ConnectionContentPropertyList.PARTICULAR_CLASSES: + return ConnectionContentPropertyList.PARTICULAR_CLASSES[zpf] + else: + logger.debug('Property field (ConnectionContentPropertyList:%s) not seen yet, compiling', repr(zpf)) + c = compile_particular_content_property_list_class(zpf, ConnectionContentPropertyList.FIELDS) + ConnectionContentPropertyList.PARTICULAR_CLASSES[zpf] = c + return c + + @staticmethod + def from_buffer(buf, offset): + """ + Return a content property list instance unserialized from + buffer, so that buf[offset] marks the start of property flags + """ + # extract property flags + pfl = 2 + while ord(buf[offset + pfl]) & 1: + pfl += 2 + zpf = ConnectionContentPropertyList.zero_property_flags(buf[offset:offset+pfl]) + if zpf in ConnectionContentPropertyList.PARTICULAR_CLASSES: + return ConnectionContentPropertyList.PARTICULAR_CLASSES[zpf].from_buffer(buf, offset) + else: + logger.debug('Property field (ConnectionContentPropertyList:%s) not seen yet, compiling', repr(zpf)) + c = compile_particular_content_property_list_class(zpf, ConnectionContentPropertyList.FIELDS) + ConnectionContentPropertyList.PARTICULAR_CLASSES[zpf] = c + return c.from_buffer(buf, offset) + + class ConnectionClose(AMQPMethodPayload): """ Request a connection close @@ -705,6 +738,39 @@ class Channel(AMQPClass): INDEX = 20 + @staticmethod + def typize(*fields): + zpf = bytearray([ + ]) + zpf = six.binary_type(zpf) + if zpf in ChannelContentPropertyList.PARTICULAR_CLASSES: + return ChannelContentPropertyList.PARTICULAR_CLASSES[zpf] + else: + logger.debug('Property field (ChannelContentPropertyList:%s) not seen yet, compiling', repr(zpf)) + c = compile_particular_content_property_list_class(zpf, ChannelContentPropertyList.FIELDS) + ChannelContentPropertyList.PARTICULAR_CLASSES[zpf] = c + return c + + @staticmethod + def from_buffer(buf, offset): + """ + Return a content property list instance unserialized from + buffer, so that buf[offset] marks the start of property flags + """ + # extract property flags + pfl = 2 + while ord(buf[offset + pfl]) & 1: + pfl += 2 + zpf = ChannelContentPropertyList.zero_property_flags(buf[offset:offset+pfl]) + if zpf in ChannelContentPropertyList.PARTICULAR_CLASSES: + return ChannelContentPropertyList.PARTICULAR_CLASSES[zpf].from_buffer(buf, offset) + else: + logger.debug('Property field (ChannelContentPropertyList:%s) not seen yet, compiling', repr(zpf)) + c = compile_particular_content_property_list_class(zpf, ChannelContentPropertyList.FIELDS) + ChannelContentPropertyList.PARTICULAR_CLASSES[zpf] = c + return c.from_buffer(buf, offset) + + class ChannelClose(AMQPMethodPayload): """ Request a channel close @@ -983,6 +1049,39 @@ class Exchange(AMQPClass): INDEX = 40 + @staticmethod + def typize(*fields): + zpf = bytearray([ + ]) + zpf = six.binary_type(zpf) + if zpf in ExchangeContentPropertyList.PARTICULAR_CLASSES: + return ExchangeContentPropertyList.PARTICULAR_CLASSES[zpf] + else: + logger.debug('Property field (ExchangeContentPropertyList:%s) not seen yet, compiling', repr(zpf)) + c = compile_particular_content_property_list_class(zpf, ExchangeContentPropertyList.FIELDS) + ExchangeContentPropertyList.PARTICULAR_CLASSES[zpf] = c + return c + + @staticmethod + def from_buffer(buf, offset): + """ + Return a content property list instance unserialized from + buffer, so that buf[offset] marks the start of property flags + """ + # extract property flags + pfl = 2 + while ord(buf[offset + pfl]) & 1: + pfl += 2 + zpf = ExchangeContentPropertyList.zero_property_flags(buf[offset:offset+pfl]) + if zpf in ExchangeContentPropertyList.PARTICULAR_CLASSES: + return ExchangeContentPropertyList.PARTICULAR_CLASSES[zpf].from_buffer(buf, offset) + else: + logger.debug('Property field (ExchangeContentPropertyList:%s) not seen yet, compiling', repr(zpf)) + c = compile_particular_content_property_list_class(zpf, ExchangeContentPropertyList.FIELDS) + ExchangeContentPropertyList.PARTICULAR_CLASSES[zpf] = c + return c.from_buffer(buf, offset) + + class ExchangeDeclare(AMQPMethodPayload): """ Verify exchange exists, create if needed @@ -1219,6 +1318,39 @@ class Queue(AMQPClass): INDEX = 50 + @staticmethod + def typize(*fields): + zpf = bytearray([ + ]) + zpf = six.binary_type(zpf) + if zpf in QueueContentPropertyList.PARTICULAR_CLASSES: + return QueueContentPropertyList.PARTICULAR_CLASSES[zpf] + else: + logger.debug('Property field (QueueContentPropertyList:%s) not seen yet, compiling', repr(zpf)) + c = compile_particular_content_property_list_class(zpf, QueueContentPropertyList.FIELDS) + QueueContentPropertyList.PARTICULAR_CLASSES[zpf] = c + return c + + @staticmethod + def from_buffer(buf, offset): + """ + Return a content property list instance unserialized from + buffer, so that buf[offset] marks the start of property flags + """ + # extract property flags + pfl = 2 + while ord(buf[offset + pfl]) & 1: + pfl += 2 + zpf = QueueContentPropertyList.zero_property_flags(buf[offset:offset+pfl]) + if zpf in QueueContentPropertyList.PARTICULAR_CLASSES: + return QueueContentPropertyList.PARTICULAR_CLASSES[zpf].from_buffer(buf, offset) + else: + logger.debug('Property field (QueueContentPropertyList:%s) not seen yet, compiling', repr(zpf)) + c = compile_particular_content_property_list_class(zpf, QueueContentPropertyList.FIELDS) + QueueContentPropertyList.PARTICULAR_CLASSES[zpf] = c + return c.from_buffer(buf, offset) + + class QueueBind(AMQPMethodPayload): """ Bind queue to an exchange @@ -1930,6 +2062,40 @@ class BasicContentPropertyList(AMQPContentPropertyList): BasicContentPropertyList.PARTICULAR_CLASSES[zpf] = c return c(**kwargs) + @staticmethod + def typize(*fields): + zpf = bytearray([ + (('content_type' in fields) << 7) | (('content_encoding' in fields) << 6) | (('headers' in fields) << 5) | (('delivery_mode' in fields) << 4) | (('priority' in fields) << 3) | (('correlation_id' in fields) << 2) | (('reply_to' in fields) << 1) | int('expiration' in kwargs), + (('message_id' in fields) << 7) | (('timestamp' in fields) << 6) | (('type_' in fields) << 5) | (('user_id' in fields) << 4) | (('app_id' in fields) << 3) | (('reserved' in fields) << 2) + ]) + zpf = six.binary_type(zpf) + if zpf in BasicContentPropertyList.PARTICULAR_CLASSES: + return BasicContentPropertyList.PARTICULAR_CLASSES[zpf] + else: + logger.debug('Property field (BasicContentPropertyList:%s) not seen yet, compiling', repr(zpf)) + c = compile_particular_content_property_list_class(zpf, BasicContentPropertyList.FIELDS) + BasicContentPropertyList.PARTICULAR_CLASSES[zpf] = c + return c + + @staticmethod + def from_buffer(buf, offset): + """ + Return a content property list instance unserialized from + buffer, so that buf[offset] marks the start of property flags + """ + # extract property flags + pfl = 2 + while ord(buf[offset + pfl]) & 1: + pfl += 2 + zpf = BasicContentPropertyList.zero_property_flags(buf[offset:offset+pfl]) + if zpf in BasicContentPropertyList.PARTICULAR_CLASSES: + return BasicContentPropertyList.PARTICULAR_CLASSES[zpf].from_buffer(buf, offset) + else: + logger.debug('Property field (BasicContentPropertyList:%s) not seen yet, compiling', repr(zpf)) + c = compile_particular_content_property_list_class(zpf, BasicContentPropertyList.FIELDS) + BasicContentPropertyList.PARTICULAR_CLASSES[zpf] = c + return c.from_buffer(buf, offset) + class BasicAck(AMQPMethodPayload): """ @@ -2943,6 +3109,39 @@ class Tx(AMQPClass): INDEX = 90 + @staticmethod + def typize(*fields): + zpf = bytearray([ + ]) + zpf = six.binary_type(zpf) + if zpf in TxContentPropertyList.PARTICULAR_CLASSES: + return TxContentPropertyList.PARTICULAR_CLASSES[zpf] + else: + logger.debug('Property field (TxContentPropertyList:%s) not seen yet, compiling', repr(zpf)) + c = compile_particular_content_property_list_class(zpf, TxContentPropertyList.FIELDS) + TxContentPropertyList.PARTICULAR_CLASSES[zpf] = c + return c + + @staticmethod + def from_buffer(buf, offset): + """ + Return a content property list instance unserialized from + buffer, so that buf[offset] marks the start of property flags + """ + # extract property flags + pfl = 2 + while ord(buf[offset + pfl]) & 1: + pfl += 2 + zpf = TxContentPropertyList.zero_property_flags(buf[offset:offset+pfl]) + if zpf in TxContentPropertyList.PARTICULAR_CLASSES: + return TxContentPropertyList.PARTICULAR_CLASSES[zpf].from_buffer(buf, offset) + else: + logger.debug('Property field (TxContentPropertyList:%s) not seen yet, compiling', repr(zpf)) + c = compile_particular_content_property_list_class(zpf, TxContentPropertyList.FIELDS) + TxContentPropertyList.PARTICULAR_CLASSES[zpf] = c + return c.from_buffer(buf, offset) + + class TxCommit(AMQPMethodPayload): """ Commit the current transaction diff --git a/coolamqp/framing/frames.py b/coolamqp/framing/frames.py index 3f8e9bbca52d232ca88f7656ea4413477dc59233..0d54deefb2d96a2100bbf1cb0fd16623699824f3 100644 --- a/coolamqp/framing/frames.py +++ b/coolamqp/framing/frames.py @@ -26,66 +26,72 @@ class AMQPMethodFrame(AMQPFrame): AMQPFrame.write_to(self, buf) self.payload.write_to(buf) - @staticmethod def unserialize(channel, payload_as_buffer): - clsmet = struct.unpack_from('!BB', payload_as_buffer, 0) + clsmet = struct.unpack_from('!HH', payload_as_buffer, 0) try: method_payload_class = IDENT_TO_METHOD[clsmet] - payload = method_payload_class.from_buffer(payload_as_buffer, 2) + payload = method_payload_class.from_buffer(payload_as_buffer, 4) except KeyError: raise ValueError('Invalid class %s method %s' % clsmet) return AMQPMethodFrame(channel, payload) def get_size(self): - return 10 + self.payload.get_size() + # frame header is always 7, frame end is 1, class + method is 4 + return 12 + self.payload.get_size() class AMQPHeaderFrame(AMQPFrame): FRAME_TYPE = FRAME_HEADER - def __init__(self, channel, class_id, weight, body_size, property_flags, property_list): + def __init__(self, channel, class_id, weight, body_size, properties): """ :param channel: channel ID :param class_id: class ID :param weight: weight (lol wut?) :param body_size: size of the body to follow - :param property_flags: binary - :param property_list: a list of properties + :param properties: a suitable AMQPContentPropertyList instance """ AMQPFrame.__init__(self, channel) self.class_id = class_id self.weight = weight self.body_size = body_size - self.property_flags = property_flags - self.property_list = property_list + self.properties = properties def write_to(self, buf): - AMQPFrame.write_to(self, buf) - buf.write(struct.pack('!HHQ', self.class_id, 0, self.body_size)) - buf.write(self.property_flags) - + buf.write(struct.pack('!BHLHHQ', FRAME_HEADER, self.channel, + 12+self.properties.get_size(), self.class_id, 0, self.body_size)) + self.properties.write_to(buf) + buf.write(chr(FRAME_END)) @staticmethod def unserialize(channel, payload_as_buffer): - pass + # payload starts with class ID + print(repr(str(payload_as_buffer[12:]))) + class_id, weight, body_size = struct.unpack_from('!HHQ', payload_as_buffer, 0) + properties = CLASS_ID_TO_CONTENT_PROPERTY_LIST[class_id].from_buffer(payload_as_buffer, 12) + return AMQPHeaderFrame(channel, class_id, weight, body_size, properties) def get_size(self): - raise NotImplementedError + # frame header is always 7, frame end is 1, content header is 12 + props + return 20 + self.properties.get_size() class AMQPBodyFrame(AMQPFrame): FRAME_TYPE = FRAME_BODY def __init__(self, channel, data): + """ + :type data: binary + """ AMQPFrame.__init__(self, channel) self.data = data def write_to(self, buf): - AMQPFrame.write_to(self, buf) - buf.write(buf) + buf.write(struct.pack('!BHL', FRAME_BODY, self.channel, len(self.data))) + buf.write(self.data) buf.write(chr(FRAME_END)) @staticmethod @@ -98,15 +104,14 @@ class AMQPBodyFrame(AMQPFrame): class AMQPHeartbeatFrame(AMQPFrame): FRAME_TYPE = FRAME_HEARTBEAT - LENGTH = 4 - DATA = '\x00\x00\xCE' + LENGTH = 6 + DATA = chr(FRAME_HEARTBEAT)+'\x00\x00\x00\x00\xCE' def __init__(self): AMQPFrame.__init__(self, 0) def write_to(self, buf): - AMQPFrame.write_to(self, buf) - buf.write(chr(FRAME_END)) + buf.write(AMQPHeartbeatFrame.DATA) def get_size(self): return AMQPHeartbeatFrame.LENGTH diff --git a/tests/test_framing/test_definitions/test_frames.py b/tests/test_framing/test_definitions/test_frames.py new file mode 100644 index 0000000000000000000000000000000000000000..80caa168222ca5c7bec12aa008cc2ea83e578159 --- /dev/null +++ b/tests/test_framing/test_definitions/test_frames.py @@ -0,0 +1,44 @@ +# coding=UTF-8 +from __future__ import absolute_import, division, print_function +import unittest +import io +import struct +from coolamqp.framing.frames import AMQPHeaderFrame +from coolamqp.framing.definitions import BasicContentPropertyList, FRAME_HEADER, FRAME_END + + +class TestShitSerializesRight(unittest.TestCase): + + def test_unser_header_frame(self): + s = b'\x00\x3C\x00\x00' + \ + b'\x00\x00\x00\x00\x00\x00\x00\x0A' + \ + b'\xC0\x00\x0Atext/plain\x04utf8' + + hf = AMQPHeaderFrame.unserialize(0, buffer(s)) + + self.assertEquals(hf.class_id, 60) + self.assertEquals(hf.weight, 0) + self.assertEquals(hf.body_size, 10) + self.assertEquals(hf.properties.content_type, b'text/plain') + self.assertEquals(hf.properties.content_encoding, b'utf8') + + def test_ser_header_frame(self): + + a_cpl = BasicContentPropertyList(content_type='text/plain') + + # content_type has len 10 + + buf = io.BytesIO() + + hdr = AMQPHeaderFrame(0, 60, 0, 0, a_cpl) + hdr.write_to(buf) + + s = b'\x00\x00\x00\x00' + \ + b'\x00\x00\x00\x00\x00\x00\x00\x00' + \ + b'\x80\x00\x0Atext/plain' + s = chr(FRAME_HEADER) + b'\x00\x00' + \ + struct.pack('!L', len(s)) + s + chr(FRAME_END) + + self.assertEquals(buf.getvalue(), + + )