Skip to content
Snippets Groups Projects
Commit 63185483 authored by Piotr Maślanka's avatar Piotr Maślanka
Browse files

fixed #19

parent 2e62c783
No related branches found
No related tags found
No related merge requests found
......@@ -96,7 +96,7 @@ class Cluster(object):
self.attache_group.add(con)
return con, fut
def publish(self, message, exchange=None, routing_key=u'', tx=False):
def publish(self, message, exchange=None, routing_key=u'', tx=None, confirm=None):
"""
Publish a message.
......@@ -104,11 +104,12 @@ class Cluster(object):
:param exchange: exchange to use. Default is the "direct" empty-name exchange.
:type exchange: unicode/bytes (exchange name) or Exchange object.
:param routing_key: routing key to use
:param tx: Whether to publish it transactionally.
If you choose so, you will receive a Future that can be used
to check it broker took responsibility for this message.
Note that if tx if False, and message cannot be delivered to broker at once,
it will be discarded.
:param confirm: Whether to publish it using confirms/transactions.
If you choose so, you will receive a Future that can be used
to check it broker took responsibility for this message.
Note that if tx if False, and message cannot be delivered to broker at once,
it will be discarded.
:param tx: deprecated, alias for confirm
:return: Future or None
"""
if isinstance(exchange, Exchange):
......@@ -118,6 +119,19 @@ class Cluster(object):
else:
exchange = exchange.encode('utf8')
if tx is not None: # confirm is a drop-in replacement. tx is unfortunately named
warnings.warn(u'Use confirm kwarg instead', DeprecationWarning)
if confirm is None:
tx = False
else:
raise RuntimeError(u'Using both tx= and confirm= at once does not make sense')
elif confirm is None:
tx = False
else:
tx = confirm
try:
return (self.pub_tr if tx else self.pub_na).publish(message, exchange, routing_key.encode('utf8'))
except Publisher.UnusablePublisher:
......
......@@ -74,6 +74,33 @@ class TestA(unittest.TestCase):
self.assertTrue(P['q'])
def test_message_with_propos_confirm(self):
P = {'q': False}
def ok(e):
self.assertIsInstance(e, ReceivedMessage)
self.assertEquals(e.body, b'hello')
#bcoz u can compare memoryviews to their providers :D
self.assertEquals(e.properties.content_type, b'text/plain')
self.assertEquals(e.properties.content_encoding, b'utf8')
P['q'] = True
con, fut = self.c.consume(Queue(u'hello', exclusive=True), on_message=ok, no_ack=True)
fut.result()
self.c.publish(Message(b'hello', properties={
'content_type': b'text/plain',
'content_encoding': b'utf8'
}), routing_key=u'hello', confirm=True).result()
self.assertRaises(RuntimeError, lambda: self.c.publish(Message(b'hello', properties={
'content_type': b'text/plain',
'content_encoding': b'utf8'
}), routing_key=u'hello', confirm=True, tx=True).result())
time.sleep(1)
self.assertTrue(P['q'])
def test_message_with_propos(self):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment