From 3be70ade98388c8d4ba81b89fcbb5e3b119df0f7 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Piotr=20Ma=C5=9Blanka?= <piotr.maslanka@henrietta.com.pl>
Date: Fri, 18 Oct 2024 16:15:09 +0200
Subject: [PATCH] fixes #3

---
 CHANGELOG.md                            | 12 ++++
 coolamqp/__init__.py                    |  2 +-
 coolamqp/clustering/cluster.py          | 23 +++----
 coolamqp/objects.py                     | 92 +++++++++++++------------
 docs/advanced.rst                       | 12 ++++
 docs/whatsnew.rst                       |  8 +++
 tests/test_clustering/test_exchanges.py |  7 ++
 tests/test_objects.py                   |  6 ++
 8 files changed, 101 insertions(+), 61 deletions(-)
 create mode 100644 docs/whatsnew.rst

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 9661266..24a56d8 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,6 +1,18 @@
 Previous release notes are hosted on [GitHub](https://github.com/smok-serwis/coolamqp/releases).
 Since v1.3.2 they'll be put here and in release description.
 
+# v2.0.0
+========
+
+* changes to Queues:
+  * anonymous queues are back, for usage refer [here](https://smokserwis.docs.smok.co/coolamqp/advanced.html)
+  * changed some default arguments for Queues for them to better make sense
+  * some argument combinations just raise ValueError
+  * PendingDeprecationWarning changed into a DeprecationWarning
+* changes to Cluster:
+  * declare will refuse to declare an anonymous queue
+  * removed publish(tx)
+
 # v1.5.0
 ========
 
diff --git a/coolamqp/__init__.py b/coolamqp/__init__.py
index 77f1c8e..ab3d1d5 100644
--- a/coolamqp/__init__.py
+++ b/coolamqp/__init__.py
@@ -1 +1 @@
-__version__ = '1.5.0'
+__version__ = '1.5.1a1'
diff --git a/coolamqp/clustering/cluster.py b/coolamqp/clustering/cluster.py
index 82c729d..d8eeb8b 100644
--- a/coolamqp/clustering/cluster.py
+++ b/coolamqp/clustering/cluster.py
@@ -129,7 +129,10 @@ class Cluster(object):
         :param span: optional parent span, if opentracing is installed
         :param dont_trace: if True, a span won't be output
         :return: Future
+        :raises ValueError: tried to declare an anonymous queue
         """
+        if isinstance(obj, Queue) and obj.anonymous:
+            raise ValueError('You cannot declare an anonymous queue!')
         if span is not None and not dont_trace:
             child_span = self._make_span('declare', span)
         else:
@@ -183,6 +186,9 @@ class Cluster(object):
 
         Take care not to lose the Consumer object - it's the only way to cancel a consumer!
 
+        .. note:: You don't need to explicitly declare queues and exchanges that you will be using beforehand,
+                  this will do this for you on the same channel.
+
         :param queue: Queue object, being consumed from right now.
             Note that name of anonymous queue might change at any time!
         :param on_message: callable that will process incoming messages
@@ -232,7 +238,6 @@ class Cluster(object):
     def publish(self, message,  # type: Message
                 exchange=None,  # type: tp.Union[Exchange, str, bytes]
                 routing_key=u'',  # type: tp.Union[str, bytes]
-                tx=None,  # type: tp.Optional[bool]
                 confirm=None,  # type: tp.Optional[bool]
                 span=None,  # type: tp.Optional[opentracing.Span]
                 dont_trace=False    # type: bool
@@ -246,9 +251,8 @@ class Cluster(object):
         :param confirm: Whether to publish it using confirms/transactions.
                         If you choose so, you will receive a Future that can be used
                         to check it broker took responsibility for this message.
-                        Note that if tx if False, and message cannot be delivered to broker at once,
+                        Note that if confirm is False, and message cannot be delivered to broker at once,
                         it will be discarded
-        :param tx: deprecated, alias for confirm
         :param span: optionally, current span, if opentracing is installed
         :param dont_trace: if set to True, a span won't be generated
         :return: Future to be finished on completion or None, is confirm/tx was not chosen
@@ -266,19 +270,8 @@ class Cluster(object):
         if isinstance(routing_key, six.text_type):
             routing_key = routing_key.encode('utf8')
 
-        if tx is not None:  # confirm is a drop-in replacement. tx is unfortunately named
-            warnings.warn(u'Use confirm kwarg instead', DeprecationWarning)
-
-            if confirm is not None:
-                raise RuntimeError(
-                    u'Using both tx= and confirm= at once does not make sense')
-        elif confirm is not None:
-            tx = confirm
-        else:
-            tx = False
-
         try:
-            if tx:
+            if confirm:
                 clb = self.pub_tr
             else:
                 clb = self.pub_na
diff --git a/coolamqp/objects.py b/coolamqp/objects.py
index eb6731d..6261633 100644
--- a/coolamqp/objects.py
+++ b/coolamqp/objects.py
@@ -19,36 +19,33 @@ logger = logging.getLogger(__name__)
 EMPTY_PROPERTIES = MessageProperties()
 
 
+
+def toutf8(q):
+    if isinstance(q, memoryview):
+        q = q.tobytes()
+    return q.decode('utf-8') if isinstance(q, six.binary_type) else q
+
+
+def tobytes(q):
+    return q.encode('utf-8') if isinstance(q, six.text_type) else q
+
+
 def argumentify(arguments):
     if arguments is None:
         return None
     args = []
     if isinstance(arguments, dict):
         for key, value in arguments.items():
-            if not isinstance(key, six.binary_type):
-                key = key.encode('utf-8')
+            key = tobytes(key)
             args.append((key, (value, get_type_for(value))))
     else:
         for key, value in arguments:
-            if not isinstance(key, six.binary_type):
-                key = key.encode('utf-8')
+            key = tobytes(key)
             args.append((key, (value, get_type_for(value))))
 
     return args
 
 
-def toutf8(q):
-    if isinstance(q, six.binary_type):
-        q = q.decode('utf8')
-    return q
-
-
-def tobytes(q):
-    if isinstance(q, six.text_type):
-        q = q.encode('utf8')
-    return q
-
-
 class Callable(object):
     """
     Add a bunch of callables to one list, and just invoke'm.
@@ -88,8 +85,7 @@ class Message(object):
                        You can pass a dict - it will be passed to
                        MessageProperties,
                        but it's slow - don't do that.
-    :type properties: MessageProperties instance, None or a dict (SLOW!)
-
+    :type properties: :class:`coolamqp.objects.MessageProperties` instance
     """
     __slots__ = ('body', 'properties')
 
@@ -231,7 +227,6 @@ class Exchange(object):
 Exchange.direct = Exchange()
 
 
-
 class ServerProperties(object):
     """
     An object describing properties of the target server.
@@ -255,11 +250,8 @@ class ServerProperties(object):
             elif isinstance(prop_value, list):
                 prop_value = [toutf8(prop[0]) for prop in prop_value]
             self.properties[prop_name] = prop_value
-        self.mechanisms = data.mechanisms.tobytes().decode('utf-8').split(' ')
-        self.locales = data.locales.tobytes().decode('utf-8')
-
-    def __str__(self):
-        return '%s %s %s %s' % (self.version, repr(self.properties), self.mechanisms, self.locales)
+        self.mechanisms = toutf8(data.mechanisms).split(' ')
+        self.locales = toutf8(data.locales)
 
 
 class Queue(object):
@@ -267,50 +259,60 @@ class Queue(object):
     This object represents a Queue that applications consume from or publish to.
     Create a queue definition.
 
-    :param name: name of the queue. Generates a random uuid.uuid4().hex if not given. Note that this kind of queue
-                 will probably require to be declared.
+    :param name: name of the queue.
+        None (default) for autogeneration. Just follow the rules for :ref:`anonymq`.
+        If empty string, a UUID name will be generated, and you won't have an anonymous queue anymore.
     :param durable: Is the queue durable?
     :param exchange: Exchange for this queue to bind to. None for no binding.
-    :param exclusive: Is this queue exclusive?
-    :param auto_delete: Is this queue auto_delete ?
+    :param exclusive: This queue will be deleted when the connection closes
+    :param auto_delete: This queue will be deleted when the last consumer unsubscribes
     :param arguments: either a list of (bytes, values) or a dict of (str, value) to pass as an extra argument
-    :warning PendingDeprecationWarning: if a non-exclusive auto_delete queue is created or some other combinations
+    :raises ValueError: tried to create a queue that was not durable or auto_delete
+    :raises ValueError: tried to create a queue that was not exclusive or auto_delete and not anonymous
+    :raises ValueError: tried to create a queue that was anonymous and not auto_delete
+    :warning DeprecationWarning: if a non-exclusive auto_delete queue is created or some other combinations
         that will be soon unavailable (eg. RabbitMQ 4.0).
+    :warning UserWarning: if you're declaring an auto_delete or exclusive, anonymous queue
     """
     __slots__ = ('name', 'durable', 'exchange', 'auto_delete', 'exclusive',
                  'anonymous', 'consumer_tag', 'arguments')
 
-    def __init__(self, name=b'',  # type: tp.Union[str, bytes]
+    def __init__(self, name=None,  # type: tp.Union[str, bytes, None]
                  durable=False,  # type: bool
                  exchange=None,  # type: tp.Optional[Exchange]
-                 exclusive=False,  # type: bool
-                 auto_delete=False,  # type: bool
+                 exclusive=True,  # type: bool
+                 auto_delete=True,  # type: bool
                  arguments=None     # type: tp.Union[tp.List[bytes, tp.Any], tp.Dict[str, tp.Any]]
                  ):
-        if not name:
-            name = uuid.uuid4().hex
-        self.name = tobytes(name)  #: public, must be bytes
-        # if name is '', this will be filled in with broker-generated name upon declaration
+        if name is None:
+            self.name = None
+        else:
+            name = uuid.uuid4().hex if not name else name
+            self.name = tobytes(name)
+
         self.durable = durable
         self.exchange = exchange
         self.auto_delete = auto_delete
         self.exclusive = exclusive
         self.arguments = argumentify(arguments)
+        self.anonymous = self.name is None  # if this queue is anonymous, it must be regenerated upon reconnect
 
         if self.auto_delete and self.durable:
-            warnings.warn('This will be removed in RabbitMQ 4.0', PendingDeprecationWarning)
+            raise ValueError('Cannot create an auto_delete and exclusive queue')
 
-        if self.auto_delete and not self.exclusive:
-            warnings.warn('This will be removed in RabbitMQ 4.0', PendingDeprecationWarning)
+        if not self.anonymous:
+            if self.auto_delete or self.exclusive:
+                warnings.warn('This may cause unpredictable behaviour', UserWarning)
+        elif self.durable:
+            raise ValueError('Cannot declare an anonymous durable queue')
 
-        self.anonymous = not len(
-            self.name)  # if this queue is anonymous, it must be regenerated upon reconnect
+        if self.auto_delete and not self.exclusive and not self.anonymous:
+            raise ValueError('Cannot create an auto_delete and durable queue non-anonymous')
 
-        self.consumer_tag = self.name if not self.anonymous else uuid.uuid4().hex.encode(
-            'utf8')  # bytes, consumer tag to use in AMQP comms
+        self.consumer_tag = self.name if not self.anonymous else tobytes(uuid.uuid4().hex)
 
-        assert isinstance(self.name, six.binary_type)
-        assert isinstance(self.consumer_tag, six.binary_type)
+        if not self.exclusive and self.auto_delete:
+            warnings.warn('This will be removed in RabbitMQ 4.0', DeprecationWarning)
 
     def __eq__(self, other):
         return self.name == other.name
diff --git a/docs/advanced.rst b/docs/advanced.rst
index 2f743c9..de6b35f 100644
--- a/docs/advanced.rst
+++ b/docs/advanced.rst
@@ -3,3 +3,15 @@ Advanced things
 
 .. autoclass:: coolamqp.uplink.connection.Connection
     :members:
+
+
+Declaring anonymous queues
+--------------------------
+
+.. _anonymq:
+
+In order to make use of an anonymous queue, you must first :meth:`coolamqp.clustering.Cluster.consume` it, since
+:meth:`coolamqp.clustering.Cluster.declare` will use a separate channel, in which the queue will be invalid. It will
+raise ValueError if you try to do that, anyway.
+
+Anonymous queues must be auto_delete and exclusive, ValueError will be raised otherwise.
diff --git a/docs/whatsnew.rst b/docs/whatsnew.rst
new file mode 100644
index 0000000..ff6ef63
--- /dev/null
+++ b/docs/whatsnew.rst
@@ -0,0 +1,8 @@
+What's new?
+===========
+
+CoolAMQP 2.0.0 marks a slight philosophy shift. Whereas 1.x used auto-generated UUID names, 2.0 will let the server
+pick their names for themselves.
+
+It also forbids some combinations of Queue arguments, and makes the default values more palatable, so for example
+a naked :class:`coolamqp.objects.Queue` will be anonymous, non-durable, exclusive and auto-delete.
\ No newline at end of file
diff --git a/tests/test_clustering/test_exchanges.py b/tests/test_clustering/test_exchanges.py
index 206ec06..f6e8f93 100644
--- a/tests/test_clustering/test_exchanges.py
+++ b/tests/test_clustering/test_exchanges.py
@@ -23,6 +23,13 @@ class TestExchanges(unittest.TestCase):
     def tearDown(self):
         self.c.shutdown()
 
+    def test_declare_anonymq(self):
+        que = Queue(auto_delete=True)
+        self.assertRaises(ValueError, self.c.declare, que)
+        cons, fut = self.c.consume(que, no_ack=True)
+        fut.result()
+        cons.cancel().result()
+
     def test_deadlettering(self):
         xchg_name = uuid.uuid4().hex
         dead_queue_name = uuid.uuid4().hex
diff --git a/tests/test_objects.py b/tests/test_objects.py
index 9fbf5b1..49e119f 100644
--- a/tests/test_objects.py
+++ b/tests/test_objects.py
@@ -16,6 +16,12 @@ logging.getLogger('coolamqp').setLevel(logging.DEBUG)
 
 class TestObjects(unittest.TestCase):
 
+    def test_queue_failures(self):
+        self.assertRaises(ValueError, Queue, None, durable=True)
+        self.assertRaises(ValueError, Queue, 'test', auto_delete=True, durable=True)
+        self.assertRaises(ValueError, Queue, None, auto_delete=False)
+        self.assertRaises(ValueError, Queue, 'test', auto_delete=True, exclusive=False)
+
     def test_queue_declare(self):
         args = argumentify({'x-dead-letter-exchange': 'deadletter',
                                            'x-message-ttl': 1000})
-- 
GitLab