From 77acf1507fedcb4d4593c7ae32e88f84b6d3ceaf Mon Sep 17 00:00:00 2001 From: Piotr Maslanka <piotr.maslanka@henrietta.com.pl> Date: Sun, 29 Jan 2017 19:18:39 +0100 Subject: [PATCH] handling CCNs --- README.md | 6 ++++- coolamqp/attaches/consumer.py | 1 + coolamqp/uplink/connection/connection.py | 19 ++++++-------- tests/test_clustering/test_double.py | 32 ++++++++++++++++-------- 4 files changed, 35 insertions(+), 23 deletions(-) diff --git a/README.md b/README.md index 0f94e2c..90c7c98 100644 --- a/README.md +++ b/README.md @@ -52,7 +52,11 @@ if you need every CPU cycle you can get. * v0.89: * Events are no longer timestamped by CoolAMQP, it's your job now - * You can delete queues + * You can delete queues (_Cluster.delete_queue_) + * Race condition _Connection.start_ fixed + * Queue can accept _bytes_ as name + * Consumer will set _cancelled_ to _True_ if + [Consumer Cancel Notification](https://www.rabbitmq.com/consumer-cancel.html) is received * v0.88: * **API changes:** diff --git a/coolamqp/attaches/consumer.py b/coolamqp/attaches/consumer.py index 20440ee..df922ff 100644 --- a/coolamqp/attaches/consumer.py +++ b/coolamqp/attaches/consumer.py @@ -197,6 +197,7 @@ class Consumer(Channeler): # on_close is a one_shot watch. We need to re-register it now. self.register_on_close_watch() self.methods([BasicCancelOk(payload.consumer_tag), ChannelClose(0, b'Received basic.cancel', 0, 0)]) + self.cancelled = True # wasn't I? return if isinstance(payload, BasicCancelOk): diff --git a/coolamqp/uplink/connection/connection.py b/coolamqp/uplink/connection/connection.py index ba65941..c36a056 100644 --- a/coolamqp/uplink/connection/connection.py +++ b/coolamqp/uplink/connection/connection.py @@ -185,17 +185,14 @@ class Connection(object): :param frames: list of frames or None to close the link :param reason: optional human-readable reason for this action """ - try: - if frames is not None: - # for frame in frames: - # if isinstance(frame, AMQPMethodFrame): - # print('Sending ', frame.payload) - self.sendf.send(frames, priority=priority) - else: - # Listener socket will kill us when time is right - self.listener_socket.send(None) - except AttributeError: # .sendf or .listener_socket not there yet - raise RuntimeError(u'Call start() first') + if frames is not None: + # for frame in frames: + # if isinstance(frame, AMQPMethodFrame): + # print('Sending ', frame.payload) + self.sendf.send(frames, priority=priority) + else: + # Listener socket will kill us when time is right + self.listener_socket.send(None) def on_frame(self, frame): """ diff --git a/tests/test_clustering/test_double.py b/tests/test_clustering/test_double.py index cb627c9..ca6cbef 100644 --- a/tests/test_clustering/test_double.py +++ b/tests/test_clustering/test_double.py @@ -28,17 +28,27 @@ class TestDouble(unittest.TestCase): self.c1.shutdown() self.c2.shutdown() - # def test_ccn(self): - # q1 = Queue(b'yo', auto_delete=True) - # - # con1, fut1 = self.c1.consume(q1) - # fut1.result() - # - # self.c2.delete_queue(q1) #.result() - # - # time.sleep(5) - # self.assertTrue(con1.cancelled) - # + @unittest.skip("Since RabbitMQ does not support queue deletion, you need to do this manually") + def test_ccn(self): + """ + Will consumer cancel itself after Consumer Cancel Notification? + + Manual procedure: + - start the test + - delete the queue using RabbitMQ Management web panel + you got 30 seconds to do this + see if it fails or not + """ + q1 = Queue(b'yo', auto_delete=True) + + con1, fut1 = self.c1.consume(q1) + fut1.result() + +# self.c2.delete_queue(q1) #.result() + + time.sleep(30) + self.assertTrue(con1.cancelled) + def test_resource_locked(self): q = Queue(u'yo', exclusive=True, auto_delete=True) -- GitLab