Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • smokserwis/coolamqp
1 result
Show changes
Commits on Source (2)
Previous release notes are hosted on [GitHub](https://github.com/smok-serwis/coolamqp/releases).
Since v1.3.2 they'll be put here and in release description.
v2.1.2
======
* extra_properties passed to Cluster might now be a dict
v2.1.1
======
......
......@@ -16,15 +16,14 @@ Why CoolAMQP?
* [Publisher confirms](https://www.rabbitmq.com/docs/confirms#publisher-confirms)
* [Negative Acknowledgements](https://www.rabbitmq.com/docs/nack)
* traceable using [opentracing](https://opentracing.io/)
* code coverage is 81% at the moment
* high coverage
* tested in production conditions
* full support for [streams](https://www.rabbitmq.com/docs/streams)
* 120 second stress tests are part of each release
[![license](https://img.shields.io/github/license/mashape/apistatus.svg)]()
**Warning!!** Since v1.3.1 development has been moved
from [GitHub](https://github.com/smok-serwis/coolamqp) to this GitLab.
from [GitHub](https://github.com/smok-serwis/coolamqp) to this [GitLab](https://git.dms-serwis.com.pl/smokserwis/coolamqp).
To install CoolAMQP please use
```bash
......@@ -111,7 +110,7 @@ and you're all set. The only files modified is
### [docs](docs/)
Sources for the documentation, available
[here](https://coolamqp.readthedocs.io/en/latest/).
[here](http://smokserwis.docs.smok.co/coolamqp).
## Running unit tests
......@@ -122,7 +121,7 @@ The default username used is guest, and password is guest.
You can also run unit tests from Docker, if you wish so. To launch the unit test suite:
```bash
docker-compose up unittest
docker-compose up --build unittest
```
To launch the stress test suite
......
__version__ = '2.1.1'
__version__ = '2.1.2a1'
......@@ -17,20 +17,13 @@ def tobytes(q):
def toutf8(q):
if isinstance(q, memoryview):
q = q.tobytes()
q = q.tobytes().decode('utf-8')
return q.decode('utf-8') if isinstance(q, six.binary_type) else q
def argumentify(arguments):
if arguments is None:
return []
# Was it argumented already?
# if isinstance(arguments, list):
# if len(arguments) >= 1:
# if isinstance(arguments[0], tuple):
# if isinstance(arguments[0][1], str) and len(arguments[0][1]) == 1:
# # Looks argumentified already
# return arguments
args = []
if isinstance(arguments, dict):
for key, value in arguments.items():
......@@ -46,6 +39,5 @@ def argumentify(arguments):
for value in arguments:
args.append((value, get_type_for(value)))
return (args, 'A')
else:
warnings.warn('Unnecessary call to argumentify, see issue #11 for details', UserWarning)
return args
warnings.warn('Unnecessary call to argumentify, see issue #11 for details', UserWarning)
return args
......@@ -4,12 +4,11 @@ from __future__ import print_function, absolute_import, division
import logging
import time
import typing as tp
import warnings
from concurrent.futures import Future
import six
from coolamqp.argumentify import argumentify
from coolamqp.argumentify import argumentify, tobytes
from coolamqp.attaches import Publisher, AttacheGroup, Consumer, Declarer
from coolamqp.attaches.utils import close_future
from coolamqp.clustering.events import ConnectionLost, MessageReceived, \
......@@ -55,7 +54,8 @@ class Cluster(object):
:type nodes: NodeDefinition
:param on_fail: callable/0 to call when connection fails in an
unclean way. This is a one-shot
:param extra_properties: refer to :class:`coolamqp.uplink.connection.Connection`
:param extra_properties: refer to :class:`coolamqp.uplink.connection.Connection`, or just pass a dictionary
of anything you'd like
:param log_frames: an object that supports logging each and every frame CoolAMQP sends and
receives from the broker
:type log_frames: tp.Optional[:class:`coolamqp.tracing.BaseFrameTracer`]
......@@ -77,6 +77,12 @@ class Cluster(object):
except ImportError:
raise RuntimeError('tracer given, but opentracing is not installed!')
if isinstance(extra_properties, dict):
extra_props = []
for key, value in extra_props.items():
extra_props.append((tobytes(key), argumentify(value)))
extra_properties = extra_props
self.started = False # type: bool
self.tracer = tracer
self.name = name or 'CoolAMQP' # type: str
......
......@@ -43,9 +43,9 @@ class TestStreams(unittest.TestCase):
fut.result()
time.sleep(3)
cons.cancel().result()
self.assertEqual(test['a'], 6)
self.assertGreaterEqual(test['a'], 6) # might have some messages from previous runs
cons, fut = c.consume(stream, qos=10, on_message=handle_msg, arguments={'x-stream-offset': 'first'}, no_ack=False)
fut.result()
time.sleep(3)
cons.cancel()
self.assertEqual(test['a'], 16)
self.assertGreaterEqual(test['a'], 16) # might have some messages from previous runs
......@@ -20,8 +20,14 @@ logging.getLogger('coolamqp').setLevel(logging.DEBUG)
class TestConnecting(unittest.TestCase):
def test_dict_extra_args(self):
c = Cluster([NODE], extra_properties={'test': 'something'})
c.start(wait=True, timeout=None)
c.shutdown(True)
def test_argumented_exchange(self):
xchg = Exchange('test-wer', durable=True)
repr(xchg)
c = Cluster([NODE])
c.start(wait=True, timeout=None)
c.declare(xchg).result()
......@@ -31,6 +37,7 @@ class TestConnecting(unittest.TestCase):
def test_argumented_queue(self):
que = Queue(auto_delete=True, exclusive=True, arguments=[(b'x-max-priority', 10)])
repr(que)
c = Cluster([NODE])
c.start(wait=True, timeout=None)
self.assertRaises(ValueError, c.declare, que)
......
......@@ -3,10 +3,15 @@ from __future__ import print_function, absolute_import, division
import unittest
from coolamqp.argumentify import toutf8
from coolamqp.exceptions import AMQPError
class TestExcs(unittest.TestCase):
def test_utf8(self):
self.assertEqual(toutf8(memoryview(b'test')), 'test')
def test_exist(self):
e = AMQPError(100, u'wtf', 0, 0)
......
......@@ -38,6 +38,10 @@ class TestObjects(unittest.TestCase):
self.assertRaises(ValueError, Queue, auto_delete=False)
repr(Queue())
def test_queue_repr(self):
q = Queue('test')
repr(q)
@unittest.skipUnless(sys.version.startswith('3'), 'Needs Python 3.x')
def test_queue_warns(self):
warnings.resetwarnings()
......