Skip to content
Snippets Groups Projects
Commit 029732c7 authored by Piotr Maślanka's avatar Piotr Maślanka
Browse files

fix for unit tests

parent 720037a0
No related branches found
No related tags found
No related merge requests found
Pipeline #62603 failed with stages
in 1 minute and 44 seconds
......@@ -5,6 +5,7 @@ Since v1.3.2 they'll be put here and in release description.
========
* added docs regarding consume method.
* added testing topic exchanges
# v1.4.1
=======
......
__version__ = '1.4.2a1'
__version__ = '1.4.2a2'
......@@ -244,7 +244,8 @@ class Queue(object):
:param exclusive: Is this queue exclusive?
:param auto_delete: Is this queue auto_delete ?
:param arguments: either a list of (bytes, values) or a dict of (str, value) to pass as an extra argument
:warn PendingDeprecationWarning: if a non-exclusive auto_delete queue is created
:warning PendingDeprecationWarning: if a non-exclusive auto_delete queue is created or some other combinations
that will be soon unavailable (eg. RabbitMQ 4.0).
"""
__slots__ = ('name', 'durable', 'exchange', 'auto_delete', 'exclusive',
'anonymous', 'consumer_tag', 'arguments')
......@@ -265,6 +266,10 @@ class Queue(object):
self.auto_delete = auto_delete
self.exclusive = exclusive
self.arguments = argumentify(arguments)
if self.auto_delete and self.durable:
warnings.warn('This will be removed in RabbitMQ 4.0', PendingDeprecationWarning)
if self.auto_delete and not self.exclusive:
warnings.warn('This will be removed in RabbitMQ 4.0', PendingDeprecationWarning)
......
import time
import os
import unittest
import logging
import monotonic
from coolamqp.clustering import Cluster
from coolamqp.objects import Exchange, Queue, NodeDefinition, Message
XCHG = Exchange('topic', type='topic', durable=True)
QUEUE = Queue(exchange=XCHG, exclusive=True, auto_delete=True)
NODE = NodeDefinition(os.environ.get('AMQP_HOST', '127.0.0.1'), 'guest', 'guest', heartbeat=20)
logging.basicConfig(level=logging.DEBUG)
class TestTopic(unittest.TestCase):
def setUp(self):
self.c = Cluster([NODE])
self.c.start()
def tearDown(self):
self.c.shutdown()
def test_bind_stuff(self):
self.c.declare(QUEUE).result()
self.c.bind(QUEUE, XCHG, routing_key='hello-world')
did_receive = False
def do(msg):
nonlocal did_receive
did_receive = True
msg.ack()
cons, fut = self.c.consume(QUEUE, on_message=do, no_ack=False)
fut.result()
self.c.publish(Message(b'good boy'), exchange=XCHG, routing_key='hello-world')
start = monotonic.monotonic()
while not did_receive:
time.sleep(2)
if monotonic.monotonic() - start > 10:
self.fail("Message not received within 10 seconds")
self.cons.cancel.result()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment