diff --git a/coolamqp/attaches/consumer.py b/coolamqp/attaches/consumer.py index 69e20500475cb78799bce78ee685ba91a22f0da4..26a43302b47147709753d6dee9512f6ee4078706 100644 --- a/coolamqp/attaches/consumer.py +++ b/coolamqp/attaches/consumer.py @@ -75,6 +75,38 @@ class Consumer(Channeler): >>> con.on_broker_cancel.add(im_cancelled_by_broker) + :param queue: Queue object, being consumed from right now. + Note that name of anonymous queue might change at any time! + :type queue: coolamqp.objects.Queue + :param on_message: callable that will process incoming messages + :type on_message: callable(ReceivedMessage instance) + :param no_ack: Will this consumer require acknowledges from messages? + :type no_ack: bool + :param qos: a tuple of (prefetch size, prefetch window) for this + consumer, or an int (prefetch window only). + If an int is passed, prefetch size will be set to 0 (which means + undefined), and this int will be used for prefetch window + :type qos: tuple(int, int) or tuple(None, int) or int + :param cancel_on_failure: Consumer will cancel itself when link goes + down + :type cancel_on_failure: bool + :param future_to_notify: Future to succeed when this consumer goes + online for the first time. + This future can also raise with AMQPError if + it fails to. + :type future_to_notify: concurrent.futures.Future + :param fail_on_first_time_resource_locked: When consumer is declared + for the first time, and RESOURCE_LOCKED is encountered, it will + fail the future with ResourceLocked, and consumer will cancel + itself. + By default it will retry until success is made. + If the consumer doesn't get the chance to be declared - because + of a connection fail - next reconnect will consider this to be + SECOND declaration, ie. it will retry ad infinitum + :type fail_on_first_time_resource_locked: bool + :param body_receive_mode: how should message.body be received. This + has a performance impact + :type body_receive_mode: a property of BodyReceiveMode """ def __init__(self, queue, on_message, no_ack=True, qos=None, @@ -87,39 +119,6 @@ class Consumer(Channeler): Note that if you specify QoS, it is applied before basic.consume is sent. This will prevent the broker from hammering you into oblivion with a mountain of messages. - - :param queue: Queue object, being consumed from right now. - Note that name of anonymous queue might change at any time! - :type queue: coolamqp.objects.Queue - :param on_message: callable that will process incoming messages - :type on_message: callable(ReceivedMessage instance) - :param no_ack: Will this consumer require acknowledges from messages? - :type no_ack: bool - :param qos: a tuple of (prefetch size, prefetch window) for this - consumer, or an int (prefetch window only). - If an int is passed, prefetch size will be set to 0 (which means - undefined), and this int will be used for prefetch window - :type qos: tuple(int, int) or tuple(None, int) or int - :param cancel_on_failure: Consumer will cancel itself when link goes - down - :type cancel_on_failure: bool - :param future_to_notify: Future to succeed when this consumer goes - online for the first time. - This future can also raise with AMQPError if - it fails to. - :type future_to_notify: concurrent.futures.Future - :param fail_on_first_time_resource_locked: When consumer is declared - for the first time, and RESOURCE_LOCKED is encountered, it will - fail the future with ResourceLocked, and consumer will cancel - itself. - By default it will retry until success is made. - If the consumer doesn't get the chance to be declared - because - of a connection fail - next reconnect will consider this to be - SECOND declaration, ie. it will retry ad infinitum - :type fail_on_first_time_resource_locked: bool - :param body_receive_mode: how should message.body be received. This - has a performance impact - :type body_receive_mode: BodyReceiveMode.* """ super(Consumer, self).__init__() diff --git a/coolamqp/clustering/cluster.py b/coolamqp/clustering/cluster.py index f1fe916d4e5efeb53182b3346fbd020f873b70a6..78a9df3a3c2dd7246884f301c983ff4594c7b98d 100644 --- a/coolamqp/clustering/cluster.py +++ b/coolamqp/clustering/cluster.py @@ -6,6 +6,7 @@ from __future__ import print_function, absolute_import, division import logging import time +import typing as tp import warnings from concurrent.futures import Future @@ -34,6 +35,17 @@ class Cluster(object): Call .start() to connect to AMQP. It is not safe to fork() after .start() is called, but it's OK before. + + :param nodes: list of nodes, or a single node. For now, only one is supported. + :type nodes: NodeDefinition instance or a list of NodeDefinition instances + :param on_fail: callable/0 to call when connection fails in an + unclean way. This is a one-shot + :type on_fail: callable/0 + :param extra_properties: refer to documentation in [/coolamqp/connection/connection.py] + Connection.__init__ + :param log_frames: an object that will have it's method .on_frame(timestamp, + frame, direction) called upon receiving/sending a frame. Timestamp is UNIX timestamp, + frame is AMQPFrame, direction is one of 'to_client', 'to_server' """ # Events you can be informed about @@ -41,18 +53,6 @@ class Cluster(object): ST_LINK_REGAINED = 1 # Link has been regained def __init__(self, nodes, on_fail=None, extra_properties=None, log_frames=None): - """ - :param nodes: list of nodes, or a single node. For now, only one is supported. - :type nodes: NodeDefinition instance or a list of NodeDefinition instances - :param on_fail: callable/0 to call when connection fails in an - unclean way. This is a one-shot - :type on_fail: callable/0 - :param extra_properties: refer to documentation in [/coolamqp/connection/connection.py] - Connection.__init__ - :param log_frames: an object that will have it's method .on_frame(timestamp, - frame, direction) called upon receiving/sending a frame. Timestamp is UNIX timestamp, - frame is AMQPFrame, direction is one of 'to_client', 'to_server' - """ from coolamqp.objects import NodeDefinition if isinstance(nodes, NodeDefinition): nodes = [nodes] @@ -144,6 +144,7 @@ class Cluster(object): :param exchange: exchange to use. Default is the "direct" empty-name exchange. :type exchange: unicode/bytes (exchange name) or Exchange object. :param routing_key: routing key to use + :type routing_key: tp.Union[str, bytes] :param confirm: Whether to publish it using confirms/transactions. If you choose so, you will receive a Future that can be used to check it broker took responsibility for this message. diff --git a/coolamqp/exceptions.py b/coolamqp/exceptions.py index e3024f7a7611806a1b0ded85e27f22350336d5cc..557fbea5e8ecc5bfcb7256e034154b048b8e3741 100644 --- a/coolamqp/exceptions.py +++ b/coolamqp/exceptions.py @@ -1,7 +1,7 @@ # coding=UTF-8 from __future__ import absolute_import, division, print_function -from coolamqp.framing.definitions import HARD_ERRORS, RESOURCE_LOCKED +from coolamqp.framing.definitions import HARD_ERRORS class CoolAMQPError(Exception): @@ -20,14 +20,14 @@ class AMQPError(CoolAMQPError): Base class for errors received from AMQP server """ - def is_hard_error(self): + def is_hard_error(self): # type: () -> bool """Does this error close the connection?""" return self.reply_code in HARD_ERRORS - def __str__(self): + def __str__(self): # type: () -> str return 'AMQP error %s: %s' % (self.reply_code, self.reply_text) - def __repr__(self): + def __repr__(self): # type: () -> str return 'AMQPError(' + repr(self.reply_code) + ', ' + repr( self.reply_text) + \ ', ' + repr(self.class_id) + ', ' + repr(self.method_id) + ')' diff --git a/coolamqp/objects.py b/coolamqp/objects.py index 458b4afdbd02102a996ec154937621baabcb5bb5..a8f682ff4839df7c42d328fc36e8276ed1f6b7c6 100644 --- a/coolamqp/objects.py +++ b/coolamqp/objects.py @@ -4,7 +4,6 @@ Core objects used in CoolAMQP """ import logging import uuid -import typing as tp import six @@ -105,13 +104,13 @@ class ReceivedMessage(Message): and .nack() are no-ops. """ - def __init__(self, body, # type: tp.Union[six.binary_type, memoryview, tp.List[memoryview]] - exchange_name, # type: memoryview - routing_key, # type: memoryview - properties=None, # type: MessageProperties - delivery_tag=None, # type: int - ack=None, # type: tp.Callable[[], None] - nack=None): # type: tp.Callable[[], None] + def __init__(self, body, + exchange_name, + routing_key, + properties=None, + delivery_tag=None, + ack=None, + nack=None): """ :param body: message body. A stream of octets. :type body: str (py2) or bytes (py3) or a list of memoryviews, if @@ -125,13 +124,14 @@ class ReceivedMessage(Message): strings will be memoryviews :param delivery_tag: delivery tag assigned by AMQP broker to confirm this message + :type delivery_tag: int :param ack: a callable to call when you want to ack (via basic.ack) this message. None if received by the no-ack mechanism - :type ack: Callable[[], None] + :type ack: tp.Callable[[], None] :param nack: a callable to call when you want to nack (via basic.reject) this message. None if received by the no-ack mechanism - :type nack: Callable[[], None] + :type nack: tp.Callable[[], None] """ Message.__init__(self, body, properties=properties) @@ -166,15 +166,15 @@ class Exchange(object): assert isinstance(self.name, six.text_type) assert isinstance(self.type, six.binary_type) - def __repr__(self): + def __repr__(self): # type: () -> str return u'Exchange(%s, %s, %s, %s)' % ( repr(self.name), repr(self.type), repr(self.durable), repr(self.auto_delete)) - def __hash__(self): + def __hash__(self): # type: () -> int return self.name.__hash__() - def __eq__(self, other): + def __eq__(self, other): # type: (Exchange) -> bool return (self.name == other.name) and (type(self) == type(other)) @@ -300,7 +300,7 @@ class NodeDefinition(object): else: raise ValueError(u'What did you exactly pass?') - def __str__(self): + def __str__(self): # type: () -> str return six.text_type( b'amqp://%s:%s@%s/%s'.encode('utf8') % ( self.host, self.port, self.user, self.virtual_host)) diff --git a/docs/tutorial.rst b/docs/tutorial.rst index e86aeaa886b9da41962d84fafa1c82afc73a4b31..3961727741ca2c1adcd02b8f1c7b61345b6c7573 100644 --- a/docs/tutorial.rst +++ b/docs/tutorial.rst @@ -57,9 +57,6 @@ _no_ack=False_ will mean that we have to manually confirm messages. You can specify a callback, that will be called with a message if one's received by this consumer. Since we did not do that, this will go to a generic queue belonging to _Cluster_. -.. autoclass:: coolamqp.clustering.Cluster - :members: - _consumer_ is a _Consumer_ object. This allows us to do some things with the consumer (such as setting QoS), but most importantly it allows us to cancel it later. _consume_confirm_ is a _Future_, that will succeed when AMQP _basic.consume-ok_ is received. @@ -84,9 +81,13 @@ To actually get our message, we need to start a consumer first. To do that, just :: - cluster.consume(Queue('name of the queue'), **kwargs) + cons, fut = cluster.consume(Queue('name of the queue'), **kwargs) -Where kwargs are passed directly to Consumer class +Where kwargs are passed directly to Consumer class. +**cons** is a Consumer object, and **fut** is a Future that will happen when listening has been registered on target +server. .. autoclass:: coolamqp.attaches.Consumer :members: + +