diff --git a/coolamqp/clustering/cluster.py b/coolamqp/clustering/cluster.py index 459a91e285a37869f329ce34fe5bc19b6d261ae4..655a676f998e1e400cf0846f868d97f50f4c77a2 100644 --- a/coolamqp/clustering/cluster.py +++ b/coolamqp/clustering/cluster.py @@ -35,10 +35,12 @@ class Cluster(object): ST_LINK_LOST = 0 # Link has been lost ST_LINK_REGAINED = 1 # Link has been regained - def __init__(self, nodes): + def __init__(self, nodes, on_fail=None): """ :param nodes: list of nodes, or a single node. For now, only one is supported. :type nodes: NodeDefinition instance or a list of NodeDefinition instances + :param on_fail: callable/0 to call when connection fails. This is a one-shot + :type on_fail: callable/0 """ from coolamqp.objects import NodeDefinition if isinstance(nodes, NodeDefinition): @@ -48,6 +50,7 @@ class Cluster(object): raise NotImplementedError(u'Multiple nodes not supported yet') self.node, = nodes + self.on_fail = on_fail def declare(self, obj, persistent=False): """ @@ -171,6 +174,8 @@ class Cluster(object): self.snr = SingleNodeReconnector(self.node, self.attache_group, self.listener) self.snr.on_fail.add(lambda: self.events.put_nowait(ConnectionLost())) + if self.on_fail is not None: + self.snr.on_fail.add(self.on_fail) # Spawn a transactional publisher and a noack publisher self.pub_tr = Publisher(Publisher.MODE_CNPUB) diff --git a/tests/test_clustering/test_things.py b/tests/test_clustering/test_things.py index 3db360cbc7d77678e6a62262edd3b33cf554c815..27a9ac4a85fc100922d30037e84c901aaa55a1ad 100644 --- a/tests/test_clustering/test_things.py +++ b/tests/test_clustering/test_things.py @@ -14,6 +14,14 @@ logging.basicConfig(level=logging.DEBUG) class TestConnecting(unittest.TestCase): + def test_on_fail(self): + q = {} + c = Cluster([NodeDefinition('127.0.0.1', 'xguest', 'xguest', heartbeat=20)], on_fail=lambda: q.update(failed=True)) + c.start() + time.sleep(5) + c.shutdown() + self.assertTrue(q['failed']) + def test_start_called_multiple_times(self): c = Cluster([NODE]) c.start(wait=True)