diff --git a/CHANGELOG.md b/CHANGELOG.md index f7b61ea3417782ca57b4f8feedb40099f605fe89..2c5d1d51872a8e3600f9fdb18b04c16765a4ef80 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,12 +1,13 @@ Previous release notes are hosted on [GitHub](https://github.com/smok-serwis/coolamqp/releases). Since v1.3.2 they'll be put here and in release description. -v2.0.1 +v2.1.0 ====== * removed extra logging from argumentify * user will be notified upon declaring an auto-delete durable exchange * deprecated Consumer(fail_on_first_time_resource_locked) +* we now support [streams](https://www.rabbitmq.com/docs/streams) v2.0.0 ====== diff --git a/README.md b/README.md index 61ff669294a39e4dbba00e8443cd90022e89e9a4..6351918b6f333e0ca5deac6a588e186a5499f1c3 100644 --- a/README.md +++ b/README.md @@ -17,6 +17,7 @@ Why CoolAMQP? * [Negative Acknowledgements](https://www.rabbitmq.com/docs/nack) * traceable using [opentracing](https://opentracing.io/) * code coverage is 81% at the moment +* full support for [streams](https://www.rabbitmq.com/docs/streams) * 120 second stress tests are part of each release diff --git a/coolamqp/__init__.py b/coolamqp/__init__.py index 6205ab18e790c6c301e521564ebccf7d3b6c0814..3fbef98d59b8db897c47c77c769931ba1b717c86 100644 --- a/coolamqp/__init__.py +++ b/coolamqp/__init__.py @@ -1 +1 @@ -__version__ = '2.0.1a2' +__version__ = '2.1.0a1' diff --git a/docker-compose.yml b/docker-compose.yml index 741ac5b84464571fcd4244ad7aa3dddc0cb09944..c93f04898bd736f4a89f673f2c49502fb7397380 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,8 +1,9 @@ version: '3.2' services: amqp: - image: rabbitmq:3-management + image: rabbitmq:4.0-management unittest: + command: nose2 -vv tests.test_clustering.test_streams build: context: . dockerfile: tests/Dockerfile diff --git a/docs/index.rst b/docs/index.rst index c13d77f1f1fe16fb3476c5e19af825cf89c27e53..80c661e0652bc413abd0ba6c0e49cc17a7dd69fb 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -9,6 +9,7 @@ Welcome to CoolAMQP's documentation! tutorials how-to-guide caveats + streams tracing reference frames diff --git a/docs/streams.rst b/docs/streams.rst new file mode 100644 index 0000000000000000000000000000000000000000..b14844bf72bf70f08c6396cfd4e2d949b6cebe0c --- /dev/null +++ b/docs/streams.rst @@ -0,0 +1,11 @@ +Streams +======= + +RabbitMQ 4 added a new feature called `streams <https://www.rabbitmq.com/docs/streams>`_ , perhaps to look more like Kafka. +Anyway, we fully support this feature, but there are a few caveats you must watch out for: + +If you specify argument :code:`x-stream-offset` and aim to provide a number, please don't provide it as a string. +This will cause your connection to RabbitMQ to crash. +You can naturally provide "first" or "next" or "last". + +Basically everything works `as documented <https://www.rabbitmq.com/docs/streams#consuming>`_. Happy usage! diff --git a/tests/test_clustering/test_streams.py b/tests/test_clustering/test_streams.py index 65a08af037eaf0f9b5028ed47794703165c54903..871dc22bb1a368dbcf310fb6cc07bee60f9a27eb 100644 --- a/tests/test_clustering/test_streams.py +++ b/tests/test_clustering/test_streams.py @@ -12,23 +12,24 @@ from coolamqp.exceptions import ConnectionDead from coolamqp.objects import NodeDefinition, Queue, Exchange, Message NODE = NodeDefinition(os.environ.get('AMQP_HOST', '127.0.0.1'), 'guest', 'guest', heartbeat=20) +logger = logging.getLogger(__name__) logging.basicConfig(level=logging.DEBUG) logging.getLogger('coolamqp').setLevel(logging.DEBUG) -logger = logging.getLogger(__name__) -class TestConnecting(unittest.TestCase): +class TestStreams(unittest.TestCase): - def test_argumented_exchange(self): + def test_streams(self): c = Cluster(NODE) c.start(wait=True, timeout=None) - if c.properties.version[0] < 4: + if not c.properties.properties['version'].startswith('4'): c.shutdown(wait=True) return stream = Queue('my-stream', durable=True, auto_delete=False, exclusive=False, arguments={'x-queue-type': 'stream'}) - c.declare(stream) + c.declare(stream).result() + for i in range(10): c.publish(Message(('dupa%s' % (i, )).encode('utf-8')), routing_key='my-stream', confirm=True).result() @@ -36,15 +37,15 @@ class TestConnecting(unittest.TestCase): def handle_msg(msg): test['a'] += 1 + msg.ack() - cons, fut = c.consume(stream, qos=10, on_message=handle_msg, arguments={'x-stream-offset': '3'}, no_ack=True) + cons, fut = c.consume(stream, qos=10, on_message=handle_msg, arguments={'x-stream-offset': 4}, no_ack=False) fut.result() - logger.warning(repr(test)) time.sleep(3) - cons.cancel() - - cons, fut = c.consume(stream, qos=10, on_message=handle_msg, arguments={'x-stream-offset': 'first'}, no_ack=True) + cons.cancel().result() + self.assertEqual(test['a'], 6) + cons, fut = c.consume(stream, qos=10, on_message=handle_msg, arguments={'x-stream-offset': 'first'}, no_ack=False) fut.result() time.sleep(3) cons.cancel() - logger.warning(repr(test)) + self.assertEqual(test['a'], 16)