diff --git a/coolamqp/framing/base.py b/coolamqp/framing/base.py index cb47c4b44dc35d24a727ab0cfe45ff75f6f43464..a9c49db1dbbcb42e5ffa457381a7f4eae9408da0 100644 --- a/coolamqp/framing/base.py +++ b/coolamqp/framing/base.py @@ -92,6 +92,8 @@ class AMQPContentPropertyList(object): """ PROPERTIES = [] + #todo they are immutable, so they could just serialize themselves... + @staticmethod def zero_property_flags(property_flags): """ @@ -109,7 +111,7 @@ class AMQPContentPropertyList(object): def write_to(self, buf): """Serialize itself (flags + values) to a buffer""" - raise Exception('This is an abstract method') + raise Exception(u'This is an abstract method') @staticmethod def from_buffer(self, buf, start_offset): @@ -120,14 +122,14 @@ class AMQPContentPropertyList(object): Buffer HAS TO start at property_flags """ - raise Exception('This is an abstract method') + raise Exception(u'This is an abstract method') def get_size(self): """ How long is property_flags + property_values :return: int """ - raise Exception('This is an abstract method') + raise Exception(u'This is an abstract method') class AMQPMethodPayload(AMQPPayload): @@ -140,7 +142,7 @@ class AMQPMethodPayload(AMQPPayload): Write own content to target buffer - starting from LENGTH, ending on FRAME_END :param buf: target buffer """ - from coolamqp.framing.definitions import FRAME_END + from coolamqp.framing.definitions import FRAME_END_BYTE if self.IS_CONTENT_STATIC: buf.write(self.STATIC_CONTENT) @@ -148,7 +150,7 @@ class AMQPMethodPayload(AMQPPayload): buf.write(struct.pack('!I', self.get_size()+2)) buf.write(self.BINARY_HEADER) self.write_arguments(buf) - buf.write(six.int2byte(FRAME_END)) + buf.write(FRAME_END_BYTE) def get_size(self): """ diff --git a/tests/test_clustering/test_a.py b/tests/test_clustering/test_a.py index de1aa0d9bcfcbf23f1ba8d84d802bb298f91fac6..7e517899019bcbff1c80d5825ee645153ce549c8 100644 --- a/tests/test_clustering/test_a.py +++ b/tests/test_clustering/test_a.py @@ -27,7 +27,7 @@ class TestA(unittest.TestCase): def test_consume(self): con, fut = self.c.consume(Queue(u'hello', exclusive=True)) -# fut.result() + fut.result() con.cancel() def test_send_recv_zerolen(self): @@ -46,6 +46,31 @@ class TestA(unittest.TestCase): self.assertTrue(P['q']) + + def test_message_with_propos(self): + + P = {'q': False} + + def ok(e): + self.assertIsInstance(e, ReceivedMessage) + self.assertEquals(e.body, b'hello') + #bcoz u can compare memoryviews to their providers :D + self.assertEquals(e.properties.content_type, b'text/plain') + self.assertEquals(e.properties.content_encoding, b'utf8') + P['q'] = True + + con, fut = self.c.consume(Queue(u'hello', exclusive=True), on_message=ok, no_ack=True) + fut.result() + self.c.publish(Message(b'hello', properties={ + 'content_type': b'text/plain', + 'content_encoding': b'utf8' + }), routing_key=u'hello', tx=True).result() + + time.sleep(1) + + self.assertTrue(P['q']) + + def test_send_recv_nonzerolen(self): P = {'q': False} diff --git a/tests/test_clustering/test_double.py b/tests/test_clustering/test_double.py new file mode 100644 index 0000000000000000000000000000000000000000..038e247e1b1c557a03d608d38f07324f13e2344b --- /dev/null +++ b/tests/test_clustering/test_double.py @@ -0,0 +1,40 @@ +# coding=UTF-8 +""" +Double trouble! +""" +from __future__ import print_function, absolute_import, division +import six +import unittest +import time, logging, threading +from coolamqp.objects import Message, MessageProperties, NodeDefinition, Queue, ReceivedMessage +from coolamqp.clustering import Cluster +NODE = NodeDefinition('127.0.0.1', 'guest', 'guest', heartbeat=20) + + +class TestDouble(unittest.TestCase): + + def setUp(self): + self.c1 = Cluster([NODE]) + self.c1.start() + + self.c2 = Cluster([NODE]) + self.c2.start() + + def tearDown(self): + self.c1.shutdown() + self.c2.shutdown() + + def test_resource_locked(self): + + q = Queue(u'yo', exclusive=True, auto_delete=True) + + con, fut = self.c1.consume(q) + fut.result() + + con2, fut2 = self.c2.consume(q, fail_on_first_time_resource_locked=True) + + from coolamqp.exceptions import ResourceLocked + + self.assertRaises(ResourceLocked, lambda: fut2.result()) + +