diff --git a/CHANGELOG.md b/CHANGELOG.md index a89f6c4c8adb83c2edba46ee3e57d546467bb40a..ffabfc707ea76daeea9fb9e569f00894bbebf298 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ -# v1.1.2 +# v1.2 -* _TBA_ +* queue's name will be filled in upon being declared +* added queue.bind # v1.1.1 diff --git a/coolamqp/__init__.py b/coolamqp/__init__.py index 4325fbbd060e6f6fc520c72e78cb96816ed6a85b..64477cf29211637d0eddbb4f4004ba5a337aab48 100644 --- a/coolamqp/__init__.py +++ b/coolamqp/__init__.py @@ -1,2 +1 @@ -# coding=UTF-8 -__version__ = '1.1.2_a1' +__version__ = '1.2' diff --git a/coolamqp/attaches/declarer.py b/coolamqp/attaches/declarer.py index 7eecc8e2f2748db76aece9ef28e59d41a13d0215..0e56841488ec7a250f0c69578acbbbbb45cf92a4 100644 --- a/coolamqp/attaches/declarer.py +++ b/coolamqp/attaches/declarer.py @@ -13,8 +13,8 @@ from coolamqp.attaches.utils import Synchronized from coolamqp.exceptions import AMQPError, ConnectionDead from coolamqp.framing.definitions import ChannelOpenOk, ExchangeDeclare, \ ExchangeDeclareOk, QueueDeclare, \ - QueueDeclareOk, ChannelClose, QueueDelete, QueueDeleteOk -from coolamqp.objects import Exchange, Queue, Callable + QueueDeclareOk, ChannelClose, QueueDelete, QueueDeleteOk, QueueBind, QueueBindOk +from coolamqp.objects import Exchange, Queue, Callable, QueueBind as CommandQueueBind logger = logging.getLogger(__name__) @@ -115,6 +115,12 @@ class Operation(object): obj.auto_delete, False, []), (QueueDeclareOk, ChannelClose), self._callback) + elif isinstance(obj, CommandQueueBind): + self.declarer.method_and_watch( + QueueBind(obj.queue, obj.exchange, obj.routing_key, False, []), + (QueueBindOk, ChannelClose), + self._callback + ) def _callback(self, payload): assert not self.done @@ -132,6 +138,9 @@ class Operation(object): self.obj) # todo access not threadsafe self.declarer.on_discard(self.obj) else: + if isinstance(payload, QueueDeclareOk): + self.obj.name = payload.queue + self.span_finished() if self.fut is not None: self.fut.set_result(None) diff --git a/coolamqp/clustering/cluster.py b/coolamqp/clustering/cluster.py index 551c21001f77b8890b5624ab3f616071b5f013a6..598e5f1687c73de22dc87711826695cb9f5e74d8 100644 --- a/coolamqp/clustering/cluster.py +++ b/coolamqp/clustering/cluster.py @@ -19,7 +19,7 @@ from coolamqp.clustering.events import ConnectionLost, MessageReceived, \ NothingMuch, Event from coolamqp.clustering.single import SingleNodeReconnector from coolamqp.exceptions import ConnectionDead -from coolamqp.objects import Exchange, Message, Queue, FrameLogger +from coolamqp.objects import Exchange, Message, Queue, FrameLogger, QueueBind from coolamqp.uplink import ListenerThread logger = logging.getLogger(__name__) @@ -93,6 +93,19 @@ class Cluster(object): else: self.on_fail = None + def bind(self, queue, exchange, routing_key, persistent=False, span=None): + """ + Bind a queue to an exchange + """ + if span is not None: + child_span = self._make_span('bind', span) + else: + child_span = None + fut = self.decl.declare(QueueBind(queue, exchange, routing_key), + persistent=persistent, + span=child_span) + return close_future(fut, child_span) + def declare(self, obj, # type: tp.Union[Queue, Exchange] persistent=False, # type: bool span=None # type: tp.Optional[opentracing.Span] diff --git a/coolamqp/objects.py b/coolamqp/objects.py index aa9fff321eb321ee49b4be85e848d96470f86132..5ac2516040bb0d5121ebb6f845cb86285c7ebdb7 100644 --- a/coolamqp/objects.py +++ b/coolamqp/objects.py @@ -274,6 +274,21 @@ class Queue(object): return hash(self.name) +class QueueBind(object): + """An order to be declared which binds a given queue to an exchange""" + def __init__(self, queue, # type: tp.Union[Queue, bytes, unicode] + exchange, # type: tp.Union[Exchange, bytes, unicode] + routing_key # type: tp.Union[bytes, unicode] + ): + if isinstance(queue, Queue): + queue = queue.name + self.queue = tobytes(queue) # type: bytes + if isinstance(exchange, Exchange): + exchange = exchange.name + self.exchange = tobytes(exchange) # type: bytes + self.routing_key = tobytes(routing_key) # type: bytes + + class NodeDefinition(object): """ Definition of a reachable AMQP node. diff --git a/tests/test_clustering/test_a.py b/tests/test_clustering/test_a.py index 4560d98c240f00c6c2b564eb6c76241586ed9d25..b7ac229758549ec0ccee27e9ca9eda85347683bd 100644 --- a/tests/test_clustering/test_a.py +++ b/tests/test_clustering/test_a.py @@ -28,6 +28,20 @@ class TestA(unittest.TestCase): def tearDown(self): self.c.shutdown() + def test_queue_bind(self): + queue = Queue('my-queue') + exchange = Exchange('my-exchange', type='topic') + self.c.declare(queue).result() + self.c.declare(exchange).result() + self.c.bind(queue, exchange, 'test').result() + q = six.moves.queue.Queue() + cons, fut = self.c.consume(queue, on_message=lambda msg: q.put(msg), no_ack=True) + fut.result() + self.c.publish(Message(b'test'), exchange=exchange, routing_key='test', confirm=True).result() + msg_v = q.get(block=True, timeout=5) + self.assertEqual(msg_v.body, b'test') + cons.cancel() + def test_delete_queue(self): # that's how it's written, due to http://www.rabbitmq.com/specification.html#method-status-queue.delete self.c.delete_queue(Queue(u'i-do-not-exist')).result() @@ -79,6 +93,8 @@ class TestA(unittest.TestCase): f.result() + self.assertIsNotNone(q.name) + def test_send_recv_zerolen(self): P = {'q': False}