diff --git a/README.md b/README.md index 90c7c9863da7e2110275bfc5a9572d0338e1dc07..3306acc25ff1fc67a0d4410cd338c2842fddcfa0 100644 --- a/README.md +++ b/README.md @@ -57,6 +57,9 @@ if you need every CPU cycle you can get. * Queue can accept _bytes_ as name * Consumer will set _cancelled_ to _True_ if [Consumer Cancel Notification](https://www.rabbitmq.com/consumer-cancel.html) is received + * You can register callbacks for: + * Consumer being cancelled for any reason + * Consumer being cancelled with a CCN * v0.88: * **API changes:** diff --git a/coolamqp/attaches/consumer.py b/coolamqp/attaches/consumer.py index df922ff35aa6bf4facaa97f9aed580d79bfb94cf..39ec99e2eb02d46cea6cb877dce69ba21c149623 100644 --- a/coolamqp/attaches/consumer.py +++ b/coolamqp/attaches/consumer.py @@ -12,6 +12,7 @@ from coolamqp.framing.definitions import ChannelOpenOk, BasicConsume, \ BasicCancel, BasicQosOk from coolamqp.uplink import HeaderOrBodyWatch, MethodWatch from concurrent.futures import Future +from coolamqp.objects import Callable from coolamqp.attaches.channeler import Channeler, ST_ONLINE, ST_OFFLINE from coolamqp.exceptions import AMQPError @@ -39,6 +40,22 @@ class Consumer(Channeler): Since this implies cancelling the consumer, here you go. WARNING: READ DEFAULT VALUES IN CONSTRUCTOR! TAKE CARE WHAT YOUR CONSUMERS DO! + + You can subscribe to be informed when the consumer is cancelled (for any reason, + server or client side) with: + + con, fut = Cluster.consume(...) + + def im_called_on_cancel_for_any_reason(): # must have arity of 0 + .. + + con.on_cancel.add(im_called_on_cancel_for_any_reason) + con.cancel() + + Or, if RabbitMQ is in use, you can be informed upon a Consumer Cancel Notification: + + con.on_broker_cancel.add(im_cancelled_by_broker) + """ def __init__(self, queue, on_message, no_ack=True, qos=None, cancel_on_failure=False, @@ -103,6 +120,9 @@ class Consumer(Channeler): self.consumer_tag = None + self.on_cancel = Callable(oneshots=True) #: public, called on cancel for any reason + self.on_broker_cancel = Callable(oneshots=True) #: public, called on Customer Cancel Notification (RabbitMQ) + def set_qos(self, prefetch_size, prefetch_count): """ Set new QoS for this consumer. @@ -118,9 +138,6 @@ class Consumer(Channeler): """ Cancel the customer. - Note that this is a departure form AMQP specification. We don't attempt to cancel the customer, - we simply trash the channel. Idk if it's a good idea... - .ack() or .nack() for messages from this customer will have no effect. :return: a Future to tell when it's done. The future will always succeed - sooner, or later. NOTE: Future is OK'd when entire channel is destroyed @@ -134,6 +151,7 @@ class Consumer(Channeler): self.future_to_notify_on_dead.set_running_or_notify_cancel() self.cancelled = True + self.on_cancel() # you'll blow up big next time you try to use this consumer if you can't cancel, but just close if self.consumer_tag is not None: self.method(BasicCancel(self.consumer_tag, False)) @@ -180,6 +198,7 @@ class Consumer(Channeler): if self.cancel_on_failure and (not self.cancelled): logger.debug('Consumer is cancel_on_failure and failure seen, True->cancelled') self.cancelled = True + self.on_cancel() if self.state == ST_ONLINE: # The channel has just lost operationality! @@ -198,6 +217,8 @@ class Consumer(Channeler): self.register_on_close_watch() self.methods([BasicCancelOk(payload.consumer_tag), ChannelClose(0, b'Received basic.cancel', 0, 0)]) self.cancelled = True # wasn't I? + self.on_cancel() + self.on_broker_cancel() return if isinstance(payload, BasicCancelOk): @@ -216,6 +237,7 @@ class Consumer(Channeler): if self.fail_on_first_time_resource_locked: # still, a RESOURCE_LOCKED on a first declaration ever suggests something is very wrong self.cancelled = True + self.on_cancel() else: # Do not notify the user, and retry at will. # Do not zero the future - we will need to later confirm it, so it doesn't leak.