diff --git a/coolamqp/clustering/cluster.py b/coolamqp/clustering/cluster.py index 497b93857e8df6cab41138ff1e5cdd8fe699a1b1..f10116aed63741e040db2db48cb40e06be11b08b 100644 --- a/coolamqp/clustering/cluster.py +++ b/coolamqp/clustering/cluster.py @@ -96,7 +96,7 @@ class Cluster(object): self.attache_group.add(con) return con, fut - def publish(self, message, exchange=None, routing_key=u'', tx=False): + def publish(self, message, exchange=None, routing_key=u'', tx=None, confirm=None): """ Publish a message. @@ -104,11 +104,12 @@ class Cluster(object): :param exchange: exchange to use. Default is the "direct" empty-name exchange. :type exchange: unicode/bytes (exchange name) or Exchange object. :param routing_key: routing key to use - :param tx: Whether to publish it transactionally. - If you choose so, you will receive a Future that can be used - to check it broker took responsibility for this message. - Note that if tx if False, and message cannot be delivered to broker at once, - it will be discarded. + :param confirm: Whether to publish it using confirms/transactions. + If you choose so, you will receive a Future that can be used + to check it broker took responsibility for this message. + Note that if tx if False, and message cannot be delivered to broker at once, + it will be discarded. + :param tx: deprecated, alias for confirm :return: Future or None """ if isinstance(exchange, Exchange): @@ -118,6 +119,19 @@ class Cluster(object): else: exchange = exchange.encode('utf8') + if tx is not None: # confirm is a drop-in replacement. tx is unfortunately named + warnings.warn(u'Use confirm kwarg instead', DeprecationWarning) + + if confirm is None: + tx = False + else: + raise RuntimeError(u'Using both tx= and confirm= at once does not make sense') + elif confirm is None: + tx = False + else: + tx = confirm + + try: return (self.pub_tr if tx else self.pub_na).publish(message, exchange, routing_key.encode('utf8')) except Publisher.UnusablePublisher: diff --git a/tests/test_clustering/test_a.py b/tests/test_clustering/test_a.py index 6c5f453988af23481971457298c027907faae0d7..6059529c964c85e7a037de73b441668b4720d3ff 100644 --- a/tests/test_clustering/test_a.py +++ b/tests/test_clustering/test_a.py @@ -74,6 +74,33 @@ class TestA(unittest.TestCase): self.assertTrue(P['q']) + def test_message_with_propos_confirm(self): + + P = {'q': False} + + def ok(e): + self.assertIsInstance(e, ReceivedMessage) + self.assertEquals(e.body, b'hello') + #bcoz u can compare memoryviews to their providers :D + self.assertEquals(e.properties.content_type, b'text/plain') + self.assertEquals(e.properties.content_encoding, b'utf8') + P['q'] = True + + con, fut = self.c.consume(Queue(u'hello', exclusive=True), on_message=ok, no_ack=True) + fut.result() + self.c.publish(Message(b'hello', properties={ + 'content_type': b'text/plain', + 'content_encoding': b'utf8' + }), routing_key=u'hello', confirm=True).result() + + self.assertRaises(RuntimeError, lambda: self.c.publish(Message(b'hello', properties={ + 'content_type': b'text/plain', + 'content_encoding': b'utf8' + }), routing_key=u'hello', confirm=True, tx=True).result()) + + time.sleep(1) + + self.assertTrue(P['q']) def test_message_with_propos(self):