From dee636c6b69130429570218ec447dc87eaaf4b49 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ma=C5=9Blanka?= <pmaslanka@smok.co> Date: Wed, 13 Nov 2024 12:04:11 +0000 Subject: [PATCH] Feature#10 --- CHANGELOG.md | 1 + coolamqp/argumentify.py | 53 +++++++++++++++++++ coolamqp/attaches/channeler.py | 4 +- coolamqp/attaches/consumer.py | 16 +++--- coolamqp/clustering/cluster.py | 3 +- .../framing/compilation/content_property.py | 5 +- coolamqp/framing/definitions.py | 7 ++- coolamqp/framing/field_table.py | 24 ++++++--- coolamqp/objects.py | 52 +++++++----------- coolamqp/uplink/connection/connection.py | 1 + coolamqp/uplink/connection/recv_framer.py | 1 + coolamqp/uplink/handshake.py | 2 + tests/test_clustering/test_topic_exchanges.py | 32 ++++++++++- tests/test_framing.py | 2 +- tests/test_objects.py | 9 +++- 15 files changed, 154 insertions(+), 58 deletions(-) create mode 100644 coolamqp/argumentify.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 0cc96b5..0e7cd43 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ v2.0.0 * changed some default arguments for Queues for them to better make sense * some argument combinations just raise ValueError * PendingDeprecationWarning changed into a DeprecationWarning + * added support for headers exchanges * changes to Cluster: * declare will refuse to declare an anonymous queue * renamed publish(tx) to publish(confirm) diff --git a/coolamqp/argumentify.py b/coolamqp/argumentify.py new file mode 100644 index 0000000..51aaed7 --- /dev/null +++ b/coolamqp/argumentify.py @@ -0,0 +1,53 @@ +import warnings + +import six + +from coolamqp.framing.field_table import get_type_for +import logging + +logger = logging.getLogger(__name__) + + + +def tobytes(q): + if isinstance(q, memoryview): + return q.tobytes() + return q.encode('utf-8') if isinstance(q, six.text_type) else q + + +def toutf8(q): + if isinstance(q, memoryview): + q = q.tobytes() + return q.decode('utf-8') if isinstance(q, six.binary_type) else q + + +def argumentify(arguments): + if arguments is None: + return [] + logger.warning('Input is %s' % (arguments, )) + # Was it argumented already? + # if isinstance(arguments, list): + # if len(arguments) >= 1: + # if isinstance(arguments[0], tuple): + # if isinstance(arguments[0][1], str) and len(arguments[0][1]) == 1: + # # Looks argumentified already + # return arguments + args = [] + if isinstance(arguments, dict): + for key, value in arguments.items(): + key = tobytes(key) + args.append((key, (value, get_type_for(value)))) + logger.warning('Output is %s', (args, 'F')) + return (args, 'F') + elif len(arguments[0]) == 2: + for key, value in arguments: + key = tobytes(key) + args.append((key, (value, get_type_for(value)))) + return (args, 'F') + elif isinstance(arguments, (list, tuple)): + for value in arguments: + args.append((value, get_type_for(value))) + return (args, 'A') + else: + warnings.warn('Unnecessary call to argumentify, see issue #11 for details', UserWarning) + return args diff --git a/coolamqp/attaches/channeler.py b/coolamqp/attaches/channeler.py index b22bb47..a6ef07a 100644 --- a/coolamqp/attaches/channeler.py +++ b/coolamqp/attaches/channeler.py @@ -8,6 +8,7 @@ from __future__ import print_function, absolute_import, division import logging import typing as tp +import coolamqp.argumentify from coolamqp.framing.definitions import ChannelOpen, ChannelOpenOk, \ ChannelClose, ChannelCloseOk, BasicCancel, \ BasicCancelOk @@ -161,8 +162,7 @@ class Channeler(Attache): self.channel_id = None if isinstance(payload, ChannelClose): - logger.debug('Channel closed: %s %s', payload.reply_code, - payload.reply_text.tobytes()) + logger.debug('Channel closed: %s %s', payload.reply_code, payload.reply_text) def methods(self, payloads): # type: (tp.Iterable[coolamqp.framing.base.AMQPMethodPayload]) -> None diff --git a/coolamqp/attaches/consumer.py b/coolamqp/attaches/consumer.py index 816a9c5..dae02c0 100644 --- a/coolamqp/attaches/consumer.py +++ b/coolamqp/attaches/consumer.py @@ -8,6 +8,7 @@ import uuid import warnings from concurrent.futures import Future +import coolamqp.argumentify from coolamqp.attaches.channeler import Channeler, ST_ONLINE, ST_OFFLINE from coolamqp.exceptions import AMQPError from coolamqp.framing.definitions import ChannelOpenOk, BasicConsume, \ @@ -365,7 +366,7 @@ class Consumer(Channeler): self.queue.exchange.auto_delete, False, False, - []), + self.queue.exchange.arguments), ExchangeDeclareOk, self.on_setup ) @@ -386,7 +387,7 @@ class Consumer(Channeler): self.queue.exclusive, self.queue.auto_delete, False, - [] + self.queue.arguments ), QueueDeclareOk, self.on_setup @@ -394,25 +395,24 @@ class Consumer(Channeler): elif isinstance(payload, QueueDeclareOk): # did we need an anonymous name? - if not self.queue.name: + if self.queue.anonymous: self.queue.name = payload.queue.tobytes() - queue_declared = False + queue_bound = False # We need any form of binding. if self.queue.exchange is not None: - queue_declared = True + queue_bound = True qb = QueueBind( self.queue.name, self.queue.exchange.name.encode('utf-8'), - self.queue.routing_key, False, []) - logger.debug('Running %s' % (repr(qb))) + self.queue.routing_key, False, self.queue.arguments_bind) self.method_and_watch( qb, QueueBindOk, self.on_setup ) - if not queue_declared: + if not queue_bound: # default exchange, pretend it was bind ok self.on_setup(QueueBindOk()) elif isinstance(payload, QueueBindOk): diff --git a/coolamqp/clustering/cluster.py b/coolamqp/clustering/cluster.py index e62fbb3..0760c9d 100644 --- a/coolamqp/clustering/cluster.py +++ b/coolamqp/clustering/cluster.py @@ -9,6 +9,7 @@ from concurrent.futures import Future import six +from coolamqp.argumentify import argumentify from coolamqp.attaches import Publisher, AttacheGroup, Consumer, Declarer from coolamqp.attaches.utils import close_future from coolamqp.clustering.events import ConnectionLost, MessageReceived, \ @@ -112,7 +113,7 @@ class Cluster(object): child_span = self._make_span('bind', span) else: child_span = None - fut = self.decl.declare(QueueBind(queue, exchange, routing_key, arguments), + fut = self.decl.declare(QueueBind(queue, exchange, routing_key, argumentify(arguments)), span=child_span) return close_future(fut, child_span) diff --git a/coolamqp/framing/compilation/content_property.py b/coolamqp/framing/compilation/content_property.py index 0149db2..cf83661 100644 --- a/coolamqp/framing/compilation/content_property.py +++ b/coolamqp/framing/compilation/content_property.py @@ -1,7 +1,7 @@ # coding=UTF-8 from __future__ import absolute_import, division, print_function -from coolamqp.framing.field_table import deframe_table +from coolamqp.framing.field_table import deframe_table, frame_table_size, enframe_table """Generate serializers/unserializers/length getters for given property_flags""" import six @@ -141,7 +141,6 @@ def _compile_particular_content_property_list_class(zpf, fields): mod.append(get_counter(present_fields, prefix=u'self.', indent_level=2)[ :-1]) # skip eol mod.append(u' + %s\n' % (zpf_length,)) # account for pf length - return u''.join(mod), structers @@ -156,6 +155,8 @@ def compile_particular_content_property_list_class(zpf, fields): locals_ = { 'AMQPContentPropertyList': AMQPContentPropertyList, 'deframe_table': deframe_table, + 'frame_table_size': frame_table_size, + 'enframe_table': enframe_table, } for structer in structers: if structer not in STRUCTERS_FOR_NOW: diff --git a/coolamqp/framing/definitions.py b/coolamqp/framing/definitions.py index 68278f4..12645c8 100644 --- a/coolamqp/framing/definitions.py +++ b/coolamqp/framing/definitions.py @@ -1,5 +1,9 @@ # coding=UTF-8 from __future__ import print_function, absolute_import + +import coolamqp.argumentify +from coolamqp.argumentify import argumentify + """ A Python version of the AMQP machine-readable specification. @@ -3021,7 +3025,8 @@ class BasicContentPropertyList(AMQPContentPropertyList): while buf[offset + pfl - 1] & 1: pfl += 2 zpf = BasicContentPropertyList.zero_property_flags(buf[offset:offset + - pfl]).tobytes() + pfl]).tobytes() + if zpf in BasicContentPropertyList.PARTICULAR_CLASSES: return BasicContentPropertyList.PARTICULAR_CLASSES[ zpf].from_buffer(buf, offset) diff --git a/coolamqp/framing/field_table.py b/coolamqp/framing/field_table.py index 5d7b1de..a2752c6 100644 --- a/coolamqp/framing/field_table.py +++ b/coolamqp/framing/field_table.py @@ -133,8 +133,12 @@ def get_type_for(val): return 'l' elif isinstance(val, float): return 'd' + elif isinstance(val, (tuple, list)): + return 'A' + elif isinstance(val, dict): + return 'F' else: - raise ValueError('Undeterminable type') + raise ValueError('I have zero idea what you have just passed') def enframe_field_value(buf, fv): @@ -203,8 +207,9 @@ def enframe_table(buf, table): # type (tp.BinaryIO, table) -> None :param buf: target buffer to write to :param table: table to write """ + if isinstance(table, tuple) and len(table) > 1 and table[1] == 'F': # Todo: fix an ugly hack + table = table[0] _tobuf(buf, '!I', frame_table_size(table) - 4) - for name, fv in table: _tobufv(buf, name, '!B', len(name)) enframe_field_value(buf, fv) @@ -240,10 +245,14 @@ def deframe_table(buf, start_offset): # -> (table, bytes_consumed) def frame_field_value_size(fv): v, t = fv - if FIELD_TYPES[t][0] is None: - return FIELD_TYPES[t][4](v) + 1 - else: - return FIELD_TYPES[t][0] + 1 + try: + if FIELD_TYPES[t][0] is None: + return FIELD_TYPES[t][4](v) + 1 + else: + return FIELD_TYPES[t][0] + 1 + except KeyError: + # todo: fix this hack + return frame_field_value_size(t) def frame_array_size(array): @@ -254,6 +263,9 @@ def frame_table_size(table): """ :return: length of table representation, in bytes, INCLUDING length header""" + if isinstance(table, tuple) and len(table) == 2: + table = table[0] + # todo: fix this hack return 4 + sum(1 + len(k) + frame_field_value_size(fv) for k, fv in table) diff --git a/coolamqp/objects.py b/coolamqp/objects.py index 7647c66..bf10586 100644 --- a/coolamqp/objects.py +++ b/coolamqp/objects.py @@ -10,42 +10,21 @@ import warnings import six -from coolamqp.framing.definitions import \ - BasicContentPropertyList as MessageProperties -from coolamqp.framing.field_table import get_type_for +from coolamqp.argumentify import argumentify, tobytes, toutf8 +from coolamqp.framing.definitions import BasicContentPropertyList logger = logging.getLogger(__name__) -EMPTY_PROPERTIES = MessageProperties() - - - -def toutf8(q): - if isinstance(q, memoryview): - q = q.tobytes() - return q.decode('utf-8') if isinstance(q, six.binary_type) else q +class MessageProperties(BasicContentPropertyList): + def __new__(cls, *args, **kwargs): + if 'headers' in kwargs: + if isinstance(kwargs['headers'], dict): + kwargs['headers'] = argumentify(kwargs['headers']) + return BasicContentPropertyList.__new__(cls, *args, **kwargs) -def tobytes(q): - if isinstance(q, memoryview): - return q.tobytes() - return q.encode('utf-8') if isinstance(q, six.text_type) else q - -def argumentify(arguments): - if arguments is None: - return None - args = [] - if isinstance(arguments, dict): - for key, value in arguments.items(): - key = tobytes(key) - args.append((key, (value, get_type_for(value)))) - else: - for key, value in arguments: - key = tobytes(key) - args.append((key, (value, get_type_for(value)))) - - return args +EMPTY_PROPERTIES = MessageProperties() class Callable(object): @@ -271,6 +250,9 @@ class Queue(object): :param arguments: either a list of (bytes, values) or a dict of (str, value) to pass as an extra argument :param routing_key: routing key that will be used to bind to an exchange. Used only when this queue is associated with an exchange. Default value of blank should suffice. + :param arguments: either a list of (bytes, values) or a dict of (str, value) to pass as an extra argument during + declaration + :param arguments_bind: arguments to pass to binding to a (headers, I suppose exchange) :raises ValueError: tried to create a queue that was not durable or auto_delete :raises ValueError: tried to create a queue that was not exclusive or auto_delete and not anonymous :raises ValueError: tried to create a queue that was anonymous and not auto_delete or durable @@ -279,7 +261,7 @@ class Queue(object): :warning UserWarning: if you're declaring an auto_delete or exclusive, anonymous queue """ __slots__ = ('name', 'durable', 'exchange', 'auto_delete', 'exclusive', - 'anonymous', 'consumer_tag', 'arguments', 'routing_key') + 'anonymous', 'consumer_tag', 'arguments', 'routing_key', 'arguments_bind') def __init__(self, name=None, # type: tp.Union[str, bytes, None] durable=False, # type: bool @@ -287,7 +269,8 @@ class Queue(object): exclusive=True, # type: bool auto_delete=True, # type: bool arguments=None, # type: tp.Union[tp.List[bytes, tp.Any], tp.Dict[str, tp.Any]], - routing_key=b'' #: type: tp.Union[str, bytes] + routing_key=b'', #: type: tp.Union[str, bytes] + arguments_bind=None, ): if name is None: self.name = None @@ -300,7 +283,8 @@ class Queue(object): self.auto_delete = auto_delete self.exclusive = exclusive self.arguments = argumentify(arguments) - self.anonymous = self.name is None # if this queue is anonymous, it must be regenerated upon reconnect + self.arguments_bind = argumentify(arguments_bind) + self.anonymous = self.name is None if self.auto_delete and self.durable: raise ValueError('Cannot create an auto_delete and durable queue') @@ -349,7 +333,7 @@ class QueueBind(object): exchange = exchange.name self.exchange = tobytes(exchange) # type: bytes self.routing_key = tobytes(routing_key) # type: bytes - self.arguments = argumentify(arguments) + self.arguments = arguments or [] def __eq__(self, other): return self.queue == other.queue and self.exchange == other.exchange and self.routing_key == other.routing_key diff --git a/coolamqp/uplink/connection/connection.py b/coolamqp/uplink/connection/connection.py index 0974f5c..cf0cce9 100644 --- a/coolamqp/uplink/connection/connection.py +++ b/coolamqp/uplink/connection/connection.py @@ -8,6 +8,7 @@ import time import typing as tp import uuid +import coolamqp.argumentify from coolamqp.utils import monotonic from coolamqp.exceptions import ConnectionDead diff --git a/coolamqp/uplink/connection/recv_framer.py b/coolamqp/uplink/connection/recv_framer.py index cd35ec6..4f45452 100644 --- a/coolamqp/uplink/connection/recv_framer.py +++ b/coolamqp/uplink/connection/recv_framer.py @@ -7,6 +7,7 @@ import struct import six +import coolamqp.argumentify from coolamqp.framing.definitions import FRAME_HEADER, FRAME_HEARTBEAT, \ FRAME_END, FRAME_METHOD, FRAME_BODY from coolamqp.framing.frames import AMQPBodyFrame, AMQPHeaderFrame, \ diff --git a/coolamqp/uplink/handshake.py b/coolamqp/uplink/handshake.py index 99d99bb..77d805a 100644 --- a/coolamqp/uplink/handshake.py +++ b/coolamqp/uplink/handshake.py @@ -1,6 +1,8 @@ # coding=UTF-8 from __future__ import absolute_import, division, print_function +import coolamqp.argumentify + """ Provides reactors that can authenticate an AQMP session """ diff --git a/tests/test_clustering/test_topic_exchanges.py b/tests/test_clustering/test_topic_exchanges.py index dc8c18e..0a2bfab 100644 --- a/tests/test_clustering/test_topic_exchanges.py +++ b/tests/test_clustering/test_topic_exchanges.py @@ -3,11 +3,12 @@ import time import os import unittest import logging +import uuid import monotonic from coolamqp.clustering import Cluster -from coolamqp.objects import Exchange, Queue, NodeDefinition, Message +from coolamqp.objects import Exchange, Queue, NodeDefinition, Message, MessageProperties XCHG = Exchange('smok5.results', type='topic', durable=True) @@ -20,6 +21,7 @@ logging.getLogger('coolamqp').setLevel(logging.DEBUG) did_receive = False + class TestTopic(unittest.TestCase): def setUp(self): self.c = Cluster([NODE]) @@ -32,6 +34,34 @@ class TestTopic(unittest.TestCase): pass self.c.shutdown() + def test_headers_exchange(self): + xchg_name = uuid.uuid4().hex + exchange = Exchange(xchg_name, b'headers') + self.c.declare(exchange).result() + queue1 = Queue(uuid.uuid4().hex, exchange=exchange, arguments_bind={'x-match': 'all', 'location': 'brisbane'}) + self.c.declare(queue1).result() + queue2 = Queue(uuid.uuid4().hex, exchange=exchange, arguments_bind={'x-match': 'all', 'location': 'sydney'}) + self.c.declare(queue2).result() + + test = {'a': 0} + + def do_message(msg): + msg.ack() + test['a'] += 1 + + cons1, fut1 = self.c.consume(queue1, on_message=do_message, no_ack=False) + cons2, fut2 = self.c.consume(queue2, on_message=do_message, no_ack=False) + fut1.result() + fut2.result() + + self.c.publish(Message(b'dupa', MessageProperties(headers={'location': 'sydney'})), exchange=exchange, confirm=True).result() + self.c.publish(Message(b'dupa', MessageProperties(headers={'location': 'brisbane'})), exchange=exchange, confirm=True).result() + self.c.publish(Message(b'dupa', MessageProperties(headers={'location': 'wtf'})), exchange=exchange, confirm=True).result() + time.sleep(1) + cons1.cancel().result() + cons2.cancel().result() + self.assertEqual(test['a'], 2) + def test_bind_stuff(self): self.c.declare(QUEUE) self.c.declare(XCHG).result() diff --git a/tests/test_framing.py b/tests/test_framing.py index 3b4c10b..5442da3 100644 --- a/tests/test_framing.py +++ b/tests/test_framing.py @@ -3,7 +3,7 @@ import unittest from coolamqp.framing.field_table import enframe_table -from coolamqp.objects import argumentify +from coolamqp.argumentify import argumentify class TestFraming(unittest.TestCase): diff --git a/tests/test_objects.py b/tests/test_objects.py index 1415ba6..19f12ca 100644 --- a/tests/test_objects.py +++ b/tests/test_objects.py @@ -8,8 +8,8 @@ import warnings from coolamqp.framing.definitions import QueueDeclare -from coolamqp.objects import NodeDefinition, MessageProperties, Queue, argumentify - +from coolamqp.objects import NodeDefinition, MessageProperties, Queue +from coolamqp.argumentify import argumentify logger = logging.getLogger(__name__) logging.getLogger('coolamqp').setLevel(logging.DEBUG) @@ -38,6 +38,11 @@ class TestObjects(unittest.TestCase): if IS_PY3: self.assertTrue(issubclass(w[1].category, DeprecationWarning)) + def test_headers(self): + msg = MessageProperties(headers={'city': 'sydney'}) + buf = io.BytesIO() + msg.write_to(buf) + def test_queue_declare(self): args = argumentify({'x-dead-letter-exchange': 'deadletter', 'x-message-ttl': 1000}) -- GitLab