Skip to content
Snippets Groups Projects
Commit 39465a0b authored by Piotr Maślanka's avatar Piotr Maślanka
Browse files

on_fail will be called only if Cluster was connected, v1.0.5

parent 9f327fd4
No related branches found
No related tags found
No related merge requests found
# v1.0.5:
* `on_fail` will be called only if the Cluster has been
connected at least once
# v1.0.4: # v1.0.4:
* add missing locals, which rendered CoolAMQP to be unable to process some messages * add missing locals, which rendered CoolAMQP to be unable to process some messages
......
# coding=UTF-8 # coding=UTF-8
__version__ = '1.0.4' __version__ = '1.0.5'
...@@ -80,7 +80,7 @@ class Publisher(Channeler, Synchronized): ...@@ -80,7 +80,7 @@ class Publisher(Channeler, Synchronized):
class UnusablePublisher(Exception): class UnusablePublisher(Exception):
"""This publisher will never work (eg. MODE_CNPUB on a broker not supporting publisher confirms)""" """This publisher will never work (eg. MODE_CNPUB on a broker not supporting publisher confirms)"""
def __init__(self, mode): def __init__(self, mode, cluster_to_set_connected_upon_first_connect=None):
Channeler.__init__(self) Channeler.__init__(self)
Synchronized.__init__(self) Synchronized.__init__(self)
...@@ -94,7 +94,7 @@ class Publisher(Channeler, Synchronized): ...@@ -94,7 +94,7 @@ class Publisher(Channeler, Synchronized):
# Future to confirm or None, flags as tuple|empty tuple # Future to confirm or None, flags as tuple|empty tuple
self.tagger = None # None, or AtomicTagger instance id MODE_CNPUB self.tagger = None # None, or AtomicTagger instance id MODE_CNPUB
self.cluster_to_set_connected_upon_first_connect = cluster_to_set_connected_upon_first_connect
self.critically_failed = False self.critically_failed = False
self.content_flow = True self.content_flow = True
self.blocked = False self.blocked = False
...@@ -313,6 +313,16 @@ class Publisher(Channeler, Synchronized): ...@@ -313,6 +313,16 @@ class Publisher(Channeler, Synchronized):
self.state = ST_ONLINE self.state = ST_ONLINE
self.on_operational(True) self.on_operational(True)
# inform the cluster that we've been connected
try:
self.cluster_to_set_connected_upon_first_connect
except AttributeError:
pass
else:
if self.cluster_to_set_connected_upon_first_connect is not None:
self.cluster_to_set_connected_upon_first_connect.connected = True
del self.cluster_to_set_connected_upon_first_connect
# now we need to listen for BasicAck and BasicNack # now we need to listen for BasicAck and BasicNack
mw = MethodWatch(self.channel_id, (BasicAck, BasicNack), mw = MethodWatch(self.channel_id, (BasicAck, BasicNack),
......
...@@ -71,10 +71,11 @@ class Cluster(object): ...@@ -71,10 +71,11 @@ class Cluster(object):
self.extra_properties = extra_properties self.extra_properties = extra_properties
self.log_frames = log_frames self.log_frames = log_frames
self.on_blocked = on_blocked self.on_blocked = on_blocked
self.connected = False
if on_fail is not None: if on_fail is not None:
def decorated(): def decorated():
if not self.listener.terminating: if not self.listener.terminating and self.connected:
on_fail() on_fail()
self.on_fail = decorated self.on_fail = decorated
...@@ -221,7 +222,7 @@ class Cluster(object): ...@@ -221,7 +222,7 @@ class Cluster(object):
self.snr = SingleNodeReconnector(self.node, self.attache_group, self.snr = SingleNodeReconnector(self.node, self.attache_group,
self.listener, self.extra_properties, self.listener, self.extra_properties,
log_frames, self.name) log_frames, self.name, self)
self.snr.on_fail.add(lambda: self.events.put_nowait(ConnectionLost())) self.snr.on_fail.add(lambda: self.events.put_nowait(ConnectionLost()))
if self.on_fail is not None: if self.on_fail is not None:
self.snr.on_fail.add(self.on_fail) self.snr.on_fail.add(self.on_fail)
...@@ -230,7 +231,7 @@ class Cluster(object): ...@@ -230,7 +231,7 @@ class Cluster(object):
self.snr.on_blocked.add(self.on_blocked) self.snr.on_blocked.add(self.on_blocked)
# Spawn a transactional publisher and a noack publisher # Spawn a transactional publisher and a noack publisher
self.pub_tr = Publisher(Publisher.MODE_CNPUB) self.pub_tr = Publisher(Publisher.MODE_CNPUB, cluster_to_set_connected_upon_first_connect=self)
self.pub_na = Publisher(Publisher.MODE_NOACK) self.pub_na = Publisher(Publisher.MODE_NOACK)
self.decl = Declarer() self.decl = Declarer()
...@@ -245,9 +246,9 @@ class Cluster(object): ...@@ -245,9 +246,9 @@ class Cluster(object):
if wait: if wait:
# this is only going to take a short amount of time, so we're fine with polling # this is only going to take a short amount of time, so we're fine with polling
start_at = monotonic.monotonic() start_at = monotonic.monotonic()
while not self.attache_group.is_online() and monotonic.monotonic() - start_at < timeout: while not self.connected and monotonic.monotonic() - start_at < timeout:
time.sleep(0.1) time.sleep(0.1)
if not self.attache_group.is_online(): if not self.connected:
raise ConnectionDead( raise ConnectionDead(
'[%s] Could not connect within %s seconds' % (self.name, timeout,)) '[%s] Could not connect within %s seconds' % (self.name, timeout,))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment