diff --git a/coolamqp/attaches/consumer.py b/coolamqp/attaches/consumer.py index c50d6e6a6d1af570451b4f4505f09333a0618ae1..78ac2167c0d5a8eba67bddbb04db1f1ed6fa0b5c 100644 --- a/coolamqp/attaches/consumer.py +++ b/coolamqp/attaches/consumer.py @@ -48,6 +48,18 @@ class Consumer(Channeler): self.receiver = None # MessageReceiver instance + def cancel(self): + """ + Cancel the customer. + + Note that this is a departure form AMQP specification. We don't attempt to cancel the customer, + we simply trash the channel. Idk if it's a good idea... + + .ack() or .nack() for messages from this customer will have no effect. + """ + self.cancelled = True + self.method(ChannelClose(0, b'consumer cancelled', 0, 0)) + def on_operational(self, operational): super(Consumer, self).on_operational(operational) @@ -89,6 +101,8 @@ class Consumer(Channeler): else: super(Consumer, self).on_close(payload) + should_retry = should_retry and (not self.cancelled) + if should_retry: self.attach(self.connection) @@ -239,6 +253,9 @@ class MessageReceiver(object): if self.state == 3: return # Gone! + if self.consumer.cancelled: + return # cancelled! + if delivery_tag not in self.acks_pending: return # already confirmed/rejected diff --git a/tests/run.py b/tests/run.py index 7fcfe57c18c7b21f59fc45db9792be10f4c5f6ee..6ad963bef03cf1542f565cc63baa863b6f77e517 100644 --- a/tests/run.py +++ b/tests/run.py @@ -39,6 +39,9 @@ if __name__ == '__main__': IPublishThread().start() while True: - time.sleep(10) + time.sleep(30) + + if not cons.cancelled: + cons.cancel() lt.terminate()