diff --git a/.codeclimate.yml b/.codeclimate.yml index 3c66ca9047c32329dcb83d7a9cff2e19d5f36933..436bd443cd1b67785f394b0144dfe23f245f2ecc 100644 --- a/.codeclimate.yml +++ b/.codeclimate.yml @@ -14,11 +14,11 @@ engines: radon: enabled: true exclude_paths: -- examples/** -- tests/** -- coolamqp/framing/definitions.py -- compile_definitions/** -- stress_tests/** + - examples/** + - tests/** + - coolamqp/framing/definitions.py + - compile_definitions/** + - stress_tests/** ratings: paths: - - coolamqp/** + - coolamqp/** diff --git a/.travis.yml b/.travis.yml index dadcef730e7736d4f69087a3f1c7a5eb0ce7ffa8..97cbfc93d4c1df0cc5ca5053ef57fb58014b40e9 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,30 +1,30 @@ language: python python: - - "2.7" - - "3.5" - - "3.6" - - "3.7" - - "3.8" - - "nightly" - - "pypy" - - "pypy3.5" + - "2.7" + - "3.5" + - "3.6" + - "3.7" + - "3.8" + - "nightly" + - "pypy" + - "pypy3.5" cache: pip before_script: - curl -L https://codeclimate.com/downloads/test-reporter/test-reporter-latest-linux-amd64 > ./cc-test-reporter - chmod +x ./cc-test-reporter - ./cc-test-reporter before-build script: - - python -m compile_definitions - - python setup.py test - - python -m stress_tests + - python -m compile_definitions + - python setup.py test + - python -m stress_tests install: - - pip install -r requirements.txt - - pip install -r stress_tests/requirements.txt - - pip install yapf nose2 mock coverage nose2[coverage_plugin] + - pip install -r requirements.txt + - pip install -r stress_tests/requirements.txt + - pip install yapf nose2 mock coverage nose2[coverage_plugin] after_success: - - coverage xml - - ./cc-test-reporter after-build -t coverage.py --exit-code $TRAVIS_TEST_RESULT - - bash build.sh + - coverage xml + - ./cc-test-reporter after-build -t coverage.py --exit-code $TRAVIS_TEST_RESULT + - bash build.sh services: rabbitmq addons: apt: diff --git a/CHANGELOG.md b/CHANGELOG.md index 6df9eaf2531cd74529c08a7621d78521f1efd97f..ac7f447a00488640d8fcca282a1a793a0068e078 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,9 @@ # v1.0.1: -* _TBA_ +* added support for channel flow control and blocking and + unblocking the connection +* added support for notifying the app when connection + becomes blocked # v1.0: diff --git a/README.md b/README.md index 7fc18bfb915246d3fd4e85fc623f956eeb1b1d88..4981cb93ff14e1dbed60218322488429c4fa8d00 100644 --- a/README.md +++ b/README.md @@ -11,10 +11,12 @@ CoolAMQP [](http://coolamqp.readthedocs.io/en/latest/?badge=develop) []() -A **magical** AMQP 0.9.1 client, that uses **heavy sorcery** to achieve speeds that other AMQP clients cannot even hope to match. +A **magical** AMQP 0.9.1 client, that uses **heavy sorcery** to achieve speeds that other pure-Python AMQP clients cannot even hope to match. Documentation (WIP) is available at [Read the Docs](http://coolamqp.readthedocs.io/). +CoolAMQP uses [semantic versioning 2.0](https://semver.org/spec/v2.0.0.html). + tl;dr - [this](coolamqp/framing/definitions.py) is **machine-generated** compile-time. [this](coolamqp/framing/compilation/content_property.py) **generates classes run-time**, and there are memoryviews **_everywhere_**. diff --git a/build.sh b/build.sh index cfb7ee8b287622b902647f57a45a8ede35a1c5f2..413cacc14ee19bc24cd4bc369dc9092df2b49fd2 100644 --- a/build.sh +++ b/build.sh @@ -9,8 +9,8 @@ python setup.py bdist bdist_wheel if [ $TRAVIS_BRANCH == "master" ]; then - if [ $TRAVIS_PYTHON_VERSION == "2.7" ]; then - pip install wheel twine - twine upload -u $PYPI_USER -p $PYPI_PWD dist/* - fi + if [ $TRAVIS_PYTHON_VERSION == "2.7" ]; then + pip install wheel twine + twine upload -u $PYPI_USER -p $PYPI_PWD dist/* + fi fi diff --git a/compile_definitions/__main__.py b/compile_definitions/__main__.py index 3097f68d989737c936a0fb5d23336fd321c79a0a..c61f16799bd2cd8cae34265d8036ad94a341030c 100644 --- a/compile_definitions/__main__.py +++ b/compile_definitions/__main__.py @@ -1,14 +1,14 @@ from __future__ import division import collections +import math import struct import subprocess from xml.etree import ElementTree -import math import six -from compile_definitions.utilities import ffmt, to_docstring, pythonify_name, to_code_binary, frepr, \ +from compile_definitions.utilities import f_fmt, to_docstring, pythonify_name, to_code_binary, f_repr, \ format_method_class_name, name_class, get_size from coolamqp.framing.compilation.utilities import format_field_name from .xml_tags import Constant, Class, Domain @@ -75,7 +75,7 @@ Field = collections.namedtuple('Field', ('name', 'type', 'basic_type', 'reserved '''.encode('utf8')) def line(data, *args, **kwargs): - out.write(ffmt(data, *args, sane=True).encode('utf8')) + out.write(f_fmt(data, *args, sane=True).encode('utf8')) # Output core ones FRAME_END = None @@ -96,13 +96,13 @@ Field = collections.namedtuple('Field', ('name', 'type', 'basic_type', 'reserved line(u' # %s\n', ln) if pythonify_name(constant.name) == 'FRAME_END': FRAME_END = constant.value - g = ffmt('%s = %s\n', pythonify_name(constant.name), constant.value) + g = f_fmt('%s = %s\n', pythonify_name(constant.name), constant.value) line(g) if 0 <= constant.value <= 255: z = repr(six.int2byte(constant.value)) if not z.startswith(u'b'): z = u'b' + z - g = ffmt('%s_BYTE = %s\n', pythonify_name(constant.name), z) + g = f_fmt('%s_BYTE = %s\n', pythonify_name(constant.name), z) line(g) if was_docs_output: @@ -124,8 +124,8 @@ Field = collections.namedtuple('Field', ('name', 'type', 'basic_type', 'reserved domain_to_basic_type = {} line('\n\n\nDOMAIN_TO_BASIC_TYPE = {\n') for domain in Domain.findall(xml): - line(u' %s: %s,\n', frepr(domain.name), - frepr(None if domain.elementary else domain.type)) + line(u' %s: %s,\n', f_repr(domain.name), + f_repr(None if domain.elementary else domain.type)) domain_to_basic_type[domain.name] = domain.type line('}\n') @@ -140,7 +140,7 @@ Field = collections.namedtuple('Field', ('name', 'type', 'basic_type', 'reserved for cls in Class.findall(xml): cls.properties = [p._replace(basic_type=domain_to_basic_type[p.type]) for - p in cls.properties] + p in cls.properties] line('''\nclass %s(AMQPClass): """ @@ -151,7 +151,7 @@ Field = collections.namedtuple('Field', ('name', 'type', 'basic_type', 'reserved ''', name_class(cls.name), to_docstring(None, cls.docs), - frepr(cls.name), cls.index) + f_repr(cls.name), cls.index) if len(cls.properties) > 0: class_id_to_contentpropertylist[cls.index] = name_class( @@ -165,7 +165,7 @@ Field = collections.namedtuple('Field', ('name', 'type', 'basic_type', 'reserved ''', name_class(cls.name), to_docstring(None, cls.docs), - frepr(cls.name), cls.index, name_class(cls.name)) + f_repr(cls.name), cls.index, name_class(cls.name)) is_static = all( property.basic_type not in ('table', 'longstr', 'shortstr') for @@ -175,9 +175,9 @@ Field = collections.namedtuple('Field', ('name', 'type', 'basic_type', 'reserved if property.basic_type == 'bit': raise ValueError('bit properties are not supported!' ) - line(' Field(%s, %s, %s, %s),\n', frepr(property.name), - frepr(property.type), - frepr(property.basic_type), repr(property.reserved)) + line(' Field(%s, %s, %s, %s),\n', f_repr(property.name), + f_repr(property.type), + f_repr(property.basic_type), repr(property.reserved)) line(''' ] # A dictionary from a zero property list to a class typized with # some fields @@ -218,7 +218,7 @@ Field = collections.namedtuple('Field', ('name', 'type', 'basic_type', 'reserved pass # zero anyway else: byte_chunk.append(u"(('%s' in kwargs) << %s)" % ( - format_field_name(field.name), piece_index)) + format_field_name(field.name), piece_index)) piece_index -= 1 else: if first_byte: @@ -226,7 +226,7 @@ Field = collections.namedtuple('Field', ('name', 'type', 'basic_type', 'reserved pass # zero anyway else: byte_chunk.append(u"int('%s' in kwargs)" % ( - format_field_name(field.name),)) + format_field_name(field.name),)) else: # this is the "do we need moar flags" section byte_chunk.append(u"kwargs['%s']" % ( @@ -291,7 +291,7 @@ Field = collections.namedtuple('Field', ('name', 'type', 'basic_type', 'reserved pass # zero else: byte_chunk.append(u"(('%s' in fields) << %s)" % ( - format_field_name(field.name), piece_index)) + format_field_name(field.name), piece_index)) piece_index -= 1 else: if first_byte: @@ -299,7 +299,7 @@ Field = collections.namedtuple('Field', ('name', 'type', 'basic_type', 'reserved pass # zero else: byte_chunk.append(u"int('%s' in kwargs)" % ( - format_field_name(field.name),)) + format_field_name(field.name),)) else: # this is the "do we need moar flags" section byte_chunk.append(u"kwargs['%s']" % ( @@ -359,7 +359,7 @@ Field = collections.namedtuple('Field', ('name', 'type', 'basic_type', 'reserved # ============================================ Do methods for this class for method in cls.methods: full_class_name = u'%s%s' % ( - name_class(cls.name), format_method_class_name(method.name)) + name_class(cls.name), format_method_class_name(method.name)) # annotate types method.fields = [ @@ -380,7 +380,7 @@ Field = collections.namedtuple('Field', ('name', 'type', 'basic_type', 'reserved slots = u'' else: slots = (u', '.join( - map(lambda f: frepr(format_field_name(f.name)), + map(lambda f: f_repr(format_field_name(f.name)), non_reserved_fields))) + u', ' line('''\nclass %s(AMQPMethodPayload): @@ -414,8 +414,8 @@ Field = collections.namedtuple('Field', ('name', 'type', 'basic_type', 'reserved IS_CONTENT_STATIC = %s # this means that argument part has always the same content ''', slots, - frepr(cls.name + '.' + method.name), - frepr(cls.index), frepr(method.index), + f_repr(cls.name + '.' + method.name), + f_repr(cls.index), f_repr(method.index), to_code_binary(struct.pack("!HH", cls.index, method.index)), repr(method.sent_by_client), repr(method.sent_by_server), @@ -449,8 +449,8 @@ Field = collections.namedtuple('Field', ('name', 'type', 'basic_type', 'reserved for field in method.fields: line(' Field(%s, %s, %s, reserved=%s),\n', - frepr(field.name), frepr(field.type), - frepr(field.basic_type), repr(field.reserved)) + f_repr(field.name), f_repr(field.type), + f_repr(field.basic_type), repr(field.reserved)) line(' ]\n') @@ -463,7 +463,7 @@ Field = collections.namedtuple('Field', ('name', 'type', 'basic_type', 'reserved """ return '%s(%S)' % (', '.join(map(repr, [%s])))''', full_class_name, - u", ".join(['self.'+format_field_name(field.name) for field in non_reserved_fields])) + u", ".join(['self.' + format_field_name(field.name) for field in non_reserved_fields])) if len(non_reserved_fields) > 0: @@ -504,7 +504,7 @@ Field = collections.namedtuple('Field', ('name', 'type', 'basic_type', 'reserved ''', full_class_name) line_, new_structers = get_from_buffer(method.fields, '', 2, - remark=(method.name == 'deliver')) + remark=(method.name == 'deliver')) line(line_) structers.update(new_structers) diff --git a/compile_definitions/utilities.py b/compile_definitions/utilities.py index 4e9891dc22c5dab6db69dc53ee8464523c8846ad..b0f12e7b0d63003837eeb830819b88f5631e64f8 100644 --- a/compile_definitions/utilities.py +++ b/compile_definitions/utilities.py @@ -2,15 +2,16 @@ from __future__ import division, absolute_import, print_function import math + import six from coolamqp.framing.base import BASIC_TYPES from coolamqp.framing.compilation.utilities import as_unicode -def ffmt(data, *args, **kwargs): +def f_fmt(data, *args, **kwargs): for arg in args: - op = str if kwargs.get('sane', True) else frepr + op = str if kwargs.get('sane', True) else f_repr data = data.replace('%s', op(arg), 1) data = data.replace('%S', '%s') return data @@ -58,7 +59,7 @@ def to_code_binary(p): return u"b'" + (u''.join(body)) + u"'" -def frepr(p, sop=six.text_type): +def f_repr(p, sop=six.text_type): if isinstance(p, (six.binary_type, six.text_type)): p = sop(p) s = repr(p) @@ -71,13 +72,13 @@ def frepr(p, sop=six.text_type): @as_unicode -def format_method_class_name(methodname): - if '-' in methodname: - i = methodname.find('-') - return methodname[0:i].capitalize() + methodname[ - i + 1].upper() + methodname[i + 2:] +def format_method_class_name(method_name): + if '-' in method_name: + i = method_name.find('-') + return method_name[0:i].capitalize() + method_name[ + i + 1].upper() + method_name[i + 2:] else: - return methodname.capitalize() + return method_name.capitalize() @as_unicode diff --git a/compile_definitions/xml_fields.py b/compile_definitions/xml_fields.py index 28dac5a68eb2bfd281d4cf4ae97b1cdd19a37f5f..c3c6808fd1670b96189cbc0384bbcbef71f23db5 100644 --- a/compile_definitions/xml_fields.py +++ b/compile_definitions/xml_fields.py @@ -12,17 +12,17 @@ def nop(x): return x -def _get_tagchild(elem, tag): +def get_tag_child(elem, tag): return [e for e in list(elem) if e.tag == tag] __all__ = [ - '_name', '_docs', '_ComputedField', '_ValueField', '_SimpleField', - '_docs_with_label', '_get_tagchild', '_ChildField' + '_name', '_docs', 'ComputedField', 'ValueField', 'SimpleField', + '_docs_with_label', 'get_tag_child', 'ChildField' ] -class _Field(object): +class BaseField(object): """Base field object""" def set(self, obj, elem): @@ -35,18 +35,18 @@ class _Field(object): raise NotImplementedError('abstract') -class _ComputedField(_Field): +class ComputedField(BaseField): """ There's no corresponding XML attribute name - value must be computed from element """ def __init__(self, field_name, find_fun): - super(_ComputedField, self).__init__(field_name) + super(ComputedField, self).__init__(field_name) self.find = find_fun -class _ValueField(_Field): +class ValueField(BaseField): """ Can hide under a pick of different XML attribute names. Has a type, can have a default value. @@ -74,26 +74,26 @@ class _ValueField(_Field): if self.default is _Required: raise TypeError( 'Did not find field %s in elem tag %s, looked for names %s' % ( - self.field_name, elem.tag, self.xml_names)) + self.field_name, elem.tag, self.xml_names)) else: return self.default -class _SimpleField(_ValueField): +class SimpleField(ValueField): """XML attribute is the same as name, has a type and can be default""" def __init__(self, name, field_type=nop, default=_Required): - super(_SimpleField, self).__init__(name, name, field_type, default) + super(SimpleField, self).__init__(name, name, field_type, default) -class _ChildField(_ComputedField): +class ChildField(ComputedField): """ List of other properties """ - def __init__(self, name, xml_tag, fun, postexec=nop): - super(_ChildField, self).__init__(name, lambda elem: \ - postexec([fun(c) for c in _get_tagchild(elem, xml_tag)])) + def __init__(self, name, xml_tag, fun, post_exec=nop): + super(ChildField, self).__init__(name, lambda elem: \ + post_exec([fun(c) for c in get_tag_child(elem, xml_tag)])) def get_docs(elem, label): @@ -110,6 +110,6 @@ def get_docs(elem, label): return elem.attrib.get('label', None) -_name = _SimpleField('name', six.text_type) -_docs = _ComputedField('docs', lambda elem: get_docs(elem, False)) -_docs_with_label = _ComputedField('docs', lambda elem: get_docs(elem, True)) +_name = SimpleField('name', six.text_type) +_docs = ComputedField('docs', lambda elem: get_docs(elem, False)) +_docs_with_label = ComputedField('docs', lambda elem: get_docs(elem, True)) diff --git a/compile_definitions/xml_tags.py b/compile_definitions/xml_tags.py index 00a28af97e1d9e7d6fae91e0a850216cc05d502a..c69b9e42b182e8199100964627c3605935784d69 100644 --- a/compile_definitions/xml_tags.py +++ b/compile_definitions/xml_tags.py @@ -3,7 +3,6 @@ from __future__ import print_function, absolute_import, division import copy import logging - import math from compile_definitions.xml_fields import * @@ -12,7 +11,7 @@ from coolamqp.framing.base import BASIC_TYPES, DYNAMIC_BASIC_TYPES logger = logging.getLogger(__name__) -def _boolint(x): +def bool_int(x): return bool(int(x)) @@ -44,8 +43,8 @@ class Constant(BaseObject): NAME = 'constant' FIELDS = [ _name, - _SimpleField('value', int), - _ValueField('class', 'kind', default=''), + SimpleField('value', int), + ValueField('class', 'kind', default=''), _docs, ] @@ -54,10 +53,10 @@ class Field(BaseObject): NAME = 'field' FIELDS = [ _name, - _ValueField(('domain', 'type'), 'type', str), - _SimpleField('label', default=None), - _SimpleField('reserved', _boolint, default=0), - _ComputedField('basic_type', lambda elem: elem.attrib.get('type', + ValueField(('domain', 'type'), 'type', str), + SimpleField('label', default=None), + SimpleField('reserved', bool_int, default=0), + ComputedField('basic_type', lambda elem: elem.attrib.get('type', '') == elem.attrib.get( 'name', '')), _docs @@ -68,9 +67,9 @@ class Domain(BaseObject): NAME = 'domain' FIELDS = [ _name, - _SimpleField('type'), - _ComputedField('elementary', - lambda a: a.attrib['type'] == a.attrib['name']) + SimpleField('type'), + ComputedField('elementary', + lambda a: a.attrib['type'] == a.attrib['name']) ] @@ -78,20 +77,20 @@ class Method(BaseObject): NAME = 'method' FIELDS = [ _name, - _SimpleField('synchronous', _boolint, default=False), - _SimpleField('index', int), - _SimpleField('label', default=None), + SimpleField('synchronous', bool_int, default=False), + SimpleField('index', int), + SimpleField('label', default=None), _docs, - _ChildField('fields', 'field', Field), - _ChildField('response', 'response', lambda e: e.attrib['name']), - _ChildField('sent_by_client', 'chassis', - lambda e: e.attrib.get('name', '') == 'client', - postexec=any), - _ChildField('sent_by_server', 'chassis', - lambda e: e.attrib.get('name', '') == 'server', - postexec=any), - _ChildField('constant', 'field', lambda e: Field(e).reserved, - postexec=all) + ChildField('fields', 'field', Field), + ChildField('response', 'response', lambda e: e.attrib['name']), + ChildField('sent_by_client', 'chassis', + lambda e: e.attrib.get('name', '') == 'client', + post_exec=any), + ChildField('sent_by_server', 'chassis', + lambda e: e.attrib.get('name', '') == 'server', + post_exec=any), + ChildField('constant', 'field', lambda e: Field(e).reserved, + post_exec=all) ] def get_static_body(self): # only arguments part @@ -122,9 +121,9 @@ class Class(BaseObject): NAME = 'class' FIELDS = [ _name, - _SimpleField('index', int), + SimpleField('index', int), _docs_with_label, - _ChildField('methods', 'method', Method, postexec= \ + ChildField('methods', 'method', Method, post_exec= \ _cls_method_postexec), - _ChildField('properties', 'field', Field) + ChildField('properties', 'field', Field) ] diff --git a/coolamqp/__init__.py b/coolamqp/__init__.py index ce2fbe334830d0ea2f9721d64990d01eb109d465..2cecd627e5bcf40780fa1e9dc70f0dbd0d8bbc9f 100644 --- a/coolamqp/__init__.py +++ b/coolamqp/__init__.py @@ -1,2 +1,2 @@ # coding=UTF-8 -__version__ = '1.0.1_a1' +__version__ = '1.0.1' diff --git a/coolamqp/attaches/channeler.py b/coolamqp/attaches/channeler.py index b25a371b8ee409d06e2ec3016d6d9333626493d0..3338e0a30b1cfca0bfd04a363dd36e49299b709a 100644 --- a/coolamqp/attaches/channeler.py +++ b/coolamqp/attaches/channeler.py @@ -63,7 +63,7 @@ class Channeler(Attache): but ordering it to do anything is pointless, because it will not get done until attach() with new connection is called. """ - __slots__ = ('channel_id', ) + __slots__ = ('channel_id',) def __init__(self): """ @@ -140,7 +140,7 @@ class Channeler(Attache): self.state = ST_OFFLINE if not isinstance(payload, (ChannelClose, ChannelCloseOk)) and ( - payload is not None): + payload is not None): # I do not know how to handle that! return @@ -190,6 +190,15 @@ class Channeler(Attache): """ self.methods([payload]) + def watch_for_method(self, method, callback, on_fail=None): # type: () -> MethodWatch + """ + Syntactic sugar for + + >>> self.connection.watch_for_method(self.channel_id, method, callback, on_fail=on_fail) + """ + assert self.channel_id is not None + return self.connection.watch_for_method(self.channel_id, method, callback, on_fail=on_fail) + def method_and_watch(self, method_payload, method_classes_to_watch, callable): # type: (coolamqp.framing.base.AMQPMethodPayload, @@ -197,10 +206,10 @@ class Channeler(Attache): """ Syntactic sugar for - self.connection.method_and_watch(self.channel_id, - method_payload, - method_classes_to_watch, - callable) + >>> self.connection.method_and_watch(self.channel_id, + >>> method_payload, + >>> method_classes_to_watch, + >>> callable) """ assert self.channel_id is not None self.connection.method_and_watch(self.channel_id, method_payload, diff --git a/coolamqp/attaches/declarer.py b/coolamqp/attaches/declarer.py index 7d6e95dec7275c7edf1e8ec3e3b0def77e144b33..71891f05f64c598471c2d6f69e1b5d4d7b94e1af 100644 --- a/coolamqp/attaches/declarer.py +++ b/coolamqp/attaches/declarer.py @@ -237,7 +237,7 @@ class Declarer(Channeler, Synchronized): To be called when it's possible that something can be done """ if (self.state != ST_ONLINE) or len(self.left_to_declare) == 0 or ( - self.in_process is not None): + self.in_process is not None): return self.in_process = self.left_to_declare.popleft() diff --git a/coolamqp/attaches/publisher.py b/coolamqp/attaches/publisher.py index 9f377822e3f38e9526cd4f7f617b16ed83d905e4..704d3d958fcd64eff2754c12ce6dafacba538738 100644 --- a/coolamqp/attaches/publisher.py +++ b/coolamqp/attaches/publisher.py @@ -24,7 +24,7 @@ from coolamqp.framing.frames import AMQPMethodFrame, AMQPBodyFrame, \ try: # these extensions will be available from coolamqp.framing.definitions import ConfirmSelect, ConfirmSelectOk, \ - BasicNack + BasicNack, ChannelFlow, ChannelFlowOk, ConnectionBlocked, ConnectionUnblocked except ImportError: pass @@ -96,12 +96,37 @@ class Publisher(Channeler, Synchronized): self.tagger = None # None, or AtomicTagger instance id MODE_CNPUB self.critically_failed = False + self.content_flow = True + self.blocked = False + self.frames_to_send = [] @Synchronized.synchronized def attach(self, connection): Channeler.attach(self, connection) connection.watch(FailWatch(self.on_fail)) + def on_connection_blocked(self, payload): + if isinstance(payload, ConnectionBlocked): + self.blocked = True + elif isinstance(payload, ConnectionUnblocked): + self.blocked = False + + if self.content_flow: + self.connection.send(self.frames_to_send) + self.frames_to_send = [] + + def on_flow_control(self, payload): + """Called on ChannelFlow""" + assert isinstance(ChannelFlow, payload) + + self.content_flow = payload.active + self.connection.send([AMQPMethodFrame(self.channel_id, + ChannelFlowOk(payload.active))]) + + if payload.active and not self.blocked: + self.connection.send(self.frames_to_send) + self.frames_to_send = [] + @Synchronized.synchronized def on_fail(self): self.state = ST_OFFLINE @@ -122,23 +147,35 @@ class Publisher(Channeler, Synchronized): bodies = [] body = memoryview(message.body) - max_body_size = self.connection.frame_max - AMQPBodyFrame.FRAME_SIZE_WITHOUT_PAYLOAD + max_body_size = self.connection.frame_max - AMQPBodyFrame.FRAME_SIZE_WITHOUT_PAYLOAD - 16 while len(body) > 0: bodies.append(body[:max_body_size]) body = body[max_body_size:] - self.connection.send([ - AMQPMethodFrame(self.channel_id, - BasicPublish(exchange_name, routing_key, False, - False)), - AMQPHeaderFrame(self.channel_id, Basic.INDEX, 0, len(message.body), - message.properties) - ]) + frames_to_send = [AMQPMethodFrame(self.channel_id, + BasicPublish(exchange_name, routing_key, False, False)), + AMQPHeaderFrame(self.channel_id, Basic.INDEX, 0, len(message.body), + message.properties)] + + if len(bodies) == 1: + frames_to_send.append(AMQPBodyFrame(self.channel_id, bodies[0])) - # todo optimize it - if there's only one frame it can with previous send - # no frames will be sent if body.length == 0 - for body in bodies: - self.connection.send([AMQPBodyFrame(self.channel_id, body)]) + if self.content_flow and not self.blocked: + self.connection.send(frames_to_send) + + if len(bodies) > 1: + while self.content_flow and not self.blocked and len(bodies) > 0: + self.connection.send([AMQPBodyFrame(self.channel_id, bodies[0])]) + del bodies[0] + + if not self.content_flow and not self.blocked and len(bodies) > 0: + for body in bodies: + self.frames_to_send.append(AMQPBodyFrame(self.channel_id, body)) + else: + self.frames_to_send.extend(frames_to_send) + if len(bodies) > 1: + for body in bodies: + self.frames_to_send.append(AMQPBodyFrame(self.channel_id, body)) def _mode_cnpub_process_deliveries(self): """ @@ -254,6 +291,12 @@ class Publisher(Channeler, Synchronized): if isinstance(payload, ChannelOpenOk): # Ok, if this has a mode different from MODE_NOACK, we need to additionally set up # the functionality. + mw = self.watch_for_method(ChannelFlow, self.on_flow_control) + mw.oneshot = False + + mw = self.connection.watch_for_method(0, (ConnectionBlocked, ConnectionUnblocked), + self.on_connection_blocked) + mw.oneshot = False if self.mode == Publisher.MODE_CNPUB: self.method_and_watch(ConfirmSelect(False), ConfirmSelectOk, diff --git a/coolamqp/attaches/utils.py b/coolamqp/attaches/utils.py index 36ca346d11615aa0433bd52069533378991260e8..3243a5b7766f308cf0890b713886ddb9fae4a2c1 100644 --- a/coolamqp/attaches/utils.py +++ b/coolamqp/attaches/utils.py @@ -33,7 +33,7 @@ class FutureConfirmableRejectable(ConfirmableRejectable): A ConfirmableRejectable that can result a future (with None), or Exception it with a message """ - __slots__ = ('future', ) + __slots__ = ('future',) def __init__(self, future): # type: (concurrent.futures.Future) -> None self.future = future diff --git a/coolamqp/clustering/cluster.py b/coolamqp/clustering/cluster.py index 201384c1a8ada0bea0297ace9a1a25104c5f8516..d19baf5ea4e5430b67d7bc3051870570b72c798f 100644 --- a/coolamqp/clustering/cluster.py +++ b/coolamqp/clustering/cluster.py @@ -6,12 +6,12 @@ from __future__ import print_function, absolute_import, division import logging import time +import typing as tp import warnings +from concurrent.futures import Future import monotonic import six -import typing as tp -from concurrent.futures import Future from coolamqp.attaches import Publisher, AttacheGroup, Consumer, Declarer from coolamqp.clustering.events import ConnectionLost, MessageReceived, \ @@ -44,17 +44,20 @@ class Cluster(object): :param log_frames: an object that supports logging each and every frame CoolAMQP sends and receives from the broker :param name: name to appear in log items and prctl() for the listener thread + :param on_blocked: callable to call when ConnectionBlocked/ConnectionUnblocked is received. It will be + called with a value of True if connection becomes blocked, and False upon an unblock """ # Events you can be informed about ST_LINK_LOST = 0 # Link has been lost ST_LINK_REGAINED = 1 # Link has been regained - def __init__(self, nodes, # type: tp.Union[NodeDefinition, tp.List[NodeDefinition]] + def __init__(self, nodes, # type: tp.Union[NodeDefinition, tp.List[NodeDefinition]] on_fail=None, # type: tp.Optional[tp.Callable[[], None]] extra_properties=None, # type: tp.Optional[tp.List[tp.Tuple[bytes, tp.Tuple[bytes, str]]]] - log_frames=None, # type: tp.Optional[FrameLogger] - name=None # type: tp.Optional[str] + log_frames=None, # type: tp.Optional[FrameLogger] + name=None, # type: tp.Optional[str] + on_blocked=None # type: tp.Callable[[bool], None] ): from coolamqp.objects import NodeDefinition if isinstance(nodes, NodeDefinition): @@ -67,6 +70,7 @@ class Cluster(object): self.node, = nodes self.extra_properties = extra_properties self.log_frames = log_frames + self.on_blocked = on_blocked if on_fail is not None: def decorated(): @@ -78,8 +82,8 @@ class Cluster(object): self.on_fail = None def declare(self, obj, # type: tp.Union[Queue, Exchange] - persistent=False # type: bool - ): # type: (...) -> concurrent.futures.Future + persistent=False # type: bool + ): # type: (...) -> concurrent.futures.Future """ Declare a Queue/Exchange @@ -140,9 +144,9 @@ class Cluster(object): def publish(self, message, # type: Message exchange=None, # type: tp.Union[Exchange, str, bytes] - routing_key=u'', # type: tp.Union[str, bytes] - tx=None, # type: tp.Optional[bool] - confirm=None # type: tp.Optional[bool] + routing_key=u'', # type: tp.Union[str, bytes] + tx=None, # type: tp.Optional[bool] + confirm=None # type: tp.Optional[bool] ): # type: (...) -> tp.Optional[Future] """ Publish a message. @@ -222,6 +226,9 @@ class Cluster(object): if self.on_fail is not None: self.snr.on_fail.add(self.on_fail) + if self.on_blocked is not None: + self.snr.on_blocked.add(self.on_blocked) + # Spawn a transactional publisher and a noack publisher self.pub_tr = Publisher(Publisher.MODE_CNPUB) self.pub_na = Publisher(Publisher.MODE_NOACK) diff --git a/coolamqp/clustering/events.py b/coolamqp/clustering/events.py index 010ec2cc6ea098fe991573a628e147773e546bcc..b57d99f78fc9aa579fdeaa5331df8d5c6d0c614d 100644 --- a/coolamqp/clustering/events.py +++ b/coolamqp/clustering/events.py @@ -48,7 +48,7 @@ class MessageReceived(ReceivedMessage, Event): """ __slots__ = () - def __init__(self, msg): # type: (ReceivedMessage) -> None + def __init__(self, msg): # type: (ReceivedMessage) -> None """:type msg: ReceivedMessage""" ReceivedMessage.__init__(self, msg.body, msg.exchange_name, msg.routing_key, diff --git a/coolamqp/clustering/single.py b/coolamqp/clustering/single.py index 8922b39061d7745da2f0a1cd73ddd1f66a834e69..5757f7d20d1b216af9cb164f66cfc45353236743 100644 --- a/coolamqp/clustering/single.py +++ b/coolamqp/clustering/single.py @@ -3,8 +3,10 @@ from __future__ import print_function, absolute_import, division import logging +from coolamqp.framing.definitions import ConnectionUnblocked, ConnectionBlocked from coolamqp.objects import Callable from coolamqp.uplink import Connection +from coolamqp.uplink.connection import MethodWatch logger = logging.getLogger(__name__) @@ -14,11 +16,11 @@ class SingleNodeReconnector(object): Connection to one node. It will do it's best to remain alive. """ - def __init__(self, node_def, # type: coolamqp.objects.NodeDefinition - attache_group, # type: coolamqp.attaches.AttacheGroup - listener_thread, # type: coolamqp.uplink.ListenerThread + def __init__(self, node_def, # type: coolamqp.objects.NodeDefinition + attache_group, # type: coolamqp.attaches.AttacheGroup + listener_thread, # type: coolamqp.uplink.ListenerThread extra_properties=None, # type: tp.Dict[bytes, tp.Tuple[tp.Any, str]] - log_frames=None, # type: tp.Callable[] + log_frames=None, # type: tp.Callable[] name=None): self.listener_thread = listener_thread self.node_def = node_def @@ -31,7 +33,7 @@ class SingleNodeReconnector(object): self.terminating = False self.on_fail = Callable() #: public - + self.on_blocked = Callable() #: public self.on_fail.add(self._on_fail) def is_connected(self): # type: () -> bool @@ -49,6 +51,15 @@ class SingleNodeReconnector(object): self.connection.start(timeout) self.connection.finalize.add(self.on_fail) + # Register the on-blocking watches + mw = MethodWatch(0, (ConnectionBlocked,), lambda: self.on_blocked(True)) + mw.oneshot = False + self.connection.watch(mw) + + mw = MethodWatch(0, (ConnectionUnblocked,), lambda: self.on_blocked(False)) + mw.oneshot = False + self.connection.watch(mw) + def _on_fail(self): if self.terminating: return diff --git a/coolamqp/framing/base.py b/coolamqp/framing/base.py index 043edf315a8db61f6625b95cf1477304acb4f884..6c4307ca50643ff461fd740528e8c17a82c94978 100644 --- a/coolamqp/framing/base.py +++ b/coolamqp/framing/base.py @@ -5,7 +5,6 @@ import typing as tp AMQP_HELLO_HEADER = b'AMQP\x00\x00\x09\x01' - # name => (length|None, struct ID|None, reserved-field-value : for struct if structable, bytes else, length of default) BASIC_TYPES = {u'bit': (None, None, "0", None), # special case u'octet': (1, 'B', "b'\\x00'", 1), diff --git a/coolamqp/framing/compilation/content_property.py b/coolamqp/framing/compilation/content_property.py index 5c9dedbb0af3d111c99ea91d973b873336b7a9a7..3d76107795aac7eccba3c8b06b5b6c413106bb75 100644 --- a/coolamqp/framing/compilation/content_property.py +++ b/coolamqp/framing/compilation/content_property.py @@ -143,7 +143,7 @@ def _compile_particular_content_property_list_class(zpf, fields): return u''.join(mod), structers -STRUCTERS_FOR_NOW = {} # type: tp.Dict[str, struct.Struct] +STRUCTERS_FOR_NOW = {} # type: tp.Dict[str, struct.Struct] def compile_particular_content_property_list_class(zpf, fields): @@ -158,8 +158,8 @@ def compile_particular_content_property_list_class(zpf, fields): if structer not in STRUCTERS_FOR_NOW: STRUCTERS_FOR_NOW[structer] = struct.Struct('!%s' % (structer,)) - locals_['STRUCT_%s' % (structer, )] = STRUCTERS_FOR_NOW[structer] + locals_['STRUCT_%s' % (structer,)] = STRUCTERS_FOR_NOW[structer] loc = dict(globals(), **locals_) - exec (q, loc) + exec(q, loc) return loc['ParticularContentTypeList'] diff --git a/coolamqp/framing/compilation/textcode_fields.py b/coolamqp/framing/compilation/textcode_fields.py index 9a18a824a9fa9efddc3ad78d18a74e7535be7868..0d5ea0fd3b316bf5e6b050d7d0a1dfa2d6295243 100644 --- a/coolamqp/framing/compilation/textcode_fields.py +++ b/coolamqp/framing/compilation/textcode_fields.py @@ -69,7 +69,7 @@ def get_counter(fields, prefix=u'', indent_level=2): u' + '.join([str(accumulator)] + parts)) + u'\n' - # type: (...) -> tp.Tuple[str, dict] +# type: (...) -> tp.Tuple[str, dict] def get_from_buffer(fields, prefix='', indent_level=2, remark=False): """ Emit code that collects values from buf:offset, updating offset as progressing. @@ -112,7 +112,7 @@ def get_from_buffer(fields, prefix='', indent_level=2, remark=False): del bits[:] - def emit_structures(dont_do_bits=False): # type: (bool) -> dict + def emit_structures(dont_do_bits=False): # type: (bool) -> dict if not dont_do_bits: emit_bits() if len(to_struct) == 0: @@ -180,7 +180,7 @@ def get_from_buffer(fields, prefix='', indent_level=2, remark=False): return u''.join(code), structers -def get_serializer(fields, prefix='', indent_level=2): # type: (list, str) -> str, dict +def get_serializer(fields, prefix='', indent_level=2): # type: (list, str) -> str, dict """ Emit code that serializes the fields into buf at offset diff --git a/coolamqp/framing/compilation/utilities.py b/coolamqp/framing/compilation/utilities.py index c77e2779cbc54b1175e2b03dde879fef36b984b6..cb1557e8f1e8e656c259cc7576bb4bdcf6dddecd 100644 --- a/coolamqp/framing/compilation/utilities.py +++ b/coolamqp/framing/compilation/utilities.py @@ -7,16 +7,15 @@ import six # docs may be None -def as_unicode(callable): +def as_unicode(clbl): def roll(*args, **kwargs): - return six.text_type(callable(*args, **kwargs)) + return six.text_type(clbl(*args, **kwargs)) return roll + @as_unicode def format_field_name(field): if field in (u'global', u'type'): field = field + '_' return field.replace('-', '_') - - diff --git a/coolamqp/framing/definitions.py b/coolamqp/framing/definitions.py index a82197f22e4387d67b23eac60467e0bc41a2554c..f905b8a7303077e830f1a6c19c095126d1a7b080 100644 --- a/coolamqp/framing/definitions.py +++ b/coolamqp/framing/definitions.py @@ -1,5 +1,6 @@ # coding=UTF-8 from __future__ import print_function, absolute_import + """ A Python version of the AMQP machine-readable specification. @@ -209,7 +210,7 @@ class ConnectionBlocked(AMQPMethodPayload): :type reason: binary type (max length 255) (shortstr in AMQP) """ - __slots__ = (u'reason', ) + __slots__ = (u'reason',) NAME = u'connection.blocked' @@ -404,7 +405,7 @@ class ConnectionOpen(AMQPMethodPayload): The name of the virtual host to work with. :type virtual_host: binary type (max length 255) (path in AMQP) """ - __slots__ = (u'virtual_host', ) + __slots__ = (u'virtual_host',) NAME = u'connection.open' @@ -649,7 +650,7 @@ class ConnectionSecure(AMQPMethodPayload): mechanism. :type challenge: binary type (longstr in AMQP) """ - __slots__ = (u'challenge', ) + __slots__ = (u'challenge',) NAME = u'connection.secure' @@ -827,7 +828,7 @@ class ConnectionSecureOk(AMQPMethodPayload): data are defined by the SASL security mechanism. :type response: binary type (longstr in AMQP) """ - __slots__ = (u'response', ) + __slots__ = (u'response',) NAME = u'connection.secure-ok' @@ -1080,7 +1081,7 @@ class ConnectionUnblocked(AMQPMethodPayload): @classmethod def from_buffer( - cls, buf, start_offset): # type: (buffer, int) -> ConnectionUnblocked + cls, buf, start_offset): # type: (buffer, int) -> ConnectionUnblocked offset = start_offset return cls() @@ -1244,7 +1245,7 @@ class ChannelFlow(AMQPMethodPayload): content frames. :type active: bool (bit in AMQP) """ - __slots__ = (u'active', ) + __slots__ = (u'active',) NAME = u'channel.flow' @@ -1306,7 +1307,7 @@ class ChannelFlowOk(AMQPMethodPayload): not. :type active: bool (bit in AMQP) """ - __slots__ = (u'active', ) + __slots__ = (u'active',) NAME = u'channel.flow-ok' @@ -2589,7 +2590,7 @@ class QueueDeleteOk(AMQPMethodPayload): :param message_count: Reports the number of messages deleted. :type message_count: int, 32 bit unsigned (message-count in AMQP) """ - __slots__ = (u'message_count', ) + __slots__ = (u'message_count',) NAME = u'queue.delete-ok' @@ -2719,7 +2720,7 @@ class QueuePurgeOk(AMQPMethodPayload): :param message_count: Reports the number of messages purged. :type message_count: int, 32 bit unsigned (message-count in AMQP) """ - __slots__ = (u'message_count', ) + __slots__ = (u'message_count',) NAME = u'queue.purge-ok' @@ -3047,7 +3048,7 @@ class BasicContentPropertyList(AMQPContentPropertyList): while buf[offset + pfl - 1] & 1: pfl += 2 zpf = BasicContentPropertyList.zero_property_flags(buf[offset:offset + - pfl]).tobytes() + pfl]).tobytes() if zpf in BasicContentPropertyList.PARTICULAR_CLASSES: return BasicContentPropertyList.PARTICULAR_CLASSES[ zpf].from_buffer(buf, offset) @@ -3363,7 +3364,7 @@ class BasicConsumeOk(AMQPMethodPayload): by the server. :type consumer_tag: binary type (max length 255) (consumer-tag in AMQP) """ - __slots__ = (u'consumer_tag', ) + __slots__ = (u'consumer_tag',) NAME = u'basic.consume-ok' @@ -3421,7 +3422,7 @@ class BasicCancelOk(AMQPMethodPayload): :type consumer_tag: binary type (max length 255) (consumer-tag in AMQP) """ - __slots__ = (u'consumer_tag', ) + __slots__ = (u'consumer_tag',) NAME = u'basic.cancel-ok' @@ -4361,7 +4362,7 @@ class BasicRecoverAsync(AMQPMethodPayload): potentially then delivering it to an alternative subscriber. :type requeue: bool (bit in AMQP) """ - __slots__ = (u'requeue', ) + __slots__ = (u'requeue',) NAME = u'basic.recover-async' @@ -4427,7 +4428,7 @@ class BasicRecover(AMQPMethodPayload): potentially then delivering it to an alternative subscriber. :type requeue: bool (bit in AMQP) """ - __slots__ = (u'requeue', ) + __slots__ = (u'requeue',) NAME = u'basic.recover' @@ -4799,7 +4800,7 @@ class ConfirmSelect(AMQPMethodPayload): method it will raise a channel or connection exception. :type nowait: bool (bit in AMQP) """ - __slots__ = (u'nowait', ) + __slots__ = (u'nowait',) NAME = u'confirm.select' diff --git a/coolamqp/framing/frames.py b/coolamqp/framing/frames.py index 9705606bdf9f1ba67bf67694a181db2f6eabce10..f8f3590c409d76f9e5ee3aa5d51f8e7e9fcc526e 100644 --- a/coolamqp/framing/frames.py +++ b/coolamqp/framing/frames.py @@ -42,7 +42,7 @@ class AMQPMethodFrame(AMQPFrame): buf.write(self.payload.STATIC_CONTENT) else: buf.write(STRUCT_BHL.pack(FRAME_METHOD, self.channel, - 4 + self.payload.get_size())) + 4 + self.payload.get_size())) buf.write(self.payload.BINARY_HEADER) self.payload.write_arguments(buf) buf.write(FRAME_END_BYTE) @@ -87,9 +87,9 @@ class AMQPHeaderFrame(AMQPFrame): def write_to(self, buf): buf.write(STRUCT_BHLHHQ.pack(FRAME_HEADER, self.channel, - 12 + self.properties.get_size(), self.class_id, - 0, - self.body_size)) + 12 + self.properties.get_size(), self.class_id, + 0, + self.body_size)) self.properties.write_to(buf) buf.write(FRAME_END_BYTE) diff --git a/coolamqp/objects.py b/coolamqp/objects.py index 2eeaffe11f099476e75f266baf2397860047116c..aa9fff321eb321ee49b4be85e848d96470f86132 100644 --- a/coolamqp/objects.py +++ b/coolamqp/objects.py @@ -2,15 +2,15 @@ """ Core objects used in CoolAMQP """ -import typing as tp import logging +import typing as tp import uuid import six +from coolamqp.framing.base import AMQPFrame from coolamqp.framing.definitions import \ BasicContentPropertyList as MessageProperties -from coolamqp.framing.base import AMQPFrame logger = logging.getLogger(__name__) @@ -35,9 +35,9 @@ def tobytes(q): class FrameLogger(Protocol): - def on_frame(self, timestamp, # type: float - frame, # type: AMQPFrame - direction # type: str + def on_frame(self, timestamp, # type: float + frame, # type: AMQPFrame + direction # type: str ): """ Log a frame @@ -79,27 +79,29 @@ class Message(object): Properties is a highly regularized class - see coolamqp.framing.definitions.BasicContentPropertyList for a list of possible properties. + + :param body: stream of octets + :type body: anything with a buffer interface + :param properties: AMQP properties to be sent along. + default is 'no properties at all' + You can pass a dict - it will be passed to + MessageProperties, + but it's slow - don't do that. + :type properties: MessageProperties instance, None or a dict (SLOW!) + """ __slots__ = ('body', 'properties') Properties = MessageProperties # an alias for easier use - def __init__(self, body, # type: bytes - properties=None): + def __init__(self, body, # type: bytes + properties=None # type: tp.Optional[MessageProperties] + ): """ Create a Message object. Please take care with passing empty bodies, as py-amqp has some failure on it. - - :param body: stream of octets - :type body: anything with a buffer interface - :param properties: AMQP properties to be sent along. - default is 'no properties at all' - You can pass a dict - it will be passed to - MessageProperties, - but it's slow - don't do that. - :type properties: MessageProperties instance, None or a dict (SLOW!) """ if isinstance(body, six.text_type): raise TypeError(u'body cannot be a text type!') @@ -128,15 +130,15 @@ class ReceivedMessage(Message): Note that if the consumer that generated this message was no_ack, .ack() and .nack() are no-ops. """ - __slots__ = ('delivery_tag', 'exchange_name', 'routing_key', 'ack', 'nack') + __slots__ = ('delivery_tag', 'exchange_name', 'routing_key', '_ack', '_nack') - def __init__(self, body, # type: tp.Union[str, bytes, bytearray, tp.List[memoryview]] - exchange_name, # type: memoryview - routing_key, # type: memoryview + def __init__(self, body, # type: tp.Union[str, bytes, bytearray, tp.List[memoryview]] + exchange_name, # type: memoryview + routing_key, # type: memoryview properties=None, delivery_tag=None, # type: int - ack=None, # type: tp.Callable[[], None] - nack=None # type: tp.Callable[[], None] + ack=None, # type: tp.Callable[[], None] + nack=None # type: tp.Callable[[], None] ): """ :param body: message body. A stream of octets. @@ -161,8 +163,25 @@ class ReceivedMessage(Message): self.exchange_name = exchange_name self.routing_key = routing_key - self.ack = ack or LAMBDA_NONE - self.nack = nack or LAMBDA_NONE + self._ack = ack or LAMBDA_NONE + self._nack = nack or LAMBDA_NONE + + def ack(self): + """ + Acknowledge reception of this message. + + This is a no-op if a Consumer was called with no_ack=True + """ + self._ack() + + def nack(self): + """ + Negatively acknowledge reception of this message. + + This is a no-op if a Consumer was called with no_ack=True. If no_ack was False, + the message will be requeued and redelivered by the broker + """ + self._nack() class Exchange(object): @@ -174,9 +193,9 @@ class Exchange(object): direct = None # the direct exchange - def __init__(self, name=u'', # type: tp.Union[str, bytes] - type=b'direct', # type: tp.Union[str, bytes] - durable=True, # type: bool + def __init__(self, name=u'', # type: tp.Union[str, bytes] + type=b'direct', # type: tp.Union[str, bytes] + durable=True, # type: bool auto_delete=False # type: bool ): """ @@ -213,10 +232,10 @@ class Queue(object): __slots__ = ('name', 'durable', 'exchange', 'auto_delete', 'exclusive', 'anonymous', 'consumer_tag') - def __init__(self, name=b'', # type: tp.Union[str, bytes] - durable=False, # type: bool - exchange=None, # type: tp.Optional[Exchange] - exclusive=False, # type: bool + def __init__(self, name=b'', # type: tp.Union[str, bytes] + durable=False, # type: bool + exchange=None, # type: tp.Optional[Exchange] + exclusive=False, # type: bool auto_delete=False # type: bool ): """ diff --git a/coolamqp/uplink/connection/__init__.py b/coolamqp/uplink/connection/__init__.py index 47cdf5eafbaa64c6704c8e9c53d02a4598f190cb..8a07f01dcff33f918aa859559d29255aec4a4e9f 100644 --- a/coolamqp/uplink/connection/__init__.py +++ b/coolamqp/uplink/connection/__init__.py @@ -12,7 +12,7 @@ Connection is something that can: from __future__ import absolute_import, division, print_function from coolamqp.uplink.connection.connection import Connection -from coolamqp.uplink.connection.watches import FailWatch, Watch, \ - HeaderOrBodyWatch, MethodWatch, AnyWatch from coolamqp.uplink.connection.states import ST_OFFLINE, ST_CONNECTING, \ ST_ONLINE +from coolamqp.uplink.connection.watches import FailWatch, Watch, \ + HeaderOrBodyWatch, MethodWatch, AnyWatch diff --git a/coolamqp/uplink/connection/connection.py b/coolamqp/uplink/connection/connection.py index c5fec4920e5d1cf06b8d082d4bc14f0bce96f860..5a7e0f0c81519cdb5916a5de63da1422dc284618 100644 --- a/coolamqp/uplink/connection/connection.py +++ b/coolamqp/uplink/connection/connection.py @@ -5,8 +5,9 @@ import collections import logging import socket import time -import uuid import typing as tp +import uuid + import monotonic from coolamqp.exceptions import ConnectionDead @@ -55,7 +56,7 @@ def alert_watches(watches, trigger): alive_watches.append(watch) if set(alive_watches) != set(watches): - for removed_watch in set(watches)-set(alive_watches): + for removed_watch in set(watches) - set(alive_watches): logger.debug('Removing watch %s', repr(removed_watch)) return alive_watches, watch_handled @@ -83,7 +84,7 @@ class Connection(object): def __init__(self, node_definition, # type: coolamqp.objects.NodeDefinition listener_thread, extra_properties, # type: tp.Dict[bytes, tp.Tuple[tp.Any, str]] log_frames=None, - name=None + name=None ): """ Create an object that links to an AMQP broker. @@ -353,25 +354,32 @@ class Connection(object): else: self.watches[watch.channel].append(watch) - def watch_for_method(self, channel, method, callback, on_fail=None): - # type: (int, AMQPMethodPayload, tp.Callable[[AMQPMethodPayload], None], - # tp.Optional[tp.Callable[[AMQPMethodPayload], None]]) -> MethodWatch + def watch_for_method(self, channel, # type: int + method, # type: AMQPMethodPayload + callback, # type: tp.Callable[[AMQPMethodPayload], None] + on_fail=None # type: tp.Callable[[], None] + ): # type: (...) -> MethodWatch """ :param channel: channel to monitor :param method: AMQPMethodPayload class or tuple of AMQPMethodPayload classes :param callback: callable(AMQPMethodPayload instance) + :param on_fail: callable/0 to call when this connection fails """ mw = MethodWatch(channel, method, callback, on_end=on_fail) self.watch(mw) return mw - def method_and_watch(self, channel_id, method_payload, method_or_methods, - callback): + def method_and_watch(self, channel_id, # type: int + method_payload, # type: AMQPMethodPayload + method_or_methods, # type: tp.List[tp.type[AMQPMethodPayload]] + callback # type: tp.Callable[[AMQPMethodPayload], None] + ): # type: (...) -> MethodWatch """ A syntactic sugar for .watch_for_method(channel_id, method_or_methdods, callback) .send([AMQPMethodFrame(channel_id, method_payload)]) """ - self.watch_for_method(channel_id, method_or_methods, callback) + watch = self.watch_for_method(channel_id, method_or_methods, callback) self.send([AMQPMethodFrame(channel_id, method_payload)]) + return watch diff --git a/coolamqp/uplink/connection/recv_framer.py b/coolamqp/uplink/connection/recv_framer.py index afee76b57738d511a2d54e55f6a70ee76fa630f0..cd35ec6bf8f18382f477e280ecf9534c5d25cb1a 100644 --- a/coolamqp/uplink/connection/recv_framer.py +++ b/coolamqp/uplink/connection/recv_framer.py @@ -105,7 +105,7 @@ class ReceivingFramer(object): # state rule 2 elif (self.frame_type == FRAME_HEARTBEAT) and ( - self.total_data_len >= AMQPHeartbeatFrame.LENGTH - 1): + self.total_data_len >= AMQPHeartbeatFrame.LENGTH - 1): data = b'' while len(data) < AMQPHeartbeatFrame.LENGTH - 1: data = data + self._extract( @@ -122,9 +122,9 @@ class ReceivingFramer(object): # state rule 3 elif (self.frame_type != FRAME_HEARTBEAT) and ( - self.frame_type is not None) and ( - self.frame_size is None) and ( - self.total_data_len > 6): + self.frame_type is not None) and ( + self.frame_size is None) and ( + self.total_data_len > 6): hdr = b'' while len(hdr) < 6: hdr = hdr + self._extract(6 - len(hdr)).tobytes() @@ -135,7 +135,7 @@ class ReceivingFramer(object): # state rule 4 elif (self.frame_size is not None) and ( - self.total_data_len >= (self.frame_size + 1)): + self.total_data_len >= (self.frame_size + 1)): if len(self.chunks[0]) >= self.frame_size: # We can subslice it - it's very fast diff --git a/coolamqp/uplink/connection/watches.py b/coolamqp/uplink/connection/watches.py index 33f6eae2ffce2214ff145b8c3652e7ec9bac32f4..09c12ad7f40c5d48186f941a6df9a1f3d909434d 100644 --- a/coolamqp/uplink/connection/watches.py +++ b/coolamqp/uplink/connection/watches.py @@ -1,10 +1,11 @@ # coding=UTF-8 from __future__ import absolute_import, division, print_function + import logging +from coolamqp.framing.base import AMQPMethodPayload from coolamqp.framing.frames import AMQPMethodFrame, AMQPHeaderFrame, \ AMQPBodyFrame -from coolamqp.framing.base import AMQPMethodPayload logger = logging.getLogger(__name__) @@ -12,6 +13,8 @@ logger = logging.getLogger(__name__) class Watch(object): """ A watch is placed per-channel, to listen for a particular frame. + + Multiple watches can be registered to listen for a single frame. All watches will be fired then. """ class CancelMe(Exception): @@ -128,7 +131,7 @@ class MethodWatch(Watch): if isinstance(method_or_methods, (list, tuple)): self.methods = tuple(method_or_methods) elif issubclass(method_or_methods, AMQPMethodPayload): - self.methods = (method_or_methods, ) + self.methods = (method_or_methods,) self.on_end = on_end def __repr__(self): diff --git a/coolamqp/uplink/listener/socket.py b/coolamqp/uplink/listener/socket.py index bb95f5284935cee1e843fa650e0215f44b09464a..f64bc3ffa67747cb6f402f6291d43fafdef2efb1 100644 --- a/coolamqp/uplink/listener/socket.py +++ b/coolamqp/uplink/listener/socket.py @@ -2,8 +2,8 @@ from __future__ import absolute_import, division, print_function import collections -import socket import logging +import socket logger = logging.getLogger(__name__) diff --git a/coolamqp/uplink/listener/thread.py b/coolamqp/uplink/listener/thread.py index c7e03a5de12248c496664c8fea2dc6c309509701..0feea2a2d23ec9272211944f27788fb6af83b81a 100644 --- a/coolamqp/uplink/listener/thread.py +++ b/coolamqp/uplink/listener/thread.py @@ -15,12 +15,13 @@ class ListenerThread(threading.Thread): It automatically picks the best listener for given platform. """ - def __init__(self, name=None): # type: (tp.Optional[str]) -> None + def __init__(self, name=None): # type: (tp.Optional[str]) threading.Thread.__init__(self, name='coolamqp/ListenerThread') self.daemon = True self.name = name or 'CoolAMQP' self.terminating = False self._call_next_io_event = Callable(oneshots=True) + self.listener = None def call_next_io_event(self, callable): """ @@ -49,14 +50,16 @@ class ListenerThread(threading.Thread): except ImportError: pass else: - prctl.set_name(self.name+' - AMQP listener thread') + prctl.set_name(self.name + ' - AMQP listener thread') while not self.terminating: self.listener.wait(timeout=1) self.listener.shutdown() - def register(self, sock, on_read=lambda data: None, on_fail=lambda: None): + def register(self, sock, # type: socket.socket + on_read=lambda data: None, # type: tp.Callable[[bytes], None] + on_fail=lambda: None): # type: tp.Callable[[], None] """ Add a socket to be listened for by the loop. diff --git a/docs/basics.rst b/docs/basics.rst new file mode 100644 index 0000000000000000000000000000000000000000..5f80ace9385f7d00a28519559c4a731bfd1de55c --- /dev/null +++ b/docs/basics.rst @@ -0,0 +1,20 @@ +Usage basics +============ + +First off, you need a Cluster object: + +.. autoclass:: coolamqp.clustering.Cluster + :members: + +You will need to initialize it with NodeDefinitions: + +.. autoclass:: coolamqp.objects.NodeDefinition + +You can send messages: + +.. autoclass:: coolamqp.objects.Message + +and receive them + +.. autoclass:: coolamqp.objects.ReceivedMessage + :members: diff --git a/docs/index.rst b/docs/index.rst index e79a4a5c9f71a692a7bcff501fcc9642a3ffff99..8c37645d7214e475f619cdc7eff8109f5546324f 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -8,6 +8,7 @@ Welcome to CoolAMQP's documentation! tutorial caveats frames + basics Indices and tables ================== diff --git a/setup.cfg b/setup.cfg index ac0565a5e5a8ff71c0e5bdd9271158a211fcb7f9..08eaa44e60541ddb2e512eb783902765966250f3 100644 --- a/setup.cfg +++ b/setup.cfg @@ -3,7 +3,7 @@ name = CoolAMQP long-description = file: README.md long-description-content-type = text/markdown; charset=UTF-8 license = MIT License -classifiers = +classifiers = Programming Language :: Python Programming Language :: Python :: 2.7 Programming Language :: Python :: 3.5 @@ -25,10 +25,10 @@ platforms = posix [pycodestyle] -max-line-length=80 +max-line-length = 80 [bdist_wheel] -universal=1 +universal = 1 [isort] add_imports = diff --git a/setup.py b/setup.py index f7554e817d0355e5e83372ba2d726a97bb14222c..6847b46e948f424d43ad841ee96ea3215a53db4b 100644 --- a/setup.py +++ b/setup.py @@ -1,6 +1,7 @@ #!/usr/bin/env python # coding=UTF-8 from setuptools import setup, find_packages + from coolamqp import __version__ setup(keywords=['amqp', 'rabbitmq', 'client', 'network', 'ha', 'high availability'], @@ -9,7 +10,8 @@ setup(keywords=['amqp', 'rabbitmq', 'client', 'network', 'ha', 'high availabilit install_requires=['six', 'monotonic', 'futures', 'typing'], # per coverage version for codeclimate-reporter tests_require=["nose2", "coverage", "nose2[coverage_plugin]"], - test_suite='nose2.collector.collector' + test_suite='nose2.collector.collector', + extras_require={ + 'prctl': ['prctl'] + } ) - - diff --git a/stress_tests/server/__init__.py b/stress_tests/server/__init__.py index 1a2cd26984e75e0173fbd84ba4188d1651f88d72..87ff156defe686b52fa597e5a295625d43573608 100644 --- a/stress_tests/server/__init__.py +++ b/stress_tests/server/__init__.py @@ -4,7 +4,6 @@ from satella.coding.concurrent import TerminableThread from coolamqp.clustering.events import ReceivedMessage from coolamqp.objects import Queue, Message - from ..settings import queue_names, connect, LogFramesToFile diff --git a/tests/run.py b/tests/run.py index 2e02c0fbf92a8001cd9bbb7e1f999bdb38fb935e..7f6b6faa351376d7a09172bc75199a9828b9dd1d 100644 --- a/tests/run.py +++ b/tests/run.py @@ -1,13 +1,12 @@ # coding=UTF-8 from __future__ import absolute_import, division, print_function -import time, logging, threading -from coolamqp.objects import Message, MessageProperties, NodeDefinition, Queue, \ - Exchange -from coolamqp.exceptions import AMQPError -from coolamqp.clustering import Cluster +import logging import os +from coolamqp.clustering import Cluster +from coolamqp.objects import NodeDefinition, Queue + NODE = NodeDefinition(os.environ.get('AMQP_HOST', '127.0.0.1'), 'guest', 'guest', heartbeat=20) logging.basicConfig(level=logging.DEBUG) diff --git a/tests/test_attaches/test_consumer.py b/tests/test_attaches/test_consumer.py index 3ddf662d060929b2f61bf543928650b6a69146eb..2bf4fc04e99794a80290652a149b7f6e72f6efdb 100644 --- a/tests/test_attaches/test_consumer.py +++ b/tests/test_attaches/test_consumer.py @@ -1,7 +1,8 @@ # coding=UTF-8 from __future__ import print_function, absolute_import, division -import six + import unittest + from coolamqp.attaches import Consumer from coolamqp.objects import Queue diff --git a/tests/test_clustering/__init__.py b/tests/test_clustering/__init__.py index db0a0a6db68923bea6cabb6a4005abf5fdb90ffb..a28d91bcf87fa9e385f76d338b2b8fbc9c85b997 100644 --- a/tests/test_clustering/__init__.py +++ b/tests/test_clustering/__init__.py @@ -1,6 +1,6 @@ # coding=UTF-8 from __future__ import print_function, absolute_import, division -import six + import logging logger = logging.getLogger(__name__) diff --git a/tests/test_clustering/test_double.py b/tests/test_clustering/test_double.py index 4e97f94c0e27adb386831158f4b0d1260f0a124e..12e8f8a2e37ce715edf78ea065e393dd7e07e951 100644 --- a/tests/test_clustering/test_double.py +++ b/tests/test_clustering/test_double.py @@ -3,9 +3,9 @@ from __future__ import print_function, absolute_import, division import logging +import os import time import unittest -import os from coolamqp.clustering import Cluster from coolamqp.exceptions import AMQPError, RESOURCE_LOCKED diff --git a/tests/test_clustering/test_exchanges.py b/tests/test_clustering/test_exchanges.py index 0838ad834d120348bfcfd17ed42b4c7c10b8a6d8..98dd406ec7151df1bad910a07db13086e47af7e8 100644 --- a/tests/test_clustering/test_exchanges.py +++ b/tests/test_clustering/test_exchanges.py @@ -1,14 +1,13 @@ # coding=UTF-8 from __future__ import print_function, absolute_import, division -import six -import unittest + +import logging import os -import time, logging, threading -from coolamqp.objects import Message, MessageProperties, NodeDefinition, Queue, \ - ReceivedMessage, Exchange +import unittest + from coolamqp.clustering import Cluster, MessageReceived, NothingMuch -from coolamqp.exceptions import AMQPError -import time +from coolamqp.objects import Message, NodeDefinition, Queue, \ + Exchange # todo handle bad auth NODE = NodeDefinition(os.environ.get('AMQP_HOST', '127.0.0.1'), 'guest', 'guest', heartbeat=20) diff --git a/tests/test_clustering/test_things.py b/tests/test_clustering/test_things.py index 79a80f4025bd70da2d0c37e9a1547fe0db7d10f1..7222a2abba00920238b12abf407290b716f5e95c 100644 --- a/tests/test_clustering/test_things.py +++ b/tests/test_clustering/test_things.py @@ -1,16 +1,15 @@ # coding=UTF-8 from __future__ import print_function, absolute_import, division -import six + +import logging import os +import time import unittest -import time, logging, threading, monotonic, warnings -from coolamqp.objects import Message, MessageProperties, NodeDefinition, Queue, \ - ReceivedMessage, Exchange -from coolamqp.clustering import Cluster, MessageReceived, NothingMuch -from coolamqp.exceptions import ConnectionDead -import time +from coolamqp.clustering import Cluster +from coolamqp.exceptions import ConnectionDead +from coolamqp.objects import NodeDefinition, Queue NODE = NodeDefinition(os.environ.get('AMQP_HOST', '127.0.0.1'), 'guest', 'guest', heartbeat=20) logging.basicConfig(level=logging.DEBUG) diff --git a/tests/test_objects.py b/tests/test_objects.py index f7e5569d0aee2945218cc2c03f6109420aabc914..09bf92b29681522c4fb027f357eb57117d686c05 100644 --- a/tests/test_objects.py +++ b/tests/test_objects.py @@ -3,8 +3,9 @@ It sounds like a melody """ from __future__ import print_function, absolute_import, division -import six + import unittest + from coolamqp.objects import NodeDefinition, MessageProperties diff --git a/tests/utils.py b/tests/utils.py index 4b60f587bd368b861cea3f1cb1dd396c55e42923..abfdc412ad186cb916fcd4e66608641564e708d6 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -1,16 +1,18 @@ # coding=UTF-8 from __future__ import absolute_import, division, print_function -import unittest -import time -import socket + import collections -import monotonic import os +import socket +import time +import unittest -from coolamqp import Cluster, ClusterNode, ConnectionUp, ConnectionDown, \ - ConnectionUp, ConsumerCancelled +import monotonic from coolamqp.backends.base import AMQPBackend, ConnectionFailedError +from coolamqp import Cluster, ClusterNode, ConnectionDown, \ + ConnectionUp, ConsumerCancelled + def getamqp(): amqp = Cluster([ClusterNode(os.environ.get('AMQP_HOST', '127.0.0.1'), 'guest', 'guest')], extra_properties=[