From 11dc12e56435ec4c17966197d9c0a744213d232e Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Piotr=20Ma=C5=9Blanka?= <piotr.maslanka@henrietta.com.pl>
Date: Wed, 23 Oct 2019 08:29:41 +0200
Subject: [PATCH] Fixed #7

---
 coolamqp/clustering/cluster.py           | 13 ++++++++++---
 coolamqp/clustering/single.py            |  4 ++--
 coolamqp/uplink/connection/connection.py |  8 ++++++--
 setup.cfg                                |  2 +-
 4 files changed, 19 insertions(+), 8 deletions(-)

diff --git a/coolamqp/clustering/cluster.py b/coolamqp/clustering/cluster.py
index 9f31d19..57f16a7 100644
--- a/coolamqp/clustering/cluster.py
+++ b/coolamqp/clustering/cluster.py
@@ -7,10 +7,12 @@ import six
 import logging
 import warnings
 import time
+import monotonic
 from coolamqp.uplink import ListenerThread
 from coolamqp.clustering.single import SingleNodeReconnector
 from coolamqp.attaches import Publisher, AttacheGroup, Consumer, Declarer
 from coolamqp.objects import Exchange
+from coolamqp.exceptions import ConnectionDead
 from concurrent.futures import Future
 
 from coolamqp.clustering.events import ConnectionLost, MessageReceived, \
@@ -165,7 +167,7 @@ class Cluster(object):
             raise NotImplementedError(
                 u'Sorry, this functionality is not yet implemented!')
 
-    def start(self, wait=True):
+    def start(self, wait=True, timeout=10.0):
         """
         Connect to broker. Initialize Cluster.
 
@@ -173,7 +175,9 @@ class Cluster(object):
         It is not safe to fork after this.
 
         :param wait: block until connection is ready
+        :param timeout: timeout to wait until the connection is ready. If it is not, a ConnectionDead error will be raised
         :raise RuntimeError: called more than once
+        :raise ConnectionDead: failed to connect within timeout
         """
 
         try:
@@ -206,12 +210,15 @@ class Cluster(object):
 
         self.listener.init()
         self.listener.start()
-        self.snr.connect()
+        self.snr.connect(timeout=timeout)
 
         # todo not really elegant
         if wait:
-            while not self.snr.is_connected():
+            start_at = monotonic.monotonic()
+            while not self.snr.is_connected() and monotonic.monotonic() - start_at < timeout:
                 time.sleep(0.1)
+            if not self.snr.is_connected():
+                raise ConnectionDead('Could not connect within %s seconds' % (timeout, ))
 
     def shutdown(self, wait=True):
         """
diff --git a/coolamqp/clustering/single.py b/coolamqp/clustering/single.py
index 9219ef7..bb5b752 100644
--- a/coolamqp/clustering/single.py
+++ b/coolamqp/clustering/single.py
@@ -29,13 +29,13 @@ class SingleNodeReconnector(object):
     def is_connected(self):
         return self.connection is not None
 
-    def connect(self):
+    def connect(self, timeout):
         assert self.connection is None
 
         # Initiate connecting - this order is very important!
         self.connection = Connection(self.node_def, self.listener_thread)
         self.attache_group.attach(self.connection)
-        self.connection.start()
+        self.connection.start(timeout)
         self.connection.finalize.add(self.on_fail)
 
     def _on_fail(self):
diff --git a/coolamqp/uplink/connection/connection.py b/coolamqp/uplink/connection/connection.py
index 73b3dbe..e1ea187 100644
--- a/coolamqp/uplink/connection/connection.py
+++ b/coolamqp/uplink/connection/connection.py
@@ -2,10 +2,12 @@
 from __future__ import absolute_import, division, print_function
 import logging
 import collections
+import monotonic
 import time
 import socket
 import six
 
+from coolamqp.exceptions import ConnectionDead
 from coolamqp.uplink.connection.recv_framer import ReceivingFramer
 from coolamqp.uplink.connection.send_framer import SendingFramer
 from coolamqp.framing.frames import AMQPMethodFrame
@@ -122,7 +124,7 @@ class Connection(object):
         while len(self.callables_on_connected) > 0:
             self.callables_on_connected.pop()()
 
-    def start(self):
+    def start(self, timeout):
         """
         Start processing events for this connect. Create the socket,
         transmit 'AMQP\x00\x00\x09\x01' and roll.
@@ -131,13 +133,15 @@ class Connection(object):
         """
 
         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-
+        start_at = monotonic.monotonic()
         while True:
             try:
                 sock.connect(
                     (self.node_definition.host, self.node_definition.port))
             except socket.error as e:
                 time.sleep(0.5)  # Connection refused? Very bad things?
+                if monotonic.monotonic() - start_at < timeout:
+                    raise ConnectionDead()
             else:
                 break
 
diff --git a/setup.cfg b/setup.cfg
index daeed6a..eea9d2d 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -1,7 +1,7 @@
 [metadata]
 description-file = README.md
 name = CoolAMQP
-version = 0.94rc1
+version = 0.95
 license = MIT License
 classifiers = 
     Programming Language :: Python
-- 
GitLab