Skip to content
Snippets Groups Projects
Commit 9ff28737 authored by Piotr Maślanka's avatar Piotr Maślanka
Browse files

moar tests

parent cf473286
No related branches found
No related tags found
No related merge requests found
...@@ -367,6 +367,10 @@ class MessageReceiver(object): ...@@ -367,6 +367,10 @@ class MessageReceiver(object):
self.data_to_go = frame.body_size self.data_to_go = frame.body_size
self.state = 2 self.state = 2
if self.header.body_size == 0:
# An empty message is no common guest. It won't have a BODY field though...
self.on_body(b'') # trigger it manually
def on_basic_deliver(self, payload): def on_basic_deliver(self, payload):
assert self.state == 0 assert self.state == 0
self.bdeliver = payload self.bdeliver = payload
......
...@@ -97,10 +97,14 @@ class Cluster(object): ...@@ -97,10 +97,14 @@ class Cluster(object):
:return: Future or None :return: Future or None
""" """
if isinstance(exchange, Exchange): if isinstance(exchange, Exchange):
exchange = exchange.name exchange = exchange.name.encode('utf8')
elif exchange is None:
exchange = b''
else:
exchange = exchange.encode('utf8')
try: try:
return (self.pub_tr if tx else self.pub_na).publish(message, exchange.encode('utf8'), routing_key.encode('utf8')) return (self.pub_tr if tx else self.pub_na).publish(message, exchange, routing_key.encode('utf8'))
except Publisher.UnusablePublisher: except Publisher.UnusablePublisher:
raise NotImplementedError(u'Sorry, this functionality is not yet implemented!') raise NotImplementedError(u'Sorry, this functionality is not yet implemented!')
......
...@@ -6,7 +6,7 @@ from __future__ import print_function, absolute_import, division ...@@ -6,7 +6,7 @@ from __future__ import print_function, absolute_import, division
import six import six
import unittest import unittest
import time, logging, threading import time, logging, threading
from coolamqp.objects import Message, MessageProperties, NodeDefinition, Queue from coolamqp.objects import Message, MessageProperties, NodeDefinition, Queue, ReceivedMessage
from coolamqp.clustering import Cluster from coolamqp.clustering import Cluster
import time import time
...@@ -17,18 +17,34 @@ logging.basicConfig(level=logging.DEBUG) ...@@ -17,18 +17,34 @@ logging.basicConfig(level=logging.DEBUG)
class TestA(unittest.TestCase): class TestA(unittest.TestCase):
def test_link(self):
"""Connect and disconnect""" def setUp(self):
c = Cluster([NODE]) self.c = Cluster([NODE])
c.start() self.c.start()
c.shutdown()
def tearDown(self):
self.c.shutdown()
def test_consume(self): def test_consume(self):
c = Cluster([NODE]) con, fut = self.c.consume(Queue(u'hello', exclusive=True))
c.start()
con, fut = c.consume(Queue(u'hello', exclusive=True))
# fut.result() # fut.result()
con.cancel() con.cancel()
c.shutdown()
def test_send_recv(self):
P = {'q': False}
def ok(e):
self.assertIsInstance(e, ReceivedMessage)
P['q'] = True
con, fut = self.c.consume(Queue(u'hello', exclusive=True), on_message=ok, no_ack=True)
fut.result()
self.c.publish(Message(b''), routing_key=u'hello', tx=True).result()
time.sleep(1)
self.assertTrue(P['q'])
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment