diff --git a/coolamqp/attaches/publisher.py b/coolamqp/attaches/publisher.py index b8e3136c6fa5e8dfae6894931b46442505010fc0..d8a38d9a3b830dcc9c6af51661b69cb5db0b40e0 100644 --- a/coolamqp/attaches/publisher.py +++ b/coolamqp/attaches/publisher.py @@ -29,7 +29,7 @@ from coolamqp.attaches.channeler import Channeler, ST_ONLINE, ST_OFFLINE from coolamqp.uplink import PUBLISHER_CONFIRMS, MethodWatch, FailWatch from coolamqp.attaches.utils import AtomicTagger, FutureConfirmableRejectable, Synchronized -from coolamqp.objects import Future +from coolamqp.objects import Future, Exchange logger = logging.getLogger(__name__) @@ -149,6 +149,9 @@ class Publisher(Channeler, Synchronized): continue self.tagger.deposit(self.tagger.get_key(), FutureConfirmableRejectable(fut)) + + assert isinstance(xchg, (six.binary_type, six.text_type)) + self._pub(msg, xchg, rk) def _on_cnpub_delivery(self, payload): @@ -163,7 +166,7 @@ class Publisher(Channeler, Synchronized): self.tagger.nack(payload.delivery_tag, payload.multiple) @Synchronized.synchronized - def publish(self, message, exchange_name=b'', routing_key=b''): + def publish(self, message, exchange=b'', routing_key=b''): """ Schedule to have a message published. @@ -178,24 +181,33 @@ class Publisher(Channeler, Synchronized): this function returns None. Messages are dropped on the floor if there's no connection. :param message: Message object to send - :param exchange_name: exchange name to use. Default direct exchange by default + :param exchange: exchange name to use. Default direct exchange by default. Can also be an Exchange object. + :type exchange: bytes, str or Exchange instance :param routing_key: routing key to use :return: a Future instance, or None :raise Publisher.UnusablePublisher: this publisher will never work (eg. MODE_CNPUB on Non-RabbitMQ) """ + + if isinstance(exchange, Exchange): + exchange = exchange.name.encode('utf8') + elif isinstance(exchange, six.text_type): + exchange = exchange.encode('utf8') + + assert isinstance(exchange, six.binary_type) + # Formulate the request if self.mode == Publisher.MODE_NOACK: # If we are not connected right now, drop the message on the floor and log it with DEBUG if self.state != ST_ONLINE: logger.debug(u'Publish request, but not connected - dropping the message') else: - self._pub(message, exchange_name, routing_key) + self._pub(message, exchange, routing_key) elif self.mode == Publisher.MODE_CNPUB: fut = Future() #todo can optimize this not to create an object if ST_ONLINE already - cnpo = CnpubMessageSendOrder(message, exchange_name, routing_key, fut) + cnpo = CnpubMessageSendOrder(message, exchange, routing_key, fut) self.messages.append(cnpo) if self.state == ST_ONLINE: diff --git a/tests/test_clustering/test_exchanges.py b/tests/test_clustering/test_exchanges.py new file mode 100644 index 0000000000000000000000000000000000000000..55c58f8635be6af05a5ac564f3516e35e3be270d --- /dev/null +++ b/tests/test_clustering/test_exchanges.py @@ -0,0 +1,44 @@ +# coding=UTF-8 +from __future__ import print_function, absolute_import, division +import six +import unittest +import time, logging, threading +from coolamqp.objects import Message, MessageProperties, NodeDefinition, Queue, ReceivedMessage, Exchange +from coolamqp.clustering import Cluster, MessageReceived, NothingMuch + +import time + +#todo handle bad auth +NODE = NodeDefinition('127.0.0.1', 'guest', 'guest', heartbeat=20) +logging.basicConfig(level=logging.DEBUG) + + +class TestExchanges(unittest.TestCase): + def setUp(self): + self.c = Cluster([NODE]) + self.c.start() + + def tearDown(self): + self.c.shutdown() + + + def test_fanout(self): + x = Exchange(u'jola', type='direct', auto_delete=True) + + c1, f1 = self.c.consume(Queue('one', exchange=x, exclusive=True), no_ack=True) + c2, f2 = self.c.consume(Queue('two', exchange=x, exclusive=True), no_ack=True) + + f1.result() + f2.result() + + self.c.publish(Message(b'hello'), exchange=x, tx=True).result() + + self.assertIsInstance(self.c.drain(2), MessageReceived) + self.assertIsInstance(self.c.drain(2), MessageReceived) + self.assertIsInstance(self.c.drain(2), NothingMuch) + + + + + +