From 604a808171fe1329297fc74bd37975aeafe614fc Mon Sep 17 00:00:00 2001 From: Piotr Maslanka <piotr.maslanka@henrietta.com.pl> Date: Sun, 29 Jan 2017 18:59:01 +0100 Subject: [PATCH] queue delete added but it's hardly supported by rabbitmq :D --- README.md | 6 ++++ coolamqp/attaches/declarer.py | 46 +++++++++++++++++++++++++++- coolamqp/clustering/cluster.py | 10 +++++- coolamqp/objects.py | 3 +- tests/test_clustering/test_a.py | 4 +++ tests/test_clustering/test_double.py | 15 +++++++++ 6 files changed, 81 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 3feb447..d63036d 100644 --- a/README.md +++ b/README.md @@ -42,6 +42,12 @@ if you need every CPU cycle you can get. **v0.9x** series will have a stable API. +## Current limitations + +* channel flow mechanism is not supported (#11) +* _confirm=True_ is not available if you're not RabbitMQ (#8) +* no Windows support (#9) + ## What's new * v0.89: diff --git a/coolamqp/attaches/declarer.py b/coolamqp/attaches/declarer.py index c3e59cb..82a5080 100644 --- a/coolamqp/attaches/declarer.py +++ b/coolamqp/attaches/declarer.py @@ -7,7 +7,7 @@ import six import collections import logging from coolamqp.framing.definitions import ChannelOpenOk, ExchangeDeclare, ExchangeDeclareOk, QueueDeclare, \ - QueueDeclareOk, ChannelClose + QueueDeclareOk, ChannelClose, QueueDelete, QueueDeleteOk from coolamqp.attaches.channeler import Channeler, ST_ONLINE, ST_OFFLINE from coolamqp.attaches.utils import AtomicTagger, FutureConfirmableRejectable, Synchronized from concurrent.futures import Future @@ -74,6 +74,29 @@ class Operation(object): self.declarer.on_operation_done() +class DeleteQueue(Operation): + def __init__(self, declarer, queue, fut): + super(DeleteQueue, self).__init__(declarer, queue, fut=fut) + + def perform(self): + queue = self.obj + + print('bang') + self.declarer.method_and_watch(QueueDelete(queue.name, False, False, False), + (QueueDeleteOk, ChannelClose), + self._callback) + + def _callback(self, payload): + print('got', payload) + assert not self.done + self.done = True + if isinstance(payload, ChannelClose): + self.fut.set_exception(AMQPError(payload)) + else: # Queue.DeleteOk + self.fut.set_result(None) + self.declarer.on_operation_done() + + class Declarer(Channeler, Synchronized): """ Doing other things, such as declaring, deleting and other stuff. @@ -141,11 +164,32 @@ class Declarer(Channeler, Synchronized): self.in_process = None self._do_operations() + def delete_queue(self, queue): + """ + Delete a queue. + + Future is returned, so that user knows when it happens. This may fail. + Returned Future is already running, and so cannot be cancelled. + + If the queue is in declared consumer list, it will not be removed. + + :param queue: Queue instance + :return: a Future + """ + fut = Future() + fut.set_running_or_notify_cancel() + + self.left_to_declare.append(DeleteQueue(self, queue, fut)) + self._do_operations() + + return fut + def declare(self, obj, persistent=False): """ Schedule to have an object declared. Future is returned, so that user knows when it happens. + Returned Future is already running, and so cannot be cancelled. Exchange declarations never fail. Of course they do, but you will be told that it succeeded. This is by design, diff --git a/coolamqp/clustering/cluster.py b/coolamqp/clustering/cluster.py index 1af2ae9..ec40b50 100644 --- a/coolamqp/clustering/cluster.py +++ b/coolamqp/clustering/cluster.py @@ -96,6 +96,15 @@ class Cluster(object): self.attache_group.add(con) return con, fut + def delete_queue(self, queue): + """ + Delete a queue. + + :param queue: Queue instance that represents what to delete + :return: a Future (will succeed with None or fail with AMQPError) + """ + return self.decl.delete_queue(queue) + def publish(self, message, exchange=None, routing_key=u'', tx=None, confirm=None): """ Publish a message. @@ -134,7 +143,6 @@ class Cluster(object): except Publisher.UnusablePublisher: raise NotImplementedError(u'Sorry, this functionality is not yet implemented!') - def start(self, wait=True): """ Connect to broker. Initialize Cluster. diff --git a/coolamqp/objects.py b/coolamqp/objects.py index d4e8758..bb6e165 100644 --- a/coolamqp/objects.py +++ b/coolamqp/objects.py @@ -161,12 +161,13 @@ class Queue(object): upon declaration. If a disconnect happens, and connection to other node is reestablished, this name will CHANGE AGAIN, and be reflected in this object. This change will be done before CoolAMQP signals reconnection. + :type name: byte type or text type :param durable: Is the queue durable? :param exchange: Exchange for this queue to bind to. None for no binding. :param exclusive: Is this queue exclusive? :param auto_delete: Is this queue auto_delete ? """ - self.name = name.encode('utf8') #: public, this is of type bytes ALWAYS + self.name = name.encode('utf8') if isinstance(name, six.text_type) else name #: public, this is of type bytes ALWAYS # if name is '', this will be filled in with broker-generated name upon declaration self.durable = durable self.exchange = exchange diff --git a/tests/test_clustering/test_a.py b/tests/test_clustering/test_a.py index 6059529..669eeac 100644 --- a/tests/test_clustering/test_a.py +++ b/tests/test_clustering/test_a.py @@ -24,6 +24,10 @@ class TestA(unittest.TestCase): def tearDown(self): self.c.shutdown() + def test_delete_queue(self): + # that's how it's written, due to http://www.rabbitmq.com/specification.html#method-status-queue.delete + self.c.delete_queue(Queue(u'i-do-not-exist')).result() + def test_consume(self): con, fut = self.c.consume(Queue(u'hello', exclusive=True)) fut.result() diff --git a/tests/test_clustering/test_double.py b/tests/test_clustering/test_double.py index 561adbf..cb627c9 100644 --- a/tests/test_clustering/test_double.py +++ b/tests/test_clustering/test_double.py @@ -11,6 +11,10 @@ from coolamqp.clustering import Cluster NODE = NodeDefinition('127.0.0.1', 'guest', 'guest', heartbeat=20) from coolamqp.exceptions import AMQPError, RESOURCE_LOCKED + +logging.basicConfig(level=logging.DEBUG) + + class TestDouble(unittest.TestCase): def setUp(self): @@ -24,6 +28,17 @@ class TestDouble(unittest.TestCase): self.c1.shutdown() self.c2.shutdown() + # def test_ccn(self): + # q1 = Queue(b'yo', auto_delete=True) + # + # con1, fut1 = self.c1.consume(q1) + # fut1.result() + # + # self.c2.delete_queue(q1) #.result() + # + # time.sleep(5) + # self.assertTrue(con1.cancelled) + # def test_resource_locked(self): q = Queue(u'yo', exclusive=True, auto_delete=True) -- GitLab