From 9aa067692bc36a37b20b2fc4863070aa69a6f3c3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ma=C5=9Blanka?= <piotr.maslanka@ericsson.com> Date: Mon, 2 Sep 2024 15:27:25 +0200 Subject: [PATCH] add timeout + docs explaination --- CHANGELOG.md | 1 + coolamqp/__init__.py | 2 +- coolamqp/clustering/cluster.py | 22 ++++++++++++++-------- docs/cluster.rst | 1 + docs/tutorial.rst | 6 ++++++ tests/test_clustering/test_things.py | 5 +++++ 6 files changed, 28 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0f74fdd..e44b4d0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,3 +6,4 @@ have been made so far, between releases. * removed the requirement for a Queue that for it to be equal to other Queue if their types do match * compile_definitions will now depend on requests +* added support for infinite (None) timeouts during start diff --git a/coolamqp/__init__.py b/coolamqp/__init__.py index ae6c60f..ad3558b 100644 --- a/coolamqp/__init__.py +++ b/coolamqp/__init__.py @@ -1 +1 @@ -__version__ = '1.3.2b1' +__version__ = '1.3.2b2' diff --git a/coolamqp/clustering/cluster.py b/coolamqp/clustering/cluster.py index 268deb4..5e63a50 100644 --- a/coolamqp/clustering/cluster.py +++ b/coolamqp/clustering/cluster.py @@ -120,7 +120,7 @@ class Cluster(object): dont_trace=False # type: bool ): # type: (...) -> concurrent.futures.Future """ - Declare a Queue/Exchange + Declare a Queue/Exchange. :param obj: Queue/Exchange object :param persistent: should it be redefined upon reconnect? @@ -285,16 +285,18 @@ class Cluster(object): raise NotImplementedError( u'Sorry, this functionality is not yet implemented!') - def start(self, wait=True, timeout=10.0): # type: (bool, float, bool) -> None + def start(self, wait=True, timeout=10.0): """ Connect to broker. Initialize Cluster. Only after this call is Cluster usable. It is not safe to fork after this. - :param wait: block until connection is ready + :param wait: block until connection is ready. If None is given, then start will block as long as necessary. + :type wait: bool :param timeout: timeout to wait until the connection is ready. If it is not, a ConnectionDead error will be raised + :type timeout: float | int | None :raise RuntimeError: called more than once :raise ConnectionDead: failed to connect within timeout """ @@ -334,11 +336,15 @@ class Cluster(object): if wait: # this is only going to take a short amount of time, so we're fine with polling start_at = monotonic() - while not self.connected and monotonic() - start_at < timeout: - time.sleep(0.1) - if not self.connected: - raise ConnectionDead( - '[%s] Could not connect within %s seconds' % (self.name, timeout,)) + if timeout is None: + while not self.connected: + time.sleep(0.2) + else: + while not self.connected and monotonic() - start_at < timeout: + time.sleep(0.1) + if not self.connected: + raise ConnectionDead( + '[%s] Could not connect within %s seconds' % (self.name, timeout,)) def shutdown(self, wait=True): # type: (bool) -> None """ diff --git a/docs/cluster.rst b/docs/cluster.rst index 2fd0d83..985a868 100644 --- a/docs/cluster.rst +++ b/docs/cluster.rst @@ -4,6 +4,7 @@ CoolAMQP cluster .. autoclass:: coolamqp.clustering.Cluster :members: +.. note:: If environment variable :code:`COOLAMQP_FORCE_SELECT_LISTENER` is defined, select will be used instead of epoll. Publisher --------- diff --git a/docs/tutorial.rst b/docs/tutorial.rst index b3226f8..c49b6cf 100644 --- a/docs/tutorial.rst +++ b/docs/tutorial.rst @@ -58,6 +58,12 @@ you must first define a queue, and register a consumer. This will create an auto-delete and exclusive queue. After than, a consumer will be registered for this queue. _no_ack=False_ will mean that we have to manually confirm messages. +.. warning:: if you declare a :class:`coolamqp.objects.Queue` without a name, this client will automatically + generate an UUID-name for you, and verify the queue is auto_delete. Since RabbitMQ supports + `automatic queue name generation <https://www.rabbitmq.com/docs/queues#names>`_, + this client does not use it, because the queue is valid only for the channel that declared it, + and CoolAMQP declares things with a dedicated channel. + You can specify a callback, that will be called with a message if one's received by this consumer. Since we did not do that, this will go to a generic queue belonging to _Cluster_. diff --git a/tests/test_clustering/test_things.py b/tests/test_clustering/test_things.py index 5e84db3..146d3f4 100644 --- a/tests/test_clustering/test_things.py +++ b/tests/test_clustering/test_things.py @@ -23,6 +23,11 @@ class TestConnecting(unittest.TestCase): except ImportError: self.skipTest('ConnectionBlocked not supported!') + def test_wait_timeout_none(self): + c = Cluster([NODE]) + c.start(wait=True, timeout=None) + c.shutdown(wait=True) + def test_on_fail(self): """Assert that on_fail doesn't fire if the cluster fails to connect""" q = {'failed': False} -- GitLab