diff --git a/coolamqp/framing/frames/base.py b/coolamqp/framing/frames/base.py index 4d4a870f660b92e7aac7db28e072b0de86bd093c..bfbf3de616a99eecb3cbff20834c2920bbc06561 100644 --- a/coolamqp/framing/frames/base.py +++ b/coolamqp/framing/frames/base.py @@ -7,6 +7,9 @@ import six logger = logging.getLogger(__name__) +AMQP_HELLO_HEADER = b'AMQP\x00\x00\x09\x01' + + # name => (length|None, struct ID|None, reserved-field-value : for struct if structable, bytes else, length of default) BASIC_TYPES = {'bit': (None, None, "0", None), # special case 'octet': (1, 'B', "b'\\x00'", 1), @@ -22,140 +25,39 @@ BASIC_TYPES = {'bit': (None, None, "0", None), # special case DYNAMIC_BASIC_TYPES = ('table', 'longstr', 'shortstr') -def dec_to_bytes(buf, v): - dps = 0 - for k in six.moves.xrange(20): - k = v * (10 ** dps) - if abs(k-int(k)) < 0.00001: # epsilon - return buf.write(struct.pack('!BI', dps, k)) - - logger.critical('Cannot convert %s to decimal, substituting with 0', repr(v)) - buf.write(b'\x00\x00\x00\x00') - - -FIELD_TYPES = { - # length, struct, (option)formatter_to_bytes - 't': (1, '!?'), # boolean - 'b': (1, '!b'), - 'B': (1, '!B'), - 'U': (2, '!H'), - 'u': (2, '!h'), - 'I': (4, '!I'), - 'i': (4, '!i'), - 'L': (8, '!Q'), - 'l': (8, '!q'), - 'f': (4, '!f'), - 'd': (8, '!d'), - 'D': (5, None, dec_to_bytes), # decimal-value - 's': (None, None, lambda buf, v: buf.write(struct.pack('B', len(v)) + v)), # short-string - 'S': (None, None, lambda buf, v: buf.write(struct.pack('!I', len(v)) + v)), # long-string - 'A': (None, None), # field-array - 'T': (8, '!Q'), - 'V': (0, ''), -} - - - - -""" -A table is of form: -[ - (name::bytes, value::any, type::bytes(len=1)), - ... - -] -""" - -def _enframe_table(buf, table): - """ - Write AMQP table to buffer - :param buf: - :param table: - :return: - """ - buf.write(struct.pack('!L', _frame_table_size(table))) +class AMQPFrame(object): # base class for frames + FRAME_TYPE = None # override me! - for name, value, type in table: - buf.write(struct.pack('!B', len(name))) - buf.write(name) - buf.write(type) - - opt = FIELD_TYPES[opt] - - if type == 'F': # nice one - _enframe_table(buf, value) - elif type == 'V': - continue - elif len(opt) == 2: # can autoframe - buf.write(struct.pack(opt[1], value)) - else: - opt[2](buf, value) - - -def _deframe_table(buf, start_offset): # helper - convert bytes to table - """:return: tuple (table, bytes consumed)""" - - -def _frame_table_size(table): - """:return: length of table representation, in bytes, WITHOUT length header""" - - -class AMQPClass(object): - pass - - -class AMQPPayload(object): - """Payload is something that can write itself to bytes, - or at least provide a buffer to do it.""" - - def write_arguments(self, buf): - """ - Emit itself into a buffer - - :param buf: buffer to write to (will be written using .write) - """ - - -class AMQPMethod(object): - RESPONSE_TO = None - REPLY_WITH = [] - FIELDS = [] - - def get_size(self): - """ - Calculate the size of this frame. - - :return: int, size of argument section - """ - raise NotImplementedError() + def __init__(self, channel): + self.channel = channel - def write_arguments(self, buf): + def write_to(self, buf): """ - Write the argument portion of this frame into buffer. + Write a complete frame to buffer - :param buf: buffer to write to - :return: how many bytes written + This writes type and channel ID. """ - raise NotImplementedError() + buf.write(struct.pack('!BH', self.FRAME_TYPE, self.channel)) @staticmethod - def from_buffer(buf, offset): + def unserialize(channel, payload_as_buffer): """ - Construct this frame from a buffer - - :param buf: a buffer to construct the frame from - :type buf: buffer or memoryview - :param offset: offset the argument portion begins at - :type offset: int - :return: tuple of (an instance of %s, amount of bytes consumed as int) + Unserialize from a buffer. + Buffer starts at frame's own payload - type, channel and size was already obtained. + Payload does not contain FRAME_EMD. + AMQPHeartbeatFrame does not have to implement this. """ - raise NotImplementedError('') + raise NotImplementedError('Override me') +class AMQPPayload(object): + """Payload is something that can write itself to bytes, + or at least provide a buffer to do it.""" + def write_to(self, buf): + """ + Emit itself into a buffer, from length to FRAME_END -class AMQPFrame(object): - def __init__(self, channel, payload): - self.channel = channel - self.payload = payload \ No newline at end of file + :param buf: buffer to write to (will be written using .write) + """ \ No newline at end of file diff --git a/coolamqp/framing/frames/base_definitions.py b/coolamqp/framing/frames/base_definitions.py new file mode 100644 index 0000000000000000000000000000000000000000..14c90be50de350c8bd527f3e9b6b392c63e86338 --- /dev/null +++ b/coolamqp/framing/frames/base_definitions.py @@ -0,0 +1,67 @@ +# coding=UTF-8 +""" +Used for definitions +""" +from __future__ import absolute_import, division, print_function + +import struct + +from coolamqp.framing.frames.base import AMQPPayload + + +class AMQPClass(object): + pass + + +class AMQPMethodPayload(AMQPPayload): + RESPONSE_TO = None + REPLY_WITH = [] + FIELDS = [] + + def write_to(self, buf): + """ + Write own content to target buffer - starting from LENGTH, ending on FRAME_END + :param buf: target buffer + """ + from coolamqp.framing.frames.definitions import FRAME_END + + if self.IS_CONTENT_STATIC: + buf.write(self.STATIC_CONTENT) + else: + buf.write(struct.pack('!I', self.get_size()+2)) + buf.write(self.BINARY_HEADER) + self.write_arguments(buf) + buf.write(chr(FRAME_END)) + + def get_size(self): + """ + Calculate the size of this frame. + + :return: int, size of argument section + """ + raise NotImplementedError() + + def write_arguments(self, buf): + """ + Write the argument portion of this frame into buffer. + + :param buf: buffer to write to + :return: how many bytes written + :raise ValueError: some field here is invalid! + """ + raise NotImplementedError() + + @staticmethod + def from_buffer(buf, offset): + """ + Construct this frame from a buffer + + :param buf: a buffer to construct the frame from + :type buf: buffer or memoryview + :param offset: offset the argument portion begins at + :type offset: int + :return: tuple of (an instance of %s, amount of bytes consumed as int) + :raise ValueError: invalid data + """ + raise NotImplementedError('') + diff --git a/utils/__init__.py b/coolamqp/framing/frames/compilation/__init__.py similarity index 56% rename from utils/__init__.py rename to coolamqp/framing/frames/compilation/__init__.py index 9f2b35b38d89264ee25685611d0a65a192e165f6..c932491a7c750218a3d62edb6656415c22a09be1 100644 --- a/utils/__init__.py +++ b/coolamqp/framing/frames/compilation/__init__.py @@ -1,2 +1,5 @@ # coding=UTF-8 from __future__ import absolute_import, division, print_function +""" +Function that compiles amqp0-9-1.xml to definitions.py +""" \ No newline at end of file diff --git a/utils/compdefs.py b/coolamqp/framing/frames/compilation/compile_definitions.py similarity index 90% rename from utils/compdefs.py rename to coolamqp/framing/frames/compilation/compile_definitions.py index 13c5a0a527c3757fcf8192021dbfbc39f7b6886a..79b1f8171fb6535b3ade12792d435977c0226552 100644 --- a/utils/compdefs.py +++ b/coolamqp/framing/frames/compilation/compile_definitions.py @@ -5,68 +5,13 @@ import struct import six import math -from getp import get_constants, get_classes, get_domains, byname, name_class, name_method, name_field, \ - BASIC_TYPES +from coolamqp.framing.frames.compilation.utilities import get_constants, get_classes, get_domains, \ + byname, name_class, name_method, name_field, ffmt, doxify, infertype, normname, as_nice_escaped_string, \ + frepr +from coolamqp.framing.frames.base import BASIC_TYPES -def frepr(p, sop=six.text_type): - if isinstance(p, basestring): - p = sop(p) - s = repr(p) - if isinstance(p, basestring) and not s.startswith('u'): - return ('u' if sop == six.text_type else 'b') + s - else: - return s - -def as_nice_escaped_string(p): - body = [] - for q in p: - z = (hex(ord(q))[2:].upper()) - if len(z) == 1: - z = u'0' + z - body.append(u'\\x' + z) - return u"b'"+(u''.join(body))+u"'" - - -def normname(p): - return p.strip().replace('-', '_').upper() - -def infertype(p): - try: - return int(p) - except ValueError: - return p - -def doxify(label, doc, prefix=4, blank=True): # output a full docstring section - label = [] if label is None else [label] - doc = [] if doc is None else [q.strip() for q in doc.split(u'\n') if len(q.strip()) > 0] - pre = u' '*prefix - - doc = label + doc - - if len(doc) == 0: - return u'' - - doc[0] = doc[0].capitalize() - - if len(doc) == 1: - return doc[0] - - doc = filter(lambda p: len(p.strip()) > 0, doc) - - if blank: - doc = [doc[0], u''] + doc[1:] - - f = (u'\n'.join(pre + lin for lin in doc))[prefix:] - return f - -def ffmt(data, *args, **kwargs): - for arg in args: - op = str if kwargs.get('sane', True) else frepr - data = data.replace('%s', op(arg), 1) - return data - -def compile_definitions(xml_file='../resources/amqp0-9-1.xml', out_file='../coolamqp/framing/frames/machine.py'): +def compile_definitions(xml_file='resources/amqp0-9-1.xml', out_file='coolamqp/framing/frames/definitions.py'): """parse resources/amqp-0-9-1.xml into """ xml = ElementTree.parse(xml_file) @@ -78,7 +23,7 @@ from __future__ import print_function, absolute_import A Python version of the AMQP machine-readable specification. Generated automatically by CoolAMQP from AMQP machine-readable specification. -See utils/compdefs.py for the tool +See coolamqp.framing.frames.compilation for the tool AMQP is copyright (c) 2016 OASIS CoolAMQP is copyright (c) 2016 DMS Serwis s.c. @@ -86,7 +31,8 @@ CoolAMQP is copyright (c) 2016 DMS Serwis s.c. import struct -from coolamqp.framing.frames.base import AMQPClass, AMQPMethod, _enframe_table, _deframe_table, _frame_table_size +from coolamqp.framing.frames.base_definitions import AMQPClass, AMQPMethodPayload +from coolamqp.framing.frames.field_table import enframe_table, deframe_table, frame_table_size ''') @@ -151,7 +97,7 @@ from coolamqp.framing.frames.base import AMQPClass, AMQPMethod, _enframe_table, is_content_static = len([f for f in method.fields if not f.reserved]) == 0 - line('''\nclass %s(AMQPMethod): + line('''\nclass %s(AMQPMethodPayload): """ %s """ @@ -302,7 +248,7 @@ from coolamqp.framing.frames.base import AMQPClass, AMQPMethod, _enframe_table, good_structs = [] if field.basic_type == 'table': - line(' _enframe_table(buf, %s)\n' % (val, )) + line(' enframe_table(buf, %s)\n' % (val, )) written = True else: # emit ours @@ -352,7 +298,7 @@ from coolamqp.framing.frames.base import AMQPClass, AMQPMethod, _enframe_table, parts.append('len(self.'+name_field(field.name)+')') accumulator += 4 elif bt == 'table': - parts.append('_frame_table_size(self.'+name_field(field.name)+')') + parts.append('frame_table_size(self.'+name_field(field.name)+')') accumulator += 4 else: raise Exception() @@ -487,7 +433,7 @@ from coolamqp.framing.frames.base import AMQPClass, AMQPMethod, _enframe_table, bits.append(fieldname) else: if field.basic_type == 'table': # oh my god - line(""" %s, delta = _deframe_table(buf, offset) + line(""" %s, delta = deframe_table(buf, offset) offset += delta """, name_field(field.name)) else: # longstr or shortstr @@ -540,5 +486,7 @@ from coolamqp.framing.frames.base import AMQPClass, AMQPMethod, _enframe_table, out.close() + + if __name__ == '__main__': compile_definitions() diff --git a/utils/getp.py b/coolamqp/framing/frames/compilation/utilities.py similarity index 80% rename from utils/getp.py rename to coolamqp/framing/frames/compilation/utilities.py index d1f0613a8e84de15725cade7b2dce83491b8e050..f7ac878cd36494813cc8137fad330d211b5176ee 100644 --- a/utils/getp.py +++ b/coolamqp/framing/frames/compilation/utilities.py @@ -162,3 +162,60 @@ def name_field(field): if field in ('global', ): field = field + '_' return field.replace('-', '_') + +def frepr(p, sop=six.text_type): + if isinstance(p, basestring): + p = sop(p) + s = repr(p) + + if isinstance(p, basestring) and not s.startswith('u'): + return ('u' if sop == six.text_type else 'b') + s + else: + return s + +def as_nice_escaped_string(p): + body = [] + for q in p: + z = (hex(ord(q))[2:].upper()) + if len(z) == 1: + z = u'0' + z + body.append(u'\\x' + z) + return u"b'"+(u''.join(body))+u"'" + +def normname(p): + return p.strip().replace('-', '_').upper() + +def infertype(p): + try: + return int(p) + except ValueError: + return p + +def doxify(label, doc, prefix=4, blank=True): # output a full docstring section + label = [] if label is None else [label] + doc = [] if doc is None else [q.strip() for q in doc.split(u'\n') if len(q.strip()) > 0] + pre = u' '*prefix + + doc = label + doc + + if len(doc) == 0: + return u'' + + doc[0] = doc[0].capitalize() + + if len(doc) == 1: + return doc[0] + + doc = filter(lambda p: len(p.strip()) > 0, doc) + + if blank: + doc = [doc[0], u''] + doc[1:] + + f = (u'\n'.join(pre + lin for lin in doc))[prefix:] + return f + +def ffmt(data, *args, **kwargs): + for arg in args: + op = str if kwargs.get('sane', True) else frepr + data = data.replace('%s', op(arg), 1) + return data diff --git a/coolamqp/framing/frames/machine.py b/coolamqp/framing/frames/definitions.py similarity index 97% rename from coolamqp/framing/frames/machine.py rename to coolamqp/framing/frames/definitions.py index 8d70a7aa80b023cfccf712748da24a1cab6db21d..a6f53b579c5d6a8c3881ceae6046b7ad8ffaabee 100644 --- a/coolamqp/framing/frames/machine.py +++ b/coolamqp/framing/frames/definitions.py @@ -4,7 +4,7 @@ from __future__ import print_function, absolute_import A Python version of the AMQP machine-readable specification. Generated automatically by CoolAMQP from AMQP machine-readable specification. -See utils/compdefs.py for the tool +See coolamqp.framing.frames.compilation for the tool AMQP is copyright (c) 2016 OASIS CoolAMQP is copyright (c) 2016 DMS Serwis s.c. @@ -12,7 +12,8 @@ CoolAMQP is copyright (c) 2016 DMS Serwis s.c. import struct -from coolamqp.framing.frames.base import AMQPClass, AMQPMethod, _enframe_table, _deframe_table, _frame_table_size +from coolamqp.framing.frames.base_definitions import AMQPClass, AMQPMethodPayload +from coolamqp.framing.frames.field_table import enframe_table, deframe_table, frame_table_size # Core constants FRAME_METHOD = 1 @@ -103,7 +104,7 @@ class Connection(AMQPClass): INDEX = 10 -class ConnectionClose(AMQPMethod): +class ConnectionClose(AMQPMethodPayload): """ Request a connection close @@ -180,7 +181,7 @@ class ConnectionClose(AMQPMethod): return ConnectionClose(reply_code, reply_text, class_id, method_id) -class ConnectionCloseOk(AMQPMethod): +class ConnectionCloseOk(AMQPMethodPayload): """ Confirm a connection close @@ -223,7 +224,7 @@ class ConnectionCloseOk(AMQPMethod): return ConnectionCloseOk() -class ConnectionOpen(AMQPMethod): +class ConnectionOpen(AMQPMethodPayload): """ Open connection to virtual host @@ -293,7 +294,7 @@ class ConnectionOpen(AMQPMethod): return ConnectionOpen(virtual_host) -class ConnectionOpenOk(AMQPMethod): +class ConnectionOpenOk(AMQPMethodPayload): """ Signal that connection is ready @@ -338,7 +339,7 @@ class ConnectionOpenOk(AMQPMethod): return ConnectionOpenOk() -class ConnectionStart(AMQPMethod): +class ConnectionStart(AMQPMethodPayload): """ Start connection negotiation @@ -410,20 +411,20 @@ class ConnectionStart(AMQPMethod): def write_arguments(self, buf): buf.write(struct.pack('!BB', self.version_major, self.version_minor)) - _enframe_table(buf, self.server_properties) + enframe_table(buf, self.server_properties) buf.write(struct.pack('!L', len(self.mechanisms))) buf.write(self.mechanisms) buf.write(struct.pack('!L', len(self.locales))) buf.write(self.locales) def get_size(self): - return _frame_table_size(self.server_properties) + len(self.mechanisms) + len(self.locales) + 14 + return frame_table_size(self.server_properties) + len(self.mechanisms) + len(self.locales) + 14 @staticmethod def from_buffer(buf, start_offset): assert (len(buf) - start_offset) >= ConnectionStart.MINIMUM_SIZE, 'Frame too short!' offset = start_offset # we will use it to count consumed bytes - server_properties, delta = _deframe_table(buf, offset) + server_properties, delta = deframe_table(buf, offset) offset += delta version_major, version_minor, s_len, = struct.unpack_from('!BBL', buf, offset) offset += 6 @@ -436,7 +437,7 @@ class ConnectionStart(AMQPMethod): return ConnectionStart(version_major, version_minor, server_properties, mechanisms, locales) -class ConnectionSecure(AMQPMethod): +class ConnectionSecure(AMQPMethodPayload): """ Security mechanism challenge @@ -498,7 +499,7 @@ class ConnectionSecure(AMQPMethod): return ConnectionSecure(challenge) -class ConnectionStartOk(AMQPMethod): +class ConnectionStartOk(AMQPMethodPayload): """ Select security mechanism and locale @@ -562,7 +563,7 @@ class ConnectionStartOk(AMQPMethod): self.locale = locale def write_arguments(self, buf): - _enframe_table(buf, self.client_properties) + enframe_table(buf, self.client_properties) buf.write(struct.pack('!B', len(self.mechanism))) buf.write(self.mechanism) buf.write(struct.pack('!L', len(self.response))) @@ -571,13 +572,13 @@ class ConnectionStartOk(AMQPMethod): buf.write(self.locale) def get_size(self): - return _frame_table_size(self.client_properties) + len(self.mechanism) + len(self.response) + len(self.locale) + 10 + return frame_table_size(self.client_properties) + len(self.mechanism) + len(self.response) + len(self.locale) + 10 @staticmethod def from_buffer(buf, start_offset): assert (len(buf) - start_offset) >= ConnectionStartOk.MINIMUM_SIZE, 'Frame too short!' offset = start_offset # we will use it to count consumed bytes - client_properties, delta = _deframe_table(buf, offset) + client_properties, delta = deframe_table(buf, offset) offset += delta s_len, = struct.unpack_from('!B', buf, offset) offset += 1 @@ -594,7 +595,7 @@ class ConnectionStartOk(AMQPMethod): return ConnectionStartOk(client_properties, mechanism, response, locale) -class ConnectionSecureOk(AMQPMethod): +class ConnectionSecureOk(AMQPMethodPayload): """ Security mechanism response @@ -656,7 +657,7 @@ class ConnectionSecureOk(AMQPMethod): return ConnectionSecureOk(response) -class ConnectionTune(AMQPMethod): +class ConnectionTune(AMQPMethodPayload): """ Propose connection tuning parameters @@ -727,7 +728,7 @@ class ConnectionTune(AMQPMethod): return ConnectionTune(channel_max, frame_max, heartbeat) -class ConnectionTuneOk(AMQPMethod): +class ConnectionTuneOk(AMQPMethodPayload): """ Negotiate connection tuning parameters @@ -809,7 +810,7 @@ class Channel(AMQPClass): INDEX = 20 -class ChannelClose(AMQPMethod): +class ChannelClose(AMQPMethodPayload): """ Request a channel close @@ -886,7 +887,7 @@ class ChannelClose(AMQPMethod): return ChannelClose(reply_code, reply_text, class_id, method_id) -class ChannelCloseOk(AMQPMethod): +class ChannelCloseOk(AMQPMethodPayload): """ Confirm a channel close @@ -929,7 +930,7 @@ class ChannelCloseOk(AMQPMethod): return ChannelCloseOk() -class ChannelFlow(AMQPMethod): +class ChannelFlow(AMQPMethodPayload): """ Enable/disable flow from peer @@ -990,7 +991,7 @@ class ChannelFlow(AMQPMethod): return ChannelFlow(active) -class ChannelFlowOk(AMQPMethod): +class ChannelFlowOk(AMQPMethodPayload): """ Confirm a flow method @@ -1048,7 +1049,7 @@ class ChannelFlowOk(AMQPMethod): return ChannelFlowOk(active) -class ChannelOpen(AMQPMethod): +class ChannelOpen(AMQPMethodPayload): """ Open a channel for use @@ -1092,7 +1093,7 @@ class ChannelOpen(AMQPMethod): return ChannelOpen() -class ChannelOpenOk(AMQPMethod): +class ChannelOpenOk(AMQPMethodPayload): """ Signal that the channel is ready @@ -1147,7 +1148,7 @@ class Exchange(AMQPClass): INDEX = 40 -class ExchangeDeclare(AMQPMethod): +class ExchangeDeclare(AMQPMethodPayload): """ Verify exchange exists, create if needed @@ -1234,10 +1235,10 @@ class ExchangeDeclare(AMQPMethod): buf.write(struct.pack('!B', len(self.type))) buf.write(self.type) buf.write(struct.pack('!B', (int(self.passive) << 0) | (int(self.durable) << 1) | (int(self.no_wait) << 2))) - _enframe_table(buf, self.arguments) + enframe_table(buf, self.arguments) def get_size(self): - return len(self.exchange) + len(self.type) + _frame_table_size(self.arguments) + 9 + return len(self.exchange) + len(self.type) + frame_table_size(self.arguments) + 9 @staticmethod def from_buffer(buf, start_offset): @@ -1256,12 +1257,12 @@ class ExchangeDeclare(AMQPMethod): passive = bool(_bit & 1) durable = bool(_bit & 2) no_wait = bool(_bit & 16) - arguments, delta = _deframe_table(buf, offset) + arguments, delta = deframe_table(buf, offset) offset += delta return ExchangeDeclare(exchange, type, passive, durable, no_wait, arguments) -class ExchangeDelete(AMQPMethod): +class ExchangeDelete(AMQPMethodPayload): """ Delete an exchange @@ -1337,7 +1338,7 @@ class ExchangeDelete(AMQPMethod): return ExchangeDelete(exchange, if_unused, no_wait) -class ExchangeDeclareOk(AMQPMethod): +class ExchangeDeclareOk(AMQPMethodPayload): """ Confirm exchange declaration @@ -1380,7 +1381,7 @@ class ExchangeDeclareOk(AMQPMethod): return ExchangeDeclareOk() -class ExchangeDeleteOk(AMQPMethod): +class ExchangeDeleteOk(AMQPMethodPayload): """ Confirm deletion of an exchange @@ -1433,7 +1434,7 @@ class Queue(AMQPClass): INDEX = 50 -class QueueBind(AMQPMethod): +class QueueBind(AMQPMethodPayload): """ Bind queue to an exchange @@ -1512,10 +1513,10 @@ class QueueBind(AMQPMethod): buf.write(struct.pack('!B', len(self.routing_key))) buf.write(self.routing_key) buf.write(struct.pack('!B', (int(self.no_wait) << 0))) - _enframe_table(buf, self.arguments) + enframe_table(buf, self.arguments) def get_size(self): - return len(self.queue) + len(self.exchange) + len(self.routing_key) + _frame_table_size(self.arguments) + 10 + return len(self.queue) + len(self.exchange) + len(self.routing_key) + frame_table_size(self.arguments) + 10 @staticmethod def from_buffer(buf, start_offset): @@ -1536,12 +1537,12 @@ class QueueBind(AMQPMethod): _bit, = struct.unpack_from('!B', buf, offset) offset += 1 no_wait = bool(_bit & 1) - arguments, delta = _deframe_table(buf, offset) + arguments, delta = deframe_table(buf, offset) offset += delta return QueueBind(queue, exchange, routing_key, no_wait, arguments) -class QueueBindOk(AMQPMethod): +class QueueBindOk(AMQPMethodPayload): """ Confirm bind successful @@ -1583,7 +1584,7 @@ class QueueBindOk(AMQPMethod): return QueueBindOk() -class QueueDeclare(AMQPMethod): +class QueueDeclare(AMQPMethodPayload): """ Declare queue, create if needed @@ -1676,10 +1677,10 @@ class QueueDeclare(AMQPMethod): buf.write(struct.pack('!B', len(self.queue))) buf.write(self.queue) buf.write(struct.pack('!B', (int(self.passive) << 0) | (int(self.durable) << 1) | (int(self.exclusive) << 2) | (int(self.auto_delete) << 3) | (int(self.no_wait) << 4))) - _enframe_table(buf, self.arguments) + enframe_table(buf, self.arguments) def get_size(self): - return len(self.queue) + _frame_table_size(self.arguments) + 8 + return len(self.queue) + frame_table_size(self.arguments) + 8 @staticmethod def from_buffer(buf, start_offset): @@ -1696,12 +1697,12 @@ class QueueDeclare(AMQPMethod): exclusive = bool(_bit & 4) auto_delete = bool(_bit & 8) no_wait = bool(_bit & 16) - arguments, delta = _deframe_table(buf, offset) + arguments, delta = deframe_table(buf, offset) offset += delta return QueueDeclare(queue, passive, durable, exclusive, auto_delete, no_wait, arguments) -class QueueDelete(AMQPMethod): +class QueueDelete(AMQPMethodPayload): """ Delete a queue @@ -1784,7 +1785,7 @@ class QueueDelete(AMQPMethod): return QueueDelete(queue, if_unused, if_empty, no_wait) -class QueueDeclareOk(AMQPMethod): +class QueueDeclareOk(AMQPMethodPayload): """ Confirms a queue definition @@ -1857,7 +1858,7 @@ class QueueDeclareOk(AMQPMethod): return QueueDeclareOk(queue, message_count, consumer_count) -class QueueDeleteOk(AMQPMethod): +class QueueDeleteOk(AMQPMethodPayload): """ Confirm deletion of a queue @@ -1912,7 +1913,7 @@ class QueueDeleteOk(AMQPMethod): return QueueDeleteOk(message_count) -class QueuePurge(AMQPMethod): +class QueuePurge(AMQPMethodPayload): """ Purge a queue @@ -1980,7 +1981,7 @@ class QueuePurge(AMQPMethod): return QueuePurge(queue, no_wait) -class QueuePurgeOk(AMQPMethod): +class QueuePurgeOk(AMQPMethodPayload): """ Confirms a queue purge @@ -2035,7 +2036,7 @@ class QueuePurgeOk(AMQPMethod): return QueuePurgeOk(message_count) -class QueueUnbind(AMQPMethod): +class QueueUnbind(AMQPMethodPayload): """ Unbind a queue from an exchange @@ -2098,10 +2099,10 @@ class QueueUnbind(AMQPMethod): buf.write(self.exchange) buf.write(struct.pack('!B', len(self.routing_key))) buf.write(self.routing_key) - _enframe_table(buf, self.arguments) + enframe_table(buf, self.arguments) def get_size(self): - return len(self.queue) + len(self.exchange) + len(self.routing_key) + _frame_table_size(self.arguments) + 9 + return len(self.queue) + len(self.exchange) + len(self.routing_key) + frame_table_size(self.arguments) + 9 @staticmethod def from_buffer(buf, start_offset): @@ -2119,12 +2120,12 @@ class QueueUnbind(AMQPMethod): offset += 1 routing_key = buf[offset:offset+s_len] offset += s_len - arguments, delta = _deframe_table(buf, offset) + arguments, delta = deframe_table(buf, offset) offset += delta return QueueUnbind(queue, exchange, routing_key, arguments) -class QueueUnbindOk(AMQPMethod): +class QueueUnbindOk(AMQPMethodPayload): """ Confirm unbind successful @@ -2174,7 +2175,7 @@ class Basic(AMQPClass): INDEX = 60 -class BasicAck(AMQPMethod): +class BasicAck(AMQPMethodPayload): """ Acknowledge one or more messages @@ -2239,7 +2240,7 @@ class BasicAck(AMQPMethod): return BasicAck(delivery_tag, multiple) -class BasicConsume(AMQPMethod): +class BasicConsume(AMQPMethodPayload): """ Start a queue consumer @@ -2316,10 +2317,10 @@ class BasicConsume(AMQPMethod): buf.write(struct.pack('!B', len(self.consumer_tag))) buf.write(self.consumer_tag) buf.write(struct.pack('!B', (int(self.no_local) << 0) | (int(self.no_ack) << 1) | (int(self.exclusive) << 2) | (int(self.no_wait) << 3))) - _enframe_table(buf, self.arguments) + enframe_table(buf, self.arguments) def get_size(self): - return len(self.queue) + len(self.consumer_tag) + _frame_table_size(self.arguments) + 9 + return len(self.queue) + len(self.consumer_tag) + frame_table_size(self.arguments) + 9 @staticmethod def from_buffer(buf, start_offset): @@ -2339,12 +2340,12 @@ class BasicConsume(AMQPMethod): no_ack = bool(_bit & 2) exclusive = bool(_bit & 4) no_wait = bool(_bit & 8) - arguments, delta = _deframe_table(buf, offset) + arguments, delta = deframe_table(buf, offset) offset += delta return BasicConsume(queue, consumer_tag, no_local, no_ack, exclusive, no_wait, arguments) -class BasicCancel(AMQPMethod): +class BasicCancel(AMQPMethodPayload): """ End a queue consumer @@ -2411,7 +2412,7 @@ class BasicCancel(AMQPMethod): return BasicCancel(consumer_tag, no_wait) -class BasicConsumeOk(AMQPMethod): +class BasicConsumeOk(AMQPMethodPayload): """ Confirm a new consumer @@ -2471,7 +2472,7 @@ class BasicConsumeOk(AMQPMethod): return BasicConsumeOk(consumer_tag) -class BasicCancelOk(AMQPMethod): +class BasicCancelOk(AMQPMethodPayload): """ Confirm a cancelled consumer @@ -2529,7 +2530,7 @@ class BasicCancelOk(AMQPMethod): return BasicCancelOk(consumer_tag) -class BasicDeliver(AMQPMethod): +class BasicDeliver(AMQPMethodPayload): """ Notify the client of a consumer message @@ -2621,7 +2622,7 @@ class BasicDeliver(AMQPMethod): return BasicDeliver(consumer_tag, delivery_tag, redelivered, exchange, routing_key) -class BasicGet(AMQPMethod): +class BasicGet(AMQPMethodPayload): """ Direct access to a queue @@ -2690,7 +2691,7 @@ class BasicGet(AMQPMethod): return BasicGet(queue, no_ack) -class BasicGetOk(AMQPMethod): +class BasicGetOk(AMQPMethodPayload): """ Provide client with a message @@ -2779,7 +2780,7 @@ class BasicGetOk(AMQPMethod): return BasicGetOk(delivery_tag, redelivered, exchange, routing_key, message_count) -class BasicGetEmpty(AMQPMethod): +class BasicGetEmpty(AMQPMethodPayload): """ Indicate no messages available @@ -2825,7 +2826,7 @@ class BasicGetEmpty(AMQPMethod): return BasicGetEmpty() -class BasicPublish(AMQPMethod): +class BasicPublish(AMQPMethodPayload): """ Publish a message @@ -2921,7 +2922,7 @@ class BasicPublish(AMQPMethod): return BasicPublish(exchange, routing_key, mandatory, immediate) -class BasicQos(AMQPMethod): +class BasicQos(AMQPMethodPayload): """ Specify quality of service @@ -3003,7 +3004,7 @@ class BasicQos(AMQPMethod): return BasicQos(prefetch_size, prefetch_count, global_) -class BasicQosOk(AMQPMethod): +class BasicQosOk(AMQPMethodPayload): """ Confirm the requested qos @@ -3047,7 +3048,7 @@ class BasicQosOk(AMQPMethod): return BasicQosOk() -class BasicReturn(AMQPMethod): +class BasicReturn(AMQPMethodPayload): """ Return a failed message @@ -3132,7 +3133,7 @@ class BasicReturn(AMQPMethod): return BasicReturn(reply_code, reply_text, exchange, routing_key) -class BasicReject(AMQPMethod): +class BasicReject(AMQPMethodPayload): """ Reject an incoming message @@ -3195,7 +3196,7 @@ class BasicReject(AMQPMethod): return BasicReject(delivery_tag, requeue) -class BasicRecoverAsync(AMQPMethod): +class BasicRecoverAsync(AMQPMethodPayload): """ Redeliver unacknowledged messages @@ -3255,7 +3256,7 @@ class BasicRecoverAsync(AMQPMethod): return BasicRecoverAsync(requeue) -class BasicRecover(AMQPMethod): +class BasicRecover(AMQPMethodPayload): """ Redeliver unacknowledged messages @@ -3315,7 +3316,7 @@ class BasicRecover(AMQPMethod): return BasicRecover(requeue) -class BasicRecoverOk(AMQPMethod): +class BasicRecoverOk(AMQPMethodPayload): """ Confirm recovery @@ -3373,7 +3374,7 @@ class Tx(AMQPClass): INDEX = 90 -class TxCommit(AMQPMethod): +class TxCommit(AMQPMethodPayload): """ Commit the current transaction @@ -3415,7 +3416,7 @@ class TxCommit(AMQPMethod): return TxCommit() -class TxCommitOk(AMQPMethod): +class TxCommitOk(AMQPMethodPayload): """ Confirm a successful commit @@ -3458,7 +3459,7 @@ class TxCommitOk(AMQPMethod): return TxCommitOk() -class TxRollback(AMQPMethod): +class TxRollback(AMQPMethodPayload): """ Abandon the current transaction @@ -3502,7 +3503,7 @@ class TxRollback(AMQPMethod): return TxRollback() -class TxRollbackOk(AMQPMethod): +class TxRollbackOk(AMQPMethodPayload): """ Confirm successful rollback @@ -3545,7 +3546,7 @@ class TxRollbackOk(AMQPMethod): return TxRollbackOk() -class TxSelect(AMQPMethod): +class TxSelect(AMQPMethodPayload): """ Select standard transaction mode @@ -3587,7 +3588,7 @@ class TxSelect(AMQPMethod): return TxSelect() -class TxSelectOk(AMQPMethod): +class TxSelectOk(AMQPMethodPayload): """ Confirm transaction mode diff --git a/coolamqp/framing/frames/field_table.py b/coolamqp/framing/frames/field_table.py new file mode 100644 index 0000000000000000000000000000000000000000..939aea33707c84bfc00b51514080eace38db5e31 --- /dev/null +++ b/coolamqp/framing/frames/field_table.py @@ -0,0 +1,196 @@ +# coding=UTF-8 +""" +That funny type, field-table... + +A field-value is of form (value::any, type::char) + +An array is of form [field-value1, field-value2, ...] + +A table is of form ( (name1::bytes, fv1), (name2::bytes, fv2), ...) + +""" +from __future__ import absolute_import, division, print_function +import struct +import six + + +def enframe_decimal(buf, v): # convert decimal to bytes + dps = 0 + for k in six.moves.xrange(20): + k = v * (10 ** dps) + if abs(k-int(k)) < 0.00001: # epsilon + return buf.write(struct.pack('!BI', dps, k)) + + raise ValueError('Could not convert %s to decimal', v) + + +def deframe_decimal(buf, offset): + scale, val = struct.unpack_from('!BI', buf, offset) + return val / (10 ** scale), 5 + + +def deframe_shortstr(buf, offset): # -> value, bytes_eaten + ln = ord(buf[offset]) + return buf[offset+1:offset+1+ln], 1+ln + + +def enframe_shortstr(buf, value): + buf.write(chr(len(value))) + buf.write(value) + + +def deframe_longstr(buf, offset): # -> value, bytes_eaten + ln, = struct.unpack_from('!I', buf, offset) + offset += 4 + return buf[offset:offset+ln], 4 + ln + + +def enframe_longstr(buf, value): + buf.write(struct.pack('!I', value)) + buf.write(value) + + +FIELD_TYPES = { + # length, struct, (option)to_bytes (callable(buffer, value)), + # (option)from_bytes (callable(buffer, offset) -> value, bytes_consumed), + # (option)get_len (callable(value) -> length in bytes) + 't': (1, '!?'), # boolean + 'b': (1, '!b'), + 'B': (1, '!B'), + 'U': (2, '!H'), + 'u': (2, '!h'), + 'I': (4, '!I'), + 'i': (4, '!i'), + 'L': (8, '!Q'), + 'l': (8, '!q'), + 'f': (4, '!f'), + 'd': (8, '!d'), + 'D': (5, None, enframe_decimal, deframe_decimal), # decimal-value + 's': (None, None, enframe_shortstr, deframe_shortstr, lambda val: len(val)+1), # shortstr + 'S': (None, None, enframe_longstr, deframe_longstr, lambda val: len(val)+4), # longstr + 'T': (8, '!Q'), + 'V': (0, None, lambda buf, v: None, lambda buf, ofs: None, 0), # rendered as None +} + + +def enframe_field_value(buf, fv): + value, type = fv + buf.write(tp) + + opt = FIELD_TYPES[tp] + + if opt[1] is not None: + buf.write(struct.pack(opt[1], value)) + else: + opt[2](buf, value) + + +def deframe_field_value(buf, offset): # -> (value, type), bytes_consumed + start_offset = offset + field_type = buf[offset] + offset += 1 + + if field_type not in FIELD_TYPES.keys(): + raise ValueError('Unknown field type %s!', (repr(field_type),)) + + opt = FIELD_TYPES[field_type] + + if opt[1] is not None: + field_val, = struct.unpack_from(FIELD_TYPES[field_type][1], buf, offset) + offset += opt[0] + else: + field_val, delta = opt[3](buf, offset) + offset += delta + + return (field_val, field_type), offset - start_offset + + +def deframe_array(buf, offset): + start_offset = offset + ln, = struct.unpack_from('!I', buf, offset) + offset += 4 + + values = [] + while offset < (start_offset+1+ln): + v, t, delta = deframe_field_value(buf, offset) + offset += delta + values.append((v,t)) + + if offset != start_offset+4+ln: + raise ValueError('Array longer than expected, took %s, expected %s bytes', + (offset-(start_offset+ln+4), ln+4)) + + return values, ln+4 + + +def enframe_array(buf, array): + buf.write(struct.pack('!I', frame_array_size(array)-4)) + for fv in array: + enframe_field_value(buf, fv) + + +def enframe_table(buf, table): + """ + Write AMQP table to buffer + :param buf: + :param table: + :return: + """ + buf.write(struct.pack('!I', frame_table_size(table)-4)) + + for name, fv in table: + buf.write(struct.pack('!B', len(name))) + buf.write(name) + enframe_field_value(buf, fv) + + +def deframe_table(buf, start_offset): # -> (table, bytes_consumed) + """:return: tuple (table, bytes consumed)""" + offset = start_offset + table_length, = struct.unpack_from('!I', buf, start_offset) + offset += 4 + + # we will check if it's really so. + fields = [] + + while offset < (start_offset+table_length+4): + ln, = struct.unpack_from('!B', buf, offset) + offset += 1 + field_name = buf[offset:offset+ln] + offset += ln + field_val, field_tp, delta = deframe_field_value(buf, offset) + offset += delta + fields.append((field_name, (field_val, field_tp))) + + if offset > (start_offset+table_length): + raise ValueError('Table turned out longer than expected! Found %s bytes expected %s', + (offset-start_offset, table_length)) + + return fields, table_length+4 + + +def frame_field_value_size(fv): + v,t=fv + if FIELD_TYPES[t][0] is None: + return FIELD_TYPES[t][4](v) + 1 + else: + return FIELD_TYPES[t][0] + 1 + + +def frame_array_size(array): + return 4 + sum(frame_field_value_size(fv) for fv in array) + + +def frame_table_size(table): + """:return: length of table representation, in bytes, INCLUDING length header""" + + size = 4 # length header + for k, fv in table: + v,t =fv + size += 1 + len(k) + frame_field_value_size(v, t) + + return size + + +FIELD_TYPES['A'] = (None, None, enframe_array, deframe_array, frame_array_size) +FIELD_TYPES['F'] = (None, None, enframe_table, deframe_table, frame_table_size) diff --git a/coolamqp/framing/frames/frames.py b/coolamqp/framing/frames/frames.py new file mode 100644 index 0000000000000000000000000000000000000000..ea85d9a7b3dcf8ba2afd2e0ade9d78e204ddda44 --- /dev/null +++ b/coolamqp/framing/frames/frames.py @@ -0,0 +1,100 @@ +# coding=UTF-8 +""" +Concrete frame definitions +""" +from __future__ import absolute_import, division, print_function + +import struct + +from coolamqp.framing.frames.base import AMQPFrame +from coolamqp.framing.frames.definitions import FRAME_METHOD, FRAME_HEARTBEAT, FRAME_BODY, FRAME_HEADER, FRAME_END, \ + IDENT_TO_METHOD + + +class AMQPMethodFrame(AMQPFrame): + FRAME_TYPE = FRAME_METHOD + + def __init__(self, channel, payload): + """ + :param channel: channel ID + :param payload: AMQPMethodPayload instance + """ + AMQPFrame.__init__(self, channel) + self.payload = payload + + def write_to(self, buf): + AMQPFrame.write_to(self, buf) + self.payload.write_to(buf) + + + @staticmethod + def unserialize(channel, payload_as_buffer): + clsmet = struct.unpack_from('!BB', payload_as_buffer, 0) + + try: + method_payload_class = IDENT_TO_METHOD[clsmet] + payload = method_payload_class.from_buffer(payload_as_buffer, 2) + except KeyError: + raise ValueError('Invalid class %s method %s' % clsmet) + + return AMQPMethodFrame(channel, payload) + + + +class AMQPHeaderFrame(AMQPFrame): + FRAME_TYPE = FRAME_HEADER + + def __init__(self, channel, class_id, weight, body_size, property_flags, property_list): + """ + :param channel: + :param class_id: + :param weight: + :param body_size: + :param property_flags: + :param property_list: + """ + AMQPFrame.__init__(self, channel) + self.class_id = class_id + self.weight = weight + self.body_size = body_size + self.property_flags = property_flags + self.property_list = property_list + + def write_to(self, buf): + AMQPFrame.write_to(self, buf) + buf.write(struct.pack('!HHQH')) + + @staticmethod + def unserialize(channel, payload_as_buffer): + pass + + +class AMQPBodyFrame(AMQPFrame): + FRAME_TYPE = FRAME_BODY + + def __init__(self, channel, data): + AMQPFrame.__init__(self, channel) + self.data = data + + def write_to(self, buf): + AMQPFrame.write_to(self, buf) + buf.write(buf) + buf.write(chr(FRAME_END)) + + @staticmethod + def unserialize(channel, payload_as_buffer): + return AMQPBodyFrame(channel, payload_as_buffer) + + +class AMQPHeartbeatFrame(AMQPFrame): + FRAME_TYPE = FRAME_HEARTBEAT + LENGTH = 4 + DATA = '\x00\x00\xCE' + + def __init__(self): + AMQPFrame.__init__(self, 0) + + def write_to(self, buf): + AMQPFrame.write_to(self, buf) + buf.write(chr(FRAME_END)) + diff --git a/coolamqp/framing/frames/human.py b/coolamqp/framing/frames/human.py deleted file mode 100644 index bbb560a4dd2b797e88e4414b883fb19f20993261..0000000000000000000000000000000000000000 --- a/coolamqp/framing/frames/human.py +++ /dev/null @@ -1,12 +0,0 @@ -# coding=UTF-8 -from __future__ import absolute_import, division, print_function - - - -def decode_frame(type_, payload): - """ - - :param type_: - :param payload: - :return: - """ \ No newline at end of file diff --git a/coolamqp/framing/serialization.py b/coolamqp/framing/serialization.py deleted file mode 100644 index db550b153237a389e65f94a1cdff811e1efb2e20..0000000000000000000000000000000000000000 --- a/coolamqp/framing/serialization.py +++ /dev/null @@ -1,39 +0,0 @@ -# coding=UTF-8 -from __future__ import absolute_import, division, print_function -import struct -"""bytes <-> Frame's""" - -from coolamqp.framing.frames import IDENT_TO_METHOD, FRAME_END, FRAME_METHOD - -HDR = b'AMQP\x00\x00\x00\x01' - - -class AMQPFrame(object): - def __init__(self, type_, channel, payload): - - - -def serialize_method(channel, method): - """Return an AMQP frame""" - strs = [] - method.write(lambda s: strs.append(s)) - payload_len = sum(len(x) for x in strs) - return b''.join([struct.pack('!BHI', FRAME_METHOD, channel, payload_len)] + strs + [chr(FRAME_END)]) - -def try_frame(buf, offset): - """ - Try to make sense out of a buffer and decode a frame. - :param buf: - :param offset: - :return: - """ - - -def to_packet(type_, channel, payload, size=None, frame_end=None): - """ - :type payload: six.binary_type - :type frame_end: six.binary_type - """ - size = size or len(payload) - frame_end = frame_end or FRAME_END - return struct.pack('!BHI', type_, channel, size) + payload + FRAME_END diff --git a/coolamqp/framing/streams/__init__.py b/coolamqp/framing/streams/__init__.py index c71857bcd11230186abcb534f8128eaf15053c2f..111635de9e6ce30fd3cbb34c7e3532b50d1f60d4 100644 --- a/coolamqp/framing/streams/__init__.py +++ b/coolamqp/framing/streams/__init__.py @@ -3,9 +3,7 @@ Classes that allow to receive and send frames in a rapid way """ from __future__ import absolute_import, division, print_function -import socket -import collections -from coolamqp.framing.streams.exceptions import StreamIsDead +from coolamqp.framing.streams.recv_formatter import ReceivingFormatter diff --git a/coolamqp/framing/streams/exceptions.py b/coolamqp/framing/streams/exceptions.py deleted file mode 100644 index ec40452acd255c243e9374ebeb870c4d12bae07f..0000000000000000000000000000000000000000 --- a/coolamqp/framing/streams/exceptions.py +++ /dev/null @@ -1,12 +0,0 @@ -# coding=UTF-8 -from __future__ import absolute_import, division, print_function - - - - -class FormatterError(Exception): - """Exceptions for formatters""" - - -class InvalidDataError(Exception): - """Formatter encountered an invalid data""" \ No newline at end of file diff --git a/coolamqp/framing/streams/receiver.py b/coolamqp/framing/streams/receiver.py deleted file mode 100644 index fe005676f671871dc8316b30bc53069a049fe6dc..0000000000000000000000000000000000000000 --- a/coolamqp/framing/streams/receiver.py +++ /dev/null @@ -1,91 +0,0 @@ -# coding=UTF-8 -from __future__ import absolute_import, division, print_function -import struct -import io -import six -import collections -import socket - - -from coolamqp.framing.streams.exceptions import InvalidDataError - - -class ReceivingFormatter(object): - """ - Assembles AMQP frames from received data. - - Just call with .put(data) and get frames by - iterator .frames(). - - Not thread safe. - """ - def __init__(self, sock): - self.chunks = collections.deque() # all received data - self.total_data_len = 0 - self.header = None # or type, channel, size - self.frames = collections.deque() # for - - def receive_from_socket(self, data): - self.total_data_len += len(data) - self.chunks.append(buffer(data)) - - def get_frames(self): - """ - An iterator to return frames pending for read. - - :raises ValueError: - :return: - """ - - def __got_frame(self, type_, channel, payload, frame_end): - """ - New frame! - - Try decoding - """ - - - def __statemachine(self): - if self.header is None and self.total_data_len > 7: - a = bytearray() - while len(a) < 7: - if len(self.chunks[0]) <= (len(a) - 7): # we will need a next one - a.extend(self.chunks.popleft()) - else: - a.extend(self.chunks[0:len(a-7)]) - self.chunks[0] = buffer(self.chunks[0], len(a)-7) - self.header = struct.unpack('!BHI', a) - - if (self.header is not None) and self.total_data_len >= (self.header[2]+1): - if len(self.chunks[0]) > self.header[2]+1: - # We can subslice it - it's very fast - payload = buffer(self.chunks[0], self.header[2]) - frame_end = self.chunks[self.header[2]] - self.chunks[0] = buffer(self.chunks[0], self.header[2]+1) - - else: - # Construct a separate buffer :( - payload = io.BytesIO() - while payload.tell() < self.header[2]: - remaining = self.header[2] - payload.tell() - - if remaining >= self.chunks[0]: - chunk = self.chunks.popleft() - payload.write(self.chunks.popleft()) - self.total_data_len -= len(chunk) - else: - self.total_data_len -= remaining - payload.write(buffer(self.chunks[0], 0, remaining)) - self.chunks[0] = buffer(self.chunks[0], remaining) - - # Get last byte - if len(self.chunks[0]) == 1: - frame_end = self.chunks.pop()[0] - else: - frame_end = self.chunks[0][0] - self.chunks[0] = buffer(self.chunks[0], 1) - - self.__got_frame(self.header[0], self.header[1], payload.getvalue(), frame_end) - self.header = None - - diff --git a/coolamqp/framing/streams/recv_formatter.py b/coolamqp/framing/streams/recv_formatter.py new file mode 100644 index 0000000000000000000000000000000000000000..18f84df0bfd3ed41f928879e3e8b203fc3b8f0e9 --- /dev/null +++ b/coolamqp/framing/streams/recv_formatter.py @@ -0,0 +1,139 @@ +# coding=UTF-8 +from __future__ import absolute_import, division, print_function +import struct +import io +import six +import collections +import socket + +from coolamqp.framing.frames.definitions import FRAME_HEADER, FRAME_HEARTBEAT, FRAME_END, FRAME_METHOD, FRAME_BODY +from coolamqp.framing.frames.frames import AMQPBodyFrame, AMQPHeaderFrame, AMQPHeartbeatFrame, AMQPMethodFrame +from coolamqp.framing.streams.exceptions import InvalidDataError + + +FRAME_TYPES = { + FRAME_HEADER: AMQPHeaderFrame, + FRAME_BODY: AMQPBodyFrame, + FRAME_METHOD: AMQPMethodFrame +} + + +class ReceivingFormatter(object): + """ + Assembles AMQP frames from received data. + + Just call with .put(data) and get frames by iterator .frames(). + + Not thread safe. + + State machine + (frame_type is None) and has_bytes(1) -> (frame_type <- bytes(1)) + + (frame_type is HEARTBEAT) and has_bytes(3) -> (output_frame, frame_type <- None) + (frame_type is not HEARTBEAT and not None) and has_bytes(6) -> (frame_channel <- bytes(2), + frame_size <- bytes(4)) + + (frame_size is not None) and has_bytes(frame_size+1) -> (output_frame, + frame_type <- None + frame_size < None) + """ + def __init__(self, sock): + self.chunks = collections.deque() # all received data + self.total_data_len = 0 + + self.frame_type = None + self.frame_channel = None + self.frame_size = None + + self.bytes_needed = None # bytes needed for a new frame + self.frames = collections.deque() # for + + def put(self, data): + self.total_data_len += len(data) + self.chunks.append(buffer(data)) + + def get_frames(self): # -> iterator of AMQPFrame, raises ValueError + """ + An iterator to return frames pending for read. + + :raises ValueError: invalid frame readed, kill the connection. + :return: iterator with frames + """ + while self.__statemachine(): + pass + + while len(self.frames) > 0: + yield self.frames.popleft() + + def _extract(self, up_to): # return up to up_to bytes from current chunk, switch if necessary + if up_to >= len(self.chunks[0]): + q = self.chunks.popleft() + else: + q = buffer(self.chunks[0], 0, up_to) + self.chunks[0] = buffer(self.chunks, up_to) + self.total_data_len -= len(q) + return q + + def _statemachine(self): # -> bool, was any action taken? + # state rule 1 + if self.frame_type is None and self.total_data_len > 0: + self.frame_type = ord(self._extract(1)) + + if self.frame_type not in (FRAME_HEARTBEAT, FRAME_HEADER, FRAME_METHOD, FRAME_BODY): + raise ValueError('Invalid frame') + + return True + + # state rule 2 + if (self.frame_type == FRAME_HEARTBEAT) and (self.total_data_len > 3): + data = b'' + while len(data) < 3: + data = data + self._extract(3 - len(data)) + + if data != AMQPHeartbeatFrame.DATA: + # Invalid heartbeat frame! + raise ValueError('Invalid AMQP heartbeat') + + self.frames.append(AMQPHeartbeatFrame()) + self.frame_type = None + + return True + + # state rule 3 + if (self.frame_type != FRAME_HEARTBEAT) and (self.frame_type is not None) and (self.total_data_len > 6): + hdr = b'' + while len(hdr) < 6: + hdr = hdr + self._extract(6 - len(hdr)) + + self.frame_channel, self.frame_size = struct.unpack('!BI', hdr) + + return True + + # state rule 4 + if self.total_data_len >= (self.frame_size+1): + + if len(self.chunks[0]) >= self.total_data_len: + # We can subslice it - it's very fast + payload = self._extract(self.total_data_len) + else: + # Construct a separate buffer :( + payload = io.BytesIO() + while payload.tell() < self.total_data_len: + payload.write(self._extract(self.total_data_len - payload.tell())) + + payload = buffer(payload.getvalue()) + + if ord(self._extract(1)) != FRAME_END: + raise ValueError('Invalid frame end') + + try: + frame = FRAME_TYPES[self.frame_type].unserialize(self.frame_channel, payload) + except ValueError: + raise + + self.frames.append(frame) + self.frame_type = None + self.frame_size = None + + return True + return False \ No newline at end of file