diff --git a/CHANGELOG.md b/CHANGELOG.md index 0f74fdd6d214281853e60f4c8e2828d34d639ec1..e44b4d0ebb26e0372f92828700ad9dbd95f77f78 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,3 +6,4 @@ have been made so far, between releases. * removed the requirement for a Queue that for it to be equal to other Queue if their types do match * compile_definitions will now depend on requests +* added support for infinite (None) timeouts during start diff --git a/coolamqp/__init__.py b/coolamqp/__init__.py index ae6c60ffb74961cb39cf9074057542a1a85d523d..ad3558b61d339181ca1c5f415ffc3b98d4bc74fa 100644 --- a/coolamqp/__init__.py +++ b/coolamqp/__init__.py @@ -1 +1 @@ -__version__ = '1.3.2b1' +__version__ = '1.3.2b2' diff --git a/coolamqp/clustering/cluster.py b/coolamqp/clustering/cluster.py index 268deb4b5b1784d6a872bf7852bc00b362fc149c..5e63a5058773edf3ee1a938e6e613f9af7910079 100644 --- a/coolamqp/clustering/cluster.py +++ b/coolamqp/clustering/cluster.py @@ -120,7 +120,7 @@ class Cluster(object): dont_trace=False # type: bool ): # type: (...) -> concurrent.futures.Future """ - Declare a Queue/Exchange + Declare a Queue/Exchange. :param obj: Queue/Exchange object :param persistent: should it be redefined upon reconnect? @@ -285,16 +285,18 @@ class Cluster(object): raise NotImplementedError( u'Sorry, this functionality is not yet implemented!') - def start(self, wait=True, timeout=10.0): # type: (bool, float, bool) -> None + def start(self, wait=True, timeout=10.0): """ Connect to broker. Initialize Cluster. Only after this call is Cluster usable. It is not safe to fork after this. - :param wait: block until connection is ready + :param wait: block until connection is ready. If None is given, then start will block as long as necessary. + :type wait: bool :param timeout: timeout to wait until the connection is ready. If it is not, a ConnectionDead error will be raised + :type timeout: float | int | None :raise RuntimeError: called more than once :raise ConnectionDead: failed to connect within timeout """ @@ -334,11 +336,15 @@ class Cluster(object): if wait: # this is only going to take a short amount of time, so we're fine with polling start_at = monotonic() - while not self.connected and monotonic() - start_at < timeout: - time.sleep(0.1) - if not self.connected: - raise ConnectionDead( - '[%s] Could not connect within %s seconds' % (self.name, timeout,)) + if timeout is None: + while not self.connected: + time.sleep(0.2) + else: + while not self.connected and monotonic() - start_at < timeout: + time.sleep(0.1) + if not self.connected: + raise ConnectionDead( + '[%s] Could not connect within %s seconds' % (self.name, timeout,)) def shutdown(self, wait=True): # type: (bool) -> None """ diff --git a/docs/cluster.rst b/docs/cluster.rst index 2fd0d83d71d5484376879775c8b97c203b1844c7..985a868d2e161e67ff5fad9f601da863c30d5c68 100644 --- a/docs/cluster.rst +++ b/docs/cluster.rst @@ -4,6 +4,7 @@ CoolAMQP cluster .. autoclass:: coolamqp.clustering.Cluster :members: +.. note:: If environment variable :code:`COOLAMQP_FORCE_SELECT_LISTENER` is defined, select will be used instead of epoll. Publisher --------- diff --git a/docs/tutorial.rst b/docs/tutorial.rst index b3226f89b57abc8987fcae7b102a45ee9392ab47..c49b6cf38dfc513b82d565f807d048837cefda11 100644 --- a/docs/tutorial.rst +++ b/docs/tutorial.rst @@ -58,6 +58,12 @@ you must first define a queue, and register a consumer. This will create an auto-delete and exclusive queue. After than, a consumer will be registered for this queue. _no_ack=False_ will mean that we have to manually confirm messages. +.. warning:: if you declare a :class:`coolamqp.objects.Queue` without a name, this client will automatically + generate an UUID-name for you, and verify the queue is auto_delete. Since RabbitMQ supports + `automatic queue name generation <https://www.rabbitmq.com/docs/queues#names>`_, + this client does not use it, because the queue is valid only for the channel that declared it, + and CoolAMQP declares things with a dedicated channel. + You can specify a callback, that will be called with a message if one's received by this consumer. Since we did not do that, this will go to a generic queue belonging to _Cluster_. diff --git a/tests/test_clustering/test_things.py b/tests/test_clustering/test_things.py index 5e84db3763bb5362502fc8f51a72400df2ac929b..146d3f4c7eecc29eebec2e1d61ba2e3c6821ac92 100644 --- a/tests/test_clustering/test_things.py +++ b/tests/test_clustering/test_things.py @@ -23,6 +23,11 @@ class TestConnecting(unittest.TestCase): except ImportError: self.skipTest('ConnectionBlocked not supported!') + def test_wait_timeout_none(self): + c = Cluster([NODE]) + c.start(wait=True, timeout=None) + c.shutdown(wait=True) + def test_on_fail(self): """Assert that on_fail doesn't fire if the cluster fails to connect""" q = {'failed': False}