-
Piotr Maślanka authored8e64238e
test_basics.py 5.06 KiB
# coding=UTF-8
from __future__ import absolute_import, division, print_function
import unittest
import six
from coolamqp import Cluster, ClusterNode, Queue, MessageReceived, ConnectionUp, ConsumerCancelled, Message, Exchange
def getamqp():
amqp = Cluster([ClusterNode('127.0.0.1', 'guest', 'guest')])
amqp.start()
return amqp
class TestThings(unittest.TestCase):
def test_different_constructor_for_clusternode(self):
cn = ClusterNode(host='127.0.0.1', user='guest', password='guest', virtual_host='/')
amqp = Cluster([cn])
amqp.start()
self.assertIsInstance(amqp.drain(1), ConnectionUp)
amqp.shutdown()
class TestBasics(unittest.TestCase):
def setUp(self):
self.amqp = getamqp()
self.assertIsInstance(self.amqp.drain(1), ConnectionUp)
def tearDown(self):
self.amqp.shutdown()
def test_acknowledge(self):
myq = Queue('myqueue', exclusive=True)
self.amqp.consume(myq)
self.amqp.send(Message(b'what the fuck'), '', routing_key='myqueue')
p = self.amqp.drain(wait=1)
self.assertIsInstance(p, MessageReceived)
self.assertEquals(p.message.body, b'what the fuck')
self.assertIsInstance(p.message.body, six.binary_type)
p.message.ack()
self.assertIs(self.amqp.drain(wait=1), None)
def test_send_bullshit(self):
self.assertRaises(TypeError, lambda: Message(u'what the fuck'))
def test_send_nonobvious_bullshit(self):
self.assertEquals(Message(bytearray(b'what the fuck')).body, b'what the fuck')
def test_nacknowledge(self):
myq = Queue('myqueue', exclusive=True)
self.amqp.consume(myq)
self.amqp.send(Message(b'what the fuck'), '', routing_key='myqueue')
p = self.amqp.drain(wait=4)
self.assertIsInstance(p, MessageReceived)
self.assertEquals(p.message.body, b'what the fuck')
p.message.nack()
p = self.amqp.drain(wait=4)
self.assertIsInstance(p, MessageReceived)
self.assertEquals(p.message.body, b'what the fuck')
def test_bug_hangs(self):
p = Queue('lol', exclusive=True)
self.amqp.consume(p)
self.amqp.consume(p).result()
def test_consume_declare(self):
"""Spawn a second connection. One declares an exclusive queue, other tries to consume from it"""
amqp2 = getamqp()
has_failed = {'has_failed': False}
self.amqp.declare_queue(Queue('lol', exclusive=True)).result()
amqp2.consume(Queue('lol', exclusive=True), on_failed=lambda e: has_failed.update({'has_failed': True})).result()
self.assertTrue(has_failed['has_failed'])
amqp2.shutdown()
def test_qos(self):
self.amqp.qos(0, 1)
self.amqp.consume(Queue('lol', exclusive=True)).result()
self.amqp.send(Message(b'what the fuck'), '', routing_key='lol')
self.amqp.send(Message(b'what the fuck'), '', routing_key='lol')
p = self.amqp.drain(wait=4)
self.assertIsInstance(p, MessageReceived)
self.assertIsNone(self.amqp.drain(wait=5))
p.message.ack()
self.assertIsInstance(self.amqp.drain(wait=4), MessageReceived)
def test_consume_twice(self):
"""Spawn a second connection and try to consume an exclusive queue twice"""
amqp2 = getamqp()
has_failed = {'has_failed': False}
self.amqp.consume(Queue('lol', exclusive=True)).result()
amqp2.consume(Queue('lol', exclusive=True), on_failed=lambda e: has_failed.update({'has_failed': True})).result()
self.assertTrue(has_failed['has_failed'])
amqp2.shutdown()
def test_send_and_receive(self):
myq = Queue('myqueue', exclusive=True)
self.amqp.consume(myq)
self.amqp.send(Message(b'what the fuck'), '', routing_key='myqueue')
p = self.amqp.drain(wait=10)
self.assertIsInstance(p, MessageReceived)
self.assertEquals(p.message.body, b'what the fuck')
def test_consumer_cancelled_on_queue_deletion(self):
myq = Queue('myqueue', exclusive=True)
self.amqp.consume(myq)
self.amqp.delete_queue(myq)
self.assertIsInstance(self.amqp.drain(wait=10), ConsumerCancelled)
def test_consumer_cancelled_on_consumer_cancel(self):
myq = Queue('myqueue', exclusive=True)
self.amqp.consume(myq)
self.amqp.cancel(myq)
self.assertIsInstance(self.amqp.drain(wait=10), ConsumerCancelled)
def test_delete_exchange(self):
xchg = Exchange('a_fanout', type='fanout')
self.amqp.declare_exchange(xchg)
self.amqp.delete_exchange(xchg).result()
def test_exchanges(self):
xchg = Exchange('a_fanout', type='fanout')
self.amqp.declare_exchange(xchg)
q1 = Queue('q1', exclusive=True, exchange=xchg)
q2 = Queue('q2', exclusive=True, exchange=xchg)
self.amqp.consume(q1)
self.amqp.consume(q2)
self.amqp.send(Message(b'hello'), xchg)
self.assertIsInstance(self.amqp.drain(wait=4), MessageReceived)
self.assertIsInstance(self.amqp.drain(wait=4), MessageReceived)