diff --git a/CHANGELOG.md b/CHANGELOG.md index 169ff0099fc176a080aeba437cea992be66c1e53..2c5d1d51872a8e3600f9fdb18b04c16765a4ef80 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,11 +1,13 @@ Previous release notes are hosted on [GitHub](https://github.com/smok-serwis/coolamqp/releases). Since v1.3.2 they'll be put here and in release description. -v2.0.1 +v2.1.0 ====== * removed extra logging from argumentify * user will be notified upon declaring an auto-delete durable exchange +* deprecated Consumer(fail_on_first_time_resource_locked) +* we now support [streams](https://www.rabbitmq.com/docs/streams) v2.0.0 ====== diff --git a/LICENSE.md b/LICENSE.md index d344e241073849aabd4934c86474d007b8aacd5d..7db6dab602dcb8b253002249598de00f41c0c363 100644 --- a/LICENSE.md +++ b/LICENSE.md @@ -1,7 +1,6 @@ The MIT License (MIT) -Copyright (c) 2016-2018 DMS Serwis s. c. -Copyright (c) 2018-2024 SMOK sp. z o. o. +Copyright (c) 2016-2024 Piotr MaĹlanka Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/README.md b/README.md index 61ff669294a39e4dbba00e8443cd90022e89e9a4..6351918b6f333e0ca5deac6a588e186a5499f1c3 100644 --- a/README.md +++ b/README.md @@ -17,6 +17,7 @@ Why CoolAMQP? * [Negative Acknowledgements](https://www.rabbitmq.com/docs/nack) * traceable using [opentracing](https://opentracing.io/) * code coverage is 81% at the moment +* full support for [streams](https://www.rabbitmq.com/docs/streams) * 120 second stress tests are part of each release diff --git a/coolamqp/__init__.py b/coolamqp/__init__.py index 0075adf5ba041fddb2358ff826f06f855b675d04..3fbef98d59b8db897c47c77c769931ba1b717c86 100644 --- a/coolamqp/__init__.py +++ b/coolamqp/__init__.py @@ -1 +1 @@ -__version__ = '2.0.1a1' +__version__ = '2.1.0a1' diff --git a/coolamqp/attaches/consumer.py b/coolamqp/attaches/consumer.py index d75e753b29770a9e6ba961f65d5072c4d1abf9a6..ddf40ab99f3bbe23464ecafed18e71b5db12ee6e 100644 --- a/coolamqp/attaches/consumer.py +++ b/coolamqp/attaches/consumer.py @@ -18,6 +18,7 @@ from coolamqp.framing.definitions import ChannelOpenOk, BasicConsume, \ BasicAck, BasicReject, RESOURCE_LOCKED, BasicCancelOk, BasicQos, BasicQosOk from coolamqp.framing.frames import AMQPBodyFrame, AMQPHeaderFrame from coolamqp.objects import Callable +from coolamqp.argumentify import argumentify from coolamqp.uplink import HeaderOrBodyWatch, MethodWatch logger = logging.getLogger(__name__) @@ -59,7 +60,7 @@ class Consumer(Channeler): go. WARNING: READ DEFAULT VALUES IN CONSTRUCTOR! TAKE CARE WHAT YOUR CONSUMERS - DO! + DO! You can subscribe to be informed when the consumer is cancelled (for any reason, server or client side) with: @@ -98,23 +99,30 @@ class Consumer(Channeler): If the consumer doesn't get the chance to be declared - because of a connection fail - next reconnect will consider this to be SECOND declaration, ie. it will retry ad infinitum + + .. deprecated:: v2.0.1 + Use normal reconnects, for fuck's sake! + :type fail_on_first_time_resource_locked: bool :param body_receive_mode: how should message.body be received. This has a performance impact - :type body_receive_mode: a property of BodyReceiveMode + :type body_receive_mode: a property of :classs:`BodyReceiveMode` + :param arguments: a dictionary, extra set of arguments to be provided to RabbitMQ during binding. + Primarily to support streams. """ __slots__ = ('queue', 'no_ack', 'on_message', 'cancelled', 'receiver', 'attache_group', 'channel_close_sent', 'qos', 'qos_update_sent', 'future_to_notify', 'future_to_notify_on_dead', 'fail_on_first_time_resource_locked', 'body_receive_mode', 'consumer_tag', 'on_cancel', 'on_broker_cancel', - 'hb_watch', 'deliver_watch', 'span') + 'hb_watch', 'deliver_watch', 'span', 'arguments') def __init__(self, queue, on_message, span=None, no_ack=True, qos=0, future_to_notify=None, fail_on_first_time_resource_locked=False, - body_receive_mode=BodyReceiveMode.BYTES + body_receive_mode=BodyReceiveMode.BYTES, + arguments=None ): """ Note that if you specify QoS, it is applied before basic.consume is @@ -125,8 +133,12 @@ class Consumer(Channeler): self.span = span self.queue = queue + self.arguments = argumentify(arguments) self.no_ack = no_ack + if fail_on_first_time_resource_locked: + warnings.warn('This is heavily deprecated and discouraged', DeprecationWarning) + self.on_message = on_message # consumer? @@ -428,7 +440,7 @@ class Consumer(Channeler): self.method_and_watch( BasicConsume(self.queue.name, self.consumer_tag, False, self.no_ack, self.queue.exclusive, False, - []), + self.arguments), BasicConsumeOk, self.on_setup ) diff --git a/coolamqp/clustering/cluster.py b/coolamqp/clustering/cluster.py index f81e3642c6aadc9ec01069ca0cadc0b2f68541eb..1699259ab0f1fec8c79e66dccda91a680b1784ff 100644 --- a/coolamqp/clustering/cluster.py +++ b/coolamqp/clustering/cluster.py @@ -193,7 +193,7 @@ class Cluster(object): Take care not to lose the Consumer object - it's the only way to cancel a consumer! .. note:: You don't need to explicitly declare queues and exchanges that you will be using beforehand, - this will do this for you on the same channel. + this will do this for you on the same channel, which is also the only way to use anonymous queues. If accepts more arguments. Consult :class:`coolamqp.attaches.consumer.Consumer` for details. diff --git a/coolamqp/objects.py b/coolamqp/objects.py index ac92a97cd78d6b3e57677880d5f62141ae75f563..3d7cad60dc1e8739839140866204ac72876a366a 100644 --- a/coolamqp/objects.py +++ b/coolamqp/objects.py @@ -27,7 +27,7 @@ class MessageProperties(BasicContentPropertyList): :type content_type: binary type (max length 255) (AMQP as shortstr) :param content_encoding: MIME content encoding :type content_encoding: binary type (max length 255) (AMQP as shortstr) - :param headers: message header field table + :param headers: message header field table. You can pass a dictionary here safely. :type headers: table. See coolamqp.uplink.framing.field_table (AMQP as table) :param delivery_mode: non-persistent (1) or persistent (2) :type delivery_mode: int, 8 bit unsigned (AMQP as octet) @@ -95,25 +95,26 @@ class Message(object): :param body: stream of octets :type body: anything with a buffer interface - :param properties: AMQP properties to be sent along. - default is 'no properties at all' - You can pass a dict - it will be passed to - MessageProperties, - but it's slow - don't do that. + :param properties: AMQP properties to be sent along. default is 'no properties at all'. + You can pass a dict - it will be passed to MessageProperties, but it's slow - don't do that. :type properties: :class:`coolamqp.objects.MessageProperties` instance """ __slots__ = ('body', 'properties') - Properties = MessageProperties # an alias for easier use + #: an alias for easier use + Properties = MessageProperties - def __init__(self, body, # type: bytes - properties=None # type: tp.Optional[MessageProperties] + def __init__(self, body, + properties=None ): """ Create a Message object. Please take care with passing empty bodies, as py-amqp has some failure on it. + + :param body: bytes + :param properties: a :class:`~coolamqp.objects.MessageProperties` to send along """ if isinstance(body, six.text_type): raise TypeError(u'body cannot be a text type!') @@ -294,9 +295,9 @@ class Queue(object): :raises ValueError: tried to create a queue that was not durable or auto_delete :raises ValueError: tried to create a queue that was not exclusive or auto_delete and not anonymous :raises ValueError: tried to create a queue that was anonymous and not auto_delete or durable - :warning DeprecationWarning: if a non-exclusive auto_delete queue is created or some other combinations + :warns DeprecationWarning: if a non-exclusive auto_delete queue is created or some other combinations that will be soon unavailable (eg. RabbitMQ 4.0). - :warning UserWarning: if you're declaring an auto_delete or exclusive, anonymous queue + :warns UserWarning: if you're declaring an auto_delete or exclusive, anonymous queue """ __slots__ = ('name', 'durable', 'exchange', 'auto_delete', 'exclusive', 'anonymous', 'consumer_tag', 'arguments', 'routing_key', 'arguments_bind') diff --git a/coolamqp/uplink/handshake.py b/coolamqp/uplink/handshake.py index 77d805a6d42aee240b7485149e3abeb4da1d6df6..c67cbd804fe32040a4b9d4bd97ba0770212263f0 100644 --- a/coolamqp/uplink/handshake.py +++ b/coolamqp/uplink/handshake.py @@ -34,7 +34,7 @@ CLIENT_DATA = [ # these fields to be of type short-string (b'product', (b'CoolAMQP', 'S')), (b'version', (__version__.encode('utf8'), 'S')), - (b'copyright', (b'Copyright (C) 2016-2024 SMOK sp. z o.o.', 'S')), + (b'copyright', (b'Copyright (C) 2016-2024 Piotr Maslanka', 'S')), ( b'information', ( b'Licensed under the MIT License.\nSee https://git.dms-serwis.com.pl/smokserwis/coolamqp for details', diff --git a/docker-compose.yml b/docker-compose.yml index 741ac5b84464571fcd4244ad7aa3dddc0cb09944..c93f04898bd736f4a89f673f2c49502fb7397380 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,8 +1,9 @@ version: '3.2' services: amqp: - image: rabbitmq:3-management + image: rabbitmq:4.0-management unittest: + command: nose2 -vv tests.test_clustering.test_streams build: context: . dockerfile: tests/Dockerfile diff --git a/docs/conf.py b/docs/conf.py index 91003849ab677b6b7ff956a01f472c0b242bdcc9..eb8fe69a70342893ad675fc7d4c99d92bfbcb2fa 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -54,8 +54,8 @@ master_doc = 'index' # General information about the project. project = u'CoolAMQP' -copyright = u'2016-2024, SMOK sp. z o. o.' -author = u'SMOK sp. z o. o.' +copyright = u'2016-2024, Piotr MaĹlanka' +author = u'Piotr MaĹlanka' # The version info for the project you're documenting, acts as replacement for # |version| and |release|, also used in various other places throughout the @@ -73,7 +73,7 @@ release = __version__ # # This is also used if you do content translation via gettext catalogs. # Usually you set "language" from the command line for these cases. -language = None +language = 'en' # List of patterns, relative to source directory, that match files and # directories to ignore when looking for source files. @@ -102,7 +102,7 @@ html_theme = 'alabaster' # Add any paths that contain custom static files (such as style sheets) here, # relative to this directory. They are copied after the builtin static files, # so a file named "default.css" will overwrite the builtin "default.css". -html_static_path = ['_static'] +#html_static_path = ['_static'] # -- Options for HTMLHelp output ------------------------------------------ @@ -134,7 +134,7 @@ latex_elements = { # author, documentclass [howto, manual, or own class]). latex_documents = [ (master_doc, 'CoolAMQP.tex', u'CoolAMQP Documentation', - u'DMS Serwis s.c.', 'manual'), + u'Piotr MaĹlanka', 'manual'), ] # -- Options for manual page output --------------------------------------- @@ -173,6 +173,8 @@ with open('frames.rst', 'w') as f_out: f_out.write('''=========================== Glossary of all AMQP frames =========================== + +Please note that this is automatically generated. ''') for class_ in BINARY_HEADER_TO_METHOD.values(): f_out.write('.. autoclass:: coolamqp.framing.definitions.%s\n :members:\n\n' % ( diff --git a/docs/frames.rst b/docs/frames.rst index ace0fdc35cc0928549f02087b0593abfe1817f4b..fac953793578fc79bbc56db5d5ba9b6c34cc9b6d 100644 --- a/docs/frames.rst +++ b/docs/frames.rst @@ -1 +1,195 @@ -To be automatically filled in during build by conf.py +=========================== +Glossary of all AMQP frames +=========================== +.. autoclass:: coolamqp.framing.definitions.ConnectionBlocked + :members: + +.. autoclass:: coolamqp.framing.definitions.ConnectionClose + :members: + +.. autoclass:: coolamqp.framing.definitions.ConnectionCloseOk + :members: + +.. autoclass:: coolamqp.framing.definitions.ConnectionOpen + :members: + +.. autoclass:: coolamqp.framing.definitions.ConnectionOpenOk + :members: + +.. autoclass:: coolamqp.framing.definitions.ConnectionStart + :members: + +.. autoclass:: coolamqp.framing.definitions.ConnectionSecure + :members: + +.. autoclass:: coolamqp.framing.definitions.ConnectionStartOk + :members: + +.. autoclass:: coolamqp.framing.definitions.ConnectionSecureOk + :members: + +.. autoclass:: coolamqp.framing.definitions.ConnectionTune + :members: + +.. autoclass:: coolamqp.framing.definitions.ConnectionTuneOk + :members: + +.. autoclass:: coolamqp.framing.definitions.ConnectionUpdateSecret + :members: + +.. autoclass:: coolamqp.framing.definitions.ConnectionUnblocked + :members: + +.. autoclass:: coolamqp.framing.definitions.ConnectionUpdateSecretOk + :members: + +.. autoclass:: coolamqp.framing.definitions.ChannelClose + :members: + +.. autoclass:: coolamqp.framing.definitions.ChannelCloseOk + :members: + +.. autoclass:: coolamqp.framing.definitions.ChannelFlow + :members: + +.. autoclass:: coolamqp.framing.definitions.ChannelFlowOk + :members: + +.. autoclass:: coolamqp.framing.definitions.ChannelOpen + :members: + +.. autoclass:: coolamqp.framing.definitions.ChannelOpenOk + :members: + +.. autoclass:: coolamqp.framing.definitions.ExchangeBind + :members: + +.. autoclass:: coolamqp.framing.definitions.ExchangeBindOk + :members: + +.. autoclass:: coolamqp.framing.definitions.ExchangeDeclare + :members: + +.. autoclass:: coolamqp.framing.definitions.ExchangeDelete + :members: + +.. autoclass:: coolamqp.framing.definitions.ExchangeDeclareOk + :members: + +.. autoclass:: coolamqp.framing.definitions.ExchangeDeleteOk + :members: + +.. autoclass:: coolamqp.framing.definitions.ExchangeUnbind + :members: + +.. autoclass:: coolamqp.framing.definitions.ExchangeUnbindOk + :members: + +.. autoclass:: coolamqp.framing.definitions.QueueBind + :members: + +.. autoclass:: coolamqp.framing.definitions.QueueBindOk + :members: + +.. autoclass:: coolamqp.framing.definitions.QueueDeclare + :members: + +.. autoclass:: coolamqp.framing.definitions.QueueDelete + :members: + +.. autoclass:: coolamqp.framing.definitions.QueueDeclareOk + :members: + +.. autoclass:: coolamqp.framing.definitions.QueueDeleteOk + :members: + +.. autoclass:: coolamqp.framing.definitions.QueuePurge + :members: + +.. autoclass:: coolamqp.framing.definitions.QueuePurgeOk + :members: + +.. autoclass:: coolamqp.framing.definitions.QueueUnbind + :members: + +.. autoclass:: coolamqp.framing.definitions.QueueUnbindOk + :members: + +.. autoclass:: coolamqp.framing.definitions.BasicAck + :members: + +.. autoclass:: coolamqp.framing.definitions.BasicConsume + :members: + +.. autoclass:: coolamqp.framing.definitions.BasicCancel + :members: + +.. autoclass:: coolamqp.framing.definitions.BasicConsumeOk + :members: + +.. autoclass:: coolamqp.framing.definitions.BasicCancelOk + :members: + +.. autoclass:: coolamqp.framing.definitions.BasicDeliver + :members: + +.. autoclass:: coolamqp.framing.definitions.BasicGet + :members: + +.. autoclass:: coolamqp.framing.definitions.BasicGetOk + :members: + +.. autoclass:: coolamqp.framing.definitions.BasicGetEmpty + :members: + +.. autoclass:: coolamqp.framing.definitions.BasicNack + :members: + +.. autoclass:: coolamqp.framing.definitions.BasicPublish + :members: + +.. autoclass:: coolamqp.framing.definitions.BasicQos + :members: + +.. autoclass:: coolamqp.framing.definitions.BasicQosOk + :members: + +.. autoclass:: coolamqp.framing.definitions.BasicReturn + :members: + +.. autoclass:: coolamqp.framing.definitions.BasicReject + :members: + +.. autoclass:: coolamqp.framing.definitions.BasicRecoverAsync + :members: + +.. autoclass:: coolamqp.framing.definitions.BasicRecover + :members: + +.. autoclass:: coolamqp.framing.definitions.BasicRecoverOk + :members: + +.. autoclass:: coolamqp.framing.definitions.TxCommit + :members: + +.. autoclass:: coolamqp.framing.definitions.TxCommitOk + :members: + +.. autoclass:: coolamqp.framing.definitions.TxRollback + :members: + +.. autoclass:: coolamqp.framing.definitions.TxRollbackOk + :members: + +.. autoclass:: coolamqp.framing.definitions.TxSelect + :members: + +.. autoclass:: coolamqp.framing.definitions.TxSelectOk + :members: + +.. autoclass:: coolamqp.framing.definitions.ConfirmSelect + :members: + +.. autoclass:: coolamqp.framing.definitions.ConfirmSelectOk + :members: + diff --git a/docs/index.rst b/docs/index.rst index 5c924b47274a1bc294c9284ceeddaa3c16479247..80c661e0652bc413abd0ba6c0e49cc17a7dd69fb 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -6,10 +6,10 @@ Welcome to CoolAMQP's documentation! :caption: Contents whatsnew - cluster tutorials how-to-guide caveats + streams tracing reference frames diff --git a/docs/streams.rst b/docs/streams.rst new file mode 100644 index 0000000000000000000000000000000000000000..b14844bf72bf70f08c6396cfd4e2d949b6cebe0c --- /dev/null +++ b/docs/streams.rst @@ -0,0 +1,11 @@ +Streams +======= + +RabbitMQ 4 added a new feature called `streams <https://www.rabbitmq.com/docs/streams>`_ , perhaps to look more like Kafka. +Anyway, we fully support this feature, but there are a few caveats you must watch out for: + +If you specify argument :code:`x-stream-offset` and aim to provide a number, please don't provide it as a string. +This will cause your connection to RabbitMQ to crash. +You can naturally provide "first" or "next" or "last". + +Basically everything works `as documented <https://www.rabbitmq.com/docs/streams#consuming>`_. Happy usage! diff --git a/tests/test_clustering/test_streams.py b/tests/test_clustering/test_streams.py new file mode 100644 index 0000000000000000000000000000000000000000..871dc22bb1a368dbcf310fb6cc07bee60f9a27eb --- /dev/null +++ b/tests/test_clustering/test_streams.py @@ -0,0 +1,51 @@ +# coding=UTF-8 + +from __future__ import print_function, absolute_import, division + +import logging +import os +import time +import unittest + +from coolamqp.clustering import Cluster +from coolamqp.exceptions import ConnectionDead +from coolamqp.objects import NodeDefinition, Queue, Exchange, Message + +NODE = NodeDefinition(os.environ.get('AMQP_HOST', '127.0.0.1'), 'guest', 'guest', heartbeat=20) +logger = logging.getLogger(__name__) +logging.basicConfig(level=logging.DEBUG) +logging.getLogger('coolamqp').setLevel(logging.DEBUG) + + +class TestStreams(unittest.TestCase): + + def test_streams(self): + c = Cluster(NODE) + c.start(wait=True, timeout=None) + if not c.properties.properties['version'].startswith('4'): + c.shutdown(wait=True) + return + + stream = Queue('my-stream', durable=True, auto_delete=False, exclusive=False, + arguments={'x-queue-type': 'stream'}) + c.declare(stream).result() + + for i in range(10): + c.publish(Message(('dupa%s' % (i, )).encode('utf-8')), routing_key='my-stream', confirm=True).result() + + test = {'a': 0} + + def handle_msg(msg): + test['a'] += 1 + msg.ack() + + cons, fut = c.consume(stream, qos=10, on_message=handle_msg, arguments={'x-stream-offset': 4}, no_ack=False) + fut.result() + time.sleep(3) + cons.cancel().result() + self.assertEqual(test['a'], 6) + cons, fut = c.consume(stream, qos=10, on_message=handle_msg, arguments={'x-stream-offset': 'first'}, no_ack=False) + fut.result() + time.sleep(3) + cons.cancel() + self.assertEqual(test['a'], 16)