diff --git a/CHANGELOG.md b/CHANGELOG.md index 0cc96b5a0400b947521bb4feb2d6a756b34ab429..0e7cd43f5a9907bc0990ec7140884334d4b8df48 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ v2.0.0 * changed some default arguments for Queues for them to better make sense * some argument combinations just raise ValueError * PendingDeprecationWarning changed into a DeprecationWarning + * added support for headers exchanges * changes to Cluster: * declare will refuse to declare an anonymous queue * renamed publish(tx) to publish(confirm) diff --git a/coolamqp/attaches/consumer.py b/coolamqp/attaches/consumer.py index f5d6474e20d4b4d282a9b74f4f73b84cc44fd524..0d05c4c286b939750809e9b80aaf72ce6085c31a 100644 --- a/coolamqp/attaches/consumer.py +++ b/coolamqp/attaches/consumer.py @@ -395,25 +395,24 @@ class Consumer(Channeler): elif isinstance(payload, QueueDeclareOk): # did we need an anonymous name? - if not self.queue.name: - self.queue.name = self.queue.name.tobytes() + if self.queue.anonymous: + self.queue.name = payload.queue.tobytes() - queue_declared = False + queue_bound = False # We need any form of binding. if self.queue.exchange is not None: - queue_declared = True + queue_bound = True qb = QueueBind( self.queue.name, self.queue.exchange.name.encode('utf-8'), self.queue.routing_key, False, self.queue.arguments_bind) - logger.debug('Running %s' % (repr(qb))) self.method_and_watch( qb, QueueBindOk, self.on_setup ) - if not queue_declared: + if not queue_bound: # default exchange, pretend it was bind ok self.on_setup(QueueBindOk()) elif isinstance(payload, QueueBindOk): diff --git a/coolamqp/clustering/cluster.py b/coolamqp/clustering/cluster.py index e62fbb3ef90b309a692462beacdfe9f44bb383d2..0760c9d7c1810f3b3050700c997e2d328b0a8623 100644 --- a/coolamqp/clustering/cluster.py +++ b/coolamqp/clustering/cluster.py @@ -9,6 +9,7 @@ from concurrent.futures import Future import six +from coolamqp.argumentify import argumentify from coolamqp.attaches import Publisher, AttacheGroup, Consumer, Declarer from coolamqp.attaches.utils import close_future from coolamqp.clustering.events import ConnectionLost, MessageReceived, \ @@ -112,7 +113,7 @@ class Cluster(object): child_span = self._make_span('bind', span) else: child_span = None - fut = self.decl.declare(QueueBind(queue, exchange, routing_key, arguments), + fut = self.decl.declare(QueueBind(queue, exchange, routing_key, argumentify(arguments)), span=child_span) return close_future(fut, child_span) diff --git a/coolamqp/framing/definitions.py b/coolamqp/framing/definitions.py index f106e5bc3a01dbc2f35fea76d177110769e8ca40..5c76cfae80561df0d9699cdb21213eb2bc11ae9d 100644 --- a/coolamqp/framing/definitions.py +++ b/coolamqp/framing/definitions.py @@ -2947,6 +2947,7 @@ class BasicContentPropertyList(AMQPContentPropertyList): """ if 'headers' in kwargs: kwargs['headers'] = argumentify(kwargs['headers']) + logger.warning('Argumentified headers are %s' % (kwargs['headers'], )) zpf = bytearray([ (('content_type' in kwargs) << 7) | diff --git a/coolamqp/framing/field_table.py b/coolamqp/framing/field_table.py index 5d7b1de9923a0d8f0c0a45acc1ddb031ef35bcb4..fc159a066e5ba88bc4cdda936110741cf9d63f56 100644 --- a/coolamqp/framing/field_table.py +++ b/coolamqp/framing/field_table.py @@ -254,6 +254,8 @@ def frame_table_size(table): """ :return: length of table representation, in bytes, INCLUDING length header""" + import logging + logging.getLogger(__name__).warning(repr(table)) return 4 + sum(1 + len(k) + frame_field_value_size(fv) for k, fv in table) diff --git a/coolamqp/objects.py b/coolamqp/objects.py index 3ab5a671592abd8ffdd4b5c308c4ccccc418c70e..12e3113a9d81921a452cd2619aff19f574905f0b 100644 --- a/coolamqp/objects.py +++ b/coolamqp/objects.py @@ -11,13 +11,20 @@ import warnings import six from coolamqp.argumentify import argumentify, tobytes, toutf8 -from coolamqp.framing.definitions import BasicContentPropertyList as MessageProperties +from coolamqp.framing.definitions import BasicContentPropertyList logger = logging.getLogger(__name__) EMPTY_PROPERTIES = MessageProperties() +class MessageProperties(BasicContentPropertyList): + def __new__(cls, *args, **kwargs): + if 'headers' in kwargs: + kwargs['headers'] = argumentify(kwargs['headers']) + return BasicContentPropertyList.__new__(cls, *args, **kwargs) + + class Callable(object): """ Add a bunch of callables to one list, and just invoke'm. @@ -275,7 +282,7 @@ class Queue(object): self.exclusive = exclusive self.arguments = argumentify(arguments) self.arguments_bind = argumentify(arguments_bind) - self.anonymous = self.name is None # if this queue is anonymous, it must be regenerated upon reconnect + self.anonymous = self.name is None if self.auto_delete and self.durable: raise ValueError('Cannot create an auto_delete and durable queue') diff --git a/tests/test_clustering/test_topic_exchanges.py b/tests/test_clustering/test_topic_exchanges.py index b44efe7131cab8e8cba7921b3e3bd3bf76dcb0d9..0a2bfab6f241268786a9415d9d557a8015471cc4 100644 --- a/tests/test_clustering/test_topic_exchanges.py +++ b/tests/test_clustering/test_topic_exchanges.py @@ -44,12 +44,13 @@ class TestTopic(unittest.TestCase): self.c.declare(queue2).result() test = {'a': 0} + def do_message(msg): msg.ack() test['a'] += 1 - cons1, fut1 = self.c.consume(queue1, no_ack=False) - cons2, fut2 = self.c.consume(queue2, no_ack=False) + cons1, fut1 = self.c.consume(queue1, on_message=do_message, no_ack=False) + cons2, fut2 = self.c.consume(queue2, on_message=do_message, no_ack=False) fut1.result() fut2.result() diff --git a/tests/test_objects.py b/tests/test_objects.py index 9d37c62d2cedd4f257a95a6e5646483e6e1ae2e2..19f12ca2e105f7a08f48833cad6849ecf625ac35 100644 --- a/tests/test_objects.py +++ b/tests/test_objects.py @@ -38,6 +38,11 @@ class TestObjects(unittest.TestCase): if IS_PY3: self.assertTrue(issubclass(w[1].category, DeprecationWarning)) + def test_headers(self): + msg = MessageProperties(headers={'city': 'sydney'}) + buf = io.BytesIO() + msg.write_to(buf) + def test_queue_declare(self): args = argumentify({'x-dead-letter-exchange': 'deadletter', 'x-message-ttl': 1000})