Skip to content
Snippets Groups Projects
Commit 90c55f26 authored by Piotr Maślanka's avatar Piotr Maślanka
Browse files

cleanup again

parent 22df4260
No related branches found
No related tags found
No related merge requests found
...@@ -211,15 +211,15 @@ class Cluster(object): ...@@ -211,15 +211,15 @@ class Cluster(object):
self.thread.start() self.thread.start()
return self return self
def shutdown(self, complete_outstanding_tasks=False): def shutdown(self, complete_remaining_tasks=False):
""" """
Cleans everything and returns. Cleans everything and returns.
:param complete_outstanding_tasks: if set to True, pending operations will be completed. :param complete_remaining_tasks_tasks: if set to True, pending operations will be completed.
If False, thread will exit without completing them. If False, thread will exit without completing them.
This can mean that if the cluster doesn't come up online, shutdown MAY BLOCK FOREVER. This can mean that if the cluster doesn't come up online, shutdown MAY BLOCK FOREVER.
""" """
self.thread.complete_outstanding_upon_termination = complete_outstanding_tasks self.thread.complete_remaining_upon_termination = complete_remaining_tasks
self.thread.terminate() self.thread.terminate()
self.thread.join() self.thread.join()
# thread closes the AMQP uplink for us # thread closes the AMQP uplink for us
...@@ -17,7 +17,7 @@ logger = logging.getLogger(__name__) ...@@ -17,7 +17,7 @@ logger = logging.getLogger(__name__)
class _ImOuttaHere(Exception): class _ImOuttaHere(Exception):
"""Thrown upon thread terminating. """Thrown upon thread terminating.
Thrown only if complete_outstanding_upon_termination is False""" Thrown only if complete_remaining_upon_termination is False"""
class ClusterHandlerThread(threading.Thread): class ClusterHandlerThread(threading.Thread):
...@@ -32,7 +32,7 @@ class ClusterHandlerThread(threading.Thread): ...@@ -32,7 +32,7 @@ class ClusterHandlerThread(threading.Thread):
self.cluster = cluster self.cluster = cluster
self.is_terminating = False self.is_terminating = False
self.complete_outstanding_upon_termination = False self.complete_remaining_upon_termination = False
self.order_queue = collections.deque() # queue for inbound orders self.order_queue = collections.deque() # queue for inbound orders
self.event_queue = queue.Queue() # queue for tasks done self.event_queue = queue.Queue() # queue for tasks done
self.connect_id = -1 # connectID of current connection self.connect_id = -1 # connectID of current connection
...@@ -81,7 +81,7 @@ class ClusterHandlerThread(threading.Thread): ...@@ -81,7 +81,7 @@ class ClusterHandlerThread(threading.Thread):
self.backend = None # good policy to release resources before you sleep self.backend = None # good policy to release resources before you sleep
time.sleep(exponential_backoff_delay) time.sleep(exponential_backoff_delay)
if self.is_terminating and (not self.complete_outstanding_upon_termination): if self.is_terminating and (not self.complete_remaining_upon_termination):
raise _ImOuttaHere() raise _ImOuttaHere()
exponential_backoff_delay = min(60, exponential_backoff_delay * 2) exponential_backoff_delay = min(60, exponential_backoff_delay * 2)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment