diff --git a/coolamqp/framing/__init__.py b/coolamqp/framing/__init__.py index 9f2b35b38d89264ee25685611d0a65a192e165f6..ff76c6b8b4f80cd2e56490131288285c23c60957 100644 --- a/coolamqp/framing/__init__.py +++ b/coolamqp/framing/__init__.py @@ -1,2 +1,11 @@ # coding=UTF-8 +""" +The layer that: + - manages serialization/deserializtion (frames) + - manages low-level data sending (streams) + - sets up connection to AMQP + - reacts and allows sending low-level AMQP commands + +This layer bears no notion of fault tolerance +""" from __future__ import absolute_import, division, print_function diff --git a/coolamqp/framing/connector.py b/coolamqp/framing/connector.py new file mode 100644 index 0000000000000000000000000000000000000000..8dcc9aa51b3475d9988ce6aa545aba1ac3133db1 --- /dev/null +++ b/coolamqp/framing/connector.py @@ -0,0 +1,12 @@ +# coding=UTF-8 +from __future__ import absolute_import, division, print_function + + +def connect(host, port): + """ + Given a host and port, return a socket + that + :param host: + :param port: + :return: + """ \ No newline at end of file diff --git a/coolamqp/framing/frames/__init__.py b/coolamqp/framing/frames/__init__.py index 56b1f03091d659efcc8cb8055b7fd3d63d0b01fe..425e564e29a5ebe05404e9edb401d7cffd7e94ee 100644 --- a/coolamqp/framing/frames/__init__.py +++ b/coolamqp/framing/frames/__init__.py @@ -3,6 +3,7 @@ from __future__ import absolute_import, division, print_function """ Definitions of frames. +Mechanisms for serialization/deserialization of AMQP frames. -machine.py is machine-generated from AMQP specification +definitions.py is machine-generated from AMQP specification. """ \ No newline at end of file diff --git a/coolamqp/framing/frames/base.py b/coolamqp/framing/frames/base.py index bfbf3de616a99eecb3cbff20834c2920bbc06561..7cd16e14983164343621704dbcd4bcfc2a8a733a 100644 --- a/coolamqp/framing/frames/base.py +++ b/coolamqp/framing/frames/base.py @@ -50,6 +50,13 @@ class AMQPFrame(object): # base class for frames """ raise NotImplementedError('Override me') + def get_size(self): + """ + Return size of this frame, in bytes, from frame type to frame_end + :return: int + """ + raise NotImplementedError() + class AMQPPayload(object): """Payload is something that can write itself to bytes, @@ -60,4 +67,11 @@ class AMQPPayload(object): Emit itself into a buffer, from length to FRAME_END :param buf: buffer to write to (will be written using .write) - """ \ No newline at end of file + """ + + def get_size(self): + """ + Return size of this payload + :return: int + """ + raise NotImplementedError() diff --git a/coolamqp/framing/frames/base_definitions.py b/coolamqp/framing/frames/base_definitions.py index 14c90be50de350c8bd527f3e9b6b392c63e86338..0afd83c17c63a9c5e72ae38ac364a54af77b7a70 100644 --- a/coolamqp/framing/frames/base_definitions.py +++ b/coolamqp/framing/frames/base_definitions.py @@ -9,8 +9,16 @@ import struct from coolamqp.framing.frames.base import AMQPPayload + class AMQPClass(object): - pass + """An AMQP class""" + + +class AMQPContentPropertyList(object): + """ + A class is intmately bound with content and content properties + """ + PROPERTIES = [] class AMQPMethodPayload(AMQPPayload): diff --git a/coolamqp/framing/frames/compilation/compile_definitions.py b/coolamqp/framing/frames/compilation/compile_definitions.py index 8d96977f706d28234b8f02b39173e4186f61fef8..de7f068f6a1b79b5a0084583427f4e83bb18ca14 100644 --- a/coolamqp/framing/frames/compilation/compile_definitions.py +++ b/coolamqp/framing/frames/compilation/compile_definitions.py @@ -31,7 +31,7 @@ CoolAMQP is copyright (c) 2016 DMS Serwis s.c. import struct -from coolamqp.framing.frames.base_definitions import AMQPClass, AMQPMethodPayload +from coolamqp.framing.frames.base_definitions import AMQPClass, AMQPMethodPayload, AMQPContentPropertyList from coolamqp.framing.frames.field_table import enframe_table, deframe_table, frame_table_size ''') @@ -75,6 +75,9 @@ from coolamqp.framing.frames.field_table import enframe_table, deframe_table, fr # Output classes for cls in get_classes(xml): + + cls = cls._replace(content_properties=[p._replace(basic_type=domain_to_basic_type[p.type]) for p in cls.content_properties]) + line('''\nclass %s(AMQPClass): """ %s @@ -82,8 +85,27 @@ from coolamqp.framing.frames.field_table import enframe_table, deframe_table, fr NAME = %s INDEX = %s -''', name_class(cls.name), doxify(None, cls.docs), frepr(cls.name), cls.index) +''', + name_class(cls.name), doxify(None, cls.docs), frepr(cls.name), cls.index) + + line('''\nclass %sContentPropertyList(AMQPContentPropertyList): + """ + %s + """ + CLASS_NAME = %s + CLASS_INDEX = %s + CLASS = %s + + CONTENT_PROPERTIES = [ + # tuple of (name, domain, type) +''', + name_class(cls.name), doxify(None, cls.docs), frepr(cls.name), cls.index, name_class(cls.name)) + + for property in cls.content_properties: + line(' (%s, %s, %s), # %s\n', frepr(property.name), frepr(property.type), frepr(property.basic_type), + frepr(property.label)) + line(' ]\n\n') for method in cls.methods: full_class_name = '%s%s' % (name_class(cls.name), name_method(method.name)) @@ -106,6 +128,8 @@ from coolamqp.framing.frames.field_table import enframe_table, deframe_table, fr CLASSNAME = %s FULLNAME = %s + CONTENT_PROPERTY_LIST = %sContentPropertyList + CLASS_INDEX = %s CLASS_INDEX_BINARY = %s METHOD_INDEX = %s @@ -129,6 +153,7 @@ from coolamqp.framing.frames.field_table import enframe_table, deframe_table, fr frepr(method.name), frepr(cls.name), frepr(cls.name + '.' + method.name), + name_class(cls.name), frepr(cls.index), as_nice_escaped_string(chr(cls.index)), frepr(method.index), diff --git a/coolamqp/framing/frames/compilation/utilities.py b/coolamqp/framing/frames/compilation/utilities.py index f7ac878cd36494813cc8137fad330d211b5176ee..48f59d293cb33eedb9ea4a305e5e0d8220a2b06f 100644 --- a/coolamqp/framing/frames/compilation/utilities.py +++ b/coolamqp/framing/frames/compilation/utilities.py @@ -14,7 +14,8 @@ Method = namedtuple('Method', ('name', 'synchronous', 'index', 'label', 'docs', 'sent_by_client', 'sent_by_server', 'constant')) # synchronous is bool, constant is bool # repponse is a list of method.name -Class_ = namedtuple('Class_', ('name', 'index', 'docs', 'methods')) # label is int +Property = namedtuple('Property', ('name', 'type', 'label', 'basic_type')) +Class_ = namedtuple('Class_', ('name', 'index', 'docs', 'methods', 'content_properties')) # label is int Domain = namedtuple('Domain', ('name', 'type', 'elementary')) # elementary is bool @@ -106,6 +107,10 @@ def for_method_field(elem): # for <field> in <method> None) +def for_content_property(elem): + a = elem.attrib + return Property(a['name'], a['domain'], a.get('label', ''), None) + def for_method(elem): # for <method> a = elem.attrib return Method(six.text_type(a['name']), bool(int(a.get('synchronous', '0'))), int(a['index']), a['label'], get_docs(elem), @@ -119,7 +124,8 @@ def for_method(elem): # for <method> def for_class(elem): # for <class> a = elem.attrib methods = sorted([for_method(me) for me in elem.getchildren() if me.tag == 'method'], key=lambda m: (m.name.strip('-')[0], -len(m.response))) - return Class_(six.text_type(a['name']), int(a['index']), get_docs(elem) or a['label'], methods) + return Class_(six.text_type(a['name']), int(a['index']), get_docs(elem) or a['label'], methods, + [for_content_property(e) for e in elem.getchildren() if e.tag == 'field']) def for_constant(elem): # for <constant> a = elem.attrib diff --git a/coolamqp/framing/frames/definitions.py b/coolamqp/framing/frames/definitions.py index ea82d72416b7624abc0100e484d040be4910a90d..443783f2d63e296371a2c2fbee9dd003719b80b7 100644 --- a/coolamqp/framing/frames/definitions.py +++ b/coolamqp/framing/frames/definitions.py @@ -12,7 +12,7 @@ CoolAMQP is copyright (c) 2016 DMS Serwis s.c. import struct -from coolamqp.framing.frames.base_definitions import AMQPClass, AMQPMethodPayload +from coolamqp.framing.frames.base_definitions import AMQPClass, AMQPMethodPayload, AMQPContentPropertyList from coolamqp.framing.frames.field_table import enframe_table, deframe_table, frame_table_size # Core constants @@ -104,6 +104,21 @@ class Connection(AMQPClass): INDEX = 10 +class ConnectionContentPropertyList(AMQPContentPropertyList): + """ + The connection class provides methods for a client to establish a network connection to + + a server, and for both peers to operate the connection thereafter. + """ + CLASS_NAME = u'connection' + CLASS_INDEX = 10 + CLASS = Connection + + CONTENT_PROPERTIES = [ + # tuple of (name, domain, type) + ] + + class ConnectionClose(AMQPMethodPayload): """ Request a connection close @@ -118,6 +133,8 @@ class ConnectionClose(AMQPMethodPayload): CLASSNAME = u'connection' FULLNAME = u'connection.close' + CONTENT_PROPERTY_LIST = ConnectionContentPropertyList + CLASS_INDEX = 10 CLASS_INDEX_BINARY = b'\x0A' METHOD_INDEX = 50 @@ -193,6 +210,8 @@ class ConnectionCloseOk(AMQPMethodPayload): CLASSNAME = u'connection' FULLNAME = u'connection.close-ok' + CONTENT_PROPERTY_LIST = ConnectionContentPropertyList + CLASS_INDEX = 10 CLASS_INDEX_BINARY = b'\x0A' METHOD_INDEX = 51 @@ -238,6 +257,8 @@ class ConnectionOpen(AMQPMethodPayload): CLASSNAME = u'connection' FULLNAME = u'connection.open' + CONTENT_PROPERTY_LIST = ConnectionContentPropertyList + CLASS_INDEX = 10 CLASS_INDEX_BINARY = b'\x0A' METHOD_INDEX = 40 @@ -305,6 +326,8 @@ class ConnectionOpenOk(AMQPMethodPayload): CLASSNAME = u'connection' FULLNAME = u'connection.open-ok' + CONTENT_PROPERTY_LIST = ConnectionContentPropertyList + CLASS_INDEX = 10 CLASS_INDEX_BINARY = b'\x0A' METHOD_INDEX = 41 @@ -352,6 +375,8 @@ class ConnectionStart(AMQPMethodPayload): CLASSNAME = u'connection' FULLNAME = u'connection.start' + CONTENT_PROPERTY_LIST = ConnectionContentPropertyList + CLASS_INDEX = 10 CLASS_INDEX_BINARY = b'\x0A' METHOD_INDEX = 10 @@ -450,6 +475,8 @@ class ConnectionSecure(AMQPMethodPayload): CLASSNAME = u'connection' FULLNAME = u'connection.secure' + CONTENT_PROPERTY_LIST = ConnectionContentPropertyList + CLASS_INDEX = 10 CLASS_INDEX_BINARY = b'\x0A' METHOD_INDEX = 20 @@ -510,6 +537,8 @@ class ConnectionStartOk(AMQPMethodPayload): CLASSNAME = u'connection' FULLNAME = u'connection.start-ok' + CONTENT_PROPERTY_LIST = ConnectionContentPropertyList + CLASS_INDEX = 10 CLASS_INDEX_BINARY = b'\x0A' METHOD_INDEX = 11 @@ -607,6 +636,8 @@ class ConnectionSecureOk(AMQPMethodPayload): CLASSNAME = u'connection' FULLNAME = u'connection.secure-ok' + CONTENT_PROPERTY_LIST = ConnectionContentPropertyList + CLASS_INDEX = 10 CLASS_INDEX_BINARY = b'\x0A' METHOD_INDEX = 21 @@ -669,6 +700,8 @@ class ConnectionTune(AMQPMethodPayload): CLASSNAME = u'connection' FULLNAME = u'connection.tune' + CONTENT_PROPERTY_LIST = ConnectionContentPropertyList + CLASS_INDEX = 10 CLASS_INDEX_BINARY = b'\x0A' METHOD_INDEX = 30 @@ -740,6 +773,8 @@ class ConnectionTuneOk(AMQPMethodPayload): CLASSNAME = u'connection' FULLNAME = u'connection.tune-ok' + CONTENT_PROPERTY_LIST = ConnectionContentPropertyList + CLASS_INDEX = 10 CLASS_INDEX_BINARY = b'\x0A' METHOD_INDEX = 31 @@ -810,6 +845,21 @@ class Channel(AMQPClass): INDEX = 20 +class ChannelContentPropertyList(AMQPContentPropertyList): + """ + The channel class provides methods for a client to establish a channel to a + + server and for both peers to operate the channel thereafter. + """ + CLASS_NAME = u'channel' + CLASS_INDEX = 20 + CLASS = Channel + + CONTENT_PROPERTIES = [ + # tuple of (name, domain, type) + ] + + class ChannelClose(AMQPMethodPayload): """ Request a channel close @@ -824,6 +874,8 @@ class ChannelClose(AMQPMethodPayload): CLASSNAME = u'channel' FULLNAME = u'channel.close' + CONTENT_PROPERTY_LIST = ChannelContentPropertyList + CLASS_INDEX = 20 CLASS_INDEX_BINARY = b'\x14' METHOD_INDEX = 40 @@ -899,6 +951,8 @@ class ChannelCloseOk(AMQPMethodPayload): CLASSNAME = u'channel' FULLNAME = u'channel.close-ok' + CONTENT_PROPERTY_LIST = ChannelContentPropertyList + CLASS_INDEX = 20 CLASS_INDEX_BINARY = b'\x14' METHOD_INDEX = 41 @@ -945,6 +999,8 @@ class ChannelFlow(AMQPMethodPayload): CLASSNAME = u'channel' FULLNAME = u'channel.flow' + CONTENT_PROPERTY_LIST = ChannelContentPropertyList + CLASS_INDEX = 20 CLASS_INDEX_BINARY = b'\x14' METHOD_INDEX = 20 @@ -1002,6 +1058,8 @@ class ChannelFlowOk(AMQPMethodPayload): CLASSNAME = u'channel' FULLNAME = u'channel.flow-ok' + CONTENT_PROPERTY_LIST = ChannelContentPropertyList + CLASS_INDEX = 20 CLASS_INDEX_BINARY = b'\x14' METHOD_INDEX = 21 @@ -1060,6 +1118,8 @@ class ChannelOpen(AMQPMethodPayload): CLASSNAME = u'channel' FULLNAME = u'channel.open' + CONTENT_PROPERTY_LIST = ChannelContentPropertyList + CLASS_INDEX = 20 CLASS_INDEX_BINARY = b'\x14' METHOD_INDEX = 10 @@ -1104,6 +1164,8 @@ class ChannelOpenOk(AMQPMethodPayload): CLASSNAME = u'channel' FULLNAME = u'channel.open-ok' + CONTENT_PROPERTY_LIST = ChannelContentPropertyList + CLASS_INDEX = 20 CLASS_INDEX_BINARY = b'\x14' METHOD_INDEX = 11 @@ -1148,6 +1210,21 @@ class Exchange(AMQPClass): INDEX = 40 +class ExchangeContentPropertyList(AMQPContentPropertyList): + """ + Exchanges match and distribute messages across queues. exchanges can be configured in + + the server or declared at runtime. + """ + CLASS_NAME = u'exchange' + CLASS_INDEX = 40 + CLASS = Exchange + + CONTENT_PROPERTIES = [ + # tuple of (name, domain, type) + ] + + class ExchangeDeclare(AMQPMethodPayload): """ Verify exchange exists, create if needed @@ -1160,6 +1237,8 @@ class ExchangeDeclare(AMQPMethodPayload): CLASSNAME = u'exchange' FULLNAME = u'exchange.declare' + CONTENT_PROPERTY_LIST = ExchangeContentPropertyList + CLASS_INDEX = 40 CLASS_INDEX_BINARY = b'\x28' METHOD_INDEX = 10 @@ -1274,6 +1353,8 @@ class ExchangeDelete(AMQPMethodPayload): CLASSNAME = u'exchange' FULLNAME = u'exchange.delete' + CONTENT_PROPERTY_LIST = ExchangeContentPropertyList + CLASS_INDEX = 40 CLASS_INDEX_BINARY = b'\x28' METHOD_INDEX = 20 @@ -1350,6 +1431,8 @@ class ExchangeDeclareOk(AMQPMethodPayload): CLASSNAME = u'exchange' FULLNAME = u'exchange.declare-ok' + CONTENT_PROPERTY_LIST = ExchangeContentPropertyList + CLASS_INDEX = 40 CLASS_INDEX_BINARY = b'\x28' METHOD_INDEX = 11 @@ -1392,6 +1475,8 @@ class ExchangeDeleteOk(AMQPMethodPayload): CLASSNAME = u'exchange' FULLNAME = u'exchange.delete-ok' + CONTENT_PROPERTY_LIST = ExchangeContentPropertyList + CLASS_INDEX = 40 CLASS_INDEX_BINARY = b'\x28' METHOD_INDEX = 21 @@ -1434,6 +1519,22 @@ class Queue(AMQPClass): INDEX = 50 +class QueueContentPropertyList(AMQPContentPropertyList): + """ + Queues store and forward messages. queues can be configured in the server or created at + + runtime. Queues must be attached to at least one exchange in order to receive messages + from publishers. + """ + CLASS_NAME = u'queue' + CLASS_INDEX = 50 + CLASS = Queue + + CONTENT_PROPERTIES = [ + # tuple of (name, domain, type) + ] + + class QueueBind(AMQPMethodPayload): """ Bind queue to an exchange @@ -1448,6 +1549,8 @@ class QueueBind(AMQPMethodPayload): CLASSNAME = u'queue' FULLNAME = u'queue.bind' + CONTENT_PROPERTY_LIST = QueueContentPropertyList + CLASS_INDEX = 50 CLASS_INDEX_BINARY = b'\x32' METHOD_INDEX = 20 @@ -1553,6 +1656,8 @@ class QueueBindOk(AMQPMethodPayload): CLASSNAME = u'queue' FULLNAME = u'queue.bind-ok' + CONTENT_PROPERTY_LIST = QueueContentPropertyList + CLASS_INDEX = 50 CLASS_INDEX_BINARY = b'\x32' METHOD_INDEX = 21 @@ -1597,6 +1702,8 @@ class QueueDeclare(AMQPMethodPayload): CLASSNAME = u'queue' FULLNAME = u'queue.declare' + CONTENT_PROPERTY_LIST = QueueContentPropertyList + CLASS_INDEX = 50 CLASS_INDEX_BINARY = b'\x32' METHOD_INDEX = 10 @@ -1715,6 +1822,8 @@ class QueueDelete(AMQPMethodPayload): CLASSNAME = u'queue' FULLNAME = u'queue.delete' + CONTENT_PROPERTY_LIST = QueueContentPropertyList + CLASS_INDEX = 50 CLASS_INDEX_BINARY = b'\x32' METHOD_INDEX = 40 @@ -1797,6 +1906,8 @@ class QueueDeclareOk(AMQPMethodPayload): CLASSNAME = u'queue' FULLNAME = u'queue.declare-ok' + CONTENT_PROPERTY_LIST = QueueContentPropertyList + CLASS_INDEX = 50 CLASS_INDEX_BINARY = b'\x32' METHOD_INDEX = 11 @@ -1869,6 +1980,8 @@ class QueueDeleteOk(AMQPMethodPayload): CLASSNAME = u'queue' FULLNAME = u'queue.delete-ok' + CONTENT_PROPERTY_LIST = QueueContentPropertyList + CLASS_INDEX = 50 CLASS_INDEX_BINARY = b'\x32' METHOD_INDEX = 41 @@ -1925,6 +2038,8 @@ class QueuePurge(AMQPMethodPayload): CLASSNAME = u'queue' FULLNAME = u'queue.purge' + CONTENT_PROPERTY_LIST = QueueContentPropertyList + CLASS_INDEX = 50 CLASS_INDEX_BINARY = b'\x32' METHOD_INDEX = 30 @@ -1992,6 +2107,8 @@ class QueuePurgeOk(AMQPMethodPayload): CLASSNAME = u'queue' FULLNAME = u'queue.purge-ok' + CONTENT_PROPERTY_LIST = QueueContentPropertyList + CLASS_INDEX = 50 CLASS_INDEX_BINARY = b'\x32' METHOD_INDEX = 31 @@ -2047,6 +2164,8 @@ class QueueUnbind(AMQPMethodPayload): CLASSNAME = u'queue' FULLNAME = u'queue.unbind' + CONTENT_PROPERTY_LIST = QueueContentPropertyList + CLASS_INDEX = 50 CLASS_INDEX_BINARY = b'\x32' METHOD_INDEX = 50 @@ -2136,6 +2255,8 @@ class QueueUnbindOk(AMQPMethodPayload): CLASSNAME = u'queue' FULLNAME = u'queue.unbind-ok' + CONTENT_PROPERTY_LIST = QueueContentPropertyList + CLASS_INDEX = 50 CLASS_INDEX_BINARY = b'\x32' METHOD_INDEX = 51 @@ -2175,6 +2296,33 @@ class Basic(AMQPClass): INDEX = 60 +class BasicContentPropertyList(AMQPContentPropertyList): + """ + The basic class provides methods that support an industry-standard messaging model. + """ + CLASS_NAME = u'basic' + CLASS_INDEX = 60 + CLASS = Basic + + CONTENT_PROPERTIES = [ + # tuple of (name, domain, type) + (u'content-type', u'shortstr', u'shortstr'), # u'MIME content type' + (u'content-encoding', u'shortstr', u'shortstr'), # u'MIME content encoding' + (u'headers', u'table', u'table'), # u'message header field table' + (u'delivery-mode', u'octet', u'octet'), # u'non-persistent (1) or persistent (2)' + (u'priority', u'octet', u'octet'), # u'message priority, 0 to 9' + (u'correlation-id', u'shortstr', u'shortstr'), # u'application correlation identifier' + (u'reply-to', u'shortstr', u'shortstr'), # u'address to reply to' + (u'expiration', u'shortstr', u'shortstr'), # u'message expiration specification' + (u'message-id', u'shortstr', u'shortstr'), # u'application message identifier' + (u'timestamp', u'timestamp', u'timestamp'), # u'message timestamp' + (u'type', u'shortstr', u'shortstr'), # u'message type name' + (u'user-id', u'shortstr', u'shortstr'), # u'creating user id' + (u'app-id', u'shortstr', u'shortstr'), # u'creating application id' + (u'reserved', u'shortstr', u'shortstr'), # u'reserved, must be empty' + ] + + class BasicAck(AMQPMethodPayload): """ Acknowledge one or more messages @@ -2188,6 +2336,8 @@ class BasicAck(AMQPMethodPayload): CLASSNAME = u'basic' FULLNAME = u'basic.ack' + CONTENT_PROPERTY_LIST = BasicContentPropertyList + CLASS_INDEX = 60 CLASS_INDEX_BINARY = b'\x3C' METHOD_INDEX = 80 @@ -2253,6 +2403,8 @@ class BasicConsume(AMQPMethodPayload): CLASSNAME = u'basic' FULLNAME = u'basic.consume' + CONTENT_PROPERTY_LIST = BasicContentPropertyList + CLASS_INDEX = 60 CLASS_INDEX_BINARY = b'\x3C' METHOD_INDEX = 20 @@ -2359,6 +2511,8 @@ class BasicCancel(AMQPMethodPayload): CLASSNAME = u'basic' FULLNAME = u'basic.cancel' + CONTENT_PROPERTY_LIST = BasicContentPropertyList + CLASS_INDEX = 60 CLASS_INDEX_BINARY = b'\x3C' METHOD_INDEX = 30 @@ -2424,6 +2578,8 @@ class BasicConsumeOk(AMQPMethodPayload): CLASSNAME = u'basic' FULLNAME = u'basic.consume-ok' + CONTENT_PROPERTY_LIST = BasicContentPropertyList + CLASS_INDEX = 60 CLASS_INDEX_BINARY = b'\x3C' METHOD_INDEX = 21 @@ -2483,6 +2639,8 @@ class BasicCancelOk(AMQPMethodPayload): CLASSNAME = u'basic' FULLNAME = u'basic.cancel-ok' + CONTENT_PROPERTY_LIST = BasicContentPropertyList + CLASS_INDEX = 60 CLASS_INDEX_BINARY = b'\x3C' METHOD_INDEX = 31 @@ -2544,6 +2702,8 @@ class BasicDeliver(AMQPMethodPayload): CLASSNAME = u'basic' FULLNAME = u'basic.deliver' + CONTENT_PROPERTY_LIST = BasicContentPropertyList + CLASS_INDEX = 60 CLASS_INDEX_BINARY = b'\x3C' METHOD_INDEX = 60 @@ -2635,6 +2795,8 @@ class BasicGet(AMQPMethodPayload): CLASSNAME = u'basic' FULLNAME = u'basic.get' + CONTENT_PROPERTY_LIST = BasicContentPropertyList + CLASS_INDEX = 60 CLASS_INDEX_BINARY = b'\x3C' METHOD_INDEX = 70 @@ -2704,6 +2866,8 @@ class BasicGetOk(AMQPMethodPayload): CLASSNAME = u'basic' FULLNAME = u'basic.get-ok' + CONTENT_PROPERTY_LIST = BasicContentPropertyList + CLASS_INDEX = 60 CLASS_INDEX_BINARY = b'\x3C' METHOD_INDEX = 71 @@ -2792,6 +2956,8 @@ class BasicGetEmpty(AMQPMethodPayload): CLASSNAME = u'basic' FULLNAME = u'basic.get-empty' + CONTENT_PROPERTY_LIST = BasicContentPropertyList + CLASS_INDEX = 60 CLASS_INDEX_BINARY = b'\x3C' METHOD_INDEX = 72 @@ -2839,6 +3005,8 @@ class BasicPublish(AMQPMethodPayload): CLASSNAME = u'basic' FULLNAME = u'basic.publish' + CONTENT_PROPERTY_LIST = BasicContentPropertyList + CLASS_INDEX = 60 CLASS_INDEX_BINARY = b'\x3C' METHOD_INDEX = 40 @@ -2937,6 +3105,8 @@ class BasicQos(AMQPMethodPayload): CLASSNAME = u'basic' FULLNAME = u'basic.qos' + CONTENT_PROPERTY_LIST = BasicContentPropertyList + CLASS_INDEX = 60 CLASS_INDEX_BINARY = b'\x3C' METHOD_INDEX = 10 @@ -3017,6 +3187,8 @@ class BasicQosOk(AMQPMethodPayload): CLASSNAME = u'basic' FULLNAME = u'basic.qos-ok' + CONTENT_PROPERTY_LIST = BasicContentPropertyList + CLASS_INDEX = 60 CLASS_INDEX_BINARY = b'\x3C' METHOD_INDEX = 11 @@ -3062,6 +3234,8 @@ class BasicReturn(AMQPMethodPayload): CLASSNAME = u'basic' FULLNAME = u'basic.return' + CONTENT_PROPERTY_LIST = BasicContentPropertyList + CLASS_INDEX = 60 CLASS_INDEX_BINARY = b'\x3C' METHOD_INDEX = 50 @@ -3146,6 +3320,8 @@ class BasicReject(AMQPMethodPayload): CLASSNAME = u'basic' FULLNAME = u'basic.reject' + CONTENT_PROPERTY_LIST = BasicContentPropertyList + CLASS_INDEX = 60 CLASS_INDEX_BINARY = b'\x3C' METHOD_INDEX = 90 @@ -3209,6 +3385,8 @@ class BasicRecoverAsync(AMQPMethodPayload): CLASSNAME = u'basic' FULLNAME = u'basic.recover-async' + CONTENT_PROPERTY_LIST = BasicContentPropertyList + CLASS_INDEX = 60 CLASS_INDEX_BINARY = b'\x3C' METHOD_INDEX = 100 @@ -3269,6 +3447,8 @@ class BasicRecover(AMQPMethodPayload): CLASSNAME = u'basic' FULLNAME = u'basic.recover' + CONTENT_PROPERTY_LIST = BasicContentPropertyList + CLASS_INDEX = 60 CLASS_INDEX_BINARY = b'\x3C' METHOD_INDEX = 110 @@ -3327,6 +3507,8 @@ class BasicRecoverOk(AMQPMethodPayload): CLASSNAME = u'basic' FULLNAME = u'basic.recover-ok' + CONTENT_PROPERTY_LIST = BasicContentPropertyList + CLASS_INDEX = 60 CLASS_INDEX_BINARY = b'\x3C' METHOD_INDEX = 111 @@ -3374,6 +3556,28 @@ class Tx(AMQPClass): INDEX = 90 +class TxContentPropertyList(AMQPContentPropertyList): + """ + The tx class allows publish and ack operations to be batched into atomic + + units of work. The intention is that all publish and ack requests issued + within a transaction will complete successfully or none of them will. + Servers SHOULD implement atomic transactions at least where all publish + or ack requests affect a single queue. Transactions that cover multiple + queues may be non-atomic, given that queues can be created and destroyed + asynchronously, and such events do not form part of any transaction. + Further, the behaviour of transactions with respect to the immediate and + mandatory flags on Basic.Publish methods is not defined. + """ + CLASS_NAME = u'tx' + CLASS_INDEX = 90 + CLASS = Tx + + CONTENT_PROPERTIES = [ + # tuple of (name, domain, type) + ] + + class TxCommit(AMQPMethodPayload): """ Commit the current transaction @@ -3386,6 +3590,8 @@ class TxCommit(AMQPMethodPayload): CLASSNAME = u'tx' FULLNAME = u'tx.commit' + CONTENT_PROPERTY_LIST = TxContentPropertyList + CLASS_INDEX = 90 CLASS_INDEX_BINARY = b'\x5A' METHOD_INDEX = 20 @@ -3428,6 +3634,8 @@ class TxCommitOk(AMQPMethodPayload): CLASSNAME = u'tx' FULLNAME = u'tx.commit-ok' + CONTENT_PROPERTY_LIST = TxContentPropertyList + CLASS_INDEX = 90 CLASS_INDEX_BINARY = b'\x5A' METHOD_INDEX = 21 @@ -3473,6 +3681,8 @@ class TxRollback(AMQPMethodPayload): CLASSNAME = u'tx' FULLNAME = u'tx.rollback' + CONTENT_PROPERTY_LIST = TxContentPropertyList + CLASS_INDEX = 90 CLASS_INDEX_BINARY = b'\x5A' METHOD_INDEX = 30 @@ -3515,6 +3725,8 @@ class TxRollbackOk(AMQPMethodPayload): CLASSNAME = u'tx' FULLNAME = u'tx.rollback-ok' + CONTENT_PROPERTY_LIST = TxContentPropertyList + CLASS_INDEX = 90 CLASS_INDEX_BINARY = b'\x5A' METHOD_INDEX = 31 @@ -3558,6 +3770,8 @@ class TxSelect(AMQPMethodPayload): CLASSNAME = u'tx' FULLNAME = u'tx.select' + CONTENT_PROPERTY_LIST = TxContentPropertyList + CLASS_INDEX = 90 CLASS_INDEX_BINARY = b'\x5A' METHOD_INDEX = 10 @@ -3600,6 +3814,8 @@ class TxSelectOk(AMQPMethodPayload): CLASSNAME = u'tx' FULLNAME = u'tx.select-ok' + CONTENT_PROPERTY_LIST = TxContentPropertyList + CLASS_INDEX = 90 CLASS_INDEX_BINARY = b'\x5A' METHOD_INDEX = 11 diff --git a/coolamqp/framing/frames/frames.py b/coolamqp/framing/frames/frames.py index ea85d9a7b3dcf8ba2afd2e0ade9d78e204ddda44..7e51b7b8bcd6eebcfaa18a06c8474cf54d41c68e 100644 --- a/coolamqp/framing/frames/frames.py +++ b/coolamqp/framing/frames/frames.py @@ -39,6 +39,8 @@ class AMQPMethodFrame(AMQPFrame): return AMQPMethodFrame(channel, payload) + def get_size(self): + return 10 + self.payload.get_size() class AMQPHeaderFrame(AMQPFrame): @@ -68,6 +70,9 @@ class AMQPHeaderFrame(AMQPFrame): def unserialize(channel, payload_as_buffer): pass + def get_size(self): + raise NotImplementedError + class AMQPBodyFrame(AMQPFrame): FRAME_TYPE = FRAME_BODY @@ -85,6 +90,9 @@ class AMQPBodyFrame(AMQPFrame): def unserialize(channel, payload_as_buffer): return AMQPBodyFrame(channel, payload_as_buffer) + def get_size(self): + return 8 + len(self.data) + class AMQPHeartbeatFrame(AMQPFrame): FRAME_TYPE = FRAME_HEARTBEAT @@ -98,3 +106,5 @@ class AMQPHeartbeatFrame(AMQPFrame): AMQPFrame.write_to(self, buf) buf.write(chr(FRAME_END)) + def get_size(self): + return AMQPHeartbeatFrame.LENGTH diff --git a/coolamqp/framing/order.py b/coolamqp/framing/order.py new file mode 100644 index 0000000000000000000000000000000000000000..74486d434544705c2cba66781784dd4b04d0dda8 --- /dev/null +++ b/coolamqp/framing/order.py @@ -0,0 +1,30 @@ +# coding=UTF-8 +from __future__ import absolute_import, division, print_function + + + +class MethodTransaction(object): + """ + This means "run a method, and call me back when reply arrives". + + Method's + """ + + +class SyncOrder(Order): + """ + This means "send a method frame, optionally some other frames, + and run a callback with the returned thing. + + Possible responses are registered for. + """ + def __init__(self, channel, method, other_frames=[], callback=lambda frame: None): + """ + :param channel: channel to use + :param method: AMQPMethodPayload instance + :param other_frames: list of other AMQPFrames to send next in order + :param callback: callback to run with received response. + methods that this will react to are determined by AMQPMethodPayload + """ + + diff --git a/coolamqp/framing/react/__init__.py b/coolamqp/framing/react/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..1ecfda3bb4a648582888bc86871de56daba9fce3 --- /dev/null +++ b/coolamqp/framing/react/__init__.py @@ -0,0 +1,6 @@ +# coding=UTF-8 +from __future__ import absolute_import, division, print_function + +""" + +""" \ No newline at end of file diff --git a/coolamqp/framing/streams/__init__.py b/coolamqp/framing/streams/__init__.py index 111635de9e6ce30fd3cbb34c7e3532b50d1f60d4..7670bd2bf36d35609e06e53cd6455f221b39b897 100644 --- a/coolamqp/framing/streams/__init__.py +++ b/coolamqp/framing/streams/__init__.py @@ -1,6 +1,9 @@ # coding=UTF-8 """ -Classes that allow to receive and send frames in a rapid way +Classes that allow to receive and send frames in a rapid way, +and manage low-level connection details. + +These modules bear no notion of fault-tolerance. """ from __future__ import absolute_import, division, print_function diff --git a/coolamqp/framing/streams/exceptions.py b/coolamqp/framing/streams/exceptions.py new file mode 100644 index 0000000000000000000000000000000000000000..0637f9fc106149cfdae552f9577421133e3ead3a --- /dev/null +++ b/coolamqp/framing/streams/exceptions.py @@ -0,0 +1,4 @@ +# coding=UTF-8 +from __future__ import absolute_import, division, print_function + + diff --git a/coolamqp/framing/streams/send_operator.py b/coolamqp/framing/streams/send_operator.py new file mode 100644 index 0000000000000000000000000000000000000000..1161ee429028ba1315acd105709c9119d87cc22b --- /dev/null +++ b/coolamqp/framing/streams/send_operator.py @@ -0,0 +1,97 @@ +# coding=UTF-8 +from __future__ import absolute_import, division, print_function + +import collections +import threading +import io +import socket + + + +class SendingOperator(object): + """ + Assembles AMQP frames from received data and orchestrates their upload via a socket. + + Just call with .put(data) and get frames by iterator .frames(). + + Not thread safe. + + State machine + (frame_type is None) and has_bytes(1) -> (frame_type <- bytes(1)) + + (frame_type is HEARTBEAT) and has_bytes(3) -> (output_frame, frame_type <- None) + (frame_type is not HEARTBEAT and not None) and has_bytes(6) -> (frame_channel <- bytes(2), + frame_size <- bytes(4)) + + (frame_size is not None) and has_bytes(frame_size+1) -> (output_frame, + frame_type <- None + frame_size < None) + """ + def __init__(self, sock, on_fail=lambda e: None, + on_close=lambda: None): + """ + :param sock: a non-timeouting socket + :param on_fail: callable(exception) to call when socket fails + :param on_close: callable(exception) to call when socket closes + """ + self.sock = sock + self.to_send = collections.deque() # either bytes, bytearrays and buffers, or tuple of + # (callable/0, callable/1) + + self.on_fail = on_fail + self.on_close = on_close + + self.failed = False + + + def _failed(self, e): + """Discard all to_send, run on_fail callables""" + self.failed = True + self.on_fail(e) + + for order in self.to_send: + if isinstance(order, tuple) and len(order) == 2: + on_done, on_fail = order + if on_fail is not None: + on_fail(e) + + self.to_send = collections.deque() + + def run_sending_loop(self): + while len(self.to_send) > 0: + if isinstance(self.to_send[0], tuple) and len(self.to_send[0]) == 2: + on_done, on_fail = self.to_send.popleft() + if on_done is not None: + on_done() + else: + try: + sent_bytes = self.sock.send(self.to_send[0]) + except (IOError, socket.error) as e: + self._failed(e) + return + + if sent_bytes == len(self.to_send[0]): + # Fetch next fragment + self.to_send.popleft() + else: + # slice current buffer + self.to_send[0] = buffer(self.to_send[0], sent_bytes) + + def send(self, frames, on_done=None, on_fail=None): + """ + Schedule to send some frames. + :param frames: list of AMQPFrame instances + :param on_done: callable/0 to call when this is done (frame_end of last frame has just left this PC) + :param on_fail: callable(Exception) to call when something broke before the data could be sent. + """ + length = sum(frame.get_size() for frame in frames) + buf = io.BytesIO(bytearray(length)) + + for frame in frames: + frame.write_to(buf) + + self.to_send.append(buf.getvalue()) + if (on_done is not None) or (on_fail is not None): + self.to_send.append(on_done, on_fail) + + self.run_sending_loop() diff --git a/coolamqp/framing/streams/uplink.py b/coolamqp/framing/streams/uplink.py new file mode 100644 index 0000000000000000000000000000000000000000..e938afa5fedd781bfc59925b7dc6aa37f223e1c4 --- /dev/null +++ b/coolamqp/framing/streams/uplink.py @@ -0,0 +1,27 @@ +# coding=UTF-8 +from __future__ import absolute_import, division, print_function +import collections +import socket + +class Uplink(object): + """ + This coordinates access to a shared socket. + + This should be discarded when the TCP connection dies. + """ + + def __init__(self, sock): + """ + Pass a fresh socket, just a + :param sock: + """ + self.sock = sock + self.sock.settimeout(0) + self.sock.setsockopt(socket.SOL_SOCKET, socket.TCP_NODELAY, 1) # disable Nagle + + # when a method frame comes in, it is checked here first. + # if there's a match, that means a sync request completed. + self.waiting_queue = collections.defaultdict() + + +