diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 405017785d0ee2a2df5aa6f0d39f82dcf2586c17..c8dfdf52c757da38dfa880fe73374f7482132b8a 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -6,7 +6,7 @@ stages: .before_test: stage: unittest services: - - name: rabbitmq:3-management + - name: rabbitmq:3.10-management alias: rabbitmq before_script: - python setup.py install diff --git a/CHANGELOG.md b/CHANGELOG.md index a04ef92bb56b69caf23cf8ae65b24957de4d4035..892516763f769f1515d1121fdf03372dfcaf6769 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,9 @@ Since v1.3.2 they'll be put here and in release description. * fixed declare documentation * added docs regarding consume method. * added testing topic exchanges +* bugfix regarding deadlettering queues +* prefetch_size will be forced to 0 to better comply with [RabbitMQ](https://www.rabbitmq.com/docs/specification#method-status-basic.qos) + * and a DeprecationWarning will be shown to people who try to set something else. 0 will be forced upon them anyway. # v1.4.1 ======= diff --git a/README.md b/README.md index 12461e7b89219befad98bcdb58464ac02b2268df..47007d91878d5c81bcd4ea49455e7faaec5675aa 100644 --- a/README.md +++ b/README.md @@ -21,6 +21,7 @@ coolamqp Why CoolAMQP? ------------- +* tested against all versions of RabbitMQ 3.x (with testing with RabbitMQ 4.x pending) * AMQP 0.9.1 client that's native Python * heavily optimized for speed * geared towards interfacing with [RabbitMQ](https://www.rabbitmq.com/) diff --git a/coolamqp/attaches/consumer.py b/coolamqp/attaches/consumer.py index 00fbeb329c6fdc09fe5e5c66b86915d2c9f94d9e..bd43bf3356005f1f9460cb3042cfbe93b69c099e 100644 --- a/coolamqp/attaches/consumer.py +++ b/coolamqp/attaches/consumer.py @@ -5,6 +5,7 @@ import io import logging import typing as tp import uuid +import warnings from concurrent.futures import Future from coolamqp.attaches.channeler import Channeler, ST_ONLINE, ST_OFFLINE @@ -168,9 +169,11 @@ class Consumer(Channeler): :param prefetch_size: prefetch in octets :param prefetch_count: prefetch in whole messages """ + if prefetch_size: + warnings.warn('RabbitMQ stopped supporting prefetch_sizes, will use 0 anyway', DeprecationWarning) if self.state == ST_ONLINE: - self.method(BasicQos(prefetch_size or 0, prefetch_count, False)) - self.qos = prefetch_size or 0, prefetch_count + self.method(BasicQos(0, prefetch_count, False)) + self.qos = 0, prefetch_count def cancel(self): # type: () -> Future """ @@ -426,7 +429,7 @@ class Consumer(Channeler): elif isinstance(payload, QueueBindOk): if self.qos is not None: self.method_and_watch( - BasicQos(self.qos[0], self.qos[1], False), + BasicQos(0, self.qos[1], False), BasicQosOk, self.on_setup ) @@ -472,7 +475,7 @@ class Consumer(Channeler): # resend QoS, in case of sth if self.qos is not None: - self.set_qos(self.qos[0], self.qos[1]) + self.set_qos(0, self.qos[1]) def _qosify(qos): diff --git a/coolamqp/framing/field_table.py b/coolamqp/framing/field_table.py index 282a7012ad26a9dd631a40a89d0de5664c98491e..5d7b1de9923a0d8f0c0a45acc1ddb031ef35bcb4 100644 --- a/coolamqp/framing/field_table.py +++ b/coolamqp/framing/field_table.py @@ -24,6 +24,8 @@ def _tobuf(buf, pattern, *vals): # type: (io.BytesIO, str, *tp.Any) -> int def _tobufv(buf, value, pattern, *vals): # type: (io.BytesIO, bytes, str, *tp.Any) -> None + if not isinstance(value, six.binary_type): + value = value.encode('utf-8') _tobuf(buf, pattern, *vals) buf.write(value) @@ -175,7 +177,8 @@ def deframe_array(buf, offset): values = [] while offset < (start_offset + 1 + ln): - v, t, delta = deframe_field_value(buf, offset) + vt, delta = deframe_field_value(buf, offset) + v, t = vt offset += delta values.append((v, t)) @@ -221,7 +224,11 @@ def deframe_table(buf, start_offset): # -> (table, bytes_consumed) offset += ln fv, delta = deframe_field_value(buf, offset) offset += delta - fields.append((field_name.tobytes(), fv)) + if isinstance(field_name, memoryview): + field_name = field_name.tobytes() + elif not isinstance(field_name, six.binary_type): + field_name = field_name.encode('utf-8') + fields.append((field_name, fv)) if offset > (start_offset + table_length + 4): raise ValueError( diff --git a/tests/test_clustering/test_a.py b/tests/test_clustering/test_a.py index ea960d294689b10cd16d6b53d91923c1605e045e..3c9884591b7ca23aba30d2160f718072e8a119a1 100644 --- a/tests/test_clustering/test_a.py +++ b/tests/test_clustering/test_a.py @@ -55,7 +55,7 @@ class TestA(unittest.TestCase): con, fut = self.c.consume(Queue(u'hello', exclusive=True)) fut.result() - data = six.binary_type(os.urandom(20 * 1024 * 1024 + 1423)) + data = six.binary_type(os.urandom(16777216)) self.c.publish(Message(data), routing_key=b'hello', confirm=True).result() @@ -72,9 +72,9 @@ class TestA(unittest.TestCase): fut.result() - con.set_qos(100, 100) + con.set_qos(0, 100) time.sleep(1) - self.assertEqual(con.qos, (100, 100)) + self.assertEqual(con.qos, (0, 100)) con.set_qos(None, 110) time.sleep(1) diff --git a/tests/test_clustering/test_exchanges.py b/tests/test_clustering/test_exchanges.py index 98dd406ec7151df1bad910a07db13086e47af7e8..206ec06a66ef1922e6574f940ed270ea95a3b044 100644 --- a/tests/test_clustering/test_exchanges.py +++ b/tests/test_clustering/test_exchanges.py @@ -4,14 +4,15 @@ from __future__ import print_function, absolute_import, division import logging import os import unittest +import uuid from coolamqp.clustering import Cluster, MessageReceived, NothingMuch -from coolamqp.objects import Message, NodeDefinition, Queue, \ - Exchange +from coolamqp.objects import Message, NodeDefinition, Queue, MessageProperties, Exchange # todo handle bad auth NODE = NodeDefinition(os.environ.get('AMQP_HOST', '127.0.0.1'), 'guest', 'guest', heartbeat=20) logging.basicConfig(level=logging.DEBUG) +logging.getLogger('coolamqp').setLevel(logging.DEBUG) class TestExchanges(unittest.TestCase): @@ -22,6 +23,27 @@ class TestExchanges(unittest.TestCase): def tearDown(self): self.c.shutdown() + def test_deadlettering(self): + xchg_name = uuid.uuid4().hex + dead_queue_name = uuid.uuid4().hex + grave_queue_name = uuid.uuid4().hex + DEADLETTER = Exchange(xchg_name, type=b'direct') + QUEUE = Queue(dead_queue_name, durable=True, auto_delete=False, exclusive=False) + GRAVEYARD_QUEUE = Queue(grave_queue_name, durable=True, arguments={'x-dead-letter-exchange': xchg_name, + 'x-message-ttl': 1000}) + self.c.declare(DEADLETTER).result() + self.c.declare(QUEUE).result() + self.c.declare(GRAVEYARD_QUEUE).result() + self.c.bind(QUEUE, DEADLETTER, grave_queue_name).result() + cons, fut = self.c.consume(QUEUE, no_ack=True) + fut.result() + self.c.publish(Message(b'test', MessageProperties(content_type=b'application/json', + content_encoding=b'utf8', + delivery_mode=2)), routing_key=grave_queue_name, confirm=True).result() + + self.assertIsInstance(self.c.drain(10), MessageReceived) + cons.cancel() + def test_declare_exchange(self): a = Exchange(u'jolax', type=b'fanout', auto_delete=True) self.c.declare(a).result() diff --git a/tests/test_clustering/test_things.py b/tests/test_clustering/test_things.py index e14b30d982c71d27236d0050a61828802872a689..98ee35938b1ea30c4a8c490586c97fbe94e362b4 100644 --- a/tests/test_clustering/test_things.py +++ b/tests/test_clustering/test_things.py @@ -13,11 +13,11 @@ from coolamqp.objects import NodeDefinition, Queue, Exchange NODE = NodeDefinition(os.environ.get('AMQP_HOST', '127.0.0.1'), 'guest', 'guest', heartbeat=20) logging.basicConfig(level=logging.DEBUG) +logging.getLogger('coolamqp').setLevel(logging.DEBUG) class TestConnecting(unittest.TestCase): - def test_argumented_exchange(self): xchg = Exchange('test-wer', durable=True) c = Cluster([NODE]) diff --git a/tests/test_clustering/test_topic_exchanges.py b/tests/test_clustering/test_topic_exchanges.py index 067db55ca1c46a7d105d613b182ad95df5500e6c..45ef2f361d312bf1fc03307c97418f7a15d5e45a 100644 --- a/tests/test_clustering/test_topic_exchanges.py +++ b/tests/test_clustering/test_topic_exchanges.py @@ -1,3 +1,4 @@ +import logging import time import os import unittest @@ -11,11 +12,13 @@ from coolamqp.objects import Exchange, Queue, NodeDefinition, Message XCHG = Exchange('smok5.results', type='topic', durable=True) QUEUE = Queue(exchange=XCHG, exclusive=True, auto_delete=True) - +logger = logging.getLogger(__name__) NODE = NodeDefinition(os.environ.get('AMQP_HOST', '127.0.0.1'), 'guest', 'guest', heartbeat=20) logging.basicConfig(level=logging.DEBUG) +logging.getLogger('coolamqp').setLevel(logging.DEBUG) +did_receive = False class TestTopic(unittest.TestCase): def setUp(self): @@ -33,12 +36,12 @@ class TestTopic(unittest.TestCase): self.c.declare(XCHG).result() self.c.declare(QUEUE).result() self.c.bind(QUEUE, XCHG, routing_key='hello-world').result() - - did_receive = False + global did_receive def do(msg): - nonlocal did_receive - did_receive = True + global did_receive + if msg.body == b'good boy': + did_receive = True msg.ack() self.cons, fut = self.c.consume(QUEUE, on_message=do, no_ack=False) @@ -53,6 +56,6 @@ class TestTopic(unittest.TestCase): self.fail("Message not received within 10 seconds") did_receive = False - self.c.publish(Message(b'good boy', exchange=XCHG, routing_key='helloworld'), confirm=True).result() - time.sleep(5) + self.c.publish(Message(b'good boy2'), exchange=XCHG, routing_key='yolooldies', confirm=True).result() + time.sleep(10) self.assertFalse(did_receive) diff --git a/tests/test_objects.py b/tests/test_objects.py index 37f335d760052f810afdaf1563b9de3100e14174..9fbf5b156a777f194f03cc68c93dd25eb1681d4d 100644 --- a/tests/test_objects.py +++ b/tests/test_objects.py @@ -1,13 +1,32 @@ # coding=UTF-8 from __future__ import print_function, absolute_import, division - +import logging import unittest +import io import warnings -from coolamqp.objects import NodeDefinition, MessageProperties, Queue +from coolamqp.framing.definitions import QueueDeclare + +from coolamqp.objects import NodeDefinition, MessageProperties, Queue, argumentify + + +logger = logging.getLogger(__name__) +logging.getLogger('coolamqp').setLevel(logging.DEBUG) class TestObjects(unittest.TestCase): + + def test_queue_declare(self): + args = argumentify({'x-dead-letter-exchange': 'deadletter', + 'x-message-ttl': 1000}) + qd = QueueDeclare(b'test', False, False, False, False, False, args) + buf = io.BytesIO() + qd.write_arguments(buf) + buf = buf.getvalue() + logger.warning(repr(buf)) + qd = QueueDeclare.from_buffer(buf, 0) + self.assertEqual(qd.queue, b'test') + def test_message_properties(self): empty_p_msg = MessageProperties() ce_p_msg = MessageProperties(content_encoding=b'wtf')