diff --git a/coolamqp/clustering/single.py b/coolamqp/clustering/single.py index a15b5c6a5f85ce40564245fe2a1ab465a119214a..05ed0e92634dc95ee526ae7dbd07bdac628e9ac4 100644 --- a/coolamqp/clustering/single.py +++ b/coolamqp/clustering/single.py @@ -19,6 +19,8 @@ class SingleNodeReconnector(object): self.attache_group = attache_group self.connection = None + self.terminating = False + def is_connected(self): return self.connection is not None @@ -32,11 +34,15 @@ class SingleNodeReconnector(object): self.connection.add_finalizer(self.on_fail) def on_fail(self): + if self.terminating: + return + self.connection = None self.connect() def shutdown(self): """Close this connection""" + self.terminating = True if self.connection is not None: self.connection.send(None) self.connection = None diff --git a/tests/test_clustering/__init__.py b/tests/test_clustering/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..1c762b12fd99adc2f7d4e5137c5b872079457510 --- /dev/null +++ b/tests/test_clustering/__init__.py @@ -0,0 +1,8 @@ +# coding=UTF-8 +from __future__ import print_function, absolute_import, division +import six +import logging + +logger = logging.getLogger(__name__) + + diff --git a/tests/test_clustering/test_a.py b/tests/test_clustering/test_a.py new file mode 100644 index 0000000000000000000000000000000000000000..bd1007ff520b619bde0ee713020770544c403486 --- /dev/null +++ b/tests/test_clustering/test_a.py @@ -0,0 +1,34 @@ +# coding=UTF-8 +""" +Test things +""" +from __future__ import print_function, absolute_import, division +import six +import unittest +import time, logging, threading +from coolamqp.objects import Message, MessageProperties, NodeDefinition, Queue +from coolamqp.clustering import Cluster + +import time + + +NODE = NodeDefinition('127.0.0.1', 'user', 'user', heartbeat=20) +logging.basicConfig(level=logging.DEBUG) + + +class TestA(unittest.TestCase): + def test_link(self): + """Connect and disconnect""" + c = Cluster([NODE]) + c.start() + c.shutdown() + + def test_consume(self): + c = Cluster([NODE]) + c.start() + con, fut = c.consume(Queue(u'hello', exclusive=True)) +# fut.result() + con.cancel() + c.shutdown() + +