diff --git a/coolamqp/backends/__init__.py b/coolamqp/backends/__init__.py index a1caeea735ead73be2957a6c730a31ca13e9ecbb..f91d890de0483e2106fad5a21952e6f01b5779ba 100644 --- a/coolamqp/backends/__init__.py +++ b/coolamqp/backends/__init__.py @@ -1,3 +1,3 @@ #coding=UTF-8 from .pyamqp import PyAMQPBackend -from .base import AMQPError, ConnectionFailedError, RemoteAMQPError +from .base import AMQPError, ConnectionFailedError, RemoteAMQPError, Cancelled diff --git a/coolamqp/backends/base.py b/coolamqp/backends/base.py index a86e1ca68b27daec64178c678ea7368b704214b3..401b89775beb82a95d20bb5999a5845823a3c99b 100644 --- a/coolamqp/backends/base.py +++ b/coolamqp/backends/base.py @@ -1,11 +1,13 @@ #coding=UTF-8 class AMQPError(Exception): - pass + """Connection errors and bawking of AMQP server""" class ConnectionFailedError(AMQPError): """Connection to broker failed""" +class Cancelled(Exception): + """Cancel ordered by user""" class RemoteAMQPError(AMQPError): """ diff --git a/coolamqp/handler.py b/coolamqp/handler.py index 978e5657647fef2e2027720058c2584438cd4654..aed7aea1fd7173e41798188082423cf28914fad1 100644 --- a/coolamqp/handler.py +++ b/coolamqp/handler.py @@ -4,7 +4,7 @@ import six.moves.queue as Queue import logging import collections import time -from .backends import ConnectionFailedError, RemoteAMQPError +from .backends import ConnectionFailedError, RemoteAMQPError, Cancelled from .messages import Exchange from .events import ConnectionUp, ConnectionDown, ConsumerCancelled, MessageReceived from .orders import SendMessage, DeclareExchange, ConsumeQueue, CancelQueue, \ @@ -96,6 +96,10 @@ class ClusterHandlerThread(threading.Thread): order = self.order_queue.popleft() try: + if order.cancelled: + order.failed(Cancelled()) + return + if isinstance(order, SendMessage): self.backend.basic_publish(order.message, order.exchange, order.routing_key) elif isinstance(order, SetQoS): diff --git a/coolamqp/orders.py b/coolamqp/orders.py index 0934be504c8d3feab402d531a6c7b3bf6470e54a..abbee28b1f22ee0b400ac72941a9d03d68ce0258 100644 --- a/coolamqp/orders.py +++ b/coolamqp/orders.py @@ -15,6 +15,15 @@ class Order(object): # exception instance on failed self.lock = Lock() self.lock.acquire() + self.cancelled = False + + def has_finished(self): + """Return if this task has either completed or failed""" + return self._result is not None + + def cancel(self): + """Cancel this order""" + self.cancelled = True def completed(self): self._result = True @@ -25,7 +34,7 @@ class Order(object): def failed(self, e): """ - :param e: AMQPError instance + :param e: AMQPError instance or Cancelled instance """ self._result = e self.lock.release()