From a0d45aa31be1ff956b9abb7fb5e48700a6e8882c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ma=C5=9Blanka?= <piotr.maslanka@henrietta.com.pl> Date: Fri, 10 Jun 2016 23:34:37 +0200 Subject: [PATCH] orders can be returned as futures --- coolamqp/cluster.py | 39 ++++++++++++++++++++++----------------- coolamqp/orders.py | 18 ++++++++++++++++++ 2 files changed, 40 insertions(+), 17 deletions(-) diff --git a/coolamqp/cluster.py b/coolamqp/cluster.py index bd64179..7889803 100644 --- a/coolamqp/cluster.py +++ b/coolamqp/cluster.py @@ -79,12 +79,12 @@ class Cluster(object): :param routing_key: routing key to use :param on_completed: callable/0 to call when this succeeds :param on_failed: callable/1 to call when this fails with AMQPError instance + :return: a Future with this order's status """ - self.thread.order_queue.append(SendMessage(message, - exchange or Exchange.direct, - routing_key, - on_completed=on_completed, - on_failed=on_failed)) + a = SendMessage(message, exchange or Exchange.direct, routing_key, + on_completed=on_completed, on_failed=on_failed) + self.thread.order_queue.append(a) + return a def declare_exchange(self, exchange, on_completed=None, on_failed=None): """ @@ -92,6 +92,7 @@ class Cluster(object): :param exchange: Exchange to declare :param on_completed: callable/0 to call when this succeeds :param on_failed: callable/1 to call when this fails with AMQPError instance + :return: a Future with this order's status """ self.thread.order_queue.append(DeclareExchange(exchange, on_completed=on_completed, @@ -104,10 +105,11 @@ class Cluster(object): :param exchange: Exchange to delete :param on_completed: callable/0 to call when this succeeds :param on_failed: callable/1 to call when this fails with AMQPError instance + :return: a Future with this order's status """ - self.thread.order_queue.append(DeleteExchange(exchange, - on_completed=on_completed, - on_failed=on_failed)) + a = DeleteExchange(exchange, on_completed=on_completed, on_failed=on_failed) + self.thread.order_queue.append(a) + return a def delete_queue(self, queue, on_completed=None, on_failed=None): @@ -116,10 +118,11 @@ class Cluster(object): :param queue: Queue to delete :param on_completed: callable/0 to call when this succeeds :param on_failed: callable/1 to call when this fails with AMQPError instance + :return: a Future with this order's status """ - self.thread.order_queue.append(DeleteQueue(queue, - on_completed=on_completed, - on_failed=on_failed)) + a = DeleteQueue(queue, on_completed=on_completed, on_failed=on_failed) + self.thread.order_queue.append(a) + return a def cancel(self, queue, on_completed=None, on_failed=None): """ @@ -128,10 +131,11 @@ class Cluster(object): :param queue: Queue to consume from :param on_completed: callable/0 to call when this succeeds :param on_failed: callable/1 to call when this fails with AMQPError instance + :return: a Future with this order's status """ - self.thread.order_queue.append(CancelQueue(queue, - on_completed=on_completed, - on_failed=on_failed)) + a = CancelQueue(queue, on_completed=on_completed, on_failed=on_failed) + self.thread.order_queue.append(a) + return a def consume(self, queue, on_completed=None, on_failed=None): """ @@ -143,10 +147,11 @@ class Cluster(object): :param queue: Queue to consume from :param on_completed: callable/0 to call when this succeeds :param on_failed: callable/1 to call when this fails with AMQPError instance + :return: a Future with this order's status """ - self.thread.order_queue.append(ConsumeQueue(queue, - on_completed=on_completed, - on_failed=on_failed)) + a = ConsumeQueue(queue, on_completed=on_completed, on_failed=on_failed) + self.thread.order_queue.append(a) + return a def drain(self, wait=0): """ diff --git a/coolamqp/orders.py b/coolamqp/orders.py index 1d0c5a5..efdbcc6 100644 --- a/coolamqp/orders.py +++ b/coolamqp/orders.py @@ -1,14 +1,24 @@ """ Orders that can be dispatched to ClusterHandlerThread """ +from threading import Lock + class Order(object): """Base class for orders dispatched to ClusterHandlerThread""" def __init__(self, on_completed=None, on_failed=None): self.on_completed = on_completed self.on_failed = on_failed + self.result = None # None on non-completed + # True on completed OK + # exception instance on failed + self.lock = Lock() + self.lock.acquire() def completed(self): + self.result = True + self.lock.release() + if self.on_completed is not None: self.on_completed() @@ -16,9 +26,17 @@ class Order(object): """ :param e: AMQPError instance """ + self.result = e + self.lock.release() + if self.on_failed is not None: self.on_failed(e) + def result(self): + """Wait until this is completed and return a response""" + self.lock.acquire() + return self.result + class SendMessage(Order): """Send a message""" -- GitLab