Skip to content
Snippets Groups Projects
Commit b693afa9 authored by Piotr Maślanka's avatar Piotr Maślanka
Browse files

passes test

parent e97e126f
No related branches found
No related tags found
No related merge requests found
...@@ -123,7 +123,6 @@ class Cluster(object): ...@@ -123,7 +123,6 @@ class Cluster(object):
self.thread.order_queue.append(a) self.thread.order_queue.append(a)
return a return a
def delete_queue(self, queue, on_completed=None, on_failed=None): def delete_queue(self, queue, on_completed=None, on_failed=None):
""" """
Delete a queue Delete a queue
...@@ -202,3 +201,8 @@ class Cluster(object): ...@@ -202,3 +201,8 @@ class Cluster(object):
""" """
self.thread.terminate() self.thread.terminate()
self.thread.join() self.thread.join()
if self.connected:
self.backend.shutdown()
self.backend = None
self.connected = False
...@@ -119,6 +119,7 @@ class ClusterHandlerThread(threading.Thread): ...@@ -119,6 +119,7 @@ class ClusterHandlerThread(threading.Thread):
self.backend.queue_delete(order.queue) self.backend.queue_delete(order.queue)
elif isinstance(order, ConsumeQueue): elif isinstance(order, ConsumeQueue):
if order.queue.consumer_tag in self.queues_by_consumer_tags: if order.queue.consumer_tag in self.queues_by_consumer_tags:
order.completed()
return # already consuming, belay that return # already consuming, belay that
self.backend.queue_declare(order.queue) self.backend.queue_declare(order.queue)
...@@ -172,6 +173,13 @@ class ClusterHandlerThread(threading.Thread): ...@@ -172,6 +173,13 @@ class ClusterHandlerThread(threading.Thread):
self.event_queue.put(ConnectionDown()) self.event_queue.put(ConnectionDown())
self._reconnect() self._reconnect()
if (not self.cluster.connected) or (self.backend is not None):
self.backend.shutdown()
self.backend = None
self.cluster.connected = False
def terminate(self): def terminate(self):
""" """
Called by Cluster. Tells to finish all jobs and quit. Called by Cluster. Tells to finish all jobs and quit.
......
...@@ -7,10 +7,15 @@ from coolamqp import Cluster, ClusterNode, Queue, MessageReceived, ConnectionUp, ...@@ -7,10 +7,15 @@ from coolamqp import Cluster, ClusterNode, Queue, MessageReceived, ConnectionUp,
ConnectionDown, ConsumerCancelled, Message ConnectionDown, ConsumerCancelled, Message
def getamqp():
amqp = Cluster([ClusterNode('127.0.0.1', 'guest', 'guest')])
amqp.start()
return amqp
class TestBasics(unittest.TestCase): class TestBasics(unittest.TestCase):
def setUp(self): def setUp(self):
self.amqp = Cluster([ClusterNode('127.0.0.1', 'guest', 'guest')]) self.amqp = getamqp()
self.amqp.start()
self.assertIsInstance(self.amqp.drain(1), ConnectionUp) self.assertIsInstance(self.amqp.drain(1), ConnectionUp)
def tearDown(self): def tearDown(self):
...@@ -29,8 +34,6 @@ class TestBasics(unittest.TestCase): ...@@ -29,8 +34,6 @@ class TestBasics(unittest.TestCase):
self.assertIs(self.amqp.drain(wait=4), None) self.assertIs(self.amqp.drain(wait=4), None)
self.amqp.delete_queue(myq)
def test_nacknowledge(self): def test_nacknowledge(self):
myq = Queue('myqueue', exclusive=True) myq = Queue('myqueue', exclusive=True)
...@@ -46,7 +49,24 @@ class TestBasics(unittest.TestCase): ...@@ -46,7 +49,24 @@ class TestBasics(unittest.TestCase):
self.assertIsInstance(p, MessageReceived) self.assertIsInstance(p, MessageReceived)
self.assertEquals(six.binary_type(p.message.body), 'what the fuck') self.assertEquals(six.binary_type(p.message.body), 'what the fuck')
self.amqp.delete_queue(myq)
def test_bug_hangs(self):
p = Queue('lol', exclusive=True)
self.amqp.consume(p)
self.amqp.consume(p).result()
def test_consume_twice(self):
"""Spawn a second connection and try to consume an exclusive queue twice"""
amqp2 = getamqp()
has_failed = {'has_failed': False}
self.amqp.consume(Queue('lol', exclusive=True)).result()
amqp2.consume(Queue('lol', exclusive=True), on_failed=lambda e: has_failed.update({'has_failed': True})).result()
self.assertTrue(has_failed['has_failed'])
amqp2.shutdown()
def test_send_and_receive(self): def test_send_and_receive(self):
myq = Queue('myqueue', exclusive=True) myq = Queue('myqueue', exclusive=True)
...@@ -73,5 +93,3 @@ class TestBasics(unittest.TestCase): ...@@ -73,5 +93,3 @@ class TestBasics(unittest.TestCase):
self.amqp.cancel(myq) self.amqp.cancel(myq)
self.assertIsInstance(self.amqp.drain(wait=10), ConsumerCancelled) self.assertIsInstance(self.amqp.drain(wait=10), ConsumerCancelled)
self.amqp.delete_queue(myq)
...@@ -22,3 +22,26 @@ class TestFailures(unittest.TestCase): ...@@ -22,3 +22,26 @@ class TestFailures(unittest.TestCase):
os.system("sudo service rabbitmq-server restart") os.system("sudo service rabbitmq-server restart")
self.assertIsInstance(self.amqp.drain(wait=4), ConnectionDown) self.assertIsInstance(self.amqp.drain(wait=4), ConnectionDown)
self.assertIsInstance(self.amqp.drain(wait=6), ConnectionUp) self.assertIsInstance(self.amqp.drain(wait=6), ConnectionUp)
def test_connection_flags_are_okay(self):
os.system("sudo service rabbitmq-server stop")
self.assertIsInstance(self.amqp.drain(wait=4), ConnectionDown)
self.assertFalse(self.amqp.connected)
os.system("sudo service rabbitmq-server start")
self.assertIsInstance(self.amqp.drain(wait=6), ConnectionUp)
self.assertTrue(self.amqp.connected)
def test_connection_down_and_up_redeclare_queues(self):
"""are messages generated at all? does it reconnect?"""
q1 = Queue('wtf1', exclusive=True)
self.amqp.consume(q1)
os.system("sudo service rabbitmq-server restart")
self.assertIsInstance(self.amqp.drain(wait=4), ConnectionDown)
self.assertIsInstance(self.amqp.drain(wait=6), ConnectionUp)
self.amqp.send(Message('what the fuck'), '', routing_key='wtf1')
self.assertIsInstance(self.amqp.drain(wait=10), MessageReceived)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment