diff --git a/coolamqp/argumentify.py b/coolamqp/argumentify.py new file mode 100644 index 0000000000000000000000000000000000000000..a8f0fd2b2af2cac689e279d09c563f696d128020 --- /dev/null +++ b/coolamqp/argumentify.py @@ -0,0 +1,31 @@ +import six + +from coolamqp.framing.field_table import get_type_for + + +def argumentify(arguments): + if arguments is None: + return [] + args = [] + if isinstance(arguments, dict): + for key, value in arguments.items(): + key = tobytes(key) + args.append((key, (value, get_type_for(value)))) + else: + for key, value in arguments: + key = tobytes(key) + args.append((key, (value, get_type_for(value)))) + + return args + + +def tobytes(q): + if isinstance(q, memoryview): + return q.tobytes() + return q.encode('utf-8') if isinstance(q, six.text_type) else q + + +def toutf8(q): + if isinstance(q, memoryview): + q = q.tobytes() + return q.decode('utf-8') if isinstance(q, six.binary_type) else q \ No newline at end of file diff --git a/coolamqp/attaches/channeler.py b/coolamqp/attaches/channeler.py index b22bb479ed7e1597bee406e3b2524593db5ee55e..45b427876197be9586cbbf68b0a2e0b06f12a83a 100644 --- a/coolamqp/attaches/channeler.py +++ b/coolamqp/attaches/channeler.py @@ -8,6 +8,7 @@ from __future__ import print_function, absolute_import, division import logging import typing as tp +import coolamqp.argumentify from coolamqp.framing.definitions import ChannelOpen, ChannelOpenOk, \ ChannelClose, ChannelCloseOk, BasicCancel, \ BasicCancelOk @@ -162,7 +163,7 @@ class Channeler(Attache): if isinstance(payload, ChannelClose): logger.debug('Channel closed: %s %s', payload.reply_code, - payload.reply_text.tobytes()) + coolamqp.argumentify.tobytes()) def methods(self, payloads): # type: (tp.Iterable[coolamqp.framing.base.AMQPMethodPayload]) -> None diff --git a/coolamqp/attaches/consumer.py b/coolamqp/attaches/consumer.py index 78077085670e3c300c4a9ef388e83de6b1fbe1e7..d64e7724f6193948b1b5004ae4773738ffe5236d 100644 --- a/coolamqp/attaches/consumer.py +++ b/coolamqp/attaches/consumer.py @@ -8,6 +8,7 @@ import uuid import warnings from concurrent.futures import Future +import coolamqp.argumentify from coolamqp.attaches.channeler import Channeler, ST_ONLINE, ST_OFFLINE from coolamqp.exceptions import AMQPError from coolamqp.framing.definitions import ChannelOpenOk, BasicConsume, \ @@ -402,7 +403,7 @@ class Consumer(Channeler): elif isinstance(payload, QueueDeclareOk): # did we need an anonymous name? if not self.queue.name: - self.queue.name = payload.queue.tobytes() + self.queue.name = coolamqp.argumentify.tobytes() queue_declared = False # We need any form of binding. @@ -413,7 +414,7 @@ class Consumer(Channeler): QueueBind( self.queue.name, self.queue.exchange.name.encode('utf8'), - b'', False, []), + b'', False, self.queue.arguments_bind), QueueBindOk, self.on_setup ) diff --git a/coolamqp/framing/definitions.py b/coolamqp/framing/definitions.py index 68278f4291aa0e4cf4f3554639a284ad0483d875..ab45f904f7a5e32e0ebc9600e0ed60cf722aee28 100644 --- a/coolamqp/framing/definitions.py +++ b/coolamqp/framing/definitions.py @@ -1,5 +1,9 @@ # coding=UTF-8 from __future__ import print_function, absolute_import + +import coolamqp.argumentify +from coolamqp.argumentify import argumentify + """ A Python version of the AMQP machine-readable specification. @@ -2941,6 +2945,9 @@ class BasicContentPropertyList(AMQPContentPropertyList): :param reserved: reserved, must be empty :type reserved: binary type (max length 255) (AMQP as shortstr) """ + if 'headers' in kwargs: + kwargs['headers'] = argumentify(kwargs['headers']) + zpf = bytearray([ (('content_type' in kwargs) << 7) | (('content_encoding' in kwargs) << 6) | @@ -3020,8 +3027,7 @@ class BasicContentPropertyList(AMQPContentPropertyList): else: while buf[offset + pfl - 1] & 1: pfl += 2 - zpf = BasicContentPropertyList.zero_property_flags(buf[offset:offset + - pfl]).tobytes() + zpf = coolamqp.argumentify.tobytes() if zpf in BasicContentPropertyList.PARTICULAR_CLASSES: return BasicContentPropertyList.PARTICULAR_CLASSES[ zpf].from_buffer(buf, offset) diff --git a/coolamqp/objects.py b/coolamqp/objects.py index f1cf91999c650bee2cc642c26ee4a5a7e0df4b88..8cad72d29a203932f1b35b8230428bd549782c2e 100644 --- a/coolamqp/objects.py +++ b/coolamqp/objects.py @@ -10,44 +10,15 @@ import warnings import six +from coolamqp.argumentify import argumentify, tobytes, toutf8 from coolamqp.framing.definitions import \ BasicContentPropertyList as MessageProperties -from coolamqp.framing.field_table import get_type_for logger = logging.getLogger(__name__) EMPTY_PROPERTIES = MessageProperties() - -def toutf8(q): - if isinstance(q, memoryview): - q = q.tobytes() - return q.decode('utf-8') if isinstance(q, six.binary_type) else q - - -def tobytes(q): - if isinstance(q, memoryview): - return q.tobytes() - return q.encode('utf-8') if isinstance(q, six.text_type) else q - - -def argumentify(arguments): - if arguments is None: - return None - args = [] - if isinstance(arguments, dict): - for key, value in arguments.items(): - key = tobytes(key) - args.append((key, (value, get_type_for(value)))) - else: - for key, value in arguments: - key = tobytes(key) - args.append((key, (value, get_type_for(value)))) - - return args - - class Callable(object): """ Add a bunch of callables to one list, and just invoke'm. @@ -268,7 +239,9 @@ class Queue(object): :param exchange: Exchange for this queue to bind to. None for no binding. :param exclusive: This queue will be deleted when the connection closes :param auto_delete: This queue will be deleted when the last consumer unsubscribes - :param arguments: either a list of (bytes, values) or a dict of (str, value) to pass as an extra argument + :param arguments: either a list of (bytes, values) or a dict of (str, value) to pass as an extra argument during + declaration + :param arguments_bind: arguments to pass to binding to a (headers, I suppose exchange) :raises ValueError: tried to create a queue that was not durable or auto_delete :raises ValueError: tried to create a queue that was not exclusive or auto_delete and not anonymous :raises ValueError: tried to create a queue that was anonymous and not auto_delete or durable @@ -277,14 +250,15 @@ class Queue(object): :warning UserWarning: if you're declaring an auto_delete or exclusive, anonymous queue """ __slots__ = ('name', 'durable', 'exchange', 'auto_delete', 'exclusive', - 'anonymous', 'consumer_tag', 'arguments') + 'anonymous', 'consumer_tag', 'arguments', 'arguments_bind') def __init__(self, name=None, # type: tp.Union[str, bytes, None] durable=False, # type: bool exchange=None, # type: tp.Optional[Exchange] exclusive=True, # type: bool auto_delete=True, # type: bool - arguments=None # type: tp.Union[tp.List[bytes, tp.Any], tp.Dict[str, tp.Any]] + arguments=None, # type: tp.Union[tp.List[bytes, tp.Any], tp.Dict[str, tp.Any]], + arguments_bind=None, ): if name is None: self.name = None @@ -296,6 +270,7 @@ class Queue(object): self.auto_delete = auto_delete self.exclusive = exclusive self.arguments = argumentify(arguments) + self.arguments_bind = argumentify(arguments_bind) self.anonymous = self.name is None # if this queue is anonymous, it must be regenerated upon reconnect if self.auto_delete and self.durable: diff --git a/coolamqp/uplink/connection/connection.py b/coolamqp/uplink/connection/connection.py index 0974f5cf876993f70ea46bdd7f81961ed42c5043..cb66293f34fa9ff39519655479906cd3068edadf 100644 --- a/coolamqp/uplink/connection/connection.py +++ b/coolamqp/uplink/connection/connection.py @@ -8,6 +8,7 @@ import time import typing as tp import uuid +import coolamqp.argumentify from coolamqp.utils import monotonic from coolamqp.exceptions import ConnectionDead @@ -245,7 +246,7 @@ class Connection(object): logger.info(u'[%s] Broker closed our connection - code %s reason %s', self.name, payload.reply_code, - payload.reply_text.tobytes().decode('utf8')) + coolamqp.argumentify.tobytes().decode('utf8')) elif isinstance(payload, ConnectionCloseOk): self.send(None) diff --git a/coolamqp/uplink/connection/recv_framer.py b/coolamqp/uplink/connection/recv_framer.py index cd35ec6bf8f18382f477e280ecf9534c5d25cb1a..eef95b3e6f8da711c5a35d82579d9bc5992d2246 100644 --- a/coolamqp/uplink/connection/recv_framer.py +++ b/coolamqp/uplink/connection/recv_framer.py @@ -7,6 +7,7 @@ import struct import six +import coolamqp.argumentify from coolamqp.framing.definitions import FRAME_HEADER, FRAME_HEARTBEAT, \ FRAME_END, FRAME_METHOD, FRAME_BODY from coolamqp.framing.frames import AMQPBodyFrame, AMQPHeaderFrame, \ @@ -108,8 +109,7 @@ class ReceivingFramer(object): self.total_data_len >= AMQPHeartbeatFrame.LENGTH - 1): data = b'' while len(data) < AMQPHeartbeatFrame.LENGTH - 1: - data = data + self._extract( - AMQPHeartbeatFrame.LENGTH - 1 - len(data)).tobytes() + data = data + coolamqp.argumentify.tobytes() if data != AMQPHeartbeatFrame.DATA[1:]: # Invalid heartbeat frame! @@ -127,7 +127,7 @@ class ReceivingFramer(object): self.total_data_len > 6): hdr = b'' while len(hdr) < 6: - hdr = hdr + self._extract(6 - len(hdr)).tobytes() + hdr = hdr + coolamqp.argumentify.tobytes() self.frame_channel, self.frame_size = struct.unpack('!HI', hdr) diff --git a/coolamqp/uplink/handshake.py b/coolamqp/uplink/handshake.py index 99d99bbfc7e2bfc67fcadd5c15b38b4dd59b858d..311f22108000d675f7d04eb891d0617f365df802 100644 --- a/coolamqp/uplink/handshake.py +++ b/coolamqp/uplink/handshake.py @@ -1,6 +1,8 @@ # coding=UTF-8 from __future__ import absolute_import, division, print_function +import coolamqp.argumentify + """ Provides reactors that can authenticate an AQMP session """ @@ -88,8 +90,8 @@ class Handshaker(object): def on_connection_start(self, payload # type: coolamqp.framing.base.AMQPPayload ): - sasl_mechanisms = payload.mechanisms.tobytes().split(b' ') - locale_supported = payload.locales.tobytes().split(b' ') + sasl_mechanisms = coolamqp.argumentify.tobytes().split(b' ') + locale_supported = coolamqp.argumentify.tobytes().split(b' ') # Select a mechanism if b'PLAIN' not in sasl_mechanisms: diff --git a/tests/test_clustering/test_topic_exchanges.py b/tests/test_clustering/test_topic_exchanges.py index dc8c18ea29c1178c54286560b3ac3f17ca55a175..4d6554bb56dc7d00955604e96317f8554bb0d72a 100644 --- a/tests/test_clustering/test_topic_exchanges.py +++ b/tests/test_clustering/test_topic_exchanges.py @@ -3,11 +3,12 @@ import time import os import unittest import logging +import uuid import monotonic from coolamqp.clustering import Cluster -from coolamqp.objects import Exchange, Queue, NodeDefinition, Message +from coolamqp.objects import Exchange, Queue, NodeDefinition, Message, MessageProperties XCHG = Exchange('smok5.results', type='topic', durable=True) @@ -20,6 +21,7 @@ logging.getLogger('coolamqp').setLevel(logging.DEBUG) did_receive = False + class TestTopic(unittest.TestCase): def setUp(self): self.c = Cluster([NODE]) @@ -32,6 +34,31 @@ class TestTopic(unittest.TestCase): pass self.c.shutdown() + def test_headers_exchange(self): + xchg_name = uuid.uuid4().hex + exchange = Exchange(xchg_name, b'headers') + self.c.declare(exchange).result() + queue1 = Queue(exchange=exchange, arguments={'x-match': 'all', 'location': 'brisbane'}) + self.c.declare(queue1).result() + queue2 = Queue(exchange=exchange, arguments={'x-match': 'all', 'location': 'sydney'}) + self.c.declare(queue2).result() + + test = {'a': 0} + def do_message(msg): + msg.ack() + test['a'] += 1 + + cons1, fut1 = self.c.consume(queue1, no_ack=False).result() + cons2, fut2 = self.c.consume(queue2, no_ack=False).result() + + self.c.publish(Message(b'dupa', MessageProperties(headers={'location': 'sydney'})), exchange=exchange, confirm=True).result() + self.c.publish(Message(b'dupa', MessageProperties(headers={'location': 'brisbane'})), exchange=exchange, confirm=True).result() + self.c.publish(Message(b'dupa', MessageProperties(headers={'location': 'wtf'})), exchange=exchange, confirm=True).result() + time.sleep(1) + cons1.cancel().result() + cons2.cancel().result() + self.assertEqual(test['a'], 2) + def test_bind_stuff(self): self.c.declare(QUEUE) self.c.declare(XCHG).result() diff --git a/tests/test_framing.py b/tests/test_framing.py index 3b4c10b2a1a2280dadbb16b896b8393e6cfe4769..5442da31c53daab3423eca2e52cd95eda886df72 100644 --- a/tests/test_framing.py +++ b/tests/test_framing.py @@ -3,7 +3,7 @@ import unittest from coolamqp.framing.field_table import enframe_table -from coolamqp.objects import argumentify +from coolamqp.argumentify import argumentify class TestFraming(unittest.TestCase): diff --git a/tests/test_objects.py b/tests/test_objects.py index 1415ba61ded72bd1fd99a0f2d8e591fadae7c949..9d37c62d2cedd4f257a95a6e5646483e6e1ae2e2 100644 --- a/tests/test_objects.py +++ b/tests/test_objects.py @@ -8,8 +8,8 @@ import warnings from coolamqp.framing.definitions import QueueDeclare -from coolamqp.objects import NodeDefinition, MessageProperties, Queue, argumentify - +from coolamqp.objects import NodeDefinition, MessageProperties, Queue +from coolamqp.argumentify import argumentify logger = logging.getLogger(__name__) logging.getLogger('coolamqp').setLevel(logging.DEBUG)