From d9c70a3ef5087ff8514bbbe133f60c48fa18c91e Mon Sep 17 00:00:00 2001
From: Piotr Maslanka <piotr.maslanka@henrietta.com.pl>
Date: Sat, 18 Mar 2017 00:00:46 +0100
Subject: [PATCH] jab at bug

---
 coolamqp/clustering/single.py      |  2 +-
 coolamqp/objects.py                |  1 +
 coolamqp/uplink/listener/thread.py | 14 ++++++++++++++
 3 files changed, 16 insertions(+), 1 deletion(-)

diff --git a/coolamqp/clustering/single.py b/coolamqp/clustering/single.py
index 45ad444..d902195 100644
--- a/coolamqp/clustering/single.py
+++ b/coolamqp/clustering/single.py
@@ -43,7 +43,7 @@ class SingleNodeReconnector(object):
             return
 
         self.connection = None
-        self.connect()
+        self.call_next_io_event(self.connect)
 
     def shutdown(self):
         """Close this connection"""
diff --git a/coolamqp/objects.py b/coolamqp/objects.py
index f72545d..96f620a 100644
--- a/coolamqp/objects.py
+++ b/coolamqp/objects.py
@@ -18,6 +18,7 @@ class Callable(object):
     """
     Add a bunch of callables to one list, and just invoke'm.
     INTERNAL USE ONLY
+    #todo not thread safe
     """
 
     def __init__(self, oneshots=False):
diff --git a/coolamqp/uplink/listener/thread.py b/coolamqp/uplink/listener/thread.py
index 5af8893..4d82d8c 100644
--- a/coolamqp/uplink/listener/thread.py
+++ b/coolamqp/uplink/listener/thread.py
@@ -4,6 +4,7 @@ from __future__ import absolute_import, division, print_function
 import threading
 
 from coolamqp.uplink.listener.epoll_listener import EpollListener
+from coolamqp.objects import Callable
 
 
 class ListenerThread(threading.Thread):
@@ -17,6 +18,18 @@ class ListenerThread(threading.Thread):
         threading.Thread.__init__(self, name='coolamqp/ListenerThread')
         self.daemon = True
         self.terminating = False
+        self._call_next_io_event = Callable(oneshots=True)
+
+    def call_next_io_event(self, callable):
+        """
+        Call callable after current I/O event is fully processed
+
+        sometimes many callables are called in response to single
+        I/O (eg. teardown, startup). This guarantees a call after
+        all these are done.
+        :param callable: callable/0
+        """
+        self._call_next_io_event()
 
     def terminate(self):
         self.terminating = True
@@ -28,6 +41,7 @@ class ListenerThread(threading.Thread):
     def run(self):
         while not self.terminating:
             self.listener.wait(timeout=1)
+
         self.listener.shutdown()
 
     def register(self, sock, on_read=lambda data: None, on_fail=lambda: None):
-- 
GitLab