Skip to content
Snippets Groups Projects
Commit 99c6fb29 authored by Piotr Maślanka's avatar Piotr Maślanka
Browse files

Merge branch 'develop'

parents ca82af14 4a2b6272
No related branches found
No related tags found
No related merge requests found
Showing
with 843 additions and 341 deletions
language: python
python:
- "2.7"
- "3.3"
- "3.4"
- "3.5"
- "pypy"
script:
......@@ -14,5 +12,8 @@ install:
after_success:
- codeclimate-test-reporter
- bash build.sh
services:
- rabbitmq
services: rabbitmq
addons:
apt:
packages:
- rabbitmq-server
\ No newline at end of file
* v0.95:
* multiple bugs fixed
* v0.94:
_version skipped_
* v0.93:
* Large refactor of XML schema compiler
......
The MIT License (MIT)
Copyright (c) 2016-2017 DMS Serwis s.c.
Copyright (c) 2016-2018 DMS Serwis s.c.
Copyright (c) 2018-2019 SMOK sp. z o.o.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
......
......@@ -42,7 +42,7 @@ Generated automatically by CoolAMQP from AMQP machine-readable specification.
See coolamqp.uplink.framing.compilation for the tool
AMQP is copyright (c) 2016 OASIS
CoolAMQP is copyright (c) 2016 DMS Serwis s.c.
CoolAMQP is copyright (c) 2016-2018 DMS Serwis s.c., 2018-2019 SMOK sp. z o.o.
###########################################################
......@@ -428,6 +428,16 @@ Field = collections.namedtuple('Field', ('name', 'type', 'basic_type', 'reserved
line(' ]\n')
# __repr__
line('''\n def __repr__(self):
"""
Convert the frame to a Python-representable string
:return: Python string representation
"""
return '%s(%S)' % (', '.join(map(repr, [%s])))\n''',
full_class_name,
u", ".join(['self.'+format_field_name(field.name) for field in non_reserved_fields]))
# constructor
line('''\n def __init__(%s):
"""
......@@ -522,7 +532,7 @@ REPLY_REASONS_FOR = {\n''')
# Methods that are replies for other, ie. ConnectionOpenOk: ConnectionOpen
# a method may be a reply for ONE or NONE other methods
# if a method has no replies, it will have an empty list as value here
REPLIES_FOR= {\n''')
REPLIES_FOR = {\n''')
for k, v in methods_that_are_replies_for.items():
line(u' %s: [%s],\n' % (k, u', '.join(map(str, v))))
......
......@@ -129,7 +129,7 @@ class Consumer(Channeler):
# private
self.cancelled = False # did the client want to STOP using this
# consumer?
# consumer?
self.receiver = None # MessageReceiver instance
self.attache_group = None # attache group this belongs to.
......@@ -141,8 +141,7 @@ class Consumer(Channeler):
self.future_to_notify = future_to_notify
self.future_to_notify_on_dead = None # .cancel
self.fail_on_first_time_resource_locked = \
fail_on_first_time_resource_locked
self.fail_on_first_time_resource_locked = fail_on_first_time_resource_locked
self.cancel_on_failure = cancel_on_failure
self.body_receive_mode = body_receive_mode
......@@ -187,7 +186,9 @@ class Consumer(Channeler):
# you'll blow up big next time you try to use this consumer if you
# can't cancel, but just close
if self.consumer_tag is not None:
self.method(BasicCancel(self.consumer_tag, False))
self.method_and_watch(BasicCancel(self.consumer_tag, False),
[BasicCancelOk],
self.on_close)
else:
self.method(ChannelClose(0, b'cancelling', 0, 0))
......@@ -228,7 +229,6 @@ class Consumer(Channeler):
Note, this can be called multiple times, and eventually with None.
"""
if self.cancel_on_failure and (not self.cancelled):
logger.debug(
'Consumer is cancel_on_failure and failure seen, True->cancelled')
......@@ -286,6 +286,7 @@ class Consumer(Channeler):
if self.future_to_notify:
self.future_to_notify.set_exception(AMQPError(payload))
self.future_to_notify = None
logger.debug('Notifying connection closed with %s', payload)
# We might not want to throw the connection away.
should_retry = should_retry and (not self.cancelled)
......
......@@ -151,6 +151,7 @@ class Publisher(Channeler, Synchronized):
"""
assert self.state == ST_ONLINE
assert self.mode == Publisher.MODE_CNPUB
assert self.tagger is not None
while len(self.messages) > 0:
try:
......@@ -234,8 +235,8 @@ class Publisher(Channeler, Synchronized):
def on_operational(self, operational):
state = {True: u'up', False: u'down'}[operational]
mode = \
{Publisher.MODE_NOACK: u'noack', Publisher.MODE_CNPUB: u'cnpub'}[
self.mode]
{Publisher.MODE_NOACK: u'noack', Publisher.MODE_CNPUB: u'cnpub'}[
self.mode]
logger.info('Publisher %s is %s', mode, state)
......@@ -265,20 +266,17 @@ class Publisher(Channeler, Synchronized):
self.state = ST_ONLINE
self.on_operational(True)
elif self.mode == Publisher.MODE_CNPUB:
elif (self.mode == Publisher.MODE_CNPUB) and isinstance(payload, ConfirmSelectOk):
# Because only in this case it makes sense to check for MODE_CNPUB
if isinstance(payload, ConfirmSelectOk):
# A-OK! Boot it.
self.state = ST_ONLINE
self.on_operational(True)
self.tagger = AtomicTagger()
# now we need to listen for BasicAck and BasicNack
mw = MethodWatch(self.channel_id, (BasicAck, BasicNack),
self._on_cnpub_delivery)
mw.oneshot = False
self.connection.watch(mw)
self._mode_cnpub_process_deliveries()
# A-OK! Boot it.
self.tagger = AtomicTagger()
self.state = ST_ONLINE
self.on_operational(True)
# now we need to listen for BasicAck and BasicNack
mw = MethodWatch(self.channel_id, (BasicAck, BasicNack),
self._on_cnpub_delivery)
mw.oneshot = False
self.connection.watch(mw)
self._mode_cnpub_process_deliveries()
......@@ -7,10 +7,12 @@ import six
import logging
import warnings
import time
import monotonic
from coolamqp.uplink import ListenerThread
from coolamqp.clustering.single import SingleNodeReconnector
from coolamqp.attaches import Publisher, AttacheGroup, Consumer, Declarer
from coolamqp.objects import Exchange
from coolamqp.exceptions import ConnectionDead
from concurrent.futures import Future
from coolamqp.clustering.events import ConnectionLost, MessageReceived, \
......@@ -165,7 +167,7 @@ class Cluster(object):
raise NotImplementedError(
u'Sorry, this functionality is not yet implemented!')
def start(self, wait=True):
def start(self, wait=True, timeout=10.0):
"""
Connect to broker. Initialize Cluster.
......@@ -173,7 +175,9 @@ class Cluster(object):
It is not safe to fork after this.
:param wait: block until connection is ready
:param timeout: timeout to wait until the connection is ready. If it is not, a ConnectionDead error will be raised
:raise RuntimeError: called more than once
:raise ConnectionDead: failed to connect within timeout
"""
try:
......@@ -206,12 +210,15 @@ class Cluster(object):
self.listener.init()
self.listener.start()
self.snr.connect()
self.snr.connect(timeout=timeout)
# todo not really elegant
if wait:
while not self.snr.is_connected():
start_at = monotonic.monotonic()
while not self.snr.is_connected() and monotonic.monotonic() - start_at < timeout:
time.sleep(0.1)
if not self.snr.is_connected():
raise ConnectionDead('Could not connect within %s seconds' % (timeout, ))
def shutdown(self, wait=True):
"""
......
......@@ -29,13 +29,13 @@ class SingleNodeReconnector(object):
def is_connected(self):
return self.connection is not None
def connect(self):
def connect(self, timeout):
assert self.connection is None
# Initiate connecting - this order is very important!
self.connection = Connection(self.node_def, self.listener_thread)
self.attache_group.attach(self.connection)
self.connection.start()
self.connection.start(timeout)
self.connection.finalize.add(self.on_fail)
def _on_fail(self):
......
......@@ -28,8 +28,9 @@ class AMQPError(CoolAMQPError):
return 'AMQP error %s: %s' % (self.reply_code, self.reply_text)
def __repr__(self):
return 'AMQPError('+repr(self.reply_code)+', '+repr(self.reply_text)+ \
', '+repr(self.class_id)+', '+repr(self.method_id)+')'
return 'AMQPError(' + repr(self.reply_code) + ', ' + repr(
self.reply_text) + \
', ' + repr(self.class_id) + ', ' + repr(self.method_id) + ')'
def __init__(self, *args):
"""
......
......@@ -9,6 +9,7 @@ from coolamqp.framing.base import BASIC_TYPES
from .xml_tags import *
# docs may be None
......@@ -39,7 +40,6 @@ def get_size(fields): # assume all fields have static length
return size
def as_unicode(callable):
def roll(*args, **kwargs):
return six.text_type(callable(*args, **kwargs))
......@@ -139,4 +139,5 @@ def ffmt(data, *args, **kwargs):
for arg in args:
op = str if kwargs.get('sane', True) else frepr
data = data.replace('%s', op(arg), 1)
data = data.replace('%S', '%s')
return data
......@@ -7,14 +7,21 @@ import logging
class _Required(object):
"""Only a placeholder to tell apart None default values from required fields"""
def nop(x):
return x
def _get_tagchild(elem, tag):
return [e for e in elem.getchildren() if e.tag == tag]
__all__ = [
'_name', '_docs', '_ComputedField', '_ValueField', '_SimpleField',
'_docs_with_label', '_get_tagchild', '_ChildField'
]
class _Field(object):
"""Base field object"""
......@@ -33,6 +40,7 @@ class _ComputedField(_Field):
There's no corresponding XML attribute name - value must
be computed from element
"""
def __init__(self, field_name, find_fun):
super(_ComputedField, self).__init__(field_name)
self.find = find_fun
......@@ -43,10 +51,11 @@ class _ValueField(_Field):
Can hide under a pick of different XML attribute names.
Has a type, can have a default value.
"""
def __init__(self, xml_names, field_name, field_type=nop,
default=_Required):
if not isinstance(xml_names, tuple):
xml_names = (xml_names, )
xml_names = (xml_names,)
self.xml_names = xml_names
assert field_type is not None
self.field_name = field_name
......@@ -63,28 +72,30 @@ class _ValueField(_Field):
return elem.attrib[xmln[0]]
else:
if self.default is _Required:
raise TypeError('Did not find field %s in elem tag %s, looked for names %s' % (self.field_name, elem.tag, self.xml_names))
raise TypeError(
'Did not find field %s in elem tag %s, looked for names %s' % (
self.field_name, elem.tag, self.xml_names))
else:
return self.default
class _SimpleField(_ValueField):
"""XML attribute is the same as name, has a type and can be default"""
def __init__(self, name, field_type=nop, default=_Required):
super(_SimpleField, self).__init__(name, name, field_type, default)
def _get_tagchild(elem, tag):
return [e for e in elem.getchildren() if e.tag == tag]
class _ChildField(_ComputedField):
"""
List of other properties
"""
def __init__(self, name, xml_tag, fun, postexec=nop):
super(_ChildField, self).__init__(name, lambda elem: \
postexec([fun(c) for c in _get_tagchild(elem, xml_tag)]))
def get_docs(elem, label):
"""Parse an XML element. Return documentation"""
for kid in elem.getchildren():
......@@ -98,8 +109,7 @@ def get_docs(elem, label):
if label:
return elem.attrib.get('label', None)
_name = _SimpleField('name', six.text_type)
_docs = _ComputedField('docs', lambda elem: get_docs(elem, False))
_docs_with_label = _ComputedField('docs', lambda elem: get_docs(elem, True))
......@@ -13,14 +13,15 @@ logger = logging.getLogger(__name__)
def _boolint(x):
return bool(int(x))
__all__ = [
'Domain', 'Method', 'Class', 'Field', 'Constant'
]
class BaseObject(object):
FIELDS = []
# tuples of (xml name, field name, type, (optional) default value)
def __init__(self, elem):
......@@ -36,6 +37,7 @@ class BaseObject(object):
c.__dict__.update(**kwargs)
return c
class Constant(BaseObject):
NAME = 'constant'
FIELDS = [
......@@ -45,6 +47,7 @@ class Constant(BaseObject):
_docs,
]
class Field(BaseObject):
NAME = 'field'
FIELDS = [
......@@ -52,7 +55,9 @@ class Field(BaseObject):
_ValueField(('domain', 'type'), 'type', str),
_SimpleField('label', default=None),
_SimpleField('reserved', _boolint, default=0),
_ComputedField('basic_type', lambda elem: elem.attrib.get('type', '') == elem.attrib.get('name', '')),
_ComputedField('basic_type', lambda elem: elem.attrib.get('type',
'') == elem.attrib.get(
'name', '')),
_docs
]
......@@ -62,12 +67,11 @@ class Domain(BaseObject):
FIELDS = [
_name,
_SimpleField('type'),
_ComputedField('elementary', lambda a: a.attrib['type'] == a.attrib['name'])
_ComputedField('elementary',
lambda a: a.attrib['type'] == a.attrib['name'])
]
class Method(BaseObject):
NAME = 'method'
FIELDS = [
......@@ -78,12 +82,16 @@ class Method(BaseObject):
_docs,
_ChildField('fields', 'field', Field),
_ChildField('response', 'response', lambda e: e.attrib['name']),
_ChildField('sent_by_client', 'chassis', lambda e: e.attrib.get('name', '') == 'client', postexec=any),
_ChildField('sent_by_server', 'chassis', lambda e: e.attrib.get('name', '') == 'server', postexec=any),
_ChildField('constant', 'field', lambda e: Field(e).reserved, postexec=all)
_ChildField('sent_by_client', 'chassis',
lambda e: e.attrib.get('name', '') == 'client',
postexec=any),
_ChildField('sent_by_server', 'chassis',
lambda e: e.attrib.get('name', '') == 'server',
postexec=any),
_ChildField('constant', 'field', lambda e: Field(e).reserved,
postexec=all)
]
def get_static_body(self): # only arguments part
body = []
bits = 0
......@@ -100,12 +108,14 @@ class Method(BaseObject):
return b''.join(body)
def is_static(self, domain_to_type=None): # is size constant?
return not any(field.basic_type in DYNAMIC_BASIC_TYPES for field in self.fields)
return not any(
field.basic_type in DYNAMIC_BASIC_TYPES for field in self.fields)
_cls_method_sortkey = lambda m: (m.name.strip('-')[0], -len(m.response))
_cls_method_postexec = lambda q: sorted(q, key=_cls_method_sortkey)
class Class(BaseObject):
NAME = 'class'
FIELDS = [
......@@ -116,4 +126,3 @@ class Class(BaseObject):
_cls_method_postexec),
_ChildField('properties', 'field', Field)
]
This diff is collapsed.
# coding=UTF-8
from __future__ import print_function, division, absolute_import
"""
That funny type, field-table...
......@@ -67,6 +68,7 @@ def enframe_longstr(buf, value):
def _c2none(buf, v):
return None
FIELD_TYPES = {
# length, struct, (option)to_bytes (callable(buffer, value)),
# (option)from_bytes (callable(buffer, offset) ->
......
......@@ -2,10 +2,13 @@
from __future__ import absolute_import, division, print_function
import logging
import collections
import monotonic
import uuid
import time
import socket
import six
from coolamqp.exceptions import ConnectionDead
from coolamqp.uplink.connection.recv_framer import ReceivingFramer
from coolamqp.uplink.connection.send_framer import SendingFramer
from coolamqp.framing.frames import AMQPMethodFrame
......@@ -19,6 +22,42 @@ from coolamqp.objects import Callable
logger = logging.getLogger(__name__)
def alert_watches(watches, trigger):
"""
Notify all watches in this collection.
Return a list of alive watches.
:param watches: list of Watch
:return: tuple of (list of Watch, bool - was any watch fired?)
"""
watch_handled = False
alive_watches = []
while len(watches) > 0:
watch = watches.pop()
if watch.cancelled:
continue
watch_triggered = watch.is_triggered_by(trigger)
watch_handled |= watch_triggered
if watch.cancelled:
continue
if not any((watch_triggered, watch.oneshot, watch.cancelled)):
# Watch remains alive if it was NOT triggered, or it's NOT a oneshot or it's not cancelled
alive_watches.append(watch)
elif not watch.oneshot and not watch.cancelled:
alive_watches.append(watch)
elif watch.oneshot and not watch_triggered:
alive_watches.append(watch)
if set(alive_watches) != set(watches):
for removed_watch in set(watches)-set(alive_watches):
logger.debug('Removing watch %s', repr(removed_watch))
return alive_watches, watch_handled
class Connection(object):
"""
An object that manages a connection in a comprehensive way.
......@@ -50,7 +89,7 @@ class Connection(object):
"""
self.listener_thread = listener_thread
self.node_definition = node_definition
self.uuid = uuid.uuid4().hex[:5]
self.recvf = ReceivingFramer(self.on_frame)
# todo a list doesn't seem like a very strong atomicity guarantee
......@@ -70,6 +109,10 @@ class Connection(object):
self.heartbeat = None
self.extensions = []
# To be filled in later
self.listener_socket = None
self.sendf = None
def call_on_connected(self, callable):
"""
Register a callable to be called when this links to the server.
......@@ -87,14 +130,14 @@ class Connection(object):
def on_connected(self):
"""Called by handshaker upon reception of final connection.open-ok"""
logger.info('Connection ready.')
logger.info('[%s] Connection ready.', self.uuid)
self.state = ST_ONLINE
while len(self.callables_on_connected) > 0:
self.callables_on_connected.pop()()
def start(self):
def start(self, timeout):
"""
Start processing events for this connect. Create the socket,
transmit 'AMQP\x00\x00\x09\x01' and roll.
......@@ -103,17 +146,19 @@ class Connection(object):
"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
start_at = monotonic.monotonic()
while True:
try:
sock.connect(
(self.node_definition.host, self.node_definition.port))
except socket.error as e:
time.sleep(0.5) # Connection refused? Very bad things?
if monotonic.monotonic() - start_at > timeout:
raise ConnectionDead()
else:
break
logger.debug('TCP connection established, authentication in progress')
logger.debug('[%s] TCP connection established, authentication in progress', self.uuid)
sock.settimeout(0)
sock.send(b'AMQP\x00\x00\x09\x01')
......@@ -172,7 +217,8 @@ class Connection(object):
if isinstance(payload, ConnectionClose):
self.send([AMQPMethodFrame(0, ConnectionCloseOk())])
logger.info(u'Broker closed our connection - code %s reason %s',
logger.info(u'[%s] Broker closed our connection - code %s reason %s',
self.uuid,
payload.reply_code,
payload.reply_text.tobytes().decode('utf8'))
......@@ -212,7 +258,7 @@ class Connection(object):
watch_handled = False # True if ANY watch handled this
if isinstance(frame, AMQPMethodFrame):
logger.debug('Received %s', frame.payload.NAME)
logger.debug('[%s] Received %s', self.uuid, frame.payload.NAME)
# ==================== process per-channel watches
#
......@@ -222,25 +268,8 @@ class Connection(object):
watches = self.watches[frame.channel] # a list
self.watches[frame.channel] = []
alive_watches = []
while len(watches) > 0:
watch = watches.pop()
if watch.cancelled:
# print('watch',watch,'was cancelled')
continue
watch_triggered = watch.is_triggered_by(frame)
watch_handled |= watch_triggered
if watch.cancelled:
# print('watch',watch,'was cancelled')
continue
if ((not watch_triggered) or (not watch.oneshot)) and (
not watch.cancelled):
# Watch remains alive if it was NOT triggered, or it's NOT a oneshot
alive_watches.append(watch)
alive_watches, f = alert_watches(watches, frame)
watch_handled |= f
if frame.channel in self.watches:
# unwatch_all might have gotten called, check that
......@@ -248,33 +277,20 @@ class Connection(object):
self.watches[frame.channel].append(watch)
# ==================== process "any" watches
alive_watches = []
any_watches = self.any_watches
self.any_watches = []
while len(any_watches):
watch = any_watches.pop()
if watch.cancelled:
# print('any watch', watch, 'was cancelled')
continue
watch_triggered = watch.is_triggered_by(frame)
watch_handled |= watch_triggered
alive_watches, f = alert_watches(any_watches, frame)
if watch.cancelled:
# print('any watch', watch, 'was cancelled')
continue
if ((not watch_triggered) or (not watch.oneshot)) and (
not watch.cancelled):
# Watch remains alive if it was NOT triggered, or it's NOT a oneshot
alive_watches.append(watch)
watch_handled |= f
for watch in alive_watches:
self.any_watches.append(watch)
if not watch_handled:
logger.warn('Unhandled frame %s', frame)
if isinstance(frame, AMQPMethodFrame):
logger.warning('[%s] Unhandled method frame %s', self.uuid, repr(frame.payload))
else:
logger.warning('[%s] Unhandled frame %s', self.uuid, frame)
def watchdog(self, delay, callback):
"""
......
......@@ -18,7 +18,6 @@ FRAME_TYPES = {
FRAME_METHOD: AMQPMethodFrame
}
ordpy2 = ord if six.PY2 else lambda x: x
......@@ -81,7 +80,7 @@ class ReceivingFramer(object):
"""
assert self.total_data_len >= up_to, \
'Tried to extract %s but %s remaining' % (
up_to, self.total_data_len)
up_to, self.total_data_len)
if up_to >= len(self.chunks[0]):
q = self.chunks.popleft()
else:
......@@ -93,7 +92,6 @@ class ReceivingFramer(object):
len(q), up_to)
return q
def _statemachine(self):
# state rule 1
if self.frame_type is None and self.total_data_len > 0:
......@@ -125,7 +123,7 @@ class ReceivingFramer(object):
# state rule 3
elif (self.frame_type != FRAME_HEARTBEAT) and (
self.frame_type is not None) and (
self.frame_size is None) and (
self.frame_size is None) and (
self.total_data_len > 6):
hdr = b''
while len(hdr) < 6:
......
# coding=UTF-8
from __future__ import absolute_import, division, print_function
import logging
from coolamqp.framing.frames import AMQPMethodFrame, AMQPHeaderFrame, \
AMQPBodyFrame
from coolamqp.framing.base import AMQPMethodPayload
logger = logging.getLogger(__name__)
class Watch(object):
......@@ -123,10 +127,13 @@ class MethodWatch(Watch):
self.callable = callable
if isinstance(method_or_methods, (list, tuple)):
self.methods = tuple(method_or_methods)
else:
self.methods = method_or_methods
elif issubclass(method_or_methods, AMQPMethodPayload):
self.methods = (method_or_methods, )
self.on_end = on_end
def __repr__(self):
return '<MethodWatch %s, %s, %s, on_end=%s>' % (self.channel, self.methods, self.callable, self.on_end)
def failed(self):
if self.on_end is not None:
self.on_end()
......
[metadata]
description-file = README.md
name = CoolAMQP
version = 0.93
version = 0.95
license = MIT License
classifiers =
Programming Language :: Python
Programming Language :: Python :: 2.7
Programming Language :: Python :: 3.3
Programming Language :: Python :: 3.4
Programming Language :: Python :: 3.5
Programming Language :: Python :: Implementation :: CPython
Programming Language :: Python :: Implementation :: PyPy
......@@ -18,8 +16,8 @@ classifiers =
Intended Audience :: Developers
Topic :: Software Development :: Libraries :: Python Modules
description = Very fast pure-Python AMQP client
author = DMS Serwis s.c.
author_email = piotrm@dms-serwis.pl
author = SMOK sp. z o.o.
author_email = pmaslanka@smok.co
url = https://github.com/smok-serwis/coolamqp
platforms =
posix
......@@ -31,7 +29,7 @@ max-line-length=80
universal=1
[nosetests]
verbosity=1
verbosity=3
detailed-errors=1
with-coverage=1
......
......@@ -60,7 +60,7 @@ class TestDouble(unittest.TestCase):
try:
con2, fut2 = self.c2.consume(q,
fail_on_first_time_resource_locked=True)
fut2.result()
fut2.result(timeout=20)
except AMQPError as e:
self.assertEquals(e.reply_code, RESOURCE_LOCKED)
self.assertFalse(e.is_hard_error())
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment