From c320164228dbe415d06722443041c192630e96d6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ma=C5=9Blanka?= <piotr.maslanka@henrietta.com.pl> Date: Sat, 28 May 2016 02:08:16 +0200 Subject: [PATCH] added exchange and queue delete --- coolamqp/backends/base.py | 14 ++++++++++++++ coolamqp/backends/pyamqp.py | 8 ++++++++ coolamqp/cluster.py | 26 +++++++++++++++++++++++++- coolamqp/handler.py | 7 ++++++- coolamqp/orders.py | 14 ++++++++++++++ 5 files changed, 67 insertions(+), 2 deletions(-) diff --git a/coolamqp/backends/base.py b/coolamqp/backends/base.py index 6d7e42f..a8d6e73 100644 --- a/coolamqp/backends/base.py +++ b/coolamqp/backends/base.py @@ -47,6 +47,12 @@ class AMQPBackend(object): :param exchange: Exchange object """ + def exchange_delete(self, exchange): + """ + Delete an exchange + :param exchange: Exchange object + """ + def queue_bind(self, queue, exchange, routing_key=''): """ Bind a queue to an exchange @@ -55,6 +61,14 @@ class AMQPBackend(object): :param routing_key: routing key to use """ + def queue_delete(self, queue): + """ + Delete a queue. + + :param queue: Queue + """ + + def queue_declare(self, queue): """ Declare a queue. diff --git a/coolamqp/backends/pyamqp.py b/coolamqp/backends/pyamqp.py index 4a520fd..82dad65 100644 --- a/coolamqp/backends/pyamqp.py +++ b/coolamqp/backends/pyamqp.py @@ -76,6 +76,14 @@ class PyAMQPBackend(AMQPBackend): def basic_ack(self, delivery_tag): self.channel.basic_ack(delivery_tag, multiple=False) + @translate_exceptions + def exchange_delete(self, exchange): + self.channel.exchange_delete(exchange.name) + + @translate_exceptions + def queue_delete(self, queue): + self.channel.queue_delete(queue.name) + @translate_exceptions def basic_nack(self, delivery_tag): self.channel.basic_nack(delivery_tag, multiple=False) diff --git a/coolamqp/cluster.py b/coolamqp/cluster.py index ef3338e..8ac7270 100644 --- a/coolamqp/cluster.py +++ b/coolamqp/cluster.py @@ -1,7 +1,8 @@ import itertools import Queue from coolamqp.backends import PyAMQPBackend -from .orders import SendMessage, ConsumeQueue, DeclareExchange, CancelQueue +from .orders import SendMessage, ConsumeQueue, DeclareExchange, CancelQueue, DeleteQueue, \ + DeleteExchange from .messages import Exchange @@ -95,6 +96,29 @@ class Cluster(object): on_failed=on_failed)) + def delete_exchange(self, exchange, on_completed=None, on_failed=None): + """ + Delete an exchange + :param exchange: Exchange to delete + :param on_completed: callable/0 to call when this succeeds + :param on_failed: callable/1 to call when this fails with AMQPError instance + """ + self.thread.order_queue.append(DeleteExchange(exchange, + on_completed=on_completed, + on_failed=on_failed)) + + + def delete_queue(self, queue, on_completed=None, on_failed=None): + """ + Delete a queue + :param queue: Queue to delete + :param on_completed: callable/0 to call when this succeeds + :param on_failed: callable/1 to call when this fails with AMQPError instance + """ + self.thread.order_queue.append(DeleteQueue(queue, + on_completed=on_completed, + on_failed=on_failed)) + def cancel(self, queue, on_completed=None, on_failed=None): """ Cancel consuming from a queue diff --git a/coolamqp/handler.py b/coolamqp/handler.py index ae141cb..c74581b 100644 --- a/coolamqp/handler.py +++ b/coolamqp/handler.py @@ -7,7 +7,8 @@ from .backends import ConnectionFailedError, RemoteAMQPError from .messages import Exchange from .events import ConnectionUp, ConnectionDown, ConsumerCancelled, MessageReceived from .orders import SendMessage, DeclareExchange, ConsumeQueue, CancelQueue, \ - AcknowledgeMessage, NAcknowledgeMessage + AcknowledgeMessage, NAcknowledgeMessage, DeleteQueue, \ + DeleteExchange logger = logging.getLogger(__name__) @@ -95,6 +96,10 @@ class ClusterHandlerThread(threading.Thread): elif isinstance(order, DeclareExchange): self.backend.exchange_declare(order.exchange) self.declared_exchanges.append(order.exchange) + elif isinstance(order, DeleteExchange): + self.backend.exchange_delete(order.exchange) + elif isinstance(order, DeleteQueue): + self.backend.queue_delete(order.queue) elif isinstance(order, ConsumeQueue): self.backend.queue_declare(order.queue) diff --git a/coolamqp/orders.py b/coolamqp/orders.py index 8d27e35..1d0c5a5 100644 --- a/coolamqp/orders.py +++ b/coolamqp/orders.py @@ -36,6 +36,13 @@ class DeclareExchange(Order): self.exchange = exchange +class DeleteExchange(Order): + """Delete an exchange""" + def __init__(self, exchange, on_completed=None, on_failed=None): + Order.__init__(self, on_completed=on_completed, on_failed=on_failed) + self.exchange = exchange + + class ConsumeQueue(Order): """Declare and consume from a queue""" def __init__(self, queue, on_completed=None, on_failed=None): @@ -43,6 +50,13 @@ class ConsumeQueue(Order): self.queue = queue +class DeleteQueue(Order): + """Delete a queue""" + def __init__(self, queue, on_completed=None, on_failed=None): + Order.__init__(self, on_completed=on_completed, on_failed=on_failed) + self.queue = queue + + class CancelQueue(Order): """Cancel consuming from a queue""" def __init__(self, queue, on_completed=None, on_failed=None): -- GitLab