diff --git a/coolamqp/cluster.py b/coolamqp/cluster.py index 41086590f7ecefccb9e41dbf96cecdc7d0b1e2a0..6ebd56afa308afcb13df7151fcc262d3ca88742d 100644 --- a/coolamqp/cluster.py +++ b/coolamqp/cluster.py @@ -123,7 +123,6 @@ class Cluster(object): self.thread.order_queue.append(a) return a - def delete_queue(self, queue, on_completed=None, on_failed=None): """ Delete a queue @@ -202,3 +201,8 @@ class Cluster(object): """ self.thread.terminate() self.thread.join() + + if self.connected: + self.backend.shutdown() + self.backend = None + self.connected = False diff --git a/coolamqp/handler.py b/coolamqp/handler.py index 58a861023f93f996c2773cd827ecd54e63761e11..9705407d19cc9718bf3225d13ee6d383b7afa1ac 100644 --- a/coolamqp/handler.py +++ b/coolamqp/handler.py @@ -119,6 +119,7 @@ class ClusterHandlerThread(threading.Thread): self.backend.queue_delete(order.queue) elif isinstance(order, ConsumeQueue): if order.queue.consumer_tag in self.queues_by_consumer_tags: + order.completed() return # already consuming, belay that self.backend.queue_declare(order.queue) @@ -172,6 +173,13 @@ class ClusterHandlerThread(threading.Thread): self.event_queue.put(ConnectionDown()) self._reconnect() + if (not self.cluster.connected) or (self.backend is not None): + self.backend.shutdown() + self.backend = None + self.cluster.connected = False + + + def terminate(self): """ Called by Cluster. Tells to finish all jobs and quit. diff --git a/tests/test_basics.py b/tests/test_basics.py index 750d1643769689ee9f73e201059d6b893cc662bc..1491104a2ecd1594b35b85d808d2b118f950244a 100644 --- a/tests/test_basics.py +++ b/tests/test_basics.py @@ -7,10 +7,15 @@ from coolamqp import Cluster, ClusterNode, Queue, MessageReceived, ConnectionUp, ConnectionDown, ConsumerCancelled, Message +def getamqp(): + amqp = Cluster([ClusterNode('127.0.0.1', 'guest', 'guest')]) + amqp.start() + return amqp + + class TestBasics(unittest.TestCase): def setUp(self): - self.amqp = Cluster([ClusterNode('127.0.0.1', 'guest', 'guest')]) - self.amqp.start() + self.amqp = getamqp() self.assertIsInstance(self.amqp.drain(1), ConnectionUp) def tearDown(self): @@ -29,8 +34,6 @@ class TestBasics(unittest.TestCase): self.assertIs(self.amqp.drain(wait=4), None) - self.amqp.delete_queue(myq) - def test_nacknowledge(self): myq = Queue('myqueue', exclusive=True) @@ -46,7 +49,24 @@ class TestBasics(unittest.TestCase): self.assertIsInstance(p, MessageReceived) self.assertEquals(six.binary_type(p.message.body), 'what the fuck') - self.amqp.delete_queue(myq) + + def test_bug_hangs(self): + p = Queue('lol', exclusive=True) + self.amqp.consume(p) + self.amqp.consume(p).result() + + def test_consume_twice(self): + """Spawn a second connection and try to consume an exclusive queue twice""" + amqp2 = getamqp() + + has_failed = {'has_failed': False} + + self.amqp.consume(Queue('lol', exclusive=True)).result() + amqp2.consume(Queue('lol', exclusive=True), on_failed=lambda e: has_failed.update({'has_failed': True})).result() + + self.assertTrue(has_failed['has_failed']) + + amqp2.shutdown() def test_send_and_receive(self): myq = Queue('myqueue', exclusive=True) @@ -73,5 +93,3 @@ class TestBasics(unittest.TestCase): self.amqp.cancel(myq) self.assertIsInstance(self.amqp.drain(wait=10), ConsumerCancelled) - - self.amqp.delete_queue(myq) diff --git a/tests/test_failures.py b/tests/test_failures.py index 2347f667bdc43f0e87ef112b246c971a37eeb082..daa07db8a646dcae66a39e4168b9b47ad329c20d 100644 --- a/tests/test_failures.py +++ b/tests/test_failures.py @@ -22,3 +22,26 @@ class TestFailures(unittest.TestCase): os.system("sudo service rabbitmq-server restart") self.assertIsInstance(self.amqp.drain(wait=4), ConnectionDown) self.assertIsInstance(self.amqp.drain(wait=6), ConnectionUp) + + def test_connection_flags_are_okay(self): + os.system("sudo service rabbitmq-server stop") + self.assertIsInstance(self.amqp.drain(wait=4), ConnectionDown) + self.assertFalse(self.amqp.connected) + os.system("sudo service rabbitmq-server start") + self.assertIsInstance(self.amqp.drain(wait=6), ConnectionUp) + self.assertTrue(self.amqp.connected) + + def test_connection_down_and_up_redeclare_queues(self): + """are messages generated at all? does it reconnect?""" + + q1 = Queue('wtf1', exclusive=True) + + self.amqp.consume(q1) + + os.system("sudo service rabbitmq-server restart") + self.assertIsInstance(self.amqp.drain(wait=4), ConnectionDown) + self.assertIsInstance(self.amqp.drain(wait=6), ConnectionUp) + + self.amqp.send(Message('what the fuck'), '', routing_key='wtf1') + + self.assertIsInstance(self.amqp.drain(wait=10), MessageReceived)