diff --git a/coolamqp/attaches/consumer.py b/coolamqp/attaches/consumer.py index 74bd4dcf063e8e087355e9249b7693e09d31eded..cbfe6ecad644547e70ffff7bf687f4f10606c917 100644 --- a/coolamqp/attaches/consumer.py +++ b/coolamqp/attaches/consumer.py @@ -2,13 +2,14 @@ from __future__ import absolute_import, division, print_function import six import logging +import warnings from coolamqp.framing.frames import AMQPBodyFrame, AMQPHeaderFrame from coolamqp.framing.definitions import ChannelOpenOk, BasicConsume, \ BasicConsumeOk, QueueDeclare, QueueDeclareOk, ExchangeDeclare, ExchangeDeclareOk, \ QueueBind, QueueBindOk, ChannelClose, BasicCancel, BasicDeliver, \ BasicAck, BasicReject, RESOURCE_LOCKED, BasicCancelOk, BasicQos, HARD_ERROR from coolamqp.uplink import HeaderOrBodyWatch, MethodWatch - +from coolamqp.objects import Future from coolamqp.attaches.channeler import Channeler, ST_ONLINE, ST_OFFLINE from coolamqp.exceptions import ResourceLocked, AMQPError @@ -90,6 +91,8 @@ class Consumer(Channeler): self.qos_update_sent = False # QoS was not sent to server self.future_to_notify = future_to_notify + self.future_to_notify_on_dead = None # .cancel + self.fail_on_first_time_resource_locked = fail_on_first_time_resource_locked self.cancel_on_failure = cancel_on_failure self.fucking_memoryviews = fucking_memoryviews @@ -114,12 +117,22 @@ class Consumer(Channeler): .ack() or .nack() for messages from this customer will have no effect. :return: a Future to tell when it's done. The future will always succeed - sooner, or later. + NOTE: Future is OK'd when entire channel is destroyed """ + + if self.future_to_notify_on_dead is not None: + # we cancelled it earlier + warnings.warn(u'Why would you cancel a consumer twice?', RuntimeWarning) + return self.future_to_notify_on_dead + self.cancelled = True self.method(ChannelClose(0, b'consumer cancelled', 0, 0)) if self.attache_group is not None: self.attache_group.on_cancel_customer(self) + self.future_to_notify_on_dead = Future() + return self.future_to_notify_on_dead + def on_operational(self, operational): super(Consumer, self).on_operational(operational) @@ -201,6 +214,10 @@ class Consumer(Channeler): super(Consumer, self).on_close(payload) # this None's self.connection self.fail_on_first_time_resource_locked = False + if self.future_to_notify_on_dead: # notify it was cancelled + self.future_to_notify_on_dead.set_result() + + if should_retry: if old_con.state == ST_ONLINE: logger.info('Retrying with %s', self.queue.name)