diff --git a/CHANGELOG.md b/CHANGELOG.md index 2e17f43d709b05d96f6dfa05c57cd459575523de..0cc96b5a0400b947521bb4feb2d6a756b34ab429 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,11 @@ v2.0.0 * same can be said of Consumer.set_qos(prefetch_count) * added Cluster. +Compatible changes +------------------ + +* fixed a bug wherein bad invocation of NodeDefinition would result in an exception + v1.5.0 ====== diff --git a/coolamqp/attaches/consumer.py b/coolamqp/attaches/consumer.py index 78077085670e3c300c4a9ef388e83de6b1fbe1e7..1a61da8e4b095cc04fe0e95378b16fb38d483109 100644 --- a/coolamqp/attaches/consumer.py +++ b/coolamqp/attaches/consumer.py @@ -407,16 +407,17 @@ class Consumer(Channeler): queue_declared = False # We need any form of binding. if self.queue.exchange is not None: - if self.queue.exchange.type != b'topic': - queue_declared = True - self.method_and_watch( - QueueBind( - self.queue.name, - self.queue.exchange.name.encode('utf8'), - b'', False, []), - QueueBindOk, - self.on_setup - ) + queue_declared = True + qb = QueueBind( + self.queue.name, + self.queue.exchange.name.encode('utf-8'), + self.queue.routing_key, False, []) + logger.debug('Running %s' % (repr(qb))) + self.method_and_watch( + qb, + QueueBindOk, + self.on_setup + ) if not queue_declared: # default exchange, pretend it was bind ok diff --git a/coolamqp/objects.py b/coolamqp/objects.py index f1cf91999c650bee2cc642c26ee4a5a7e0df4b88..c396cd04fab17448880c1ade15bd5f62d7976f23 100644 --- a/coolamqp/objects.py +++ b/coolamqp/objects.py @@ -269,6 +269,8 @@ class Queue(object): :param exclusive: This queue will be deleted when the connection closes :param auto_delete: This queue will be deleted when the last consumer unsubscribes :param arguments: either a list of (bytes, values) or a dict of (str, value) to pass as an extra argument + :param routing_key: routing key that will be used to bind to an exchange. Used only when this + queue is associated with an exchange. Default value of blank should suffice. :raises ValueError: tried to create a queue that was not durable or auto_delete :raises ValueError: tried to create a queue that was not exclusive or auto_delete and not anonymous :raises ValueError: tried to create a queue that was anonymous and not auto_delete or durable @@ -277,14 +279,15 @@ class Queue(object): :warning UserWarning: if you're declaring an auto_delete or exclusive, anonymous queue """ __slots__ = ('name', 'durable', 'exchange', 'auto_delete', 'exclusive', - 'anonymous', 'consumer_tag', 'arguments') + 'anonymous', 'consumer_tag', 'arguments', 'routing_key') def __init__(self, name=None, # type: tp.Union[str, bytes, None] durable=False, # type: bool exchange=None, # type: tp.Optional[Exchange] exclusive=True, # type: bool auto_delete=True, # type: bool - arguments=None # type: tp.Union[tp.List[bytes, tp.Any], tp.Dict[str, tp.Any]] + arguments=None, # type: tp.Union[tp.List[bytes, tp.Any], tp.Dict[str, tp.Any]], + routing_key=b'' #: type: tp.Union[str, bytes] ): if name is None: self.name = None @@ -293,6 +296,7 @@ class Queue(object): self.durable = durable self.exchange = exchange + self.routing_key = tobytes(routing_key) self.auto_delete = auto_delete self.exclusive = exclusive self.arguments = argumentify(arguments) @@ -388,14 +392,12 @@ class NodeDefinition(object): def __init__(self, *args, **kwargs): self.heartbeat = kwargs.pop('heartbeat', None) self.port = kwargs.pop('port', 5672) + self.host = None + self.user = None + self.password = None + self.virtual_host = '/' - if len(kwargs) > 0: - # Prepare arguments for amqp.connection.Connection - self.host = kwargs['host'] - self.user = kwargs['user'] - self.password = kwargs['password'] - self.virtual_host = kwargs.get('virtual_host', '/') - elif len(args) == 3: + if len(args) == 3: self.host, self.user, self.password = args self.virtual_host = '/' elif len(args) == 4: @@ -422,8 +424,13 @@ class NodeDefinition(object): host, port = self.host.split(u':', 1) self.port = int(port) # else get that port from kwargs - else: - raise ValueError(u'What did you exactly pass?') + + if len(kwargs) > 0: + # Prepare arguments for amqp.connection.Connection + self.host = kwargs.get('host', self.host) + self.user = kwargs.get('user', self.user) + self.password = kwargs.get('password', self.password) + self.virtual_host = kwargs.get('virtual_host', self.virtual_host) def __str__(self): # type: () -> str return six.text_type( diff --git a/docs/conf.py b/docs/conf.py index 29896f768b9948b827dc234c7c95576286d2306a..f8f08c5733ae79e339c02418bdb18bbab9f58593 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -35,6 +35,12 @@ source_parsers = { # ones. extensions = ['sphinx.ext.autodoc'] +autodoc_default_options = { + 'members': True, +} +autodoc_default_flags = [ + 'show-inheritance' +] # Add any paths that contain templates here, relative to this directory. templates_path = ['_templates'] diff --git a/tests/test_clustering/test_exchanges.py b/tests/test_clustering/test_exchanges.py index b5abebcb2722e9fc1d5bc815ad03a624212b306e..1fc5ebd9b62d2b1cadf4e1e92cd3686154bbd9b9 100644 --- a/tests/test_clustering/test_exchanges.py +++ b/tests/test_clustering/test_exchanges.py @@ -3,6 +3,7 @@ from __future__ import print_function, absolute_import, division import logging import os +import time import unittest import uuid @@ -10,7 +11,7 @@ from coolamqp.clustering import Cluster, MessageReceived, NothingMuch from coolamqp.objects import Message, NodeDefinition, Queue, MessageProperties, Exchange # todo handle bad auth -NODE = NodeDefinition(os.environ.get('AMQP_HOST', '127.0.0.1'), 'guest', 'guest', heartbeat=20) +NODE = NodeDefinition(os.environ.get('AMQP_HOST', '127.0.0.1'), 'guest', 'guest', hertbeat=20) logging.basicConfig(level=logging.DEBUG) logging.getLogger('coolamqp').setLevel(logging.DEBUG) logger = logging.getLogger(__name__) @@ -31,6 +32,25 @@ class TestExchanges(unittest.TestCase): fut.result() cons.cancel().result() + def test_topic_exchanges(self): + xchg_name = uuid.uuid4().hex + test = {'a': 0} + + def do_msg(msg): + msg.ack() + test['a'] += 1 + + xchg = Exchange(xchg_name, type=b'topic') + queue = Queue(exchange=xchg, routing_key=b'test') + cons, fut = self.c.consume(queue, no_ack=False, on_message=do_msg) + fut.result() + self.c.publish(Message(b'dupa'), xchg_name, routing_key=b'test', confirm=True).result() + self.c.publish(Message(b'dupa2'), xchg_name, routing_key=b'test2', confirm=True).result() + self.c.publish(Message(b'dupa'), xchg_name, routing_key=b'test', confirm=True).result() + time.sleep(1) + cons.cancel().result() + self.assertEqual(test['a'], 2) + def test_deadlettering(self): xchg_name = uuid.uuid4().hex dead_queue_name = uuid.uuid4().hex