Skip to content
Snippets Groups Projects
Commit 0d17445d authored by Piotr Maślanka's avatar Piotr Maślanka
Browse files

fixed #15

parent 12f6c79b
No related branches found
No related tags found
1 merge request!11Issue #15
Pipeline #63922 passed with stages
in 2 minutes and 30 seconds
Previous release notes are hosted on [GitHub](https://github.com/smok-serwis/coolamqp/releases).
Since v1.3.2 they'll be put here and in release description.
v2.0.1
v2.1.0
======
* removed extra logging from argumentify
* user will be notified upon declaring an auto-delete durable exchange
* deprecated Consumer(fail_on_first_time_resource_locked)
* we now support [streams](https://www.rabbitmq.com/docs/streams)
v2.0.0
======
......
......@@ -17,6 +17,7 @@ Why CoolAMQP?
* [Negative Acknowledgements](https://www.rabbitmq.com/docs/nack)
* traceable using [opentracing](https://opentracing.io/)
* code coverage is 81% at the moment
* full support for [streams](https://www.rabbitmq.com/docs/streams)
* 120 second stress tests are part of each release
......
__version__ = '2.0.1a2'
__version__ = '2.1.0a1'
version: '3.2'
services:
amqp:
image: rabbitmq:3-management
image: rabbitmq:4.0-management
unittest:
command: nose2 -vv tests.test_clustering.test_streams
build:
context: .
dockerfile: tests/Dockerfile
......
......@@ -9,6 +9,7 @@ Welcome to CoolAMQP's documentation!
tutorials
how-to-guide
caveats
streams
tracing
reference
frames
......
Streams
=======
RabbitMQ 4 added a new feature called `streams <https://www.rabbitmq.com/docs/streams>`_ , perhaps to look more like Kafka.
Anyway, we fully support this feature, but there are a few caveats you must watch out for:
If you specify argument :code:`x-stream-offset` and aim to provide a number, please don't provide it as a string.
This will cause your connection to RabbitMQ to crash.
You can naturally provide "first" or "next" or "last".
Basically everything works `as documented <https://www.rabbitmq.com/docs/streams#consuming>`_. Happy usage!
......@@ -12,23 +12,24 @@ from coolamqp.exceptions import ConnectionDead
from coolamqp.objects import NodeDefinition, Queue, Exchange, Message
NODE = NodeDefinition(os.environ.get('AMQP_HOST', '127.0.0.1'), 'guest', 'guest', heartbeat=20)
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG)
logging.getLogger('coolamqp').setLevel(logging.DEBUG)
logger = logging.getLogger(__name__)
class TestConnecting(unittest.TestCase):
class TestStreams(unittest.TestCase):
def test_argumented_exchange(self):
def test_streams(self):
c = Cluster(NODE)
c.start(wait=True, timeout=None)
if c.properties.version[0] < 4:
if not c.properties.properties['version'].startswith('4'):
c.shutdown(wait=True)
return
stream = Queue('my-stream', durable=True, auto_delete=False, exclusive=False,
arguments={'x-queue-type': 'stream'})
c.declare(stream)
c.declare(stream).result()
for i in range(10):
c.publish(Message(('dupa%s' % (i, )).encode('utf-8')), routing_key='my-stream', confirm=True).result()
......@@ -36,15 +37,15 @@ class TestConnecting(unittest.TestCase):
def handle_msg(msg):
test['a'] += 1
msg.ack()
cons, fut = c.consume(stream, qos=10, on_message=handle_msg, arguments={'x-stream-offset': '3'}, no_ack=True)
cons, fut = c.consume(stream, qos=10, on_message=handle_msg, arguments={'x-stream-offset': 4}, no_ack=False)
fut.result()
logger.warning(repr(test))
time.sleep(3)
cons.cancel()
cons, fut = c.consume(stream, qos=10, on_message=handle_msg, arguments={'x-stream-offset': 'first'}, no_ack=True)
cons.cancel().result()
self.assertEqual(test['a'], 6)
cons, fut = c.consume(stream, qos=10, on_message=handle_msg, arguments={'x-stream-offset': 'first'}, no_ack=False)
fut.result()
time.sleep(3)
cons.cancel()
logger.warning(repr(test))
self.assertEqual(test['a'], 16)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment