Skip to content
Snippets Groups Projects
Commit c3201642 authored by Piotr Maślanka's avatar Piotr Maślanka
Browse files

added exchange and queue delete

parent a5a138ca
No related branches found
No related tags found
No related merge requests found
...@@ -47,6 +47,12 @@ class AMQPBackend(object): ...@@ -47,6 +47,12 @@ class AMQPBackend(object):
:param exchange: Exchange object :param exchange: Exchange object
""" """
def exchange_delete(self, exchange):
"""
Delete an exchange
:param exchange: Exchange object
"""
def queue_bind(self, queue, exchange, routing_key=''): def queue_bind(self, queue, exchange, routing_key=''):
""" """
Bind a queue to an exchange Bind a queue to an exchange
...@@ -55,6 +61,14 @@ class AMQPBackend(object): ...@@ -55,6 +61,14 @@ class AMQPBackend(object):
:param routing_key: routing key to use :param routing_key: routing key to use
""" """
def queue_delete(self, queue):
"""
Delete a queue.
:param queue: Queue
"""
def queue_declare(self, queue): def queue_declare(self, queue):
""" """
Declare a queue. Declare a queue.
......
...@@ -76,6 +76,14 @@ class PyAMQPBackend(AMQPBackend): ...@@ -76,6 +76,14 @@ class PyAMQPBackend(AMQPBackend):
def basic_ack(self, delivery_tag): def basic_ack(self, delivery_tag):
self.channel.basic_ack(delivery_tag, multiple=False) self.channel.basic_ack(delivery_tag, multiple=False)
@translate_exceptions
def exchange_delete(self, exchange):
self.channel.exchange_delete(exchange.name)
@translate_exceptions
def queue_delete(self, queue):
self.channel.queue_delete(queue.name)
@translate_exceptions @translate_exceptions
def basic_nack(self, delivery_tag): def basic_nack(self, delivery_tag):
self.channel.basic_nack(delivery_tag, multiple=False) self.channel.basic_nack(delivery_tag, multiple=False)
......
import itertools import itertools
import Queue import Queue
from coolamqp.backends import PyAMQPBackend from coolamqp.backends import PyAMQPBackend
from .orders import SendMessage, ConsumeQueue, DeclareExchange, CancelQueue from .orders import SendMessage, ConsumeQueue, DeclareExchange, CancelQueue, DeleteQueue, \
DeleteExchange
from .messages import Exchange from .messages import Exchange
...@@ -95,6 +96,29 @@ class Cluster(object): ...@@ -95,6 +96,29 @@ class Cluster(object):
on_failed=on_failed)) on_failed=on_failed))
def delete_exchange(self, exchange, on_completed=None, on_failed=None):
"""
Delete an exchange
:param exchange: Exchange to delete
:param on_completed: callable/0 to call when this succeeds
:param on_failed: callable/1 to call when this fails with AMQPError instance
"""
self.thread.order_queue.append(DeleteExchange(exchange,
on_completed=on_completed,
on_failed=on_failed))
def delete_queue(self, queue, on_completed=None, on_failed=None):
"""
Delete a queue
:param queue: Queue to delete
:param on_completed: callable/0 to call when this succeeds
:param on_failed: callable/1 to call when this fails with AMQPError instance
"""
self.thread.order_queue.append(DeleteQueue(queue,
on_completed=on_completed,
on_failed=on_failed))
def cancel(self, queue, on_completed=None, on_failed=None): def cancel(self, queue, on_completed=None, on_failed=None):
""" """
Cancel consuming from a queue Cancel consuming from a queue
......
...@@ -7,7 +7,8 @@ from .backends import ConnectionFailedError, RemoteAMQPError ...@@ -7,7 +7,8 @@ from .backends import ConnectionFailedError, RemoteAMQPError
from .messages import Exchange from .messages import Exchange
from .events import ConnectionUp, ConnectionDown, ConsumerCancelled, MessageReceived from .events import ConnectionUp, ConnectionDown, ConsumerCancelled, MessageReceived
from .orders import SendMessage, DeclareExchange, ConsumeQueue, CancelQueue, \ from .orders import SendMessage, DeclareExchange, ConsumeQueue, CancelQueue, \
AcknowledgeMessage, NAcknowledgeMessage AcknowledgeMessage, NAcknowledgeMessage, DeleteQueue, \
DeleteExchange
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
...@@ -95,6 +96,10 @@ class ClusterHandlerThread(threading.Thread): ...@@ -95,6 +96,10 @@ class ClusterHandlerThread(threading.Thread):
elif isinstance(order, DeclareExchange): elif isinstance(order, DeclareExchange):
self.backend.exchange_declare(order.exchange) self.backend.exchange_declare(order.exchange)
self.declared_exchanges.append(order.exchange) self.declared_exchanges.append(order.exchange)
elif isinstance(order, DeleteExchange):
self.backend.exchange_delete(order.exchange)
elif isinstance(order, DeleteQueue):
self.backend.queue_delete(order.queue)
elif isinstance(order, ConsumeQueue): elif isinstance(order, ConsumeQueue):
self.backend.queue_declare(order.queue) self.backend.queue_declare(order.queue)
......
...@@ -36,6 +36,13 @@ class DeclareExchange(Order): ...@@ -36,6 +36,13 @@ class DeclareExchange(Order):
self.exchange = exchange self.exchange = exchange
class DeleteExchange(Order):
"""Delete an exchange"""
def __init__(self, exchange, on_completed=None, on_failed=None):
Order.__init__(self, on_completed=on_completed, on_failed=on_failed)
self.exchange = exchange
class ConsumeQueue(Order): class ConsumeQueue(Order):
"""Declare and consume from a queue""" """Declare and consume from a queue"""
def __init__(self, queue, on_completed=None, on_failed=None): def __init__(self, queue, on_completed=None, on_failed=None):
...@@ -43,6 +50,13 @@ class ConsumeQueue(Order): ...@@ -43,6 +50,13 @@ class ConsumeQueue(Order):
self.queue = queue self.queue = queue
class DeleteQueue(Order):
"""Delete a queue"""
def __init__(self, queue, on_completed=None, on_failed=None):
Order.__init__(self, on_completed=on_completed, on_failed=on_failed)
self.queue = queue
class CancelQueue(Order): class CancelQueue(Order):
"""Cancel consuming from a queue""" """Cancel consuming from a queue"""
def __init__(self, queue, on_completed=None, on_failed=None): def __init__(self, queue, on_completed=None, on_failed=None):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment