From 6318548351963fdef0e8f80a81ee85247aeddff2 Mon Sep 17 00:00:00 2001 From: Piotr Maslanka <piotr.maslanka@henrietta.com.pl> Date: Sat, 28 Jan 2017 15:24:53 +0100 Subject: [PATCH] fixed #19 --- coolamqp/clustering/cluster.py | 26 ++++++++++++++++++++------ tests/test_clustering/test_a.py | 27 +++++++++++++++++++++++++++ 2 files changed, 47 insertions(+), 6 deletions(-) diff --git a/coolamqp/clustering/cluster.py b/coolamqp/clustering/cluster.py index 497b938..f10116a 100644 --- a/coolamqp/clustering/cluster.py +++ b/coolamqp/clustering/cluster.py @@ -96,7 +96,7 @@ class Cluster(object): self.attache_group.add(con) return con, fut - def publish(self, message, exchange=None, routing_key=u'', tx=False): + def publish(self, message, exchange=None, routing_key=u'', tx=None, confirm=None): """ Publish a message. @@ -104,11 +104,12 @@ class Cluster(object): :param exchange: exchange to use. Default is the "direct" empty-name exchange. :type exchange: unicode/bytes (exchange name) or Exchange object. :param routing_key: routing key to use - :param tx: Whether to publish it transactionally. - If you choose so, you will receive a Future that can be used - to check it broker took responsibility for this message. - Note that if tx if False, and message cannot be delivered to broker at once, - it will be discarded. + :param confirm: Whether to publish it using confirms/transactions. + If you choose so, you will receive a Future that can be used + to check it broker took responsibility for this message. + Note that if tx if False, and message cannot be delivered to broker at once, + it will be discarded. + :param tx: deprecated, alias for confirm :return: Future or None """ if isinstance(exchange, Exchange): @@ -118,6 +119,19 @@ class Cluster(object): else: exchange = exchange.encode('utf8') + if tx is not None: # confirm is a drop-in replacement. tx is unfortunately named + warnings.warn(u'Use confirm kwarg instead', DeprecationWarning) + + if confirm is None: + tx = False + else: + raise RuntimeError(u'Using both tx= and confirm= at once does not make sense') + elif confirm is None: + tx = False + else: + tx = confirm + + try: return (self.pub_tr if tx else self.pub_na).publish(message, exchange, routing_key.encode('utf8')) except Publisher.UnusablePublisher: diff --git a/tests/test_clustering/test_a.py b/tests/test_clustering/test_a.py index 6c5f453..6059529 100644 --- a/tests/test_clustering/test_a.py +++ b/tests/test_clustering/test_a.py @@ -74,6 +74,33 @@ class TestA(unittest.TestCase): self.assertTrue(P['q']) + def test_message_with_propos_confirm(self): + + P = {'q': False} + + def ok(e): + self.assertIsInstance(e, ReceivedMessage) + self.assertEquals(e.body, b'hello') + #bcoz u can compare memoryviews to their providers :D + self.assertEquals(e.properties.content_type, b'text/plain') + self.assertEquals(e.properties.content_encoding, b'utf8') + P['q'] = True + + con, fut = self.c.consume(Queue(u'hello', exclusive=True), on_message=ok, no_ack=True) + fut.result() + self.c.publish(Message(b'hello', properties={ + 'content_type': b'text/plain', + 'content_encoding': b'utf8' + }), routing_key=u'hello', confirm=True).result() + + self.assertRaises(RuntimeError, lambda: self.c.publish(Message(b'hello', properties={ + 'content_type': b'text/plain', + 'content_encoding': b'utf8' + }), routing_key=u'hello', confirm=True, tx=True).result()) + + time.sleep(1) + + self.assertTrue(P['q']) def test_message_with_propos(self): -- GitLab