Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • smokserwis/coolamqp
1 result
Show changes
Commits on Source (26)
Showing with 469 additions and 273 deletions
......@@ -9,6 +9,7 @@ __pycache__/
coolamqp/framing/definitions.py
.pycharm_helpers/
# Distribution / packaging
*.xml
.Python
env/
build/
......
Previous release notes are hosted on [GitHub](https://github.com/smok-serwis/coolamqp/releases).
Since v1.3.2 they'll be put here and in release description.
v2.1.1
======
* fixed a bug in Queue's __repr__
* fixed a bug in BasicContentPropertyList.typize
* removed Vagrantfile
v2.1.0
======
* removed extra logging from argumentify
* user will be notified upon declaring an auto-delete durable exchange
* deprecated Consumer(fail_on_first_time_resource_locked)
* we now support [streams](https://www.rabbitmq.com/docs/streams)
v2.0.0
======
......
......@@ -14,16 +14,12 @@ docker-compose up --build unittest
docker-compose up --build stress_tests
```
If you want to debug things, you have RabbitMQ management enabled on Vagrant.
Go to [http://127.0.0.1:15672](http://127.0.0.1:15672) and log in with **user** / **user**
RabbitMQ management is NOT enabled on Travis CI.
If you want to debug things, please install a RabbitMQ instance via Docker, expose the necessary ports and go from there.
If you want to see a coverage report, run tests like this:
```bash
nosetests --with-coverage --exe
coverage html
```
*--exe* is for the cases where you run on Windows, and everything in */vagrant* is 777.
and then go to [http://127.0.0.1:8765](http://127.0.0.1:8765)
......@@ -3,30 +3,10 @@ CoolAMQP
**A Python client for RabbitMQ**
[![license](https://img.shields.io/github/license/mashape/apistatus.svg)]()
**Warning!!** Since v1.3.1 development has been moved
from [GitHub](https://github.com/smok-serwis/coolamqp) to this GitLab.
To install CoolAMQP please use
```bash
pip install --extra-index-url https://git.dms-serwis.com.pl/api/v4/groups/330/-/packages/pypi/simple coolamqp
```
Or state it at the beginning of your `requirements.txt`:
```python
--extra-index-url https://git.dms-serwis.com.pl/api/v4/groups/330/-/packages/pypi/simple
coolamqp
```
**Version 2.0** is in [active development](https://git.dms-serwis.com.pl/smokserwis/coolamqp/-/milestones/3)
Why CoolAMQP?
-------------
It's the best way to talk Python to RabbitMQ with AMQP 0.9.1.
* it supports all types of exchanges
* it works on Python 2 and on Windows
* tested against all versions of RabbitMQ 3.x and RabbitMQ 4.0
* AMQP 0.9.1 client that's native Python
* heavily optimized for speed
......@@ -36,9 +16,21 @@ It's the best way to talk Python to RabbitMQ with AMQP 0.9.1.
* [Publisher confirms](https://www.rabbitmq.com/docs/confirms#publisher-confirms)
* [Negative Acknowledgements](https://www.rabbitmq.com/docs/nack)
* traceable using [opentracing](https://opentracing.io/)
* code coverage is 80% at the moment
* code coverage is 81% at the moment
* full support for [streams](https://www.rabbitmq.com/docs/streams)
* 120 second stress tests are part of each release
[![license](https://img.shields.io/github/license/mashape/apistatus.svg)]()
**Warning!!** Since v1.3.1 development has been moved
from [GitHub](https://github.com/smok-serwis/coolamqp) to this GitLab.
To install CoolAMQP please use
```bash
pip install coolamqp
```
Documentation (WIP) is available at [our site](http://smokserwis.docs.smok.co/coolamqp).
CoolAMQP uses [semantic versioning 2.0](https://semver.org/spec/v2.0.0.html).
......@@ -49,10 +41,6 @@ and there are memoryviews **_everywhere_**.
This is borderline absurd.
CoolAMQP is not a direct AMQP client - it also handles reconnections, transactional sending,
and so on, mostly via Futures. This means it has a certain opinion on how to
handle AMQP, but you can feel the spirit of AMQP underneath. *API is stable*.
The project is actively maintained and used in a commercial project. Tests can run
either on Vagrant (Vagrantfile attached) or Travis CI, and run against RabbitMQ.
......
# -*- mode: ruby -*-
# vi: set ft=ruby :
Vagrant.configure("2") do |config|
config.vm.box = "debian/bullseye64"
# Rabbit MQ management
config.vm.network "forwarded_port", guest: 15672, host: 15672, auto_correct: true
# HTTP for viewing coverage reports
config.vm.network "forwarded_port", guest: 80, host: 8765
config.vm.provision "shell", inline: <<-SHELL
apt-get update
# Python
apt-get install -y htop curl python python-setuptools python3-pip python-dev build-essential rabbitmq-server python3 python3-setuptools
# sudo python -m pip install --upgrade pip setuptools
sudo pip3 install --upgrade pip setuptools
/usr/lib/rabbitmq/bin/rabbitmq-plugins enable rabbitmq_management
sudo service rabbitmq-server restart
rabbitmqctl add_user user user
rabbitmqctl set_permissions -p / user ".*" ".*" ".*"
rabbitmqctl set_permissions -p / guest ".*" ".*" ".*"
rabbitmqctl set_user_tags user administrator
# Install deps
# sudo python -m pip install -r /vagrant/requirements.txt
# sudo python -m pip install nose coverage mock yapf
sudo pip3 install -r /vagrant/requirements.txt
sudo pip3 install nose2[coverage_plugin] coverage mock yapf nose2
sudo pip3 install -r /vagrant/stress_tests/requirements.txt
# HTTP server for viewing coverage reports
apt-get -y install nginx
rm -rf /var/www/html
ln -s /vagrant/htmlcov /var/www/html
# .bashrc for default user
echo """# .bashrc
cd /vagrant""" > /home/vagrant/.bashrc
SHELL
end
......@@ -303,6 +303,16 @@ Field = collections.namedtuple('Field', ('name', 'type', 'basic_type', 'reserved
line(u'''
@staticmethod
def typize(*fields): # type: (*str) -> type
"""
Return an autonomous class definition which is a header supporting only particular fields.
Usage:
>>> Headers = BasicContentPropertyList.typize('content_type', 'content_encoding')
>>> headers = Headers('application/json', 'gzip')
The reason for this is speed.
"""
''')
line(u' zpf = bytearray([\n')
......@@ -325,7 +335,7 @@ Field = collections.namedtuple('Field', ('name', 'type', 'basic_type', 'reserved
if field.reserved or field.basic_type == 'bit':
pass # zero
else:
byte_chunk.append(u"int('%s' in kwargs)" % (
byte_chunk.append(u"int('%s' in fields)" % (
format_field_name(field.name),))
else:
# this is the "do we need moar flags" section
......
__version__ = '2.0.0'
__version__ = '2.1.1'
......@@ -24,7 +24,6 @@ def toutf8(q):
def argumentify(arguments):
if arguments is None:
return []
logger.warning('Input is %s' % (arguments, ))
# Was it argumented already?
# if isinstance(arguments, list):
# if len(arguments) >= 1:
......@@ -37,7 +36,6 @@ def argumentify(arguments):
for key, value in arguments.items():
key = tobytes(key)
args.append((key, (value, get_type_for(value))))
logger.warning('Output is %s', (args, 'F'))
return (args, 'F')
elif len(arguments[0]) == 2:
for key, value in arguments:
......
......@@ -18,6 +18,7 @@ from coolamqp.framing.definitions import ChannelOpenOk, BasicConsume, \
BasicAck, BasicReject, RESOURCE_LOCKED, BasicCancelOk, BasicQos, BasicQosOk
from coolamqp.framing.frames import AMQPBodyFrame, AMQPHeaderFrame
from coolamqp.objects import Callable
from coolamqp.argumentify import argumentify
from coolamqp.uplink import HeaderOrBodyWatch, MethodWatch
logger = logging.getLogger(__name__)
......@@ -26,21 +27,19 @@ EMPTY_MEMORYVIEW = memoryview(b'') # for empty messages
class BodyReceiveMode(object):
# ZC - zero copy
# C - copy (copies every byte once)
#: # message.body will be a single bytes object
#: this will gather frames as memoryviews, and b''.join() them upon receiving last frame
BYTES = 0
BYTES = 0 # message.body will be a single bytes object
# this will gather frames as memoryviews, and b''.join() them upon
# receiving last frame
# this is C
#: message.body will be returned as a memoryview object. This is zero-copy for small messages and once-copy for
#: bigger ones
#: think less than 800B, since 2048 is the buffer for socket recv, and an
#: AMQP frame (or many frames!) have to fit there
MEMORYVIEW = 1
MEMORYVIEW = 1 # message.body will be returned as a memoryview object
# this is ZC for small messages, and C for multi-frame ones
# think less than 800B, since 2048 is the buffer for socket recv, and an
# AMQP frame (or many frames!) have to fit there
LIST_OF_MEMORYVIEW = 2 # message.body will be returned as list of
# memoryview objects these constitute received pieces. this is always ZC
#: message.body will be returned as list of memoryviews. memoryview objects these constitute received pieces.
#: this is fastest, and always zero-copy
LIST_OF_MEMORYVIEW = 2
class Consumer(Channeler):
......@@ -61,7 +60,7 @@ class Consumer(Channeler):
go.
WARNING: READ DEFAULT VALUES IN CONSTRUCTOR! TAKE CARE WHAT YOUR CONSUMERS
DO!
DO!
You can subscribe to be informed when the consumer is cancelled (for any
reason, server or client side) with:
......@@ -100,23 +99,30 @@ class Consumer(Channeler):
If the consumer doesn't get the chance to be declared - because
of a connection fail - next reconnect will consider this to be
SECOND declaration, ie. it will retry ad infinitum
.. deprecated:: v2.0.1
Use normal reconnects, for fuck's sake!
:type fail_on_first_time_resource_locked: bool
:param body_receive_mode: how should message.body be received. This
has a performance impact
:type body_receive_mode: a property of BodyReceiveMode
:type body_receive_mode: a property of :class:`BodyReceiveMode`
:param arguments: a dictionary, extra set of arguments to be provided to RabbitMQ during binding.
Primarily to support streams.
"""
__slots__ = ('queue', 'no_ack', 'on_message', 'cancelled', 'receiver',
'attache_group', 'channel_close_sent', 'qos', 'qos_update_sent',
'future_to_notify', 'future_to_notify_on_dead',
'fail_on_first_time_resource_locked',
'body_receive_mode', 'consumer_tag', 'on_cancel', 'on_broker_cancel',
'hb_watch', 'deliver_watch', 'span')
'hb_watch', 'deliver_watch', 'span', 'arguments')
def __init__(self, queue, on_message, span=None,
no_ack=True, qos=0,
future_to_notify=None,
fail_on_first_time_resource_locked=False,
body_receive_mode=BodyReceiveMode.BYTES
body_receive_mode=BodyReceiveMode.BYTES,
arguments=None
):
"""
Note that if you specify QoS, it is applied before basic.consume is
......@@ -127,8 +133,12 @@ class Consumer(Channeler):
self.span = span
self.queue = queue
self.arguments = argumentify(arguments)
self.no_ack = no_ack
if fail_on_first_time_resource_locked:
warnings.warn('This is heavily deprecated and discouraged', DeprecationWarning)
self.on_message = on_message
# consumer?
......@@ -221,8 +231,7 @@ class Consumer(Channeler):
self.receiver.on_gone()
self.receiver = None
def on_close(self, payload=None):
# type: (tp.Optional[coolamqp.framing.base.AMQPMethodPayload]) -> None
def on_close(self, payload=None): # type: (tp.Optional[coolamqp.framing.base.AMQPMethodPayload]) -> None
"""
Handle closing the channel. It sounds like an exception...
......@@ -234,7 +243,6 @@ class Consumer(Channeler):
hannel has been physically torn down
Note, this can be called multiple times, and eventually with None.
"""
if not self.cancelled:
self.cancelled = True
......@@ -430,7 +438,7 @@ class Consumer(Channeler):
self.method_and_watch(
BasicConsume(self.queue.name, self.consumer_tag,
False, self.no_ack, self.queue.exclusive, False,
[]),
self.arguments),
BasicConsumeOk,
self.on_setup
)
......
......@@ -35,25 +35,13 @@ class Cluster(object):
Call .start() to connect to AMQP.
It is not safe to fork() after .start() is called, but it's OK before.
:param nodes: list of nodes, or a single node. For now, only one is supported.
:param on_fail: callable/0 to call when connection fails in an
unclean way. This is a one-shot
:param extra_properties: refer to :class:`coolamqp.uplink.connection.Connection`
:param log_frames: an object that supports logging each and every frame CoolAMQP sends and
receives from the broker
:type log_frames: tp.Optional[:class:`coolamqp.tracing.BaseFrameTracer`]
:param name: name to appear in log items and prctl() for the listener thread
:param on_blocked: callable to call when ConnectionBlocked/ConnectionUnblocked is received. It will be
called with a value of True if connection becomes blocked, and False upon an unblock
:param tracer: tracer, if opentracing is installed
"""
# Events you can be informed about
ST_LINK_LOST = 0 # Link has been lost
ST_LINK_REGAINED = 1 # Link has been regained
def __init__(self, nodes, # type: tp.Union[NodeDefinition, tp.List[NodeDefinition]]
def __init__(self, nodes, # type: NodeDefinition
on_fail=None, # type: tp.Optional[tp.Callable[[], None]]
extra_properties=None,
# type: tp.Optional[tp.List[tp.Tuple[bytes, tp.Tuple[bytes, str]]]]
......@@ -62,6 +50,20 @@ class Cluster(object):
on_blocked=None, # type: tp.Callable[[bool], None],
tracer=None # type: opentracing.Traccer
):
"""
:param nodes: single node
:type nodes: NodeDefinition
:param on_fail: callable/0 to call when connection fails in an
unclean way. This is a one-shot
:param extra_properties: refer to :class:`coolamqp.uplink.connection.Connection`
:param log_frames: an object that supports logging each and every frame CoolAMQP sends and
receives from the broker
:type log_frames: tp.Optional[:class:`coolamqp.tracing.BaseFrameTracer`]
:param name: name to appear in log items and prctl() for the listener thread
:param on_blocked: callable to call when ConnectionBlocked/ConnectionUnblocked is received. It will be
called with a value of True if connection becomes blocked, and False upon an unblock
:param tracer: tracer, if opentracing is installed
"""
from coolamqp.objects import NodeDefinition
if isinstance(nodes, NodeDefinition):
nodes = [nodes]
......@@ -185,13 +187,15 @@ class Cluster(object):
"""
Start consuming from a queue.
args and kwargs will be passed to Consumer constructor (coolamqp.attaches.consumer.Consumer).
args and kwargs will be passed to Consumer constructor.
Don't use future_to_notify - it's done here!
Take care not to lose the Consumer object - it's the only way to cancel a consumer!
.. note:: You don't need to explicitly declare queues and exchanges that you will be using beforehand,
this will do this for you on the same channel.
this will do this for you on the same channel, which is also the only way to use anonymous queues.
If accepts more arguments. Consult :class:`coolamqp.attaches.consumer.Consumer` for details.
:param queue: Queue object, being consumed from right now.
Note that name of anonymous queue might change at any time!
......@@ -259,7 +263,7 @@ class Cluster(object):
it will be discarded
:param span: optionally, current span, if opentracing is installed
:param dont_trace: if set to True, a span won't be generated
:return: Future to be finished on completion or None, is confirm/tx was not chosen
:return: Future to be finished on completion or None, is confirm was not chosen
"""
if self.tracer is not None and not dont_trace:
span = self._make_span('publish', span)
......
# coding=UTF-8
from __future__ import print_function, absolute_import
import coolamqp.argumentify
from coolamqp.argumentify import argumentify
"""
A Python version of the AMQP machine-readable specification.
......@@ -2987,12 +2983,22 @@ class BasicContentPropertyList(AMQPContentPropertyList):
@staticmethod
def typize(*fields): # type: (*str) -> type
"""
Return an autonomous class definition which is a header supporting only particular fields.
Usage:
>>> Headers = BasicContentPropertyList.typize('content_type', 'content_encoding')
>>> headers = Headers('application/json', 'gzip')
The reason for this is speed.
"""
zpf = bytearray([
(('content_type' in fields) << 7) |
(('content_encoding' in fields) << 6) |
(('headers' in fields) << 5) | (('delivery_mode' in fields) << 4) |
(('priority' in fields) << 3) | (('correlation_id' in fields) << 2)
| (('reply_to' in fields) << 1) | int('expiration' in kwargs),
| (('reply_to' in fields) << 1) | int('expiration' in fields),
(('message_id' in fields) << 7) | (('timestamp' in fields) << 6) |
(('type_' in fields) << 5) | (('user_id' in fields) << 4) |
(('app_id' in fields) << 3) | (('reserved' in fields) << 2)
......@@ -3025,8 +3031,7 @@ class BasicContentPropertyList(AMQPContentPropertyList):
while buf[offset + pfl - 1] & 1:
pfl += 2
zpf = BasicContentPropertyList.zero_property_flags(buf[offset:offset +
pfl]).tobytes()
pfl]).tobytes()
if zpf in BasicContentPropertyList.PARTICULAR_CLASSES:
return BasicContentPropertyList.PARTICULAR_CLASSES[
zpf].from_buffer(buf, offset)
......
......@@ -17,7 +17,41 @@ logger = logging.getLogger(__name__)
class MessageProperties(BasicContentPropertyList):
"""
Properties you can attach to your messages. Only these keys are valid!
"""
def __new__(cls, *args, **kwargs):
"""
:param content_type: MIME content type
:type content_type: binary type (max length 255) (AMQP as shortstr)
:param content_encoding: MIME content encoding
:type content_encoding: binary type (max length 255) (AMQP as shortstr)
:param headers: message header field table. You can pass a dictionary here safely.
:type headers: table. See coolamqp.uplink.framing.field_table (AMQP as table)
:param delivery_mode: non-persistent (1) or persistent (2)
:type delivery_mode: int, 8 bit unsigned (AMQP as octet)
:param priority: message priority, 0 to 9
:type priority: int, 8 bit unsigned (AMQP as octet)
:param correlation_id: application correlation identifier
:type correlation_id: binary type (max length 255) (AMQP as shortstr)
:param reply_to: address to reply to
:type reply_to: binary type (max length 255) (AMQP as shortstr)
:param expiration: message expiration specification (in milliseconds)
:type expiration: binary type (max length 255) (AMQP as shortstr)
:param message_id: application message identifier
:type message_id: binary type (max length 255) (AMQP as shortstr)
:param timestamp: message timestamp
:type timestamp: 64 bit signed POSIX timestamp (in seconds) (AMQP as timestamp)
:param type_: message type name
:type type_: binary type (max length 255) (AMQP as shortstr)
:param user_id: creating user id
:type user_id: binary type (max length 255) (AMQP as shortstr)
:param app_id: creating application id
:type app_id: binary type (max length 255) (AMQP as shortstr)
:param reserved: reserved, must be empty
:type reserved: binary type (max length 255) (AMQP as shortstr)
"""
if 'headers' in kwargs:
if isinstance(kwargs['headers'], dict):
kwargs['headers'] = argumentify(kwargs['headers'])
......@@ -61,25 +95,26 @@ class Message(object):
:param body: stream of octets
:type body: anything with a buffer interface
:param properties: AMQP properties to be sent along.
default is 'no properties at all'
You can pass a dict - it will be passed to
MessageProperties,
but it's slow - don't do that.
:param properties: AMQP properties to be sent along. default is 'no properties at all'.
You can pass a dict - it will be passed to MessageProperties, but it's slow - don't do that.
:type properties: :class:`coolamqp.objects.MessageProperties` instance
"""
__slots__ = ('body', 'properties')
Properties = MessageProperties # an alias for easier use
#: an alias for easier use
Properties = MessageProperties
def __init__(self, body, # type: bytes
properties=None # type: tp.Optional[MessageProperties]
def __init__(self, body,
properties=None
):
"""
Create a Message object.
Please take care with passing empty bodies, as py-amqp has some
failure on it.
:param body: bytes
:param properties: a :class:`~coolamqp.objects.MessageProperties` to send along
"""
if isinstance(body, six.text_type):
raise TypeError(u'body cannot be a text type!')
......@@ -173,6 +208,7 @@ class Exchange(object):
:param name: exchange name
:param arguments: either a list of (bytes, values) or a dict of (str, value) to pass as an extra argument
:param type: type of the exchange. So far, valid types are 'direct', 'fanout', 'topic' and 'headers'
"""
__slots__ = ('name', 'type', 'durable', 'auto_delete', 'arguments')
......@@ -190,19 +226,22 @@ class Exchange(object):
self.auto_delete = auto_delete
self.arguments = argumentify(arguments)
if self.auto_delete and self.durable:
warnings.warn('What is your purpose in declaring a durable auto-delete exchange?', UserWarning)
assert isinstance(self.name, six.text_type)
assert isinstance(self.type, six.binary_type)
def __repr__(self): # type: () -> str
return u'Exchange(%s, %s, %s, %s)' % (
return u'Exchange(%s, %s, %s, %s, %s)' % (
repr(self.name), repr(self.type), repr(self.durable),
repr(self.auto_delete))
repr(self.auto_delete), repr(self.arguments))
def __hash__(self): # type: () -> int
return self.name.__hash__()
def __eq__(self, other): # type: (Exchange) -> bool
return (self.name == other.name) and (type(self) == type(other))
return (self.name == other.name) and (self.type == other.type) and isinstance(other, self.__class__)
Exchange.direct = Exchange()
......@@ -212,7 +251,7 @@ class ServerProperties(object):
"""
An object describing properties of the target server.
:ivar version: tuple of (major version, minor version)
:ivar version: tuple of (major version, minor version) of the AMQP protocol in use. Practically always 0, 9.
:ivar properties: dictionary of properties (key str, value any)
:ivar mechanisms: a list of strings, supported auth mechanisms
:ivar locales: locale in use
......@@ -239,39 +278,44 @@ class Queue(object):
"""
This object represents a Queue that applications consume from or publish to.
Create a queue definition.
:param name: name of the queue.
None (default) for autogeneration. Just follow the rules for :ref:`anonymq`.
If empty string, a UUID name will be generated, and you won't have an anonymous queue anymore.
:param durable: Is the queue durable?
:param exchange: Exchange for this queue to bind to. None for no binding.
:param exclusive: This queue will be deleted when the connection closes
:param auto_delete: This queue will be deleted when the last consumer unsubscribes
:param arguments: either a list of (bytes, values) or a dict of (str, value) to pass as an extra argument
:param routing_key: routing key that will be used to bind to an exchange. Used only when this
queue is associated with an exchange. Default value of blank should suffice.
:param arguments: either a list of (bytes, values) or a dict of (str, value) to pass as an extra argument during
declaration
:param arguments_bind: arguments to pass to binding to a (headers, I suppose exchange)
:raises ValueError: tried to create a queue that was not durable or auto_delete
:raises ValueError: tried to create a queue that was not exclusive or auto_delete and not anonymous
:raises ValueError: tried to create a queue that was anonymous and not auto_delete or durable
:warning DeprecationWarning: if a non-exclusive auto_delete queue is created or some other combinations
that will be soon unavailable (eg. RabbitMQ 4.0).
:warning UserWarning: if you're declaring an auto_delete or exclusive, anonymous queue
"""
__slots__ = ('name', 'durable', 'exchange', 'auto_delete', 'exclusive',
'anonymous', 'consumer_tag', 'arguments', 'routing_key', 'arguments_bind')
def __init__(self, name=None, # type: tp.Union[str, bytes, None]
def __init__(self, name=None, # type: Union[str, bytes, None]
durable=False, # type: bool
exchange=None, # type: tp.Optional[Exchange]
exchange=None, # type: Optional[Exchange]
exclusive=True, # type: bool
auto_delete=True, # type: bool
arguments=None, # type: tp.Union[tp.List[bytes, tp.Any], tp.Dict[str, tp.Any]],
routing_key=b'', #: type: tp.Union[str, bytes]
arguments_bind=None,
arguments=None, # type: Union[List[bytes, Any], Dict[str, Any]],
routing_key=b'', # type: Union[str, bytes]
arguments_bind=None, # type: Optional[Dict]
):
"""
:param name: name of the queue.
None (default) for autogeneration. Just follow the rules for :ref:`anonymq`.
If empty string, a UUID name will be generated, and you won't have an anonymous queue anymore. This queue
will be considered named by all of the other code.
:param durable: Is the queue durable?
:param exchange: Exchange for this queue to bind to. None for no binding.
:param exclusive: This queue will be deleted when the connection closes
:param auto_delete: This queue will be deleted when the last consumer unsubscribes
:param arguments: either a list of (bytes, values) or a dict of (str, value) to pass as an extra argument
:param routing_key: routing key that will be used to bind to an exchange. Used only when this
queue is associated with an exchange. Default value of blank should suffice.
:param arguments: either a list of (bytes, values) or a dict of (str, value) to pass as an extra argument during
declaration
:param arguments_bind: arguments to pass to binding to a (headers, I suppose exchange)
:raises ValueError: tried to create a queue that was not durable or auto_delete
:raises ValueError: tried to create a queue that was not exclusive or auto_delete and not anonymous
:raises ValueError: tried to create a queue that was anonymous and not auto_delete or durable
.. warning::
If a non-exclusive auto_delete queue is created or some other combinations that will be soon unavailable
(eg. RabbitMQ 4.0), a DeprecationWarning will be raised.
If you're declaring an auto_delete non-exclusive a UserWarning will be raised instead.
"""
if name is None:
self.name = None
else:
......@@ -298,9 +342,6 @@ class Queue(object):
if self.durable and self.anonymous:
raise ValueError('Cannot declare an anonymous durable queue')
if self.auto_delete and not self.exclusive and not self.anonymous:
raise ValueError('Cannot create an auto_delete and durable queue non-anonymous')
self.consumer_tag = self.name if not self.anonymous else tobytes(uuid.uuid4().hex)
if not self.exclusive and self.auto_delete:
......@@ -313,7 +354,9 @@ class Queue(object):
return hash(self.name)
def __repr__(self):
return 'Queue(%s, %s, %s, %s, %s, %s' % (self.name, self.durable, self.exchange, self.exclusive, self.arguments)
return 'Queue(%s, %s, %s, %s, %s, %s, %s, %s)' % (self.name, self.durable, repr(self.exchange), self.exclusive,
self.auto_delete, repr(self.arguments), self.routing_key,
repr(self.arguments_bind))
class QueueBind(object):
......
version: '3.2'
services:
amqp:
image: rabbitmq:3-management
image: rabbitmq:4.0-management
unittest:
command: nose2 -vv
build:
context: .
dockerfile: tests/Dockerfile
......
Advanced things
===============
.. autoclass:: coolamqp.uplink.connection.Connection
:members:
Declaring anonymous queues
--------------------------
.. _anonymq:
In order to make use of an anonymous queue, you must first :meth:`coolamqp.clustering.Cluster.consume` it, since
:meth:`coolamqp.clustering.Cluster.declare` will use a separate channel, in which the queue will be invalid. It will
raise ValueError if you try to do that, anyway.
Anonymous queues must be auto_delete and exclusive, ValueError will be raised otherwise.
Usage basics
============
First off, you need a Cluster object:
.. autoclass:: coolamqp.clustering.Cluster
:members:
You will need to initialize it with NodeDefinitions:
.. autoclass:: coolamqp.objects.NodeDefinition
You can send messages:
.. autoclass:: coolamqp.objects.Message
and receive them
.. autoclass:: coolamqp.objects.ReceivedMessage
:members:
MessageProperties
-----------------
.. autoclass:: coolamqp.objects.MessageProperties
:members:
.. autoclass:: coolamqp.framing.definitions.BasicContentPropertyList
:members:
:undoc-members:
Take care, as :class:`~coolamqp.objects.MessageProperties` will hash the
keys found and store it within non-GCable memory. So each "variant" of message
properties encountered will be compiled as a separate class.
Who am I talking to?
--------------------
:class:`coolamqp.clustering.Cluster` has a nice property, that will return None until the connection is established.
If it is, it will return something like this:
.. autoclass:: coolamqp.objects.ServerProperties
:members:
......@@ -73,7 +73,7 @@ release = __version__
#
# This is also used if you do content translation via gettext catalogs.
# Usually you set "language" from the command line for these cases.
language = None
language = 'en'
# List of patterns, relative to source directory, that match files and
# directories to ignore when looking for source files.
......@@ -102,7 +102,7 @@ html_theme = 'alabaster'
# Add any paths that contain custom static files (such as style sheets) here,
# relative to this directory. They are copied after the builtin static files,
# so a file named "default.css" will overwrite the builtin "default.css".
html_static_path = ['_static']
#html_static_path = ['_static']
# -- Options for HTMLHelp output ------------------------------------------
......@@ -134,7 +134,7 @@ latex_elements = {
# author, documentclass [howto, manual, or own class]).
latex_documents = [
(master_doc, 'CoolAMQP.tex', u'CoolAMQP Documentation',
u'DMS Serwis s.c.', 'manual'),
u'Piotr Maślanka', 'manual'),
]
# -- Options for manual page output ---------------------------------------
......@@ -157,12 +157,24 @@ texinfo_documents = [
'Miscellaneous'),
]
autodoc_default_options = {
'members': True,
}
autodoc_default_flags = [
'show-inheritance'
]
autodoc_typehints = "description"
autoclass_content = 'both'
from coolamqp.framing.definitions import BINARY_HEADER_TO_METHOD
with open('frames.rst', 'w') as f_out:
f_out.write('''===========================
Glossary of all AMQP frames
===========================
Please note that this is automatically generated.
''')
for class_ in BINARY_HEADER_TO_METHOD.values():
f_out.write('.. autoclass:: coolamqp.framing.definitions.%s\n :members:\n\n' % (
......
To be automatically filled in during build by conf.py
===========================
Glossary of all AMQP frames
===========================
Please note that this is automatically generated.
.. autoclass:: coolamqp.framing.definitions.ConnectionBlocked
:members:
.. autoclass:: coolamqp.framing.definitions.ConnectionClose
:members:
.. autoclass:: coolamqp.framing.definitions.ConnectionCloseOk
:members:
.. autoclass:: coolamqp.framing.definitions.ConnectionOpen
:members:
.. autoclass:: coolamqp.framing.definitions.ConnectionOpenOk
:members:
.. autoclass:: coolamqp.framing.definitions.ConnectionStart
:members:
.. autoclass:: coolamqp.framing.definitions.ConnectionSecure
:members:
.. autoclass:: coolamqp.framing.definitions.ConnectionStartOk
:members:
.. autoclass:: coolamqp.framing.definitions.ConnectionSecureOk
:members:
.. autoclass:: coolamqp.framing.definitions.ConnectionTune
:members:
.. autoclass:: coolamqp.framing.definitions.ConnectionTuneOk
:members:
.. autoclass:: coolamqp.framing.definitions.ConnectionUpdateSecret
:members:
.. autoclass:: coolamqp.framing.definitions.ConnectionUnblocked
:members:
.. autoclass:: coolamqp.framing.definitions.ConnectionUpdateSecretOk
:members:
.. autoclass:: coolamqp.framing.definitions.ChannelClose
:members:
.. autoclass:: coolamqp.framing.definitions.ChannelCloseOk
:members:
.. autoclass:: coolamqp.framing.definitions.ChannelFlow
:members:
.. autoclass:: coolamqp.framing.definitions.ChannelFlowOk
:members:
.. autoclass:: coolamqp.framing.definitions.ChannelOpen
:members:
.. autoclass:: coolamqp.framing.definitions.ChannelOpenOk
:members:
.. autoclass:: coolamqp.framing.definitions.ExchangeBind
:members:
.. autoclass:: coolamqp.framing.definitions.ExchangeBindOk
:members:
.. autoclass:: coolamqp.framing.definitions.ExchangeDeclare
:members:
.. autoclass:: coolamqp.framing.definitions.ExchangeDelete
:members:
.. autoclass:: coolamqp.framing.definitions.ExchangeDeclareOk
:members:
.. autoclass:: coolamqp.framing.definitions.ExchangeDeleteOk
:members:
.. autoclass:: coolamqp.framing.definitions.ExchangeUnbind
:members:
.. autoclass:: coolamqp.framing.definitions.ExchangeUnbindOk
:members:
.. autoclass:: coolamqp.framing.definitions.QueueBind
:members:
.. autoclass:: coolamqp.framing.definitions.QueueBindOk
:members:
.. autoclass:: coolamqp.framing.definitions.QueueDeclare
:members:
.. autoclass:: coolamqp.framing.definitions.QueueDelete
:members:
.. autoclass:: coolamqp.framing.definitions.QueueDeclareOk
:members:
.. autoclass:: coolamqp.framing.definitions.QueueDeleteOk
:members:
.. autoclass:: coolamqp.framing.definitions.QueuePurge
:members:
.. autoclass:: coolamqp.framing.definitions.QueuePurgeOk
:members:
.. autoclass:: coolamqp.framing.definitions.QueueUnbind
:members:
.. autoclass:: coolamqp.framing.definitions.QueueUnbindOk
:members:
.. autoclass:: coolamqp.framing.definitions.BasicAck
:members:
.. autoclass:: coolamqp.framing.definitions.BasicConsume
:members:
.. autoclass:: coolamqp.framing.definitions.BasicCancel
:members:
.. autoclass:: coolamqp.framing.definitions.BasicConsumeOk
:members:
.. autoclass:: coolamqp.framing.definitions.BasicCancelOk
:members:
.. autoclass:: coolamqp.framing.definitions.BasicDeliver
:members:
.. autoclass:: coolamqp.framing.definitions.BasicGet
:members:
.. autoclass:: coolamqp.framing.definitions.BasicGetOk
:members:
.. autoclass:: coolamqp.framing.definitions.BasicGetEmpty
:members:
.. autoclass:: coolamqp.framing.definitions.BasicNack
:members:
.. autoclass:: coolamqp.framing.definitions.BasicPublish
:members:
.. autoclass:: coolamqp.framing.definitions.BasicQos
:members:
.. autoclass:: coolamqp.framing.definitions.BasicQosOk
:members:
.. autoclass:: coolamqp.framing.definitions.BasicReturn
:members:
.. autoclass:: coolamqp.framing.definitions.BasicReject
:members:
.. autoclass:: coolamqp.framing.definitions.BasicRecoverAsync
:members:
.. autoclass:: coolamqp.framing.definitions.BasicRecover
:members:
.. autoclass:: coolamqp.framing.definitions.BasicRecoverOk
:members:
.. autoclass:: coolamqp.framing.definitions.TxCommit
:members:
.. autoclass:: coolamqp.framing.definitions.TxCommitOk
:members:
.. autoclass:: coolamqp.framing.definitions.TxRollback
:members:
.. autoclass:: coolamqp.framing.definitions.TxRollbackOk
:members:
.. autoclass:: coolamqp.framing.definitions.TxSelect
:members:
.. autoclass:: coolamqp.framing.definitions.TxSelectOk
:members:
.. autoclass:: coolamqp.framing.definitions.ConfirmSelect
:members:
.. autoclass:: coolamqp.framing.definitions.ConfirmSelectOk
:members:
Tutorial
========
How to guide
============
Connecting to a broker
----------------------
If you want to connect to an AMQP broker, you need:
* its address (and port)
......@@ -26,7 +29,7 @@ accepts a list of nodes:
.. code-block:: python
from coolamqp.clustering import Cluster
cluster = Cluster([node], name='My Cluster')
cluster = Cluster(node, name='My Cluster')
cluster.start(wait=True)
*wait=True* will block until connection is completed. After this, you can use other methods.
......@@ -36,10 +39,6 @@ receive a provided label, postfixed by **AMQP listener thread**.
.. _setproctitle: https://pypi.org/project/setproctitle/
.. autoclass:: coolamqp.clustering.Cluster
:members:
Publishing and consuming
------------------------
......@@ -58,14 +57,8 @@ you must first define a queue, and register a consumer.
This will create an auto-delete and exclusive queue. After than, a consumer will be registered for this queue.
_no_ack=False_ will mean that we have to manually confirm messages.
.. warning:: if you declare a :class:`coolamqp.objects.Queue` without a name, this client will automatically
generate an UUID-name for you, and verify the queue is auto_delete. Since RabbitMQ supports
`automatic queue name generation <https://www.rabbitmq.com/docs/queues#names>`_,
this client does not use it, because the queue is valid only for the channel that declared it,
and CoolAMQP declares things with a dedicated channel.
You can specify a callback, that will be called with a message if one's received by this consumer. Since
we did not do that, this will go to a generic queue belonging to _Cluster_.
You should specify a callback. It will be executed in receiving thread's context, so it can't block for long.
If you're looking for receiving messages yourself, familiarize yourself with :meth:`coolamqp.clustering.Cluster.drain`.
_consumer_ is a _Consumer_ object. This allows us to do some things with the consumer (such as setting QoS),
but most importantly it allows us to cancel it later. _consume_confirm_ is a _Future_, that will succeed
......@@ -75,13 +68,12 @@ To send a message we need to construct it first, and later publish:
.. code-block:: python
from coolamqp.objects import Message
from coolamqp.objects import Message, MessageProperties
msg = Message(b'hello world', properties=Message.Properties())
msg = Message(b'hello world', properties=MessageProperties())
cluster.publish(msg, routing_key=u'my_queue')
.. autoclass:: coolamqp.objects.Message
:members:
The default exchange is the direct exchange, which will target the queue whose name is equal to routing_key.
This creates a message with no properties, and sends it through default (direct) exchange to our queue.
Note that CoolAMQP simply considers your messages to be bags of bytes + properties. It will not modify them,
......@@ -95,9 +87,27 @@ To actually get our message, we need to start a consumer first. To do that, just
Where kwargs are passed directly to Consumer class.
**cons** is a Consumer object, and **fut** is a Future that will happen when listening has been registered on target
server.
server. However, not providing a
.. autoclass:: coolamqp.attaches.Consumer
:members:
.. _anonymq:
Declaring anonymous queue
-------------------------
In order to make use of an anonymous queue, you must first :meth:`coolamqp.clustering.Cluster.consume` it, since
:meth:`coolamqp.clustering.Cluster.declare` will use a separate channel, in which the queue will be invalid. It will
raise ValueError if you try to do that, anyway.
Anonymous queues must be auto_delete and exclusive, ValueError will be raised otherwise.
Who am I talking to?
--------------------
To identify the server you're talking to just connect and use :attr:`coolamqp.clustering.Cluster.properties`,
which will return the following class:
.. autoclass:: coolamqp.objects.ServerProperties
:members:
......@@ -6,14 +6,21 @@ Welcome to CoolAMQP's documentation!
:caption: Contents
whatsnew
cluster
tutorials/send_and_receive
tutorials
how-to-guide
caveats
frames
basics
streams
tracing
advanced
reference
frames
It is highly recommended to set logging level of CoolAMQP to error:
.. code-block:: python
import logging
logging.getLogger('coolamqp').setLevel(logging.ERROR)
Quick FAQ
=========
......
CoolAMQP cluster
================
CoolAMQP classes reference
==========================
Cluster-related things
----------------------
.. autoclass:: coolamqp.clustering.Cluster
:members:
.. note:: If environment variable :code:`COOLAMQP_FORCE_SELECT_LISTENER` is defined, select will be used instead of epoll.
Publisher
---------
.. autoclass:: coolamqp.attaches.publisher.Publisher
:members:
:undoc-members:
Consumers
---------
This will automatically use select if epoll is not available (eg. Windows).
.. autoclass:: coolamqp.attaches.consumer.BodyReceiveMode
:members:
.. autoclass:: coolamqp.attaches.consumer.Consumer
:members:
:undoc-members:
Please note that :meth:`coolamqp.clustering.Cluster.consume` passes lot of it's
args and kwargs directly to the :class:`coolamqp.attaches.consumer.Consumer`.
Extra objects
-------------
.. autoclass:: coolamqp.objects.Message
:members:
.. autoclass:: coolamqp.objects.ReceivedMessage
:members:
.. autoclass:: coolamqp.objects.MessageProperties
:members:
.. autoclass:: coolamqp.objects.Queue
:members:
.. autoclass:: coolamqp.objects.Exchange
:members: