diff --git a/coolamqp/backends/base.py b/coolamqp/backends/base.py index 6d7e42ff85dd76d76b72ed38f1e83ae910ae5343..a8d6e73308f4be055810b6664ab872ce6379c9ea 100644 --- a/coolamqp/backends/base.py +++ b/coolamqp/backends/base.py @@ -47,6 +47,12 @@ class AMQPBackend(object): :param exchange: Exchange object """ + def exchange_delete(self, exchange): + """ + Delete an exchange + :param exchange: Exchange object + """ + def queue_bind(self, queue, exchange, routing_key=''): """ Bind a queue to an exchange @@ -55,6 +61,14 @@ class AMQPBackend(object): :param routing_key: routing key to use """ + def queue_delete(self, queue): + """ + Delete a queue. + + :param queue: Queue + """ + + def queue_declare(self, queue): """ Declare a queue. diff --git a/coolamqp/backends/pyamqp.py b/coolamqp/backends/pyamqp.py index 4a520fd5877818be0c07da9ef93ce8f2e699b9dc..82dad65c4fded209b4ce43de7037d5695e822263 100644 --- a/coolamqp/backends/pyamqp.py +++ b/coolamqp/backends/pyamqp.py @@ -76,6 +76,14 @@ class PyAMQPBackend(AMQPBackend): def basic_ack(self, delivery_tag): self.channel.basic_ack(delivery_tag, multiple=False) + @translate_exceptions + def exchange_delete(self, exchange): + self.channel.exchange_delete(exchange.name) + + @translate_exceptions + def queue_delete(self, queue): + self.channel.queue_delete(queue.name) + @translate_exceptions def basic_nack(self, delivery_tag): self.channel.basic_nack(delivery_tag, multiple=False) diff --git a/coolamqp/cluster.py b/coolamqp/cluster.py index ef3338e09c2c9a894556e744e934050db4ed0e0d..8ac7270d7df099312fede8b0a9f532e0d72f18be 100644 --- a/coolamqp/cluster.py +++ b/coolamqp/cluster.py @@ -1,7 +1,8 @@ import itertools import Queue from coolamqp.backends import PyAMQPBackend -from .orders import SendMessage, ConsumeQueue, DeclareExchange, CancelQueue +from .orders import SendMessage, ConsumeQueue, DeclareExchange, CancelQueue, DeleteQueue, \ + DeleteExchange from .messages import Exchange @@ -95,6 +96,29 @@ class Cluster(object): on_failed=on_failed)) + def delete_exchange(self, exchange, on_completed=None, on_failed=None): + """ + Delete an exchange + :param exchange: Exchange to delete + :param on_completed: callable/0 to call when this succeeds + :param on_failed: callable/1 to call when this fails with AMQPError instance + """ + self.thread.order_queue.append(DeleteExchange(exchange, + on_completed=on_completed, + on_failed=on_failed)) + + + def delete_queue(self, queue, on_completed=None, on_failed=None): + """ + Delete a queue + :param queue: Queue to delete + :param on_completed: callable/0 to call when this succeeds + :param on_failed: callable/1 to call when this fails with AMQPError instance + """ + self.thread.order_queue.append(DeleteQueue(queue, + on_completed=on_completed, + on_failed=on_failed)) + def cancel(self, queue, on_completed=None, on_failed=None): """ Cancel consuming from a queue diff --git a/coolamqp/handler.py b/coolamqp/handler.py index ae141cb77ac8f85177be26855d182ad63b571370..c74581b337b8b642a2b8c25d5b3aaef55dfd7245 100644 --- a/coolamqp/handler.py +++ b/coolamqp/handler.py @@ -7,7 +7,8 @@ from .backends import ConnectionFailedError, RemoteAMQPError from .messages import Exchange from .events import ConnectionUp, ConnectionDown, ConsumerCancelled, MessageReceived from .orders import SendMessage, DeclareExchange, ConsumeQueue, CancelQueue, \ - AcknowledgeMessage, NAcknowledgeMessage + AcknowledgeMessage, NAcknowledgeMessage, DeleteQueue, \ + DeleteExchange logger = logging.getLogger(__name__) @@ -95,6 +96,10 @@ class ClusterHandlerThread(threading.Thread): elif isinstance(order, DeclareExchange): self.backend.exchange_declare(order.exchange) self.declared_exchanges.append(order.exchange) + elif isinstance(order, DeleteExchange): + self.backend.exchange_delete(order.exchange) + elif isinstance(order, DeleteQueue): + self.backend.queue_delete(order.queue) elif isinstance(order, ConsumeQueue): self.backend.queue_declare(order.queue) diff --git a/coolamqp/orders.py b/coolamqp/orders.py index 8d27e35cb7f279dbfc7394e10c67d7c4bcbb21ce..1d0c5a5e12e9ff1d1b0ec44a7a2a669b9e5f006a 100644 --- a/coolamqp/orders.py +++ b/coolamqp/orders.py @@ -36,6 +36,13 @@ class DeclareExchange(Order): self.exchange = exchange +class DeleteExchange(Order): + """Delete an exchange""" + def __init__(self, exchange, on_completed=None, on_failed=None): + Order.__init__(self, on_completed=on_completed, on_failed=on_failed) + self.exchange = exchange + + class ConsumeQueue(Order): """Declare and consume from a queue""" def __init__(self, queue, on_completed=None, on_failed=None): @@ -43,6 +50,13 @@ class ConsumeQueue(Order): self.queue = queue +class DeleteQueue(Order): + """Delete a queue""" + def __init__(self, queue, on_completed=None, on_failed=None): + Order.__init__(self, on_completed=on_completed, on_failed=on_failed) + self.queue = queue + + class CancelQueue(Order): """Cancel consuming from a queue""" def __init__(self, queue, on_completed=None, on_failed=None):