diff --git a/coolamqp/clustering/cluster.py b/coolamqp/clustering/cluster.py index 9f31d190bf9bbc4c01ecaf6c6f76ed7eb2e962e4..57f16a75153a6e76c8079d4aac14c492599ca1a8 100644 --- a/coolamqp/clustering/cluster.py +++ b/coolamqp/clustering/cluster.py @@ -7,10 +7,12 @@ import six import logging import warnings import time +import monotonic from coolamqp.uplink import ListenerThread from coolamqp.clustering.single import SingleNodeReconnector from coolamqp.attaches import Publisher, AttacheGroup, Consumer, Declarer from coolamqp.objects import Exchange +from coolamqp.exceptions import ConnectionDead from concurrent.futures import Future from coolamqp.clustering.events import ConnectionLost, MessageReceived, \ @@ -165,7 +167,7 @@ class Cluster(object): raise NotImplementedError( u'Sorry, this functionality is not yet implemented!') - def start(self, wait=True): + def start(self, wait=True, timeout=10.0): """ Connect to broker. Initialize Cluster. @@ -173,7 +175,9 @@ class Cluster(object): It is not safe to fork after this. :param wait: block until connection is ready + :param timeout: timeout to wait until the connection is ready. If it is not, a ConnectionDead error will be raised :raise RuntimeError: called more than once + :raise ConnectionDead: failed to connect within timeout """ try: @@ -206,12 +210,15 @@ class Cluster(object): self.listener.init() self.listener.start() - self.snr.connect() + self.snr.connect(timeout=timeout) # todo not really elegant if wait: - while not self.snr.is_connected(): + start_at = monotonic.monotonic() + while not self.snr.is_connected() and monotonic.monotonic() - start_at < timeout: time.sleep(0.1) + if not self.snr.is_connected(): + raise ConnectionDead('Could not connect within %s seconds' % (timeout, )) def shutdown(self, wait=True): """ diff --git a/coolamqp/clustering/single.py b/coolamqp/clustering/single.py index 9219ef71051f4114cd7b5d8804848e632aeccdd4..bb5b752914a5d0cda645536cc8eb318d595075df 100644 --- a/coolamqp/clustering/single.py +++ b/coolamqp/clustering/single.py @@ -29,13 +29,13 @@ class SingleNodeReconnector(object): def is_connected(self): return self.connection is not None - def connect(self): + def connect(self, timeout): assert self.connection is None # Initiate connecting - this order is very important! self.connection = Connection(self.node_def, self.listener_thread) self.attache_group.attach(self.connection) - self.connection.start() + self.connection.start(timeout) self.connection.finalize.add(self.on_fail) def _on_fail(self): diff --git a/coolamqp/uplink/connection/connection.py b/coolamqp/uplink/connection/connection.py index 73b3dbe1be70a8e9b0170304fff6dbd6324ed1e9..e1ea187726ab46d63d0c61e5f68fc37b41f29ba0 100644 --- a/coolamqp/uplink/connection/connection.py +++ b/coolamqp/uplink/connection/connection.py @@ -2,10 +2,12 @@ from __future__ import absolute_import, division, print_function import logging import collections +import monotonic import time import socket import six +from coolamqp.exceptions import ConnectionDead from coolamqp.uplink.connection.recv_framer import ReceivingFramer from coolamqp.uplink.connection.send_framer import SendingFramer from coolamqp.framing.frames import AMQPMethodFrame @@ -122,7 +124,7 @@ class Connection(object): while len(self.callables_on_connected) > 0: self.callables_on_connected.pop()() - def start(self): + def start(self, timeout): """ Start processing events for this connect. Create the socket, transmit 'AMQP\x00\x00\x09\x01' and roll. @@ -131,13 +133,15 @@ class Connection(object): """ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - + start_at = monotonic.monotonic() while True: try: sock.connect( (self.node_definition.host, self.node_definition.port)) except socket.error as e: time.sleep(0.5) # Connection refused? Very bad things? + if monotonic.monotonic() - start_at < timeout: + raise ConnectionDead() else: break diff --git a/setup.cfg b/setup.cfg index daeed6a7dba54953da7a5515fecd5d090e4028e0..eea9d2d715363a675e989b14a51163df080d86f4 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,7 +1,7 @@ [metadata] description-file = README.md name = CoolAMQP -version = 0.94rc1 +version = 0.95 license = MIT License classifiers = Programming Language :: Python