diff --git a/README.md b/README.md index 536783655cc259f97b221493df22284efd44dbf5..b874c4909d30a4b8aa4bb1cfb9a81518315c68b9 100644 --- a/README.md +++ b/README.md @@ -25,10 +25,7 @@ You only need to remember that: 3. Delivering messages multiple times may happen. * Ensure you know when it happens. Keywords: message acknowledgement, amqp specification -The project is actively maintained, used in a commercial project and unit tested with high coverage. +The project is actively maintained and used in a commercial project. Tests can run +either on Vagrant (Vagrantfile attached) or Travis CI, and run against RabbitMQ. Enjoy! - -todo ----- -* Allow binding queues with exchanges with a routing_key \ No newline at end of file diff --git a/coolamqp/cluster.py b/coolamqp/cluster.py index 66a7e02ec58c2aff528c9949a1d90e8c602c6f8d..b04e79eabefef0eeea9efe7aff44303af8a064e1 100644 --- a/coolamqp/cluster.py +++ b/coolamqp/cluster.py @@ -1,4 +1,4 @@ -#coding=UTF-8 +# coding=UTF-8 import itertools from six.moves import queue as Queue from coolamqp.backends import PyAMQPBackend @@ -101,7 +101,10 @@ class Cluster(object): def declare_queue(self, queue, on_completed=None, on_failed=None): """ - Declares a queue + Declares a queue. + + NOTE THAT IF YOU declare + :param queue: Queue to declare :param on_completed: callable/0 to call when this succeeds :param on_failed: callable/1 to call when this fails with AMQPError instance @@ -195,10 +198,15 @@ class Cluster(object): self.thread.start() return self - def shutdown(self): + def shutdown(self, complete_outstanding_tasks=False): """ - Cleans everything and returns + Cleans everything and returns. + + :param complete_outstanding_tasks: if set to True, pending operations will be completed. + If False, thread will exit without completing them. + This can mean that if the cluster doesn't come up online, shutdown MAY BLOCK FOREVER. """ + self.thread.complete_outstanding_upon_termination = complete_outstanding_tasks self.thread.terminate() self.thread.join() # thread closes the AMQP uplink for us diff --git a/coolamqp/handler.py b/coolamqp/handler.py index 007c0c348514a05993fb8afe66eb37b0f9ae0bc4..c0fa2e23f5729a21db731ec12f001dbbe0400281 100644 --- a/coolamqp/handler.py +++ b/coolamqp/handler.py @@ -1,4 +1,4 @@ -#coding=UTF-8 +# coding=UTF-8 import threading import six.moves.queue as Queue import six @@ -7,7 +7,8 @@ import collections import time from .backends import ConnectionFailedError, RemoteAMQPError, Cancelled from .messages import Exchange -from .events import ConnectionUp, ConnectionDown, ConsumerCancelled, MessageReceived +from .events import ConnectionUp, ConnectionDown, ConsumerCancelled, \ + MessageReceived from .orders import SendMessage, DeclareExchange, ConsumeQueue, CancelQueue, \ AcknowledgeMessage, NAcknowledgeMessage, DeleteQueue, \ DeleteExchange, SetQoS, DeclareQueue @@ -15,6 +16,11 @@ from .orders import SendMessage, DeclareExchange, ConsumeQueue, CancelQueue, \ logger = logging.getLogger(__name__) +class _ImOuttaHere(Exception): + """Thrown upon thread terminating. + Thrown only if complete_outstanding_upon_termination is False""" + + class ClusterHandlerThread(threading.Thread): """ Thread that does bookkeeping for a Cluster. @@ -27,12 +33,13 @@ class ClusterHandlerThread(threading.Thread): self.cluster = cluster self.is_terminating = False + self.complete_outstanding_upon_termination = False self.order_queue = collections.deque() # queue for inbound orders self.event_queue = Queue.Queue() # queue for tasks done self.connect_id = -1 # connectID of current connection self.declared_exchanges = {} # declared exchanges, by their names - self.queues_by_consumer_tags = {} # listened queues, by their consumer tags + self.queues_by_consumer_tags = {} # subbed queues, by their consumer tags self.backend = None self.first_connect = True @@ -42,7 +49,7 @@ class ClusterHandlerThread(threading.Thread): def _reconnect(self): exponential_backoff_delay = 1 - while True: + while not self.cluster.connected: if self.backend is not None: self.backend.shutdown() self.backend = None @@ -75,15 +82,14 @@ class ClusterHandlerThread(threading.Thread): self.backend = None # good policy to release resources before you sleep time.sleep(exponential_backoff_delay) - if self.is_terminating: - raise SystemError('Thread was requested to terminate') + if self.is_terminating and (not self.complete_outstanding_upon_termination): + raise _ImOuttaHere() - exponential_backoff_delay = max(60, exponential_backoff_delay * 2) + exponential_backoff_delay = min(60, exponential_backoff_delay * 2) else: self.cluster.connected = True self.event_queue.put(ConnectionUp(initial=self.first_connect)) self.first_connect = False - break # we connected :) def perform_order(self): @@ -152,27 +158,33 @@ class ClusterHandlerThread(threading.Thread): def run(self): self._reconnect() - while (not self.is_terminating) or len(self.order_queue) > 0: + # Loop while there are things to do + while (not self.is_terminating) or (len(self.order_queue) > 0): try: while len(self.order_queue) > 0: self.perform_order() # just drain shit - self.backend.process(max_time=2) - + self.backend.process(max_time=1) + except _ImOuttaHere: # thrown only if self.complete_outstanding_upon_termination + assert self.complete_outstanding_upon_termination + break except ConnectionFailedError as e: logger.warning('Connection to broker lost') - self.cluster.connected = True + self.cluster.connected = False self.event_queue.put(ConnectionDown()) - self._reconnect() + try: + self._reconnect() + except _ImOuttaHere: + break + assert self.is_terminating if (not self.cluster.connected) or (self.backend is not None): self.backend.shutdown() self.backend = None self.cluster.connected = False - def terminate(self): """ Called by Cluster. Tells to finish all jobs and quit. diff --git a/tests/test_failures.py b/tests/test_failures.py index a5530f969c527a872b8b53408185a676b4abc0f2..872c422f5b167a04f8443019dab6dacb543f2acc 100644 --- a/tests/test_failures.py +++ b/tests/test_failures.py @@ -39,10 +39,10 @@ class TestFailures(unittest.TestCase): self.amqp.shutdown() def test_connection_down_and_up(self): - """are messages generated at all? does it reconnect?""" + """Are ConnectionUp/Down messages generated at all? does it reconnect?""" os.system("sudo service rabbitmq-server restart") - self.assertIsInstance(self.amqp.drain(wait=4), ConnectionDown) - self.assertIsInstance(self.amqp.drain(wait=6), ConnectionUp) + self.assertIsInstance(self.amqp.drain(wait=6), ConnectionDown) + self.assertIsInstance(self.amqp.drain(wait=10), ConnectionUp) def test_longer_disconnects(self): os.system("sudo service rabbitmq-server stop") @@ -56,7 +56,7 @@ class TestFailures(unittest.TestCase): os.system("sudo service rabbitmq-server restart") self.assertIsInstance(self.amqp.drain(wait=4), ConnectionDown) - self.assertIsInstance(self.amqp.drain(wait=4), ConnectionUp) + self.assertIsInstance(self.amqp.drain(wait=10), ConnectionUp) self.amqp.consume(Queue('lol', exclusive=True)).result() self.amqp.send(Message('what the fuck'), '', routing_key='lol') @@ -74,10 +74,9 @@ class TestFailures(unittest.TestCase): self.assertIsInstance(self.amqp.drain(wait=4), ConnectionDown) self.assertFalse(self.amqp.connected) os.system("sudo service rabbitmq-server start") - self.assertIsInstance(self.amqp.drain(wait=6), ConnectionUp) + self.assertIsInstance(self.amqp.drain(wait=10), ConnectionUp) self.assertTrue(self.amqp.connected) - def test_sending_a_message_is_cancelled(self): """are messages generated at all? does it reconnect?""" @@ -91,7 +90,7 @@ class TestFailures(unittest.TestCase): os.system("sudo service rabbitmq-server start") - self.assertIsInstance(self.amqp.drain(wait=6), ConnectionUp) + self.assertIsInstance(self.amqp.drain(wait=10), ConnectionUp) self.assertIsNone(self.amqp.drain(wait=6)) # message is NOT received @@ -123,6 +122,6 @@ class TestFailures(unittest.TestCase): self.amqp.send(Message('hello'), xchg) self.assertIsInstance(self.amqp.drain(wait=4), ConnectionDown) - self.assertIsInstance(self.amqp.drain(wait=4), ConnectionUp) + self.assertIsInstance(self.amqp.drain(wait=10), ConnectionUp) self.assertIsInstance(self.amqp.drain(wait=4), MessageReceived) self.assertIsInstance(self.amqp.drain(wait=4), MessageReceived)