Skip to content
Snippets Groups Projects
Commit 850bdc2f authored by Piotr Maślanka's avatar Piotr Maślanka
Browse files

can cancel a consumer

parent eef5e2f0
No related branches found
No related tags found
No related merge requests found
......@@ -48,6 +48,18 @@ class Consumer(Channeler):
self.receiver = None # MessageReceiver instance
def cancel(self):
"""
Cancel the customer.
Note that this is a departure form AMQP specification. We don't attempt to cancel the customer,
we simply trash the channel. Idk if it's a good idea...
.ack() or .nack() for messages from this customer will have no effect.
"""
self.cancelled = True
self.method(ChannelClose(0, b'consumer cancelled', 0, 0))
def on_operational(self, operational):
super(Consumer, self).on_operational(operational)
......@@ -89,6 +101,8 @@ class Consumer(Channeler):
else:
super(Consumer, self).on_close(payload)
should_retry = should_retry and (not self.cancelled)
if should_retry:
self.attach(self.connection)
......@@ -239,6 +253,9 @@ class MessageReceiver(object):
if self.state == 3:
return # Gone!
if self.consumer.cancelled:
return # cancelled!
if delivery_tag not in self.acks_pending:
return # already confirmed/rejected
......
......@@ -39,6 +39,9 @@ if __name__ == '__main__':
IPublishThread().start()
while True:
time.sleep(10)
time.sleep(30)
if not cons.cancelled:
cons.cancel()
lt.terminate()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment