diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index a6b992102320531951b13dee6cde2ca2d13fde18..9f427e0e805b588b2fceeeae35b4b2248b0767ef 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -60,18 +60,18 @@ unittest_select: unittest_epoll_python27: - extends: .before_test + stage: unittest image: python:2.7 + variables: + AMQP_HOST: "rabbitmq" before_script: - - pip install nose2 nose2[coverage_plugin] + - pip install nose2 coverage requests yapf nose2[coverage_plugin] - python setup.py install script: - nose2 -F -vv - variables: - AMQP_HOST: "rabbitmq" - after_script: - - mv .coverage .coverage.python27epoll - + services: + - name: rabbitmq:3.10-management + alias: rabbitmq unittest_epoll: extends: .before_test diff --git a/CHANGELOG.md b/CHANGELOG.md index 966126608bf9dbcdc79f98277cba4d1127722a3f..599e743bb111e7d1034c151525492c6df65abce7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,18 @@ Previous release notes are hosted on [GitHub](https://github.com/smok-serwis/coolamqp/releases). Since v1.3.2 they'll be put here and in release description. +# v2.0.0 +======== + +* changes to Queues: + * anonymous queues are back, for usage refer [here](https://smokserwis.docs.smok.co/coolamqp/advanced.html) + * changed some default arguments for Queues for them to better make sense + * some argument combinations just raise ValueError + * PendingDeprecationWarning changed into a DeprecationWarning +* changes to Cluster: + * declare will refuse to declare an anonymous queue + * renamed publish(tx) to publish(confirm) + # v1.5.0 ======== diff --git a/coolamqp/clustering/cluster.py b/coolamqp/clustering/cluster.py index 82c729d490e9864996ffc277044833ee7187132d..3be5d873691ac33ca01de620f8564e2b51c87f11 100644 --- a/coolamqp/clustering/cluster.py +++ b/coolamqp/clustering/cluster.py @@ -103,7 +103,12 @@ class Cluster(object): dont_trace=False, arguments=None): """ Bind a queue to an exchange + + :raise ValueError: cannot bind to anonymous queues """ + if queue.anonymous: + raise ValueError('Canoot bind to anonymous queue') + if span is not None and not dont_trace: child_span = self._make_span('bind', span) else: @@ -121,6 +126,8 @@ class Cluster(object): """ Declare a Queue/Exchange. + Non-anonymous queues have to be declared. Anonymous can't. + .. note:: Note that if your queue relates to an exchange that has not yet been declared you'll be faced with AMQP error 404: NOT_FOUND, so try to declare your exchanges before your queues. @@ -129,7 +136,10 @@ class Cluster(object): :param span: optional parent span, if opentracing is installed :param dont_trace: if True, a span won't be output :return: Future + :raises ValueError: tried to declare an anonymous queue """ + if isinstance(obj, Queue) and obj.anonymous: + raise ValueError('You cannot declare an anonymous queue!') if span is not None and not dont_trace: child_span = self._make_span('declare', span) else: @@ -183,6 +193,9 @@ class Cluster(object): Take care not to lose the Consumer object - it's the only way to cancel a consumer! + .. note:: You don't need to explicitly declare queues and exchanges that you will be using beforehand, + this will do this for you on the same channel. + :param queue: Queue object, being consumed from right now. Note that name of anonymous queue might change at any time! :param on_message: callable that will process incoming messages @@ -232,7 +245,6 @@ class Cluster(object): def publish(self, message, # type: Message exchange=None, # type: tp.Union[Exchange, str, bytes] routing_key=u'', # type: tp.Union[str, bytes] - tx=None, # type: tp.Optional[bool] confirm=None, # type: tp.Optional[bool] span=None, # type: tp.Optional[opentracing.Span] dont_trace=False # type: bool @@ -246,9 +258,8 @@ class Cluster(object): :param confirm: Whether to publish it using confirms/transactions. If you choose so, you will receive a Future that can be used to check it broker took responsibility for this message. - Note that if tx if False, and message cannot be delivered to broker at once, + Note that if confirm is False, and message cannot be delivered to broker at once, it will be discarded - :param tx: deprecated, alias for confirm :param span: optionally, current span, if opentracing is installed :param dont_trace: if set to True, a span won't be generated :return: Future to be finished on completion or None, is confirm/tx was not chosen @@ -266,19 +277,8 @@ class Cluster(object): if isinstance(routing_key, six.text_type): routing_key = routing_key.encode('utf8') - if tx is not None: # confirm is a drop-in replacement. tx is unfortunately named - warnings.warn(u'Use confirm kwarg instead', DeprecationWarning) - - if confirm is not None: - raise RuntimeError( - u'Using both tx= and confirm= at once does not make sense') - elif confirm is not None: - tx = confirm - else: - tx = False - try: - if tx: + if confirm: clb = self.pub_tr else: clb = self.pub_na diff --git a/coolamqp/objects.py b/coolamqp/objects.py index eb6731d1bc2d4658705a259d1299f58eef042c1c..f1cf91999c650bee2cc642c26ee4a5a7e0df4b88 100644 --- a/coolamqp/objects.py +++ b/coolamqp/objects.py @@ -19,36 +19,35 @@ logger = logging.getLogger(__name__) EMPTY_PROPERTIES = MessageProperties() + +def toutf8(q): + if isinstance(q, memoryview): + q = q.tobytes() + return q.decode('utf-8') if isinstance(q, six.binary_type) else q + + +def tobytes(q): + if isinstance(q, memoryview): + return q.tobytes() + return q.encode('utf-8') if isinstance(q, six.text_type) else q + + def argumentify(arguments): if arguments is None: return None args = [] if isinstance(arguments, dict): for key, value in arguments.items(): - if not isinstance(key, six.binary_type): - key = key.encode('utf-8') + key = tobytes(key) args.append((key, (value, get_type_for(value)))) else: for key, value in arguments: - if not isinstance(key, six.binary_type): - key = key.encode('utf-8') + key = tobytes(key) args.append((key, (value, get_type_for(value)))) return args -def toutf8(q): - if isinstance(q, six.binary_type): - q = q.decode('utf8') - return q - - -def tobytes(q): - if isinstance(q, six.text_type): - q = q.encode('utf8') - return q - - class Callable(object): """ Add a bunch of callables to one list, and just invoke'm. @@ -88,8 +87,7 @@ class Message(object): You can pass a dict - it will be passed to MessageProperties, but it's slow - don't do that. - :type properties: MessageProperties instance, None or a dict (SLOW!) - + :type properties: :class:`coolamqp.objects.MessageProperties` instance """ __slots__ = ('body', 'properties') @@ -231,7 +229,6 @@ class Exchange(object): Exchange.direct = Exchange() - class ServerProperties(object): """ An object describing properties of the target server. @@ -255,11 +252,8 @@ class ServerProperties(object): elif isinstance(prop_value, list): prop_value = [toutf8(prop[0]) for prop in prop_value] self.properties[prop_name] = prop_value - self.mechanisms = data.mechanisms.tobytes().decode('utf-8').split(' ') - self.locales = data.locales.tobytes().decode('utf-8') - - def __str__(self): - return '%s %s %s %s' % (self.version, repr(self.properties), self.mechanisms, self.locales) + self.mechanisms = toutf8(data.mechanisms).split(' ') + self.locales = toutf8(data.locales) class Queue(object): @@ -267,50 +261,62 @@ class Queue(object): This object represents a Queue that applications consume from or publish to. Create a queue definition. - :param name: name of the queue. Generates a random uuid.uuid4().hex if not given. Note that this kind of queue - will probably require to be declared. + :param name: name of the queue. + None (default) for autogeneration. Just follow the rules for :ref:`anonymq`. + If empty string, a UUID name will be generated, and you won't have an anonymous queue anymore. :param durable: Is the queue durable? :param exchange: Exchange for this queue to bind to. None for no binding. - :param exclusive: Is this queue exclusive? - :param auto_delete: Is this queue auto_delete ? + :param exclusive: This queue will be deleted when the connection closes + :param auto_delete: This queue will be deleted when the last consumer unsubscribes :param arguments: either a list of (bytes, values) or a dict of (str, value) to pass as an extra argument - :warning PendingDeprecationWarning: if a non-exclusive auto_delete queue is created or some other combinations + :raises ValueError: tried to create a queue that was not durable or auto_delete + :raises ValueError: tried to create a queue that was not exclusive or auto_delete and not anonymous + :raises ValueError: tried to create a queue that was anonymous and not auto_delete or durable + :warning DeprecationWarning: if a non-exclusive auto_delete queue is created or some other combinations that will be soon unavailable (eg. RabbitMQ 4.0). + :warning UserWarning: if you're declaring an auto_delete or exclusive, anonymous queue """ __slots__ = ('name', 'durable', 'exchange', 'auto_delete', 'exclusive', 'anonymous', 'consumer_tag', 'arguments') - def __init__(self, name=b'', # type: tp.Union[str, bytes] + def __init__(self, name=None, # type: tp.Union[str, bytes, None] durable=False, # type: bool exchange=None, # type: tp.Optional[Exchange] - exclusive=False, # type: bool - auto_delete=False, # type: bool + exclusive=True, # type: bool + auto_delete=True, # type: bool arguments=None # type: tp.Union[tp.List[bytes, tp.Any], tp.Dict[str, tp.Any]] ): - if not name: - name = uuid.uuid4().hex - self.name = tobytes(name) #: public, must be bytes - # if name is '', this will be filled in with broker-generated name upon declaration + if name is None: + self.name = None + else: + self.name = tobytes(uuid.uuid4().hex if not name else name) + self.durable = durable self.exchange = exchange self.auto_delete = auto_delete self.exclusive = exclusive self.arguments = argumentify(arguments) + self.anonymous = self.name is None # if this queue is anonymous, it must be regenerated upon reconnect if self.auto_delete and self.durable: - warnings.warn('This will be removed in RabbitMQ 4.0', PendingDeprecationWarning) + raise ValueError('Cannot create an auto_delete and durable queue') + + if self.anonymous and (not self.auto_delete or self.durable): + raise ValueError('Zero sense to make a anonymous non-auto-delete or durable queue') - if self.auto_delete and not self.exclusive: - warnings.warn('This will be removed in RabbitMQ 4.0', PendingDeprecationWarning) + if not self.anonymous and (self.auto_delete or self.exclusive): + warnings.warn('This may cause unpredictable behaviour', UserWarning) - self.anonymous = not len( - self.name) # if this queue is anonymous, it must be regenerated upon reconnect + if self.durable and self.anonymous: + raise ValueError('Cannot declare an anonymous durable queue') - self.consumer_tag = self.name if not self.anonymous else uuid.uuid4().hex.encode( - 'utf8') # bytes, consumer tag to use in AMQP comms + if self.auto_delete and not self.exclusive and not self.anonymous: + raise ValueError('Cannot create an auto_delete and durable queue non-anonymous') - assert isinstance(self.name, six.binary_type) - assert isinstance(self.consumer_tag, six.binary_type) + self.consumer_tag = self.name if not self.anonymous else tobytes(uuid.uuid4().hex) + + if not self.exclusive and self.auto_delete: + warnings.warn('This will be removed in RabbitMQ 4.0', DeprecationWarning) def __eq__(self, other): return self.name == other.name @@ -318,6 +324,9 @@ class Queue(object): def __hash__(self): return hash(self.name) + def __repr__(self): + return 'Queue(%s, %s, %s, %s, %s, %s' % (self.name, self.durable, self.exchange, self.exclusive, self.arguments) + class QueueBind(object): """An order to be declared which binds a given queue to an exchange""" diff --git a/docs/advanced.rst b/docs/advanced.rst index 2f743c98cab9967642959243cc758a78e2aaccf0..de6b35fbf435b8d66a8dddbf9494570718c8cd93 100644 --- a/docs/advanced.rst +++ b/docs/advanced.rst @@ -3,3 +3,15 @@ Advanced things .. autoclass:: coolamqp.uplink.connection.Connection :members: + + +Declaring anonymous queues +-------------------------- + +.. _anonymq: + +In order to make use of an anonymous queue, you must first :meth:`coolamqp.clustering.Cluster.consume` it, since +:meth:`coolamqp.clustering.Cluster.declare` will use a separate channel, in which the queue will be invalid. It will +raise ValueError if you try to do that, anyway. + +Anonymous queues must be auto_delete and exclusive, ValueError will be raised otherwise. diff --git a/docs/whatsnew.rst b/docs/whatsnew.rst index 4e2b86d9feeffb1bc98f317a59794e7e82ffe835..d696f22d3cbbf9af1bacaf8b53404289ae10f308 100644 --- a/docs/whatsnew.rst +++ b/docs/whatsnew.rst @@ -1,4 +1,4 @@ -What's new? What's new? +What's new? =========== CoolAMQP 2.0.0 marks a slight philosophy shift. Whereas 1.x used auto-generated UUID names, 2.0 will let the server diff --git a/tests/test_clustering/test_a.py b/tests/test_clustering/test_a.py index 27f5d69e58d3fe620b7cc85764546fac9d67caf6..1012f3a1d5590da3f2187dd0f2478f8d4b8a3858 100644 --- a/tests/test_clustering/test_a.py +++ b/tests/test_clustering/test_a.py @@ -85,7 +85,7 @@ class TestA(unittest.TestCase): def test_declare_anonymous(self): xchg = Exchange('wtfzomg', type='fanout') - q = Queue(exchange=xchg) + q = Queue('', exchange=xchg) self.c.declare(xchg).result() self.c.declare(q).result() self.assertTrue(q.name) @@ -110,7 +110,7 @@ class TestA(unittest.TestCase): con, fut = self.c.consume(Queue(u'hello3', exclusive=True), on_message=ok, no_ack=True) fut.result() - self.c.publish(Message(b''), routing_key=u'hello3', tx=True).result() + self.c.publish(Message(b''), routing_key=u'hello3', confirm=True).result() time.sleep(1) @@ -133,7 +133,7 @@ class TestA(unittest.TestCase): con, fut = self.c.consume(Queue(u'hello3', exclusive=True), on_message=ok, no_ack=False) fut.result() - self.c.publish(Message(b''), routing_key=u'hello3', tx=True).result() + self.c.publish(Message(b''), routing_key=u'hello3', confirm=True).result() time.sleep(1) @@ -159,13 +159,6 @@ class TestA(unittest.TestCase): 'content_encoding': b'utf8' }), routing_key=u'hello4', confirm=True).result() - self.assertRaises(RuntimeError, - lambda: self.c.publish(Message(b'hello4', properties={ - 'content_type': b'text/plain', - 'content_encoding': b'utf8' - }), routing_key=u'hello4', confirm=True, - tx=True).result()) - time.sleep(1) self.assertTrue(p['q']) @@ -187,7 +180,7 @@ class TestA(unittest.TestCase): self.c.publish(Message(b'hello5', properties={ 'content_type': b'text/plain', 'content_encoding': b'utf8' - }), routing_key=u'hello5', tx=True).result() + }), routing_key=u'hello5', confirm=True).result() time.sleep(2) @@ -207,7 +200,7 @@ class TestA(unittest.TestCase): on_message=ok, no_ack=True) fut.result() self.c.publish(Message(b'hello6'), routing_key=u'hello6', - tx=True).result() + confirm=True).result() time.sleep(1) diff --git a/tests/test_clustering/test_exchanges.py b/tests/test_clustering/test_exchanges.py index 206ec06a66ef1922e6574f940ed270ea95a3b044..b5abebcb2722e9fc1d5bc815ad03a624212b306e 100644 --- a/tests/test_clustering/test_exchanges.py +++ b/tests/test_clustering/test_exchanges.py @@ -13,6 +13,7 @@ from coolamqp.objects import Message, NodeDefinition, Queue, MessageProperties, NODE = NodeDefinition(os.environ.get('AMQP_HOST', '127.0.0.1'), 'guest', 'guest', heartbeat=20) logging.basicConfig(level=logging.DEBUG) logging.getLogger('coolamqp').setLevel(logging.DEBUG) +logger = logging.getLogger(__name__) class TestExchanges(unittest.TestCase): @@ -23,13 +24,20 @@ class TestExchanges(unittest.TestCase): def tearDown(self): self.c.shutdown() + def test_declare_anonymq(self): + que = Queue(auto_delete=True) + self.assertRaises(ValueError, self.c.declare, que) + cons, fut = self.c.consume(que, no_ack=True) + fut.result() + cons.cancel().result() + def test_deadlettering(self): xchg_name = uuid.uuid4().hex dead_queue_name = uuid.uuid4().hex grave_queue_name = uuid.uuid4().hex DEADLETTER = Exchange(xchg_name, type=b'direct') QUEUE = Queue(dead_queue_name, durable=True, auto_delete=False, exclusive=False) - GRAVEYARD_QUEUE = Queue(grave_queue_name, durable=True, arguments={'x-dead-letter-exchange': xchg_name, + GRAVEYARD_QUEUE = Queue(grave_queue_name, durable=True, auto_delete=False, arguments={'x-dead-letter-exchange': xchg_name, 'x-message-ttl': 1000}) self.c.declare(DEADLETTER).result() self.c.declare(QUEUE).result() @@ -59,7 +67,7 @@ class TestExchanges(unittest.TestCase): f1.result() f2.result() - self.c.publish(Message(b'hello'), exchange=x, tx=True).result() + self.c.publish(Message(b'hello'), exchange=x, confirm=True).result() self.assertIsInstance(self.c.drain(2), MessageReceived) self.assertIsInstance(self.c.drain(2), MessageReceived) diff --git a/tests/test_clustering/test_things.py b/tests/test_clustering/test_things.py index e49fa12b7ddad7d9b0e3bcd81409dc58a1e47636..dee2bafe64df486159702c76a9771206ad01e830 100644 --- a/tests/test_clustering/test_things.py +++ b/tests/test_clustering/test_things.py @@ -29,17 +29,15 @@ class TestConnecting(unittest.TestCase): def test_argumented_queue(self): que = Queue(auto_delete=True, exclusive=True, arguments=[(b'x-max-priority', 10)]) - que2 = Queue(auto_delete=True, exclusive=True, arguments={'x-max-priority': 10}) c = Cluster([NODE]) c.start(wait=True, timeout=None) - c.declare(que).result() - c.declare(que2).result() + self.assertRaises(ValueError, c.declare, que) c.shutdown(True) def test_argumented_bind(self): c = Cluster([NODE]) c.start(wait=True, timeout=None) - que = Queue(auto_delete=True, exclusive=True, arguments=[(b'x-max-priority', 10)]) + que = Queue('', auto_delete=True, exclusive=True, arguments=[(b'x-max-priority', 10)]) xchg = Exchange('test3-wertest', type='headers', durable=True) c.declare(que).result() c.declare(xchg).result() diff --git a/tests/test_clustering/test_topic_exchanges.py b/tests/test_clustering/test_topic_exchanges.py index 45ef2f361d312bf1fc03307c97418f7a15d5e45a..dc8c18ea29c1178c54286560b3ac3f17ca55a175 100644 --- a/tests/test_clustering/test_topic_exchanges.py +++ b/tests/test_clustering/test_topic_exchanges.py @@ -11,7 +11,7 @@ from coolamqp.objects import Exchange, Queue, NodeDefinition, Message XCHG = Exchange('smok5.results', type='topic', durable=True) -QUEUE = Queue(exchange=XCHG, exclusive=True, auto_delete=True) +QUEUE = Queue('', exchange=XCHG, exclusive=True, auto_delete=True) logger = logging.getLogger(__name__) NODE = NodeDefinition(os.environ.get('AMQP_HOST', '127.0.0.1'), 'guest', 'guest', heartbeat=20) @@ -33,8 +33,8 @@ class TestTopic(unittest.TestCase): self.c.shutdown() def test_bind_stuff(self): + self.c.declare(QUEUE) self.c.declare(XCHG).result() - self.c.declare(QUEUE).result() self.c.bind(QUEUE, XCHG, routing_key='hello-world').result() global did_receive diff --git a/tests/test_objects.py b/tests/test_objects.py index 9fbf5b156a777f194f03cc68c93dd25eb1681d4d..1415ba61ded72bd1fd99a0f2d8e591fadae7c949 100644 --- a/tests/test_objects.py +++ b/tests/test_objects.py @@ -1,5 +1,6 @@ # coding=UTF-8 from __future__ import print_function, absolute_import, division +import sys import logging import unittest import io @@ -13,9 +14,30 @@ from coolamqp.objects import NodeDefinition, MessageProperties, Queue, argumenti logger = logging.getLogger(__name__) logging.getLogger('coolamqp').setLevel(logging.DEBUG) +IS_PY3 = sys.version.startswith('3') + class TestObjects(unittest.TestCase): + def test_queue_failures(self): + self.assertRaises(ValueError, Queue, None, durable=True) + self.assertRaises(ValueError, Queue, 'test', auto_delete=True, durable=True) + self.assertRaises(ValueError, Queue, None, auto_delete=False) + self.assertRaises(ValueError, Queue, 'test', auto_delete=True, exclusive=False) + + @unittest.skipUnless(sys.version.startswith('3'), 'Needs Python 3.x') + def test_queue_warns(self): + warnings.resetwarnings() + + with warnings.catch_warnings(record=True) as w: + Queue('test', auto_delete=True, exclusive=True) + Queue(auto_delete=True, exclusive=False) + logger.warning(repr(w)) + self.assertEqual(len(w), 2 if IS_PY3 else 1) + self.assertTrue(issubclass(w[0].category, UserWarning)) + if IS_PY3: + self.assertTrue(issubclass(w[1].category, DeprecationWarning)) + def test_queue_declare(self): args = argumentify({'x-dead-letter-exchange': 'deadletter', 'x-message-ttl': 1000}) @@ -32,13 +54,6 @@ class TestObjects(unittest.TestCase): ce_p_msg = MessageProperties(content_encoding=b'wtf') self.assertIn('wtf', str(ce_p_msg)) - def test_warning(self): - warnings.resetwarnings() - with warnings.catch_warnings(record=True) as w: - Queue(auto_delete=True, exclusive=False) - self.assertEqual(len(w), 1) - self.assertTrue(issubclass(w[0].category, PendingDeprecationWarning)) - def test_node_definition_from_amqp(self): n1 = NodeDefinition(u'amqp://ala:ma@kota/psa')