Skip to content
Snippets Groups Projects
Commit b4e7de60 authored by Piotr Maślanka's avatar Piotr Maślanka
Browse files

compat with context manager protocol

parent 51bfe4e1
No related branches found
No related tags found
No related merge requests found
#!/usr/bin/env python #!/usr/bin/env python
#coding=UTF-8 # coding=UTF-8
from distutils.core import setup from setuptools import setup
setup(name='CoolAMQP', setup(name='CoolAMQP',
version='0.6', version='0.7',
description=u'The reconnecting AMQP client', description=u'AMQP client with sane reconnects',
author=u'DMS Serwis s.c.', author=u'DMS Serwis s.c.',
author_email='piotrm@smok.co', author_email='piotrm@smok.co',
url='https://github.com/smok-serwis/coolamqp', url='https://github.com/smok-serwis/coolamqp',
download_url='https://github.com/smok-serwis/coolamqp/archive/master.zip', download_url='https://github.com/smok-serwis/coolamqp/archive/v0.7.zip',
keywords=['amqp', 'pyamqp', 'rabbitmq', 'client', 'network', 'ha', 'high availability'], keywords=['amqp', 'pyamqp', 'rabbitmq', 'client', 'network', 'ha', 'high availability'],
packages=['coolamqp', 'coolamqp.backends'], packages=['coolamqp', 'coolamqp.backends'],
license='MIT License', license='MIT License',
long_description=u'The AMQP client that handles reconnection madness for you', long_description=u'The AMQP client that handles reconnection madness for you',
requires=[ requires=[
"amqp", "amqp",
"six" "six",
"monotonic"
], ],
tests_require=["nose"],
test_suite='nose.collector',
classifiers=[ classifiers=[
'Programming Language :: Python', 'Programming Language :: Python',
'Programming Language :: Python :: 2.7', 'Programming Language :: Python :: 2.7',
...@@ -26,4 +29,4 @@ setup(name='CoolAMQP', ...@@ -26,4 +29,4 @@ setup(name='CoolAMQP',
'Programming Language :: Python :: Implementation :: PyPy', 'Programming Language :: Python :: Implementation :: PyPy',
'Operating System :: OS Independent' 'Operating System :: OS Independent'
] ]
) )
\ No newline at end of file
#coding=UTF-8
from __future__ import absolute_import, division, print_function
import unittest
import six
import time
from coolamqp import Cluster, ClusterNode, Queue, MessageReceived, ConnectionUp, \
ConnectionDown, ConsumerCancelled, Message, Exchange
class TestBasics(unittest.TestCase):
def setUp(self):
self.amqp = Cluster([ClusterNode('127.0.0.1', 'guest', 'guest')])
self.amqp.start()
self.assertIsInstance(self.amqp.drain(1), ConnectionUp)
def tearDown(self):
self.amqp.shutdown()
def takes_less_than(self, max_time):
"""
Tests that your code executes in less time than specified value.
Use like:
with self.takes_less_than(0.9):
my_operation()
:param max_time: in seconds
"""
test = self
class CM(object):
def __enter__(self):
self.started_at = time.time()
def __exit__(self, tp, v, tb):
test.assertLess(time.time() - self.started_at, max_time)
return False
return CM()
def test_sending_a_message(self):
with self.takes_less_than(0.5):
self.amqp.send(Message(''), routing_key='nowhere').result()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment