Skip to content
Snippets Groups Projects
Commit 2bfb1ebf authored by Piotr Maślanka's avatar Piotr Maślanka
Browse files

fixes #3

parent 3be70ade
No related branches found
No related tags found
No related merge requests found
Pipeline #63409 failed with stages
in 2 minutes and 3 seconds
......@@ -11,7 +11,7 @@ Since v1.3.2 they'll be put here and in release description.
* PendingDeprecationWarning changed into a DeprecationWarning
* changes to Cluster:
* declare will refuse to declare an anonymous queue
* removed publish(tx)
* renamed publish(tx) to publish(confirm)
# v1.5.0
========
......
__version__ = '1.5.1a1'
__version__ = '2.0.0a1'
......@@ -103,7 +103,12 @@ class Cluster(object):
dont_trace=False, arguments=None):
"""
Bind a queue to an exchange
:raise ValueError: cannot bind to anonymous queues
"""
if queue.anonymous:
raise ValueError('Canoot bind to anonymous queue')
if span is not None and not dont_trace:
child_span = self._make_span('bind', span)
else:
......@@ -121,6 +126,8 @@ class Cluster(object):
"""
Declare a Queue/Exchange.
Non-anonymous queues have to be declared. Anonymous can't.
.. note:: Note that if your queue relates to an exchange that has not yet been declared you'll be faced with
AMQP error 404: NOT_FOUND, so try to declare your exchanges before your queues.
......
......@@ -269,7 +269,7 @@ class Queue(object):
:param arguments: either a list of (bytes, values) or a dict of (str, value) to pass as an extra argument
:raises ValueError: tried to create a queue that was not durable or auto_delete
:raises ValueError: tried to create a queue that was not exclusive or auto_delete and not anonymous
:raises ValueError: tried to create a queue that was anonymous and not auto_delete
:raises ValueError: tried to create a queue that was anonymous and not auto_delete or durable
:warning DeprecationWarning: if a non-exclusive auto_delete queue is created or some other combinations
that will be soon unavailable (eg. RabbitMQ 4.0).
:warning UserWarning: if you're declaring an auto_delete or exclusive, anonymous queue
......@@ -298,7 +298,10 @@ class Queue(object):
self.anonymous = self.name is None # if this queue is anonymous, it must be regenerated upon reconnect
if self.auto_delete and self.durable:
raise ValueError('Cannot create an auto_delete and exclusive queue')
raise ValueError('Cannot create an auto_delete and durable queue')
if self.anonymous and (not self.auto_delete or self.durable):
raise ValueError('Zero sense to make a anonymous non-auto-delete or durable queue')
if not self.anonymous:
if self.auto_delete or self.exclusive:
......
......@@ -5,6 +5,7 @@ Welcome to CoolAMQP's documentation!
:maxdepth: 2
:caption: Contents
whatsnew
cluster
tutorial
caveats
......
......@@ -5,4 +5,26 @@ CoolAMQP 2.0.0 marks a slight philosophy shift. Whereas 1.x used auto-generated
pick their names for themselves.
It also forbids some combinations of Queue arguments, and makes the default values more palatable, so for example
a naked :class:`coolamqp.objects.Queue` will be anonymous, non-durable, exclusive and auto-delete.
\ No newline at end of file
a naked :class:`coolamqp.objects.Queue` will be anonymous, non-durable, exclusive and auto-delete.
Queues
------
Following queues will fail now:
* auto_delete and durable
* anonymous and durables
* anonymous and not exclusives
* anonymous and not auto_delete
* auto_delete and not exclusive and not anonymous
Following will emit a warning:
* exclusive and auto_delete - DeprecationWarning, since they're removing it in RabbitMQ 4.0
* not anonymous, auto_delete and not exclusive - UserWarning, since this makes little sense
Anonymous queues
----------------
They are back. Besides, anything that you will pass to :meth:`coolamqp.clustering.Cluster.consume` will be declared, be
it an exchange, a queue or such shit. This allows you to declare anonymous queues.
\ No newline at end of file
......@@ -85,7 +85,7 @@ class TestA(unittest.TestCase):
def test_declare_anonymous(self):
xchg = Exchange('wtfzomg', type='fanout')
q = Queue(exchange=xchg)
q = Queue('', exchange=xchg)
self.c.declare(xchg).result()
self.c.declare(q).result()
self.assertTrue(q.name)
......@@ -110,7 +110,7 @@ class TestA(unittest.TestCase):
con, fut = self.c.consume(Queue(u'hello3', exclusive=True),
on_message=ok, no_ack=True)
fut.result()
self.c.publish(Message(b''), routing_key=u'hello3', tx=True).result()
self.c.publish(Message(b''), routing_key=u'hello3', confirm=True).result()
time.sleep(1)
......@@ -133,7 +133,7 @@ class TestA(unittest.TestCase):
con, fut = self.c.consume(Queue(u'hello3', exclusive=True),
on_message=ok, no_ack=False)
fut.result()
self.c.publish(Message(b''), routing_key=u'hello3', tx=True).result()
self.c.publish(Message(b''), routing_key=u'hello3', confirm=True).result()
time.sleep(1)
......@@ -159,13 +159,6 @@ class TestA(unittest.TestCase):
'content_encoding': b'utf8'
}), routing_key=u'hello4', confirm=True).result()
self.assertRaises(RuntimeError,
lambda: self.c.publish(Message(b'hello4', properties={
'content_type': b'text/plain',
'content_encoding': b'utf8'
}), routing_key=u'hello4', confirm=True,
tx=True).result())
time.sleep(1)
self.assertTrue(p['q'])
......@@ -187,7 +180,7 @@ class TestA(unittest.TestCase):
self.c.publish(Message(b'hello5', properties={
'content_type': b'text/plain',
'content_encoding': b'utf8'
}), routing_key=u'hello5', tx=True).result()
}), routing_key=u'hello5', confirm=True).result()
time.sleep(2)
......@@ -207,7 +200,7 @@ class TestA(unittest.TestCase):
on_message=ok, no_ack=True)
fut.result()
self.c.publish(Message(b'hello6'), routing_key=u'hello6',
tx=True).result()
confirm=True).result()
time.sleep(1)
......
......@@ -36,7 +36,7 @@ class TestExchanges(unittest.TestCase):
grave_queue_name = uuid.uuid4().hex
DEADLETTER = Exchange(xchg_name, type=b'direct')
QUEUE = Queue(dead_queue_name, durable=True, auto_delete=False, exclusive=False)
GRAVEYARD_QUEUE = Queue(grave_queue_name, durable=True, arguments={'x-dead-letter-exchange': xchg_name,
GRAVEYARD_QUEUE = Queue(grave_queue_name, durable=True, auto_delete=False, arguments={'x-dead-letter-exchange': xchg_name,
'x-message-ttl': 1000})
self.c.declare(DEADLETTER).result()
self.c.declare(QUEUE).result()
......@@ -66,7 +66,7 @@ class TestExchanges(unittest.TestCase):
f1.result()
f2.result()
self.c.publish(Message(b'hello'), exchange=x, tx=True).result()
self.c.publish(Message(b'hello'), exchange=x, confirm=True).result()
self.assertIsInstance(self.c.drain(2), MessageReceived)
self.assertIsInstance(self.c.drain(2), MessageReceived)
......
......@@ -29,17 +29,15 @@ class TestConnecting(unittest.TestCase):
def test_argumented_queue(self):
que = Queue(auto_delete=True, exclusive=True, arguments=[(b'x-max-priority', 10)])
que2 = Queue(auto_delete=True, exclusive=True, arguments={'x-max-priority': 10})
c = Cluster([NODE])
c.start(wait=True, timeout=None)
c.declare(que).result()
c.declare(que2).result()
self.assertRaises(ValueError, c.declare, que)
c.shutdown(True)
def test_argumented_bind(self):
c = Cluster([NODE])
c.start(wait=True, timeout=None)
que = Queue(auto_delete=True, exclusive=True, arguments=[(b'x-max-priority', 10)])
que = Queue('', auto_delete=True, exclusive=True, arguments=[(b'x-max-priority', 10)])
xchg = Exchange('test3-wertest', type='headers', durable=True)
c.declare(que).result()
c.declare(xchg).result()
......
......@@ -11,7 +11,7 @@ from coolamqp.objects import Exchange, Queue, NodeDefinition, Message
XCHG = Exchange('smok5.results', type='topic', durable=True)
QUEUE = Queue(exchange=XCHG, exclusive=True, auto_delete=True)
QUEUE = Queue('', exchange=XCHG, exclusive=True, auto_delete=True)
logger = logging.getLogger(__name__)
NODE = NodeDefinition(os.environ.get('AMQP_HOST', '127.0.0.1'), 'guest', 'guest', heartbeat=20)
......@@ -33,8 +33,8 @@ class TestTopic(unittest.TestCase):
self.c.shutdown()
def test_bind_stuff(self):
self.c.declare(QUEUE)
self.c.declare(XCHG).result()
self.c.declare(QUEUE).result()
self.c.bind(QUEUE, XCHG, routing_key='hello-world').result()
global did_receive
......
......@@ -21,6 +21,14 @@ class TestObjects(unittest.TestCase):
self.assertRaises(ValueError, Queue, 'test', auto_delete=True, durable=True)
self.assertRaises(ValueError, Queue, None, auto_delete=False)
self.assertRaises(ValueError, Queue, 'test', auto_delete=True, exclusive=False)
warnings.resetwarnings()
with warnings.catch_warnings(record=True) as w:
Queue('test', auto_delete=True, exclusive=True)
Queue(auto_delete=True, exclusive=False)
self.assertEqual(len(w), 2)
self.assertTrue(issubclass(w[0].category, UserWarning))
self.assertTrue(issubclass(w[1].category, DeprecationWarning))
def test_queue_declare(self):
args = argumentify({'x-dead-letter-exchange': 'deadletter',
......@@ -38,13 +46,6 @@ class TestObjects(unittest.TestCase):
ce_p_msg = MessageProperties(content_encoding=b'wtf')
self.assertIn('wtf', str(ce_p_msg))
def test_warning(self):
warnings.resetwarnings()
with warnings.catch_warnings(record=True) as w:
Queue(auto_delete=True, exclusive=False)
self.assertEqual(len(w), 1)
self.assertTrue(issubclass(w[0].category, PendingDeprecationWarning))
def test_node_definition_from_amqp(self):
n1 = NodeDefinition(u'amqp://ala:ma@kota/psa')
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment