diff --git a/README.md b/README.md index 3feb447f5c1e1da225076bdedfb385e926e60ec6..d63036d82e88258d70e0d499ccefb356f908d891 100644 --- a/README.md +++ b/README.md @@ -42,6 +42,12 @@ if you need every CPU cycle you can get. **v0.9x** series will have a stable API. +## Current limitations + +* channel flow mechanism is not supported (#11) +* _confirm=True_ is not available if you're not RabbitMQ (#8) +* no Windows support (#9) + ## What's new * v0.89: diff --git a/coolamqp/attaches/declarer.py b/coolamqp/attaches/declarer.py index c3e59cb555720e71f3650239c2db17b82b39e2df..82a5080ad11fa1561cc6948b3a11ee035e8cfafe 100644 --- a/coolamqp/attaches/declarer.py +++ b/coolamqp/attaches/declarer.py @@ -7,7 +7,7 @@ import six import collections import logging from coolamqp.framing.definitions import ChannelOpenOk, ExchangeDeclare, ExchangeDeclareOk, QueueDeclare, \ - QueueDeclareOk, ChannelClose + QueueDeclareOk, ChannelClose, QueueDelete, QueueDeleteOk from coolamqp.attaches.channeler import Channeler, ST_ONLINE, ST_OFFLINE from coolamqp.attaches.utils import AtomicTagger, FutureConfirmableRejectable, Synchronized from concurrent.futures import Future @@ -74,6 +74,29 @@ class Operation(object): self.declarer.on_operation_done() +class DeleteQueue(Operation): + def __init__(self, declarer, queue, fut): + super(DeleteQueue, self).__init__(declarer, queue, fut=fut) + + def perform(self): + queue = self.obj + + print('bang') + self.declarer.method_and_watch(QueueDelete(queue.name, False, False, False), + (QueueDeleteOk, ChannelClose), + self._callback) + + def _callback(self, payload): + print('got', payload) + assert not self.done + self.done = True + if isinstance(payload, ChannelClose): + self.fut.set_exception(AMQPError(payload)) + else: # Queue.DeleteOk + self.fut.set_result(None) + self.declarer.on_operation_done() + + class Declarer(Channeler, Synchronized): """ Doing other things, such as declaring, deleting and other stuff. @@ -141,11 +164,32 @@ class Declarer(Channeler, Synchronized): self.in_process = None self._do_operations() + def delete_queue(self, queue): + """ + Delete a queue. + + Future is returned, so that user knows when it happens. This may fail. + Returned Future is already running, and so cannot be cancelled. + + If the queue is in declared consumer list, it will not be removed. + + :param queue: Queue instance + :return: a Future + """ + fut = Future() + fut.set_running_or_notify_cancel() + + self.left_to_declare.append(DeleteQueue(self, queue, fut)) + self._do_operations() + + return fut + def declare(self, obj, persistent=False): """ Schedule to have an object declared. Future is returned, so that user knows when it happens. + Returned Future is already running, and so cannot be cancelled. Exchange declarations never fail. Of course they do, but you will be told that it succeeded. This is by design, diff --git a/coolamqp/clustering/cluster.py b/coolamqp/clustering/cluster.py index 1af2ae9585978c544471dc30b2f242eee50237dc..ec40b50e835699c85c56ea4ea7a90a8d8711f309 100644 --- a/coolamqp/clustering/cluster.py +++ b/coolamqp/clustering/cluster.py @@ -96,6 +96,15 @@ class Cluster(object): self.attache_group.add(con) return con, fut + def delete_queue(self, queue): + """ + Delete a queue. + + :param queue: Queue instance that represents what to delete + :return: a Future (will succeed with None or fail with AMQPError) + """ + return self.decl.delete_queue(queue) + def publish(self, message, exchange=None, routing_key=u'', tx=None, confirm=None): """ Publish a message. @@ -134,7 +143,6 @@ class Cluster(object): except Publisher.UnusablePublisher: raise NotImplementedError(u'Sorry, this functionality is not yet implemented!') - def start(self, wait=True): """ Connect to broker. Initialize Cluster. diff --git a/coolamqp/objects.py b/coolamqp/objects.py index d4e8758e48520d3abfd5ababfbb982a03c99a44d..bb6e1658a49f02e3411f2eee51560e839fc62229 100644 --- a/coolamqp/objects.py +++ b/coolamqp/objects.py @@ -161,12 +161,13 @@ class Queue(object): upon declaration. If a disconnect happens, and connection to other node is reestablished, this name will CHANGE AGAIN, and be reflected in this object. This change will be done before CoolAMQP signals reconnection. + :type name: byte type or text type :param durable: Is the queue durable? :param exchange: Exchange for this queue to bind to. None for no binding. :param exclusive: Is this queue exclusive? :param auto_delete: Is this queue auto_delete ? """ - self.name = name.encode('utf8') #: public, this is of type bytes ALWAYS + self.name = name.encode('utf8') if isinstance(name, six.text_type) else name #: public, this is of type bytes ALWAYS # if name is '', this will be filled in with broker-generated name upon declaration self.durable = durable self.exchange = exchange diff --git a/tests/test_clustering/test_a.py b/tests/test_clustering/test_a.py index 6059529c964c85e7a037de73b441668b4720d3ff..669eeacee4fdeed0a787d9741eb510942a1243b2 100644 --- a/tests/test_clustering/test_a.py +++ b/tests/test_clustering/test_a.py @@ -24,6 +24,10 @@ class TestA(unittest.TestCase): def tearDown(self): self.c.shutdown() + def test_delete_queue(self): + # that's how it's written, due to http://www.rabbitmq.com/specification.html#method-status-queue.delete + self.c.delete_queue(Queue(u'i-do-not-exist')).result() + def test_consume(self): con, fut = self.c.consume(Queue(u'hello', exclusive=True)) fut.result() diff --git a/tests/test_clustering/test_double.py b/tests/test_clustering/test_double.py index 561adbf1286da46bcf543fabbd9601d079f5a76c..cb627c9859f9260e1dddad73866785566e5e9e69 100644 --- a/tests/test_clustering/test_double.py +++ b/tests/test_clustering/test_double.py @@ -11,6 +11,10 @@ from coolamqp.clustering import Cluster NODE = NodeDefinition('127.0.0.1', 'guest', 'guest', heartbeat=20) from coolamqp.exceptions import AMQPError, RESOURCE_LOCKED + +logging.basicConfig(level=logging.DEBUG) + + class TestDouble(unittest.TestCase): def setUp(self): @@ -24,6 +28,17 @@ class TestDouble(unittest.TestCase): self.c1.shutdown() self.c2.shutdown() + # def test_ccn(self): + # q1 = Queue(b'yo', auto_delete=True) + # + # con1, fut1 = self.c1.consume(q1) + # fut1.result() + # + # self.c2.delete_queue(q1) #.result() + # + # time.sleep(5) + # self.assertTrue(con1.cancelled) + # def test_resource_locked(self): q = Queue(u'yo', exclusive=True, auto_delete=True)