Skip to content
Snippets Groups Projects
Commit f970e359 authored by Piotr Maślanka's avatar Piotr Maślanka
Browse files

done #3

parent 17b3d77f
No related branches found
No related tags found
No related merge requests found
#coding=UTF-8
from .pyamqp import PyAMQPBackend
from .base import AMQPError, ConnectionFailedError, RemoteAMQPError
from .base import AMQPError, ConnectionFailedError, RemoteAMQPError, Cancelled
#coding=UTF-8
class AMQPError(Exception):
pass
"""Connection errors and bawking of AMQP server"""
class ConnectionFailedError(AMQPError):
"""Connection to broker failed"""
class Cancelled(Exception):
"""Cancel ordered by user"""
class RemoteAMQPError(AMQPError):
"""
......
......@@ -4,7 +4,7 @@ import six.moves.queue as Queue
import logging
import collections
import time
from .backends import ConnectionFailedError, RemoteAMQPError
from .backends import ConnectionFailedError, RemoteAMQPError, Cancelled
from .messages import Exchange
from .events import ConnectionUp, ConnectionDown, ConsumerCancelled, MessageReceived
from .orders import SendMessage, DeclareExchange, ConsumeQueue, CancelQueue, \
......@@ -96,6 +96,10 @@ class ClusterHandlerThread(threading.Thread):
order = self.order_queue.popleft()
try:
if order.cancelled:
order.failed(Cancelled())
return
if isinstance(order, SendMessage):
self.backend.basic_publish(order.message, order.exchange, order.routing_key)
elif isinstance(order, SetQoS):
......
......@@ -15,6 +15,15 @@ class Order(object):
# exception instance on failed
self.lock = Lock()
self.lock.acquire()
self.cancelled = False
def has_finished(self):
"""Return if this task has either completed or failed"""
return self._result is not None
def cancel(self):
"""Cancel this order"""
self.cancelled = True
def completed(self):
self._result = True
......@@ -25,7 +34,7 @@ class Order(object):
def failed(self, e):
"""
:param e: AMQPError instance
:param e: AMQPError instance or Cancelled instance
"""
self._result = e
self.lock.release()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment