diff --git a/CHANGELOG.md b/CHANGELOG.md index 0cc96b5a0400b947521bb4feb2d6a756b34ab429..0e7cd43f5a9907bc0990ec7140884334d4b8df48 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ v2.0.0 * changed some default arguments for Queues for them to better make sense * some argument combinations just raise ValueError * PendingDeprecationWarning changed into a DeprecationWarning + * added support for headers exchanges * changes to Cluster: * declare will refuse to declare an anonymous queue * renamed publish(tx) to publish(confirm) diff --git a/coolamqp/attaches/consumer.py b/coolamqp/attaches/consumer.py index f5d6474e20d4b4d282a9b74f4f73b84cc44fd524..0d05c4c286b939750809e9b80aaf72ce6085c31a 100644 --- a/coolamqp/attaches/consumer.py +++ b/coolamqp/attaches/consumer.py @@ -395,25 +395,24 @@ class Consumer(Channeler): elif isinstance(payload, QueueDeclareOk): # did we need an anonymous name? - if not self.queue.name: - self.queue.name = self.queue.name.tobytes() + if self.queue.anonymous: + self.queue.name = payload.queue.tobytes() - queue_declared = False + queue_bound = False # We need any form of binding. if self.queue.exchange is not None: - queue_declared = True + queue_bound = True qb = QueueBind( self.queue.name, self.queue.exchange.name.encode('utf-8'), self.queue.routing_key, False, self.queue.arguments_bind) - logger.debug('Running %s' % (repr(qb))) self.method_and_watch( qb, QueueBindOk, self.on_setup ) - if not queue_declared: + if not queue_bound: # default exchange, pretend it was bind ok self.on_setup(QueueBindOk()) elif isinstance(payload, QueueBindOk): diff --git a/coolamqp/framing/definitions.py b/coolamqp/framing/definitions.py index f106e5bc3a01dbc2f35fea76d177110769e8ca40..5c76cfae80561df0d9699cdb21213eb2bc11ae9d 100644 --- a/coolamqp/framing/definitions.py +++ b/coolamqp/framing/definitions.py @@ -2947,6 +2947,7 @@ class BasicContentPropertyList(AMQPContentPropertyList): """ if 'headers' in kwargs: kwargs['headers'] = argumentify(kwargs['headers']) + logger.warning('Argumentified headers are %s' % (kwargs['headers'], )) zpf = bytearray([ (('content_type' in kwargs) << 7) | diff --git a/coolamqp/objects.py b/coolamqp/objects.py index 3ab5a671592abd8ffdd4b5c308c4ccccc418c70e..9137ae59bb45aba9297c503620eb66d7ebf75029 100644 --- a/coolamqp/objects.py +++ b/coolamqp/objects.py @@ -275,7 +275,7 @@ class Queue(object): self.exclusive = exclusive self.arguments = argumentify(arguments) self.arguments_bind = argumentify(arguments_bind) - self.anonymous = self.name is None # if this queue is anonymous, it must be regenerated upon reconnect + self.anonymous = self.name is None if self.auto_delete and self.durable: raise ValueError('Cannot create an auto_delete and durable queue') diff --git a/tests/test_clustering/test_topic_exchanges.py b/tests/test_clustering/test_topic_exchanges.py index b44efe7131cab8e8cba7921b3e3bd3bf76dcb0d9..0a2bfab6f241268786a9415d9d557a8015471cc4 100644 --- a/tests/test_clustering/test_topic_exchanges.py +++ b/tests/test_clustering/test_topic_exchanges.py @@ -44,12 +44,13 @@ class TestTopic(unittest.TestCase): self.c.declare(queue2).result() test = {'a': 0} + def do_message(msg): msg.ack() test['a'] += 1 - cons1, fut1 = self.c.consume(queue1, no_ack=False) - cons2, fut2 = self.c.consume(queue2, no_ack=False) + cons1, fut1 = self.c.consume(queue1, on_message=do_message, no_ack=False) + cons2, fut2 = self.c.consume(queue2, on_message=do_message, no_ack=False) fut1.result() fut2.result()