From b108a8c1333cd93d82b60560463332fb8fad2515 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ma=C5=9Blanka?= <piotr.maslanka@henrietta.com.pl> Date: Tue, 23 Jun 2020 22:08:06 +0200 Subject: [PATCH] fill in queue's name + queue.bind --- CHANGELOG.md | 5 +++-- coolamqp/__init__.py | 3 +-- coolamqp/attaches/declarer.py | 13 +++++++++++-- coolamqp/clustering/cluster.py | 15 ++++++++++++++- coolamqp/objects.py | 15 +++++++++++++++ tests/test_clustering/test_a.py | 16 ++++++++++++++++ 6 files changed, 60 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a89f6c4..ffabfc7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ -# v1.1.2 +# v1.2 -* _TBA_ +* queue's name will be filled in upon being declared +* added queue.bind # v1.1.1 diff --git a/coolamqp/__init__.py b/coolamqp/__init__.py index 4325fbb..64477cf 100644 --- a/coolamqp/__init__.py +++ b/coolamqp/__init__.py @@ -1,2 +1 @@ -# coding=UTF-8 -__version__ = '1.1.2_a1' +__version__ = '1.2' diff --git a/coolamqp/attaches/declarer.py b/coolamqp/attaches/declarer.py index 7eecc8e..0e56841 100644 --- a/coolamqp/attaches/declarer.py +++ b/coolamqp/attaches/declarer.py @@ -13,8 +13,8 @@ from coolamqp.attaches.utils import Synchronized from coolamqp.exceptions import AMQPError, ConnectionDead from coolamqp.framing.definitions import ChannelOpenOk, ExchangeDeclare, \ ExchangeDeclareOk, QueueDeclare, \ - QueueDeclareOk, ChannelClose, QueueDelete, QueueDeleteOk -from coolamqp.objects import Exchange, Queue, Callable + QueueDeclareOk, ChannelClose, QueueDelete, QueueDeleteOk, QueueBind, QueueBindOk +from coolamqp.objects import Exchange, Queue, Callable, QueueBind as CommandQueueBind logger = logging.getLogger(__name__) @@ -115,6 +115,12 @@ class Operation(object): obj.auto_delete, False, []), (QueueDeclareOk, ChannelClose), self._callback) + elif isinstance(obj, CommandQueueBind): + self.declarer.method_and_watch( + QueueBind(obj.queue, obj.exchange, obj.routing_key, False, []), + (QueueBindOk, ChannelClose), + self._callback + ) def _callback(self, payload): assert not self.done @@ -132,6 +138,9 @@ class Operation(object): self.obj) # todo access not threadsafe self.declarer.on_discard(self.obj) else: + if isinstance(payload, QueueDeclareOk): + self.obj.name = payload.queue + self.span_finished() if self.fut is not None: self.fut.set_result(None) diff --git a/coolamqp/clustering/cluster.py b/coolamqp/clustering/cluster.py index 551c210..598e5f1 100644 --- a/coolamqp/clustering/cluster.py +++ b/coolamqp/clustering/cluster.py @@ -19,7 +19,7 @@ from coolamqp.clustering.events import ConnectionLost, MessageReceived, \ NothingMuch, Event from coolamqp.clustering.single import SingleNodeReconnector from coolamqp.exceptions import ConnectionDead -from coolamqp.objects import Exchange, Message, Queue, FrameLogger +from coolamqp.objects import Exchange, Message, Queue, FrameLogger, QueueBind from coolamqp.uplink import ListenerThread logger = logging.getLogger(__name__) @@ -93,6 +93,19 @@ class Cluster(object): else: self.on_fail = None + def bind(self, queue, exchange, routing_key, persistent=False, span=None): + """ + Bind a queue to an exchange + """ + if span is not None: + child_span = self._make_span('bind', span) + else: + child_span = None + fut = self.decl.declare(QueueBind(queue, exchange, routing_key), + persistent=persistent, + span=child_span) + return close_future(fut, child_span) + def declare(self, obj, # type: tp.Union[Queue, Exchange] persistent=False, # type: bool span=None # type: tp.Optional[opentracing.Span] diff --git a/coolamqp/objects.py b/coolamqp/objects.py index aa9fff3..5ac2516 100644 --- a/coolamqp/objects.py +++ b/coolamqp/objects.py @@ -274,6 +274,21 @@ class Queue(object): return hash(self.name) +class QueueBind(object): + """An order to be declared which binds a given queue to an exchange""" + def __init__(self, queue, # type: tp.Union[Queue, bytes, unicode] + exchange, # type: tp.Union[Exchange, bytes, unicode] + routing_key # type: tp.Union[bytes, unicode] + ): + if isinstance(queue, Queue): + queue = queue.name + self.queue = tobytes(queue) # type: bytes + if isinstance(exchange, Exchange): + exchange = exchange.name + self.exchange = tobytes(exchange) # type: bytes + self.routing_key = tobytes(routing_key) # type: bytes + + class NodeDefinition(object): """ Definition of a reachable AMQP node. diff --git a/tests/test_clustering/test_a.py b/tests/test_clustering/test_a.py index 4560d98..b7ac229 100644 --- a/tests/test_clustering/test_a.py +++ b/tests/test_clustering/test_a.py @@ -28,6 +28,20 @@ class TestA(unittest.TestCase): def tearDown(self): self.c.shutdown() + def test_queue_bind(self): + queue = Queue('my-queue') + exchange = Exchange('my-exchange', type='topic') + self.c.declare(queue).result() + self.c.declare(exchange).result() + self.c.bind(queue, exchange, 'test').result() + q = six.moves.queue.Queue() + cons, fut = self.c.consume(queue, on_message=lambda msg: q.put(msg), no_ack=True) + fut.result() + self.c.publish(Message(b'test'), exchange=exchange, routing_key='test', confirm=True).result() + msg_v = q.get(block=True, timeout=5) + self.assertEqual(msg_v.body, b'test') + cons.cancel() + def test_delete_queue(self): # that's how it's written, due to http://www.rabbitmq.com/specification.html#method-status-queue.delete self.c.delete_queue(Queue(u'i-do-not-exist')).result() @@ -79,6 +93,8 @@ class TestA(unittest.TestCase): f.result() + self.assertIsNotNone(q.name) + def test_send_recv_zerolen(self): P = {'q': False} -- GitLab