From fbb9c6076bbf671a70d44eca109bd268382af18a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ma=C5=9Blanka?= <piotr.maslanka@ericsson.com> Date: Mon, 2 Sep 2024 16:54:10 +0200 Subject: [PATCH] added arguments to queues, queue binds and exchanges --- CHANGELOG.md | 3 +- coolamqp/__init__.py | 2 +- coolamqp/attaches/declarer.py | 6 ++-- coolamqp/clustering/cluster.py | 4 +-- coolamqp/objects.py | 41 ++++++++++++++++++++++++---- tests/test_clustering/test_things.py | 21 +++++++++++++- 6 files changed, 64 insertions(+), 13 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6d82c6d..85fc4b5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,10 +1,11 @@ Previous release notes are hosted on [GitHub](https://github.com/smok-serwis/coolamqp/releases). Since v1.3.2 they'll be put here and in release description. -# v1.3.2 +# v1.4.0 * removed the requirement for a Queue that for it to be equal to other Queue if their types do match * compile_definitions will now depend on requests * added support for infinite (None) timeouts during start * stress tests will run for 120 seconds now * stress tests will be harder, and use more queues +* added arguments to queues, queue binds and exchanges diff --git a/coolamqp/__init__.py b/coolamqp/__init__.py index 73027a1..0e092e7 100644 --- a/coolamqp/__init__.py +++ b/coolamqp/__init__.py @@ -1 +1 @@ -__version__ = '1.3.2b5' +__version__ = '1.4.0b1' diff --git a/coolamqp/attaches/declarer.py b/coolamqp/attaches/declarer.py index c2cb8c1..eea1f2c 100644 --- a/coolamqp/attaches/declarer.py +++ b/coolamqp/attaches/declarer.py @@ -106,18 +106,18 @@ class Operation(object): self.declarer.method_and_watch( ExchangeDeclare(self.obj.name.encode('utf8'), obj.type, False, obj.durable, - obj.auto_delete, False, False, []), + obj.auto_delete, False, False, obj.arguments or []), (ExchangeDeclareOk, ChannelClose), self._callback) elif isinstance(obj, Queue): self.declarer.method_and_watch( QueueDeclare(obj.name, False, obj.durable, obj.exclusive, - obj.auto_delete, False, []), + obj.auto_delete, False, obj.arguments or []), (QueueDeclareOk, ChannelClose), self._callback) elif isinstance(obj, CommandQueueBind): self.declarer.method_and_watch( - QueueBind(obj.queue, obj.exchange, obj.routing_key, False, []), + QueueBind(obj.queue, obj.exchange, obj.routing_key, False, obj.arguments or []), (QueueBindOk, ChannelClose), self._callback ) diff --git a/coolamqp/clustering/cluster.py b/coolamqp/clustering/cluster.py index b81c262..6a8432d 100644 --- a/coolamqp/clustering/cluster.py +++ b/coolamqp/clustering/cluster.py @@ -100,7 +100,7 @@ class Cluster(object): self.on_fail = None def bind(self, queue, exchange, routing_key, persistent=False, span=None, - dont_trace=False): + dont_trace=False, arguments=None): """ Bind a queue to an exchange """ @@ -108,7 +108,7 @@ class Cluster(object): child_span = self._make_span('bind', span) else: child_span = None - fut = self.decl.declare(QueueBind(queue, exchange, routing_key), + fut = self.decl.declare(QueueBind(queue, exchange, routing_key, arguments), persistent=persistent, span=child_span) return close_future(fut, child_span) diff --git a/coolamqp/objects.py b/coolamqp/objects.py index e505920..d8cb7f0 100644 --- a/coolamqp/objects.py +++ b/coolamqp/objects.py @@ -18,6 +18,28 @@ logger = logging.getLogger(__name__) EMPTY_PROPERTIES = MessageProperties() +def argumentify(arguments): + if arguments is None: + return None + args = [] + if isinstance(arguments, dict): + for key, value in arguments.items(): + if not isinstance(value, six.string_types): + value = str(value) + if not isinstance(value, six.binary_type): + value = value.encode('utf8') + args.append((key, value)) + else: + for key, value in arguments: + if not isinstance(value, six.string_types): + value = str(value) + if not isinstance(value, six.binary_type): + value = value.encode('utf8') + args.append((key, value)) + + return args + + def toutf8(q): if isinstance(q, six.binary_type): q = q.decode('utf8') @@ -175,14 +197,15 @@ class Exchange(object): This represents an Exchange used in AMQP. This is hashable. """ - __slots__ = ('name', 'type', 'durable', 'auto_delete') + __slots__ = ('name', 'type', 'durable', 'auto_delete', 'arguments') direct = None # the direct exchange def __init__(self, name=u'', # type: tp.Union[str, bytes] type=b'direct', # type: tp.Union[str, bytes] durable=True, # type: bool - auto_delete=False # type: bool + auto_delete=False, # type: bool + arguments=None ): """ :type name: unicode is preferred, binary type will get decoded to @@ -192,6 +215,7 @@ class Exchange(object): self.type = tobytes(type) # must be bytes self.durable = durable self.auto_delete = auto_delete + self.arguments = argumentify(arguments) assert isinstance(self.name, six.text_type) assert isinstance(self.type, six.binary_type) @@ -224,13 +248,14 @@ class Queue(object): :param auto_delete: Is this queue auto_delete ? """ __slots__ = ('name', 'durable', 'exchange', 'auto_delete', 'exclusive', - 'anonymous', 'consumer_tag') + 'anonymous', 'consumer_tag', 'arguments') def __init__(self, name=b'', # type: tp.Union[str, bytes] durable=False, # type: bool exchange=None, # type: tp.Optional[Exchange] exclusive=False, # type: bool - auto_delete=False # type: bool + auto_delete=False, # type: bool + arguments=None # type: tp.Union[tp.List[bytes, tp.Any], tp.Dict[str, tp.Any]] ): if not name: name = uuid.uuid4().hex @@ -240,6 +265,7 @@ class Queue(object): self.exchange = exchange self.auto_delete = auto_delete self.exclusive = exclusive + self.arguments = argumentify(arguments) self.anonymous = not len( self.name) # if this queue is anonymous, it must be regenerated upon reconnect @@ -259,9 +285,13 @@ class Queue(object): class QueueBind(object): """An order to be declared which binds a given queue to an exchange""" + + __slots__ = ('queue', 'exchange', 'routing_key', 'arguments') + def __init__(self, queue, # type: tp.Union[Queue, bytes, unicode] exchange, # type: tp.Union[Exchange, bytes, unicode] - routing_key # type: tp.Union[bytes, unicode] + routing_key, # type: tp.Union[bytes, unicode] + arguments=None # type: tp.Optional[tp.List[tuple[bytes, tp.Any]]] ): if isinstance(queue, Queue): queue = queue.name @@ -270,6 +300,7 @@ class QueueBind(object): exchange = exchange.name self.exchange = tobytes(exchange) # type: bytes self.routing_key = tobytes(routing_key) # type: bytes + self.arguments = argumentify(arguments) def __eq__(self, other): return self.queue == other.queue and self.exchange == other.exchange and self.routing_key == other.routing_key diff --git a/tests/test_clustering/test_things.py b/tests/test_clustering/test_things.py index 146d3f4..2cd252d 100644 --- a/tests/test_clustering/test_things.py +++ b/tests/test_clustering/test_things.py @@ -9,7 +9,7 @@ import unittest from coolamqp.clustering import Cluster from coolamqp.exceptions import ConnectionDead -from coolamqp.objects import NodeDefinition, Queue +from coolamqp.objects import NodeDefinition, Queue, Exchange NODE = NodeDefinition(os.environ.get('AMQP_HOST', '127.0.0.1'), 'guest', 'guest', heartbeat=20) logging.basicConfig(level=logging.DEBUG) @@ -17,6 +17,25 @@ logging.basicConfig(level=logging.DEBUG) class TestConnecting(unittest.TestCase): + + def test_argumented_exchange(self): + xchg = Exchange('test', auto_delete=True) + c = Cluster([NODE]) + c.start(wait=True, timeout=None) + c.declare(xchg).result() + xchg2 = Exchange('test2', auto_delete=True, arguments={'alternate-exchange': 'test'}) + c.declare(xchg).result() + c.shutdown(True) + + def test_argumented_queue(self): + que = Queue(auto_delete=True, arguments=[(b'x-max-priority', 10)]) + que2 = Queue(auto_delete=True, arguments={'x-max-priority': 10}) + c = Cluster([NODE]) + c.start(wait=True, timeout=None) + c.declare(que).result() + c.declare(que2).result() + c.shutdown(True) + def test_connection_blocked(self): try: from coolamqp.framing.definitions import ConnectionBlocked -- GitLab