Skip to content
Snippets Groups Projects
Commit e40ae6f3 authored by Piotr Maślanka's avatar Piotr Maślanka
Browse files

moar unit tests

parent 2ac116e6
No related branches found
No related tags found
No related merge requests found
Vagrantfile eol=lf
...@@ -12,6 +12,7 @@ __pycache__/ ...@@ -12,6 +12,7 @@ __pycache__/
env/ env/
build/ build/
develop-eggs/ develop-eggs/
htmlcov/
dist/ dist/
downloads/ downloads/
eggs/ eggs/
......
...@@ -9,10 +9,6 @@ install: ...@@ -9,10 +9,6 @@ install:
- pip install coverage - pip install coverage
- pip install codeclimate-test-reporter - pip install codeclimate-test-reporter
- pip install -r requirements.txt - pip install -r requirements.txt
- sudo rabbitmq-plugins enable rabbitmq_management
- sudo rabbitmqctl add_user user user
- sudo rabbitmqctl set_permissions -p / user ".*" ".*" ".*"
- sudo rabbitmqctl set_user_tags user administrator
after_success: after_success:
- CODECLIMATE_REPO_TOKEN=e8e05973a6c49139de5f98013cd285f9238b78d20f053f37f6e7deeab2c2c52f codeclimate-test-reporter - CODECLIMATE_REPO_TOKEN=e8e05973a6c49139de5f98013cd285f9238b78d20f053f37f6e7deeab2c2c52f codeclimate-test-reporter
services: services:
......
...@@ -13,23 +13,21 @@ CoolAMQP ...@@ -13,23 +13,21 @@ CoolAMQP
When you're tired of fucking with AMQP reconnects. When you're tired of fucking with AMQP reconnects.
When a connection made by CoolAMQP to your broker fails, it will pick another When a connection made by CoolAMQP to your broker fails, it will pick another
node, redeclare exchanges, queues, consumers and tell your application that node, redeclare exchanges, queues, consumers, QoS and all the other shit, and tell
a disconnect happened. your application that a disconnect happened.
CoolAMQP makes you forget about all the nasty corner cases about AMQP reconnection.
You only need to remember that: You only need to remember that:
1. Reconnects and redefinitions take a while. 1. Reconnects and redefinitions take a while.
* Things will happen during that time. It is your responsibility to ensure that your distributed system is built to handle this * Things will happen during that time. It is your responsibility to ensure that your distributed system is built to handle this
2. CoolAMQP will tell you when it senses losing broker connection. 2. CoolAMQP will tell you when it senses losing broker connection.
* It will also tell you when it regains the connection (that means that everything is redefined). * It will also tell you when it regains the connection (that means that everything is redefined and ready to go)
3. Delivering messages multiple times may happen. 3. Delivering messages multiple times may happen.
* Ensure you know when it happens. Keywords: message acknowledgement, amqp specification * Ensure you know when it happens. Keywords: message acknowledgement, amqp specification
As the project is in it's infancy stages, but actively maintained and used in a commercial project, The project is actively maintained, used in a commercial project and unit tested with high coverage.
if you need a feature - just drop me a note or create a new issue here.
Enjoy!
todo todo
---- ----
......
...@@ -4,8 +4,13 @@ ...@@ -4,8 +4,13 @@
Vagrant.configure("2") do |config| Vagrant.configure("2") do |config|
config.vm.box = "debian/contrib-jessie64" config.vm.box = "debian/contrib-jessie64"
# Rabbit MQ management
config.vm.network "forwarded_port", guest: 15672, host: 15672 config.vm.network "forwarded_port", guest: 15672, host: 15672
# HTTP for viewing coverage reports
config.vm.network "forwarded_port", guest: 80, host: 8765
config.vm.provision "shell", inline: <<-SHELL config.vm.provision "shell", inline: <<-SHELL
apt-get update apt-get update
apt-get install -y htop curl python python-setuptools python-pip python-dev build-essential rabbitmq-server apt-get install -y htop curl python python-setuptools python-pip python-dev build-essential rabbitmq-server
...@@ -19,7 +24,17 @@ Vagrant.configure("2") do |config| ...@@ -19,7 +24,17 @@ Vagrant.configure("2") do |config|
# Install deps # Install deps
pip install -r /vagrant/requirements.txt pip install -r /vagrant/requirements.txt
pip install nose pip install nose coverage
# HTTP server for viewing coverage reports
apt-get -y install nginx
rm -rf /var/www/html
ln -s /vagrant/htmlcov /var/www/html
# .bashrc for default user
echo """# .bashrc
cd /vagrant""" > /home/vagrant/.bashrc
SHELL SHELL
end end
...@@ -4,7 +4,7 @@ from distutils.core import setup ...@@ -4,7 +4,7 @@ from distutils.core import setup
setup(name='CoolAMQP', setup(name='CoolAMQP',
version='0.6', version='0.6',
description='The angry AMQP client library', description=u'The reconnecting AMQP client',
author=u'DMS Serwis s.c.', author=u'DMS Serwis s.c.',
author_email='piotrm@smok.co', author_email='piotrm@smok.co',
url='https://github.com/smok-serwis/coolamqp', url='https://github.com/smok-serwis/coolamqp',
...@@ -12,7 +12,7 @@ setup(name='CoolAMQP', ...@@ -12,7 +12,7 @@ setup(name='CoolAMQP',
keywords=['amqp', 'pyamqp', 'rabbitmq', 'client', 'network', 'ha', 'high availability'], keywords=['amqp', 'pyamqp', 'rabbitmq', 'client', 'network', 'ha', 'high availability'],
packages=['coolamqp', 'coolamqp.backends'], packages=['coolamqp', 'coolamqp.backends'],
license='MIT License', license='MIT License',
long_description='''The Python AMQP client library that makes you STOP FUCKING WITH AMQP RECONNECTION''', long_description=u'The AMQP client that handles reconnection madness for you',
requires=[ requires=[
"amqp", "amqp",
"six" "six"
......
Tests work using either Travis CI or Vagrant. Tests work using either Travis CI or Vagrant.
If you want to debug things, you have RabbitMQ management enabled on Vagrant. Go to http://127.0.0.1:15672 and log in with **user** / **user** If you want to debug things, you have RabbitMQ management enabled on Vagrant.
Go to [http://127.0.0.1:15672](http://127.0.0.1:15672) and log in with **user** / **user**
RabbitMQ management is NOT enabled on Travis CI.
If you want to see a coverage report, run tests like this:
```bash
nosetests --with-coverage --exe
coverage html
```
*--exe* is for the cases where you run on Windows, and everything in */vagrant* is 777.
and then go to [http://127.0.0.1:8765](http://127.0.0.1:8765)
...@@ -4,7 +4,7 @@ import unittest ...@@ -4,7 +4,7 @@ import unittest
import six import six
from coolamqp import Cluster, ClusterNode, Queue, MessageReceived, ConnectionUp, \ from coolamqp import Cluster, ClusterNode, Queue, MessageReceived, ConnectionUp, \
ConnectionDown, ConsumerCancelled, Message ConnectionDown, ConsumerCancelled, Message, Exchange
def getamqp(): def getamqp():
...@@ -13,6 +13,15 @@ def getamqp(): ...@@ -13,6 +13,15 @@ def getamqp():
return amqp return amqp
class TestThings(unittest.TestCase):
def test_different_constructor_for_clusternode(self):
cn = ClusterNode(host='127.0.0.1', user='guest', password='guest', virtual_host='/')
amqp = Cluster([cn])
amqp.start()
self.assertIsInstance(amqp.drain(1), ConnectionUp)
amqp.shutdown()
class TestBasics(unittest.TestCase): class TestBasics(unittest.TestCase):
def setUp(self): def setUp(self):
self.amqp = getamqp() self.amqp = getamqp()
...@@ -68,6 +77,20 @@ class TestBasics(unittest.TestCase): ...@@ -68,6 +77,20 @@ class TestBasics(unittest.TestCase):
amqp2.shutdown() amqp2.shutdown()
def test_qos(self):
self.amqp.qos(0, 1)
self.amqp.consume(Queue('lol', exclusive=True)).result()
self.amqp.send(Message('what the fuck'), '', routing_key='lol')
self.amqp.send(Message('what the fuck'), '', routing_key='lol')
p = self.amqp.drain(wait=4)
self.assertIsInstance(p, MessageReceived)
self.assertIsNone(self.amqp.drain(wait=5))
p.message.ack()
self.assertIsInstance(self.amqp.drain(wait=4), MessageReceived)
def test_consume_twice(self): def test_consume_twice(self):
"""Spawn a second connection and try to consume an exclusive queue twice""" """Spawn a second connection and try to consume an exclusive queue twice"""
amqp2 = getamqp() amqp2 = getamqp()
...@@ -106,3 +129,19 @@ class TestBasics(unittest.TestCase): ...@@ -106,3 +129,19 @@ class TestBasics(unittest.TestCase):
self.amqp.cancel(myq) self.amqp.cancel(myq)
self.assertIsInstance(self.amqp.drain(wait=10), ConsumerCancelled) self.assertIsInstance(self.amqp.drain(wait=10), ConsumerCancelled)
def test_exchanges(self):
xchg = Exchange('a_fanout', type='fanout')
self.amqp.declare_exchange(xchg)
q1 = Queue('q1', exclusive=True, exchange=xchg)
q2 = Queue('q2', exclusive=True, exchange=xchg)
self.amqp.consume(q1)
self.amqp.consume(q2)
self.amqp.send(Message('hello'), xchg)
self.assertIsInstance(self.amqp.drain(wait=4), MessageReceived)
self.assertIsInstance(self.amqp.drain(wait=4), MessageReceived)
...@@ -3,14 +3,35 @@ from __future__ import absolute_import, division, print_function ...@@ -3,14 +3,35 @@ from __future__ import absolute_import, division, print_function
import unittest import unittest
import os import os
import time
from coolamqp import Cluster, ClusterNode, Queue, MessageReceived, ConnectionUp, \ from coolamqp import Cluster, ClusterNode, Queue, MessageReceived, ConnectionUp, \
ConnectionDown, ConsumerCancelled, Message ConnectionDown, ConsumerCancelled, Message, Exchange
NODE = ClusterNode('127.0.0.1', 'guest', 'guest')
class TestSpecialCases(unittest.TestCase):
def test_termination_while_disconnect(self):
self.amqp = Cluster([NODE])
self.amqp.start()
self.assertIsInstance(self.amqp.drain(wait=1), ConnectionUp)
os.system("sudo service rabbitmq-server stop")
time.sleep(5)
self.assertIsInstance(self.amqp.drain(wait=1), ConnectionDown)
self.amqp.shutdown()
self.assertIsNone(self.amqp.thread.backend)
self.assertFalse(self.amqp.connected)
os.system("sudo service rabbitmq-server start")
class TestFailures(unittest.TestCase): class TestFailures(unittest.TestCase):
def setUp(self): def setUp(self):
self.amqp = Cluster([ClusterNode('127.0.0.1', 'guest', 'guest')]) self.amqp = Cluster([NODE])
self.amqp.start() self.amqp.start()
self.assertIsInstance(self.amqp.drain(1), ConnectionUp) self.assertIsInstance(self.amqp.drain(1), ConnectionUp)
...@@ -23,6 +44,31 @@ class TestFailures(unittest.TestCase): ...@@ -23,6 +44,31 @@ class TestFailures(unittest.TestCase):
self.assertIsInstance(self.amqp.drain(wait=4), ConnectionDown) self.assertIsInstance(self.amqp.drain(wait=4), ConnectionDown)
self.assertIsInstance(self.amqp.drain(wait=6), ConnectionUp) self.assertIsInstance(self.amqp.drain(wait=6), ConnectionUp)
def test_longer_disconnects(self):
os.system("sudo service rabbitmq-server stop")
self.assertIsInstance(self.amqp.drain(wait=4), ConnectionDown)
time.sleep(12)
os.system("sudo service rabbitmq-server start")
self.assertIsInstance(self.amqp.drain(wait=30), ConnectionUp)
def test_qos_redeclared_on_fail(self):
self.amqp.qos(0, 1).result()
os.system("sudo service rabbitmq-server restart")
self.assertIsInstance(self.amqp.drain(wait=4), ConnectionDown)
self.assertIsInstance(self.amqp.drain(wait=4), ConnectionUp)
self.amqp.consume(Queue('lol', exclusive=True)).result()
self.amqp.send(Message('what the fuck'), '', routing_key='lol')
self.amqp.send(Message('what the fuck'), '', routing_key='lol')
p = self.amqp.drain(wait=4)
self.assertIsInstance(p, MessageReceived)
self.assertIs(self.amqp.drain(wait=5), None)
p.message.ack()
self.assertIsInstance(self.amqp.drain(wait=4), MessageReceived)
def test_connection_flags_are_okay(self): def test_connection_flags_are_okay(self):
os.system("sudo service rabbitmq-server stop") os.system("sudo service rabbitmq-server stop")
self.assertIsInstance(self.amqp.drain(wait=4), ConnectionDown) self.assertIsInstance(self.amqp.drain(wait=4), ConnectionDown)
...@@ -31,12 +77,29 @@ class TestFailures(unittest.TestCase): ...@@ -31,12 +77,29 @@ class TestFailures(unittest.TestCase):
self.assertIsInstance(self.amqp.drain(wait=6), ConnectionUp) self.assertIsInstance(self.amqp.drain(wait=6), ConnectionUp)
self.assertTrue(self.amqp.connected) self.assertTrue(self.amqp.connected)
def test_connection_down_and_up_redeclare_queues(self):
def test_sending_a_message_is_cancelled(self):
"""are messages generated at all? does it reconnect?""" """are messages generated at all? does it reconnect?"""
q1 = Queue('wtf1', exclusive=True) q1 = Queue('wtf1', exclusive=True)
self.amqp.consume(q1).result()
self.amqp.consume(q1) os.system("sudo service rabbitmq-server stop")
self.assertIsInstance(self.amqp.drain(wait=4), ConnectionDown)
result = self.amqp.send(Message('what the fuck'), '', routing_key='wtf1')
result.cancel()
os.system("sudo service rabbitmq-server start")
self.assertIsInstance(self.amqp.drain(wait=6), ConnectionUp)
self.assertIsNone(self.amqp.drain(wait=6)) # message is NOT received
def test_connection_down_and_up_redeclare_queues(self):
"""are messages generated at all? does it reconnect?"""
q1 = Queue('wtf1', exclusive=True)
self.amqp.consume(q1).result()
os.system("sudo service rabbitmq-server restart") os.system("sudo service rabbitmq-server restart")
self.assertIsInstance(self.amqp.drain(wait=4), ConnectionDown) self.assertIsInstance(self.amqp.drain(wait=4), ConnectionDown)
...@@ -45,3 +108,21 @@ class TestFailures(unittest.TestCase): ...@@ -45,3 +108,21 @@ class TestFailures(unittest.TestCase):
self.amqp.send(Message('what the fuck'), '', routing_key='wtf1') self.amqp.send(Message('what the fuck'), '', routing_key='wtf1')
self.assertIsInstance(self.amqp.drain(wait=10), MessageReceived) self.assertIsInstance(self.amqp.drain(wait=10), MessageReceived)
def test_exchanges_are_redeclared(self):
xchg = Exchange('a_fanout', type='fanout')
self.amqp.declare_exchange(xchg)
q1 = Queue('q1', exclusive=True, exchange=xchg)
q2 = Queue('q2', exclusive=True, exchange=xchg)
self.amqp.consume(q1)
self.amqp.consume(q2).result()
os.system('sudo service rabbitmq-server restart')
self.amqp.send(Message('hello'), xchg)
self.assertIsInstance(self.amqp.drain(wait=4), ConnectionDown)
self.assertIsInstance(self.amqp.drain(wait=4), ConnectionUp)
self.assertIsInstance(self.amqp.drain(wait=4), MessageReceived)
self.assertIsInstance(self.amqp.drain(wait=4), MessageReceived)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment