diff --git a/CHANGELOG.md b/CHANGELOG.md index 50ce03d0bbc76bc259746f9ae0e5dda386962d05..41ef4c6f0c742bf7611ccafebdcf69b05a8d3a0f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,3 +3,6 @@ on GitHub. This file serves to only note what changes have been made so far, between releases. # v1.2.9 + +* allow calling ack() and nack() multiple times + diff --git a/coolamqp/__init__.py b/coolamqp/__init__.py index ae096f259030f0a4931ec390e6bae9c75e893710..daaf074d56d7284253e5b22c40fcde922f5f0f1e 100644 --- a/coolamqp/__init__.py +++ b/coolamqp/__init__.py @@ -1 +1 @@ -__version__ = '1.2.9_a1' +__version__ = '1.2.9_a2' diff --git a/coolamqp/objects.py b/coolamqp/objects.py index 8ccfe61ebed072b9297d294237e3117954be353d..1311d050b03bb0dcc4167109bc1bb8b9b63fb451 100644 --- a/coolamqp/objects.py +++ b/coolamqp/objects.py @@ -130,7 +130,8 @@ class ReceivedMessage(Message): Note that if the consumer that generated this message was no_ack, .ack() and .nack() are no-ops. """ - __slots__ = ('delivery_tag', 'exchange_name', 'routing_key', '_ack', '_nack') + __slots__ = ('delivery_tag', 'exchange_name', 'routing_key', '_ack', '_nack', + 'acked') def __init__(self, body, # type: tp.Union[str, bytes, bytearray, tp.List[memoryview]] exchange_name, # type: memoryview @@ -162,7 +163,7 @@ class ReceivedMessage(Message): self.delivery_tag = delivery_tag self.exchange_name = exchange_name self.routing_key = routing_key - + self.acked = False self._ack = ack or LAMBDA_NONE self._nack = nack or LAMBDA_NONE @@ -170,9 +171,14 @@ class ReceivedMessage(Message): """ Acknowledge reception of this message. - This is a no-op if a Consumer was called with no_ack=True + This is a no-op if a Consumer was called with no_ack=True. + + If called after an ack() or nack() was called, this will be a no-op. """ + if self.acked: + return self._ack() + self.acked = True def nack(self): """ @@ -180,8 +186,13 @@ class ReceivedMessage(Message): This is a no-op if a Consumer was called with no_ack=True. If no_ack was False, the message will be requeued and redelivered by the broker + + If called after an ack() or nack() was called, this will be a no-op. """ + if self.acked: + return self._nack() + self.acked = True class Exchange(object): diff --git a/tests/test_clustering/test_a.py b/tests/test_clustering/test_a.py index 947dcff05f9b7a1f25692dea9acb11b9deeaf340..eb71d466e38699321f78c3bc3db6924b46e7b4f1 100644 --- a/tests/test_clustering/test_a.py +++ b/tests/test_clustering/test_a.py @@ -98,11 +98,11 @@ class TestA(unittest.TestCase): self.assertTrue(q.name) def test_send_recv_zerolen(self): - P = {'q': False} + p = {'q': False} def ok(e): self.assertIsInstance(e, ReceivedMessage) - P['q'] = True + p['q'] = True con, fut = self.c.consume(Queue(u'hello3', exclusive=True), on_message=ok, no_ack=True) @@ -111,7 +111,29 @@ class TestA(unittest.TestCase): time.sleep(1) - self.assertTrue(P['q']) + self.assertTrue(p['q']) + + def test_nacking_and_acking(self): + p = {'q': False, 'count': 0} + + def ok(msg): + if not p['count']: + msg.nack() + else: + msg.ack() + msg.ack() + self.assertIsInstance(msg, ReceivedMessage) + p['q'] = True + p['count'] += 1 + + con, fut = self.c.consume(Queue(u'hello3', exclusive=True), + on_message=ok, no_ack=False) + fut.result() + self.c.publish(Message(b''), routing_key=u'hello3', tx=True).result() + + time.sleep(1) + + self.assertTrue(p['q']) def test_message_with_propos_confirm(self): P = {'q': False}