Skip to content
Snippets Groups Projects
Commit 038319b8 authored by Piotr Maślanka's avatar Piotr Maślanka
Browse files

fixed #20 fixed #12

parent 77acf150
No related branches found
No related tags found
No related merge requests found
...@@ -57,6 +57,9 @@ if you need every CPU cycle you can get. ...@@ -57,6 +57,9 @@ if you need every CPU cycle you can get.
* Queue can accept _bytes_ as name * Queue can accept _bytes_ as name
* Consumer will set _cancelled_ to _True_ if * Consumer will set _cancelled_ to _True_ if
[Consumer Cancel Notification](https://www.rabbitmq.com/consumer-cancel.html) is received [Consumer Cancel Notification](https://www.rabbitmq.com/consumer-cancel.html) is received
* You can register callbacks for:
* Consumer being cancelled for any reason
* Consumer being cancelled with a CCN
* v0.88: * v0.88:
* **API changes:** * **API changes:**
......
...@@ -12,6 +12,7 @@ from coolamqp.framing.definitions import ChannelOpenOk, BasicConsume, \ ...@@ -12,6 +12,7 @@ from coolamqp.framing.definitions import ChannelOpenOk, BasicConsume, \
BasicCancel, BasicQosOk BasicCancel, BasicQosOk
from coolamqp.uplink import HeaderOrBodyWatch, MethodWatch from coolamqp.uplink import HeaderOrBodyWatch, MethodWatch
from concurrent.futures import Future from concurrent.futures import Future
from coolamqp.objects import Callable
from coolamqp.attaches.channeler import Channeler, ST_ONLINE, ST_OFFLINE from coolamqp.attaches.channeler import Channeler, ST_ONLINE, ST_OFFLINE
from coolamqp.exceptions import AMQPError from coolamqp.exceptions import AMQPError
...@@ -39,6 +40,22 @@ class Consumer(Channeler): ...@@ -39,6 +40,22 @@ class Consumer(Channeler):
Since this implies cancelling the consumer, here you go. Since this implies cancelling the consumer, here you go.
WARNING: READ DEFAULT VALUES IN CONSTRUCTOR! TAKE CARE WHAT YOUR CONSUMERS DO! WARNING: READ DEFAULT VALUES IN CONSTRUCTOR! TAKE CARE WHAT YOUR CONSUMERS DO!
You can subscribe to be informed when the consumer is cancelled (for any reason,
server or client side) with:
con, fut = Cluster.consume(...)
def im_called_on_cancel_for_any_reason(): # must have arity of 0
..
con.on_cancel.add(im_called_on_cancel_for_any_reason)
con.cancel()
Or, if RabbitMQ is in use, you can be informed upon a Consumer Cancel Notification:
con.on_broker_cancel.add(im_cancelled_by_broker)
""" """
def __init__(self, queue, on_message, no_ack=True, qos=None, cancel_on_failure=False, def __init__(self, queue, on_message, no_ack=True, qos=None, cancel_on_failure=False,
...@@ -103,6 +120,9 @@ class Consumer(Channeler): ...@@ -103,6 +120,9 @@ class Consumer(Channeler):
self.consumer_tag = None self.consumer_tag = None
self.on_cancel = Callable(oneshots=True) #: public, called on cancel for any reason
self.on_broker_cancel = Callable(oneshots=True) #: public, called on Customer Cancel Notification (RabbitMQ)
def set_qos(self, prefetch_size, prefetch_count): def set_qos(self, prefetch_size, prefetch_count):
""" """
Set new QoS for this consumer. Set new QoS for this consumer.
...@@ -118,9 +138,6 @@ class Consumer(Channeler): ...@@ -118,9 +138,6 @@ class Consumer(Channeler):
""" """
Cancel the customer. Cancel the customer.
Note that this is a departure form AMQP specification. We don't attempt to cancel the customer,
we simply trash the channel. Idk if it's a good idea...
.ack() or .nack() for messages from this customer will have no effect. .ack() or .nack() for messages from this customer will have no effect.
:return: a Future to tell when it's done. The future will always succeed - sooner, or later. :return: a Future to tell when it's done. The future will always succeed - sooner, or later.
NOTE: Future is OK'd when entire channel is destroyed NOTE: Future is OK'd when entire channel is destroyed
...@@ -134,6 +151,7 @@ class Consumer(Channeler): ...@@ -134,6 +151,7 @@ class Consumer(Channeler):
self.future_to_notify_on_dead.set_running_or_notify_cancel() self.future_to_notify_on_dead.set_running_or_notify_cancel()
self.cancelled = True self.cancelled = True
self.on_cancel()
# you'll blow up big next time you try to use this consumer if you can't cancel, but just close # you'll blow up big next time you try to use this consumer if you can't cancel, but just close
if self.consumer_tag is not None: if self.consumer_tag is not None:
self.method(BasicCancel(self.consumer_tag, False)) self.method(BasicCancel(self.consumer_tag, False))
...@@ -180,6 +198,7 @@ class Consumer(Channeler): ...@@ -180,6 +198,7 @@ class Consumer(Channeler):
if self.cancel_on_failure and (not self.cancelled): if self.cancel_on_failure and (not self.cancelled):
logger.debug('Consumer is cancel_on_failure and failure seen, True->cancelled') logger.debug('Consumer is cancel_on_failure and failure seen, True->cancelled')
self.cancelled = True self.cancelled = True
self.on_cancel()
if self.state == ST_ONLINE: if self.state == ST_ONLINE:
# The channel has just lost operationality! # The channel has just lost operationality!
...@@ -198,6 +217,8 @@ class Consumer(Channeler): ...@@ -198,6 +217,8 @@ class Consumer(Channeler):
self.register_on_close_watch() self.register_on_close_watch()
self.methods([BasicCancelOk(payload.consumer_tag), ChannelClose(0, b'Received basic.cancel', 0, 0)]) self.methods([BasicCancelOk(payload.consumer_tag), ChannelClose(0, b'Received basic.cancel', 0, 0)])
self.cancelled = True # wasn't I? self.cancelled = True # wasn't I?
self.on_cancel()
self.on_broker_cancel()
return return
if isinstance(payload, BasicCancelOk): if isinstance(payload, BasicCancelOk):
...@@ -216,6 +237,7 @@ class Consumer(Channeler): ...@@ -216,6 +237,7 @@ class Consumer(Channeler):
if self.fail_on_first_time_resource_locked: if self.fail_on_first_time_resource_locked:
# still, a RESOURCE_LOCKED on a first declaration ever suggests something is very wrong # still, a RESOURCE_LOCKED on a first declaration ever suggests something is very wrong
self.cancelled = True self.cancelled = True
self.on_cancel()
else: else:
# Do not notify the user, and retry at will. # Do not notify the user, and retry at will.
# Do not zero the future - we will need to later confirm it, so it doesn't leak. # Do not zero the future - we will need to later confirm it, so it doesn't leak.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment