From 944e1804740f257a38e3510448d6d5ee7978860c Mon Sep 17 00:00:00 2001 From: Piotr Maslanka <piotr.maslanka@henrietta.com.pl> Date: Sun, 8 Oct 2017 04:36:08 +0200 Subject: [PATCH] on_fail --- CHANGELOG.md | 2 +- coolamqp/clustering/cluster.py | 10 ++++++++-- tests/test_clustering/test_things.py | 11 ++++++++++- 3 files changed, 19 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a7805ea..705bbf4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,7 +2,7 @@ * 0.92: - * TBA + * Added `on_fail` event handler - fired upon connection loss * v0.91: * removed annoying warnings diff --git a/coolamqp/clustering/cluster.py b/coolamqp/clustering/cluster.py index 655a676..f5216f3 100644 --- a/coolamqp/clustering/cluster.py +++ b/coolamqp/clustering/cluster.py @@ -39,7 +39,8 @@ class Cluster(object): """ :param nodes: list of nodes, or a single node. For now, only one is supported. :type nodes: NodeDefinition instance or a list of NodeDefinition instances - :param on_fail: callable/0 to call when connection fails. This is a one-shot + :param on_fail: callable/0 to call when connection fails in an + unclean way. This is a one-shot :type on_fail: callable/0 """ from coolamqp.objects import NodeDefinition @@ -175,7 +176,12 @@ class Cluster(object): self.snr = SingleNodeReconnector(self.node, self.attache_group, self.listener) self.snr.on_fail.add(lambda: self.events.put_nowait(ConnectionLost())) if self.on_fail is not None: - self.snr.on_fail.add(self.on_fail) + + def nice(): + if not self.snr.terminating: + self.on_fail() + + self.snr.on_fail.add(nice) # Spawn a transactional publisher and a noack publisher self.pub_tr = Publisher(Publisher.MODE_CNPUB) diff --git a/tests/test_clustering/test_things.py b/tests/test_clustering/test_things.py index 27a9ac4..51cf2ff 100644 --- a/tests/test_clustering/test_things.py +++ b/tests/test_clustering/test_things.py @@ -15,13 +15,22 @@ logging.basicConfig(level=logging.DEBUG) class TestConnecting(unittest.TestCase): def test_on_fail(self): - q = {} + q = {'failed': False} c = Cluster([NodeDefinition('127.0.0.1', 'xguest', 'xguest', heartbeat=20)], on_fail=lambda: q.update(failed=True)) c.start() time.sleep(5) c.shutdown() self.assertTrue(q['failed']) + def test_on_clean(self): + q = {} + c = Cluster([NODE], on_fail=lambda: q.update(failed=True)) + c.start() + time.sleep(5) + c.shutdown() + time.sleep(5) + self.assertFalse(q['failed']) + def test_start_called_multiple_times(self): c = Cluster([NODE]) c.start(wait=True) -- GitLab