diff --git a/CHANGELOG.md b/CHANGELOG.md index 90ff8c8219158e831165681313f148aaec52eb2e..28401d4ada0e3bdd1c66166f67620d1497b2ea6b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ * split `compile_definitions` into a separate package * exceptions will display their reply_text correctly if fed a memoryview * added Docker-based tests +* far more robust wait in `Cluster.start` made # v0.100: diff --git a/coolamqp/attaches/agroup.py b/coolamqp/attaches/agroup.py index 1433f7f5363b29f1f9043670763c61499b58e8e6..d24901ead24fa586eb4a7d1e24c47b5a92bbef09 100644 --- a/coolamqp/attaches/agroup.py +++ b/coolamqp/attaches/agroup.py @@ -10,8 +10,9 @@ import logging logger = logging.getLogger(__name__) -from coolamqp.attaches.channeler import Attache, ST_OFFLINE +from coolamqp.attaches.channeler import Attache, ST_OFFLINE, ST_ONLINE from coolamqp.attaches.consumer import Consumer +from coolamqp.attaches.publisher import Publisher class AttacheGroup(Attache): @@ -23,6 +24,10 @@ class AttacheGroup(Attache): super(AttacheGroup, self).__init__() self.attaches = [] + # these two to be filled in during add() + self.tx_publisher = None + self.non_tx_publisher = None + def add(self, attache): """ Add an attache to this group. @@ -42,6 +47,12 @@ class AttacheGroup(Attache): if isinstance(attache, Consumer): attache.attache_group = self + if isinstance(attache, Publisher): + if attache.mode == Publisher.MODE_CNPUB: + self.tx_publisher = attache + else: + self.non_tx_publisher = attache + def on_cancel_customer(self, customer): """ Called by a customer, when it's cancelled. @@ -64,3 +75,6 @@ class AttacheGroup(Attache): for attache in self.attaches: if not attache.cancelled: attache.attach(connection) + + def is_online(self): # type: () -> bool + return self.tx_publisher.state == ST_ONLINE and self.non_tx_publisher.state == ST_ONLINE diff --git a/coolamqp/clustering/cluster.py b/coolamqp/clustering/cluster.py index 78a9df3a3c2dd7246884f301c983ff4594c7b98d..c9d5d592242c456f0951d798adb72dd3d757c091 100644 --- a/coolamqp/clustering/cluster.py +++ b/coolamqp/clustering/cluster.py @@ -232,12 +232,12 @@ class Cluster(object): self.listener.start() self.snr.connect(timeout=timeout) - # todo not really elegant if wait: + # this is only going to take a short amount of time, so we're fine with polling start_at = monotonic.monotonic() - while not self.snr.is_connected() and monotonic.monotonic() - start_at < timeout: + while not self.attache_group.is_online() and monotonic.monotonic() - start_at < timeout: time.sleep(0.1) - if not self.snr.is_connected(): + if not self.attache_group.is_online(): raise ConnectionDead('Could not connect within %s seconds' % (timeout,)) def shutdown(self, wait=True): # type: (bool) -> None diff --git a/tests/test_clustering/test_things.py b/tests/test_clustering/test_things.py index 7b1022179627467a899e2a200ef66e99eb010cb8..79a80f4025bd70da2d0c37e9a1547fe0db7d10f1 100644 --- a/tests/test_clustering/test_things.py +++ b/tests/test_clustering/test_things.py @@ -8,6 +8,7 @@ import time, logging, threading, monotonic, warnings from coolamqp.objects import Message, MessageProperties, NodeDefinition, Queue, \ ReceivedMessage, Exchange from coolamqp.clustering import Cluster, MessageReceived, NothingMuch +from coolamqp.exceptions import ConnectionDead import time @@ -21,8 +22,7 @@ class TestConnecting(unittest.TestCase): c = Cluster( NodeDefinition(os.environ.get('AMQP_HOST', '127.0.0.1'), 'xguest', 'xguest', heartbeat=20), on_fail=lambda: q.update(failed=True)) - c.start() - time.sleep(5) + self.assertRaises(ConnectionDead, c.start) c.shutdown() self.assertTrue(q['failed'])