diff --git a/coolamqp/attaches/consumer.py b/coolamqp/attaches/consumer.py index 79b226f9474083298a6bf35cf09c1ef63c8926eb..69e20500475cb78799bce78ee685ba91a22f0da4 100644 --- a/coolamqp/attaches/consumer.py +++ b/coolamqp/attaches/consumer.py @@ -64,18 +64,16 @@ class Consumer(Channeler): You can subscribe to be informed when the consumer is cancelled (for any reason, server or client side) with: - con, fut = Cluster.consume(...) - - def im_called_on_cancel_for_any_reason(): # must have arity of 0 - .. - - con.on_cancel.add(im_called_on_cancel_for_any_reason) - con.cancel() + >>> con, fut = Cluster.consume(...) + >>> def im_called_on_cancel_for_any_reason(): # must have arity of 0 + >>> .. + >>> con.on_cancel.add(im_called_on_cancel_for_any_reason) + >>> con.cancel() Or, if RabbitMQ is in use, you can be informed upon a Consumer Cancel Notification: - con.on_broker_cancel.add(im_cancelled_by_broker) + >>> con.on_broker_cancel.add(im_cancelled_by_broker) """ @@ -92,9 +90,11 @@ class Consumer(Channeler): :param queue: Queue object, being consumed from right now. Note that name of anonymous queue might change at any time! + :type queue: coolamqp.objects.Queue :param on_message: callable that will process incoming messages :type on_message: callable(ReceivedMessage instance) :param no_ack: Will this consumer require acknowledges from messages? + :type no_ack: bool :param qos: a tuple of (prefetch size, prefetch window) for this consumer, or an int (prefetch window only). If an int is passed, prefetch size will be set to 0 (which means @@ -107,6 +107,7 @@ class Consumer(Channeler): online for the first time. This future can also raise with AMQPError if it fails to. + :type future_to_notify: concurrent.futures.Future :param fail_on_first_time_resource_locked: When consumer is declared for the first time, and RESOURCE_LOCKED is encountered, it will fail the future with ResourceLocked, and consumer will cancel @@ -165,11 +166,12 @@ class Consumer(Channeler): self.method(BasicQos(prefetch_size or 0, prefetch_count, False)) self.qos = prefetch_size or 0, prefetch_count - def cancel(self): + def cancel(self): # type: () -> None """ Cancel the customer. .ack() or .nack() for messages from this customer will have no effect. + :return: a Future to tell when it's done. The future will always succeed - sooner, or later. NOTE: Future is OK'd when entire channel is destroyed @@ -191,9 +193,11 @@ class Consumer(Channeler): self.method_and_watch(BasicCancel(self.consumer_tag, False), [BasicCancelOk], self.on_close) + self.channel_close_sent = True else: if not self.channel_close_sent and self.state == ST_ONLINE: self.method(ChannelClose(0, b'cancelling', 0, 0)) + self.channel_close_sent = True if self.attache_group is not None: self.attache_group.on_cancel_customer(self) @@ -317,6 +321,7 @@ class Consumer(Channeler): def on_delivery(self, sth): """ Callback for delivery-related shit + :param sth: AMQPMethodFrame WITH basic-deliver, AMQPHeaderFrame or AMQPBodyFrame """ diff --git a/coolamqp/clustering/cluster.py b/coolamqp/clustering/cluster.py index 8372e88fb3d21aff747574ee33174956cb42684b..f1fe916d4e5efeb53182b3346fbd020f873b70a6 100644 --- a/coolamqp/clustering/cluster.py +++ b/coolamqp/clustering/cluster.py @@ -136,12 +136,11 @@ class Cluster(object): def publish(self, message, exchange=None, routing_key=u'', tx=None, confirm=None): - # type: (coolamqp.objects.Message, tp.Optional[coolamqp.objects.Exchange], - # tp.Union[str, bytes], bool, bool) -> concurrent.futures.Future """ Publish a message. :param message: Message to publish + :type message: coolamqp.objects.Message :param exchange: exchange to use. Default is the "direct" empty-name exchange. :type exchange: unicode/bytes (exchange name) or Exchange object. :param routing_key: routing key to use @@ -149,8 +148,10 @@ class Cluster(object): If you choose so, you will receive a Future that can be used to check it broker took responsibility for this message. Note that if tx if False, and message cannot be delivered to broker at once, - it will be discarded. + it will be discarded + :type confirm: tp.Optional[bool] :param tx: deprecated, alias for confirm + :type tx: tp.Optional[bool] :return: Future or None """ if isinstance(exchange, Exchange): @@ -182,8 +183,7 @@ class Cluster(object): raise NotImplementedError( u'Sorry, this functionality is not yet implemented!') - def start(self, wait=True, timeout=10.0, log_frames=None): - # type: (bool, float, bool) -> None + def start(self, wait=True, timeout=10.0, log_frames=None): # type: (bool, float, bool) -> None """ Connect to broker. Initialize Cluster. @@ -191,7 +191,8 @@ class Cluster(object): It is not safe to fork after this. :param wait: block until connection is ready - :param timeout: timeout to wait until the connection is ready. If it is not, a ConnectionDead error will be raised + :param timeout: timeout to wait until the connection is ready. If it is not, a + ConnectionDead error will be raised :raise RuntimeError: called more than once :raise ConnectionDead: failed to connect within timeout :param log_frames: whether to keep a log of sent/received frames in self.log_frames @@ -236,7 +237,7 @@ class Cluster(object): while not self.snr.is_connected() and monotonic.monotonic() - start_at < timeout: time.sleep(0.1) if not self.snr.is_connected(): - raise ConnectionDead('Could not connect within %s seconds' % (timeout, )) + raise ConnectionDead('Could not connect within %s seconds' % (timeout,)) def shutdown(self, wait=True): # type: (bool) -> None """