From 0bbde6642788347c11a3a8b4199530b78d71c061 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ma=C5=9Blanka?= <piotr.maslanka@henrietta.com.pl> Date: Wed, 13 Nov 2024 08:45:01 +0100 Subject: [PATCH] fixed #10 --- CHANGELOG.md | 1 + coolamqp/attaches/consumer.py | 11 +++++------ coolamqp/clustering/cluster.py | 3 ++- coolamqp/framing/definitions.py | 1 + coolamqp/framing/field_table.py | 2 ++ coolamqp/objects.py | 11 +++++++++-- tests/test_clustering/test_topic_exchanges.py | 5 +++-- tests/test_objects.py | 5 +++++ 8 files changed, 28 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0cc96b5..0e7cd43 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ v2.0.0 * changed some default arguments for Queues for them to better make sense * some argument combinations just raise ValueError * PendingDeprecationWarning changed into a DeprecationWarning + * added support for headers exchanges * changes to Cluster: * declare will refuse to declare an anonymous queue * renamed publish(tx) to publish(confirm) diff --git a/coolamqp/attaches/consumer.py b/coolamqp/attaches/consumer.py index f5d6474..0d05c4c 100644 --- a/coolamqp/attaches/consumer.py +++ b/coolamqp/attaches/consumer.py @@ -395,25 +395,24 @@ class Consumer(Channeler): elif isinstance(payload, QueueDeclareOk): # did we need an anonymous name? - if not self.queue.name: - self.queue.name = self.queue.name.tobytes() + if self.queue.anonymous: + self.queue.name = payload.queue.tobytes() - queue_declared = False + queue_bound = False # We need any form of binding. if self.queue.exchange is not None: - queue_declared = True + queue_bound = True qb = QueueBind( self.queue.name, self.queue.exchange.name.encode('utf-8'), self.queue.routing_key, False, self.queue.arguments_bind) - logger.debug('Running %s' % (repr(qb))) self.method_and_watch( qb, QueueBindOk, self.on_setup ) - if not queue_declared: + if not queue_bound: # default exchange, pretend it was bind ok self.on_setup(QueueBindOk()) elif isinstance(payload, QueueBindOk): diff --git a/coolamqp/clustering/cluster.py b/coolamqp/clustering/cluster.py index e62fbb3..0760c9d 100644 --- a/coolamqp/clustering/cluster.py +++ b/coolamqp/clustering/cluster.py @@ -9,6 +9,7 @@ from concurrent.futures import Future import six +from coolamqp.argumentify import argumentify from coolamqp.attaches import Publisher, AttacheGroup, Consumer, Declarer from coolamqp.attaches.utils import close_future from coolamqp.clustering.events import ConnectionLost, MessageReceived, \ @@ -112,7 +113,7 @@ class Cluster(object): child_span = self._make_span('bind', span) else: child_span = None - fut = self.decl.declare(QueueBind(queue, exchange, routing_key, arguments), + fut = self.decl.declare(QueueBind(queue, exchange, routing_key, argumentify(arguments)), span=child_span) return close_future(fut, child_span) diff --git a/coolamqp/framing/definitions.py b/coolamqp/framing/definitions.py index f106e5b..5c76cfa 100644 --- a/coolamqp/framing/definitions.py +++ b/coolamqp/framing/definitions.py @@ -2947,6 +2947,7 @@ class BasicContentPropertyList(AMQPContentPropertyList): """ if 'headers' in kwargs: kwargs['headers'] = argumentify(kwargs['headers']) + logger.warning('Argumentified headers are %s' % (kwargs['headers'], )) zpf = bytearray([ (('content_type' in kwargs) << 7) | diff --git a/coolamqp/framing/field_table.py b/coolamqp/framing/field_table.py index 5d7b1de..fc159a0 100644 --- a/coolamqp/framing/field_table.py +++ b/coolamqp/framing/field_table.py @@ -254,6 +254,8 @@ def frame_table_size(table): """ :return: length of table representation, in bytes, INCLUDING length header""" + import logging + logging.getLogger(__name__).warning(repr(table)) return 4 + sum(1 + len(k) + frame_field_value_size(fv) for k, fv in table) diff --git a/coolamqp/objects.py b/coolamqp/objects.py index 3ab5a67..12e3113 100644 --- a/coolamqp/objects.py +++ b/coolamqp/objects.py @@ -11,13 +11,20 @@ import warnings import six from coolamqp.argumentify import argumentify, tobytes, toutf8 -from coolamqp.framing.definitions import BasicContentPropertyList as MessageProperties +from coolamqp.framing.definitions import BasicContentPropertyList logger = logging.getLogger(__name__) EMPTY_PROPERTIES = MessageProperties() +class MessageProperties(BasicContentPropertyList): + def __new__(cls, *args, **kwargs): + if 'headers' in kwargs: + kwargs['headers'] = argumentify(kwargs['headers']) + return BasicContentPropertyList.__new__(cls, *args, **kwargs) + + class Callable(object): """ Add a bunch of callables to one list, and just invoke'm. @@ -275,7 +282,7 @@ class Queue(object): self.exclusive = exclusive self.arguments = argumentify(arguments) self.arguments_bind = argumentify(arguments_bind) - self.anonymous = self.name is None # if this queue is anonymous, it must be regenerated upon reconnect + self.anonymous = self.name is None if self.auto_delete and self.durable: raise ValueError('Cannot create an auto_delete and durable queue') diff --git a/tests/test_clustering/test_topic_exchanges.py b/tests/test_clustering/test_topic_exchanges.py index b44efe7..0a2bfab 100644 --- a/tests/test_clustering/test_topic_exchanges.py +++ b/tests/test_clustering/test_topic_exchanges.py @@ -44,12 +44,13 @@ class TestTopic(unittest.TestCase): self.c.declare(queue2).result() test = {'a': 0} + def do_message(msg): msg.ack() test['a'] += 1 - cons1, fut1 = self.c.consume(queue1, no_ack=False) - cons2, fut2 = self.c.consume(queue2, no_ack=False) + cons1, fut1 = self.c.consume(queue1, on_message=do_message, no_ack=False) + cons2, fut2 = self.c.consume(queue2, on_message=do_message, no_ack=False) fut1.result() fut2.result() diff --git a/tests/test_objects.py b/tests/test_objects.py index 9d37c62..19f12ca 100644 --- a/tests/test_objects.py +++ b/tests/test_objects.py @@ -38,6 +38,11 @@ class TestObjects(unittest.TestCase): if IS_PY3: self.assertTrue(issubclass(w[1].category, DeprecationWarning)) + def test_headers(self): + msg = MessageProperties(headers={'city': 'sydney'}) + buf = io.BytesIO() + msg.write_to(buf) + def test_queue_declare(self): args = argumentify({'x-dead-letter-exchange': 'deadletter', 'x-message-ttl': 1000}) -- GitLab