Skip to content
Snippets Groups Projects
Commit 0a95e58a authored by Piotr Maślanka's avatar Piotr Maślanka
Browse files

resource locking a-ok

parent 0ffba8a5
No related branches found
No related tags found
No related merge requests found
......@@ -92,6 +92,8 @@ class AMQPContentPropertyList(object):
"""
PROPERTIES = []
#todo they are immutable, so they could just serialize themselves...
@staticmethod
def zero_property_flags(property_flags):
"""
......@@ -109,7 +111,7 @@ class AMQPContentPropertyList(object):
def write_to(self, buf):
"""Serialize itself (flags + values) to a buffer"""
raise Exception('This is an abstract method')
raise Exception(u'This is an abstract method')
@staticmethod
def from_buffer(self, buf, start_offset):
......@@ -120,14 +122,14 @@ class AMQPContentPropertyList(object):
Buffer HAS TO start at property_flags
"""
raise Exception('This is an abstract method')
raise Exception(u'This is an abstract method')
def get_size(self):
"""
How long is property_flags + property_values
:return: int
"""
raise Exception('This is an abstract method')
raise Exception(u'This is an abstract method')
class AMQPMethodPayload(AMQPPayload):
......@@ -140,7 +142,7 @@ class AMQPMethodPayload(AMQPPayload):
Write own content to target buffer - starting from LENGTH, ending on FRAME_END
:param buf: target buffer
"""
from coolamqp.framing.definitions import FRAME_END
from coolamqp.framing.definitions import FRAME_END_BYTE
if self.IS_CONTENT_STATIC:
buf.write(self.STATIC_CONTENT)
......@@ -148,7 +150,7 @@ class AMQPMethodPayload(AMQPPayload):
buf.write(struct.pack('!I', self.get_size()+2))
buf.write(self.BINARY_HEADER)
self.write_arguments(buf)
buf.write(six.int2byte(FRAME_END))
buf.write(FRAME_END_BYTE)
def get_size(self):
"""
......
......@@ -27,7 +27,7 @@ class TestA(unittest.TestCase):
def test_consume(self):
con, fut = self.c.consume(Queue(u'hello', exclusive=True))
# fut.result()
fut.result()
con.cancel()
def test_send_recv_zerolen(self):
......@@ -46,6 +46,31 @@ class TestA(unittest.TestCase):
self.assertTrue(P['q'])
def test_message_with_propos(self):
P = {'q': False}
def ok(e):
self.assertIsInstance(e, ReceivedMessage)
self.assertEquals(e.body, b'hello')
#bcoz u can compare memoryviews to their providers :D
self.assertEquals(e.properties.content_type, b'text/plain')
self.assertEquals(e.properties.content_encoding, b'utf8')
P['q'] = True
con, fut = self.c.consume(Queue(u'hello', exclusive=True), on_message=ok, no_ack=True)
fut.result()
self.c.publish(Message(b'hello', properties={
'content_type': b'text/plain',
'content_encoding': b'utf8'
}), routing_key=u'hello', tx=True).result()
time.sleep(1)
self.assertTrue(P['q'])
def test_send_recv_nonzerolen(self):
P = {'q': False}
......
# coding=UTF-8
"""
Double trouble!
"""
from __future__ import print_function, absolute_import, division
import six
import unittest
import time, logging, threading
from coolamqp.objects import Message, MessageProperties, NodeDefinition, Queue, ReceivedMessage
from coolamqp.clustering import Cluster
NODE = NodeDefinition('127.0.0.1', 'guest', 'guest', heartbeat=20)
class TestDouble(unittest.TestCase):
def setUp(self):
self.c1 = Cluster([NODE])
self.c1.start()
self.c2 = Cluster([NODE])
self.c2.start()
def tearDown(self):
self.c1.shutdown()
self.c2.shutdown()
def test_resource_locked(self):
q = Queue(u'yo', exclusive=True, auto_delete=True)
con, fut = self.c1.consume(q)
fut.result()
con2, fut2 = self.c2.consume(q, fail_on_first_time_resource_locked=True)
from coolamqp.exceptions import ResourceLocked
self.assertRaises(ResourceLocked, lambda: fut2.result())
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment