From 39465a0b9e6172520ee4a37085e9c3c849ad4af9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ma=C5=9Blanka?= <piotr.maslanka@henrietta.com.pl> Date: Wed, 6 May 2020 18:43:04 +0200 Subject: [PATCH] on_fail will be called only if Cluster was connected, v1.0.5 --- CHANGELOG.md | 5 +++++ coolamqp/__init__.py | 2 +- coolamqp/attaches/publisher.py | 14 ++++++++++++-- coolamqp/clustering/cluster.py | 11 ++++++----- 4 files changed, 24 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b98392e..be2c210 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,8 @@ +# v1.0.5: + +* `on_fail` will be called only if the Cluster has been + connected at least once + # v1.0.4: * add missing locals, which rendered CoolAMQP to be unable to process some messages diff --git a/coolamqp/__init__.py b/coolamqp/__init__.py index 836d942..4ecea91 100644 --- a/coolamqp/__init__.py +++ b/coolamqp/__init__.py @@ -1,2 +1,2 @@ # coding=UTF-8 -__version__ = '1.0.4' +__version__ = '1.0.5' diff --git a/coolamqp/attaches/publisher.py b/coolamqp/attaches/publisher.py index 704d3d9..62fd665 100644 --- a/coolamqp/attaches/publisher.py +++ b/coolamqp/attaches/publisher.py @@ -80,7 +80,7 @@ class Publisher(Channeler, Synchronized): class UnusablePublisher(Exception): """This publisher will never work (eg. MODE_CNPUB on a broker not supporting publisher confirms)""" - def __init__(self, mode): + def __init__(self, mode, cluster_to_set_connected_upon_first_connect=None): Channeler.__init__(self) Synchronized.__init__(self) @@ -94,7 +94,7 @@ class Publisher(Channeler, Synchronized): # Future to confirm or None, flags as tuple|empty tuple self.tagger = None # None, or AtomicTagger instance id MODE_CNPUB - + self.cluster_to_set_connected_upon_first_connect = cluster_to_set_connected_upon_first_connect self.critically_failed = False self.content_flow = True self.blocked = False @@ -313,6 +313,16 @@ class Publisher(Channeler, Synchronized): self.state = ST_ONLINE self.on_operational(True) + # inform the cluster that we've been connected + try: + self.cluster_to_set_connected_upon_first_connect + except AttributeError: + pass + else: + if self.cluster_to_set_connected_upon_first_connect is not None: + self.cluster_to_set_connected_upon_first_connect.connected = True + del self.cluster_to_set_connected_upon_first_connect + # now we need to listen for BasicAck and BasicNack mw = MethodWatch(self.channel_id, (BasicAck, BasicNack), diff --git a/coolamqp/clustering/cluster.py b/coolamqp/clustering/cluster.py index d19baf5..9050f26 100644 --- a/coolamqp/clustering/cluster.py +++ b/coolamqp/clustering/cluster.py @@ -71,10 +71,11 @@ class Cluster(object): self.extra_properties = extra_properties self.log_frames = log_frames self.on_blocked = on_blocked + self.connected = False if on_fail is not None: def decorated(): - if not self.listener.terminating: + if not self.listener.terminating and self.connected: on_fail() self.on_fail = decorated @@ -221,7 +222,7 @@ class Cluster(object): self.snr = SingleNodeReconnector(self.node, self.attache_group, self.listener, self.extra_properties, - log_frames, self.name) + log_frames, self.name, self) self.snr.on_fail.add(lambda: self.events.put_nowait(ConnectionLost())) if self.on_fail is not None: self.snr.on_fail.add(self.on_fail) @@ -230,7 +231,7 @@ class Cluster(object): self.snr.on_blocked.add(self.on_blocked) # Spawn a transactional publisher and a noack publisher - self.pub_tr = Publisher(Publisher.MODE_CNPUB) + self.pub_tr = Publisher(Publisher.MODE_CNPUB, cluster_to_set_connected_upon_first_connect=self) self.pub_na = Publisher(Publisher.MODE_NOACK) self.decl = Declarer() @@ -245,9 +246,9 @@ class Cluster(object): if wait: # this is only going to take a short amount of time, so we're fine with polling start_at = monotonic.monotonic() - while not self.attache_group.is_online() and monotonic.monotonic() - start_at < timeout: + while not self.connected and monotonic.monotonic() - start_at < timeout: time.sleep(0.1) - if not self.attache_group.is_online(): + if not self.connected: raise ConnectionDead( '[%s] Could not connect within %s seconds' % (self.name, timeout,)) -- GitLab