From b693afa9b06d7de8571fe9661bb0a9fa107ca87a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ma=C5=9Blanka?= <piotr.maslanka@henrietta.com.pl> Date: Thu, 22 Dec 2016 15:27:50 +0100 Subject: [PATCH] passes test --- coolamqp/cluster.py | 6 +++++- coolamqp/handler.py | 8 ++++++++ tests/test_basics.py | 32 +++++++++++++++++++++++++------- tests/test_failures.py | 23 +++++++++++++++++++++++ 4 files changed, 61 insertions(+), 8 deletions(-) diff --git a/coolamqp/cluster.py b/coolamqp/cluster.py index 4108659..6ebd56a 100644 --- a/coolamqp/cluster.py +++ b/coolamqp/cluster.py @@ -123,7 +123,6 @@ class Cluster(object): self.thread.order_queue.append(a) return a - def delete_queue(self, queue, on_completed=None, on_failed=None): """ Delete a queue @@ -202,3 +201,8 @@ class Cluster(object): """ self.thread.terminate() self.thread.join() + + if self.connected: + self.backend.shutdown() + self.backend = None + self.connected = False diff --git a/coolamqp/handler.py b/coolamqp/handler.py index 58a8610..9705407 100644 --- a/coolamqp/handler.py +++ b/coolamqp/handler.py @@ -119,6 +119,7 @@ class ClusterHandlerThread(threading.Thread): self.backend.queue_delete(order.queue) elif isinstance(order, ConsumeQueue): if order.queue.consumer_tag in self.queues_by_consumer_tags: + order.completed() return # already consuming, belay that self.backend.queue_declare(order.queue) @@ -172,6 +173,13 @@ class ClusterHandlerThread(threading.Thread): self.event_queue.put(ConnectionDown()) self._reconnect() + if (not self.cluster.connected) or (self.backend is not None): + self.backend.shutdown() + self.backend = None + self.cluster.connected = False + + + def terminate(self): """ Called by Cluster. Tells to finish all jobs and quit. diff --git a/tests/test_basics.py b/tests/test_basics.py index 750d164..1491104 100644 --- a/tests/test_basics.py +++ b/tests/test_basics.py @@ -7,10 +7,15 @@ from coolamqp import Cluster, ClusterNode, Queue, MessageReceived, ConnectionUp, ConnectionDown, ConsumerCancelled, Message +def getamqp(): + amqp = Cluster([ClusterNode('127.0.0.1', 'guest', 'guest')]) + amqp.start() + return amqp + + class TestBasics(unittest.TestCase): def setUp(self): - self.amqp = Cluster([ClusterNode('127.0.0.1', 'guest', 'guest')]) - self.amqp.start() + self.amqp = getamqp() self.assertIsInstance(self.amqp.drain(1), ConnectionUp) def tearDown(self): @@ -29,8 +34,6 @@ class TestBasics(unittest.TestCase): self.assertIs(self.amqp.drain(wait=4), None) - self.amqp.delete_queue(myq) - def test_nacknowledge(self): myq = Queue('myqueue', exclusive=True) @@ -46,7 +49,24 @@ class TestBasics(unittest.TestCase): self.assertIsInstance(p, MessageReceived) self.assertEquals(six.binary_type(p.message.body), 'what the fuck') - self.amqp.delete_queue(myq) + + def test_bug_hangs(self): + p = Queue('lol', exclusive=True) + self.amqp.consume(p) + self.amqp.consume(p).result() + + def test_consume_twice(self): + """Spawn a second connection and try to consume an exclusive queue twice""" + amqp2 = getamqp() + + has_failed = {'has_failed': False} + + self.amqp.consume(Queue('lol', exclusive=True)).result() + amqp2.consume(Queue('lol', exclusive=True), on_failed=lambda e: has_failed.update({'has_failed': True})).result() + + self.assertTrue(has_failed['has_failed']) + + amqp2.shutdown() def test_send_and_receive(self): myq = Queue('myqueue', exclusive=True) @@ -73,5 +93,3 @@ class TestBasics(unittest.TestCase): self.amqp.cancel(myq) self.assertIsInstance(self.amqp.drain(wait=10), ConsumerCancelled) - - self.amqp.delete_queue(myq) diff --git a/tests/test_failures.py b/tests/test_failures.py index 2347f66..daa07db 100644 --- a/tests/test_failures.py +++ b/tests/test_failures.py @@ -22,3 +22,26 @@ class TestFailures(unittest.TestCase): os.system("sudo service rabbitmq-server restart") self.assertIsInstance(self.amqp.drain(wait=4), ConnectionDown) self.assertIsInstance(self.amqp.drain(wait=6), ConnectionUp) + + def test_connection_flags_are_okay(self): + os.system("sudo service rabbitmq-server stop") + self.assertIsInstance(self.amqp.drain(wait=4), ConnectionDown) + self.assertFalse(self.amqp.connected) + os.system("sudo service rabbitmq-server start") + self.assertIsInstance(self.amqp.drain(wait=6), ConnectionUp) + self.assertTrue(self.amqp.connected) + + def test_connection_down_and_up_redeclare_queues(self): + """are messages generated at all? does it reconnect?""" + + q1 = Queue('wtf1', exclusive=True) + + self.amqp.consume(q1) + + os.system("sudo service rabbitmq-server restart") + self.assertIsInstance(self.amqp.drain(wait=4), ConnectionDown) + self.assertIsInstance(self.amqp.drain(wait=6), ConnectionUp) + + self.amqp.send(Message('what the fuck'), '', routing_key='wtf1') + + self.assertIsInstance(self.amqp.drain(wait=10), MessageReceived) -- GitLab