From f970e35970866e2ab4d82450a4950abbed10eafd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ma=C5=9Blanka?= <piotr.maslanka@henrietta.com.pl> Date: Wed, 10 Aug 2016 17:57:50 +0200 Subject: [PATCH] done #3 --- coolamqp/backends/__init__.py | 2 +- coolamqp/backends/base.py | 4 +++- coolamqp/handler.py | 6 +++++- coolamqp/orders.py | 11 ++++++++++- 4 files changed, 19 insertions(+), 4 deletions(-) diff --git a/coolamqp/backends/__init__.py b/coolamqp/backends/__init__.py index a1caeea..f91d890 100644 --- a/coolamqp/backends/__init__.py +++ b/coolamqp/backends/__init__.py @@ -1,3 +1,3 @@ #coding=UTF-8 from .pyamqp import PyAMQPBackend -from .base import AMQPError, ConnectionFailedError, RemoteAMQPError +from .base import AMQPError, ConnectionFailedError, RemoteAMQPError, Cancelled diff --git a/coolamqp/backends/base.py b/coolamqp/backends/base.py index a86e1ca..401b897 100644 --- a/coolamqp/backends/base.py +++ b/coolamqp/backends/base.py @@ -1,11 +1,13 @@ #coding=UTF-8 class AMQPError(Exception): - pass + """Connection errors and bawking of AMQP server""" class ConnectionFailedError(AMQPError): """Connection to broker failed""" +class Cancelled(Exception): + """Cancel ordered by user""" class RemoteAMQPError(AMQPError): """ diff --git a/coolamqp/handler.py b/coolamqp/handler.py index 978e565..aed7aea 100644 --- a/coolamqp/handler.py +++ b/coolamqp/handler.py @@ -4,7 +4,7 @@ import six.moves.queue as Queue import logging import collections import time -from .backends import ConnectionFailedError, RemoteAMQPError +from .backends import ConnectionFailedError, RemoteAMQPError, Cancelled from .messages import Exchange from .events import ConnectionUp, ConnectionDown, ConsumerCancelled, MessageReceived from .orders import SendMessage, DeclareExchange, ConsumeQueue, CancelQueue, \ @@ -96,6 +96,10 @@ class ClusterHandlerThread(threading.Thread): order = self.order_queue.popleft() try: + if order.cancelled: + order.failed(Cancelled()) + return + if isinstance(order, SendMessage): self.backend.basic_publish(order.message, order.exchange, order.routing_key) elif isinstance(order, SetQoS): diff --git a/coolamqp/orders.py b/coolamqp/orders.py index 0934be5..abbee28 100644 --- a/coolamqp/orders.py +++ b/coolamqp/orders.py @@ -15,6 +15,15 @@ class Order(object): # exception instance on failed self.lock = Lock() self.lock.acquire() + self.cancelled = False + + def has_finished(self): + """Return if this task has either completed or failed""" + return self._result is not None + + def cancel(self): + """Cancel this order""" + self.cancelled = True def completed(self): self._result = True @@ -25,7 +34,7 @@ class Order(object): def failed(self, e): """ - :param e: AMQPError instance + :param e: AMQPError instance or Cancelled instance """ self._result = e self.lock.release() -- GitLab