Skip to content
Snippets Groups Projects
Commit d9c70a3e authored by Piotr Maślanka's avatar Piotr Maślanka
Browse files

jab at bug

parent 14f5ce09
No related branches found
No related tags found
No related merge requests found
...@@ -43,7 +43,7 @@ class SingleNodeReconnector(object): ...@@ -43,7 +43,7 @@ class SingleNodeReconnector(object):
return return
self.connection = None self.connection = None
self.connect() self.call_next_io_event(self.connect)
def shutdown(self): def shutdown(self):
"""Close this connection""" """Close this connection"""
......
...@@ -18,6 +18,7 @@ class Callable(object): ...@@ -18,6 +18,7 @@ class Callable(object):
""" """
Add a bunch of callables to one list, and just invoke'm. Add a bunch of callables to one list, and just invoke'm.
INTERNAL USE ONLY INTERNAL USE ONLY
#todo not thread safe
""" """
def __init__(self, oneshots=False): def __init__(self, oneshots=False):
......
...@@ -4,6 +4,7 @@ from __future__ import absolute_import, division, print_function ...@@ -4,6 +4,7 @@ from __future__ import absolute_import, division, print_function
import threading import threading
from coolamqp.uplink.listener.epoll_listener import EpollListener from coolamqp.uplink.listener.epoll_listener import EpollListener
from coolamqp.objects import Callable
class ListenerThread(threading.Thread): class ListenerThread(threading.Thread):
...@@ -17,6 +18,18 @@ class ListenerThread(threading.Thread): ...@@ -17,6 +18,18 @@ class ListenerThread(threading.Thread):
threading.Thread.__init__(self, name='coolamqp/ListenerThread') threading.Thread.__init__(self, name='coolamqp/ListenerThread')
self.daemon = True self.daemon = True
self.terminating = False self.terminating = False
self._call_next_io_event = Callable(oneshots=True)
def call_next_io_event(self, callable):
"""
Call callable after current I/O event is fully processed
sometimes many callables are called in response to single
I/O (eg. teardown, startup). This guarantees a call after
all these are done.
:param callable: callable/0
"""
self._call_next_io_event()
def terminate(self): def terminate(self):
self.terminating = True self.terminating = True
...@@ -28,6 +41,7 @@ class ListenerThread(threading.Thread): ...@@ -28,6 +41,7 @@ class ListenerThread(threading.Thread):
def run(self): def run(self):
while not self.terminating: while not self.terminating:
self.listener.wait(timeout=1) self.listener.wait(timeout=1)
self.listener.shutdown() self.listener.shutdown()
def register(self, sock, on_read=lambda data: None, on_fail=lambda: None): def register(self, sock, on_read=lambda data: None, on_fail=lambda: None):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment