Skip to content
Snippets Groups Projects
Commit a0d45aa3 authored by Piotr Maślanka's avatar Piotr Maślanka
Browse files

orders can be returned as futures

parent be61e384
No related branches found
No related tags found
No related merge requests found
......@@ -79,12 +79,12 @@ class Cluster(object):
:param routing_key: routing key to use
:param on_completed: callable/0 to call when this succeeds
:param on_failed: callable/1 to call when this fails with AMQPError instance
:return: a Future with this order's status
"""
self.thread.order_queue.append(SendMessage(message,
exchange or Exchange.direct,
routing_key,
on_completed=on_completed,
on_failed=on_failed))
a = SendMessage(message, exchange or Exchange.direct, routing_key,
on_completed=on_completed, on_failed=on_failed)
self.thread.order_queue.append(a)
return a
def declare_exchange(self, exchange, on_completed=None, on_failed=None):
"""
......@@ -92,6 +92,7 @@ class Cluster(object):
:param exchange: Exchange to declare
:param on_completed: callable/0 to call when this succeeds
:param on_failed: callable/1 to call when this fails with AMQPError instance
:return: a Future with this order's status
"""
self.thread.order_queue.append(DeclareExchange(exchange,
on_completed=on_completed,
......@@ -104,10 +105,11 @@ class Cluster(object):
:param exchange: Exchange to delete
:param on_completed: callable/0 to call when this succeeds
:param on_failed: callable/1 to call when this fails with AMQPError instance
:return: a Future with this order's status
"""
self.thread.order_queue.append(DeleteExchange(exchange,
on_completed=on_completed,
on_failed=on_failed))
a = DeleteExchange(exchange, on_completed=on_completed, on_failed=on_failed)
self.thread.order_queue.append(a)
return a
def delete_queue(self, queue, on_completed=None, on_failed=None):
......@@ -116,10 +118,11 @@ class Cluster(object):
:param queue: Queue to delete
:param on_completed: callable/0 to call when this succeeds
:param on_failed: callable/1 to call when this fails with AMQPError instance
:return: a Future with this order's status
"""
self.thread.order_queue.append(DeleteQueue(queue,
on_completed=on_completed,
on_failed=on_failed))
a = DeleteQueue(queue, on_completed=on_completed, on_failed=on_failed)
self.thread.order_queue.append(a)
return a
def cancel(self, queue, on_completed=None, on_failed=None):
"""
......@@ -128,10 +131,11 @@ class Cluster(object):
:param queue: Queue to consume from
:param on_completed: callable/0 to call when this succeeds
:param on_failed: callable/1 to call when this fails with AMQPError instance
:return: a Future with this order's status
"""
self.thread.order_queue.append(CancelQueue(queue,
on_completed=on_completed,
on_failed=on_failed))
a = CancelQueue(queue, on_completed=on_completed, on_failed=on_failed)
self.thread.order_queue.append(a)
return a
def consume(self, queue, on_completed=None, on_failed=None):
"""
......@@ -143,10 +147,11 @@ class Cluster(object):
:param queue: Queue to consume from
:param on_completed: callable/0 to call when this succeeds
:param on_failed: callable/1 to call when this fails with AMQPError instance
:return: a Future with this order's status
"""
self.thread.order_queue.append(ConsumeQueue(queue,
on_completed=on_completed,
on_failed=on_failed))
a = ConsumeQueue(queue, on_completed=on_completed, on_failed=on_failed)
self.thread.order_queue.append(a)
return a
def drain(self, wait=0):
"""
......
"""
Orders that can be dispatched to ClusterHandlerThread
"""
from threading import Lock
class Order(object):
"""Base class for orders dispatched to ClusterHandlerThread"""
def __init__(self, on_completed=None, on_failed=None):
self.on_completed = on_completed
self.on_failed = on_failed
self.result = None # None on non-completed
# True on completed OK
# exception instance on failed
self.lock = Lock()
self.lock.acquire()
def completed(self):
self.result = True
self.lock.release()
if self.on_completed is not None:
self.on_completed()
......@@ -16,9 +26,17 @@ class Order(object):
"""
:param e: AMQPError instance
"""
self.result = e
self.lock.release()
if self.on_failed is not None:
self.on_failed(e)
def result(self):
"""Wait until this is completed and return a response"""
self.lock.acquire()
return self.result
class SendMessage(Order):
"""Send a message"""
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment