diff --git a/CHANGELOG.md b/CHANGELOG.md index 85fc4b57e000c01b534a5b81ddd9c524a90e51f5..161f339d4df0fdfdab121b335deb747e9aa319e0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,3 +9,4 @@ Since v1.3.2 they'll be put here and in release description. * stress tests will run for 120 seconds now * stress tests will be harder, and use more queues * added arguments to queues, queue binds and exchanges +* creating auto_delete non-exclusive queues will be met with a [PendingDeprecationWarning](https://www.rabbitmq.com/blog/2021/08/21/4.0-deprecation-announcements) diff --git a/coolamqp/framing/field_table.py b/coolamqp/framing/field_table.py index a728197bdd7a61ca5472bb968ce06e600f7827e4..282a7012ad26a9dd631a40a89d0de5664c98491e 100644 --- a/coolamqp/framing/field_table.py +++ b/coolamqp/framing/field_table.py @@ -53,6 +53,8 @@ def deframe_shortstr(buf, offset): # -> value, bytes_eaten def enframe_shortstr(buf, value): + if not isinstance(value, six.binary_type): + value = value.encode('utf-8') _tobufv(buf, value, '!B', len(value)) @@ -62,6 +64,8 @@ def deframe_longstr(buf, offset): # -> value, bytes_eaten def enframe_longstr(buf, value): + if not isinstance(value, six.binary_type): + value = value.encode('utf-8') _tobufv(buf, value, '!I', len(value)) @@ -105,6 +109,32 @@ else: chrpy3 = lambda x: x +def get_type_for(val): + if isinstance(val, six.string_types): + return 'S' + elif isinstance(val, bool): + return 't' + elif isinstance(val, int): + if -128 <= val <= 127: + return 'b' + elif 0 <= val <= 255: + return 'B' + elif -32768 <= val <= 32767: + return 'u' + elif 0 <= val <= 65535: + return 'U' + elif -0x80000000 <= val <= 0x7FFFFFFF: + return 'i' + elif 0 <= val <= 0xFFFFFFFF: + return 'I' + else: + return 'l' + elif isinstance(val, float): + return 'd' + else: + raise ValueError('Undeterminable type') + + def enframe_field_value(buf, fv): value, type = fv buf.write(type.encode('utf8')) diff --git a/coolamqp/objects.py b/coolamqp/objects.py index d8cb7f0a8e8cace0bd24bdcaa7add7d1d5dd08ff..597866fe763b52e4fa1e1b864ff3c566a0a3a87b 100644 --- a/coolamqp/objects.py +++ b/coolamqp/objects.py @@ -6,12 +6,14 @@ import logging import threading import typing as tp import uuid +import warnings import six from coolamqp.framing.base import AMQPFrame from coolamqp.framing.definitions import \ BasicContentPropertyList as MessageProperties +from coolamqp.framing.field_table import get_type_for logger = logging.getLogger(__name__) @@ -24,18 +26,14 @@ def argumentify(arguments): args = [] if isinstance(arguments, dict): for key, value in arguments.items(): - if not isinstance(value, six.string_types): - value = str(value) - if not isinstance(value, six.binary_type): - value = value.encode('utf8') - args.append((key, value)) + if not isinstance(key, six.binary_type): + key = key.encode('utf-8') + args.append((key, (value, get_type_for(value)))) else: for key, value in arguments: - if not isinstance(value, six.string_types): - value = str(value) - if not isinstance(value, six.binary_type): - value = value.encode('utf8') - args.append((key, value)) + if not isinstance(key, six.binary_type): + key = key.encode('utf-8') + args.append((key, (value, get_type_for(value)))) return args @@ -196,6 +194,9 @@ class Exchange(object): """ This represents an Exchange used in AMQP. This is hashable. + + :param name: exchange name + :param arguments: either a list of (bytes, values) or a dict of (str, value) to pass as an extra argument """ __slots__ = ('name', 'type', 'durable', 'auto_delete', 'arguments') @@ -207,10 +208,6 @@ class Exchange(object): auto_delete=False, # type: bool arguments=None ): - """ - :type name: unicode is preferred, binary type will get decoded to - unicode with utf8 - """ self.name = toutf8(name) # must be unicode self.type = tobytes(type) # must be bytes self.durable = durable @@ -246,6 +243,8 @@ class Queue(object): :param exchange: Exchange for this queue to bind to. None for no binding. :param exclusive: Is this queue exclusive? :param auto_delete: Is this queue auto_delete ? + :param arguments: either a list of (bytes, values) or a dict of (str, value) to pass as an extra argument + :warn DeprecationWarning: if a non-exclusive auto_delete queue is created """ __slots__ = ('name', 'durable', 'exchange', 'auto_delete', 'exclusive', 'anonymous', 'consumer_tag', 'arguments') @@ -266,6 +265,8 @@ class Queue(object): self.auto_delete = auto_delete self.exclusive = exclusive self.arguments = argumentify(arguments) + if self.auto_delete and not self.exclusive: + warnings.warn('This will be removed in RabbitMQ 4.0', DeprecationWarning) self.anonymous = not len( self.name) # if this queue is anonymous, it must be regenerated upon reconnect diff --git a/coolamqp/uplink/handshake.py b/coolamqp/uplink/handshake.py index 163c099b4eba896257ad739cb72b805b7e4d373f..acfb3b7c230f690c43b6481c73ee9712dbdf9acb 100644 --- a/coolamqp/uplink/handshake.py +++ b/coolamqp/uplink/handshake.py @@ -33,7 +33,7 @@ CLIENT_DATA = [ (b'copyright', (b'Copyright (C) 2016-2024 SMOK sp. z o.o.', 'S')), ( b'information', ( - b'Licensed under the MIT License.\nSee https://github.com/smok-serwis/coolamqp for details', + b'Licensed under the MIT License.\nSee https://git.dms-serwis.com.pl/smokserwis/coolamqp for details', 'S')), (b'capabilities', ([(capa, (True, 't')) for capa in SUPPORTED_EXTENSIONS], 'F')), diff --git a/tests/Dockerfile b/tests/Dockerfile index a9abad9004ac19772cbec1843a337121676b87b2..856448281d494bc4a217fa348352f1bb1ed14ef5 100644 --- a/tests/Dockerfile +++ b/tests/Dockerfile @@ -16,5 +16,5 @@ ENV AMQP_HOST=amqp # for those pesky builds on Windows RUN chmod -R ugo-x /coolamqp -CMD ["python", "setup.py", "test"] +CMD ["coverage", "run", "-m", "nose2", "-F", "-vv"] diff --git a/tests/test_attaches/test_consumer.py b/tests/test_attaches/test_consumer.py index 2bf4fc04e99794a80290652a149b7f6e72f6efdb..8173a1cec6edf2ab2bb5d07d97770f37cde45a0d 100644 --- a/tests/test_attaches/test_consumer.py +++ b/tests/test_attaches/test_consumer.py @@ -11,4 +11,4 @@ class TestConsumer(unittest.TestCase): def test_issue_26(self): """Support for passing qos as int""" cons = Consumer(Queue('wtf'), lambda msg: None, qos=25) - self.assertEquals(cons.qos, (0, 25)) + self.assertEqual(cons.qos, (0, 25)) diff --git a/tests/test_clustering/test_a.py b/tests/test_clustering/test_a.py index 7761d359f6c37bffafa55bb1cf9a09f2b353c774..ea960d294689b10cd16d6b53d91923c1605e045e 100644 --- a/tests/test_clustering/test_a.py +++ b/tests/test_clustering/test_a.py @@ -74,11 +74,11 @@ class TestA(unittest.TestCase): con.set_qos(100, 100) time.sleep(1) - self.assertEquals(con.qos, (100, 100)) + self.assertEqual(con.qos, (100, 100)) con.set_qos(None, 110) time.sleep(1) - self.assertEquals(con.qos, (0, 110)) + self.assertEqual(con.qos, (0, 110)) def test_declare_anonymous(self): xchg = Exchange('wtfzomg', type='fanout') @@ -135,17 +135,17 @@ class TestA(unittest.TestCase): time.sleep(1) self.assertTrue(p['q']) - self.assertEquals(p['count'], 2) + self.assertEqual(p['count'], 2) def test_message_with_propos_confirm(self): p = {'q': False} def ok(e): self.assertIsInstance(e, ReceivedMessage) - self.assertEquals(e.body, b'hello4') + self.assertEqual(e.body, b'hello4') # bcoz u can compare memoryviews to their providers :D - self.assertEquals(e.properties.content_type, b'text/plain') - self.assertEquals(e.properties.content_encoding, b'utf8') + self.assertEqual(e.properties.content_type, b'text/plain') + self.assertEqual(e.properties.content_encoding, b'utf8') p['q'] = True con, fut = self.c.consume(Queue(u'hello4', exclusive=True), @@ -172,10 +172,10 @@ class TestA(unittest.TestCase): def ok(e): self.assertIsInstance(e, ReceivedMessage) - self.assertEquals(e.body, b'hello5') + self.assertEqual(e.body, b'hello5') # bcoz u can compare memoryviews to their providers :D - self.assertEquals(e.properties.content_type, b'text/plain') - self.assertEquals(e.properties.content_encoding, b'utf8') + self.assertEqual(e.properties.content_type, b'text/plain') + self.assertEqual(e.properties.content_encoding, b'utf8') p['q'] = True con, fut = self.c.consume(Queue(u'hello5', exclusive=True), @@ -197,7 +197,7 @@ class TestA(unittest.TestCase): def ok(e): self.assertIsInstance(e, ReceivedMessage) - self.assertEquals(e.body, b'hello6') + self.assertEqual(e.body, b'hello6') p['q'] = True con, fut = self.c.consume(Queue(u'hello6', exclusive=True), @@ -223,14 +223,14 @@ class TestA(unittest.TestCase): m = self.c.drain(2) self.assertIsInstance(m, MessageReceived) self.assertIsInstance(m.body, memoryview) - self.assertEquals(m.body, data) + self.assertEqual(m.body, data) data = six.binary_type(os.urandom(512 * 1024)) self.c.publish(Message(data), routing_key=u'hello7', confirm=True) m = self.c.drain(9) self.assertIsInstance(m, MessageReceived) self.assertIsInstance(m.body, memoryview) - self.assertEquals(m.body.tobytes(), data) + self.assertEqual(m.body.tobytes(), data) def test_send_recv_nonzerolen_listofmemoryview(self): """single and multi frame in LIST_OF_MEMORYVIEW mode""" @@ -245,14 +245,14 @@ class TestA(unittest.TestCase): m = self.c.drain(1) self.assertIsInstance(m, MessageReceived) self.assertIsInstance(m.body[0], memoryview) - self.assertEquals(m.body[0], data) + self.assertEqual(m.body[0], data) data = six.binary_type(os.urandom(512 * 1024)) self.c.publish(Message(data), routing_key=u'hello8', confirm=True) m = self.c.drain(5) self.assertIsInstance(m, MessageReceived) self.assertTrue(all([isinstance(x, memoryview) for x in m.body])) - self.assertEquals(b''.join(x.tobytes() for x in m.body), data) + self.assertEqual(b''.join(x.tobytes() for x in m.body), data) def test_consumer_cancel(self): con, fut = self.c.consume( diff --git a/tests/test_clustering/test_double.py b/tests/test_clustering/test_double.py index 12e8f8a2e37ce715edf78ea065e393dd7e07e951..a723c861d943047433f84c0fdd1cfedb08fa1a69 100644 --- a/tests/test_clustering/test_double.py +++ b/tests/test_clustering/test_double.py @@ -62,7 +62,7 @@ class TestDouble(unittest.TestCase): fail_on_first_time_resource_locked=True) fut2.result(timeout=20) except AMQPError as e: - self.assertEquals(e.reply_code, RESOURCE_LOCKED) + self.assertEqual(e.reply_code, RESOURCE_LOCKED) self.assertFalse(e.is_hard_error()) else: self.fail('Expected exception') diff --git a/tests/test_clustering/test_things.py b/tests/test_clustering/test_things.py index 2cd252dec46d0f6f905da7429233d20eb6cffdb9..e14b30d982c71d27236d0050a61828802872a689 100644 --- a/tests/test_clustering/test_things.py +++ b/tests/test_clustering/test_things.py @@ -19,23 +19,33 @@ class TestConnecting(unittest.TestCase): def test_argumented_exchange(self): - xchg = Exchange('test', auto_delete=True) + xchg = Exchange('test-wer', durable=True) c = Cluster([NODE]) c.start(wait=True, timeout=None) c.declare(xchg).result() - xchg2 = Exchange('test2', auto_delete=True, arguments={'alternate-exchange': 'test'}) - c.declare(xchg).result() + xchg2 = Exchange('test2-werwer', durable=True, arguments={'alternate-exchange': 'test-wer'}) + c.declare(xchg2).result() c.shutdown(True) def test_argumented_queue(self): - que = Queue(auto_delete=True, arguments=[(b'x-max-priority', 10)]) - que2 = Queue(auto_delete=True, arguments={'x-max-priority': 10}) + que = Queue(auto_delete=True, exclusive=True, arguments=[(b'x-max-priority', 10)]) + que2 = Queue(auto_delete=True, exclusive=True, arguments={'x-max-priority': 10}) c = Cluster([NODE]) c.start(wait=True, timeout=None) c.declare(que).result() c.declare(que2).result() c.shutdown(True) + def test_argumented_bind(self): + c = Cluster([NODE]) + c.start(wait=True, timeout=None) + que = Queue(auto_delete=True, exclusive=True, arguments=[(b'x-max-priority', 10)]) + xchg = Exchange('test3-wertest', type='headers', durable=True) + c.declare(que).result() + c.declare(xchg).result() + c.bind(que, xchg, routing_key=b'', arguments={'x-match': 'all', 'format': 'pdf'}).result() + c.shutdown(True) + def test_connection_blocked(self): try: from coolamqp.framing.definitions import ConnectionBlocked @@ -80,8 +90,8 @@ class TestConnecting(unittest.TestCase): q2 = Queue(b'lolwut') q3 = Queue(u'not') - self.assertEquals(q1, q2) - self.assertEquals(hash(q1), hash(q2)) + self.assertEqual(q1, q2) + self.assertEqual(hash(q1), hash(q2)) self.assertNotEqual(q1, q3) def test_node_with_kwargs(self): @@ -89,7 +99,7 @@ class TestConnecting(unittest.TestCase): user='guest', password='guest') - self.assertEquals(node.virtual_host, '/') # default + self.assertEqual(node.virtual_host, '/') # default def test_amqpconnstring_port(self): node = NodeDefinition('amqp://lol:lol@lol:4123/vhost') diff --git a/tests/test_framing.py b/tests/test_framing.py new file mode 100644 index 0000000000000000000000000000000000000000..3cffff04f5e8030a31d70f6efc0e51359aa26302 --- /dev/null +++ b/tests/test_framing.py @@ -0,0 +1,14 @@ +import io +import unittest + +from coolamqp.framing.field_table import enframe_table + +from coolamqp.objects import argumentify + + +class TestFraming(unittest.TestCase): + def test_something(self): + buf = io.BytesIO() + args = argumentify({'x-match': 'all', 'format': 'pdf'}) + res = enframe_table(buf, args) + print(repr(res)) diff --git a/tests/test_objects.py b/tests/test_objects.py index b3335475bc52520c7e4d1eab503be159384f1271..e4f3a4e3adcb71b76ccb9bd4613a86d1d9dd129e 100644 --- a/tests/test_objects.py +++ b/tests/test_objects.py @@ -3,7 +3,7 @@ from __future__ import print_function, absolute_import, division import unittest -from coolamqp.objects import NodeDefinition, MessageProperties +from coolamqp.objects import NodeDefinition, MessageProperties, Queue class TestObjects(unittest.TestCase): @@ -12,21 +12,25 @@ class TestObjects(unittest.TestCase): ce_p_msg = MessageProperties(content_encoding=b'wtf') self.assertIn('wtf', str(ce_p_msg)) + def test_warning(self): + with self.assertWarns(DeprecationWarning): + Queue(auto_delete=True, exclusive=False) + def test_node_definition_from_amqp(self): n1 = NodeDefinition(u'amqp://ala:ma@kota/psa') - self.assertEquals(n1.user, u'ala') - self.assertEquals(n1.password, u'ma') - self.assertEquals(n1.virtual_host, u'psa') - self.assertEquals(n1.host, u'kota') + self.assertEqual(n1.user, u'ala') + self.assertEqual(n1.password, u'ma') + self.assertEqual(n1.virtual_host, u'psa') + self.assertEqual(n1.host, u'kota') n1 = NodeDefinition(u'amqp://ala:ma@kota/') - self.assertEquals(n1.virtual_host, u'/') + self.assertEqual(n1.virtual_host, u'/') def test_get_message_properties(self): empty_p_msg = MessageProperties() ce_p_msg = MessageProperties(content_encoding=b'wtf') self.assertIsNone(empty_p_msg.get('content_encoding'), None) - self.assertEquals(ce_p_msg.get('content_encoding', b'wtf'), b'wtf') + self.assertEqual(ce_p_msg.get('content_encoding', b'wtf'), b'wtf')