From b77efc41a423fcd372289a089c98f2c5471b35cc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ma=C5=9Blanka?= <piotr.maslanka@henrietta.com.pl> Date: Wed, 13 Nov 2024 08:45:01 +0100 Subject: [PATCH] fixed #10 --- CHANGELOG.md | 1 + coolamqp/attaches/consumer.py | 11 +++++------ coolamqp/framing/definitions.py | 1 + coolamqp/objects.py | 2 +- tests/test_clustering/test_topic_exchanges.py | 5 +++-- 5 files changed, 11 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0cc96b5..0e7cd43 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ v2.0.0 * changed some default arguments for Queues for them to better make sense * some argument combinations just raise ValueError * PendingDeprecationWarning changed into a DeprecationWarning + * added support for headers exchanges * changes to Cluster: * declare will refuse to declare an anonymous queue * renamed publish(tx) to publish(confirm) diff --git a/coolamqp/attaches/consumer.py b/coolamqp/attaches/consumer.py index f5d6474..0d05c4c 100644 --- a/coolamqp/attaches/consumer.py +++ b/coolamqp/attaches/consumer.py @@ -395,25 +395,24 @@ class Consumer(Channeler): elif isinstance(payload, QueueDeclareOk): # did we need an anonymous name? - if not self.queue.name: - self.queue.name = self.queue.name.tobytes() + if self.queue.anonymous: + self.queue.name = payload.queue.tobytes() - queue_declared = False + queue_bound = False # We need any form of binding. if self.queue.exchange is not None: - queue_declared = True + queue_bound = True qb = QueueBind( self.queue.name, self.queue.exchange.name.encode('utf-8'), self.queue.routing_key, False, self.queue.arguments_bind) - logger.debug('Running %s' % (repr(qb))) self.method_and_watch( qb, QueueBindOk, self.on_setup ) - if not queue_declared: + if not queue_bound: # default exchange, pretend it was bind ok self.on_setup(QueueBindOk()) elif isinstance(payload, QueueBindOk): diff --git a/coolamqp/framing/definitions.py b/coolamqp/framing/definitions.py index f106e5b..5c76cfa 100644 --- a/coolamqp/framing/definitions.py +++ b/coolamqp/framing/definitions.py @@ -2947,6 +2947,7 @@ class BasicContentPropertyList(AMQPContentPropertyList): """ if 'headers' in kwargs: kwargs['headers'] = argumentify(kwargs['headers']) + logger.warning('Argumentified headers are %s' % (kwargs['headers'], )) zpf = bytearray([ (('content_type' in kwargs) << 7) | diff --git a/coolamqp/objects.py b/coolamqp/objects.py index 3ab5a67..9137ae5 100644 --- a/coolamqp/objects.py +++ b/coolamqp/objects.py @@ -275,7 +275,7 @@ class Queue(object): self.exclusive = exclusive self.arguments = argumentify(arguments) self.arguments_bind = argumentify(arguments_bind) - self.anonymous = self.name is None # if this queue is anonymous, it must be regenerated upon reconnect + self.anonymous = self.name is None if self.auto_delete and self.durable: raise ValueError('Cannot create an auto_delete and durable queue') diff --git a/tests/test_clustering/test_topic_exchanges.py b/tests/test_clustering/test_topic_exchanges.py index b44efe7..0a2bfab 100644 --- a/tests/test_clustering/test_topic_exchanges.py +++ b/tests/test_clustering/test_topic_exchanges.py @@ -44,12 +44,13 @@ class TestTopic(unittest.TestCase): self.c.declare(queue2).result() test = {'a': 0} + def do_message(msg): msg.ack() test['a'] += 1 - cons1, fut1 = self.c.consume(queue1, no_ack=False) - cons2, fut2 = self.c.consume(queue2, no_ack=False) + cons1, fut1 = self.c.consume(queue1, on_message=do_message, no_ack=False) + cons2, fut2 = self.c.consume(queue2, on_message=do_message, no_ack=False) fut1.result() fut2.result() -- GitLab