Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • smokserwis/coolamqp
1 result
Show changes
Commits on Source (4)
Previous release notes are hosted on [GitHub](https://github.com/smok-serwis/coolamqp/releases).
Since v1.3.2 they'll be put here and in release description.
v2.1.2
======
* extra_properties passed to Cluster might now be a dict
* basically all arguments and argument_binds can be passed as dicts
* removed a bogus warning
v2.1.1
======
......
......@@ -16,15 +16,14 @@ Why CoolAMQP?
* [Publisher confirms](https://www.rabbitmq.com/docs/confirms#publisher-confirms)
* [Negative Acknowledgements](https://www.rabbitmq.com/docs/nack)
* traceable using [opentracing](https://opentracing.io/)
* code coverage is 81% at the moment
* high coverage
* tested in production conditions
* full support for [streams](https://www.rabbitmq.com/docs/streams)
* 120 second stress tests are part of each release
[![license](https://img.shields.io/github/license/mashape/apistatus.svg)]()
**Warning!!** Since v1.3.1 development has been moved
from [GitHub](https://github.com/smok-serwis/coolamqp) to this GitLab.
from [GitHub](https://github.com/smok-serwis/coolamqp) to this [GitLab](https://git.dms-serwis.com.pl/smokserwis/coolamqp).
To install CoolAMQP please use
```bash
......@@ -111,7 +110,7 @@ and you're all set. The only files modified is
### [docs](docs/)
Sources for the documentation, available
[here](https://coolamqp.readthedocs.io/en/latest/).
[here](http://smokserwis.docs.smok.co/coolamqp).
## Running unit tests
......@@ -122,7 +121,7 @@ The default username used is guest, and password is guest.
You can also run unit tests from Docker, if you wish so. To launch the unit test suite:
```bash
docker-compose up unittest
docker-compose up --build unittest
```
To launch the stress test suite
......
__version__ = '2.1.1'
__version__ = '2.1.2'
import warnings
import six
from coolamqp.framing.field_table import get_type_for
......@@ -24,28 +22,20 @@ def toutf8(q):
def argumentify(arguments):
if arguments is None:
return []
# Was it argumented already?
# if isinstance(arguments, list):
# if len(arguments) >= 1:
# if isinstance(arguments[0], tuple):
# if isinstance(arguments[0][1], str) and len(arguments[0][1]) == 1:
# # Looks argumentified already
# return arguments
args = []
if isinstance(arguments, dict):
for key, value in arguments.items():
key = tobytes(key)
args.append((key, (value, get_type_for(value))))
return (args, 'F')
return args, 'F'
elif len(arguments[0]) == 2:
for key, value in arguments:
key = tobytes(key)
args.append((key, (value, get_type_for(value))))
return (args, 'F')
return args, 'F'
elif isinstance(arguments, (list, tuple)):
for value in arguments:
args.append((value, get_type_for(value)))
return (args, 'A')
else:
warnings.warn('Unnecessary call to argumentify, see issue #11 for details', UserWarning)
return args
return args, 'A'
return args
......@@ -4,12 +4,11 @@ from __future__ import print_function, absolute_import, division
import logging
import time
import typing as tp
import warnings
from concurrent.futures import Future
import six
from coolamqp.argumentify import argumentify
from coolamqp.argumentify import argumentify, tobytes
from coolamqp.attaches import Publisher, AttacheGroup, Consumer, Declarer
from coolamqp.attaches.utils import close_future
from coolamqp.clustering.events import ConnectionLost, MessageReceived, \
......@@ -55,7 +54,8 @@ class Cluster(object):
:type nodes: NodeDefinition
:param on_fail: callable/0 to call when connection fails in an
unclean way. This is a one-shot
:param extra_properties: refer to :class:`coolamqp.uplink.connection.Connection`
:param extra_properties: refer to :class:`coolamqp.uplink.connection.Connection`, or just pass a dictionary
of anything you'd like
:param log_frames: an object that supports logging each and every frame CoolAMQP sends and
receives from the broker
:type log_frames: tp.Optional[:class:`coolamqp.tracing.BaseFrameTracer`]
......@@ -77,6 +77,9 @@ class Cluster(object):
except ImportError:
raise RuntimeError('tracer given, but opentracing is not installed!')
if isinstance(extra_properties, dict):
extra_properties = argumentify(extra_properties)[0]
self.started = False # type: bool
self.tracer = tracer
self.name = name or 'CoolAMQP' # type: str
......
......@@ -3,7 +3,7 @@ services:
amqp:
image: rabbitmq:4.0-management
unittest:
command: nose2 -vv
command: coverage run --append -m nose2 -F -vv
build:
context: .
dockerfile: tests/Dockerfile
......
......@@ -7,6 +7,8 @@ pick their names for themselves.
It also forbids some combinations of Queue arguments, and makes the default values more palatable, so for example
a naked :class:`coolamqp.objects.Queue` will be anonymous, non-durable, exclusive and auto-delete.
Also, any arguments marked as arguments or arguments_bind may accept a dictionary safely.
Cluster.publish
---------------
......
......@@ -43,9 +43,9 @@ class TestStreams(unittest.TestCase):
fut.result()
time.sleep(3)
cons.cancel().result()
self.assertEqual(test['a'], 6)
self.assertGreaterEqual(test['a'], 6) # might have some messages from previous runs
cons, fut = c.consume(stream, qos=10, on_message=handle_msg, arguments={'x-stream-offset': 'first'}, no_ack=False)
fut.result()
time.sleep(3)
cons.cancel()
self.assertEqual(test['a'], 16)
self.assertGreaterEqual(test['a'], 16) # might have some messages from previous runs
......@@ -14,14 +14,23 @@ from coolamqp.exceptions import ConnectionDead
from coolamqp.objects import NodeDefinition, Queue, Exchange
NODE = NodeDefinition(os.environ.get('AMQP_HOST', '127.0.0.1'), 'guest', 'guest', heartbeat=20)
logging.basicConfig(level=logging.DEBUG)
logging.getLogger('coolamqp').setLevel(logging.DEBUG)
logging.basicConfig(level=logging.DEBUG,
format='[%(asctime)s] p%(process)s {%(pathname)s:%(lineno)d} %(levelname)s - %(message)s',
datefmt='%Y-%m-%d:%H:%M:%S',)
class TestConnecting(unittest.TestCase):
def test_dict_extra_args(self):
c = Cluster([NODE], extra_properties={'test': 'something'})
c.start(wait=True, timeout=None)
c.shutdown(True)
def test_argumented_exchange(self):
xchg = Exchange('test-wer', durable=True)
repr(xchg)
c = Cluster([NODE])
c.start(wait=True, timeout=None)
c.declare(xchg).result()
......@@ -31,6 +40,7 @@ class TestConnecting(unittest.TestCase):
def test_argumented_queue(self):
que = Queue(auto_delete=True, exclusive=True, arguments=[(b'x-max-priority', 10)])
repr(que)
c = Cluster([NODE])
c.start(wait=True, timeout=None)
self.assertRaises(ValueError, c.declare, que)
......
......@@ -3,10 +3,15 @@ from __future__ import print_function, absolute_import, division
import unittest
from coolamqp.argumentify import toutf8
from coolamqp.exceptions import AMQPError
class TestExcs(unittest.TestCase):
def test_utf8(self):
self.assertEqual(toutf8(memoryview(b'test')), 'test')
def test_exist(self):
e = AMQPError(100, u'wtf', 0, 0)
......
......@@ -38,6 +38,10 @@ class TestObjects(unittest.TestCase):
self.assertRaises(ValueError, Queue, auto_delete=False)
repr(Queue())
def test_queue_repr(self):
q = Queue('test')
repr(q)
@unittest.skipUnless(sys.version.startswith('3'), 'Needs Python 3.x')
def test_queue_warns(self):
warnings.resetwarnings()
......