diff --git a/.gitattributes b/.gitattributes new file mode 100644 index 0000000000000000000000000000000000000000..fe033c73dbf5c34aa6e8ed4fbc747212c0b1deb7 --- /dev/null +++ b/.gitattributes @@ -0,0 +1 @@ +Vagrantfile eol=lf diff --git a/.gitignore b/.gitignore index 58c2fcacc5b5484c159228eadad06889cc92ac88..cb4ba96eeee4e415e284f6c644d931b517ffa269 100644 --- a/.gitignore +++ b/.gitignore @@ -12,6 +12,7 @@ __pycache__/ env/ build/ develop-eggs/ +htmlcov/ dist/ downloads/ eggs/ diff --git a/.travis.yml b/.travis.yml index f6c21a9865b5f6f35d9b331d3a1892661aaa8948..ee745ad9f0de77131e013fa24d0a5024dfadba18 100644 --- a/.travis.yml +++ b/.travis.yml @@ -9,10 +9,6 @@ install: - pip install coverage - pip install codeclimate-test-reporter - pip install -r requirements.txt - - sudo rabbitmq-plugins enable rabbitmq_management - - sudo rabbitmqctl add_user user user - - sudo rabbitmqctl set_permissions -p / user ".*" ".*" ".*" - - sudo rabbitmqctl set_user_tags user administrator after_success: - CODECLIMATE_REPO_TOKEN=e8e05973a6c49139de5f98013cd285f9238b78d20f053f37f6e7deeab2c2c52f codeclimate-test-reporter services: diff --git a/README.md b/README.md index 76408410831e4ad36bd31f2bc9010b560171538b..536783655cc259f97b221493df22284efd44dbf5 100644 --- a/README.md +++ b/README.md @@ -13,23 +13,21 @@ CoolAMQP When you're tired of fucking with AMQP reconnects. When a connection made by CoolAMQP to your broker fails, it will pick another -node, redeclare exchanges, queues, consumers and tell your application that -a disconnect happened. - -CoolAMQP makes you forget about all the nasty corner cases about AMQP reconnection. +node, redeclare exchanges, queues, consumers, QoS and all the other shit, and tell +your application that a disconnect happened. You only need to remember that: 1. Reconnects and redefinitions take a while. * Things will happen during that time. It is your responsibility to ensure that your distributed system is built to handle this 2. CoolAMQP will tell you when it senses losing broker connection. - * It will also tell you when it regains the connection (that means that everything is redefined). + * It will also tell you when it regains the connection (that means that everything is redefined and ready to go) 3. Delivering messages multiple times may happen. * Ensure you know when it happens. Keywords: message acknowledgement, amqp specification -As the project is in it's infancy stages, but actively maintained and used in a commercial project, -if you need a feature - just drop me a note or create a new issue here. +The project is actively maintained, used in a commercial project and unit tested with high coverage. +Enjoy! todo ---- diff --git a/Vagrantfile b/Vagrantfile index b67a7ca4ed9c8f582b1c1bf623c6778fdcbbcd95..25d3c70815ede7cf5a381f2a1e59dbf446a6acd6 100644 --- a/Vagrantfile +++ b/Vagrantfile @@ -4,8 +4,13 @@ Vagrant.configure("2") do |config| config.vm.box = "debian/contrib-jessie64" + + # Rabbit MQ management config.vm.network "forwarded_port", guest: 15672, host: 15672 + # HTTP for viewing coverage reports + config.vm.network "forwarded_port", guest: 80, host: 8765 + config.vm.provision "shell", inline: <<-SHELL apt-get update apt-get install -y htop curl python python-setuptools python-pip python-dev build-essential rabbitmq-server @@ -19,7 +24,17 @@ Vagrant.configure("2") do |config| # Install deps pip install -r /vagrant/requirements.txt - pip install nose + pip install nose coverage + + # HTTP server for viewing coverage reports + apt-get -y install nginx + rm -rf /var/www/html + ln -s /vagrant/htmlcov /var/www/html + + # .bashrc for default user + echo """# .bashrc + cd /vagrant""" > /home/vagrant/.bashrc + SHELL end diff --git a/setup.py b/setup.py index 4d20552ef685979b063fd2ef6c50603a3e0cffac..9dd7ca688040f5365a244e59db9f337aae189a3a 100644 --- a/setup.py +++ b/setup.py @@ -4,7 +4,7 @@ from distutils.core import setup setup(name='CoolAMQP', version='0.6', - description='The angry AMQP client library', + description=u'The reconnecting AMQP client', author=u'DMS Serwis s.c.', author_email='piotrm@smok.co', url='https://github.com/smok-serwis/coolamqp', @@ -12,7 +12,7 @@ setup(name='CoolAMQP', keywords=['amqp', 'pyamqp', 'rabbitmq', 'client', 'network', 'ha', 'high availability'], packages=['coolamqp', 'coolamqp.backends'], license='MIT License', - long_description='''The Python AMQP client library that makes you STOP FUCKING WITH AMQP RECONNECTION''', + long_description=u'The AMQP client that handles reconnection madness for you', requires=[ "amqp", "six" diff --git a/tests/README.md b/tests/README.md index 08936ac03b31b58cc36364f9f7186954545a5264..ea9d4b8875b01c2f057284b1762636610a42320a 100644 --- a/tests/README.md +++ b/tests/README.md @@ -1,3 +1,15 @@ Tests work using either Travis CI or Vagrant. -If you want to debug things, you have RabbitMQ management enabled on Vagrant. Go to http://127.0.0.1:15672 and log in with **user** / **user** +If you want to debug things, you have RabbitMQ management enabled on Vagrant. +Go to [http://127.0.0.1:15672](http://127.0.0.1:15672) and log in with **user** / **user** + +RabbitMQ management is NOT enabled on Travis CI. + +If you want to see a coverage report, run tests like this: +```bash +nosetests --with-coverage --exe +coverage html +``` +*--exe* is for the cases where you run on Windows, and everything in */vagrant* is 777. + +and then go to [http://127.0.0.1:8765](http://127.0.0.1:8765) diff --git a/tests/test_basics.py b/tests/test_basics.py index 14b949be01107ca5c67ead13c4aad4bb15cba19b..6507dc83ce31d9303619e65c3fb062f06adda250 100644 --- a/tests/test_basics.py +++ b/tests/test_basics.py @@ -4,7 +4,7 @@ import unittest import six from coolamqp import Cluster, ClusterNode, Queue, MessageReceived, ConnectionUp, \ - ConnectionDown, ConsumerCancelled, Message + ConnectionDown, ConsumerCancelled, Message, Exchange def getamqp(): @@ -13,6 +13,15 @@ def getamqp(): return amqp +class TestThings(unittest.TestCase): + def test_different_constructor_for_clusternode(self): + cn = ClusterNode(host='127.0.0.1', user='guest', password='guest', virtual_host='/') + amqp = Cluster([cn]) + amqp.start() + self.assertIsInstance(amqp.drain(1), ConnectionUp) + amqp.shutdown() + + class TestBasics(unittest.TestCase): def setUp(self): self.amqp = getamqp() @@ -68,6 +77,20 @@ class TestBasics(unittest.TestCase): amqp2.shutdown() + def test_qos(self): + self.amqp.qos(0, 1) + + self.amqp.consume(Queue('lol', exclusive=True)).result() + self.amqp.send(Message('what the fuck'), '', routing_key='lol') + self.amqp.send(Message('what the fuck'), '', routing_key='lol') + + p = self.amqp.drain(wait=4) + self.assertIsInstance(p, MessageReceived) + + self.assertIsNone(self.amqp.drain(wait=5)) + p.message.ack() + self.assertIsInstance(self.amqp.drain(wait=4), MessageReceived) + def test_consume_twice(self): """Spawn a second connection and try to consume an exclusive queue twice""" amqp2 = getamqp() @@ -106,3 +129,19 @@ class TestBasics(unittest.TestCase): self.amqp.cancel(myq) self.assertIsInstance(self.amqp.drain(wait=10), ConsumerCancelled) + + def test_exchanges(self): + xchg = Exchange('a_fanout', type='fanout') + self.amqp.declare_exchange(xchg) + + q1 = Queue('q1', exclusive=True, exchange=xchg) + q2 = Queue('q2', exclusive=True, exchange=xchg) + + self.amqp.consume(q1) + self.amqp.consume(q2) + + self.amqp.send(Message('hello'), xchg) + + self.assertIsInstance(self.amqp.drain(wait=4), MessageReceived) + self.assertIsInstance(self.amqp.drain(wait=4), MessageReceived) + diff --git a/tests/test_failures.py b/tests/test_failures.py index daa07db8a646dcae66a39e4168b9b47ad329c20d..a5530f969c527a872b8b53408185a676b4abc0f2 100644 --- a/tests/test_failures.py +++ b/tests/test_failures.py @@ -3,14 +3,35 @@ from __future__ import absolute_import, division, print_function import unittest import os +import time from coolamqp import Cluster, ClusterNode, Queue, MessageReceived, ConnectionUp, \ - ConnectionDown, ConsumerCancelled, Message + ConnectionDown, ConsumerCancelled, Message, Exchange + + +NODE = ClusterNode('127.0.0.1', 'guest', 'guest') + + +class TestSpecialCases(unittest.TestCase): + def test_termination_while_disconnect(self): + self.amqp = Cluster([NODE]) + self.amqp.start() + self.assertIsInstance(self.amqp.drain(wait=1), ConnectionUp) + + os.system("sudo service rabbitmq-server stop") + time.sleep(5) + self.assertIsInstance(self.amqp.drain(wait=1), ConnectionDown) + + self.amqp.shutdown() + self.assertIsNone(self.amqp.thread.backend) + self.assertFalse(self.amqp.connected) + + os.system("sudo service rabbitmq-server start") class TestFailures(unittest.TestCase): def setUp(self): - self.amqp = Cluster([ClusterNode('127.0.0.1', 'guest', 'guest')]) + self.amqp = Cluster([NODE]) self.amqp.start() self.assertIsInstance(self.amqp.drain(1), ConnectionUp) @@ -23,6 +44,31 @@ class TestFailures(unittest.TestCase): self.assertIsInstance(self.amqp.drain(wait=4), ConnectionDown) self.assertIsInstance(self.amqp.drain(wait=6), ConnectionUp) + def test_longer_disconnects(self): + os.system("sudo service rabbitmq-server stop") + self.assertIsInstance(self.amqp.drain(wait=4), ConnectionDown) + time.sleep(12) + os.system("sudo service rabbitmq-server start") + self.assertIsInstance(self.amqp.drain(wait=30), ConnectionUp) + + def test_qos_redeclared_on_fail(self): + self.amqp.qos(0, 1).result() + + os.system("sudo service rabbitmq-server restart") + self.assertIsInstance(self.amqp.drain(wait=4), ConnectionDown) + self.assertIsInstance(self.amqp.drain(wait=4), ConnectionUp) + + self.amqp.consume(Queue('lol', exclusive=True)).result() + self.amqp.send(Message('what the fuck'), '', routing_key='lol') + self.amqp.send(Message('what the fuck'), '', routing_key='lol') + + p = self.amqp.drain(wait=4) + self.assertIsInstance(p, MessageReceived) + + self.assertIs(self.amqp.drain(wait=5), None) + p.message.ack() + self.assertIsInstance(self.amqp.drain(wait=4), MessageReceived) + def test_connection_flags_are_okay(self): os.system("sudo service rabbitmq-server stop") self.assertIsInstance(self.amqp.drain(wait=4), ConnectionDown) @@ -31,12 +77,29 @@ class TestFailures(unittest.TestCase): self.assertIsInstance(self.amqp.drain(wait=6), ConnectionUp) self.assertTrue(self.amqp.connected) - def test_connection_down_and_up_redeclare_queues(self): + + def test_sending_a_message_is_cancelled(self): """are messages generated at all? does it reconnect?""" q1 = Queue('wtf1', exclusive=True) + self.amqp.consume(q1).result() - self.amqp.consume(q1) + os.system("sudo service rabbitmq-server stop") + self.assertIsInstance(self.amqp.drain(wait=4), ConnectionDown) + result = self.amqp.send(Message('what the fuck'), '', routing_key='wtf1') + result.cancel() + + os.system("sudo service rabbitmq-server start") + + self.assertIsInstance(self.amqp.drain(wait=6), ConnectionUp) + self.assertIsNone(self.amqp.drain(wait=6)) # message is NOT received + + + def test_connection_down_and_up_redeclare_queues(self): + """are messages generated at all? does it reconnect?""" + + q1 = Queue('wtf1', exclusive=True) + self.amqp.consume(q1).result() os.system("sudo service rabbitmq-server restart") self.assertIsInstance(self.amqp.drain(wait=4), ConnectionDown) @@ -45,3 +108,21 @@ class TestFailures(unittest.TestCase): self.amqp.send(Message('what the fuck'), '', routing_key='wtf1') self.assertIsInstance(self.amqp.drain(wait=10), MessageReceived) + + def test_exchanges_are_redeclared(self): + xchg = Exchange('a_fanout', type='fanout') + self.amqp.declare_exchange(xchg) + + q1 = Queue('q1', exclusive=True, exchange=xchg) + q2 = Queue('q2', exclusive=True, exchange=xchg) + + self.amqp.consume(q1) + self.amqp.consume(q2).result() + + os.system('sudo service rabbitmq-server restart') + + self.amqp.send(Message('hello'), xchg) + self.assertIsInstance(self.amqp.drain(wait=4), ConnectionDown) + self.assertIsInstance(self.amqp.drain(wait=4), ConnectionUp) + self.assertIsInstance(self.amqp.drain(wait=4), MessageReceived) + self.assertIsInstance(self.amqp.drain(wait=4), MessageReceived)