From 676dde9f2891fd330f5195e4aca3149e51428ff7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ma=C5=9Blanka?= <piotr.maslanka@henrietta.com.pl> Date: Sat, 31 Dec 2016 00:33:41 +0100 Subject: [PATCH] can receive and unframe connectionstart --- coolamqp/__init__.py | 1 + coolamqp/connection/definition.py | 27 ++ .../compilation/compile_definitions.py | 80 ++-- .../framing/compilation/textcode_fields.py | 2 + coolamqp/framing/definitions.py | 389 +++++------------- coolamqp/framing/field_table.py | 8 +- coolamqp/framing/frames.py | 11 +- coolamqp/uplink/__init__.py | 4 + coolamqp/uplink/connection/__init__.py | 11 + coolamqp/uplink/connection/connection.py | 58 +++ .../uplink/{ => connection}/recv_framer.py | 64 +-- coolamqp/uplink/connection/send_framer.py | 47 +++ coolamqp/uplink/listener/__init__.py | 17 + coolamqp/uplink/listener/epoll_listener.py | 91 ++++ coolamqp/uplink/listener/socket.py | 97 +++++ coolamqp/uplink/listener/thread.py | 40 ++ coolamqp/uplink/order.py | 30 -- coolamqp/uplink/reactor.py | 35 ++ coolamqp/uplink/reader_thread.py | 34 -- coolamqp/uplink/send_framer.py | 95 ----- coolamqp/uplink/watchman.py | 144 ------- .../iface.py => tests/test_uplink/__init__.py | 0 tests/test_uplink/test_basic.py | 39 ++ 23 files changed, 667 insertions(+), 657 deletions(-) create mode 100644 coolamqp/connection/definition.py create mode 100644 coolamqp/uplink/connection/__init__.py create mode 100644 coolamqp/uplink/connection/connection.py rename coolamqp/uplink/{ => connection}/recv_framer.py (65%) create mode 100644 coolamqp/uplink/connection/send_framer.py create mode 100644 coolamqp/uplink/listener/__init__.py create mode 100644 coolamqp/uplink/listener/epoll_listener.py create mode 100644 coolamqp/uplink/listener/socket.py create mode 100644 coolamqp/uplink/listener/thread.py delete mode 100644 coolamqp/uplink/order.py create mode 100644 coolamqp/uplink/reactor.py delete mode 100644 coolamqp/uplink/reader_thread.py delete mode 100644 coolamqp/uplink/send_framer.py delete mode 100644 coolamqp/uplink/watchman.py rename coolamqp/uplink/iface.py => tests/test_uplink/__init__.py (100%) create mode 100644 tests/test_uplink/test_basic.py diff --git a/coolamqp/__init__.py b/coolamqp/__init__.py index 9599562..a8863d8 100644 --- a/coolamqp/__init__.py +++ b/coolamqp/__init__.py @@ -1 +1,2 @@ # coding=UTF-8 + diff --git a/coolamqp/connection/definition.py b/coolamqp/connection/definition.py new file mode 100644 index 0000000..82778d3 --- /dev/null +++ b/coolamqp/connection/definition.py @@ -0,0 +1,27 @@ +# coding=UTF-8 +from __future__ import absolute_import, division, print_function + + + +class NodeDefinition(object): + """ + Definition of a node + """ + + def __init__(self, host, port, user, password, virtual_host='/', amqp_version='0.9.1', heartbeat=None): + """ + + :param host: TCP host, str + :param port: TCP port, int + :param user: AMQP user + :param password: AMQP password + :param virtual_host: AMQP virtual host + :param amqp_version: AMQP protocol version + """ + self.user = user + self.password = password + self.host = host + self.port = port + self.virtual_host = virtual_host + self.amqp_version = amqp_version + self.heartbeat = heartbeat diff --git a/coolamqp/framing/compilation/compile_definitions.py b/coolamqp/framing/compilation/compile_definitions.py index 3d92828..41025f9 100644 --- a/coolamqp/framing/compilation/compile_definitions.py +++ b/coolamqp/framing/compilation/compile_definitions.py @@ -221,48 +221,48 @@ Field = collections.namedtuple('Field', ('name', 'type', 'basic_type', 'reserved return c(**kwargs) '''.replace('%s', name_class(cls.name) + 'ContentPropertyList').replace('%d', '%s')) - line(u''' + line(u''' @staticmethod def typize(*fields): -''') - line(u' zpf = bytearray([\n') - - first_byte = True # in 2-byte group - piece_index = 7 # from 7 downto 0 - fields_remaining = len(cls.properties) - byte_chunk = [] - - for field in cls.properties: - # a bit - if piece_index > 0: - if field.reserved or field.basic_type == 'bit': - pass # zero - else: - byte_chunk.append(u"(('%s' in fields) << %s)" % (format_field_name(field.name), piece_index)) - piece_index -= 1 - else: - if first_byte: + ''') + line(u' zpf = bytearray([\n') + + first_byte = True # in 2-byte group + piece_index = 7 # from 7 downto 0 + fields_remaining = len(cls.properties) + byte_chunk = [] + + for field in cls.properties: + # a bit + if piece_index > 0: if field.reserved or field.basic_type == 'bit': - pass #zero + pass # zero else: - byte_chunk.append(u"int('%s' in kwargs)" % (format_field_name(field.name),)) + byte_chunk.append(u"(('%s' in fields) << %s)" % (format_field_name(field.name), piece_index)) + piece_index -= 1 else: - # this is the "do we need moar flags" section - byte_chunk.append(u"kwargs['%s']" % ( - int(fields_remaining > 1) - )) - - # Emit the byte - line(u' %s,\n', u' | '.join(byte_chunk)) - byte_chunk = [] - first_byte = not first_byte - piece_index = 7 - fields_remaining -= 1 - - if len(byte_chunk) > 0: - line(u' %s\n', u' | '.join(byte_chunk)) # We did not finish - - line(u''' ]) + if first_byte: + if field.reserved or field.basic_type == 'bit': + pass #zero + else: + byte_chunk.append(u"int('%s' in kwargs)" % (format_field_name(field.name),)) + else: + # this is the "do we need moar flags" section + byte_chunk.append(u"kwargs['%s']" % ( + int(fields_remaining > 1) + )) + + # Emit the byte + line(u' %s,\n', u' | '.join(byte_chunk)) + byte_chunk = [] + first_byte = not first_byte + piece_index = 7 + fields_remaining -= 1 + + if len(byte_chunk) > 0: + line(u' %s\n', u' | '.join(byte_chunk)) # We did not finish + + line(u''' ]) zpf = six.binary_type(zpf) if zpf in %s.PARTICULAR_CLASSES: return %s.PARTICULAR_CLASSES[zpf] @@ -273,7 +273,7 @@ Field = collections.namedtuple('Field', ('name', 'type', 'basic_type', 'reserved return c '''.replace("%s", name_class(cls.name) + 'ContentPropertyList').replace('%d', '%s')) - line(u''' + line(u''' @staticmethod def from_buffer(buf, offset): """ @@ -326,7 +326,7 @@ Field = collections.namedtuple('Field', ('name', 'type', 'basic_type', 'reserved to_docstring(method.label, method.docs), frepr(cls.name + '.' + method.name), frepr(cls.index), frepr(method.index), - to_code_binary(chr(cls.index)+chr(method.index)), + to_code_binary(struct.pack("!HH", cls.index, method.index)), repr(method.sent_by_client), repr(method.sent_by_server), repr(is_static), @@ -423,7 +423,7 @@ Field = collections.namedtuple('Field', ('name', 'type', 'basic_type', 'reserved line('\nBINARY_HEADER_TO_METHOD = {\n') for k, v in dct.items(): - line(' %s: %s,\n', to_code_binary(struct.pack('!BB', *k)), v) + line(' %s: %s,\n', to_code_binary(struct.pack('!HH', *k)), v) line('}\n\n') line('\nCLASS_ID_TO_CONTENT_PROPERTY_LIST = {\n') diff --git a/coolamqp/framing/compilation/textcode_fields.py b/coolamqp/framing/compilation/textcode_fields.py index 551183f..3b52d7f 100644 --- a/coolamqp/framing/compilation/textcode_fields.py +++ b/coolamqp/framing/compilation/textcode_fields.py @@ -127,6 +127,8 @@ def get_from_buffer(fields, prefix='', indent_level=2): elif field.basic_type == u'bit': bits.append('_' if field.reserved else fieldname) elif field.basic_type == u'table': # oh my god + if len(to_struct) > 0: + emit_structures() emit("%s, delta = deframe_table(buf, offset)", fieldname) emit("offset += delta") else: # longstr or shortstr diff --git a/coolamqp/framing/definitions.py b/coolamqp/framing/definitions.py index cd2b132..509bc74 100644 --- a/coolamqp/framing/definitions.py +++ b/coolamqp/framing/definitions.py @@ -109,39 +109,6 @@ class Connection(AMQPClass): INDEX = 10 - @staticmethod - def typize(*fields): - zpf = bytearray([ - ]) - zpf = six.binary_type(zpf) - if zpf in ConnectionContentPropertyList.PARTICULAR_CLASSES: - return ConnectionContentPropertyList.PARTICULAR_CLASSES[zpf] - else: - logger.debug('Property field (ConnectionContentPropertyList:%s) not seen yet, compiling', repr(zpf)) - c = compile_particular_content_property_list_class(zpf, ConnectionContentPropertyList.FIELDS) - ConnectionContentPropertyList.PARTICULAR_CLASSES[zpf] = c - return c - - @staticmethod - def from_buffer(buf, offset): - """ - Return a content property list instance unserialized from - buffer, so that buf[offset] marks the start of property flags - """ - # extract property flags - pfl = 2 - while ord(buf[offset + pfl]) & 1: - pfl += 2 - zpf = ConnectionContentPropertyList.zero_property_flags(buf[offset:offset+pfl]) - if zpf in ConnectionContentPropertyList.PARTICULAR_CLASSES: - return ConnectionContentPropertyList.PARTICULAR_CLASSES[zpf].from_buffer(buf, offset) - else: - logger.debug('Property field (ConnectionContentPropertyList:%s) not seen yet, compiling', repr(zpf)) - c = compile_particular_content_property_list_class(zpf, ConnectionContentPropertyList.FIELDS) - ConnectionContentPropertyList.PARTICULAR_CLASSES[zpf] = c - return c.from_buffer(buf, offset) - - class ConnectionClose(AMQPMethodPayload): """ Request a connection close @@ -154,7 +121,7 @@ class ConnectionClose(AMQPMethodPayload): NAME = u'connection.close' INDEX = (10, 50) # (Class ID, Method ID) - BINARY_HEADER = b'\x0A\x32' # CLASS ID + METHOD ID + BINARY_HEADER = b'\x00\x0A\x00\x32' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, True @@ -218,7 +185,7 @@ class ConnectionCloseOk(AMQPMethodPayload): NAME = u'connection.close-ok' INDEX = (10, 51) # (Class ID, Method ID) - BINARY_HEADER = b'\x0A\x33' # CLASS ID + METHOD ID + BINARY_HEADER = b'\x00\x0A\x00\x33' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, True @@ -250,7 +217,7 @@ class ConnectionOpen(AMQPMethodPayload): NAME = u'connection.open' INDEX = (10, 40) # (Class ID, Method ID) - BINARY_HEADER = b'\x0A\x28' # CLASS ID + METHOD ID + BINARY_HEADER = b'\x00\x0A\x00\x28' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, False @@ -307,7 +274,7 @@ class ConnectionOpenOk(AMQPMethodPayload): NAME = u'connection.open-ok' INDEX = (10, 41) # (Class ID, Method ID) - BINARY_HEADER = b'\x0A\x29' # CLASS ID + METHOD ID + BINARY_HEADER = b'\x00\x0A\x00\x29' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = False, True @@ -346,7 +313,7 @@ class ConnectionStart(AMQPMethodPayload): NAME = u'connection.start' INDEX = (10, 10) # (Class ID, Method ID) - BINARY_HEADER = b'\x0A\x0A' # CLASS ID + METHOD ID + BINARY_HEADER = b'\x00\x0A\x00\x0A' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = False, True @@ -408,10 +375,12 @@ class ConnectionStart(AMQPMethodPayload): @staticmethod def from_buffer(buf, start_offset): offset = start_offset + version_major, version_minor, = struct.unpack_from('!BB', buf, offset) + offset += 2 server_properties, delta = deframe_table(buf, offset) offset += delta - version_major, version_minor, s_len, = struct.unpack_from('!BBL', buf, offset) - offset += 6 + s_len, = struct.unpack_from('!L', buf, offset) + offset += 4 mechanisms = buf[offset:offset+s_len] offset += s_len s_len, = struct.unpack_from('!L', buf, offset) @@ -432,7 +401,7 @@ class ConnectionSecure(AMQPMethodPayload): NAME = u'connection.secure' INDEX = (10, 20) # (Class ID, Method ID) - BINARY_HEADER = b'\x0A\x14' # CLASS ID + METHOD ID + BINARY_HEADER = b'\x00\x0A\x00\x14' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = False, True @@ -481,7 +450,7 @@ class ConnectionStartOk(AMQPMethodPayload): NAME = u'connection.start-ok' INDEX = (10, 11) # (Class ID, Method ID) - BINARY_HEADER = b'\x0A\x0B' # CLASS ID + METHOD ID + BINARY_HEADER = b'\x00\x0A\x00\x0B' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, False @@ -566,7 +535,7 @@ class ConnectionSecureOk(AMQPMethodPayload): NAME = u'connection.secure-ok' INDEX = (10, 21) # (Class ID, Method ID) - BINARY_HEADER = b'\x0A\x15' # CLASS ID + METHOD ID + BINARY_HEADER = b'\x00\x0A\x00\x15' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, False @@ -616,7 +585,7 @@ class ConnectionTune(AMQPMethodPayload): NAME = u'connection.tune' INDEX = (10, 30) # (Class ID, Method ID) - BINARY_HEADER = b'\x0A\x1E' # CLASS ID + METHOD ID + BINARY_HEADER = b'\x00\x0A\x00\x1E' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = False, True @@ -677,7 +646,7 @@ class ConnectionTuneOk(AMQPMethodPayload): NAME = u'connection.tune-ok' INDEX = (10, 31) # (Class ID, Method ID) - BINARY_HEADER = b'\x0A\x1F' # CLASS ID + METHOD ID + BINARY_HEADER = b'\x00\x0A\x00\x1F' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, False @@ -738,39 +707,6 @@ class Channel(AMQPClass): INDEX = 20 - @staticmethod - def typize(*fields): - zpf = bytearray([ - ]) - zpf = six.binary_type(zpf) - if zpf in ChannelContentPropertyList.PARTICULAR_CLASSES: - return ChannelContentPropertyList.PARTICULAR_CLASSES[zpf] - else: - logger.debug('Property field (ChannelContentPropertyList:%s) not seen yet, compiling', repr(zpf)) - c = compile_particular_content_property_list_class(zpf, ChannelContentPropertyList.FIELDS) - ChannelContentPropertyList.PARTICULAR_CLASSES[zpf] = c - return c - - @staticmethod - def from_buffer(buf, offset): - """ - Return a content property list instance unserialized from - buffer, so that buf[offset] marks the start of property flags - """ - # extract property flags - pfl = 2 - while ord(buf[offset + pfl]) & 1: - pfl += 2 - zpf = ChannelContentPropertyList.zero_property_flags(buf[offset:offset+pfl]) - if zpf in ChannelContentPropertyList.PARTICULAR_CLASSES: - return ChannelContentPropertyList.PARTICULAR_CLASSES[zpf].from_buffer(buf, offset) - else: - logger.debug('Property field (ChannelContentPropertyList:%s) not seen yet, compiling', repr(zpf)) - c = compile_particular_content_property_list_class(zpf, ChannelContentPropertyList.FIELDS) - ChannelContentPropertyList.PARTICULAR_CLASSES[zpf] = c - return c.from_buffer(buf, offset) - - class ChannelClose(AMQPMethodPayload): """ Request a channel close @@ -783,7 +719,7 @@ class ChannelClose(AMQPMethodPayload): NAME = u'channel.close' INDEX = (20, 40) # (Class ID, Method ID) - BINARY_HEADER = b'\x14\x28' # CLASS ID + METHOD ID + BINARY_HEADER = b'\x00\x14\x00\x28' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, True @@ -847,7 +783,7 @@ class ChannelCloseOk(AMQPMethodPayload): NAME = u'channel.close-ok' INDEX = (20, 41) # (Class ID, Method ID) - BINARY_HEADER = b'\x14\x29' # CLASS ID + METHOD ID + BINARY_HEADER = b'\x00\x14\x00\x29' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, True @@ -880,7 +816,7 @@ class ChannelFlow(AMQPMethodPayload): NAME = u'channel.flow' INDEX = (20, 20) # (Class ID, Method ID) - BINARY_HEADER = b'\x14\x14' # CLASS ID + METHOD ID + BINARY_HEADER = b'\x00\x14\x00\x14' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, True @@ -927,7 +863,7 @@ class ChannelFlowOk(AMQPMethodPayload): NAME = u'channel.flow-ok' INDEX = (20, 21) # (Class ID, Method ID) - BINARY_HEADER = b'\x14\x15' # CLASS ID + METHOD ID + BINARY_HEADER = b'\x00\x14\x00\x15' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, True @@ -974,7 +910,7 @@ class ChannelOpen(AMQPMethodPayload): NAME = u'channel.open' INDEX = (20, 10) # (Class ID, Method ID) - BINARY_HEADER = b'\x14\x0A' # CLASS ID + METHOD ID + BINARY_HEADER = b'\x00\x14\x00\x0A' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, False @@ -1011,7 +947,7 @@ class ChannelOpenOk(AMQPMethodPayload): NAME = u'channel.open-ok' INDEX = (20, 11) # (Class ID, Method ID) - BINARY_HEADER = b'\x14\x0B' # CLASS ID + METHOD ID + BINARY_HEADER = b'\x00\x14\x00\x0B' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = False, True @@ -1049,39 +985,6 @@ class Exchange(AMQPClass): INDEX = 40 - @staticmethod - def typize(*fields): - zpf = bytearray([ - ]) - zpf = six.binary_type(zpf) - if zpf in ExchangeContentPropertyList.PARTICULAR_CLASSES: - return ExchangeContentPropertyList.PARTICULAR_CLASSES[zpf] - else: - logger.debug('Property field (ExchangeContentPropertyList:%s) not seen yet, compiling', repr(zpf)) - c = compile_particular_content_property_list_class(zpf, ExchangeContentPropertyList.FIELDS) - ExchangeContentPropertyList.PARTICULAR_CLASSES[zpf] = c - return c - - @staticmethod - def from_buffer(buf, offset): - """ - Return a content property list instance unserialized from - buffer, so that buf[offset] marks the start of property flags - """ - # extract property flags - pfl = 2 - while ord(buf[offset + pfl]) & 1: - pfl += 2 - zpf = ExchangeContentPropertyList.zero_property_flags(buf[offset:offset+pfl]) - if zpf in ExchangeContentPropertyList.PARTICULAR_CLASSES: - return ExchangeContentPropertyList.PARTICULAR_CLASSES[zpf].from_buffer(buf, offset) - else: - logger.debug('Property field (ExchangeContentPropertyList:%s) not seen yet, compiling', repr(zpf)) - c = compile_particular_content_property_list_class(zpf, ExchangeContentPropertyList.FIELDS) - ExchangeContentPropertyList.PARTICULAR_CLASSES[zpf] = c - return c.from_buffer(buf, offset) - - class ExchangeDeclare(AMQPMethodPayload): """ Verify exchange exists, create if needed @@ -1092,7 +995,7 @@ class ExchangeDeclare(AMQPMethodPayload): NAME = u'exchange.declare' INDEX = (40, 10) # (Class ID, Method ID) - BINARY_HEADER = b'\x28\x0A' # CLASS ID + METHOD ID + BINARY_HEADER = b'\x00\x28\x00\x0A' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, False @@ -1194,7 +1097,7 @@ class ExchangeDelete(AMQPMethodPayload): NAME = u'exchange.delete' INDEX = (40, 20) # (Class ID, Method ID) - BINARY_HEADER = b'\x28\x14' # CLASS ID + METHOD ID + BINARY_HEADER = b'\x00\x28\x00\x14' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, False @@ -1258,7 +1161,7 @@ class ExchangeDeclareOk(AMQPMethodPayload): NAME = u'exchange.declare-ok' INDEX = (40, 11) # (Class ID, Method ID) - BINARY_HEADER = b'\x28\x0B' # CLASS ID + METHOD ID + BINARY_HEADER = b'\x00\x28\x00\x0B' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = False, True @@ -1287,7 +1190,7 @@ class ExchangeDeleteOk(AMQPMethodPayload): NAME = u'exchange.delete-ok' INDEX = (40, 21) # (Class ID, Method ID) - BINARY_HEADER = b'\x28\x15' # CLASS ID + METHOD ID + BINARY_HEADER = b'\x00\x28\x00\x15' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = False, True @@ -1318,39 +1221,6 @@ class Queue(AMQPClass): INDEX = 50 - @staticmethod - def typize(*fields): - zpf = bytearray([ - ]) - zpf = six.binary_type(zpf) - if zpf in QueueContentPropertyList.PARTICULAR_CLASSES: - return QueueContentPropertyList.PARTICULAR_CLASSES[zpf] - else: - logger.debug('Property field (QueueContentPropertyList:%s) not seen yet, compiling', repr(zpf)) - c = compile_particular_content_property_list_class(zpf, QueueContentPropertyList.FIELDS) - QueueContentPropertyList.PARTICULAR_CLASSES[zpf] = c - return c - - @staticmethod - def from_buffer(buf, offset): - """ - Return a content property list instance unserialized from - buffer, so that buf[offset] marks the start of property flags - """ - # extract property flags - pfl = 2 - while ord(buf[offset + pfl]) & 1: - pfl += 2 - zpf = QueueContentPropertyList.zero_property_flags(buf[offset:offset+pfl]) - if zpf in QueueContentPropertyList.PARTICULAR_CLASSES: - return QueueContentPropertyList.PARTICULAR_CLASSES[zpf].from_buffer(buf, offset) - else: - logger.debug('Property field (QueueContentPropertyList:%s) not seen yet, compiling', repr(zpf)) - c = compile_particular_content_property_list_class(zpf, QueueContentPropertyList.FIELDS) - QueueContentPropertyList.PARTICULAR_CLASSES[zpf] = c - return c.from_buffer(buf, offset) - - class QueueBind(AMQPMethodPayload): """ Bind queue to an exchange @@ -1363,7 +1233,7 @@ class QueueBind(AMQPMethodPayload): NAME = u'queue.bind' INDEX = (50, 20) # (Class ID, Method ID) - BINARY_HEADER = b'\x32\x14' # CLASS ID + METHOD ID + BINARY_HEADER = b'\x00\x32\x00\x14' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, False @@ -1456,7 +1326,7 @@ class QueueBindOk(AMQPMethodPayload): NAME = u'queue.bind-ok' INDEX = (50, 21) # (Class ID, Method ID) - BINARY_HEADER = b'\x32\x15' # CLASS ID + METHOD ID + BINARY_HEADER = b'\x00\x32\x00\x15' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = False, True @@ -1487,7 +1357,7 @@ class QueueDeclare(AMQPMethodPayload): NAME = u'queue.declare' INDEX = (50, 10) # (Class ID, Method ID) - BINARY_HEADER = b'\x32\x0A' # CLASS ID + METHOD ID + BINARY_HEADER = b'\x00\x32\x00\x0A' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, False @@ -1593,7 +1463,7 @@ class QueueDelete(AMQPMethodPayload): NAME = u'queue.delete' INDEX = (50, 40) # (Class ID, Method ID) - BINARY_HEADER = b'\x32\x28' # CLASS ID + METHOD ID + BINARY_HEADER = b'\x00\x32\x00\x28' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, False @@ -1663,7 +1533,7 @@ class QueueDeclareOk(AMQPMethodPayload): NAME = u'queue.declare-ok' INDEX = (50, 11) # (Class ID, Method ID) - BINARY_HEADER = b'\x32\x0B' # CLASS ID + METHOD ID + BINARY_HEADER = b'\x00\x32\x00\x0B' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = False, True @@ -1723,7 +1593,7 @@ class QueueDeleteOk(AMQPMethodPayload): NAME = u'queue.delete-ok' INDEX = (50, 41) # (Class ID, Method ID) - BINARY_HEADER = b'\x32\x29' # CLASS ID + METHOD ID + BINARY_HEADER = b'\x00\x32\x00\x29' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = False, True @@ -1768,7 +1638,7 @@ class QueuePurge(AMQPMethodPayload): NAME = u'queue.purge' INDEX = (50, 30) # (Class ID, Method ID) - BINARY_HEADER = b'\x32\x1E' # CLASS ID + METHOD ID + BINARY_HEADER = b'\x00\x32\x00\x1E' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, False @@ -1823,7 +1693,7 @@ class QueuePurgeOk(AMQPMethodPayload): NAME = u'queue.purge-ok' INDEX = (50, 31) # (Class ID, Method ID) - BINARY_HEADER = b'\x32\x1F' # CLASS ID + METHOD ID + BINARY_HEADER = b'\x00\x32\x00\x1F' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = False, True @@ -1867,7 +1737,7 @@ class QueueUnbind(AMQPMethodPayload): NAME = u'queue.unbind' INDEX = (50, 50) # (Class ID, Method ID) - BINARY_HEADER = b'\x32\x32' # CLASS ID + METHOD ID + BINARY_HEADER = b'\x00\x32\x00\x32' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, False @@ -1944,7 +1814,7 @@ class QueueUnbindOk(AMQPMethodPayload): NAME = u'queue.unbind-ok' INDEX = (50, 51) # (Class ID, Method ID) - BINARY_HEADER = b'\x32\x33' # CLASS ID + METHOD ID + BINARY_HEADER = b'\x00\x32\x00\x33' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = False, True @@ -2065,9 +1935,9 @@ class BasicContentPropertyList(AMQPContentPropertyList): @staticmethod def typize(*fields): zpf = bytearray([ - (('content_type' in fields) << 7) | (('content_encoding' in fields) << 6) | (('headers' in fields) << 5) | (('delivery_mode' in fields) << 4) | (('priority' in fields) << 3) | (('correlation_id' in fields) << 2) | (('reply_to' in fields) << 1) | int('expiration' in kwargs), - (('message_id' in fields) << 7) | (('timestamp' in fields) << 6) | (('type_' in fields) << 5) | (('user_id' in fields) << 4) | (('app_id' in fields) << 3) | (('reserved' in fields) << 2) - ]) + (('content_type' in fields) << 7) | (('content_encoding' in fields) << 6) | (('headers' in fields) << 5) | (('delivery_mode' in fields) << 4) | (('priority' in fields) << 3) | (('correlation_id' in fields) << 2) | (('reply_to' in fields) << 1) | int('expiration' in kwargs), + (('message_id' in fields) << 7) | (('timestamp' in fields) << 6) | (('type_' in fields) << 5) | (('user_id' in fields) << 4) | (('app_id' in fields) << 3) | (('reserved' in fields) << 2) + ]) zpf = six.binary_type(zpf) if zpf in BasicContentPropertyList.PARTICULAR_CLASSES: return BasicContentPropertyList.PARTICULAR_CLASSES[zpf] @@ -2108,7 +1978,7 @@ class BasicAck(AMQPMethodPayload): NAME = u'basic.ack' INDEX = (60, 80) # (Class ID, Method ID) - BINARY_HEADER = b'\x3C\x50' # CLASS ID + METHOD ID + BINARY_HEADER = b'\x00\x3C\x00\x50' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, False @@ -2164,7 +2034,7 @@ class BasicConsume(AMQPMethodPayload): NAME = u'basic.consume' INDEX = (60, 20) # (Class ID, Method ID) - BINARY_HEADER = b'\x3C\x14' # CLASS ID + METHOD ID + BINARY_HEADER = b'\x00\x3C\x00\x14' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, False @@ -2258,7 +2128,7 @@ class BasicCancel(AMQPMethodPayload): NAME = u'basic.cancel' INDEX = (60, 30) # (Class ID, Method ID) - BINARY_HEADER = b'\x3C\x1E' # CLASS ID + METHOD ID + BINARY_HEADER = b'\x00\x3C\x00\x1E' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, False @@ -2312,7 +2182,7 @@ class BasicConsumeOk(AMQPMethodPayload): NAME = u'basic.consume-ok' INDEX = (60, 21) # (Class ID, Method ID) - BINARY_HEADER = b'\x3C\x15' # CLASS ID + METHOD ID + BINARY_HEADER = b'\x00\x3C\x00\x15' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = False, True @@ -2359,7 +2229,7 @@ class BasicCancelOk(AMQPMethodPayload): NAME = u'basic.cancel-ok' INDEX = (60, 31) # (Class ID, Method ID) - BINARY_HEADER = b'\x3C\x1F' # CLASS ID + METHOD ID + BINARY_HEADER = b'\x00\x3C\x00\x1F' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = False, True @@ -2408,7 +2278,7 @@ class BasicDeliver(AMQPMethodPayload): NAME = u'basic.deliver' INDEX = (60, 60) # (Class ID, Method ID) - BINARY_HEADER = b'\x3C\x3C' # CLASS ID + METHOD ID + BINARY_HEADER = b'\x00\x3C\x00\x3C' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = False, True @@ -2487,7 +2357,7 @@ class BasicGet(AMQPMethodPayload): NAME = u'basic.get' INDEX = (60, 70) # (Class ID, Method ID) - BINARY_HEADER = b'\x3C\x46' # CLASS ID + METHOD ID + BINARY_HEADER = b'\x00\x3C\x00\x46' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, False @@ -2544,7 +2414,7 @@ class BasicGetOk(AMQPMethodPayload): NAME = u'basic.get-ok' INDEX = (60, 71) # (Class ID, Method ID) - BINARY_HEADER = b'\x3C\x47' # CLASS ID + METHOD ID + BINARY_HEADER = b'\x00\x3C\x00\x47' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = False, True @@ -2619,7 +2489,7 @@ class BasicGetEmpty(AMQPMethodPayload): NAME = u'basic.get-empty' INDEX = (60, 72) # (Class ID, Method ID) - BINARY_HEADER = b'\x3C\x48' # CLASS ID + METHOD ID + BINARY_HEADER = b'\x00\x3C\x00\x48' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = False, True @@ -2658,7 +2528,7 @@ class BasicPublish(AMQPMethodPayload): NAME = u'basic.publish' INDEX = (60, 40) # (Class ID, Method ID) - BINARY_HEADER = b'\x3C\x28' # CLASS ID + METHOD ID + BINARY_HEADER = b'\x00\x3C\x00\x28' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, False @@ -2744,7 +2614,7 @@ class BasicQos(AMQPMethodPayload): NAME = u'basic.qos' INDEX = (60, 10) # (Class ID, Method ID) - BINARY_HEADER = b'\x3C\x0A' # CLASS ID + METHOD ID + BINARY_HEADER = b'\x00\x3C\x00\x0A' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, False @@ -2815,7 +2685,7 @@ class BasicQosOk(AMQPMethodPayload): NAME = u'basic.qos-ok' INDEX = (60, 11) # (Class ID, Method ID) - BINARY_HEADER = b'\x3C\x0B' # CLASS ID + METHOD ID + BINARY_HEADER = b'\x00\x3C\x00\x0B' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = False, True @@ -2847,7 +2717,7 @@ class BasicReturn(AMQPMethodPayload): NAME = u'basic.return' INDEX = (60, 50) # (Class ID, Method ID) - BINARY_HEADER = b'\x3C\x32' # CLASS ID + METHOD ID + BINARY_HEADER = b'\x00\x3C\x00\x32' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = False, True @@ -2920,7 +2790,7 @@ class BasicReject(AMQPMethodPayload): NAME = u'basic.reject' INDEX = (60, 90) # (Class ID, Method ID) - BINARY_HEADER = b'\x3C\x5A' # CLASS ID + METHOD ID + BINARY_HEADER = b'\x00\x3C\x00\x5A' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, False @@ -2974,7 +2844,7 @@ class BasicRecoverAsync(AMQPMethodPayload): NAME = u'basic.recover-async' INDEX = (60, 100) # (Class ID, Method ID) - BINARY_HEADER = b'\x3C\x64' # CLASS ID + METHOD ID + BINARY_HEADER = b'\x00\x3C\x00\x64' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, False @@ -3024,7 +2894,7 @@ class BasicRecover(AMQPMethodPayload): NAME = u'basic.recover' INDEX = (60, 110) # (Class ID, Method ID) - BINARY_HEADER = b'\x3C\x6E' # CLASS ID + METHOD ID + BINARY_HEADER = b'\x00\x3C\x00\x6E' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, False @@ -3072,7 +2942,7 @@ class BasicRecoverOk(AMQPMethodPayload): NAME = u'basic.recover-ok' INDEX = (60, 111) # (Class ID, Method ID) - BINARY_HEADER = b'\x3C\x6F' # CLASS ID + METHOD ID + BINARY_HEADER = b'\x00\x3C\x00\x6F' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = False, True @@ -3109,39 +2979,6 @@ class Tx(AMQPClass): INDEX = 90 - @staticmethod - def typize(*fields): - zpf = bytearray([ - ]) - zpf = six.binary_type(zpf) - if zpf in TxContentPropertyList.PARTICULAR_CLASSES: - return TxContentPropertyList.PARTICULAR_CLASSES[zpf] - else: - logger.debug('Property field (TxContentPropertyList:%s) not seen yet, compiling', repr(zpf)) - c = compile_particular_content_property_list_class(zpf, TxContentPropertyList.FIELDS) - TxContentPropertyList.PARTICULAR_CLASSES[zpf] = c - return c - - @staticmethod - def from_buffer(buf, offset): - """ - Return a content property list instance unserialized from - buffer, so that buf[offset] marks the start of property flags - """ - # extract property flags - pfl = 2 - while ord(buf[offset + pfl]) & 1: - pfl += 2 - zpf = TxContentPropertyList.zero_property_flags(buf[offset:offset+pfl]) - if zpf in TxContentPropertyList.PARTICULAR_CLASSES: - return TxContentPropertyList.PARTICULAR_CLASSES[zpf].from_buffer(buf, offset) - else: - logger.debug('Property field (TxContentPropertyList:%s) not seen yet, compiling', repr(zpf)) - c = compile_particular_content_property_list_class(zpf, TxContentPropertyList.FIELDS) - TxContentPropertyList.PARTICULAR_CLASSES[zpf] = c - return c.from_buffer(buf, offset) - - class TxCommit(AMQPMethodPayload): """ Commit the current transaction @@ -3152,7 +2989,7 @@ class TxCommit(AMQPMethodPayload): NAME = u'tx.commit' INDEX = (90, 20) # (Class ID, Method ID) - BINARY_HEADER = b'\x5A\x14' # CLASS ID + METHOD ID + BINARY_HEADER = b'\x00\x5A\x00\x14' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, False @@ -3182,7 +3019,7 @@ class TxCommitOk(AMQPMethodPayload): NAME = u'tx.commit-ok' INDEX = (90, 21) # (Class ID, Method ID) - BINARY_HEADER = b'\x5A\x15' # CLASS ID + METHOD ID + BINARY_HEADER = b'\x00\x5A\x00\x15' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = False, True @@ -3214,7 +3051,7 @@ class TxRollback(AMQPMethodPayload): NAME = u'tx.rollback' INDEX = (90, 30) # (Class ID, Method ID) - BINARY_HEADER = b'\x5A\x1E' # CLASS ID + METHOD ID + BINARY_HEADER = b'\x00\x5A\x00\x1E' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, False @@ -3244,7 +3081,7 @@ class TxRollbackOk(AMQPMethodPayload): NAME = u'tx.rollback-ok' INDEX = (90, 31) # (Class ID, Method ID) - BINARY_HEADER = b'\x5A\x1F' # CLASS ID + METHOD ID + BINARY_HEADER = b'\x00\x5A\x00\x1F' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = False, True @@ -3274,7 +3111,7 @@ class TxSelect(AMQPMethodPayload): NAME = u'tx.select' INDEX = (90, 10) # (Class ID, Method ID) - BINARY_HEADER = b'\x5A\x0A' # CLASS ID + METHOD ID + BINARY_HEADER = b'\x00\x5A\x00\x0A' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = True, False @@ -3304,7 +3141,7 @@ class TxSelectOk(AMQPMethodPayload): NAME = u'tx.select-ok' INDEX = (90, 11) # (Class ID, Method ID) - BINARY_HEADER = b'\x5A\x0B' # CLASS ID + METHOD ID + BINARY_HEADER = b'\x00\x5A\x00\x0B' # CLASS ID + METHOD ID SENT_BY_CLIENT, SENT_BY_SERVER = False, True @@ -3382,59 +3219,59 @@ IDENT_TO_METHOD = { BINARY_HEADER_TO_METHOD = { - b'\x5A\x15': TxCommitOk, - b'\x3C\x64': BasicRecoverAsync, - b'\x0A\x0B': ConnectionStartOk, - b'\x3C\x28': BasicPublish, - b'\x3C\x32': BasicReturn, - b'\x0A\x33': ConnectionCloseOk, - b'\x14\x14': ChannelFlow, - b'\x3C\x15': BasicConsumeOk, - b'\x0A\x15': ConnectionSecureOk, - b'\x5A\x1E': TxRollback, - b'\x5A\x0A': TxSelect, - b'\x32\x0B': QueueDeclareOk, - b'\x3C\x46': BasicGet, - b'\x5A\x0B': TxSelectOk, - b'\x0A\x1E': ConnectionTune, - b'\x3C\x0B': BasicQosOk, - b'\x3C\x50': BasicAck, - b'\x14\x15': ChannelFlowOk, - b'\x3C\x3C': BasicDeliver, - b'\x5A\x1F': TxRollbackOk, - b'\x14\x28': ChannelClose, - b'\x3C\x47': BasicGetOk, - b'\x32\x1E': QueuePurge, - b'\x0A\x1F': ConnectionTuneOk, - b'\x0A\x28': ConnectionOpen, - b'\x3C\x1E': BasicCancel, - b'\x32\x32': QueueUnbind, - b'\x28\x0A': ExchangeDeclare, - b'\x0A\x32': ConnectionClose, - b'\x14\x0A': ChannelOpen, - b'\x14\x29': ChannelCloseOk, - b'\x3C\x6E': BasicRecover, - b'\x3C\x5A': BasicReject, - b'\x32\x1F': QueuePurgeOk, - b'\x32\x28': QueueDelete, - b'\x28\x14': ExchangeDelete, - b'\x32\x14': QueueBind, - b'\x0A\x29': ConnectionOpenOk, - b'\x3C\x1F': BasicCancelOk, - b'\x5A\x14': TxCommit, - b'\x0A\x0A': ConnectionStart, - b'\x3C\x0A': BasicQos, - b'\x28\x0B': ExchangeDeclareOk, - b'\x28\x15': ExchangeDeleteOk, - b'\x14\x0B': ChannelOpenOk, - b'\x3C\x48': BasicGetEmpty, - b'\x3C\x6F': BasicRecoverOk, - b'\x3C\x14': BasicConsume, - b'\x0A\x14': ConnectionSecure, - b'\x32\x29': QueueDeleteOk, - b'\x32\x33': QueueUnbindOk, - b'\x32\x15': QueueBindOk, - b'\x32\x0A': QueueDeclare, + b'\x00\x5A\x00\x15': TxCommitOk, + b'\x00\x3C\x00\x64': BasicRecoverAsync, + b'\x00\x0A\x00\x0B': ConnectionStartOk, + b'\x00\x3C\x00\x28': BasicPublish, + b'\x00\x3C\x00\x32': BasicReturn, + b'\x00\x0A\x00\x33': ConnectionCloseOk, + b'\x00\x14\x00\x14': ChannelFlow, + b'\x00\x3C\x00\x15': BasicConsumeOk, + b'\x00\x0A\x00\x15': ConnectionSecureOk, + b'\x00\x5A\x00\x1E': TxRollback, + b'\x00\x5A\x00\x0A': TxSelect, + b'\x00\x32\x00\x0B': QueueDeclareOk, + b'\x00\x3C\x00\x46': BasicGet, + b'\x00\x5A\x00\x0B': TxSelectOk, + b'\x00\x0A\x00\x1E': ConnectionTune, + b'\x00\x3C\x00\x0B': BasicQosOk, + b'\x00\x3C\x00\x50': BasicAck, + b'\x00\x14\x00\x15': ChannelFlowOk, + b'\x00\x3C\x00\x3C': BasicDeliver, + b'\x00\x5A\x00\x1F': TxRollbackOk, + b'\x00\x14\x00\x28': ChannelClose, + b'\x00\x3C\x00\x47': BasicGetOk, + b'\x00\x32\x00\x1E': QueuePurge, + b'\x00\x0A\x00\x1F': ConnectionTuneOk, + b'\x00\x0A\x00\x28': ConnectionOpen, + b'\x00\x3C\x00\x1E': BasicCancel, + b'\x00\x32\x00\x32': QueueUnbind, + b'\x00\x28\x00\x0A': ExchangeDeclare, + b'\x00\x0A\x00\x32': ConnectionClose, + b'\x00\x14\x00\x0A': ChannelOpen, + b'\x00\x14\x00\x29': ChannelCloseOk, + b'\x00\x3C\x00\x6E': BasicRecover, + b'\x00\x3C\x00\x5A': BasicReject, + b'\x00\x32\x00\x1F': QueuePurgeOk, + b'\x00\x32\x00\x28': QueueDelete, + b'\x00\x28\x00\x14': ExchangeDelete, + b'\x00\x32\x00\x14': QueueBind, + b'\x00\x0A\x00\x29': ConnectionOpenOk, + b'\x00\x3C\x00\x1F': BasicCancelOk, + b'\x00\x5A\x00\x14': TxCommit, + b'\x00\x0A\x00\x0A': ConnectionStart, + b'\x00\x3C\x00\x0A': BasicQos, + b'\x00\x28\x00\x0B': ExchangeDeclareOk, + b'\x00\x28\x00\x15': ExchangeDeleteOk, + b'\x00\x14\x00\x0B': ChannelOpenOk, + b'\x00\x3C\x00\x48': BasicGetEmpty, + b'\x00\x3C\x00\x6F': BasicRecoverOk, + b'\x00\x3C\x00\x14': BasicConsume, + b'\x00\x0A\x00\x14': ConnectionSecure, + b'\x00\x32\x00\x29': QueueDeleteOk, + b'\x00\x32\x00\x33': QueueUnbindOk, + b'\x00\x32\x00\x15': QueueBindOk, + b'\x00\x32\x00\x0A': QueueDeclare, } diff --git a/coolamqp/framing/field_table.py b/coolamqp/framing/field_table.py index 939aea3..c5210cc 100644 --- a/coolamqp/framing/field_table.py +++ b/coolamqp/framing/field_table.py @@ -147,7 +147,7 @@ def enframe_table(buf, table): def deframe_table(buf, start_offset): # -> (table, bytes_consumed) """:return: tuple (table, bytes consumed)""" offset = start_offset - table_length, = struct.unpack_from('!I', buf, start_offset) + table_length, = struct.unpack_from('!L', buf, start_offset) offset += 4 # we will check if it's really so. @@ -158,11 +158,11 @@ def deframe_table(buf, start_offset): # -> (table, bytes_consumed) offset += 1 field_name = buf[offset:offset+ln] offset += ln - field_val, field_tp, delta = deframe_field_value(buf, offset) + fv, delta = deframe_field_value(buf, offset) offset += delta - fields.append((field_name, (field_val, field_tp))) + fields.append((field_name, fv)) - if offset > (start_offset+table_length): + if offset > (start_offset+table_length+4): raise ValueError('Table turned out longer than expected! Found %s bytes expected %s', (offset-start_offset, table_length)) diff --git a/coolamqp/framing/frames.py b/coolamqp/framing/frames.py index 0d54dee..2577d4e 100644 --- a/coolamqp/framing/frames.py +++ b/coolamqp/framing/frames.py @@ -28,15 +28,21 @@ class AMQPMethodFrame(AMQPFrame): @staticmethod def unserialize(channel, payload_as_buffer): + print('Going to unser a methodframe') clsmet = struct.unpack_from('!HH', payload_as_buffer, 0) + print('Cls:Met=', clsmet) + try: method_payload_class = IDENT_TO_METHOD[clsmet] payload = method_payload_class.from_buffer(payload_as_buffer, 4) + except Exception as e: + print(repr(e)) + raise except KeyError: raise ValueError('Invalid class %s method %s' % clsmet) - - return AMQPMethodFrame(channel, payload) + else: + return AMQPMethodFrame(channel, payload) def get_size(self): # frame header is always 7, frame end is 1, class + method is 4 @@ -69,7 +75,6 @@ class AMQPHeaderFrame(AMQPFrame): @staticmethod def unserialize(channel, payload_as_buffer): # payload starts with class ID - print(repr(str(payload_as_buffer[12:]))) class_id, weight, body_size = struct.unpack_from('!HHQ', payload_as_buffer, 0) properties = CLASS_ID_TO_CONTENT_PROPERTY_LIST[class_id].from_buffer(payload_as_buffer, 12) return AMQPHeaderFrame(channel, class_id, weight, body_size, properties) diff --git a/coolamqp/uplink/__init__.py b/coolamqp/uplink/__init__.py index 3fb71d5..a1e76ab 100644 --- a/coolamqp/uplink/__init__.py +++ b/coolamqp/uplink/__init__.py @@ -9,3 +9,7 @@ The layer that: This layer bears no notion of fault tolerance """ from __future__ import absolute_import, division, print_function + +from coolamqp.uplink.connection import Connection +from coolamqp.uplink.listener import ListenerThread +from coolamqp.uplink.reactor import Reactor diff --git a/coolamqp/uplink/connection/__init__.py b/coolamqp/uplink/connection/__init__.py new file mode 100644 index 0000000..beed0f9 --- /dev/null +++ b/coolamqp/uplink/connection/__init__.py @@ -0,0 +1,11 @@ +# coding=UTF-8 +""" +Comprehensive management of a framing connection. + +Connection is something that can: + - call something when an AMQPFrame is received + - send AMQPFrame's +""" +from __future__ import absolute_import, division, print_function + +from coolamqp.uplink.connection.connection import Connection diff --git a/coolamqp/uplink/connection/connection.py b/coolamqp/uplink/connection/connection.py new file mode 100644 index 0000000..54f2002 --- /dev/null +++ b/coolamqp/uplink/connection/connection.py @@ -0,0 +1,58 @@ +# coding=UTF-8 +from __future__ import absolute_import, division, print_function +import logging +from coolamqp.uplink.listener import ListenerThread + +from coolamqp.uplink.connection.recv_framer import ReceivingFramer +from coolamqp.uplink.connection.send_framer import SendingFramer + + +logger = logging.getLogger(__name__) + + +class Connection(object): + """ + An object that manages a connection in a comprehensive way + """ + + def __init__(self, socketobject, listener_thread, reactor=None): + self.reactor = reactor + if reactor is None: + logger.warn('Creating connection without a reactor; hope you know what you''re doing') + else: + reactor.set_send_frame(self.send) + + self.recvf = ReceivingFramer(self.on_frame) + self.failed = False + + self.listener_socket = listener_thread.register(socketobject, + on_read=self.recvf.put, + on_fail=self.on_fail) + + self.sendf = SendingFramer(self.listener_socket.send) + + def set_reactor(self, reactor): + self.reactor = reactor + reactor.set_send_frame(self.send) + + def on_fail(self): + self.failed = True + self.reactor.on_close() + + def send(self, frames): + """ + :param frames: list of frames or None to close the link + """ + if not self.failed: + if frames is not None: + self.sendf.send(frames) + else: + self.listener_socket.send(None) + self.failed = True + + def on_frame(self, frame): + if self.reactor is None: + logger.warn('Received %s but no reactor present. Dropping.', frame) + else: + self.reactor.on_frame(frame) + diff --git a/coolamqp/uplink/recv_framer.py b/coolamqp/uplink/connection/recv_framer.py similarity index 65% rename from coolamqp/uplink/recv_framer.py rename to coolamqp/uplink/connection/recv_framer.py index 69e0f07..f1f9efd 100644 --- a/coolamqp/uplink/recv_framer.py +++ b/coolamqp/uplink/connection/recv_framer.py @@ -3,10 +3,11 @@ from __future__ import absolute_import, division, print_function import collections import io +import six import struct -from coolamqp.framing import AMQPBodyFrame, AMQPHeaderFrame, AMQPHeartbeatFrame, AMQPMethodFrame -from coolamqp.framing import FRAME_HEADER, FRAME_HEARTBEAT, FRAME_END, FRAME_METHOD, FRAME_BODY +from coolamqp.framing.frames import AMQPBodyFrame, AMQPHeaderFrame, AMQPHeartbeatFrame, AMQPMethodFrame +from coolamqp.framing.definitions import FRAME_HEADER, FRAME_HEARTBEAT, FRAME_END, FRAME_METHOD, FRAME_BODY FRAME_TYPES = { FRAME_HEADER: AMQPHeaderFrame, @@ -15,11 +16,12 @@ FRAME_TYPES = { } -class ReceivingFormatter(object): +class ReceivingFramer(object): """ Assembles AMQP framing from received data. - Just call with .put(data) and get framing by iterator .framing(). + Just call with .put(data) upon receiving, + and on_frame will be called with fresh frames. Not thread safe. @@ -34,7 +36,7 @@ class ReceivingFormatter(object): frame_type <- None frame_size < None) """ - def __init__(self, sock): + def __init__(self, on_frame=lambda frame: None): self.chunks = collections.deque() # all received data self.total_data_len = 0 @@ -43,38 +45,36 @@ class ReceivingFormatter(object): self.frame_size = None self.bytes_needed = None # bytes needed for a new frame - self.frames = collections.deque() # for + self.on_frame = on_frame def put(self, data): - self.total_data_len += len(data) - self.chunks.append(buffer(data)) - - def get_frames(self): # -> iterator of AMQPFrame, raises ValueError """ - An iterator to return framing pending for read. + Called upon receiving data. - :raises ValueError: invalid frame readed, kill the connection. - :return: iterator with framing + May result in any number of .on_frame() calls + :param data: received data """ + self.total_data_len += len(data) + self.chunks.append(buffer(data)) + while self._statemachine(): pass - while len(self.frames) > 0: - yield self.frames.popleft() - def _extract(self, up_to): # return up to up_to bytes from current chunk, switch if necessary + assert self.total_data_len >= up_to, 'Tried to extract %s but %s remaining' % (up_to, self.total_data_len) if up_to >= len(self.chunks[0]): q = self.chunks.popleft() else: q = buffer(self.chunks[0], 0, up_to) - self.chunks[0] = buffer(self.chunks, up_to) + self.chunks[0] = buffer(self.chunks[0], up_to) + self.total_data_len -= len(q) return q - def _statemachine(self): # -> bool, was any action taken? + def _statemachine(self): # state rule 1 if self.frame_type is None and self.total_data_len > 0: - self.frame_type = ord(self._extract(1)) + self.frame_type = ord(self._extract(1)[0]) if self.frame_type not in (FRAME_HEARTBEAT, FRAME_HEADER, FRAME_METHOD, FRAME_BODY): raise ValueError('Invalid frame') @@ -82,7 +82,7 @@ class ReceivingFormatter(object): return True # state rule 2 - if (self.frame_type == FRAME_HEARTBEAT) and (self.total_data_len > 3): + elif (self.frame_type == FRAME_HEARTBEAT) and (self.total_data_len > 3): data = b'' while len(data) < 3: data = data + self._extract(3 - len(data)) @@ -91,36 +91,38 @@ class ReceivingFormatter(object): # Invalid heartbeat frame! raise ValueError('Invalid AMQP heartbeat') - self.frames.append(AMQPHeartbeatFrame()) + self.on_frame(AMQPHeartbeatFrame()) self.frame_type = None return True # state rule 3 - if (self.frame_type != FRAME_HEARTBEAT) and (self.frame_type is not None) and (self.total_data_len > 6): + elif (self.frame_type != FRAME_HEARTBEAT) and (self.frame_type is not None) and (self.frame_size is None) and (self.total_data_len > 6): hdr = b'' while len(hdr) < 6: - hdr = hdr + self._extract(6 - len(hdr)) + hdr = hdr + six.binary_type(self._extract(6 - len(hdr))) - self.frame_channel, self.frame_size = struct.unpack('!BI', hdr) + self.frame_channel, self.frame_size = struct.unpack('!HI', hdr) return True # state rule 4 - if self.total_data_len >= (self.frame_size+1): + elif (self.frame_size is not None) and (self.total_data_len >= (self.frame_size+1)): - if len(self.chunks[0]) >= self.total_data_len: + if len(self.chunks[0]) >= self.frame_size: # We can subslice it - it's very fast - payload = self._extract(self.total_data_len) + payload = self._extract(self.frame_size) else: # Construct a separate buffer :( payload = io.BytesIO() - while payload.tell() < self.total_data_len: - payload.write(self._extract(self.total_data_len - payload.tell())) + while payload.tell() < self.frame_size: + payload.write(self._extract(self.frame_size - payload.tell())) + + assert payload.tell() <= self.total_data_len payload = buffer(payload.getvalue()) - if ord(self._extract(1)) != FRAME_END: + if ord(self._extract(1)[0]) != FRAME_END: raise ValueError('Invalid frame end') try: @@ -128,7 +130,7 @@ class ReceivingFormatter(object): except ValueError: raise - self.frames.append(frame) + self.on_frame(frame) self.frame_type = None self.frame_size = None diff --git a/coolamqp/uplink/connection/send_framer.py b/coolamqp/uplink/connection/send_framer.py new file mode 100644 index 0000000..3986d16 --- /dev/null +++ b/coolamqp/uplink/connection/send_framer.py @@ -0,0 +1,47 @@ +# coding=UTF-8 +from __future__ import absolute_import, division, print_function + +import collections +import threading +import io +import socket + + +class SendingFramer(object): + """ + Assembles AMQP framing from received data and orchestrates their upload via a socket. + + Just call with .put(data) and get framing by iterator .framing(). + + Not thread safe. + + State machine + (frame_type is None) and has_bytes(1) -> (frame_type <- bytes(1)) + + (frame_type is HEARTBEAT) and has_bytes(3) -> (output_frame, frame_type <- None) + (frame_type is not HEARTBEAT and not None) and has_bytes(6) -> (frame_channel <- bytes(2), + frame_size <- bytes(4)) + + (frame_size is not None) and has_bytes(frame_size+1) -> (output_frame, + frame_type <- None + frame_size < None) + """ + def __init__(self, on_send=lambda data: None): + """ + :param on_send: a callable that can be called with some data to send + """ + self.on_send = on_send + + + def send(self, frames): + """ + Schedule to send some frames. + :param frames: list of AMQPFrame instances + """ + length = sum(frame.get_size() for frame in frames) + buf = io.BytesIO(bytearray(length)) + + for frame in frames: + frame.write_to(buf) + + self.on_send(buf.getvalue()) diff --git a/coolamqp/uplink/listener/__init__.py b/coolamqp/uplink/listener/__init__.py new file mode 100644 index 0000000..dff06e0 --- /dev/null +++ b/coolamqp/uplink/listener/__init__.py @@ -0,0 +1,17 @@ +# coding=UTF-8 +""" +A listener is a thread that monitors a bunch of sockets for activity. + +It provides both for sending and receiving messages. It is written +as a package, because the optimal network call, epoll, is not +available on Windows, and you might just want to use it. + +select and poll are not optimal, because if you wanted to send +something in that small gap where select/poll blocks, you won't +immediately be able to do so. With epoll, you can. +""" +from __future__ import absolute_import, division, print_function + + +from coolamqp.uplink.listener.thread import ListenerThread +from coolamqp.uplink.connection import Connection diff --git a/coolamqp/uplink/listener/epoll_listener.py b/coolamqp/uplink/listener/epoll_listener.py new file mode 100644 index 0000000..205a490 --- /dev/null +++ b/coolamqp/uplink/listener/epoll_listener.py @@ -0,0 +1,91 @@ +# coding=UTF-8 +from __future__ import absolute_import, division, print_function +import six +import logging +import select + +from coolamqp.uplink.listener.socket import SocketFailed, BaseSocket + + +logger = logging.getLogger(__name__) + + +class EpollSocket(BaseSocket): + """ + EpollListener substitutes your BaseSockets with this + """ + def __init__(self, sock, on_read, on_fail, listener): + BaseSocket.__init__(self, sock, on_read=on_read, on_fail=on_fail) + self.listener = listener + + def get_epoll_eventset(self): + if len(self.data_to_send) > 0: + return select.EPOLLIN | select.EPOLLERR | select.EPOLLHUP | select.EPOLLOUT + else: + return select.EPOLLIN | select.EPOLLERR | select.EPOLLHUP + + def send(self, data): + self.data_to_send.append(data) + self.listener.epoll.modify(self, self.get_epoll_eventset()) + + +class EpollListener(object): + """ + A listener using epoll. + """ + + def __init__(self): + self.epoll = select.epoll() + self.fd_to_sock = {} + + def wait(self, timeout=0): + events = self.epoll.poll(timeout=timeout) + for fd, event in events: + sock = self.fd_to_sock[fd] + + # Errors + try: + if event & (select.EPOLLERR | select.EPOLLHUP): + raise SocketFailed() + elif event & select.EPOLLIN: + sock.on_read() + elif event & select.EPOLLOUT: + sock.on_write() + except SocketFailed: + self.epoll.unregister(fd) + del self.fd_to_sock[fd] + sock.on_fail() + sock.close() + else: + self.epoll.modify(fd, sock.get_epoll_eventset()) + + def shutdown(self): + """ + Forcibly close all sockets that this manages (calling their on_fail's), + and close the object. + + This object is unusable after this call. + """ + for sock in six.itervalues(self.fd_to_sock): + sock.on_fail() + sock.close() + self.fd_to_sock = {} + self.epoll.close() + + def register(self, sock, on_read=lambda data: None, + on_fail=lambda: None): + """ + Add a socket to be listened for by the loop. + + :param sock: a socket instance (as returned by socket module) + :param on_read: callable(data) to be called with received data + :param on_fail: callable() to be called when socket fails + + :return: a BaseSocket instance to use instead of this socket + """ + sock = EpollSocket(sock, on_read, on_fail, self) + self.fd_to_sock[sock.fileno()] = sock + + self.epoll.register(sock, sock.get_epoll_eventset()) + return sock + diff --git a/coolamqp/uplink/listener/socket.py b/coolamqp/uplink/listener/socket.py new file mode 100644 index 0000000..8c34971 --- /dev/null +++ b/coolamqp/uplink/listener/socket.py @@ -0,0 +1,97 @@ +# coding=UTF-8 +from __future__ import absolute_import, division, print_function +import collections +import socket + + +class SocketFailed(IOError): + """Failure during socket operation. It needs to be discarded.""" + + +class BaseSocket(object): + """ + Base class for sockets provided to listeners. + + This is based on a standard TCP socket. + + To be instantiated only by Listeners. + """ + + def __init__(self, sock, on_read=lambda data: None, + on_fail=lambda: None): + """ + + :param sock: socketobject + :param on_read: callable(data) to be called when data is read. + Listener thread context + Raises ValueError on socket should be closed + :param on_fail: callable() when socket is dead and to be discarded. + Listener thread context. + Socket descriptor will be handled by listener. + This should not + """ + assert sock is not None + self.sock = sock + self.data_to_send = collections.deque() + self.my_on_read = on_read + self.on_fail = on_fail + + def send(self, data): + """ + Schedule to send some data + + :param data: data to send, or None to terminate this socket + """ + raise Exception('Abstract; listener should override that') + + def on_read(self): + """Socket is readable, called by Listener""" + try: + data = self.sock.recv(2048) + except (IOError, socket.error): + raise SocketFailed() + + if len(data) == 0: + raise SocketFailed() + + try: + self.my_on_read(data) + except ValueError: + raise #debug + raise SocketFailed() + + def on_write(self): + """ + Socket is writable, called by Listener + :return: (bool) I finished sending all the data for now + :raises SocketFailed: on socket error + """ + if len(self.data_to_send) == 0: + return True # No data to send + + while len(self.data_to_send) > 0: + + if self.data_to_send[0] is None: + raise SocketFailed() # We should terminate the connection! + + try: + sent = self.sock.send(self.data_to_send[0]) + except (IOError, socket.error): + raise SocketFailed() + + if sent < len(self.data_to_send[0]): + # Not everything could be sent + self.data_to_send[0] = buffer(self.data_to_send[0], sent) + return False # I want to send more + else: + self.data_to_send.popleft() # Sent all! + + return True # all for now + + def fileno(self): + """Return descriptor number""" + return self.sock.fileno() + + def close(self): + """Close this socket""" + self.sock.close() \ No newline at end of file diff --git a/coolamqp/uplink/listener/thread.py b/coolamqp/uplink/listener/thread.py new file mode 100644 index 0000000..f9c9572 --- /dev/null +++ b/coolamqp/uplink/listener/thread.py @@ -0,0 +1,40 @@ +# coding=UTF-8 +from __future__ import absolute_import, division, print_function + +import threading + +from coolamqp.uplink.listener.epoll_listener import EpollListener + + +class ListenerThread(threading.Thread): + """ + A thread that does the listening. + + It automatically picks the best listener for given platform. + """ + + def __init__(self): + threading.Thread.__init__(self) + self.daemon = True + self.terminating = False + self.listener = EpollListener() + + def terminate(self): + self.terminating = True + + def run(self): + while not self.terminating: + self.listener.wait(timeout=1) + self.listener.shutdown() + + def register(self, sock, on_read=lambda data: None, on_fail=lambda: None): + """ + Add a socket to be listened for by the loop. + + :param sock: a socket instance (as returned by socket module) + :param on_read: callable(data) to be called with received data + :param on_fail: callable() to be called when socket fails + + :return: a BaseSocket instance to use instead of this socket + """ + return self.listener.register(sock, on_read, on_fail) \ No newline at end of file diff --git a/coolamqp/uplink/order.py b/coolamqp/uplink/order.py deleted file mode 100644 index 9eaf45c..0000000 --- a/coolamqp/uplink/order.py +++ /dev/null @@ -1,30 +0,0 @@ -# coding=UTF-8 -from __future__ import absolute_import, division, print_function - - - -class MethodTransaction(object): - """ - This means "run a method, and call me back when reply arrives". - - Method's - """ - - -class SyncOrder(Order): - """ - This means "send a method frame, optionally some other framing, - and run a callback with the returned thing. - - Possible responses are registered for. - """ - def __init__(self, channel, method, other_frames=[], callback=lambda frame: None): - """ - :param channel: channel to use - :param method: AMQPMethodPayload instance - :param other_frames: list of other AMQPFrames to send next in order - :param callback: callback to run with received response. - methods that this will react to are determined by AMQPMethodPayload - """ - - diff --git a/coolamqp/uplink/reactor.py b/coolamqp/uplink/reactor.py new file mode 100644 index 0000000..ef81142 --- /dev/null +++ b/coolamqp/uplink/reactor.py @@ -0,0 +1,35 @@ +# coding=UTF-8 +from __future__ import absolute_import, division, print_function + + + +class Reactor(object): + """ + Base class for objects that can: + - Receive AMQPFrame's + - Send AMQPFrame's + - Receive information about termination of connection + + Default implementation is a no-op default reactor. + """ + def __init__(self): + self.send_frame = lambda frame: None + + + def on_frame(self, frame): + """ + Frame was received. + :param frame: AMQPFrame instance + """ + + def set_send_frame(self, sender): + """ + Called when Reactor is registered in a Connection + :param sender: callable(amqp_frame) to call if Reactor wants to send a frame + """ + self.send_frame = sender + + def on_close(self): + """ + Called when connection is closed + """ diff --git a/coolamqp/uplink/reader_thread.py b/coolamqp/uplink/reader_thread.py deleted file mode 100644 index ae333b5..0000000 --- a/coolamqp/uplink/reader_thread.py +++ /dev/null @@ -1,34 +0,0 @@ -# coding=UTF-8 -from __future__ import absolute_import, division, print_function -import socket -import threading - - -class ReaderThread(threading.Thread): - """ - A thread, whose job is to receive AMQP framing from AMQP TCP socket. - - Thread may inform Uplink of socket's lossage via on_socket_failed(exception). It should exit afterwards at once. - """ - - def __init__(self, sock, on_failure): - """ - :param uplink: Uplink instance - :param sock: a socket to use - """ - threading.Thread.__init__(self) - self.daemon = True - - self.uplink = uplink - self.sock = sock - self.is_cancelled = False - - - def on_cancel(self): - """ - Called by Uplink when it decides that we should not report any more framing, and should just die. - """ - self.is_cancelled = True - - - def run(self):_ \ No newline at end of file diff --git a/coolamqp/uplink/send_framer.py b/coolamqp/uplink/send_framer.py deleted file mode 100644 index 49c88a2..0000000 --- a/coolamqp/uplink/send_framer.py +++ /dev/null @@ -1,95 +0,0 @@ -# coding=UTF-8 -from __future__ import absolute_import, division, print_function - -import collections -import threading -import io -import socket - - -class SendingOperator(object): - """ - Assembles AMQP framing from received data and orchestrates their upload via a socket. - - Just call with .put(data) and get framing by iterator .framing(). - - Not thread safe. - - State machine - (frame_type is None) and has_bytes(1) -> (frame_type <- bytes(1)) - - (frame_type is HEARTBEAT) and has_bytes(3) -> (output_frame, frame_type <- None) - (frame_type is not HEARTBEAT and not None) and has_bytes(6) -> (frame_channel <- bytes(2), - frame_size <- bytes(4)) - - (frame_size is not None) and has_bytes(frame_size+1) -> (output_frame, - frame_type <- None - frame_size < None) - """ - def __init__(self, sock, on_fail=lambda e: None, - on_close=lambda: None): - """ - :param sock: a non-timeouting socket - :param on_fail: callable(exception) to call when socket fails - :param on_close: callable(exception) to call when socket closes - """ - self.sock = sock - self.to_send = collections.deque() # either bytes, bytearrays and buffers, or tuple of - # (callable/0, callable/1) - - self.on_fail = on_fail - self.on_close = on_close - - self.failed = False - - def _failed(self, e): - """Discard all to_send, run on_fail callables""" - self.failed = True - self.on_fail(e) - - for order in self.to_send: - if isinstance(order, tuple) and len(order) == 2: - on_done, on_fail = order - if on_fail is not None: - on_fail(e) - - self.to_send = collections.deque() - - def run_sending_loop(self): - while len(self.to_send) > 0: - if isinstance(self.to_send[0], tuple) and len(self.to_send[0]) == 2: - on_done, on_fail = self.to_send.popleft() - if on_done is not None: - on_done() - else: - try: - sent_bytes = self.sock.send(self.to_send[0]) - except (IOError, socket.error) as e: - self._failed(e) - return - - if sent_bytes == len(self.to_send[0]): - # Fetch next fragment - self.to_send.popleft() - else: - # slice current buffer - self.to_send[0] = buffer(self.to_send[0], sent_bytes) - - def send(self, frames, on_done=None, on_fail=None): - """ - Schedule to send some framing. - :param frames: list of AMQPFrame instances - :param on_done: callable/0 to call when this is done (frame_end of last frame has just left this PC) - :param on_fail: callable(Exception) to call when something broke before the data could be sent. - """ - length = sum(frame.get_size() for frame in frames) - buf = io.BytesIO(bytearray(length)) - - for frame in frames: - frame.write_to(buf) - - self.to_send.append(buf.getvalue()) - if (on_done is not None) or (on_fail is not None): - self.to_send.append(on_done, on_fail) - - self.run_sending_loop() diff --git a/coolamqp/uplink/watchman.py b/coolamqp/uplink/watchman.py deleted file mode 100644 index b82bc03..0000000 --- a/coolamqp/uplink/watchman.py +++ /dev/null @@ -1,144 +0,0 @@ -# coding=UTF-8 -from __future__ import absolute_import, division, print_function -import collections -from six.moves import queue - - -from coolamqp.uplink.exceptions import called_by -from coolamqp.scaffold import AcceptsFrames, RelaysFrames, Synchronized - -from coolamqp.framing.frames import AMQPMethodFrame, AMQPHeartbeatFrame, AMQPBodyFrame, AMQPHeaderFrame - - -class Watch(object): - def __init__(self, channel, on_frame, predicate): - """ - :type predicate: callable(AMQPFrame object) -> should_trigger::bool - """ - self.channel = channel - self.on_frame = on_frame - self.predicate = predicate - - - - -class Watchman(AcceptsFrames, RelaysFrames, Synchronized): - """ - Watchman is the guy, who: - - - Receives frames from RecvFramer - - Executes in the context of the Listener - - You can ask to trigger, when a particular frame is received from server - """ - - def __init__(self): - super(Watchman, self).__init__(self) - self.watches = collections.defaultdict(collections.deque) - self.on_frame = lambda frame: None - - def wire_frame_to(self, on_frame): - """ - Set this Watchman to pass non-triggered frames to some callable. - - Called by: Uplink factory - - :param callable: callable(AMQPMethodPayload object) - """ - self.on_frame = on_frame - - @ - def trigger_methods(self, channel, frame_payload_types, on_frame): - """ - Register a one-shot trigger on an AMQP method. - - After triggering upon any of method frame payload types, you'll get a callback with - AMQPMethodPayload instance. This will prevent it from being relayed further. - - Called by: frontend, listener - - :param channel: channel ID - :param frame_payload_types: list of AMQPMethodPayload classes - :param on_frame: callable(AMQPMethodPayload instance) -> n/a - """ - def predicate(frame): - if not isinstance(frame, ) - m = Watch(channel, on_frame, lambda frame: isinstance(frame.payload )) - with self.lock: - - - - - - If you are ReaderThread, ask "hey, this frame just arrived, - is someone interested in it?" - - If you are frontend thread, can ask watchman to trigger a callback - when a particular frame arrives (or a bunch of them, if your request - expects a bunch). - - Since Watchman receives all framing from ReaderThread, it also knows about: - - channels being opened - - channels being closed by exception - - - - Watchman is also being informed about new channels being opened - - See page 19 of the AMQP specification. Responses for synchronous methods - arrive in order, ie. if I requested a queue.declare, and then an - exchange.declare, I expect that responses will arrive in order of - queue.declare-ok, exchange.declare-ok. - - This is important, because if we have a message, we need to check only - the first watch. - - :FRAMES = [AMQPMethodPayloadClass1, AMQPMethodPayloadClass2, ...] - :ONFRAME = callable(AMQPMethodPayload instance) - :ONDEAD = callable() | None - - :WATCH = ( :FRAMES :ONFRAME :ONDEAD ) - :WATCH_TO_LOAD = ( channel :FRAMES :ONFRAME :ONDEAD) - - - !!!!! LIMITED RIGHT NOW ONLY TO METHOD FRAMES - """ - def __init__(self): - self.watches_to_load = queue.Queue() # put new watches here - - self.watches = {} # channel => list of :WATCH - - - - def _analyze_watches(self): - - def on_frame(self, frame): - """ - A frame arrived. If this triggers a watch, trigger it and remove. - All framing received by ReaderThread go thru here. - - TO BE CALLED BY READER THREAD - - :param frame: AMQPFrame of any subtype - """ - - # Analyze pending watches - while self.watches_to_load.qsize() > 0: - channel, - - - def set_watch(self, channel, frame_types, on_frame, on_dead=None): - """ - Set a watch. Watch will fire if I see a method frame of type - found in the iterable of frame_types. - - TO BE CALLED BY FRONTEND THREAD. - - :param channel: channel to set watch on - :param frame_types: list of AMQPMethodPayload classes - :param on_frame: callable(AMQPMethodPayload instance) - :param on_dead: callable/0 to call if this watch will - for sure not be processed (eg. channel or connection died) - """ - self.watches_to_load.put((channel, frame_types, on_frame, on_dead)) - if channel not in self.watches: - p = collections.deque((frame_types, on_frame, on_dead)) - self.watches[channel] = p - else: - self.watches[channel].put diff --git a/coolamqp/uplink/iface.py b/tests/test_uplink/__init__.py similarity index 100% rename from coolamqp/uplink/iface.py rename to tests/test_uplink/__init__.py diff --git a/tests/test_uplink/test_basic.py b/tests/test_uplink/test_basic.py new file mode 100644 index 0000000..357b57d --- /dev/null +++ b/tests/test_uplink/test_basic.py @@ -0,0 +1,39 @@ +# coding=UTF-8 +from __future__ import absolute_import, division, print_function +import unittest + +from coolamqp.framing.definitions import ConnectionStart +from coolamqp.uplink import ListenerThread, Connection, Reactor +import socket +import time + + +def newc(): + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.connect(('127.0.0.1', 5672)) + s.settimeout(0) + s.send('AMQP\x00\x00\x09\x01') + return s + + +class LolReactor(Reactor): + def __init__(self): + Reactor.__init__(self) + self.got_connectionstart = False + + def on_frame(self, frame): + if isinstance(frame.payload, ConnectionStart): + self.got_connectionstart = True + + +class TestBasic(unittest.TestCase): + def test_gets_connectionstart(self): + lt = ListenerThread() + lt.start() + r = LolReactor() + con = Connection(newc(), lt, r) + + time.sleep(5) + + lt.terminate() + self.assertTrue(r.got_connectionstart) \ No newline at end of file -- GitLab