From 12f6c79bf30010c81b603a3d67cba0e6e364ba65 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Piotr=20Ma=C5=9Blanka?= <piotr.maslanka@henrietta.com.pl>
Date: Wed, 13 Nov 2024 16:05:39 +0100
Subject: [PATCH] try to fix this

---
 CHANGELOG.md                          |   1 +
 LICENSE.md                            |   3 +-
 coolamqp/__init__.py                  |   2 +-
 coolamqp/attaches/consumer.py         |  22 ++-
 coolamqp/clustering/cluster.py        |   2 +-
 coolamqp/objects.py                   |  23 +--
 coolamqp/uplink/handshake.py          |   2 +-
 docs/conf.py                          |  12 +-
 docs/frames.rst                       | 196 +++++++++++++++++++++++++-
 docs/index.rst                        |   1 -
 tests/test_clustering/test_streams.py |  50 +++++++
 11 files changed, 286 insertions(+), 28 deletions(-)
 create mode 100644 tests/test_clustering/test_streams.py

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 169ff00..f7b61ea 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,7 @@ v2.0.1
 
 * removed extra logging from argumentify
 * user will be notified upon declaring an auto-delete durable exchange
+* deprecated Consumer(fail_on_first_time_resource_locked)
 
 v2.0.0
 ======
diff --git a/LICENSE.md b/LICENSE.md
index d344e24..7db6dab 100644
--- a/LICENSE.md
+++ b/LICENSE.md
@@ -1,7 +1,6 @@
 The MIT License (MIT)
 
-Copyright (c) 2016-2018 DMS Serwis s. c.
-Copyright (c) 2018-2024 SMOK sp. z o. o.
+Copyright (c) 2016-2024 Piotr Maślanka
 
 Permission is hereby granted, free of charge, to any person obtaining a copy
 of this software and associated documentation files (the "Software"), to deal
diff --git a/coolamqp/__init__.py b/coolamqp/__init__.py
index 0075adf..6205ab1 100644
--- a/coolamqp/__init__.py
+++ b/coolamqp/__init__.py
@@ -1 +1 @@
-__version__ = '2.0.1a1'
+__version__ = '2.0.1a2'
diff --git a/coolamqp/attaches/consumer.py b/coolamqp/attaches/consumer.py
index d75e753..ddf40ab 100644
--- a/coolamqp/attaches/consumer.py
+++ b/coolamqp/attaches/consumer.py
@@ -18,6 +18,7 @@ from coolamqp.framing.definitions import ChannelOpenOk, BasicConsume, \
     BasicAck, BasicReject, RESOURCE_LOCKED, BasicCancelOk, BasicQos, BasicQosOk
 from coolamqp.framing.frames import AMQPBodyFrame, AMQPHeaderFrame
 from coolamqp.objects import Callable
+from coolamqp.argumentify import argumentify
 from coolamqp.uplink import HeaderOrBodyWatch, MethodWatch
 
 logger = logging.getLogger(__name__)
@@ -59,7 +60,7 @@ class Consumer(Channeler):
     go.
 
     WARNING: READ DEFAULT VALUES IN CONSTRUCTOR! TAKE CARE WHAT YOUR CONSUMERS
-     DO!
+    DO!
 
     You can subscribe to be informed when the consumer is cancelled (for any
     reason, server or client side) with:
@@ -98,23 +99,30 @@ class Consumer(Channeler):
         If the consumer doesn't get the chance to be declared - because
         of a connection fail - next reconnect will consider this to be
         SECOND declaration, ie. it will retry ad infinitum
+
+    .. deprecated:: v2.0.1
+            Use normal reconnects, for fuck's sake!
+
     :type fail_on_first_time_resource_locked: bool
     :param body_receive_mode: how should message.body be received. This
         has a performance impact
-    :type body_receive_mode: a property of BodyReceiveMode
+    :type body_receive_mode: a property of :classs:`BodyReceiveMode`
+    :param arguments: a dictionary, extra set of arguments to be provided to RabbitMQ during binding.
+        Primarily to support streams.
     """
     __slots__ = ('queue', 'no_ack', 'on_message', 'cancelled', 'receiver',
                  'attache_group', 'channel_close_sent', 'qos', 'qos_update_sent',
                  'future_to_notify', 'future_to_notify_on_dead',
                  'fail_on_first_time_resource_locked',
                  'body_receive_mode', 'consumer_tag', 'on_cancel', 'on_broker_cancel',
-                 'hb_watch', 'deliver_watch', 'span')
+                 'hb_watch', 'deliver_watch', 'span', 'arguments')
 
     def __init__(self, queue, on_message, span=None,
                  no_ack=True, qos=0,
                  future_to_notify=None,
                  fail_on_first_time_resource_locked=False,
-                 body_receive_mode=BodyReceiveMode.BYTES
+                 body_receive_mode=BodyReceiveMode.BYTES,
+                 arguments=None
                  ):
         """
         Note that if you specify QoS, it is applied before basic.consume is
@@ -125,8 +133,12 @@ class Consumer(Channeler):
 
         self.span = span
         self.queue = queue
+        self.arguments = argumentify(arguments)
         self.no_ack = no_ack
 
+        if fail_on_first_time_resource_locked:
+            warnings.warn('This is heavily deprecated and discouraged', DeprecationWarning)
+
         self.on_message = on_message
 
         # consumer?
@@ -428,7 +440,7 @@ class Consumer(Channeler):
             self.method_and_watch(
                 BasicConsume(self.queue.name, self.consumer_tag,
                              False, self.no_ack, self.queue.exclusive, False,
-                             []),
+                             self.arguments),
                 BasicConsumeOk,
                 self.on_setup
             )
diff --git a/coolamqp/clustering/cluster.py b/coolamqp/clustering/cluster.py
index f81e364..1699259 100644
--- a/coolamqp/clustering/cluster.py
+++ b/coolamqp/clustering/cluster.py
@@ -193,7 +193,7 @@ class Cluster(object):
         Take care not to lose the Consumer object - it's the only way to cancel a consumer!
 
         .. note:: You don't need to explicitly declare queues and exchanges that you will be using beforehand,
-                  this will do this for you on the same channel.
+                  this will do this for you on the same channel, which is also the only way to use anonymous queues.
 
         If accepts more arguments. Consult :class:`coolamqp.attaches.consumer.Consumer` for details.
 
diff --git a/coolamqp/objects.py b/coolamqp/objects.py
index ac92a97..3d7cad6 100644
--- a/coolamqp/objects.py
+++ b/coolamqp/objects.py
@@ -27,7 +27,7 @@ class MessageProperties(BasicContentPropertyList):
         :type content_type: binary type (max length 255) (AMQP as shortstr)
         :param content_encoding: MIME content encoding
         :type content_encoding: binary type (max length 255) (AMQP as shortstr)
-        :param headers: message header field table
+        :param headers: message header field table. You can pass a dictionary here safely.
         :type headers: table. See coolamqp.uplink.framing.field_table (AMQP as table)
         :param delivery_mode: non-persistent (1) or persistent (2)
         :type delivery_mode: int, 8 bit unsigned (AMQP as octet)
@@ -95,25 +95,26 @@ class Message(object):
 
     :param body: stream of octets
     :type body: anything with a buffer interface
-    :param properties: AMQP properties to be sent along.
-                       default is 'no properties at all'
-                       You can pass a dict - it will be passed to
-                       MessageProperties,
-                       but it's slow - don't do that.
+    :param properties: AMQP properties to be sent along. default is 'no properties at all'.
+                       You can pass a dict - it will be passed to MessageProperties, but it's slow - don't do that.
     :type properties: :class:`coolamqp.objects.MessageProperties` instance
     """
     __slots__ = ('body', 'properties')
 
-    Properties = MessageProperties  # an alias for easier use
+    #: an alias for easier use
+    Properties = MessageProperties
 
-    def __init__(self, body,         # type: bytes
-                 properties=None     # type: tp.Optional[MessageProperties]
+    def __init__(self, body,
+                 properties=None
                  ):
         """
         Create a Message object.
 
         Please take care with passing empty bodies, as py-amqp has some
         failure on it.
+
+        :param body: bytes
+        :param properties: a :class:`~coolamqp.objects.MessageProperties` to send along
         """
         if isinstance(body, six.text_type):
             raise TypeError(u'body cannot be a text type!')
@@ -294,9 +295,9 @@ class Queue(object):
     :raises ValueError: tried to create a queue that was not durable or auto_delete
     :raises ValueError: tried to create a queue that was not exclusive or auto_delete and not anonymous
     :raises ValueError: tried to create a queue that was anonymous and not auto_delete or durable
-    :warning DeprecationWarning: if a non-exclusive auto_delete queue is created or some other combinations
+    :warns DeprecationWarning: if a non-exclusive auto_delete queue is created or some other combinations
         that will be soon unavailable (eg. RabbitMQ 4.0).
-    :warning UserWarning: if you're declaring an auto_delete or exclusive, anonymous queue
+    :warns UserWarning: if you're declaring an auto_delete or exclusive, anonymous queue
     """
     __slots__ = ('name', 'durable', 'exchange', 'auto_delete', 'exclusive',
                  'anonymous', 'consumer_tag', 'arguments', 'routing_key', 'arguments_bind')
diff --git a/coolamqp/uplink/handshake.py b/coolamqp/uplink/handshake.py
index 77d805a..c67cbd8 100644
--- a/coolamqp/uplink/handshake.py
+++ b/coolamqp/uplink/handshake.py
@@ -34,7 +34,7 @@ CLIENT_DATA = [
     # these fields to be of type short-string
     (b'product', (b'CoolAMQP', 'S')),
     (b'version', (__version__.encode('utf8'), 'S')),
-    (b'copyright', (b'Copyright (C) 2016-2024 SMOK sp. z o.o.', 'S')),
+    (b'copyright', (b'Copyright (C) 2016-2024 Piotr Maslanka', 'S')),
     (
         b'information', (
             b'Licensed under the MIT License.\nSee https://git.dms-serwis.com.pl/smokserwis/coolamqp for details',
diff --git a/docs/conf.py b/docs/conf.py
index 9100384..eb8fe69 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -54,8 +54,8 @@ master_doc = 'index'
 
 # General information about the project.
 project = u'CoolAMQP'
-copyright = u'2016-2024, SMOK sp. z o. o.'
-author = u'SMOK sp. z o. o.'
+copyright = u'2016-2024, Piotr Maślanka'
+author = u'Piotr Maślanka'
 
 # The version info for the project you're documenting, acts as replacement for
 # |version| and |release|, also used in various other places throughout the
@@ -73,7 +73,7 @@ release = __version__
 #
 # This is also used if you do content translation via gettext catalogs.
 # Usually you set "language" from the command line for these cases.
-language = None
+language = 'en'
 
 # List of patterns, relative to source directory, that match files and
 # directories to ignore when looking for source files.
@@ -102,7 +102,7 @@ html_theme = 'alabaster'
 # Add any paths that contain custom static files (such as style sheets) here,
 # relative to this directory. They are copied after the builtin static files,
 # so a file named "default.css" will overwrite the builtin "default.css".
-html_static_path = ['_static']
+#html_static_path = ['_static']
 
 # -- Options for HTMLHelp output ------------------------------------------
 
@@ -134,7 +134,7 @@ latex_elements = {
 #  author, documentclass [howto, manual, or own class]).
 latex_documents = [
     (master_doc, 'CoolAMQP.tex', u'CoolAMQP Documentation',
-     u'DMS Serwis s.c.', 'manual'),
+     u'Piotr Maślanka', 'manual'),
 ]
 
 # -- Options for manual page output ---------------------------------------
@@ -173,6 +173,8 @@ with open('frames.rst', 'w') as f_out:
     f_out.write('''===========================
 Glossary of all AMQP frames
 ===========================
+
+Please note that this is automatically generated.
 ''')
     for class_ in BINARY_HEADER_TO_METHOD.values():
         f_out.write('.. autoclass:: coolamqp.framing.definitions.%s\n    :members:\n\n' % (
diff --git a/docs/frames.rst b/docs/frames.rst
index ace0fdc..fac9537 100644
--- a/docs/frames.rst
+++ b/docs/frames.rst
@@ -1 +1,195 @@
-To be automatically filled in during build by conf.py
+===========================
+Glossary of all AMQP frames
+===========================
+.. autoclass:: coolamqp.framing.definitions.ConnectionBlocked
+    :members:
+
+.. autoclass:: coolamqp.framing.definitions.ConnectionClose
+    :members:
+
+.. autoclass:: coolamqp.framing.definitions.ConnectionCloseOk
+    :members:
+
+.. autoclass:: coolamqp.framing.definitions.ConnectionOpen
+    :members:
+
+.. autoclass:: coolamqp.framing.definitions.ConnectionOpenOk
+    :members:
+
+.. autoclass:: coolamqp.framing.definitions.ConnectionStart
+    :members:
+
+.. autoclass:: coolamqp.framing.definitions.ConnectionSecure
+    :members:
+
+.. autoclass:: coolamqp.framing.definitions.ConnectionStartOk
+    :members:
+
+.. autoclass:: coolamqp.framing.definitions.ConnectionSecureOk
+    :members:
+
+.. autoclass:: coolamqp.framing.definitions.ConnectionTune
+    :members:
+
+.. autoclass:: coolamqp.framing.definitions.ConnectionTuneOk
+    :members:
+
+.. autoclass:: coolamqp.framing.definitions.ConnectionUpdateSecret
+    :members:
+
+.. autoclass:: coolamqp.framing.definitions.ConnectionUnblocked
+    :members:
+
+.. autoclass:: coolamqp.framing.definitions.ConnectionUpdateSecretOk
+    :members:
+
+.. autoclass:: coolamqp.framing.definitions.ChannelClose
+    :members:
+
+.. autoclass:: coolamqp.framing.definitions.ChannelCloseOk
+    :members:
+
+.. autoclass:: coolamqp.framing.definitions.ChannelFlow
+    :members:
+
+.. autoclass:: coolamqp.framing.definitions.ChannelFlowOk
+    :members:
+
+.. autoclass:: coolamqp.framing.definitions.ChannelOpen
+    :members:
+
+.. autoclass:: coolamqp.framing.definitions.ChannelOpenOk
+    :members:
+
+.. autoclass:: coolamqp.framing.definitions.ExchangeBind
+    :members:
+
+.. autoclass:: coolamqp.framing.definitions.ExchangeBindOk
+    :members:
+
+.. autoclass:: coolamqp.framing.definitions.ExchangeDeclare
+    :members:
+
+.. autoclass:: coolamqp.framing.definitions.ExchangeDelete
+    :members:
+
+.. autoclass:: coolamqp.framing.definitions.ExchangeDeclareOk
+    :members:
+
+.. autoclass:: coolamqp.framing.definitions.ExchangeDeleteOk
+    :members:
+
+.. autoclass:: coolamqp.framing.definitions.ExchangeUnbind
+    :members:
+
+.. autoclass:: coolamqp.framing.definitions.ExchangeUnbindOk
+    :members:
+
+.. autoclass:: coolamqp.framing.definitions.QueueBind
+    :members:
+
+.. autoclass:: coolamqp.framing.definitions.QueueBindOk
+    :members:
+
+.. autoclass:: coolamqp.framing.definitions.QueueDeclare
+    :members:
+
+.. autoclass:: coolamqp.framing.definitions.QueueDelete
+    :members:
+
+.. autoclass:: coolamqp.framing.definitions.QueueDeclareOk
+    :members:
+
+.. autoclass:: coolamqp.framing.definitions.QueueDeleteOk
+    :members:
+
+.. autoclass:: coolamqp.framing.definitions.QueuePurge
+    :members:
+
+.. autoclass:: coolamqp.framing.definitions.QueuePurgeOk
+    :members:
+
+.. autoclass:: coolamqp.framing.definitions.QueueUnbind
+    :members:
+
+.. autoclass:: coolamqp.framing.definitions.QueueUnbindOk
+    :members:
+
+.. autoclass:: coolamqp.framing.definitions.BasicAck
+    :members:
+
+.. autoclass:: coolamqp.framing.definitions.BasicConsume
+    :members:
+
+.. autoclass:: coolamqp.framing.definitions.BasicCancel
+    :members:
+
+.. autoclass:: coolamqp.framing.definitions.BasicConsumeOk
+    :members:
+
+.. autoclass:: coolamqp.framing.definitions.BasicCancelOk
+    :members:
+
+.. autoclass:: coolamqp.framing.definitions.BasicDeliver
+    :members:
+
+.. autoclass:: coolamqp.framing.definitions.BasicGet
+    :members:
+
+.. autoclass:: coolamqp.framing.definitions.BasicGetOk
+    :members:
+
+.. autoclass:: coolamqp.framing.definitions.BasicGetEmpty
+    :members:
+
+.. autoclass:: coolamqp.framing.definitions.BasicNack
+    :members:
+
+.. autoclass:: coolamqp.framing.definitions.BasicPublish
+    :members:
+
+.. autoclass:: coolamqp.framing.definitions.BasicQos
+    :members:
+
+.. autoclass:: coolamqp.framing.definitions.BasicQosOk
+    :members:
+
+.. autoclass:: coolamqp.framing.definitions.BasicReturn
+    :members:
+
+.. autoclass:: coolamqp.framing.definitions.BasicReject
+    :members:
+
+.. autoclass:: coolamqp.framing.definitions.BasicRecoverAsync
+    :members:
+
+.. autoclass:: coolamqp.framing.definitions.BasicRecover
+    :members:
+
+.. autoclass:: coolamqp.framing.definitions.BasicRecoverOk
+    :members:
+
+.. autoclass:: coolamqp.framing.definitions.TxCommit
+    :members:
+
+.. autoclass:: coolamqp.framing.definitions.TxCommitOk
+    :members:
+
+.. autoclass:: coolamqp.framing.definitions.TxRollback
+    :members:
+
+.. autoclass:: coolamqp.framing.definitions.TxRollbackOk
+    :members:
+
+.. autoclass:: coolamqp.framing.definitions.TxSelect
+    :members:
+
+.. autoclass:: coolamqp.framing.definitions.TxSelectOk
+    :members:
+
+.. autoclass:: coolamqp.framing.definitions.ConfirmSelect
+    :members:
+
+.. autoclass:: coolamqp.framing.definitions.ConfirmSelectOk
+    :members:
+
diff --git a/docs/index.rst b/docs/index.rst
index 5c924b4..c13d77f 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -6,7 +6,6 @@ Welcome to CoolAMQP's documentation!
     :caption: Contents
 
     whatsnew
-    cluster
     tutorials
     how-to-guide
     caveats
diff --git a/tests/test_clustering/test_streams.py b/tests/test_clustering/test_streams.py
new file mode 100644
index 0000000..65a08af
--- /dev/null
+++ b/tests/test_clustering/test_streams.py
@@ -0,0 +1,50 @@
+# coding=UTF-8
+
+from __future__ import print_function, absolute_import, division
+
+import logging
+import os
+import time
+import unittest
+
+from coolamqp.clustering import Cluster
+from coolamqp.exceptions import ConnectionDead
+from coolamqp.objects import NodeDefinition, Queue, Exchange, Message
+
+NODE = NodeDefinition(os.environ.get('AMQP_HOST', '127.0.0.1'), 'guest', 'guest', heartbeat=20)
+logging.basicConfig(level=logging.DEBUG)
+logging.getLogger('coolamqp').setLevel(logging.DEBUG)
+logger = logging.getLogger(__name__)
+
+
+class TestConnecting(unittest.TestCase):
+
+    def test_argumented_exchange(self):
+        c = Cluster(NODE)
+        c.start(wait=True, timeout=None)
+        if c.properties.version[0] < 4:
+            c.shutdown(wait=True)
+            return
+
+        stream = Queue('my-stream', durable=True, auto_delete=False, exclusive=False,
+                       arguments={'x-queue-type': 'stream'})
+        c.declare(stream)
+        for i in range(10):
+            c.publish(Message(('dupa%s' % (i, )).encode('utf-8')), routing_key='my-stream', confirm=True).result()
+
+        test = {'a': 0}
+
+        def handle_msg(msg):
+            test['a'] += 1
+
+        cons, fut = c.consume(stream, qos=10, on_message=handle_msg, arguments={'x-stream-offset': '3'}, no_ack=True)
+        fut.result()
+        logger.warning(repr(test))
+        time.sleep(3)
+        cons.cancel()
+
+        cons, fut = c.consume(stream, qos=10, on_message=handle_msg, arguments={'x-stream-offset': 'first'}, no_ack=True)
+        fut.result()
+        time.sleep(3)
+        cons.cancel()
+        logger.warning(repr(test))
-- 
GitLab