diff --git a/CHANGELOG.md b/CHANGELOG.md index b98392e096da2222d1e3024fa77201e7c041904b..be2c210540362a4ec360f081c0b9fdcbbc725830 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,8 @@ +# v1.0.5: + +* `on_fail` will be called only if the Cluster has been + connected at least once + # v1.0.4: * add missing locals, which rendered CoolAMQP to be unable to process some messages diff --git a/coolamqp/__init__.py b/coolamqp/__init__.py index 836d9429222c660a1bbdbe71ae920dd82b4bdb86..4ecea9114977b5934fce7e9f5ad35095bec07ca8 100644 --- a/coolamqp/__init__.py +++ b/coolamqp/__init__.py @@ -1,2 +1,2 @@ # coding=UTF-8 -__version__ = '1.0.4' +__version__ = '1.0.5' diff --git a/coolamqp/attaches/publisher.py b/coolamqp/attaches/publisher.py index 704d3d958fcd64eff2754c12ce6dafacba538738..62fd665835bff0a545930cf6d872b60833e7ee2b 100644 --- a/coolamqp/attaches/publisher.py +++ b/coolamqp/attaches/publisher.py @@ -80,7 +80,7 @@ class Publisher(Channeler, Synchronized): class UnusablePublisher(Exception): """This publisher will never work (eg. MODE_CNPUB on a broker not supporting publisher confirms)""" - def __init__(self, mode): + def __init__(self, mode, cluster_to_set_connected_upon_first_connect=None): Channeler.__init__(self) Synchronized.__init__(self) @@ -94,7 +94,7 @@ class Publisher(Channeler, Synchronized): # Future to confirm or None, flags as tuple|empty tuple self.tagger = None # None, or AtomicTagger instance id MODE_CNPUB - + self.cluster_to_set_connected_upon_first_connect = cluster_to_set_connected_upon_first_connect self.critically_failed = False self.content_flow = True self.blocked = False @@ -313,6 +313,16 @@ class Publisher(Channeler, Synchronized): self.state = ST_ONLINE self.on_operational(True) + # inform the cluster that we've been connected + try: + self.cluster_to_set_connected_upon_first_connect + except AttributeError: + pass + else: + if self.cluster_to_set_connected_upon_first_connect is not None: + self.cluster_to_set_connected_upon_first_connect.connected = True + del self.cluster_to_set_connected_upon_first_connect + # now we need to listen for BasicAck and BasicNack mw = MethodWatch(self.channel_id, (BasicAck, BasicNack), diff --git a/coolamqp/clustering/cluster.py b/coolamqp/clustering/cluster.py index d19baf5ea4e5430b67d7bc3051870570b72c798f..9050f2612dab12a87502095f4ab6742b098969c8 100644 --- a/coolamqp/clustering/cluster.py +++ b/coolamqp/clustering/cluster.py @@ -71,10 +71,11 @@ class Cluster(object): self.extra_properties = extra_properties self.log_frames = log_frames self.on_blocked = on_blocked + self.connected = False if on_fail is not None: def decorated(): - if not self.listener.terminating: + if not self.listener.terminating and self.connected: on_fail() self.on_fail = decorated @@ -221,7 +222,7 @@ class Cluster(object): self.snr = SingleNodeReconnector(self.node, self.attache_group, self.listener, self.extra_properties, - log_frames, self.name) + log_frames, self.name, self) self.snr.on_fail.add(lambda: self.events.put_nowait(ConnectionLost())) if self.on_fail is not None: self.snr.on_fail.add(self.on_fail) @@ -230,7 +231,7 @@ class Cluster(object): self.snr.on_blocked.add(self.on_blocked) # Spawn a transactional publisher and a noack publisher - self.pub_tr = Publisher(Publisher.MODE_CNPUB) + self.pub_tr = Publisher(Publisher.MODE_CNPUB, cluster_to_set_connected_upon_first_connect=self) self.pub_na = Publisher(Publisher.MODE_NOACK) self.decl = Declarer() @@ -245,9 +246,9 @@ class Cluster(object): if wait: # this is only going to take a short amount of time, so we're fine with polling start_at = monotonic.monotonic() - while not self.attache_group.is_online() and monotonic.monotonic() - start_at < timeout: + while not self.connected and monotonic.monotonic() - start_at < timeout: time.sleep(0.1) - if not self.attache_group.is_online(): + if not self.connected: raise ConnectionDead( '[%s] Could not connect within %s seconds' % (self.name, timeout,))