From 850bdc2ff9fe02724ec55df7ea89e18040ffe4f2 Mon Sep 17 00:00:00 2001
From: Piotr Maslanka <piotr.maslanka@henrietta.com.pl>
Date: Mon, 9 Jan 2017 14:38:03 +0100
Subject: [PATCH] can cancel a consumer

---
 coolamqp/attaches/consumer.py | 17 +++++++++++++++++
 tests/run.py                  |  5 ++++-
 2 files changed, 21 insertions(+), 1 deletion(-)

diff --git a/coolamqp/attaches/consumer.py b/coolamqp/attaches/consumer.py
index c50d6e6..78ac216 100644
--- a/coolamqp/attaches/consumer.py
+++ b/coolamqp/attaches/consumer.py
@@ -48,6 +48,18 @@ class Consumer(Channeler):
         self.receiver = None  # MessageReceiver instance
 
 
+    def cancel(self):
+        """
+        Cancel the customer.
+
+        Note that this is a departure form AMQP specification. We don't attempt to cancel the customer,
+        we simply trash the channel. Idk if it's a good idea...
+
+        .ack() or .nack() for messages from this customer will have no effect.
+        """
+        self.cancelled = True
+        self.method(ChannelClose(0, b'consumer cancelled', 0, 0))
+
     def on_operational(self, operational):
         super(Consumer, self).on_operational(operational)
 
@@ -89,6 +101,8 @@ class Consumer(Channeler):
         else:
             super(Consumer, self).on_close(payload)
 
+        should_retry = should_retry and (not self.cancelled)
+
         if should_retry:
             self.attach(self.connection)
 
@@ -239,6 +253,9 @@ class MessageReceiver(object):
             if self.state == 3:
                 return  # Gone!
 
+            if self.consumer.cancelled:
+                return # cancelled!
+
             if delivery_tag not in self.acks_pending:
                 return  # already confirmed/rejected
 
diff --git a/tests/run.py b/tests/run.py
index 7fcfe57..6ad963b 100644
--- a/tests/run.py
+++ b/tests/run.py
@@ -39,6 +39,9 @@ if __name__ == '__main__':
     IPublishThread().start()
 
     while True:
-        time.sleep(10)
+        time.sleep(30)
+
+        if not cons.cancelled:
+            cons.cancel()
 
     lt.terminate()
-- 
GitLab