From 0fee9106d83486f21edeee1ceb84c5de1ae8739c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ma=C5=9Blanka?= <piotr.maslanka@henrietta.com.pl> Date: Tue, 12 Nov 2024 17:45:38 +0100 Subject: [PATCH] fixed #6 --- coolamqp/attaches/consumer.py | 19 +++++++++---------- coolamqp/objects.py | 8 ++++++-- docs/conf.py | 6 ++++++ tests/test_clustering/test_exchanges.py | 19 +++++++++++++++++++ 4 files changed, 40 insertions(+), 12 deletions(-) diff --git a/coolamqp/attaches/consumer.py b/coolamqp/attaches/consumer.py index 7807708..4c1c852 100644 --- a/coolamqp/attaches/consumer.py +++ b/coolamqp/attaches/consumer.py @@ -407,16 +407,15 @@ class Consumer(Channeler): queue_declared = False # We need any form of binding. if self.queue.exchange is not None: - if self.queue.exchange.type != b'topic': - queue_declared = True - self.method_and_watch( - QueueBind( - self.queue.name, - self.queue.exchange.name.encode('utf8'), - b'', False, []), - QueueBindOk, - self.on_setup - ) + queue_declared = True + self.method_and_watch( + QueueBind( + self.queue.name, + self.queue.exchange.name.encode('utf8'), + self.queue.routing_key, False, []), + QueueBindOk, + self.on_setup + ) if not queue_declared: # default exchange, pretend it was bind ok diff --git a/coolamqp/objects.py b/coolamqp/objects.py index f1cf919..ed0260f 100644 --- a/coolamqp/objects.py +++ b/coolamqp/objects.py @@ -269,6 +269,8 @@ class Queue(object): :param exclusive: This queue will be deleted when the connection closes :param auto_delete: This queue will be deleted when the last consumer unsubscribes :param arguments: either a list of (bytes, values) or a dict of (str, value) to pass as an extra argument + :param routing_key: routing key that will be used to bind to an exchange. Used only when this + queue is associated with an exchange. Default value of blank should suffice. :raises ValueError: tried to create a queue that was not durable or auto_delete :raises ValueError: tried to create a queue that was not exclusive or auto_delete and not anonymous :raises ValueError: tried to create a queue that was anonymous and not auto_delete or durable @@ -277,14 +279,15 @@ class Queue(object): :warning UserWarning: if you're declaring an auto_delete or exclusive, anonymous queue """ __slots__ = ('name', 'durable', 'exchange', 'auto_delete', 'exclusive', - 'anonymous', 'consumer_tag', 'arguments') + 'anonymous', 'consumer_tag', 'arguments', 'routing_key') def __init__(self, name=None, # type: tp.Union[str, bytes, None] durable=False, # type: bool exchange=None, # type: tp.Optional[Exchange] exclusive=True, # type: bool auto_delete=True, # type: bool - arguments=None # type: tp.Union[tp.List[bytes, tp.Any], tp.Dict[str, tp.Any]] + arguments=None, # type: tp.Union[tp.List[bytes, tp.Any], tp.Dict[str, tp.Any]], + routing_key=b'' #: type: tp.Union[str, bytes] ): if name is None: self.name = None @@ -293,6 +296,7 @@ class Queue(object): self.durable = durable self.exchange = exchange + self.routing_key = tobytes(routing_key) self.auto_delete = auto_delete self.exclusive = exclusive self.arguments = argumentify(arguments) diff --git a/docs/conf.py b/docs/conf.py index 29896f7..f8f08c5 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -35,6 +35,12 @@ source_parsers = { # ones. extensions = ['sphinx.ext.autodoc'] +autodoc_default_options = { + 'members': True, +} +autodoc_default_flags = [ + 'show-inheritance' +] # Add any paths that contain templates here, relative to this directory. templates_path = ['_templates'] diff --git a/tests/test_clustering/test_exchanges.py b/tests/test_clustering/test_exchanges.py index b5abebc..2cf2c28 100644 --- a/tests/test_clustering/test_exchanges.py +++ b/tests/test_clustering/test_exchanges.py @@ -31,6 +31,25 @@ class TestExchanges(unittest.TestCase): fut.result() cons.cancel().result() + def test_topic_exchanges(self): + xchg_name = uuid.uuid4().hex + test = {'a': 0} + + def do_msg(msg): + msg.ack() + test['a'] += 1 + + xchg = Exchange(xchg_name, type=b'topic') + queue = Queue(exchange=xchg, routing_key=b'test') + cons, fut = self.c.consume(queue, no_ack=False, on_message=do_msg) + fut.result() + self.c.publish(Message(b'dupa'), uuid.uuid4().hex, routing_key=b'test') + self.c.publish(Message(b'dupa2'), uuid.uuid4().hex, routing_key=b'test2') + self.c.publish(Message(b'dupa'), uuid.uuid4().hex, routing_key=b'test.na.zwierzetach') + + cons.cancel().result() + self.assertEqual(test['a'], 2) + def test_deadlettering(self): xchg_name = uuid.uuid4().hex dead_queue_name = uuid.uuid4().hex -- GitLab