diff --git a/coolamqp/attaches/consumer.py b/coolamqp/attaches/consumer.py index 78077085670e3c300c4a9ef388e83de6b1fbe1e7..86938649b18b3132547c7a38c09d482bc1e0f2ed 100644 --- a/coolamqp/attaches/consumer.py +++ b/coolamqp/attaches/consumer.py @@ -413,7 +413,7 @@ class Consumer(Channeler): QueueBind( self.queue.name, self.queue.exchange.name.encode('utf8'), - b'', False, []), + b'', False, self.queue.arguments_bind), QueueBindOk, self.on_setup ) diff --git a/coolamqp/framing/definitions.py b/coolamqp/framing/definitions.py index 68278f4291aa0e4cf4f3554639a284ad0483d875..3909a159a8184be57552f94d42c325582a77120e 100644 --- a/coolamqp/framing/definitions.py +++ b/coolamqp/framing/definitions.py @@ -1,5 +1,8 @@ # coding=UTF-8 from __future__ import print_function, absolute_import + +from coolamqp.objects import argumentify + """ A Python version of the AMQP machine-readable specification. @@ -2941,6 +2944,9 @@ class BasicContentPropertyList(AMQPContentPropertyList): :param reserved: reserved, must be empty :type reserved: binary type (max length 255) (AMQP as shortstr) """ + if 'headers' in kwargs: + kwargs['headers'] = argumentify(kwargs['headers']) + zpf = bytearray([ (('content_type' in kwargs) << 7) | (('content_encoding' in kwargs) << 6) | diff --git a/coolamqp/objects.py b/coolamqp/objects.py index f1cf91999c650bee2cc642c26ee4a5a7e0df4b88..7d483341f9cc3b9122662a1e1fd231d57a3eac1b 100644 --- a/coolamqp/objects.py +++ b/coolamqp/objects.py @@ -34,7 +34,7 @@ def tobytes(q): def argumentify(arguments): if arguments is None: - return None + return [] args = [] if isinstance(arguments, dict): for key, value in arguments.items(): @@ -268,7 +268,9 @@ class Queue(object): :param exchange: Exchange for this queue to bind to. None for no binding. :param exclusive: This queue will be deleted when the connection closes :param auto_delete: This queue will be deleted when the last consumer unsubscribes - :param arguments: either a list of (bytes, values) or a dict of (str, value) to pass as an extra argument + :param arguments: either a list of (bytes, values) or a dict of (str, value) to pass as an extra argument during + declaration + :param arguments_bind: arguments to pass to binding to a (headers, I suppose exchange) :raises ValueError: tried to create a queue that was not durable or auto_delete :raises ValueError: tried to create a queue that was not exclusive or auto_delete and not anonymous :raises ValueError: tried to create a queue that was anonymous and not auto_delete or durable @@ -277,14 +279,15 @@ class Queue(object): :warning UserWarning: if you're declaring an auto_delete or exclusive, anonymous queue """ __slots__ = ('name', 'durable', 'exchange', 'auto_delete', 'exclusive', - 'anonymous', 'consumer_tag', 'arguments') + 'anonymous', 'consumer_tag', 'arguments', 'arguments_bind') def __init__(self, name=None, # type: tp.Union[str, bytes, None] durable=False, # type: bool exchange=None, # type: tp.Optional[Exchange] exclusive=True, # type: bool auto_delete=True, # type: bool - arguments=None # type: tp.Union[tp.List[bytes, tp.Any], tp.Dict[str, tp.Any]] + arguments=None, # type: tp.Union[tp.List[bytes, tp.Any], tp.Dict[str, tp.Any]], + arguments_bind=None, ): if name is None: self.name = None @@ -296,6 +299,7 @@ class Queue(object): self.auto_delete = auto_delete self.exclusive = exclusive self.arguments = argumentify(arguments) + self.arguments_bind = argumentify(arguments_bind) self.anonymous = self.name is None # if this queue is anonymous, it must be regenerated upon reconnect if self.auto_delete and self.durable: diff --git a/tests/test_clustering/test_topic_exchanges.py b/tests/test_clustering/test_topic_exchanges.py index dc8c18ea29c1178c54286560b3ac3f17ca55a175..4d6554bb56dc7d00955604e96317f8554bb0d72a 100644 --- a/tests/test_clustering/test_topic_exchanges.py +++ b/tests/test_clustering/test_topic_exchanges.py @@ -3,11 +3,12 @@ import time import os import unittest import logging +import uuid import monotonic from coolamqp.clustering import Cluster -from coolamqp.objects import Exchange, Queue, NodeDefinition, Message +from coolamqp.objects import Exchange, Queue, NodeDefinition, Message, MessageProperties XCHG = Exchange('smok5.results', type='topic', durable=True) @@ -20,6 +21,7 @@ logging.getLogger('coolamqp').setLevel(logging.DEBUG) did_receive = False + class TestTopic(unittest.TestCase): def setUp(self): self.c = Cluster([NODE]) @@ -32,6 +34,31 @@ class TestTopic(unittest.TestCase): pass self.c.shutdown() + def test_headers_exchange(self): + xchg_name = uuid.uuid4().hex + exchange = Exchange(xchg_name, b'headers') + self.c.declare(exchange).result() + queue1 = Queue(exchange=exchange, arguments={'x-match': 'all', 'location': 'brisbane'}) + self.c.declare(queue1).result() + queue2 = Queue(exchange=exchange, arguments={'x-match': 'all', 'location': 'sydney'}) + self.c.declare(queue2).result() + + test = {'a': 0} + def do_message(msg): + msg.ack() + test['a'] += 1 + + cons1, fut1 = self.c.consume(queue1, no_ack=False).result() + cons2, fut2 = self.c.consume(queue2, no_ack=False).result() + + self.c.publish(Message(b'dupa', MessageProperties(headers={'location': 'sydney'})), exchange=exchange, confirm=True).result() + self.c.publish(Message(b'dupa', MessageProperties(headers={'location': 'brisbane'})), exchange=exchange, confirm=True).result() + self.c.publish(Message(b'dupa', MessageProperties(headers={'location': 'wtf'})), exchange=exchange, confirm=True).result() + time.sleep(1) + cons1.cancel().result() + cons2.cancel().result() + self.assertEqual(test['a'], 2) + def test_bind_stuff(self): self.c.declare(QUEUE) self.c.declare(XCHG).result()