diff --git a/coolamqp/cluster.py b/coolamqp/cluster.py index bd64179682617b8bb2945713f49ed02441bc14c9..788980303bb495737bef126b3a8c78e09660fadf 100644 --- a/coolamqp/cluster.py +++ b/coolamqp/cluster.py @@ -79,12 +79,12 @@ class Cluster(object): :param routing_key: routing key to use :param on_completed: callable/0 to call when this succeeds :param on_failed: callable/1 to call when this fails with AMQPError instance + :return: a Future with this order's status """ - self.thread.order_queue.append(SendMessage(message, - exchange or Exchange.direct, - routing_key, - on_completed=on_completed, - on_failed=on_failed)) + a = SendMessage(message, exchange or Exchange.direct, routing_key, + on_completed=on_completed, on_failed=on_failed) + self.thread.order_queue.append(a) + return a def declare_exchange(self, exchange, on_completed=None, on_failed=None): """ @@ -92,6 +92,7 @@ class Cluster(object): :param exchange: Exchange to declare :param on_completed: callable/0 to call when this succeeds :param on_failed: callable/1 to call when this fails with AMQPError instance + :return: a Future with this order's status """ self.thread.order_queue.append(DeclareExchange(exchange, on_completed=on_completed, @@ -104,10 +105,11 @@ class Cluster(object): :param exchange: Exchange to delete :param on_completed: callable/0 to call when this succeeds :param on_failed: callable/1 to call when this fails with AMQPError instance + :return: a Future with this order's status """ - self.thread.order_queue.append(DeleteExchange(exchange, - on_completed=on_completed, - on_failed=on_failed)) + a = DeleteExchange(exchange, on_completed=on_completed, on_failed=on_failed) + self.thread.order_queue.append(a) + return a def delete_queue(self, queue, on_completed=None, on_failed=None): @@ -116,10 +118,11 @@ class Cluster(object): :param queue: Queue to delete :param on_completed: callable/0 to call when this succeeds :param on_failed: callable/1 to call when this fails with AMQPError instance + :return: a Future with this order's status """ - self.thread.order_queue.append(DeleteQueue(queue, - on_completed=on_completed, - on_failed=on_failed)) + a = DeleteQueue(queue, on_completed=on_completed, on_failed=on_failed) + self.thread.order_queue.append(a) + return a def cancel(self, queue, on_completed=None, on_failed=None): """ @@ -128,10 +131,11 @@ class Cluster(object): :param queue: Queue to consume from :param on_completed: callable/0 to call when this succeeds :param on_failed: callable/1 to call when this fails with AMQPError instance + :return: a Future with this order's status """ - self.thread.order_queue.append(CancelQueue(queue, - on_completed=on_completed, - on_failed=on_failed)) + a = CancelQueue(queue, on_completed=on_completed, on_failed=on_failed) + self.thread.order_queue.append(a) + return a def consume(self, queue, on_completed=None, on_failed=None): """ @@ -143,10 +147,11 @@ class Cluster(object): :param queue: Queue to consume from :param on_completed: callable/0 to call when this succeeds :param on_failed: callable/1 to call when this fails with AMQPError instance + :return: a Future with this order's status """ - self.thread.order_queue.append(ConsumeQueue(queue, - on_completed=on_completed, - on_failed=on_failed)) + a = ConsumeQueue(queue, on_completed=on_completed, on_failed=on_failed) + self.thread.order_queue.append(a) + return a def drain(self, wait=0): """ diff --git a/coolamqp/orders.py b/coolamqp/orders.py index 1d0c5a5e12e9ff1d1b0ec44a7a2a669b9e5f006a..efdbcc61aab93f87c3218b5260f6e7cd6d8c896e 100644 --- a/coolamqp/orders.py +++ b/coolamqp/orders.py @@ -1,14 +1,24 @@ """ Orders that can be dispatched to ClusterHandlerThread """ +from threading import Lock + class Order(object): """Base class for orders dispatched to ClusterHandlerThread""" def __init__(self, on_completed=None, on_failed=None): self.on_completed = on_completed self.on_failed = on_failed + self.result = None # None on non-completed + # True on completed OK + # exception instance on failed + self.lock = Lock() + self.lock.acquire() def completed(self): + self.result = True + self.lock.release() + if self.on_completed is not None: self.on_completed() @@ -16,9 +26,17 @@ class Order(object): """ :param e: AMQPError instance """ + self.result = e + self.lock.release() + if self.on_failed is not None: self.on_failed(e) + def result(self): + """Wait until this is completed and return a response""" + self.lock.acquire() + return self.result + class SendMessage(Order): """Send a message"""