diff --git a/CHANGELOG.md b/CHANGELOG.md index 0cc96b5a0400b947521bb4feb2d6a756b34ab429..0e7cd43f5a9907bc0990ec7140884334d4b8df48 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ v2.0.0 * changed some default arguments for Queues for them to better make sense * some argument combinations just raise ValueError * PendingDeprecationWarning changed into a DeprecationWarning + * added support for headers exchanges * changes to Cluster: * declare will refuse to declare an anonymous queue * renamed publish(tx) to publish(confirm) diff --git a/coolamqp/argumentify.py b/coolamqp/argumentify.py index a8f0fd2b2af2cac689e279d09c563f696d128020..51aaed7708399cc688e486fa6fe124ee01b4c0f8 100644 --- a/coolamqp/argumentify.py +++ b/coolamqp/argumentify.py @@ -1,22 +1,12 @@ +import warnings + import six from coolamqp.framing.field_table import get_type_for +import logging +logger = logging.getLogger(__name__) -def argumentify(arguments): - if arguments is None: - return [] - args = [] - if isinstance(arguments, dict): - for key, value in arguments.items(): - key = tobytes(key) - args.append((key, (value, get_type_for(value)))) - else: - for key, value in arguments: - key = tobytes(key) - args.append((key, (value, get_type_for(value)))) - - return args def tobytes(q): @@ -28,4 +18,36 @@ def tobytes(q): def toutf8(q): if isinstance(q, memoryview): q = q.tobytes() - return q.decode('utf-8') if isinstance(q, six.binary_type) else q \ No newline at end of file + return q.decode('utf-8') if isinstance(q, six.binary_type) else q + + +def argumentify(arguments): + if arguments is None: + return [] + logger.warning('Input is %s' % (arguments, )) + # Was it argumented already? + # if isinstance(arguments, list): + # if len(arguments) >= 1: + # if isinstance(arguments[0], tuple): + # if isinstance(arguments[0][1], str) and len(arguments[0][1]) == 1: + # # Looks argumentified already + # return arguments + args = [] + if isinstance(arguments, dict): + for key, value in arguments.items(): + key = tobytes(key) + args.append((key, (value, get_type_for(value)))) + logger.warning('Output is %s', (args, 'F')) + return (args, 'F') + elif len(arguments[0]) == 2: + for key, value in arguments: + key = tobytes(key) + args.append((key, (value, get_type_for(value)))) + return (args, 'F') + elif isinstance(arguments, (list, tuple)): + for value in arguments: + args.append((value, get_type_for(value))) + return (args, 'A') + else: + warnings.warn('Unnecessary call to argumentify, see issue #11 for details', UserWarning) + return args diff --git a/coolamqp/attaches/consumer.py b/coolamqp/attaches/consumer.py index f5d6474e20d4b4d282a9b74f4f73b84cc44fd524..dae02c08f6f00a85c2fe7adab8d5fddc28cfb2c4 100644 --- a/coolamqp/attaches/consumer.py +++ b/coolamqp/attaches/consumer.py @@ -366,7 +366,7 @@ class Consumer(Channeler): self.queue.exchange.auto_delete, False, False, - []), + self.queue.exchange.arguments), ExchangeDeclareOk, self.on_setup ) @@ -387,7 +387,7 @@ class Consumer(Channeler): self.queue.exclusive, self.queue.auto_delete, False, - [] + self.queue.arguments ), QueueDeclareOk, self.on_setup @@ -395,25 +395,24 @@ class Consumer(Channeler): elif isinstance(payload, QueueDeclareOk): # did we need an anonymous name? - if not self.queue.name: - self.queue.name = self.queue.name.tobytes() + if self.queue.anonymous: + self.queue.name = payload.queue.tobytes() - queue_declared = False + queue_bound = False # We need any form of binding. if self.queue.exchange is not None: - queue_declared = True + queue_bound = True qb = QueueBind( self.queue.name, self.queue.exchange.name.encode('utf-8'), self.queue.routing_key, False, self.queue.arguments_bind) - logger.debug('Running %s' % (repr(qb))) self.method_and_watch( qb, QueueBindOk, self.on_setup ) - if not queue_declared: + if not queue_bound: # default exchange, pretend it was bind ok self.on_setup(QueueBindOk()) elif isinstance(payload, QueueBindOk): diff --git a/coolamqp/clustering/cluster.py b/coolamqp/clustering/cluster.py index e62fbb3ef90b309a692462beacdfe9f44bb383d2..0760c9d7c1810f3b3050700c997e2d328b0a8623 100644 --- a/coolamqp/clustering/cluster.py +++ b/coolamqp/clustering/cluster.py @@ -9,6 +9,7 @@ from concurrent.futures import Future import six +from coolamqp.argumentify import argumentify from coolamqp.attaches import Publisher, AttacheGroup, Consumer, Declarer from coolamqp.attaches.utils import close_future from coolamqp.clustering.events import ConnectionLost, MessageReceived, \ @@ -112,7 +113,7 @@ class Cluster(object): child_span = self._make_span('bind', span) else: child_span = None - fut = self.decl.declare(QueueBind(queue, exchange, routing_key, arguments), + fut = self.decl.declare(QueueBind(queue, exchange, routing_key, argumentify(arguments)), span=child_span) return close_future(fut, child_span) diff --git a/coolamqp/framing/definitions.py b/coolamqp/framing/definitions.py index f106e5bc3a01dbc2f35fea76d177110769e8ca40..12645c898bd832e965ac6b80b0c57669f4bbf0bc 100644 --- a/coolamqp/framing/definitions.py +++ b/coolamqp/framing/definitions.py @@ -2945,9 +2945,6 @@ class BasicContentPropertyList(AMQPContentPropertyList): :param reserved: reserved, must be empty :type reserved: binary type (max length 255) (AMQP as shortstr) """ - if 'headers' in kwargs: - kwargs['headers'] = argumentify(kwargs['headers']) - zpf = bytearray([ (('content_type' in kwargs) << 7) | (('content_encoding' in kwargs) << 6) | diff --git a/coolamqp/framing/field_table.py b/coolamqp/framing/field_table.py index 5d7b1de9923a0d8f0c0a45acc1ddb031ef35bcb4..a2752c671cfa6536ba83e01868ec4e046d723949 100644 --- a/coolamqp/framing/field_table.py +++ b/coolamqp/framing/field_table.py @@ -133,8 +133,12 @@ def get_type_for(val): return 'l' elif isinstance(val, float): return 'd' + elif isinstance(val, (tuple, list)): + return 'A' + elif isinstance(val, dict): + return 'F' else: - raise ValueError('Undeterminable type') + raise ValueError('I have zero idea what you have just passed') def enframe_field_value(buf, fv): @@ -203,8 +207,9 @@ def enframe_table(buf, table): # type (tp.BinaryIO, table) -> None :param buf: target buffer to write to :param table: table to write """ + if isinstance(table, tuple) and len(table) > 1 and table[1] == 'F': # Todo: fix an ugly hack + table = table[0] _tobuf(buf, '!I', frame_table_size(table) - 4) - for name, fv in table: _tobufv(buf, name, '!B', len(name)) enframe_field_value(buf, fv) @@ -240,10 +245,14 @@ def deframe_table(buf, start_offset): # -> (table, bytes_consumed) def frame_field_value_size(fv): v, t = fv - if FIELD_TYPES[t][0] is None: - return FIELD_TYPES[t][4](v) + 1 - else: - return FIELD_TYPES[t][0] + 1 + try: + if FIELD_TYPES[t][0] is None: + return FIELD_TYPES[t][4](v) + 1 + else: + return FIELD_TYPES[t][0] + 1 + except KeyError: + # todo: fix this hack + return frame_field_value_size(t) def frame_array_size(array): @@ -254,6 +263,9 @@ def frame_table_size(table): """ :return: length of table representation, in bytes, INCLUDING length header""" + if isinstance(table, tuple) and len(table) == 2: + table = table[0] + # todo: fix this hack return 4 + sum(1 + len(k) + frame_field_value_size(fv) for k, fv in table) diff --git a/coolamqp/objects.py b/coolamqp/objects.py index 3ab5a671592abd8ffdd4b5c308c4ccccc418c70e..bf10586a9d5139922f6c14c796a3b7f2f05f191f 100644 --- a/coolamqp/objects.py +++ b/coolamqp/objects.py @@ -11,10 +11,19 @@ import warnings import six from coolamqp.argumentify import argumentify, tobytes, toutf8 -from coolamqp.framing.definitions import BasicContentPropertyList as MessageProperties +from coolamqp.framing.definitions import BasicContentPropertyList logger = logging.getLogger(__name__) + +class MessageProperties(BasicContentPropertyList): + def __new__(cls, *args, **kwargs): + if 'headers' in kwargs: + if isinstance(kwargs['headers'], dict): + kwargs['headers'] = argumentify(kwargs['headers']) + return BasicContentPropertyList.__new__(cls, *args, **kwargs) + + EMPTY_PROPERTIES = MessageProperties() @@ -275,7 +284,7 @@ class Queue(object): self.exclusive = exclusive self.arguments = argumentify(arguments) self.arguments_bind = argumentify(arguments_bind) - self.anonymous = self.name is None # if this queue is anonymous, it must be regenerated upon reconnect + self.anonymous = self.name is None if self.auto_delete and self.durable: raise ValueError('Cannot create an auto_delete and durable queue') @@ -324,7 +333,7 @@ class QueueBind(object): exchange = exchange.name self.exchange = tobytes(exchange) # type: bytes self.routing_key = tobytes(routing_key) # type: bytes - self.arguments = argumentify(arguments) + self.arguments = arguments or [] def __eq__(self, other): return self.queue == other.queue and self.exchange == other.exchange and self.routing_key == other.routing_key diff --git a/tests/test_clustering/test_topic_exchanges.py b/tests/test_clustering/test_topic_exchanges.py index b44efe7131cab8e8cba7921b3e3bd3bf76dcb0d9..0a2bfab6f241268786a9415d9d557a8015471cc4 100644 --- a/tests/test_clustering/test_topic_exchanges.py +++ b/tests/test_clustering/test_topic_exchanges.py @@ -44,12 +44,13 @@ class TestTopic(unittest.TestCase): self.c.declare(queue2).result() test = {'a': 0} + def do_message(msg): msg.ack() test['a'] += 1 - cons1, fut1 = self.c.consume(queue1, no_ack=False) - cons2, fut2 = self.c.consume(queue2, no_ack=False) + cons1, fut1 = self.c.consume(queue1, on_message=do_message, no_ack=False) + cons2, fut2 = self.c.consume(queue2, on_message=do_message, no_ack=False) fut1.result() fut2.result() diff --git a/tests/test_objects.py b/tests/test_objects.py index 9d37c62d2cedd4f257a95a6e5646483e6e1ae2e2..19f12ca2e105f7a08f48833cad6849ecf625ac35 100644 --- a/tests/test_objects.py +++ b/tests/test_objects.py @@ -38,6 +38,11 @@ class TestObjects(unittest.TestCase): if IS_PY3: self.assertTrue(issubclass(w[1].category, DeprecationWarning)) + def test_headers(self): + msg = MessageProperties(headers={'city': 'sydney'}) + buf = io.BytesIO() + msg.write_to(buf) + def test_queue_declare(self): args = argumentify({'x-dead-letter-exchange': 'deadletter', 'x-message-ttl': 1000})