diff --git a/coolamqp/attaches/consumer.py b/coolamqp/attaches/consumer.py index bee5f12ed7151017bcb632110380f3b682d60e66..a63a1f3b811e32803746b2954be7002157299a1c 100644 --- a/coolamqp/attaches/consumer.py +++ b/coolamqp/attaches/consumer.py @@ -367,6 +367,10 @@ class MessageReceiver(object): self.data_to_go = frame.body_size self.state = 2 + if self.header.body_size == 0: + # An empty message is no common guest. It won't have a BODY field though... + self.on_body(b'') # trigger it manually + def on_basic_deliver(self, payload): assert self.state == 0 self.bdeliver = payload diff --git a/coolamqp/clustering/cluster.py b/coolamqp/clustering/cluster.py index 72fa3fd4961a35754f3066f1d12d771d720a17a7..d17a8e71319e6a25296dbd959a660d336f0ff287 100644 --- a/coolamqp/clustering/cluster.py +++ b/coolamqp/clustering/cluster.py @@ -97,10 +97,14 @@ class Cluster(object): :return: Future or None """ if isinstance(exchange, Exchange): - exchange = exchange.name + exchange = exchange.name.encode('utf8') + elif exchange is None: + exchange = b'' + else: + exchange = exchange.encode('utf8') try: - return (self.pub_tr if tx else self.pub_na).publish(message, exchange.encode('utf8'), routing_key.encode('utf8')) + return (self.pub_tr if tx else self.pub_na).publish(message, exchange, routing_key.encode('utf8')) except Publisher.UnusablePublisher: raise NotImplementedError(u'Sorry, this functionality is not yet implemented!') diff --git a/tests/test_clustering/test_a.py b/tests/test_clustering/test_a.py index bd1007ff520b619bde0ee713020770544c403486..a96764bdad53d453f66b75812f5ef7f2874a902c 100644 --- a/tests/test_clustering/test_a.py +++ b/tests/test_clustering/test_a.py @@ -6,7 +6,7 @@ from __future__ import print_function, absolute_import, division import six import unittest import time, logging, threading -from coolamqp.objects import Message, MessageProperties, NodeDefinition, Queue +from coolamqp.objects import Message, MessageProperties, NodeDefinition, Queue, ReceivedMessage from coolamqp.clustering import Cluster import time @@ -17,18 +17,34 @@ logging.basicConfig(level=logging.DEBUG) class TestA(unittest.TestCase): - def test_link(self): - """Connect and disconnect""" - c = Cluster([NODE]) - c.start() - c.shutdown() + + def setUp(self): + self.c = Cluster([NODE]) + self.c.start() + + def tearDown(self): + self.c.shutdown() def test_consume(self): - c = Cluster([NODE]) - c.start() - con, fut = c.consume(Queue(u'hello', exclusive=True)) + con, fut = self.c.consume(Queue(u'hello', exclusive=True)) # fut.result() con.cancel() - c.shutdown() + + def test_send_recv(self): + + P = {'q': False} + + def ok(e): + self.assertIsInstance(e, ReceivedMessage) + P['q'] = True + + con, fut = self.c.consume(Queue(u'hello', exclusive=True), on_message=ok, no_ack=True) + fut.result() + self.c.publish(Message(b''), routing_key=u'hello', tx=True).result() + + time.sleep(1) + + self.assertTrue(P['q']) +