Skip to content
Snippets Groups Projects
Commit 944e1804 authored by Piotr Maślanka's avatar Piotr Maślanka
Browse files

on_fail

parent 0c3334da
No related branches found
No related tags found
No related merge requests found
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
* 0.92: * 0.92:
* TBA * Added `on_fail` event handler - fired upon connection loss
* v0.91: * v0.91:
* removed annoying warnings * removed annoying warnings
......
...@@ -39,7 +39,8 @@ class Cluster(object): ...@@ -39,7 +39,8 @@ class Cluster(object):
""" """
:param nodes: list of nodes, or a single node. For now, only one is supported. :param nodes: list of nodes, or a single node. For now, only one is supported.
:type nodes: NodeDefinition instance or a list of NodeDefinition instances :type nodes: NodeDefinition instance or a list of NodeDefinition instances
:param on_fail: callable/0 to call when connection fails. This is a one-shot :param on_fail: callable/0 to call when connection fails in an
unclean way. This is a one-shot
:type on_fail: callable/0 :type on_fail: callable/0
""" """
from coolamqp.objects import NodeDefinition from coolamqp.objects import NodeDefinition
...@@ -175,7 +176,12 @@ class Cluster(object): ...@@ -175,7 +176,12 @@ class Cluster(object):
self.snr = SingleNodeReconnector(self.node, self.attache_group, self.listener) self.snr = SingleNodeReconnector(self.node, self.attache_group, self.listener)
self.snr.on_fail.add(lambda: self.events.put_nowait(ConnectionLost())) self.snr.on_fail.add(lambda: self.events.put_nowait(ConnectionLost()))
if self.on_fail is not None: if self.on_fail is not None:
self.snr.on_fail.add(self.on_fail)
def nice():
if not self.snr.terminating:
self.on_fail()
self.snr.on_fail.add(nice)
# Spawn a transactional publisher and a noack publisher # Spawn a transactional publisher and a noack publisher
self.pub_tr = Publisher(Publisher.MODE_CNPUB) self.pub_tr = Publisher(Publisher.MODE_CNPUB)
......
...@@ -15,13 +15,22 @@ logging.basicConfig(level=logging.DEBUG) ...@@ -15,13 +15,22 @@ logging.basicConfig(level=logging.DEBUG)
class TestConnecting(unittest.TestCase): class TestConnecting(unittest.TestCase):
def test_on_fail(self): def test_on_fail(self):
q = {} q = {'failed': False}
c = Cluster([NodeDefinition('127.0.0.1', 'xguest', 'xguest', heartbeat=20)], on_fail=lambda: q.update(failed=True)) c = Cluster([NodeDefinition('127.0.0.1', 'xguest', 'xguest', heartbeat=20)], on_fail=lambda: q.update(failed=True))
c.start() c.start()
time.sleep(5) time.sleep(5)
c.shutdown() c.shutdown()
self.assertTrue(q['failed']) self.assertTrue(q['failed'])
def test_on_clean(self):
q = {}
c = Cluster([NODE], on_fail=lambda: q.update(failed=True))
c.start()
time.sleep(5)
c.shutdown()
time.sleep(5)
self.assertFalse(q['failed'])
def test_start_called_multiple_times(self): def test_start_called_multiple_times(self):
c = Cluster([NODE]) c = Cluster([NODE])
c.start(wait=True) c.start(wait=True)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment