diff --git a/CHANGELOG.md b/CHANGELOG.md index 6d82c6d30f965b85c37399d5f246d8cbade8ae5d..85fc4b57e000c01b534a5b81ddd9c524a90e51f5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,10 +1,11 @@ Previous release notes are hosted on [GitHub](https://github.com/smok-serwis/coolamqp/releases). Since v1.3.2 they'll be put here and in release description. -# v1.3.2 +# v1.4.0 * removed the requirement for a Queue that for it to be equal to other Queue if their types do match * compile_definitions will now depend on requests * added support for infinite (None) timeouts during start * stress tests will run for 120 seconds now * stress tests will be harder, and use more queues +* added arguments to queues, queue binds and exchanges diff --git a/coolamqp/__init__.py b/coolamqp/__init__.py index 73027a1b053585622dd8680ab1b9288cbd743290..0e092e7a707ca84156f383e70a120de7040c9745 100644 --- a/coolamqp/__init__.py +++ b/coolamqp/__init__.py @@ -1 +1 @@ -__version__ = '1.3.2b5' +__version__ = '1.4.0b1' diff --git a/coolamqp/attaches/declarer.py b/coolamqp/attaches/declarer.py index c2cb8c1df211c7609c3d96c8cc11a0d0c0feac61..eea1f2ccd08b95151b2b5a52d76cf40bd56f170b 100644 --- a/coolamqp/attaches/declarer.py +++ b/coolamqp/attaches/declarer.py @@ -106,18 +106,18 @@ class Operation(object): self.declarer.method_and_watch( ExchangeDeclare(self.obj.name.encode('utf8'), obj.type, False, obj.durable, - obj.auto_delete, False, False, []), + obj.auto_delete, False, False, obj.arguments or []), (ExchangeDeclareOk, ChannelClose), self._callback) elif isinstance(obj, Queue): self.declarer.method_and_watch( QueueDeclare(obj.name, False, obj.durable, obj.exclusive, - obj.auto_delete, False, []), + obj.auto_delete, False, obj.arguments or []), (QueueDeclareOk, ChannelClose), self._callback) elif isinstance(obj, CommandQueueBind): self.declarer.method_and_watch( - QueueBind(obj.queue, obj.exchange, obj.routing_key, False, []), + QueueBind(obj.queue, obj.exchange, obj.routing_key, False, obj.arguments or []), (QueueBindOk, ChannelClose), self._callback ) diff --git a/coolamqp/clustering/cluster.py b/coolamqp/clustering/cluster.py index b81c2625f647547949b8eeeb9eed04184d24f7d9..6a8432d2fa841c08dfa3480f3dcceed0613ecfc8 100644 --- a/coolamqp/clustering/cluster.py +++ b/coolamqp/clustering/cluster.py @@ -100,7 +100,7 @@ class Cluster(object): self.on_fail = None def bind(self, queue, exchange, routing_key, persistent=False, span=None, - dont_trace=False): + dont_trace=False, arguments=None): """ Bind a queue to an exchange """ @@ -108,7 +108,7 @@ class Cluster(object): child_span = self._make_span('bind', span) else: child_span = None - fut = self.decl.declare(QueueBind(queue, exchange, routing_key), + fut = self.decl.declare(QueueBind(queue, exchange, routing_key, arguments), persistent=persistent, span=child_span) return close_future(fut, child_span) diff --git a/coolamqp/objects.py b/coolamqp/objects.py index e505920eb357a9a82266f221a20f7e1b1feeb5f2..d8cb7f0a8e8cace0bd24bdcaa7add7d1d5dd08ff 100644 --- a/coolamqp/objects.py +++ b/coolamqp/objects.py @@ -18,6 +18,28 @@ logger = logging.getLogger(__name__) EMPTY_PROPERTIES = MessageProperties() +def argumentify(arguments): + if arguments is None: + return None + args = [] + if isinstance(arguments, dict): + for key, value in arguments.items(): + if not isinstance(value, six.string_types): + value = str(value) + if not isinstance(value, six.binary_type): + value = value.encode('utf8') + args.append((key, value)) + else: + for key, value in arguments: + if not isinstance(value, six.string_types): + value = str(value) + if not isinstance(value, six.binary_type): + value = value.encode('utf8') + args.append((key, value)) + + return args + + def toutf8(q): if isinstance(q, six.binary_type): q = q.decode('utf8') @@ -175,14 +197,15 @@ class Exchange(object): This represents an Exchange used in AMQP. This is hashable. """ - __slots__ = ('name', 'type', 'durable', 'auto_delete') + __slots__ = ('name', 'type', 'durable', 'auto_delete', 'arguments') direct = None # the direct exchange def __init__(self, name=u'', # type: tp.Union[str, bytes] type=b'direct', # type: tp.Union[str, bytes] durable=True, # type: bool - auto_delete=False # type: bool + auto_delete=False, # type: bool + arguments=None ): """ :type name: unicode is preferred, binary type will get decoded to @@ -192,6 +215,7 @@ class Exchange(object): self.type = tobytes(type) # must be bytes self.durable = durable self.auto_delete = auto_delete + self.arguments = argumentify(arguments) assert isinstance(self.name, six.text_type) assert isinstance(self.type, six.binary_type) @@ -224,13 +248,14 @@ class Queue(object): :param auto_delete: Is this queue auto_delete ? """ __slots__ = ('name', 'durable', 'exchange', 'auto_delete', 'exclusive', - 'anonymous', 'consumer_tag') + 'anonymous', 'consumer_tag', 'arguments') def __init__(self, name=b'', # type: tp.Union[str, bytes] durable=False, # type: bool exchange=None, # type: tp.Optional[Exchange] exclusive=False, # type: bool - auto_delete=False # type: bool + auto_delete=False, # type: bool + arguments=None # type: tp.Union[tp.List[bytes, tp.Any], tp.Dict[str, tp.Any]] ): if not name: name = uuid.uuid4().hex @@ -240,6 +265,7 @@ class Queue(object): self.exchange = exchange self.auto_delete = auto_delete self.exclusive = exclusive + self.arguments = argumentify(arguments) self.anonymous = not len( self.name) # if this queue is anonymous, it must be regenerated upon reconnect @@ -259,9 +285,13 @@ class Queue(object): class QueueBind(object): """An order to be declared which binds a given queue to an exchange""" + + __slots__ = ('queue', 'exchange', 'routing_key', 'arguments') + def __init__(self, queue, # type: tp.Union[Queue, bytes, unicode] exchange, # type: tp.Union[Exchange, bytes, unicode] - routing_key # type: tp.Union[bytes, unicode] + routing_key, # type: tp.Union[bytes, unicode] + arguments=None # type: tp.Optional[tp.List[tuple[bytes, tp.Any]]] ): if isinstance(queue, Queue): queue = queue.name @@ -270,6 +300,7 @@ class QueueBind(object): exchange = exchange.name self.exchange = tobytes(exchange) # type: bytes self.routing_key = tobytes(routing_key) # type: bytes + self.arguments = argumentify(arguments) def __eq__(self, other): return self.queue == other.queue and self.exchange == other.exchange and self.routing_key == other.routing_key diff --git a/tests/test_clustering/test_things.py b/tests/test_clustering/test_things.py index 146d3f4c7eecc29eebec2e1d61ba2e3c6821ac92..2cd252dec46d0f6f905da7429233d20eb6cffdb9 100644 --- a/tests/test_clustering/test_things.py +++ b/tests/test_clustering/test_things.py @@ -9,7 +9,7 @@ import unittest from coolamqp.clustering import Cluster from coolamqp.exceptions import ConnectionDead -from coolamqp.objects import NodeDefinition, Queue +from coolamqp.objects import NodeDefinition, Queue, Exchange NODE = NodeDefinition(os.environ.get('AMQP_HOST', '127.0.0.1'), 'guest', 'guest', heartbeat=20) logging.basicConfig(level=logging.DEBUG) @@ -17,6 +17,25 @@ logging.basicConfig(level=logging.DEBUG) class TestConnecting(unittest.TestCase): + + def test_argumented_exchange(self): + xchg = Exchange('test', auto_delete=True) + c = Cluster([NODE]) + c.start(wait=True, timeout=None) + c.declare(xchg).result() + xchg2 = Exchange('test2', auto_delete=True, arguments={'alternate-exchange': 'test'}) + c.declare(xchg).result() + c.shutdown(True) + + def test_argumented_queue(self): + que = Queue(auto_delete=True, arguments=[(b'x-max-priority', 10)]) + que2 = Queue(auto_delete=True, arguments={'x-max-priority': 10}) + c = Cluster([NODE]) + c.start(wait=True, timeout=None) + c.declare(que).result() + c.declare(que2).result() + c.shutdown(True) + def test_connection_blocked(self): try: from coolamqp.framing.definitions import ConnectionBlocked