-
Piotr Maślanka authorede1e0eb3f
test_things.py 2.03 KiB
# coding=UTF-8
from __future__ import print_function, absolute_import, division
import logging
import os
import time
import unittest
from coolamqp.clustering import Cluster
from coolamqp.exceptions import ConnectionDead
from coolamqp.objects import NodeDefinition, Queue
NODE = NodeDefinition(os.environ.get('AMQP_HOST', '127.0.0.1'), 'guest', 'guest', heartbeat=20)
logging.basicConfig(level=logging.DEBUG)
class TestConnecting(unittest.TestCase):
def test_on_fail(self):
"""Assert that on_fail doesn't fire if the cluster fails to connect"""
q = {'failed': False}
c = Cluster(
NodeDefinition(os.environ.get('AMQP_HOST', '127.0.0.1'), 'xguest', 'xguest', heartbeat=20),
on_fail=lambda: q.update(failed=True))
self.assertRaises(ConnectionDead, c.start)
c.shutdown()
self.assertFalse(q['failed'])
def test_on_clean(self):
q = {'failed': False}
c = Cluster([NODE], on_fail=lambda: q.update(failed=True))
c.start(wait=True)
c.shutdown()
time.sleep(5)
self.assertFalse(q['failed'])
def test_start_called_multiple_times(self):
c = Cluster([NODE])
c.start(wait=True)
self.assertRaises(RuntimeError, lambda: c.start())
c.shutdown(wait=True)
def test_shutdown_without_start(self):
c = Cluster([NODE])
self.assertRaises(RuntimeError, lambda: c.shutdown())
def test_queues_equal_and_hashable(self):
q1 = Queue(u'lolwut')
q2 = Queue(b'lolwut')
q3 = Queue(u'not')
self.assertEquals(q1, q2)
self.assertEquals(hash(q1), hash(q2))
self.assertNotEqual(q1, q3)
def test_node_with_kwargs(self):
node = NodeDefinition(host='127.0.0.1',
user='guest',
password='guest')
self.assertEquals(node.virtual_host, '/') # default
def test_amqpconnstring_port(self):
node = NodeDefinition('amqp://lol:lol@lol:4123/vhost')
self.assertEquals(node.port, 4123)