diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index a6b992102320531951b13dee6cde2ca2d13fde18..9f427e0e805b588b2fceeeae35b4b2248b0767ef 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -60,18 +60,18 @@ unittest_select: unittest_epoll_python27: - extends: .before_test + stage: unittest image: python:2.7 + variables: + AMQP_HOST: "rabbitmq" before_script: - - pip install nose2 nose2[coverage_plugin] + - pip install nose2 coverage requests yapf nose2[coverage_plugin] - python setup.py install script: - nose2 -F -vv - variables: - AMQP_HOST: "rabbitmq" - after_script: - - mv .coverage .coverage.python27epoll - + services: + - name: rabbitmq:3.10-management + alias: rabbitmq unittest_epoll: extends: .before_test diff --git a/CHANGELOG.md b/CHANGELOG.md index 966126608bf9dbcdc79f98277cba4d1127722a3f..0e7cd43f5a9907bc0990ec7140884334d4b8df48 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,25 +1,49 @@ Previous release notes are hosted on [GitHub](https://github.com/smok-serwis/coolamqp/releases). Since v1.3.2 they'll be put here and in release description. -# v1.5.0 -======== +v2.0.0 +====== + +* **CoolAMQP switches now primarily to support RabbitMQ.** If it by accident supports your server, then that is a + pure coincidence and nothing is guaranteed. +* changes to Queues: + * anonymous queues are back, for usage refer [here](https://smokserwis.docs.smok.co/coolamqp/advanced.html) + * changed some default arguments for Queues for them to better make sense + * some argument combinations just raise ValueError + * PendingDeprecationWarning changed into a DeprecationWarning + * added support for headers exchanges +* changes to Cluster: + * declare will refuse to declare an anonymous queue + * renamed publish(tx) to publish(confirm) + * declare will expect qos to be given as an integer, and will be set as prefetch_count, since RabbitMQ no longer + supports prefetch_size + * same can be said of Consumer.set_qos(prefetch_count) + * added Cluster. + +Compatible changes +------------------ + +* fixed a bug wherein bad invocation of NodeDefinition would result in an exception + +v1.5.0 +====== * added properties to identify the server -# v1.4.4 -======== +v1.4.4 +====== * added unit tests for RabbitMQ 4.0 -# v1.4.3 -======== +v1.4.3 +====== * bugfix regarding deadlettering queues * prefetch_size will be forced to 0 to better comply with [RabbitMQ](https://www.rabbitmq.com/docs/specification#method-status-basic.qos) * and a DeprecationWarning will be shown to people who try to set something else. 0 will be forced upon them anyway. -# v1.4.2 -======== +v1.4.2 +====== * fixed and unit tested the topic exchanges * fixed declare documentation @@ -29,13 +53,13 @@ Since v1.3.2 they'll be put here and in release description. * prefetch_size will be forced to 0 to better comply with [RabbitMQ](https://www.rabbitmq.com/docs/specification#method-status-basic.qos) * and a DeprecationWarning will be shown to people who try to set something else. 0 will be forced upon them anyway. -# v1.4.1 -======= +v1.4.1 +====== * fixed a bug while setting up connection -# v1.2.16 -========= +v1.2.16 +======= * removed the requirement for a Queue that for it to be equal to other Queue if their types do match * compile_definitions will now depend on requests diff --git a/README.md b/README.md index 5f2331e511144dee3f93b436f9eb196154acecd3..6bd097dff499c622276fdbfca008c1b5e896e81f 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,8 @@ CoolAMQP ======== +**A Python client for RabbitMQ** + []() **Warning!!** Since v1.3.1 development has been moved @@ -18,6 +20,8 @@ Or state it at the beginning of your `requirements.txt`: coolamqp ``` +**Version 2.0** is in [active development](https://git.dms-serwis.com.pl/smokserwis/coolamqp/-/milestones/3) + Why CoolAMQP? ------------- @@ -60,8 +64,8 @@ Enjoy! _Watch out for memoryviews!_ They're here to stay. * [Short'n'sweet contributing guide](CONTRIBUTING.md) -* [Change log for past versions](https://github.com/smok-serwis/coolamqp/releases/) -* [Change log in this, unreleased version](CHANGELOG.md) +* [Change log for past historical versions](https://github.com/smok-serwis/coolamqp/releases/) +* [Change log for recent versions, unreleased version](CHANGELOG.md) ## Notes diff --git a/coolamqp/__init__.py b/coolamqp/__init__.py index 77f1c8e63c9e11b968a1e66b0efebd57ee2a9240..0d6b713b1ca3d0c6c972d6eb2348da0fde32d4b3 100644 --- a/coolamqp/__init__.py +++ b/coolamqp/__init__.py @@ -1 +1 @@ -__version__ = '1.5.0' +__version__ = '2.0.0a3' diff --git a/coolamqp/argumentify.py b/coolamqp/argumentify.py new file mode 100644 index 0000000000000000000000000000000000000000..51aaed7708399cc688e486fa6fe124ee01b4c0f8 --- /dev/null +++ b/coolamqp/argumentify.py @@ -0,0 +1,53 @@ +import warnings + +import six + +from coolamqp.framing.field_table import get_type_for +import logging + +logger = logging.getLogger(__name__) + + + +def tobytes(q): + if isinstance(q, memoryview): + return q.tobytes() + return q.encode('utf-8') if isinstance(q, six.text_type) else q + + +def toutf8(q): + if isinstance(q, memoryview): + q = q.tobytes() + return q.decode('utf-8') if isinstance(q, six.binary_type) else q + + +def argumentify(arguments): + if arguments is None: + return [] + logger.warning('Input is %s' % (arguments, )) + # Was it argumented already? + # if isinstance(arguments, list): + # if len(arguments) >= 1: + # if isinstance(arguments[0], tuple): + # if isinstance(arguments[0][1], str) and len(arguments[0][1]) == 1: + # # Looks argumentified already + # return arguments + args = [] + if isinstance(arguments, dict): + for key, value in arguments.items(): + key = tobytes(key) + args.append((key, (value, get_type_for(value)))) + logger.warning('Output is %s', (args, 'F')) + return (args, 'F') + elif len(arguments[0]) == 2: + for key, value in arguments: + key = tobytes(key) + args.append((key, (value, get_type_for(value)))) + return (args, 'F') + elif isinstance(arguments, (list, tuple)): + for value in arguments: + args.append((value, get_type_for(value))) + return (args, 'A') + else: + warnings.warn('Unnecessary call to argumentify, see issue #11 for details', UserWarning) + return args diff --git a/coolamqp/attaches/channeler.py b/coolamqp/attaches/channeler.py index b22bb479ed7e1597bee406e3b2524593db5ee55e..a6ef07a6d94395265de06dc2371f06146acc9730 100644 --- a/coolamqp/attaches/channeler.py +++ b/coolamqp/attaches/channeler.py @@ -8,6 +8,7 @@ from __future__ import print_function, absolute_import, division import logging import typing as tp +import coolamqp.argumentify from coolamqp.framing.definitions import ChannelOpen, ChannelOpenOk, \ ChannelClose, ChannelCloseOk, BasicCancel, \ BasicCancelOk @@ -161,8 +162,7 @@ class Channeler(Attache): self.channel_id = None if isinstance(payload, ChannelClose): - logger.debug('Channel closed: %s %s', payload.reply_code, - payload.reply_text.tobytes()) + logger.debug('Channel closed: %s %s', payload.reply_code, payload.reply_text) def methods(self, payloads): # type: (tp.Iterable[coolamqp.framing.base.AMQPMethodPayload]) -> None diff --git a/coolamqp/attaches/consumer.py b/coolamqp/attaches/consumer.py index bd43bf3356005f1f9460cb3042cfbe93b69c099e..dae02c08f6f00a85c2fe7adab8d5fddc28cfb2c4 100644 --- a/coolamqp/attaches/consumer.py +++ b/coolamqp/attaches/consumer.py @@ -8,6 +8,7 @@ import uuid import warnings from concurrent.futures import Future +import coolamqp.argumentify from coolamqp.attaches.channeler import Channeler, ST_ONLINE, ST_OFFLINE from coolamqp.exceptions import AMQPError from coolamqp.framing.definitions import ChannelOpenOk, BasicConsume, \ @@ -84,14 +85,8 @@ class Consumer(Channeler): :param span: optional span, if opentracing is installed :param no_ack: Will this consumer require acknowledges from messages? :type no_ack: bool - :param qos: a tuple of (prefetch size, prefetch window) for this - consumer, or an int (prefetch window only). - If an int is passed, prefetch size will be set to 0 (which means - undefined), and this int will be used for prefetch window - :type qos: tuple(int, int) or tuple(None, int) or int - :param cancel_on_failure: Consumer will cancel itself when link goes - down - :type cancel_on_failure: bool + :param qos: maximum messages to send to client that can stay unacknowledged (qos.prefetch_count) + :type qos: int, prefetch_count to use :param future_to_notify: Future to succeed when this consumer goes online for the first time. This future can also raise with AMQPError if @@ -113,13 +108,12 @@ class Consumer(Channeler): __slots__ = ('queue', 'no_ack', 'on_message', 'cancelled', 'receiver', 'attache_group', 'channel_close_sent', 'qos', 'qos_update_sent', 'future_to_notify', 'future_to_notify_on_dead', - 'fail_on_first_time_resource_locked', 'cancel_on_failure', + 'fail_on_first_time_resource_locked', 'body_receive_mode', 'consumer_tag', 'on_cancel', 'on_broker_cancel', 'hb_watch', 'deliver_watch', 'span') def __init__(self, queue, on_message, span=None, - no_ack=True, qos=None, - cancel_on_failure=False, + no_ack=True, qos=0, future_to_notify=None, fail_on_first_time_resource_locked=False, body_receive_mode=BodyReceiveMode.BYTES @@ -144,14 +138,13 @@ class Consumer(Channeler): self.channel_close_sent = False # for avoiding situations where ChannelClose is sent twice # if this is not None, then it has an attribute # on_cancel_customer(Consumer instance) - self.qos = _qosify(qos) + self.qos = qos self.qos_update_sent = False # QoS was not sent to server self.future_to_notify = future_to_notify self.future_to_notify_on_dead = None # .cancel self.fail_on_first_time_resource_locked = fail_on_first_time_resource_locked - self.cancel_on_failure = cancel_on_failure self.body_receive_mode = body_receive_mode self.consumer_tag = None @@ -162,18 +155,16 @@ class Consumer(Channeler): oneshots=True) #: public, called on Customer Cancel Notification # (RabbitMQ) - def set_qos(self, prefetch_size, prefetch_count): # type: (int, int) -> None + def set_qos(self, prefetch_count): # type: (int, int) -> None """ Set new QoS for this consumer. - :param prefetch_size: prefetch in octets :param prefetch_count: prefetch in whole messages + :type prefetch_count: int """ - if prefetch_size: - warnings.warn('RabbitMQ stopped supporting prefetch_sizes, will use 0 anyway', DeprecationWarning) if self.state == ST_ONLINE: self.method(BasicQos(0, prefetch_count, False)) - self.qos = 0, prefetch_count + self.qos = prefetch_count def cancel(self): # type: () -> Future """ @@ -245,9 +236,7 @@ class Consumer(Channeler): Note, this can be called multiple times, and eventually with None. """ - if self.cancel_on_failure and (not self.cancelled): - logger.debug( - 'Consumer is cancel_on_failure and failure seen, True->cancelled') + if not self.cancelled: self.cancelled = True self.on_cancel() @@ -377,7 +366,7 @@ class Consumer(Channeler): self.queue.exchange.auto_delete, False, False, - []), + self.queue.exchange.arguments), ExchangeDeclareOk, self.on_setup ) @@ -398,7 +387,7 @@ class Consumer(Channeler): self.queue.exclusive, self.queue.auto_delete, False, - [] + self.queue.arguments ), QueueDeclareOk, self.on_setup @@ -406,30 +395,30 @@ class Consumer(Channeler): elif isinstance(payload, QueueDeclareOk): # did we need an anonymous name? - if not self.queue.name: + if self.queue.anonymous: self.queue.name = payload.queue.tobytes() - queue_declared = False + queue_bound = False # We need any form of binding. if self.queue.exchange is not None: - if self.queue.exchange.type != b'topic': - queue_declared = True - self.method_and_watch( - QueueBind( - self.queue.name, - self.queue.exchange.name.encode('utf8'), - b'', False, []), - QueueBindOk, - self.on_setup - ) - - if not queue_declared: + queue_bound = True + qb = QueueBind( + self.queue.name, + self.queue.exchange.name.encode('utf-8'), + self.queue.routing_key, False, self.queue.arguments_bind) + self.method_and_watch( + qb, + QueueBindOk, + self.on_setup + ) + + if not queue_bound: # default exchange, pretend it was bind ok self.on_setup(QueueBindOk()) elif isinstance(payload, QueueBindOk): - if self.qos is not None: + if self.qos: self.method_and_watch( - BasicQos(0, self.qos[1], False), + BasicQos(0, self.qos, False), BasicQosOk, self.on_setup ) @@ -474,17 +463,8 @@ class Consumer(Channeler): return # resend QoS, in case of sth - if self.qos is not None: - self.set_qos(0, self.qos[1]) - - -def _qosify(qos): - if qos is not None: - if isinstance(qos, int): - qos = 0, qos - elif qos[0] is None: - qos = 0, qos[1] # prefetch_size=0=undefined - return qos + if self.qos: + self.set_qos(self.qos) class MessageReceiver(object): diff --git a/coolamqp/attaches/declarer.py b/coolamqp/attaches/declarer.py index eea1f2ccd08b95151b2b5a52d76cf40bd56f170b..b47818152e7419642511527243cf2a1b0084632e 100644 --- a/coolamqp/attaches/declarer.py +++ b/coolamqp/attaches/declarer.py @@ -192,9 +192,6 @@ class Declarer(Channeler, Synchronized): Channeler.__init__(self) Synchronized.__init__(self) self.cluster = cluster - self.declared = set() # since Queues and Exchanges are hashable... - # anonymous queues aren't, but we reject those - # persistent self.left_to_declare = collections.deque() # since last disconnect. persistent+transient # deque of Operation objects @@ -219,10 +216,6 @@ class Declarer(Channeler, Synchronized): while len(self.left_to_declare) > 0: self.left_to_declare.pop().on_connection_dead() - # recast current declarations as new operations - for dec in self.declared: - self.left_to_declare.append(Operation(self, dec)) - super(Declarer, self).on_close() return @@ -267,7 +260,7 @@ class Declarer(Channeler, Synchronized): return fut - def declare(self, obj, persistent=False, span=None): + def declare(self, obj, span=None): """ Schedule to have an object declared. @@ -280,11 +273,7 @@ class Declarer(Channeler, Synchronized): Queue declarations CAN fail. - Note that if re-declaring these fails, they will be silently discarded. - You can subscribe an on_discard(Exchange | Queue) here. - :param obj: Exchange or Queue instance - :param persistent: will be redeclared upon disconnect. To remove, use "undeclare" :param span: span if opentracing is installed :return: a Future instance :raise ValueError: tried to declare anonymous queue @@ -298,10 +287,6 @@ class Declarer(Channeler, Synchronized): fut = Future() fut.set_running_or_notify_cancel() - if persistent: - if obj not in self.declared: - self.declared.add(obj) - self.left_to_declare.append(Operation(self, obj, fut, span, enqueued_span)) self._do_operations() diff --git a/coolamqp/clustering/cluster.py b/coolamqp/clustering/cluster.py index 82c729d490e9864996ffc277044833ee7187132d..0760c9d7c1810f3b3050700c997e2d328b0a8623 100644 --- a/coolamqp/clustering/cluster.py +++ b/coolamqp/clustering/cluster.py @@ -9,6 +9,7 @@ from concurrent.futures import Future import six +from coolamqp.argumentify import argumentify from coolamqp.attaches import Publisher, AttacheGroup, Consumer, Declarer from coolamqp.attaches.utils import close_future from coolamqp.clustering.events import ConnectionLost, MessageReceived, \ @@ -89,6 +90,7 @@ class Cluster(object): self.pub_tr = None # type: Publisher self.pub_na = None # type: Publisher self.decl = None # type: Declarer + self.on_fail = None if on_fail is not None: def decorated(): @@ -96,45 +98,50 @@ class Cluster(object): on_fail() self.on_fail = decorated - else: - self.on_fail = None - def bind(self, queue, exchange, routing_key, persistent=False, span=None, + def bind(self, queue, exchange, routing_key, span=None, dont_trace=False, arguments=None): """ Bind a queue to an exchange + + :raise ValueError: cannot bind to anonymous queues """ + if queue.anonymous: + raise ValueError('Canoot bind to anonymous queue') + if span is not None and not dont_trace: child_span = self._make_span('bind', span) else: child_span = None - fut = self.decl.declare(QueueBind(queue, exchange, routing_key, arguments), - persistent=persistent, + fut = self.decl.declare(QueueBind(queue, exchange, routing_key, argumentify(arguments)), span=child_span) return close_future(fut, child_span) def declare(self, obj, # type: tp.Union[Queue, Exchange] - persistent=False, # type: bool span=None, # type: tp.Optional[opentracing.Span] dont_trace=False # type: bool ): # type: (...) -> concurrent.futures.Future """ Declare a Queue/Exchange. + Non-anonymous queues have to be declared. Anonymous can't. + .. note:: Note that if your queue relates to an exchange that has not yet been declared you'll be faced with AMQP error 404: NOT_FOUND, so try to declare your exchanges before your queues. :param obj: Queue/Exchange object - :param persistent: should it be redefined upon reconnect? :param span: optional parent span, if opentracing is installed :param dont_trace: if True, a span won't be output :return: Future + :raises ValueError: tried to declare an anonymous queue """ + if isinstance(obj, Queue) and obj.anonymous: + raise ValueError('You cannot declare an anonymous queue!') if span is not None and not dont_trace: child_span = self._make_span('declare', span) else: child_span = None - fut = self.decl.declare(obj, persistent=persistent, span=child_span) + fut = self.decl.declare(obj, span=child_span) return close_future(fut, child_span) def drain(self, timeout, span=None, dont_trace=False): # type: (float) -> Event @@ -183,6 +190,9 @@ class Cluster(object): Take care not to lose the Consumer object - it's the only way to cancel a consumer! + .. note:: You don't need to explicitly declare queues and exchanges that you will be using beforehand, + this will do this for you on the same channel. + :param queue: Queue object, being consumed from right now. Note that name of anonymous queue might change at any time! :param on_message: callable that will process incoming messages @@ -232,7 +242,6 @@ class Cluster(object): def publish(self, message, # type: Message exchange=None, # type: tp.Union[Exchange, str, bytes] routing_key=u'', # type: tp.Union[str, bytes] - tx=None, # type: tp.Optional[bool] confirm=None, # type: tp.Optional[bool] span=None, # type: tp.Optional[opentracing.Span] dont_trace=False # type: bool @@ -246,9 +255,8 @@ class Cluster(object): :param confirm: Whether to publish it using confirms/transactions. If you choose so, you will receive a Future that can be used to check it broker took responsibility for this message. - Note that if tx if False, and message cannot be delivered to broker at once, + Note that if confirm is False, and message cannot be delivered to broker at once, it will be discarded - :param tx: deprecated, alias for confirm :param span: optionally, current span, if opentracing is installed :param dont_trace: if set to True, a span won't be generated :return: Future to be finished on completion or None, is confirm/tx was not chosen @@ -266,19 +274,8 @@ class Cluster(object): if isinstance(routing_key, six.text_type): routing_key = routing_key.encode('utf8') - if tx is not None: # confirm is a drop-in replacement. tx is unfortunately named - warnings.warn(u'Use confirm kwarg instead', DeprecationWarning) - - if confirm is not None: - raise RuntimeError( - u'Using both tx= and confirm= at once does not make sense') - elif confirm is not None: - tx = confirm - else: - tx = False - try: - if tx: + if confirm: clb = self.pub_tr else: clb = self.pub_na @@ -371,3 +368,9 @@ class Cluster(object): self.listener.terminate() if wait: self.listener.join() + + def is_shutdown(self): + """ + :return: bool, if this was started and later disconnected. + """ + return self.started and not self.connected diff --git a/coolamqp/clustering/single.py b/coolamqp/clustering/single.py index 1c856da672982b3ddd0f2604da30caa6ea5a7e32..634574a8fa14b2aae61dc3de900bc9554d7268e7 100644 --- a/coolamqp/clustering/single.py +++ b/coolamqp/clustering/single.py @@ -75,7 +75,6 @@ class SingleNodeReconnector(object): return self.connection = None - self.listener_thread.call_next_io_event(self.connect) def shutdown(self): """Close this connection""" diff --git a/coolamqp/framing/compilation/content_property.py b/coolamqp/framing/compilation/content_property.py index 0149db264f76358c1ce10e8c1a2c95261a18abf8..cf836613362ffa988ae30ae205217fe55e889fbc 100644 --- a/coolamqp/framing/compilation/content_property.py +++ b/coolamqp/framing/compilation/content_property.py @@ -1,7 +1,7 @@ # coding=UTF-8 from __future__ import absolute_import, division, print_function -from coolamqp.framing.field_table import deframe_table +from coolamqp.framing.field_table import deframe_table, frame_table_size, enframe_table """Generate serializers/unserializers/length getters for given property_flags""" import six @@ -141,7 +141,6 @@ def _compile_particular_content_property_list_class(zpf, fields): mod.append(get_counter(present_fields, prefix=u'self.', indent_level=2)[ :-1]) # skip eol mod.append(u' + %s\n' % (zpf_length,)) # account for pf length - return u''.join(mod), structers @@ -156,6 +155,8 @@ def compile_particular_content_property_list_class(zpf, fields): locals_ = { 'AMQPContentPropertyList': AMQPContentPropertyList, 'deframe_table': deframe_table, + 'frame_table_size': frame_table_size, + 'enframe_table': enframe_table, } for structer in structers: if structer not in STRUCTERS_FOR_NOW: diff --git a/coolamqp/framing/definitions.py b/coolamqp/framing/definitions.py index 68278f4291aa0e4cf4f3554639a284ad0483d875..12645c898bd832e965ac6b80b0c57669f4bbf0bc 100644 --- a/coolamqp/framing/definitions.py +++ b/coolamqp/framing/definitions.py @@ -1,5 +1,9 @@ # coding=UTF-8 from __future__ import print_function, absolute_import + +import coolamqp.argumentify +from coolamqp.argumentify import argumentify + """ A Python version of the AMQP machine-readable specification. @@ -3021,7 +3025,8 @@ class BasicContentPropertyList(AMQPContentPropertyList): while buf[offset + pfl - 1] & 1: pfl += 2 zpf = BasicContentPropertyList.zero_property_flags(buf[offset:offset + - pfl]).tobytes() + pfl]).tobytes() + if zpf in BasicContentPropertyList.PARTICULAR_CLASSES: return BasicContentPropertyList.PARTICULAR_CLASSES[ zpf].from_buffer(buf, offset) diff --git a/coolamqp/framing/field_table.py b/coolamqp/framing/field_table.py index 5d7b1de9923a0d8f0c0a45acc1ddb031ef35bcb4..a2752c671cfa6536ba83e01868ec4e046d723949 100644 --- a/coolamqp/framing/field_table.py +++ b/coolamqp/framing/field_table.py @@ -133,8 +133,12 @@ def get_type_for(val): return 'l' elif isinstance(val, float): return 'd' + elif isinstance(val, (tuple, list)): + return 'A' + elif isinstance(val, dict): + return 'F' else: - raise ValueError('Undeterminable type') + raise ValueError('I have zero idea what you have just passed') def enframe_field_value(buf, fv): @@ -203,8 +207,9 @@ def enframe_table(buf, table): # type (tp.BinaryIO, table) -> None :param buf: target buffer to write to :param table: table to write """ + if isinstance(table, tuple) and len(table) > 1 and table[1] == 'F': # Todo: fix an ugly hack + table = table[0] _tobuf(buf, '!I', frame_table_size(table) - 4) - for name, fv in table: _tobufv(buf, name, '!B', len(name)) enframe_field_value(buf, fv) @@ -240,10 +245,14 @@ def deframe_table(buf, start_offset): # -> (table, bytes_consumed) def frame_field_value_size(fv): v, t = fv - if FIELD_TYPES[t][0] is None: - return FIELD_TYPES[t][4](v) + 1 - else: - return FIELD_TYPES[t][0] + 1 + try: + if FIELD_TYPES[t][0] is None: + return FIELD_TYPES[t][4](v) + 1 + else: + return FIELD_TYPES[t][0] + 1 + except KeyError: + # todo: fix this hack + return frame_field_value_size(t) def frame_array_size(array): @@ -254,6 +263,9 @@ def frame_table_size(table): """ :return: length of table representation, in bytes, INCLUDING length header""" + if isinstance(table, tuple) and len(table) == 2: + table = table[0] + # todo: fix this hack return 4 + sum(1 + len(k) + frame_field_value_size(fv) for k, fv in table) diff --git a/coolamqp/objects.py b/coolamqp/objects.py index a7608d4a39d32a82f753b4ba4bf640bb120a17dc..445caea6a41052dd4332cec1627e5bf24a478f79 100644 --- a/coolamqp/objects.py +++ b/coolamqp/objects.py @@ -10,43 +10,21 @@ import warnings import six -from coolamqp.framing.definitions import \ - BasicContentPropertyList as MessageProperties -from coolamqp.framing.field_table import get_type_for +from coolamqp.argumentify import argumentify, tobytes, toutf8 +from coolamqp.framing.definitions import BasicContentPropertyList logger = logging.getLogger(__name__) -EMPTY_PROPERTIES = MessageProperties() - - -def argumentify(arguments): - if arguments is None: - return None - args = [] - if isinstance(arguments, dict): - for key, value in arguments.items(): - if not isinstance(key, six.binary_type): - key = key.encode('utf-8') - args.append((key, (value, get_type_for(value)))) - else: - for key, value in arguments: - if not isinstance(key, six.binary_type): - key = key.encode('utf-8') - args.append((key, (value, get_type_for(value)))) - return args +class MessageProperties(BasicContentPropertyList): + def __new__(cls, *args, **kwargs): + if 'headers' in kwargs: + if isinstance(kwargs['headers'], dict): + kwargs['headers'] = argumentify(kwargs['headers']) + return BasicContentPropertyList.__new__(cls, *args, **kwargs) -def toutf8(q): - if isinstance(q, six.binary_type): - q = q.decode('utf8') - return q - - -def tobytes(q): - if isinstance(q, six.text_type): - q = q.encode('utf8') - return q +EMPTY_PROPERTIES = MessageProperties() class Callable(object): @@ -88,8 +66,7 @@ class Message(object): You can pass a dict - it will be passed to MessageProperties, but it's slow - don't do that. - :type properties: MessageProperties instance, None or a dict (SLOW!) - + :type properties: :class:`coolamqp.objects.MessageProperties` instance """ __slots__ = ('body', 'properties') @@ -231,7 +208,6 @@ class Exchange(object): Exchange.direct = Exchange() - class ServerProperties(object): """ An object describing properties of the target server. @@ -255,11 +231,8 @@ class ServerProperties(object): elif isinstance(prop_value, list): prop_value = [toutf8(prop[0]) for prop in prop_value] self.properties[prop_name] = prop_value - self.mechanisms = data.mechanisms.tobytes().decode('utf-8').split(' ') - self.locales = data.locales.tobytes().decode('utf-8') - - def __str__(self): - return '%s %s %s %s' % (self.version, repr(self.properties), self.mechanisms, self.locales) + self.mechanisms = toutf8(data.mechanisms).split(' ') + self.locales = toutf8(data.locales) class Queue(object): @@ -267,50 +240,71 @@ class Queue(object): This object represents a Queue that applications consume from or publish to. Create a queue definition. - :param name: name of the queue. Generates a random uuid.uuid4().hex if not given. Note that this kind of queue - will probably require to be declared. + :param name: name of the queue. + None (default) for autogeneration. Just follow the rules for :ref:`anonymq`. + If empty string, a UUID name will be generated, and you won't have an anonymous queue anymore. :param durable: Is the queue durable? :param exchange: Exchange for this queue to bind to. None for no binding. - :param exclusive: Is this queue exclusive? - :param auto_delete: Is this queue auto_delete ? + :param exclusive: This queue will be deleted when the connection closes + :param auto_delete: This queue will be deleted when the last consumer unsubscribes :param arguments: either a list of (bytes, values) or a dict of (str, value) to pass as an extra argument - :warning PendingDeprecationWarning: if a non-exclusive auto_delete queue is created or some other combinations + :param routing_key: routing key that will be used to bind to an exchange. Used only when this + queue is associated with an exchange. Default value of blank should suffice. + :param arguments: either a list of (bytes, values) or a dict of (str, value) to pass as an extra argument during + declaration + :param arguments_bind: arguments to pass to binding to a (headers, I suppose exchange) + :raises ValueError: tried to create a queue that was not durable or auto_delete + :raises ValueError: tried to create a queue that was not exclusive or auto_delete and not anonymous + :raises ValueError: tried to create a queue that was anonymous and not auto_delete or durable + :warning DeprecationWarning: if a non-exclusive auto_delete queue is created or some other combinations that will be soon unavailable (eg. RabbitMQ 4.0). + :warning UserWarning: if you're declaring an auto_delete or exclusive, anonymous queue """ __slots__ = ('name', 'durable', 'exchange', 'auto_delete', 'exclusive', - 'anonymous', 'consumer_tag', 'arguments') + 'anonymous', 'consumer_tag', 'arguments', 'routing_key', 'arguments_bind') - def __init__(self, name=b'', # type: tp.Union[str, bytes] + def __init__(self, name=None, # type: tp.Union[str, bytes, None] durable=False, # type: bool exchange=None, # type: tp.Optional[Exchange] - exclusive=False, # type: bool - auto_delete=False, # type: bool - arguments=None # type: tp.Union[tp.List[bytes, tp.Any], tp.Dict[str, tp.Any]] + exclusive=True, # type: bool + auto_delete=True, # type: bool + arguments=None, # type: tp.Union[tp.List[bytes, tp.Any], tp.Dict[str, tp.Any]], + routing_key=b'', #: type: tp.Union[str, bytes] + arguments_bind=None, ): - if not name: - name = uuid.uuid4().hex - self.name = tobytes(name) #: public, must be bytes - # if name is '', this will be filled in with broker-generated name upon declaration + if name is None: + self.name = None + else: + self.name = tobytes(uuid.uuid4().hex if not name else name) + self.durable = durable self.exchange = exchange + self.routing_key = tobytes(routing_key) self.auto_delete = auto_delete self.exclusive = exclusive self.arguments = argumentify(arguments) + self.arguments_bind = argumentify(arguments_bind) + self.anonymous = self.name is None if self.auto_delete and self.durable: - warnings.warn('This will be removed in RabbitMQ 4.0', PendingDeprecationWarning) + raise ValueError('Cannot create an auto_delete and durable queue') + + if self.anonymous and (not self.auto_delete or self.durable): + raise ValueError('Zero sense to make a anonymous non-auto-delete or durable queue') + + if not self.anonymous and (self.auto_delete or self.exclusive): + warnings.warn('This may cause unpredictable behaviour', UserWarning) - if self.auto_delete and not self.exclusive: - warnings.warn('This will be removed in RabbitMQ 4.0', PendingDeprecationWarning) + if self.durable and self.anonymous: + raise ValueError('Cannot declare an anonymous durable queue') - self.anonymous = not len( - self.name) # if this queue is anonymous, it must be regenerated upon reconnect + if self.auto_delete and not self.exclusive and not self.anonymous: + raise ValueError('Cannot create an auto_delete and durable queue non-anonymous') - self.consumer_tag = self.name if not self.anonymous else uuid.uuid4().hex.encode( - 'utf8') # bytes, consumer tag to use in AMQP comms + self.consumer_tag = self.name if not self.anonymous else tobytes(uuid.uuid4().hex) - assert isinstance(self.name, six.binary_type) - assert isinstance(self.consumer_tag, six.binary_type) + if not self.exclusive and self.auto_delete: + warnings.warn('This will be removed in RabbitMQ 4.0', DeprecationWarning) def __eq__(self, other): return self.name == other.name @@ -318,6 +312,9 @@ class Queue(object): def __hash__(self): return hash(self.name) + def __repr__(self): + return 'Queue(%s, %s, %s, %s, %s, %s' % (self.name, self.durable, self.exchange, self.exclusive, self.arguments) + class QueueBind(object): """An order to be declared which binds a given queue to an exchange""" @@ -336,7 +333,7 @@ class QueueBind(object): exchange = exchange.name self.exchange = tobytes(exchange) # type: bytes self.routing_key = tobytes(routing_key) # type: bytes - self.arguments = argumentify(arguments) + self.arguments = arguments or [] def __eq__(self, other): return self.queue == other.queue and self.exchange == other.exchange and self.routing_key == other.routing_key @@ -379,16 +376,13 @@ class NodeDefinition(object): def __init__(self, *args, **kwargs): self.heartbeat = kwargs.pop('heartbeat', None) self.port = kwargs.pop('port', 5672) + self.host = None + self.user = None + self.password = None + self.virtual_host = '/' - if len(kwargs) > 0: - # Prepare arguments for amqp.connection.Connection - self.host = kwargs['host'] - self.user = kwargs['user'] - self.password = kwargs['password'] - self.virtual_host = kwargs.get('virtual_host', '/') - elif len(args) == 3: + if len(args) == 3: self.host, self.user, self.password = args - self.virtual_host = '/' elif len(args) == 4: self.host, self.user, self.password, self.virtual_host = args elif len(args) == 1 and isinstance(args[0], @@ -412,8 +406,14 @@ class NodeDefinition(object): host, port = self.host.split(u':', 1) self.port = int(port) # else get that port from kwargs - else: - raise ValueError(u'What did you exactly pass?') + + if len(kwargs) > 0: + # Prepare arguments for amqp.connection.Connection + self.host = kwargs.get('host', self.host) + self.user = kwargs.get('user', self.user) + self.port = kwargs.get('port', self.port) + self.password = kwargs.get('password', self.password) + self.virtual_host = kwargs.get('virtual_host', self.virtual_host) def __str__(self): # type: () -> str return six.text_type( diff --git a/coolamqp/uplink/connection/connection.py b/coolamqp/uplink/connection/connection.py index 0974f5cf876993f70ea46bdd7f81961ed42c5043..cf0cce9ff980ab798fd692863600116b825a321e 100644 --- a/coolamqp/uplink/connection/connection.py +++ b/coolamqp/uplink/connection/connection.py @@ -8,6 +8,7 @@ import time import typing as tp import uuid +import coolamqp.argumentify from coolamqp.utils import monotonic from coolamqp.exceptions import ConnectionDead diff --git a/coolamqp/uplink/connection/recv_framer.py b/coolamqp/uplink/connection/recv_framer.py index cd35ec6bf8f18382f477e280ecf9534c5d25cb1a..4f4545229dd8836a93d6b00942b33bc51b95e41a 100644 --- a/coolamqp/uplink/connection/recv_framer.py +++ b/coolamqp/uplink/connection/recv_framer.py @@ -7,6 +7,7 @@ import struct import six +import coolamqp.argumentify from coolamqp.framing.definitions import FRAME_HEADER, FRAME_HEARTBEAT, \ FRAME_END, FRAME_METHOD, FRAME_BODY from coolamqp.framing.frames import AMQPBodyFrame, AMQPHeaderFrame, \ diff --git a/coolamqp/uplink/handshake.py b/coolamqp/uplink/handshake.py index 99d99bbfc7e2bfc67fcadd5c15b38b4dd59b858d..77d805a6d42aee240b7485149e3abeb4da1d6df6 100644 --- a/coolamqp/uplink/handshake.py +++ b/coolamqp/uplink/handshake.py @@ -1,6 +1,8 @@ # coding=UTF-8 from __future__ import absolute_import, division, print_function +import coolamqp.argumentify + """ Provides reactors that can authenticate an AQMP session """ diff --git a/coolamqp/uplink/listener/thread.py b/coolamqp/uplink/listener/thread.py index 69424de8796b5e2df3c73572bd6f004271522543..be8ec00dcdc0029385e66552259d36e62d15df5e 100644 --- a/coolamqp/uplink/listener/thread.py +++ b/coolamqp/uplink/listener/thread.py @@ -54,18 +54,6 @@ class ListenerThread(threading.Thread): self._call_next_io_event = Callable(oneshots=True) self.listener = None # type: BaseListener - def call_next_io_event(self, callable): - """ - Call callable after current I/O event is fully processed - - sometimes many callables are called in response to single - I/O (eg. teardown, startup). This guarantees a call after - all these are done. - :param callable: callable/0 - """ - pass -# self._call_next_io_event.add(callable) - dummy that out, causes AssertionError to appear - def terminate(self): self.terminating = True diff --git a/docs/advanced.rst b/docs/advanced.rst index 2f743c98cab9967642959243cc758a78e2aaccf0..de6b35fbf435b8d66a8dddbf9494570718c8cd93 100644 --- a/docs/advanced.rst +++ b/docs/advanced.rst @@ -3,3 +3,15 @@ Advanced things .. autoclass:: coolamqp.uplink.connection.Connection :members: + + +Declaring anonymous queues +-------------------------- + +.. _anonymq: + +In order to make use of an anonymous queue, you must first :meth:`coolamqp.clustering.Cluster.consume` it, since +:meth:`coolamqp.clustering.Cluster.declare` will use a separate channel, in which the queue will be invalid. It will +raise ValueError if you try to do that, anyway. + +Anonymous queues must be auto_delete and exclusive, ValueError will be raised otherwise. diff --git a/docs/conf.py b/docs/conf.py index 29896f768b9948b827dc234c7c95576286d2306a..f8f08c5733ae79e339c02418bdb18bbab9f58593 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -35,6 +35,12 @@ source_parsers = { # ones. extensions = ['sphinx.ext.autodoc'] +autodoc_default_options = { + 'members': True, +} +autodoc_default_flags = [ + 'show-inheritance' +] # Add any paths that contain templates here, relative to this directory. templates_path = ['_templates'] diff --git a/docs/tutorial.rst b/docs/how-to-guide.rst similarity index 100% rename from docs/tutorial.rst rename to docs/how-to-guide.rst diff --git a/docs/index.rst b/docs/index.rst index f7237ce1327a5e9ff07f2fef4c5f7cd082bda8b7..102cf4c7a04b1fb519bf612db1361af5054a9390 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -5,8 +5,10 @@ Welcome to CoolAMQP's documentation! :maxdepth: 2 :caption: Contents + whatsnew cluster - tutorial + tutorials/send_and_receive + how-to-guide caveats frames basics diff --git a/docs/tutorials/send_and_receive.rst b/docs/tutorials/send_and_receive.rst new file mode 100644 index 0000000000000000000000000000000000000000..f8d2fe73559aa8a6e86ec0ba612a901a41b1985c --- /dev/null +++ b/docs/tutorials/send_and_receive.rst @@ -0,0 +1,59 @@ +Send and receive +================ + +In this tutorial we'll learn how to declare a named queue and send a message to it with acknowledgement: + +First we need to connect to the server. Let's assume you have configured a virtual host called /vhost + +.. code-block:: python + + from coolamqp.cluster import Cluster + from coolamqp.objects import NodeDefinition + + nd = NodeDefinition('amqp://127.0.0.1:5672/vhost', user='test', password='test', heartbeat=30) + c = Cluster(nd) + c.start() + +Then we'll need to declare a queue: + + +.. code-block:: python + from coolamqp.objects import Queue + + queue = Queue('my-named-queue') + c.declare(queue).result() + +You'll be calling :code:`result()` on most of CoolAMQP's calls, as they return futures that complete when the task is done. +Now let's try subscribing to this queue: + +.. code-block:: python + + def handle_message(msg): + print(msg.body.tobytes().encode('utf-8')) + msg.ack() + + cons, fut = c.consume(queue, no_ack=False, on_message=handle_message) + fut.result() + +Notice the :code:`tobytes()`. This is because CoolAMQP by default returns most of it's received bytes as memoryviews, +for speed's sake. + +Also, your message handler is executed within the CoolAMQP's connection thread, so don't block for too long. + +Now it's time to send a message: + +.. code-block:: python + + from coolamqp.objects import Message + c.publish(Message(b'my bag of bytes'), confirm=True).result() + +Without the confirm flag, publish would not return the future. + +Congratulations, you've just send and received a message! Now it's time to do some cleanup. First, we cancel the consumer, +and then disconnect from the server + +.. code-block:: python + + cons.cancel().result() + c.shutdown() + diff --git a/docs/whatsnew.rst b/docs/whatsnew.rst new file mode 100644 index 0000000000000000000000000000000000000000..1757077915348433f76bca7adfc863d100fdd702 --- /dev/null +++ b/docs/whatsnew.rst @@ -0,0 +1,35 @@ +What's new? +=========== + +CoolAMQP 2.0.0 marks a slight philosophy shift. Whereas 1.x used auto-generated UUID names, 2.0 will let the server +pick their names for themselves. + +It also forbids some combinations of Queue arguments, and makes the default values more palatable, so for example +a naked :class:`coolamqp.objects.Queue` will be anonymous, non-durable, exclusive and auto-delete. + +Cluster.publish +--------------- + +:meth:`coolamqp.clustering.Cluster.publish` has no longer the tx param, which has been deprecated. + +Queues +------ + +Following queues will fail now: + +* auto_delete and durable +* anonymous and durables +* anonymous and not exclusives +* anonymous and not auto_delete +* auto_delete and not exclusive and not anonymous + +Following will emit a warning: + +* exclusive and auto_delete - DeprecationWarning, since they're removing it in RabbitMQ 4.0 +* not anonymous, auto_delete and not exclusive - UserWarning, since this makes little sense + +Anonymous queues +---------------- + +They are back. Besides, anything that you will pass to :meth:`coolamqp.clustering.Cluster.consume` will be declared, be +it an exchange, a queue or such shit. This allows you to declare anonymous queues. diff --git a/tests/test_attaches/test_consumer.py b/tests/test_attaches/test_consumer.py index 8173a1cec6edf2ab2bb5d07d97770f37cde45a0d..25ac677745cd5bd433d90316347f935f2d1eeb81 100644 --- a/tests/test_attaches/test_consumer.py +++ b/tests/test_attaches/test_consumer.py @@ -11,4 +11,4 @@ class TestConsumer(unittest.TestCase): def test_issue_26(self): """Support for passing qos as int""" cons = Consumer(Queue('wtf'), lambda msg: None, qos=25) - self.assertEqual(cons.qos, (0, 25)) + self.assertEqual(cons.qos, 25) diff --git a/tests/test_clustering/test_a.py b/tests/test_clustering/test_a.py index 27f5d69e58d3fe620b7cc85764546fac9d67caf6..42ab856e7435494e75420a7cb2fa226fe11b9422 100644 --- a/tests/test_clustering/test_a.py +++ b/tests/test_clustering/test_a.py @@ -26,7 +26,9 @@ class TestA(unittest.TestCase): self.c.start(timeout=20) def tearDown(self): + self.assertFalse(self.c.is_shutdown()) self.c.shutdown() + self.assertTrue(self.c.is_shutdown()) def test_properties(self): self.assertEqual(self.c.properties.properties['product'], 'RabbitMQ') @@ -75,17 +77,17 @@ class TestA(unittest.TestCase): fut.result() - con.set_qos(0, 100) + con.set_qos(100) time.sleep(1) - self.assertEqual(con.qos, (0, 100)) + self.assertEqual(con.qos, 100) - con.set_qos(None, 110) + con.set_qos(110) time.sleep(1) - self.assertEqual(con.qos, (0, 110)) + self.assertEqual(con.qos, 110) def test_declare_anonymous(self): xchg = Exchange('wtfzomg', type='fanout') - q = Queue(exchange=xchg) + q = Queue('', exchange=xchg) self.c.declare(xchg).result() self.c.declare(q).result() self.assertTrue(q.name) @@ -110,7 +112,7 @@ class TestA(unittest.TestCase): con, fut = self.c.consume(Queue(u'hello3', exclusive=True), on_message=ok, no_ack=True) fut.result() - self.c.publish(Message(b''), routing_key=u'hello3', tx=True).result() + self.c.publish(Message(b''), routing_key=u'hello3', confirm=True).result() time.sleep(1) @@ -133,7 +135,7 @@ class TestA(unittest.TestCase): con, fut = self.c.consume(Queue(u'hello3', exclusive=True), on_message=ok, no_ack=False) fut.result() - self.c.publish(Message(b''), routing_key=u'hello3', tx=True).result() + self.c.publish(Message(b''), routing_key=u'hello3', confirm=True).result() time.sleep(1) @@ -159,13 +161,6 @@ class TestA(unittest.TestCase): 'content_encoding': b'utf8' }), routing_key=u'hello4', confirm=True).result() - self.assertRaises(RuntimeError, - lambda: self.c.publish(Message(b'hello4', properties={ - 'content_type': b'text/plain', - 'content_encoding': b'utf8' - }), routing_key=u'hello4', confirm=True, - tx=True).result()) - time.sleep(1) self.assertTrue(p['q']) @@ -187,7 +182,7 @@ class TestA(unittest.TestCase): self.c.publish(Message(b'hello5', properties={ 'content_type': b'text/plain', 'content_encoding': b'utf8' - }), routing_key=u'hello5', tx=True).result() + }), routing_key=u'hello5', confirm=True).result() time.sleep(2) @@ -207,7 +202,7 @@ class TestA(unittest.TestCase): on_message=ok, no_ack=True) fut.result() self.c.publish(Message(b'hello6'), routing_key=u'hello6', - tx=True).result() + confirm=True).result() time.sleep(1) diff --git a/tests/test_clustering/test_double.py b/tests/test_clustering/test_double.py index a723c861d943047433f84c0fdd1cfedb08fa1a69..b9afbbcb33243c392e9a610cddd7c46c47d84005 100644 --- a/tests/test_clustering/test_double.py +++ b/tests/test_clustering/test_double.py @@ -54,7 +54,7 @@ class TestDouble(unittest.TestCase): q = Queue(u'yo', exclusive=True, auto_delete=True) - con, fut = self.c1.consume(q, qos=(None, 20)) + con, fut = self.c1.consume(q, qos=20) fut.result() try: diff --git a/tests/test_clustering/test_exchanges.py b/tests/test_clustering/test_exchanges.py index 206ec06a66ef1922e6574f940ed270ea95a3b044..1fc5ebd9b62d2b1cadf4e1e92cd3686154bbd9b9 100644 --- a/tests/test_clustering/test_exchanges.py +++ b/tests/test_clustering/test_exchanges.py @@ -3,6 +3,7 @@ from __future__ import print_function, absolute_import, division import logging import os +import time import unittest import uuid @@ -10,9 +11,10 @@ from coolamqp.clustering import Cluster, MessageReceived, NothingMuch from coolamqp.objects import Message, NodeDefinition, Queue, MessageProperties, Exchange # todo handle bad auth -NODE = NodeDefinition(os.environ.get('AMQP_HOST', '127.0.0.1'), 'guest', 'guest', heartbeat=20) +NODE = NodeDefinition(os.environ.get('AMQP_HOST', '127.0.0.1'), 'guest', 'guest', hertbeat=20) logging.basicConfig(level=logging.DEBUG) logging.getLogger('coolamqp').setLevel(logging.DEBUG) +logger = logging.getLogger(__name__) class TestExchanges(unittest.TestCase): @@ -23,13 +25,39 @@ class TestExchanges(unittest.TestCase): def tearDown(self): self.c.shutdown() + def test_declare_anonymq(self): + que = Queue(auto_delete=True) + self.assertRaises(ValueError, self.c.declare, que) + cons, fut = self.c.consume(que, no_ack=True) + fut.result() + cons.cancel().result() + + def test_topic_exchanges(self): + xchg_name = uuid.uuid4().hex + test = {'a': 0} + + def do_msg(msg): + msg.ack() + test['a'] += 1 + + xchg = Exchange(xchg_name, type=b'topic') + queue = Queue(exchange=xchg, routing_key=b'test') + cons, fut = self.c.consume(queue, no_ack=False, on_message=do_msg) + fut.result() + self.c.publish(Message(b'dupa'), xchg_name, routing_key=b'test', confirm=True).result() + self.c.publish(Message(b'dupa2'), xchg_name, routing_key=b'test2', confirm=True).result() + self.c.publish(Message(b'dupa'), xchg_name, routing_key=b'test', confirm=True).result() + time.sleep(1) + cons.cancel().result() + self.assertEqual(test['a'], 2) + def test_deadlettering(self): xchg_name = uuid.uuid4().hex dead_queue_name = uuid.uuid4().hex grave_queue_name = uuid.uuid4().hex DEADLETTER = Exchange(xchg_name, type=b'direct') QUEUE = Queue(dead_queue_name, durable=True, auto_delete=False, exclusive=False) - GRAVEYARD_QUEUE = Queue(grave_queue_name, durable=True, arguments={'x-dead-letter-exchange': xchg_name, + GRAVEYARD_QUEUE = Queue(grave_queue_name, durable=True, auto_delete=False, arguments={'x-dead-letter-exchange': xchg_name, 'x-message-ttl': 1000}) self.c.declare(DEADLETTER).result() self.c.declare(QUEUE).result() @@ -59,7 +87,7 @@ class TestExchanges(unittest.TestCase): f1.result() f2.result() - self.c.publish(Message(b'hello'), exchange=x, tx=True).result() + self.c.publish(Message(b'hello'), exchange=x, confirm=True).result() self.assertIsInstance(self.c.drain(2), MessageReceived) self.assertIsInstance(self.c.drain(2), MessageReceived) diff --git a/tests/test_clustering/test_things.py b/tests/test_clustering/test_things.py index e49fa12b7ddad7d9b0e3bcd81409dc58a1e47636..dee2bafe64df486159702c76a9771206ad01e830 100644 --- a/tests/test_clustering/test_things.py +++ b/tests/test_clustering/test_things.py @@ -29,17 +29,15 @@ class TestConnecting(unittest.TestCase): def test_argumented_queue(self): que = Queue(auto_delete=True, exclusive=True, arguments=[(b'x-max-priority', 10)]) - que2 = Queue(auto_delete=True, exclusive=True, arguments={'x-max-priority': 10}) c = Cluster([NODE]) c.start(wait=True, timeout=None) - c.declare(que).result() - c.declare(que2).result() + self.assertRaises(ValueError, c.declare, que) c.shutdown(True) def test_argumented_bind(self): c = Cluster([NODE]) c.start(wait=True, timeout=None) - que = Queue(auto_delete=True, exclusive=True, arguments=[(b'x-max-priority', 10)]) + que = Queue('', auto_delete=True, exclusive=True, arguments=[(b'x-max-priority', 10)]) xchg = Exchange('test3-wertest', type='headers', durable=True) c.declare(que).result() c.declare(xchg).result() diff --git a/tests/test_clustering/test_topic_exchanges.py b/tests/test_clustering/test_topic_exchanges.py index 45ef2f361d312bf1fc03307c97418f7a15d5e45a..0a2bfab6f241268786a9415d9d557a8015471cc4 100644 --- a/tests/test_clustering/test_topic_exchanges.py +++ b/tests/test_clustering/test_topic_exchanges.py @@ -3,15 +3,16 @@ import time import os import unittest import logging +import uuid import monotonic from coolamqp.clustering import Cluster -from coolamqp.objects import Exchange, Queue, NodeDefinition, Message +from coolamqp.objects import Exchange, Queue, NodeDefinition, Message, MessageProperties XCHG = Exchange('smok5.results', type='topic', durable=True) -QUEUE = Queue(exchange=XCHG, exclusive=True, auto_delete=True) +QUEUE = Queue('', exchange=XCHG, exclusive=True, auto_delete=True) logger = logging.getLogger(__name__) NODE = NodeDefinition(os.environ.get('AMQP_HOST', '127.0.0.1'), 'guest', 'guest', heartbeat=20) @@ -20,6 +21,7 @@ logging.getLogger('coolamqp').setLevel(logging.DEBUG) did_receive = False + class TestTopic(unittest.TestCase): def setUp(self): self.c = Cluster([NODE]) @@ -32,9 +34,37 @@ class TestTopic(unittest.TestCase): pass self.c.shutdown() + def test_headers_exchange(self): + xchg_name = uuid.uuid4().hex + exchange = Exchange(xchg_name, b'headers') + self.c.declare(exchange).result() + queue1 = Queue(uuid.uuid4().hex, exchange=exchange, arguments_bind={'x-match': 'all', 'location': 'brisbane'}) + self.c.declare(queue1).result() + queue2 = Queue(uuid.uuid4().hex, exchange=exchange, arguments_bind={'x-match': 'all', 'location': 'sydney'}) + self.c.declare(queue2).result() + + test = {'a': 0} + + def do_message(msg): + msg.ack() + test['a'] += 1 + + cons1, fut1 = self.c.consume(queue1, on_message=do_message, no_ack=False) + cons2, fut2 = self.c.consume(queue2, on_message=do_message, no_ack=False) + fut1.result() + fut2.result() + + self.c.publish(Message(b'dupa', MessageProperties(headers={'location': 'sydney'})), exchange=exchange, confirm=True).result() + self.c.publish(Message(b'dupa', MessageProperties(headers={'location': 'brisbane'})), exchange=exchange, confirm=True).result() + self.c.publish(Message(b'dupa', MessageProperties(headers={'location': 'wtf'})), exchange=exchange, confirm=True).result() + time.sleep(1) + cons1.cancel().result() + cons2.cancel().result() + self.assertEqual(test['a'], 2) + def test_bind_stuff(self): + self.c.declare(QUEUE) self.c.declare(XCHG).result() - self.c.declare(QUEUE).result() self.c.bind(QUEUE, XCHG, routing_key='hello-world').result() global did_receive diff --git a/tests/test_framing.py b/tests/test_framing.py index 3b4c10b2a1a2280dadbb16b896b8393e6cfe4769..5442da31c53daab3423eca2e52cd95eda886df72 100644 --- a/tests/test_framing.py +++ b/tests/test_framing.py @@ -3,7 +3,7 @@ import unittest from coolamqp.framing.field_table import enframe_table -from coolamqp.objects import argumentify +from coolamqp.argumentify import argumentify class TestFraming(unittest.TestCase): diff --git a/tests/test_objects.py b/tests/test_objects.py index 9fbf5b156a777f194f03cc68c93dd25eb1681d4d..19f12ca2e105f7a08f48833cad6849ecf625ac35 100644 --- a/tests/test_objects.py +++ b/tests/test_objects.py @@ -1,5 +1,6 @@ # coding=UTF-8 from __future__ import print_function, absolute_import, division +import sys import logging import unittest import io @@ -7,15 +8,41 @@ import warnings from coolamqp.framing.definitions import QueueDeclare -from coolamqp.objects import NodeDefinition, MessageProperties, Queue, argumentify - +from coolamqp.objects import NodeDefinition, MessageProperties, Queue +from coolamqp.argumentify import argumentify logger = logging.getLogger(__name__) logging.getLogger('coolamqp').setLevel(logging.DEBUG) +IS_PY3 = sys.version.startswith('3') + class TestObjects(unittest.TestCase): + def test_queue_failures(self): + self.assertRaises(ValueError, Queue, None, durable=True) + self.assertRaises(ValueError, Queue, 'test', auto_delete=True, durable=True) + self.assertRaises(ValueError, Queue, None, auto_delete=False) + self.assertRaises(ValueError, Queue, 'test', auto_delete=True, exclusive=False) + + @unittest.skipUnless(sys.version.startswith('3'), 'Needs Python 3.x') + def test_queue_warns(self): + warnings.resetwarnings() + + with warnings.catch_warnings(record=True) as w: + Queue('test', auto_delete=True, exclusive=True) + Queue(auto_delete=True, exclusive=False) + logger.warning(repr(w)) + self.assertEqual(len(w), 2 if IS_PY3 else 1) + self.assertTrue(issubclass(w[0].category, UserWarning)) + if IS_PY3: + self.assertTrue(issubclass(w[1].category, DeprecationWarning)) + + def test_headers(self): + msg = MessageProperties(headers={'city': 'sydney'}) + buf = io.BytesIO() + msg.write_to(buf) + def test_queue_declare(self): args = argumentify({'x-dead-letter-exchange': 'deadletter', 'x-message-ttl': 1000}) @@ -32,13 +59,6 @@ class TestObjects(unittest.TestCase): ce_p_msg = MessageProperties(content_encoding=b'wtf') self.assertIn('wtf', str(ce_p_msg)) - def test_warning(self): - warnings.resetwarnings() - with warnings.catch_warnings(record=True) as w: - Queue(auto_delete=True, exclusive=False) - self.assertEqual(len(w), 1) - self.assertTrue(issubclass(w[0].category, PendingDeprecationWarning)) - def test_node_definition_from_amqp(self): n1 = NodeDefinition(u'amqp://ala:ma@kota/psa')