Skip to content
Snippets Groups Projects
Commit dcde5b3b authored by Piotr Maślanka's avatar Piotr Maślanka
Browse files

tests

parent 11aec527
No related branches found
No related tags found
No related merge requests found
......@@ -11,6 +11,10 @@ install:
- pip install coverage
- pip install codeclimate-test-reporter
- pip install -r requirements.txt
- sudo rabbitmq-plugins enable rabbitmq_management
- sudo rabbitmqctl add_user user user
- sudo rabbitmqctl set_permissions -p / user ".*" ".*" ".*"
- sudo rabbitmqctl set_user_tags user administrator
after_success:
- CODECLIMATE_REPO_TOKEN=e8e05973a6c49139de5f98013cd285f9238b78d20f053f37f6e7deeab2c2c52f codeclimate-test-reporter
services:
......
......@@ -12,7 +12,7 @@ Vagrant.configure("2") do |config|
pip install --upgrade pip setuptools
/usr/lib/rabbitmq/bin/rabbitmq-plugins enable rabbitmq_management
service rabbitmq-server restart
sudo service rabbitmq-server restart
rabbitmqctl add_user user user
rabbitmqctl set_permissions -p / user ".*" ".*" ".*"
rabbitmqctl set_user_tags user administrator
......
......@@ -6,7 +6,7 @@ from coolamqp import Cluster, ClusterNode, Queue, MessageReceived, ConnectionUp,
ConnectionDown, ConsumerCancelled, Message
class MyTestCase(unittest.TestCase):
class TestBasics(unittest.TestCase):
def setUp(self):
self.amqp = Cluster([ClusterNode('127.0.0.1', 'guest', 'guest')])
self.amqp.start()
......@@ -15,24 +15,38 @@ class MyTestCase(unittest.TestCase):
def tearDown(self):
self.amqp.shutdown()
def test_acknowledge(self):
myq = Queue('myqueue', exclusive=True)
self.amqp.consume(myq)
self.amqp.send(Message(b'what the fuck'), '', routing_key='myqueue')
p = self.amqp.drain(wait=4)
self.assertIsInstance(p, MessageReceived)
self.assertEquals(p.message.body, b'what the fuck')
p.message.ack()
self.assertIs(self.amqp.drain(wait=4), None)
self.amqp.delete_queue(myq)
def test_nacknowledge(self):
myq = Queue('myqueue', exclusive=True)
self.amqp.consume(myq)
self.amqp.send(Message(b'what the fuck'), '', routing_key='myqueue')
p = self.amqp.drain(wait=10)
p = self.amqp.drain(wait=4)
self.assertIsInstance(p, MessageReceived)
self.assertEquals(p.message.body, b'what the fuck')
p.message.nack()
p = self.amqp.drain(wait=10)
p = self.amqp.drain(wait=4)
self.assertIsInstance(p, MessageReceived)
self.assertEquals(p.message.body, b'what the fuck')
self.amqp.delete_queue(myq)
def test_send_and_receive(self):
myq = Queue('myqueue', exclusive=True)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment