diff --git a/coolamqp/clustering/single.py b/coolamqp/clustering/single.py index 45ad444aea3c1c45bfa10ab5419631a52605f339..d902195ba1543a0d76bb41e9aa74bbfe2b12cc75 100644 --- a/coolamqp/clustering/single.py +++ b/coolamqp/clustering/single.py @@ -43,7 +43,7 @@ class SingleNodeReconnector(object): return self.connection = None - self.connect() + self.call_next_io_event(self.connect) def shutdown(self): """Close this connection""" diff --git a/coolamqp/objects.py b/coolamqp/objects.py index f72545d841008996b084407fe39c183e897978cf..96f620a121f1f4487e0672ff2f5b8683e84b3332 100644 --- a/coolamqp/objects.py +++ b/coolamqp/objects.py @@ -18,6 +18,7 @@ class Callable(object): """ Add a bunch of callables to one list, and just invoke'm. INTERNAL USE ONLY + #todo not thread safe """ def __init__(self, oneshots=False): diff --git a/coolamqp/uplink/listener/thread.py b/coolamqp/uplink/listener/thread.py index 5af8893e6d9b1be9044c28444ace87b87eeb4f8e..4d82d8ce5b291384539569e693775c376b16d2e0 100644 --- a/coolamqp/uplink/listener/thread.py +++ b/coolamqp/uplink/listener/thread.py @@ -4,6 +4,7 @@ from __future__ import absolute_import, division, print_function import threading from coolamqp.uplink.listener.epoll_listener import EpollListener +from coolamqp.objects import Callable class ListenerThread(threading.Thread): @@ -17,6 +18,18 @@ class ListenerThread(threading.Thread): threading.Thread.__init__(self, name='coolamqp/ListenerThread') self.daemon = True self.terminating = False + self._call_next_io_event = Callable(oneshots=True) + + def call_next_io_event(self, callable): + """ + Call callable after current I/O event is fully processed + + sometimes many callables are called in response to single + I/O (eg. teardown, startup). This guarantees a call after + all these are done. + :param callable: callable/0 + """ + self._call_next_io_event() def terminate(self): self.terminating = True @@ -28,6 +41,7 @@ class ListenerThread(threading.Thread): def run(self): while not self.terminating: self.listener.wait(timeout=1) + self.listener.shutdown() def register(self, sock, on_read=lambda data: None, on_fail=lambda: None):