diff --git a/.travis.yml b/.travis.yml index 8f2be5b385d9cd801ec536982c678103e1bb28b2..fec2046e3bd40918d915521220358ccf0c7bebe9 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,8 +1,6 @@ language: python python: - "2.7" - - "3.3" - - "3.4" - "3.5" - "pypy" script: @@ -14,5 +12,8 @@ install: after_success: - codeclimate-test-reporter - bash build.sh -services: - - rabbitmq +services: rabbitmq +addons: + apt: + packages: + - rabbitmq-server \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md index 1b2b724d372a76ed6b58189fd053e489cc9da889..69881cae53045dd99d17539af0719e3a7ee572d9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,11 @@ +* v0.95: + + * multiple bugs fixed + +* v0.94: + +_version skipped_ + * v0.93: * Large refactor of XML schema compiler diff --git a/LICENSE b/LICENSE index 5dfffdb0cdce280eecdca17f31d08c974799c208..e47e3d00861b6bb8b27b2918c6381ecd3a4fd2e4 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,7 @@ The MIT License (MIT) -Copyright (c) 2016-2017 DMS Serwis s.c. +Copyright (c) 2016-2018 DMS Serwis s.c. +Copyright (c) 2018-2019 SMOK sp. z o.o. Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/compile_definitions.py b/compile_definitions.py index 8a02a93b594f737b9635c3b78abb0655ce74fe56..8ab87e2f054719f9ce33ba4a462b17d1973c5b75 100644 --- a/compile_definitions.py +++ b/compile_definitions.py @@ -42,7 +42,7 @@ Generated automatically by CoolAMQP from AMQP machine-readable specification. See coolamqp.uplink.framing.compilation for the tool AMQP is copyright (c) 2016 OASIS -CoolAMQP is copyright (c) 2016 DMS Serwis s.c. +CoolAMQP is copyright (c) 2016-2018 DMS Serwis s.c., 2018-2019 SMOK sp. z o.o. ########################################################### @@ -428,6 +428,16 @@ Field = collections.namedtuple('Field', ('name', 'type', 'basic_type', 'reserved line(' ]\n') + # __repr__ + line('''\n def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return '%s(%S)' % (', '.join(map(repr, [%s])))\n''', + full_class_name, + u", ".join(['self.'+format_field_name(field.name) for field in non_reserved_fields])) + # constructor line('''\n def __init__(%s): """ @@ -522,7 +532,7 @@ REPLY_REASONS_FOR = {\n''') # Methods that are replies for other, ie. ConnectionOpenOk: ConnectionOpen # a method may be a reply for ONE or NONE other methods # if a method has no replies, it will have an empty list as value here -REPLIES_FOR= {\n''') +REPLIES_FOR = {\n''') for k, v in methods_that_are_replies_for.items(): line(u' %s: [%s],\n' % (k, u', '.join(map(str, v)))) diff --git a/coolamqp/attaches/consumer.py b/coolamqp/attaches/consumer.py index e62f99c9b6357b23ce179ab150f6b22f3003daf1..3857ca8ce8e04d1de1445523765fa079d3527778 100644 --- a/coolamqp/attaches/consumer.py +++ b/coolamqp/attaches/consumer.py @@ -129,7 +129,7 @@ class Consumer(Channeler): # private self.cancelled = False # did the client want to STOP using this - # consumer? + # consumer? self.receiver = None # MessageReceiver instance self.attache_group = None # attache group this belongs to. @@ -141,8 +141,7 @@ class Consumer(Channeler): self.future_to_notify = future_to_notify self.future_to_notify_on_dead = None # .cancel - self.fail_on_first_time_resource_locked = \ - fail_on_first_time_resource_locked + self.fail_on_first_time_resource_locked = fail_on_first_time_resource_locked self.cancel_on_failure = cancel_on_failure self.body_receive_mode = body_receive_mode @@ -187,7 +186,9 @@ class Consumer(Channeler): # you'll blow up big next time you try to use this consumer if you # can't cancel, but just close if self.consumer_tag is not None: - self.method(BasicCancel(self.consumer_tag, False)) + self.method_and_watch(BasicCancel(self.consumer_tag, False), + [BasicCancelOk], + self.on_close) else: self.method(ChannelClose(0, b'cancelling', 0, 0)) @@ -228,7 +229,6 @@ class Consumer(Channeler): Note, this can be called multiple times, and eventually with None. """ - if self.cancel_on_failure and (not self.cancelled): logger.debug( 'Consumer is cancel_on_failure and failure seen, True->cancelled') @@ -286,6 +286,7 @@ class Consumer(Channeler): if self.future_to_notify: self.future_to_notify.set_exception(AMQPError(payload)) self.future_to_notify = None + logger.debug('Notifying connection closed with %s', payload) # We might not want to throw the connection away. should_retry = should_retry and (not self.cancelled) diff --git a/coolamqp/attaches/publisher.py b/coolamqp/attaches/publisher.py index 6d480f5b1df3a0d8daef9436b00f949959bc3c14..3d75b9dad6e6edfa1ba3c3fd94e06685363de6a9 100644 --- a/coolamqp/attaches/publisher.py +++ b/coolamqp/attaches/publisher.py @@ -151,6 +151,7 @@ class Publisher(Channeler, Synchronized): """ assert self.state == ST_ONLINE assert self.mode == Publisher.MODE_CNPUB + assert self.tagger is not None while len(self.messages) > 0: try: @@ -234,8 +235,8 @@ class Publisher(Channeler, Synchronized): def on_operational(self, operational): state = {True: u'up', False: u'down'}[operational] mode = \ - {Publisher.MODE_NOACK: u'noack', Publisher.MODE_CNPUB: u'cnpub'}[ - self.mode] + {Publisher.MODE_NOACK: u'noack', Publisher.MODE_CNPUB: u'cnpub'}[ + self.mode] logger.info('Publisher %s is %s', mode, state) @@ -265,20 +266,17 @@ class Publisher(Channeler, Synchronized): self.state = ST_ONLINE self.on_operational(True) - elif self.mode == Publisher.MODE_CNPUB: + elif (self.mode == Publisher.MODE_CNPUB) and isinstance(payload, ConfirmSelectOk): # Because only in this case it makes sense to check for MODE_CNPUB - - if isinstance(payload, ConfirmSelectOk): - # A-OK! Boot it. - self.state = ST_ONLINE - self.on_operational(True) - - self.tagger = AtomicTagger() - - # now we need to listen for BasicAck and BasicNack - - mw = MethodWatch(self.channel_id, (BasicAck, BasicNack), - self._on_cnpub_delivery) - mw.oneshot = False - self.connection.watch(mw) - self._mode_cnpub_process_deliveries() + # A-OK! Boot it. + self.tagger = AtomicTagger() + self.state = ST_ONLINE + self.on_operational(True) + + # now we need to listen for BasicAck and BasicNack + + mw = MethodWatch(self.channel_id, (BasicAck, BasicNack), + self._on_cnpub_delivery) + mw.oneshot = False + self.connection.watch(mw) + self._mode_cnpub_process_deliveries() diff --git a/coolamqp/clustering/cluster.py b/coolamqp/clustering/cluster.py index 9f31d190bf9bbc4c01ecaf6c6f76ed7eb2e962e4..57f16a75153a6e76c8079d4aac14c492599ca1a8 100644 --- a/coolamqp/clustering/cluster.py +++ b/coolamqp/clustering/cluster.py @@ -7,10 +7,12 @@ import six import logging import warnings import time +import monotonic from coolamqp.uplink import ListenerThread from coolamqp.clustering.single import SingleNodeReconnector from coolamqp.attaches import Publisher, AttacheGroup, Consumer, Declarer from coolamqp.objects import Exchange +from coolamqp.exceptions import ConnectionDead from concurrent.futures import Future from coolamqp.clustering.events import ConnectionLost, MessageReceived, \ @@ -165,7 +167,7 @@ class Cluster(object): raise NotImplementedError( u'Sorry, this functionality is not yet implemented!') - def start(self, wait=True): + def start(self, wait=True, timeout=10.0): """ Connect to broker. Initialize Cluster. @@ -173,7 +175,9 @@ class Cluster(object): It is not safe to fork after this. :param wait: block until connection is ready + :param timeout: timeout to wait until the connection is ready. If it is not, a ConnectionDead error will be raised :raise RuntimeError: called more than once + :raise ConnectionDead: failed to connect within timeout """ try: @@ -206,12 +210,15 @@ class Cluster(object): self.listener.init() self.listener.start() - self.snr.connect() + self.snr.connect(timeout=timeout) # todo not really elegant if wait: - while not self.snr.is_connected(): + start_at = monotonic.monotonic() + while not self.snr.is_connected() and monotonic.monotonic() - start_at < timeout: time.sleep(0.1) + if not self.snr.is_connected(): + raise ConnectionDead('Could not connect within %s seconds' % (timeout, )) def shutdown(self, wait=True): """ diff --git a/coolamqp/clustering/single.py b/coolamqp/clustering/single.py index 9219ef71051f4114cd7b5d8804848e632aeccdd4..bb5b752914a5d0cda645536cc8eb318d595075df 100644 --- a/coolamqp/clustering/single.py +++ b/coolamqp/clustering/single.py @@ -29,13 +29,13 @@ class SingleNodeReconnector(object): def is_connected(self): return self.connection is not None - def connect(self): + def connect(self, timeout): assert self.connection is None # Initiate connecting - this order is very important! self.connection = Connection(self.node_def, self.listener_thread) self.attache_group.attach(self.connection) - self.connection.start() + self.connection.start(timeout) self.connection.finalize.add(self.on_fail) def _on_fail(self): diff --git a/coolamqp/exceptions.py b/coolamqp/exceptions.py index 0e4fcf979e6e04ac09021ac53df46a51a672c3f3..e3024f7a7611806a1b0ded85e27f22350336d5cc 100644 --- a/coolamqp/exceptions.py +++ b/coolamqp/exceptions.py @@ -28,8 +28,9 @@ class AMQPError(CoolAMQPError): return 'AMQP error %s: %s' % (self.reply_code, self.reply_text) def __repr__(self): - return 'AMQPError('+repr(self.reply_code)+', '+repr(self.reply_text)+ \ - ', '+repr(self.class_id)+', '+repr(self.method_id)+')' + return 'AMQPError(' + repr(self.reply_code) + ', ' + repr( + self.reply_text) + \ + ', ' + repr(self.class_id) + ', ' + repr(self.method_id) + ')' def __init__(self, *args): """ diff --git a/coolamqp/framing/compilation/utilities.py b/coolamqp/framing/compilation/utilities.py index 8c333b32db655dfaed005fa87954bcba4ee53bf2..c450b2c275191e54f11537ee548b463a2f66fa05 100644 --- a/coolamqp/framing/compilation/utilities.py +++ b/coolamqp/framing/compilation/utilities.py @@ -9,6 +9,7 @@ from coolamqp.framing.base import BASIC_TYPES from .xml_tags import * + # docs may be None @@ -39,7 +40,6 @@ def get_size(fields): # assume all fields have static length return size - def as_unicode(callable): def roll(*args, **kwargs): return six.text_type(callable(*args, **kwargs)) @@ -139,4 +139,5 @@ def ffmt(data, *args, **kwargs): for arg in args: op = str if kwargs.get('sane', True) else frepr data = data.replace('%s', op(arg), 1) + data = data.replace('%S', '%s') return data diff --git a/coolamqp/framing/compilation/xml_fields.py b/coolamqp/framing/compilation/xml_fields.py index dfcb8f1156cd094118848e308a82a52ffa1d6a4a..2ef977dd19a98b2fb6ece1e8563346862b56da09 100644 --- a/coolamqp/framing/compilation/xml_fields.py +++ b/coolamqp/framing/compilation/xml_fields.py @@ -7,14 +7,21 @@ import logging class _Required(object): """Only a placeholder to tell apart None default values from required fields""" + def nop(x): return x + +def _get_tagchild(elem, tag): + return [e for e in elem.getchildren() if e.tag == tag] + + __all__ = [ '_name', '_docs', '_ComputedField', '_ValueField', '_SimpleField', '_docs_with_label', '_get_tagchild', '_ChildField' ] + class _Field(object): """Base field object""" @@ -33,6 +40,7 @@ class _ComputedField(_Field): There's no corresponding XML attribute name - value must be computed from element """ + def __init__(self, field_name, find_fun): super(_ComputedField, self).__init__(field_name) self.find = find_fun @@ -43,10 +51,11 @@ class _ValueField(_Field): Can hide under a pick of different XML attribute names. Has a type, can have a default value. """ + def __init__(self, xml_names, field_name, field_type=nop, default=_Required): if not isinstance(xml_names, tuple): - xml_names = (xml_names, ) + xml_names = (xml_names,) self.xml_names = xml_names assert field_type is not None self.field_name = field_name @@ -63,28 +72,30 @@ class _ValueField(_Field): return elem.attrib[xmln[0]] else: if self.default is _Required: - raise TypeError('Did not find field %s in elem tag %s, looked for names %s' % (self.field_name, elem.tag, self.xml_names)) + raise TypeError( + 'Did not find field %s in elem tag %s, looked for names %s' % ( + self.field_name, elem.tag, self.xml_names)) else: return self.default class _SimpleField(_ValueField): """XML attribute is the same as name, has a type and can be default""" + def __init__(self, name, field_type=nop, default=_Required): super(_SimpleField, self).__init__(name, name, field_type, default) -def _get_tagchild(elem, tag): - return [e for e in elem.getchildren() if e.tag == tag] - class _ChildField(_ComputedField): """ List of other properties """ + def __init__(self, name, xml_tag, fun, postexec=nop): super(_ChildField, self).__init__(name, lambda elem: \ postexec([fun(c) for c in _get_tagchild(elem, xml_tag)])) + def get_docs(elem, label): """Parse an XML element. Return documentation""" for kid in elem.getchildren(): @@ -98,8 +109,7 @@ def get_docs(elem, label): if label: return elem.attrib.get('label', None) + _name = _SimpleField('name', six.text_type) _docs = _ComputedField('docs', lambda elem: get_docs(elem, False)) _docs_with_label = _ComputedField('docs', lambda elem: get_docs(elem, True)) - - diff --git a/coolamqp/framing/compilation/xml_tags.py b/coolamqp/framing/compilation/xml_tags.py index 5c98e58d1922f8f45ff49bbe87f96f98d1725857..9dde0e4e8856db8161501ed93f3d577ee9d7461d 100644 --- a/coolamqp/framing/compilation/xml_tags.py +++ b/coolamqp/framing/compilation/xml_tags.py @@ -13,14 +13,15 @@ logger = logging.getLogger(__name__) def _boolint(x): return bool(int(x)) + __all__ = [ 'Domain', 'Method', 'Class', 'Field', 'Constant' ] class BaseObject(object): - FIELDS = [] + # tuples of (xml name, field name, type, (optional) default value) def __init__(self, elem): @@ -36,6 +37,7 @@ class BaseObject(object): c.__dict__.update(**kwargs) return c + class Constant(BaseObject): NAME = 'constant' FIELDS = [ @@ -45,6 +47,7 @@ class Constant(BaseObject): _docs, ] + class Field(BaseObject): NAME = 'field' FIELDS = [ @@ -52,7 +55,9 @@ class Field(BaseObject): _ValueField(('domain', 'type'), 'type', str), _SimpleField('label', default=None), _SimpleField('reserved', _boolint, default=0), - _ComputedField('basic_type', lambda elem: elem.attrib.get('type', '') == elem.attrib.get('name', '')), + _ComputedField('basic_type', lambda elem: elem.attrib.get('type', + '') == elem.attrib.get( + 'name', '')), _docs ] @@ -62,12 +67,11 @@ class Domain(BaseObject): FIELDS = [ _name, _SimpleField('type'), - _ComputedField('elementary', lambda a: a.attrib['type'] == a.attrib['name']) + _ComputedField('elementary', + lambda a: a.attrib['type'] == a.attrib['name']) ] - - class Method(BaseObject): NAME = 'method' FIELDS = [ @@ -78,12 +82,16 @@ class Method(BaseObject): _docs, _ChildField('fields', 'field', Field), _ChildField('response', 'response', lambda e: e.attrib['name']), - _ChildField('sent_by_client', 'chassis', lambda e: e.attrib.get('name', '') == 'client', postexec=any), - _ChildField('sent_by_server', 'chassis', lambda e: e.attrib.get('name', '') == 'server', postexec=any), - _ChildField('constant', 'field', lambda e: Field(e).reserved, postexec=all) + _ChildField('sent_by_client', 'chassis', + lambda e: e.attrib.get('name', '') == 'client', + postexec=any), + _ChildField('sent_by_server', 'chassis', + lambda e: e.attrib.get('name', '') == 'server', + postexec=any), + _ChildField('constant', 'field', lambda e: Field(e).reserved, + postexec=all) ] - def get_static_body(self): # only arguments part body = [] bits = 0 @@ -100,12 +108,14 @@ class Method(BaseObject): return b''.join(body) def is_static(self, domain_to_type=None): # is size constant? - return not any(field.basic_type in DYNAMIC_BASIC_TYPES for field in self.fields) + return not any( + field.basic_type in DYNAMIC_BASIC_TYPES for field in self.fields) _cls_method_sortkey = lambda m: (m.name.strip('-')[0], -len(m.response)) _cls_method_postexec = lambda q: sorted(q, key=_cls_method_sortkey) + class Class(BaseObject): NAME = 'class' FIELDS = [ @@ -116,4 +126,3 @@ class Class(BaseObject): _cls_method_postexec), _ChildField('properties', 'field', Field) ] - diff --git a/coolamqp/framing/definitions.py b/coolamqp/framing/definitions.py index 61233a9ce4cc672eee6baa8bf00c7ae728db8e9e..e72b596923b92a5b00b935070b2fc92687ce5fac 100644 --- a/coolamqp/framing/definitions.py +++ b/coolamqp/framing/definitions.py @@ -7,7 +7,7 @@ Generated automatically by CoolAMQP from AMQP machine-readable specification. See coolamqp.uplink.framing.compilation for the tool AMQP is copyright (c) 2016 OASIS -CoolAMQP is copyright (c) 2016 DMS Serwis s.c. +CoolAMQP is copyright (c) 2016-2018 DMS Serwis s.c., 2018-2019 SMOK sp. z o.o. ########################################################### @@ -130,8 +130,8 @@ INTERNAL_ERROR = 541 # resume # normal operations. -HARD_ERRORS = [CONNECTION_FORCED, INVALID_PATH, FRAME_ERROR, SYNTAX_ERROR, COMMAND_INVALID, CHANNEL_ERROR, UNEXPECTED_FRAME, RESOURCE_ERROR, NOT_ALLOWED, NOT_IMPLEMENTED, INTERNAL_ERROR] SOFT_ERRORS = [CONTENT_TOO_LARGE, NO_CONSUMERS, ACCESS_REFUSED, NOT_FOUND, RESOURCE_LOCKED, PRECONDITION_FAILED] +HARD_ERRORS = [CONNECTION_FORCED, INVALID_PATH, FRAME_ERROR, SYNTAX_ERROR, COMMAND_INVALID, CHANNEL_ERROR, UNEXPECTED_FRAME, RESOURCE_ERROR, NOT_ALLOWED, NOT_IMPLEMENTED, INTERNAL_ERROR] DOMAIN_TO_BASIC_TYPE = { @@ -195,6 +195,13 @@ class ConnectionBlocked(AMQPMethodPayload): Field(u'reason', u'shortstr', u'shortstr', reserved=False), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'ConnectionBlocked(%s)' % (', '.join(map(repr, [self.reason]))) + def __init__(self, reason): """ Create frame connection.blocked @@ -253,6 +260,13 @@ class ConnectionClose(AMQPMethodPayload): Field(u'method-id', u'method-id', u'short', reserved=False), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'ConnectionClose(%s)' % (', '.join(map(repr, [self.reply_code, self.reply_text, self.class_id, self.method_id]))) + def __init__(self, reply_code, reply_text, class_id, method_id): """ Create frame connection.close @@ -316,6 +330,13 @@ class ConnectionCloseOk(AMQPMethodPayload): IS_CONTENT_STATIC = True # this means that argument part has always the same content STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x0A\x00\x33\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'ConnectionCloseOk(%s)' % (', '.join(map(repr, []))) + def __init__(self): """ Create frame connection.close-ok @@ -348,7 +369,7 @@ class ConnectionOpen(AMQPMethodPayload): INDEX = (10, 40) # (Class ID, Method ID) BINARY_HEADER = b'\x00\x0A\x00\x28' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = True, False + SENT_BY_CLIENT, SENT_BY_SERVER = False, True IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content @@ -360,6 +381,13 @@ class ConnectionOpen(AMQPMethodPayload): Field(u'reserved-2', u'bit', u'bit', reserved=True), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'ConnectionOpen(%s)' % (', '.join(map(repr, [self.virtual_host]))) + def __init__(self, virtual_host): """ Create frame connection.open @@ -407,7 +435,7 @@ class ConnectionOpenOk(AMQPMethodPayload): INDEX = (10, 41) # (Class ID, Method ID) BINARY_HEADER = b'\x00\x0A\x00\x29' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = False, True + SENT_BY_CLIENT, SENT_BY_SERVER = True, False IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content @@ -418,6 +446,13 @@ class ConnectionOpenOk(AMQPMethodPayload): Field(u'reserved-1', u'shortstr', u'shortstr', reserved=True), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'ConnectionOpenOk(%s)' % (', '.join(map(repr, []))) + def __init__(self): """ Create frame connection.open-ok @@ -450,7 +485,7 @@ class ConnectionStart(AMQPMethodPayload): INDEX = (10, 10) # (Class ID, Method ID) BINARY_HEADER = b'\x00\x0A\x00\x0A' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = False, True + SENT_BY_CLIENT, SENT_BY_SERVER = True, False IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content @@ -464,6 +499,13 @@ class ConnectionStart(AMQPMethodPayload): Field(u'locales', u'longstr', u'longstr', reserved=False), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'ConnectionStart(%s)' % (', '.join(map(repr, [self.version_major, self.version_minor, self.server_properties, self.mechanisms, self.locales]))) + def __init__(self, version_major, version_minor, server_properties, mechanisms, locales): """ Create frame connection.start @@ -552,7 +594,7 @@ class ConnectionSecure(AMQPMethodPayload): INDEX = (10, 20) # (Class ID, Method ID) BINARY_HEADER = b'\x00\x0A\x00\x14' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = False, True + SENT_BY_CLIENT, SENT_BY_SERVER = True, False IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content @@ -562,6 +604,13 @@ class ConnectionSecure(AMQPMethodPayload): Field(u'challenge', u'longstr', u'longstr', reserved=False), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'ConnectionSecure(%s)' % (', '.join(map(repr, [self.challenge]))) + def __init__(self, challenge): """ Create frame connection.secure @@ -604,7 +653,7 @@ class ConnectionStartOk(AMQPMethodPayload): INDEX = (10, 11) # (Class ID, Method ID) BINARY_HEADER = b'\x00\x0A\x00\x0B' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = True, False + SENT_BY_CLIENT, SENT_BY_SERVER = False, True IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content @@ -617,6 +666,13 @@ class ConnectionStartOk(AMQPMethodPayload): Field(u'locale', u'shortstr', u'shortstr', reserved=False), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'ConnectionStartOk(%s)' % (', '.join(map(repr, [self.client_properties, self.mechanism, self.response, self.locale]))) + def __init__(self, client_properties, mechanism, response, locale): """ Create frame connection.start-ok @@ -698,7 +754,7 @@ class ConnectionSecureOk(AMQPMethodPayload): INDEX = (10, 21) # (Class ID, Method ID) BINARY_HEADER = b'\x00\x0A\x00\x15' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = True, False + SENT_BY_CLIENT, SENT_BY_SERVER = False, True IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content @@ -708,6 +764,13 @@ class ConnectionSecureOk(AMQPMethodPayload): Field(u'response', u'longstr', u'longstr', reserved=False), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'ConnectionSecureOk(%s)' % (', '.join(map(repr, [self.response]))) + def __init__(self, response): """ Create frame connection.secure-ok @@ -752,7 +815,7 @@ class ConnectionTune(AMQPMethodPayload): INDEX = (10, 30) # (Class ID, Method ID) BINARY_HEADER = b'\x00\x0A\x00\x1E' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = False, True + SENT_BY_CLIENT, SENT_BY_SERVER = True, False IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content @@ -764,6 +827,13 @@ class ConnectionTune(AMQPMethodPayload): Field(u'heartbeat', u'short', u'short', reserved=False), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'ConnectionTune(%s)' % (', '.join(map(repr, [self.channel_max, self.frame_max, self.heartbeat]))) + def __init__(self, channel_max, frame_max, heartbeat): """ Create frame connection.tune @@ -823,7 +893,7 @@ class ConnectionTuneOk(AMQPMethodPayload): INDEX = (10, 31) # (Class ID, Method ID) BINARY_HEADER = b'\x00\x0A\x00\x1F' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = True, False + SENT_BY_CLIENT, SENT_BY_SERVER = False, True IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content @@ -835,6 +905,13 @@ class ConnectionTuneOk(AMQPMethodPayload): Field(u'heartbeat', u'short', u'short', reserved=False), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'ConnectionTuneOk(%s)' % (', '.join(map(repr, [self.channel_max, self.frame_max, self.heartbeat]))) + def __init__(self, channel_max, frame_max, heartbeat): """ Create frame connection.tune-ok @@ -897,6 +974,13 @@ class ConnectionUnblocked(AMQPMethodPayload): IS_CONTENT_STATIC = True # this means that argument part has always the same content STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x0A\x00\x3D\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'ConnectionUnblocked(%s)' % (', '.join(map(repr, []))) + def __init__(self): """ Create frame connection.unblocked @@ -953,6 +1037,13 @@ class ChannelClose(AMQPMethodPayload): Field(u'method-id', u'method-id', u'short', reserved=False), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'ChannelClose(%s)' % (', '.join(map(repr, [self.reply_code, self.reply_text, self.class_id, self.method_id]))) + def __init__(self, reply_code, reply_text, class_id, method_id): """ Create frame channel.close @@ -1015,6 +1106,13 @@ class ChannelCloseOk(AMQPMethodPayload): IS_CONTENT_STATIC = True # this means that argument part has always the same content STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x14\x00\x29\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'ChannelCloseOk(%s)' % (', '.join(map(repr, []))) + def __init__(self): """ Create frame channel.close-ok @@ -1058,6 +1156,13 @@ class ChannelFlow(AMQPMethodPayload): Field(u'active', u'bit', u'bit', reserved=False), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'ChannelFlow(%s)' % (', '.join(map(repr, [self.active]))) + def __init__(self, active): """ Create frame channel.flow @@ -1110,6 +1215,13 @@ class ChannelFlowOk(AMQPMethodPayload): Field(u'active', u'bit', u'bit', reserved=False), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'ChannelFlowOk(%s)' % (', '.join(map(repr, [self.active]))) + def __init__(self, active): """ Create frame channel.flow-ok @@ -1152,7 +1264,7 @@ class ChannelOpen(AMQPMethodPayload): INDEX = (20, 10) # (Class ID, Method ID) BINARY_HEADER = b'\x00\x14\x00\x0A' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = True, False + SENT_BY_CLIENT, SENT_BY_SERVER = False, True IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content @@ -1163,6 +1275,13 @@ class ChannelOpen(AMQPMethodPayload): Field(u'reserved-1', u'shortstr', u'shortstr', reserved=True), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'ChannelOpen(%s)' % (', '.join(map(repr, []))) + def __init__(self): """ Create frame channel.open @@ -1192,7 +1311,7 @@ class ChannelOpenOk(AMQPMethodPayload): INDEX = (20, 11) # (Class ID, Method ID) BINARY_HEADER = b'\x00\x14\x00\x0B' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = False, True + SENT_BY_CLIENT, SENT_BY_SERVER = True, False IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content @@ -1203,6 +1322,13 @@ class ChannelOpenOk(AMQPMethodPayload): Field(u'reserved-1', u'longstr', u'longstr', reserved=True), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'ChannelOpenOk(%s)' % (', '.join(map(repr, []))) + def __init__(self): """ Create frame channel.open-ok @@ -1242,7 +1368,7 @@ class ExchangeBind(AMQPMethodPayload): INDEX = (40, 30) # (Class ID, Method ID) BINARY_HEADER = b'\x00\x28\x00\x1E' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = True, False + SENT_BY_CLIENT, SENT_BY_SERVER = False, True IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content @@ -1257,6 +1383,13 @@ class ExchangeBind(AMQPMethodPayload): Field(u'arguments', u'table', u'table', reserved=False), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'ExchangeBind(%s)' % (', '.join(map(repr, [self.destination, self.source, self.routing_key, self.no_wait, self.arguments]))) + def __init__(self, destination, source, routing_key, no_wait, arguments): """ Create frame exchange.bind @@ -1336,12 +1469,19 @@ class ExchangeBindOk(AMQPMethodPayload): INDEX = (40, 31) # (Class ID, Method ID) BINARY_HEADER = b'\x00\x28\x00\x1F' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = False, True + SENT_BY_CLIENT, SENT_BY_SERVER = True, False IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x28\x00\x1F\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'ExchangeBindOk(%s)' % (', '.join(map(repr, []))) + def __init__(self): """ Create frame exchange.bind-ok @@ -1369,7 +1509,7 @@ class ExchangeDeclare(AMQPMethodPayload): INDEX = (40, 10) # (Class ID, Method ID) BINARY_HEADER = b'\x00\x28\x00\x0A' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = True, False + SENT_BY_CLIENT, SENT_BY_SERVER = False, True IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content @@ -1387,6 +1527,13 @@ class ExchangeDeclare(AMQPMethodPayload): Field(u'arguments', u'table', u'table', reserved=False), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'ExchangeDeclare(%s)' % (', '.join(map(repr, [self.exchange, self.type_, self.passive, self.durable, self.auto_delete, self.internal, self.no_wait, self.arguments]))) + def __init__(self, exchange, type_, passive, durable, auto_delete, internal, no_wait, arguments): """ Create frame exchange.declare @@ -1504,7 +1651,7 @@ class ExchangeDelete(AMQPMethodPayload): INDEX = (40, 20) # (Class ID, Method ID) BINARY_HEADER = b'\x00\x28\x00\x14' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = True, False + SENT_BY_CLIENT, SENT_BY_SERVER = False, True IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content @@ -1517,6 +1664,13 @@ class ExchangeDelete(AMQPMethodPayload): Field(u'no-wait', u'no-wait', u'bit', reserved=False), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'ExchangeDelete(%s)' % (', '.join(map(repr, [self.exchange, self.if_unused, self.no_wait]))) + def __init__(self, exchange, if_unused, no_wait): """ Create frame exchange.delete @@ -1576,12 +1730,19 @@ class ExchangeDeclareOk(AMQPMethodPayload): INDEX = (40, 11) # (Class ID, Method ID) BINARY_HEADER = b'\x00\x28\x00\x0B' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = False, True + SENT_BY_CLIENT, SENT_BY_SERVER = True, False IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x28\x00\x0B\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'ExchangeDeclareOk(%s)' % (', '.join(map(repr, []))) + def __init__(self): """ Create frame exchange.declare-ok @@ -1607,12 +1768,19 @@ class ExchangeDeleteOk(AMQPMethodPayload): INDEX = (40, 21) # (Class ID, Method ID) BINARY_HEADER = b'\x00\x28\x00\x15' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = False, True + SENT_BY_CLIENT, SENT_BY_SERVER = True, False IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x28\x00\x15\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'ExchangeDeleteOk(%s)' % (', '.join(map(repr, []))) + def __init__(self): """ Create frame exchange.delete-ok @@ -1638,7 +1806,7 @@ class ExchangeUnbind(AMQPMethodPayload): INDEX = (40, 40) # (Class ID, Method ID) BINARY_HEADER = b'\x00\x28\x00\x28' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = True, False + SENT_BY_CLIENT, SENT_BY_SERVER = False, True IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content @@ -1653,6 +1821,13 @@ class ExchangeUnbind(AMQPMethodPayload): Field(u'arguments', u'table', u'table', reserved=False), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'ExchangeUnbind(%s)' % (', '.join(map(repr, [self.destination, self.source, self.routing_key, self.no_wait, self.arguments]))) + def __init__(self, destination, source, routing_key, no_wait, arguments): """ Create frame exchange.unbind @@ -1726,12 +1901,19 @@ class ExchangeUnbindOk(AMQPMethodPayload): INDEX = (40, 51) # (Class ID, Method ID) BINARY_HEADER = b'\x00\x28\x00\x33' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = False, True + SENT_BY_CLIENT, SENT_BY_SERVER = True, False IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x28\x00\x33\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'ExchangeUnbindOk(%s)' % (', '.join(map(repr, []))) + def __init__(self): """ Create frame exchange.unbind-ok @@ -1776,7 +1958,7 @@ class QueueBind(AMQPMethodPayload): INDEX = (50, 20) # (Class ID, Method ID) BINARY_HEADER = b'\x00\x32\x00\x14' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = True, False + SENT_BY_CLIENT, SENT_BY_SERVER = False, True IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content @@ -1791,6 +1973,13 @@ class QueueBind(AMQPMethodPayload): Field(u'arguments', u'table', u'table', reserved=False), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'QueueBind(%s)' % (', '.join(map(repr, [self.queue, self.exchange, self.routing_key, self.no_wait, self.arguments]))) + def __init__(self, queue, exchange, routing_key, no_wait, arguments): """ Create frame queue.bind @@ -1882,12 +2071,19 @@ class QueueBindOk(AMQPMethodPayload): INDEX = (50, 21) # (Class ID, Method ID) BINARY_HEADER = b'\x00\x32\x00\x15' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = False, True + SENT_BY_CLIENT, SENT_BY_SERVER = True, False IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x32\x00\x15\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'QueueBindOk(%s)' % (', '.join(map(repr, []))) + def __init__(self): """ Create frame queue.bind-ok @@ -1917,7 +2113,7 @@ class QueueDeclare(AMQPMethodPayload): INDEX = (50, 10) # (Class ID, Method ID) BINARY_HEADER = b'\x00\x32\x00\x0A' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = True, False + SENT_BY_CLIENT, SENT_BY_SERVER = False, True IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content @@ -1934,6 +2130,13 @@ class QueueDeclare(AMQPMethodPayload): Field(u'arguments', u'table', u'table', reserved=False), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'QueueDeclare(%s)' % (', '.join(map(repr, [self.queue, self.passive, self.durable, self.exclusive, self.auto_delete, self.no_wait, self.arguments]))) + def __init__(self, queue, passive, durable, exclusive, auto_delete, no_wait, arguments): """ Create frame queue.declare @@ -2047,7 +2250,7 @@ class QueueDelete(AMQPMethodPayload): INDEX = (50, 40) # (Class ID, Method ID) BINARY_HEADER = b'\x00\x32\x00\x28' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = True, False + SENT_BY_CLIENT, SENT_BY_SERVER = False, True IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content @@ -2061,6 +2264,13 @@ class QueueDelete(AMQPMethodPayload): Field(u'no-wait', u'no-wait', u'bit', reserved=False), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'QueueDelete(%s)' % (', '.join(map(repr, [self.queue, self.if_unused, self.if_empty, self.no_wait]))) + def __init__(self, queue, if_unused, if_empty, no_wait): """ Create frame queue.delete @@ -2125,7 +2335,7 @@ class QueueDeclareOk(AMQPMethodPayload): INDEX = (50, 11) # (Class ID, Method ID) BINARY_HEADER = b'\x00\x32\x00\x0B' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = False, True + SENT_BY_CLIENT, SENT_BY_SERVER = True, False IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content @@ -2137,6 +2347,13 @@ class QueueDeclareOk(AMQPMethodPayload): Field(u'consumer-count', u'long', u'long', reserved=False), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'QueueDeclareOk(%s)' % (', '.join(map(repr, [self.queue, self.message_count, self.consumer_count]))) + def __init__(self, queue, message_count, consumer_count): """ Create frame queue.declare-ok @@ -2190,7 +2407,7 @@ class QueueDeleteOk(AMQPMethodPayload): INDEX = (50, 41) # (Class ID, Method ID) BINARY_HEADER = b'\x00\x32\x00\x29' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = False, True + SENT_BY_CLIENT, SENT_BY_SERVER = True, False IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content @@ -2200,6 +2417,13 @@ class QueueDeleteOk(AMQPMethodPayload): Field(u'message-count', u'message-count', u'long', reserved=False), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'QueueDeleteOk(%s)' % (', '.join(map(repr, [self.message_count]))) + def __init__(self, message_count): """ Create frame queue.delete-ok @@ -2238,7 +2462,7 @@ class QueuePurge(AMQPMethodPayload): INDEX = (50, 30) # (Class ID, Method ID) BINARY_HEADER = b'\x00\x32\x00\x1E' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = True, False + SENT_BY_CLIENT, SENT_BY_SERVER = False, True IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content @@ -2250,6 +2474,13 @@ class QueuePurge(AMQPMethodPayload): Field(u'no-wait', u'no-wait', u'bit', reserved=False), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'QueuePurge(%s)' % (', '.join(map(repr, [self.queue, self.no_wait]))) + def __init__(self, queue, no_wait): """ Create frame queue.purge @@ -2297,7 +2528,7 @@ class QueuePurgeOk(AMQPMethodPayload): INDEX = (50, 31) # (Class ID, Method ID) BINARY_HEADER = b'\x00\x32\x00\x1F' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = False, True + SENT_BY_CLIENT, SENT_BY_SERVER = True, False IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content @@ -2307,6 +2538,13 @@ class QueuePurgeOk(AMQPMethodPayload): Field(u'message-count', u'message-count', u'long', reserved=False), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'QueuePurgeOk(%s)' % (', '.join(map(repr, [self.message_count]))) + def __init__(self, message_count): """ Create frame queue.purge-ok @@ -2343,7 +2581,7 @@ class QueueUnbind(AMQPMethodPayload): INDEX = (50, 50) # (Class ID, Method ID) BINARY_HEADER = b'\x00\x32\x00\x32' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = True, False + SENT_BY_CLIENT, SENT_BY_SERVER = False, True IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content @@ -2357,6 +2595,13 @@ class QueueUnbind(AMQPMethodPayload): Field(u'arguments', u'table', u'table', reserved=False), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'QueueUnbind(%s)' % (', '.join(map(repr, [self.queue, self.exchange, self.routing_key, self.arguments]))) + def __init__(self, queue, exchange, routing_key, arguments): """ Create frame queue.unbind @@ -2423,12 +2668,19 @@ class QueueUnbindOk(AMQPMethodPayload): INDEX = (50, 51) # (Class ID, Method ID) BINARY_HEADER = b'\x00\x32\x00\x33' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = False, True + SENT_BY_CLIENT, SENT_BY_SERVER = True, False IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x32\x00\x33\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'QueueUnbindOk(%s)' % (', '.join(map(repr, []))) + def __init__(self): """ Create frame queue.unbind-ok @@ -2609,6 +2861,13 @@ class BasicAck(AMQPMethodPayload): Field(u'multiple', u'bit', u'bit', reserved=False), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'BasicAck(%s)' % (', '.join(map(repr, [self.delivery_tag, self.multiple]))) + def __init__(self, delivery_tag, multiple): """ Create frame basic.ack @@ -2659,7 +2918,7 @@ class BasicConsume(AMQPMethodPayload): INDEX = (60, 20) # (Class ID, Method ID) BINARY_HEADER = b'\x00\x3C\x00\x14' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = True, False + SENT_BY_CLIENT, SENT_BY_SERVER = False, True IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content @@ -2676,6 +2935,13 @@ class BasicConsume(AMQPMethodPayload): Field(u'arguments', u'table', u'table', reserved=False), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'BasicConsume(%s)' % (', '.join(map(repr, [self.queue, self.consumer_tag, self.no_local, self.no_ack, self.exclusive, self.no_wait, self.arguments]))) + def __init__(self, queue, consumer_tag, no_local, no_ack, exclusive, no_wait, arguments): """ Create frame basic.consume @@ -2786,6 +3052,13 @@ class BasicCancel(AMQPMethodPayload): Field(u'no-wait', u'no-wait', u'bit', reserved=False), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'BasicCancel(%s)' % (', '.join(map(repr, [self.consumer_tag, self.no_wait]))) + def __init__(self, consumer_tag, no_wait): """ Create frame basic.cancel @@ -2833,7 +3106,7 @@ class BasicConsumeOk(AMQPMethodPayload): INDEX = (60, 21) # (Class ID, Method ID) BINARY_HEADER = b'\x00\x3C\x00\x15' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = False, True + SENT_BY_CLIENT, SENT_BY_SERVER = True, False IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content @@ -2843,6 +3116,13 @@ class BasicConsumeOk(AMQPMethodPayload): Field(u'consumer-tag', u'consumer-tag', u'shortstr', reserved=False), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'BasicConsumeOk(%s)' % (', '.join(map(repr, [self.consumer_tag]))) + def __init__(self, consumer_tag): """ Create frame basic.consume-ok @@ -2893,6 +3173,13 @@ class BasicCancelOk(AMQPMethodPayload): Field(u'consumer-tag', u'consumer-tag', u'shortstr', reserved=False), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'BasicCancelOk(%s)' % (', '.join(map(repr, [self.consumer_tag]))) + def __init__(self, consumer_tag): """ Create frame basic.cancel-ok @@ -2937,7 +3224,7 @@ class BasicDeliver(AMQPMethodPayload): INDEX = (60, 60) # (Class ID, Method ID) BINARY_HEADER = b'\x00\x3C\x00\x3C' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = False, True + SENT_BY_CLIENT, SENT_BY_SERVER = True, False IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content @@ -2951,6 +3238,13 @@ class BasicDeliver(AMQPMethodPayload): Field(u'routing-key', u'shortstr', u'shortstr', reserved=False), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'BasicDeliver(%s)' % (', '.join(map(repr, [self.consumer_tag, self.delivery_tag, self.redelivered, self.exchange, self.routing_key]))) + def __init__(self, consumer_tag, delivery_tag, redelivered, exchange, routing_key): """ Create frame basic.deliver @@ -3023,7 +3317,7 @@ class BasicGet(AMQPMethodPayload): INDEX = (60, 70) # (Class ID, Method ID) BINARY_HEADER = b'\x00\x3C\x00\x46' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = True, False + SENT_BY_CLIENT, SENT_BY_SERVER = False, True IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content @@ -3035,6 +3329,13 @@ class BasicGet(AMQPMethodPayload): Field(u'no-ack', u'no-ack', u'bit', reserved=False), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'BasicGet(%s)' % (', '.join(map(repr, [self.queue, self.no_ack]))) + def __init__(self, queue, no_ack): """ Create frame basic.get @@ -3086,7 +3387,7 @@ class BasicGetOk(AMQPMethodPayload): INDEX = (60, 71) # (Class ID, Method ID) BINARY_HEADER = b'\x00\x3C\x00\x47' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = False, True + SENT_BY_CLIENT, SENT_BY_SERVER = True, False IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content @@ -3100,6 +3401,13 @@ class BasicGetOk(AMQPMethodPayload): Field(u'message-count', u'message-count', u'long', reserved=False), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'BasicGetOk(%s)' % (', '.join(map(repr, [self.delivery_tag, self.redelivered, self.exchange, self.routing_key, self.message_count]))) + def __init__(self, delivery_tag, redelivered, exchange, routing_key, message_count): """ Create frame basic.get-ok @@ -3167,7 +3475,7 @@ class BasicGetEmpty(AMQPMethodPayload): INDEX = (60, 72) # (Class ID, Method ID) BINARY_HEADER = b'\x00\x3C\x00\x48' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = False, True + SENT_BY_CLIENT, SENT_BY_SERVER = True, False IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content @@ -3178,6 +3486,13 @@ class BasicGetEmpty(AMQPMethodPayload): Field(u'reserved-1', u'shortstr', u'shortstr', reserved=True), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'BasicGetEmpty(%s)' % (', '.join(map(repr, []))) + def __init__(self): """ Create frame basic.get-empty @@ -3227,6 +3542,13 @@ class BasicNack(AMQPMethodPayload): Field(u'requeue', u'bit', u'bit', reserved=False), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'BasicNack(%s)' % (', '.join(map(repr, [self.delivery_tag, self.multiple, self.requeue]))) + def __init__(self, delivery_tag, multiple, requeue): """ Create frame basic.nack @@ -3286,7 +3608,7 @@ class BasicPublish(AMQPMethodPayload): INDEX = (60, 40) # (Class ID, Method ID) BINARY_HEADER = b'\x00\x3C\x00\x28' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = True, False + SENT_BY_CLIENT, SENT_BY_SERVER = False, True IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content @@ -3300,6 +3622,13 @@ class BasicPublish(AMQPMethodPayload): Field(u'immediate', u'bit', u'bit', reserved=False), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'BasicPublish(%s)' % (', '.join(map(repr, [self.exchange, self.routing_key, self.mandatory, self.immediate]))) + def __init__(self, exchange, routing_key, mandatory, immediate): """ Create frame basic.publish @@ -3391,7 +3720,7 @@ class BasicQos(AMQPMethodPayload): INDEX = (60, 10) # (Class ID, Method ID) BINARY_HEADER = b'\x00\x3C\x00\x0A' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = True, False + SENT_BY_CLIENT, SENT_BY_SERVER = False, True IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content @@ -3403,6 +3732,13 @@ class BasicQos(AMQPMethodPayload): Field(u'global', u'bit', u'bit', reserved=False), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'BasicQos(%s)' % (', '.join(map(repr, [self.prefetch_size, self.prefetch_count, self.global_]))) + def __init__(self, prefetch_size, prefetch_count, global_): """ Create frame basic.qos @@ -3483,12 +3819,19 @@ class BasicQosOk(AMQPMethodPayload): INDEX = (60, 11) # (Class ID, Method ID) BINARY_HEADER = b'\x00\x3C\x00\x0B' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = False, True + SENT_BY_CLIENT, SENT_BY_SERVER = True, False IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x3C\x00\x0B\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'BasicQosOk(%s)' % (', '.join(map(repr, []))) + def __init__(self): """ Create frame basic.qos-ok @@ -3520,7 +3863,7 @@ class BasicReturn(AMQPMethodPayload): INDEX = (60, 50) # (Class ID, Method ID) BINARY_HEADER = b'\x00\x3C\x00\x32' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = False, True + SENT_BY_CLIENT, SENT_BY_SERVER = True, False IS_SIZE_STATIC = False # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content @@ -3533,6 +3876,13 @@ class BasicReturn(AMQPMethodPayload): Field(u'routing-key', u'shortstr', u'shortstr', reserved=False), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'BasicReturn(%s)' % (', '.join(map(repr, [self.reply_code, self.reply_text, self.exchange, self.routing_key]))) + def __init__(self, reply_code, reply_text, exchange, routing_key): """ Create frame basic.return @@ -3599,7 +3949,7 @@ class BasicReject(AMQPMethodPayload): INDEX = (60, 90) # (Class ID, Method ID) BINARY_HEADER = b'\x00\x3C\x00\x5A' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = True, False + SENT_BY_CLIENT, SENT_BY_SERVER = False, True IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content @@ -3610,6 +3960,13 @@ class BasicReject(AMQPMethodPayload): Field(u'requeue', u'bit', u'bit', reserved=False), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'BasicReject(%s)' % (', '.join(map(repr, [self.delivery_tag, self.requeue]))) + def __init__(self, delivery_tag, requeue): """ Create frame basic.reject @@ -3658,7 +4015,7 @@ class BasicRecoverAsync(AMQPMethodPayload): INDEX = (60, 100) # (Class ID, Method ID) BINARY_HEADER = b'\x00\x3C\x00\x64' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = True, False + SENT_BY_CLIENT, SENT_BY_SERVER = False, True IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content @@ -3668,6 +4025,13 @@ class BasicRecoverAsync(AMQPMethodPayload): Field(u'requeue', u'bit', u'bit', reserved=False), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'BasicRecoverAsync(%s)' % (', '.join(map(repr, [self.requeue]))) + def __init__(self, requeue): """ Create frame basic.recover-async @@ -3715,7 +4079,7 @@ class BasicRecover(AMQPMethodPayload): INDEX = (60, 110) # (Class ID, Method ID) BINARY_HEADER = b'\x00\x3C\x00\x6E' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = True, False + SENT_BY_CLIENT, SENT_BY_SERVER = False, True IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content @@ -3725,6 +4089,13 @@ class BasicRecover(AMQPMethodPayload): Field(u'requeue', u'bit', u'bit', reserved=False), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'BasicRecover(%s)' % (', '.join(map(repr, [self.requeue]))) + def __init__(self, requeue): """ Create frame basic.recover @@ -3768,12 +4139,19 @@ class BasicRecoverOk(AMQPMethodPayload): INDEX = (60, 111) # (Class ID, Method ID) BINARY_HEADER = b'\x00\x3C\x00\x6F' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = False, True + SENT_BY_CLIENT, SENT_BY_SERVER = True, False IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x3C\x00\x6F\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'BasicRecoverOk(%s)' % (', '.join(map(repr, []))) + def __init__(self): """ Create frame basic.recover-ok @@ -3826,12 +4204,19 @@ class TxCommit(AMQPMethodPayload): INDEX = (90, 20) # (Class ID, Method ID) BINARY_HEADER = b'\x00\x5A\x00\x14' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = True, False + SENT_BY_CLIENT, SENT_BY_SERVER = False, True IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x5A\x00\x14\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'TxCommit(%s)' % (', '.join(map(repr, []))) + def __init__(self): """ Create frame tx.commit @@ -3859,13 +4244,20 @@ class TxCommitOk(AMQPMethodPayload): INDEX = (90, 21) # (Class ID, Method ID) BINARY_HEADER = b'\x00\x5A\x00\x15' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = False, True + SENT_BY_CLIENT, SENT_BY_SERVER = True, False IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x5A\x00\x15\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END - def __init__(self): + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'TxCommitOk(%s)' % (', '.join(map(repr, []))) + + def __init__(self): """ Create frame tx.commit-ok """ @@ -3896,12 +4288,19 @@ class TxRollback(AMQPMethodPayload): INDEX = (90, 30) # (Class ID, Method ID) BINARY_HEADER = b'\x00\x5A\x00\x1E' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = True, False + SENT_BY_CLIENT, SENT_BY_SERVER = False, True IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x5A\x00\x1E\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'TxRollback(%s)' % (', '.join(map(repr, []))) + def __init__(self): """ Create frame tx.rollback @@ -3929,12 +4328,19 @@ class TxRollbackOk(AMQPMethodPayload): INDEX = (90, 31) # (Class ID, Method ID) BINARY_HEADER = b'\x00\x5A\x00\x1F' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = False, True + SENT_BY_CLIENT, SENT_BY_SERVER = True, False IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x5A\x00\x1F\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'TxRollbackOk(%s)' % (', '.join(map(repr, []))) + def __init__(self): """ Create frame tx.rollback-ok @@ -3963,12 +4369,19 @@ class TxSelect(AMQPMethodPayload): INDEX = (90, 10) # (Class ID, Method ID) BINARY_HEADER = b'\x00\x5A\x00\x0A' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = True, False + SENT_BY_CLIENT, SENT_BY_SERVER = False, True IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x5A\x00\x0A\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'TxSelect(%s)' % (', '.join(map(repr, []))) + def __init__(self): """ Create frame tx.select @@ -3996,12 +4409,19 @@ class TxSelectOk(AMQPMethodPayload): INDEX = (90, 11) # (Class ID, Method ID) BINARY_HEADER = b'\x00\x5A\x00\x0B' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = False, True + SENT_BY_CLIENT, SENT_BY_SERVER = True, False IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x5A\x00\x0B\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'TxSelectOk(%s)' % (', '.join(map(repr, []))) + def __init__(self): """ Create frame tx.select-ok @@ -4053,7 +4473,7 @@ class ConfirmSelect(AMQPMethodPayload): INDEX = (85, 10) # (Class ID, Method ID) BINARY_HEADER = b'\x00\x55\x00\x0A' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = True, False + SENT_BY_CLIENT, SENT_BY_SERVER = False, True IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = False # this means that argument part has always the same content @@ -4063,6 +4483,13 @@ class ConfirmSelect(AMQPMethodPayload): Field(u'nowait', u'bit', u'bit', reserved=False), ] + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'ConfirmSelect(%s)' % (', '.join(map(repr, [self.nowait]))) + def __init__(self, nowait): """ Create frame confirm.select @@ -4106,12 +4533,19 @@ class ConfirmSelectOk(AMQPMethodPayload): INDEX = (85, 11) # (Class ID, Method ID) BINARY_HEADER = b'\x00\x55\x00\x0B' # CLASS ID + METHOD ID - SENT_BY_CLIENT, SENT_BY_SERVER = False, True + SENT_BY_CLIENT, SENT_BY_SERVER = True, False IS_SIZE_STATIC = True # this means that argument part has always the same length IS_CONTENT_STATIC = True # this means that argument part has always the same content STATIC_CONTENT = b'\x00\x00\x00\x04\x00\x55\x00\x0B\xCE' # spans LENGTH, CLASS ID, METHOD ID, ....., FRAME_END + def __repr__(self): + """ + Convert the frame to a Python-representable string + :return: Python string representation + """ + return 'ConfirmSelectOk(%s)' % (', '.join(map(repr, []))) + def __init__(self): """ Create frame confirm.select-ok @@ -4125,134 +4559,134 @@ class ConfirmSelectOk(AMQPMethodPayload): IDENT_TO_METHOD = { - (90, 21): TxCommitOk, - (60, 100): BasicRecoverAsync, + (10, 60): ConnectionBlocked, + (10, 50): ConnectionClose, + (10, 51): ConnectionCloseOk, + (10, 40): ConnectionOpen, + (10, 41): ConnectionOpenOk, + (10, 10): ConnectionStart, + (10, 20): ConnectionSecure, (10, 11): ConnectionStartOk, - (60, 40): BasicPublish, - (60, 50): BasicReturn, - (40, 21): ExchangeDeleteOk, - (20, 20): ChannelFlow, - (40, 31): ExchangeBindOk, - (60, 21): BasicConsumeOk, (10, 21): ConnectionSecureOk, - (90, 30): TxRollback, - (90, 10): TxSelect, - (85, 11): ConfirmSelectOk, - (10, 61): ConnectionUnblocked, - (50, 11): QueueDeclareOk, - (60, 70): BasicGet, - (90, 11): TxSelectOk, (10, 30): ConnectionTune, - (60, 11): BasicQosOk, - (60, 80): BasicAck, - (20, 21): ChannelFlowOk, - (60, 60): BasicDeliver, - (90, 31): TxRollbackOk, - (60, 20): BasicConsume, - (85, 10): ConfirmSelect, - (20, 40): ChannelClose, - (60, 71): BasicGetOk, - (50, 30): QueuePurge, (10, 31): ConnectionTuneOk, - (10, 40): ConnectionOpen, - (60, 30): BasicCancel, - (50, 50): QueueUnbind, - (40, 10): ExchangeDeclare, - (10, 50): ConnectionClose, - (20, 10): ChannelOpen, + (10, 61): ConnectionUnblocked, + (20, 40): ChannelClose, (20, 41): ChannelCloseOk, - (60, 110): BasicRecover, - (60, 90): BasicReject, - (50, 31): QueuePurgeOk, - (50, 40): QueueDelete, - (40, 20): ExchangeDelete, - (50, 20): QueueBind, - (10, 41): ConnectionOpenOk, - (60, 120): BasicNack, - (60, 31): BasicCancelOk, - (90, 20): TxCommit, - (10, 10): ConnectionStart, - (60, 10): BasicQos, - (40, 11): ExchangeDeclareOk, - (10, 51): ConnectionCloseOk, - (40, 51): ExchangeUnbindOk, + (20, 20): ChannelFlow, + (20, 21): ChannelFlowOk, + (20, 10): ChannelOpen, (20, 11): ChannelOpenOk, - (60, 72): BasicGetEmpty, (40, 30): ExchangeBind, - (60, 111): BasicRecoverOk, + (40, 31): ExchangeBindOk, + (40, 10): ExchangeDeclare, + (40, 20): ExchangeDelete, + (40, 11): ExchangeDeclareOk, + (40, 21): ExchangeDeleteOk, (40, 40): ExchangeUnbind, - (10, 20): ConnectionSecure, - (50, 41): QueueDeleteOk, - (50, 51): QueueUnbindOk, + (40, 51): ExchangeUnbindOk, + (50, 20): QueueBind, (50, 21): QueueBindOk, - (10, 60): ConnectionBlocked, (50, 10): QueueDeclare, + (50, 40): QueueDelete, + (50, 11): QueueDeclareOk, + (50, 41): QueueDeleteOk, + (50, 30): QueuePurge, + (50, 31): QueuePurgeOk, + (50, 50): QueueUnbind, + (50, 51): QueueUnbindOk, + (60, 80): BasicAck, + (60, 20): BasicConsume, + (60, 30): BasicCancel, + (60, 21): BasicConsumeOk, + (60, 31): BasicCancelOk, + (60, 60): BasicDeliver, + (60, 70): BasicGet, + (60, 71): BasicGetOk, + (60, 72): BasicGetEmpty, + (60, 120): BasicNack, + (60, 40): BasicPublish, + (60, 10): BasicQos, + (60, 11): BasicQosOk, + (60, 50): BasicReturn, + (60, 90): BasicReject, + (60, 100): BasicRecoverAsync, + (60, 110): BasicRecover, + (60, 111): BasicRecoverOk, + (90, 20): TxCommit, + (90, 21): TxCommitOk, + (90, 30): TxRollback, + (90, 31): TxRollbackOk, + (90, 10): TxSelect, + (90, 11): TxSelectOk, + (85, 10): ConfirmSelect, + (85, 11): ConfirmSelectOk, } BINARY_HEADER_TO_METHOD = { - b'\x00\x5A\x00\x15': TxCommitOk, - b'\x00\x3C\x00\x64': BasicRecoverAsync, + b'\x00\x0A\x00\x3C': ConnectionBlocked, + b'\x00\x0A\x00\x32': ConnectionClose, + b'\x00\x0A\x00\x33': ConnectionCloseOk, + b'\x00\x0A\x00\x28': ConnectionOpen, + b'\x00\x0A\x00\x29': ConnectionOpenOk, + b'\x00\x0A\x00\x0A': ConnectionStart, + b'\x00\x0A\x00\x14': ConnectionSecure, b'\x00\x0A\x00\x0B': ConnectionStartOk, - b'\x00\x3C\x00\x28': BasicPublish, - b'\x00\x3C\x00\x32': BasicReturn, - b'\x00\x28\x00\x15': ExchangeDeleteOk, - b'\x00\x14\x00\x14': ChannelFlow, - b'\x00\x28\x00\x1F': ExchangeBindOk, - b'\x00\x3C\x00\x15': BasicConsumeOk, b'\x00\x0A\x00\x15': ConnectionSecureOk, - b'\x00\x5A\x00\x1E': TxRollback, - b'\x00\x5A\x00\x0A': TxSelect, - b'\x00\x55\x00\x0B': ConfirmSelectOk, - b'\x00\x0A\x00\x3D': ConnectionUnblocked, - b'\x00\x32\x00\x0B': QueueDeclareOk, - b'\x00\x3C\x00\x46': BasicGet, - b'\x00\x5A\x00\x0B': TxSelectOk, b'\x00\x0A\x00\x1E': ConnectionTune, - b'\x00\x3C\x00\x0B': BasicQosOk, - b'\x00\x3C\x00\x50': BasicAck, - b'\x00\x14\x00\x15': ChannelFlowOk, - b'\x00\x3C\x00\x3C': BasicDeliver, - b'\x00\x5A\x00\x1F': TxRollbackOk, - b'\x00\x3C\x00\x14': BasicConsume, - b'\x00\x55\x00\x0A': ConfirmSelect, - b'\x00\x14\x00\x28': ChannelClose, - b'\x00\x3C\x00\x47': BasicGetOk, - b'\x00\x32\x00\x1E': QueuePurge, b'\x00\x0A\x00\x1F': ConnectionTuneOk, - b'\x00\x0A\x00\x28': ConnectionOpen, - b'\x00\x3C\x00\x1E': BasicCancel, - b'\x00\x32\x00\x32': QueueUnbind, - b'\x00\x28\x00\x0A': ExchangeDeclare, - b'\x00\x0A\x00\x32': ConnectionClose, - b'\x00\x14\x00\x0A': ChannelOpen, + b'\x00\x0A\x00\x3D': ConnectionUnblocked, + b'\x00\x14\x00\x28': ChannelClose, b'\x00\x14\x00\x29': ChannelCloseOk, - b'\x00\x3C\x00\x6E': BasicRecover, - b'\x00\x3C\x00\x5A': BasicReject, - b'\x00\x32\x00\x1F': QueuePurgeOk, - b'\x00\x32\x00\x28': QueueDelete, - b'\x00\x28\x00\x14': ExchangeDelete, - b'\x00\x32\x00\x14': QueueBind, - b'\x00\x0A\x00\x29': ConnectionOpenOk, - b'\x00\x3C\x00\x78': BasicNack, - b'\x00\x3C\x00\x1F': BasicCancelOk, - b'\x00\x5A\x00\x14': TxCommit, - b'\x00\x0A\x00\x0A': ConnectionStart, - b'\x00\x3C\x00\x0A': BasicQos, - b'\x00\x28\x00\x0B': ExchangeDeclareOk, - b'\x00\x0A\x00\x33': ConnectionCloseOk, - b'\x00\x28\x00\x33': ExchangeUnbindOk, + b'\x00\x14\x00\x14': ChannelFlow, + b'\x00\x14\x00\x15': ChannelFlowOk, + b'\x00\x14\x00\x0A': ChannelOpen, b'\x00\x14\x00\x0B': ChannelOpenOk, - b'\x00\x3C\x00\x48': BasicGetEmpty, b'\x00\x28\x00\x1E': ExchangeBind, - b'\x00\x3C\x00\x6F': BasicRecoverOk, + b'\x00\x28\x00\x1F': ExchangeBindOk, + b'\x00\x28\x00\x0A': ExchangeDeclare, + b'\x00\x28\x00\x14': ExchangeDelete, + b'\x00\x28\x00\x0B': ExchangeDeclareOk, + b'\x00\x28\x00\x15': ExchangeDeleteOk, b'\x00\x28\x00\x28': ExchangeUnbind, - b'\x00\x0A\x00\x14': ConnectionSecure, - b'\x00\x32\x00\x29': QueueDeleteOk, - b'\x00\x32\x00\x33': QueueUnbindOk, + b'\x00\x28\x00\x33': ExchangeUnbindOk, + b'\x00\x32\x00\x14': QueueBind, b'\x00\x32\x00\x15': QueueBindOk, - b'\x00\x0A\x00\x3C': ConnectionBlocked, b'\x00\x32\x00\x0A': QueueDeclare, + b'\x00\x32\x00\x28': QueueDelete, + b'\x00\x32\x00\x0B': QueueDeclareOk, + b'\x00\x32\x00\x29': QueueDeleteOk, + b'\x00\x32\x00\x1E': QueuePurge, + b'\x00\x32\x00\x1F': QueuePurgeOk, + b'\x00\x32\x00\x32': QueueUnbind, + b'\x00\x32\x00\x33': QueueUnbindOk, + b'\x00\x3C\x00\x50': BasicAck, + b'\x00\x3C\x00\x14': BasicConsume, + b'\x00\x3C\x00\x1E': BasicCancel, + b'\x00\x3C\x00\x15': BasicConsumeOk, + b'\x00\x3C\x00\x1F': BasicCancelOk, + b'\x00\x3C\x00\x3C': BasicDeliver, + b'\x00\x3C\x00\x46': BasicGet, + b'\x00\x3C\x00\x47': BasicGetOk, + b'\x00\x3C\x00\x48': BasicGetEmpty, + b'\x00\x3C\x00\x78': BasicNack, + b'\x00\x3C\x00\x28': BasicPublish, + b'\x00\x3C\x00\x0A': BasicQos, + b'\x00\x3C\x00\x0B': BasicQosOk, + b'\x00\x3C\x00\x32': BasicReturn, + b'\x00\x3C\x00\x5A': BasicReject, + b'\x00\x3C\x00\x64': BasicRecoverAsync, + b'\x00\x3C\x00\x6E': BasicRecover, + b'\x00\x3C\x00\x6F': BasicRecoverOk, + b'\x00\x5A\x00\x14': TxCommit, + b'\x00\x5A\x00\x15': TxCommitOk, + b'\x00\x5A\x00\x1E': TxRollback, + b'\x00\x5A\x00\x1F': TxRollbackOk, + b'\x00\x5A\x00\x0A': TxSelect, + b'\x00\x5A\x00\x0B': TxSelectOk, + b'\x00\x55\x00\x0A': ConfirmSelect, + b'\x00\x55\x00\x0B': ConfirmSelectOk, } @@ -4264,98 +4698,98 @@ CLASS_ID_TO_CONTENT_PROPERTY_LIST = { # if a method is NOT a reply, it will not be in this dict # a method may be a reply for AT MOST one method REPLY_REASONS_FOR = { - BasicGetEmpty: BasicGet, - BasicGetOk: BasicGet, + ConnectionCloseOk: ConnectionClose, + ConnectionOpenOk: ConnectionOpen, + ConnectionStartOk: ConnectionStart, + ConnectionSecureOk: ConnectionSecure, + ConnectionTuneOk: ConnectionTune, + ChannelCloseOk: ChannelClose, + ChannelFlowOk: ChannelFlow, + ChannelOpenOk: ChannelOpen, + ExchangeBindOk: ExchangeBind, + ExchangeDeclareOk: ExchangeDeclare, ExchangeDeleteOk: ExchangeDelete, - TxSelectOk: TxSelect, + ExchangeUnbindOk: ExchangeUnbind, QueueBindOk: QueueBind, + QueueDeclareOk: QueueDeclare, + QueueDeleteOk: QueueDelete, + QueuePurgeOk: QueuePurge, + QueueUnbindOk: QueueUnbind, BasicConsumeOk: BasicConsume, BasicCancelOk: BasicCancel, - TxRollbackOk: TxRollback, - TxCommitOk: TxCommit, - ChannelOpenOk: ChannelOpen, - QueueDeleteOk: QueueDelete, - ExchangeUnbindOk: ExchangeUnbind, - ExchangeBindOk: ExchangeBind, - ChannelCloseOk: ChannelClose, + BasicGetOk: BasicGet, + BasicGetEmpty: BasicGet, BasicQosOk: BasicQos, - ConnectionStartOk: ConnectionStart, - QueueUnbindOk: QueueUnbind, + TxCommitOk: TxCommit, + TxRollbackOk: TxRollback, + TxSelectOk: TxSelect, ConfirmSelectOk: ConfirmSelect, - ConnectionCloseOk: ConnectionClose, - QueuePurgeOk: QueuePurge, - QueueDeclareOk: QueueDeclare, - ExchangeDeclareOk: ExchangeDeclare, - ConnectionTuneOk: ConnectionTune, - ConnectionSecureOk: ConnectionSecure, - ConnectionOpenOk: ConnectionOpen, - ChannelFlowOk: ChannelFlow, } # Methods that are replies for other, ie. ConnectionOpenOk: ConnectionOpen # a method may be a reply for ONE or NONE other methods # if a method has no replies, it will have an empty list as value here -REPLIES_FOR= { - BasicGetEmpty: [], - BasicRecoverOk: [], - BasicReturn: [], - QueueDeclare: [QueueDeclareOk], - BasicGetOk: [], +REPLIES_FOR = { + ConnectionBlocked: [], + ConnectionClose: [ConnectionCloseOk], + ConnectionCloseOk: [], + ConnectionOpen: [ConnectionOpenOk], + ConnectionOpenOk: [], + ConnectionStart: [ConnectionStartOk], ConnectionSecure: [ConnectionSecureOk], + ConnectionStartOk: [], + ConnectionSecureOk: [], ConnectionTune: [ConnectionTuneOk], - TxRollback: [TxRollbackOk], - TxSelectOk: [], - QueueBindOk: [], + ConnectionTuneOk: [], + ConnectionUnblocked: [], + ChannelClose: [ChannelCloseOk], + ChannelCloseOk: [], ChannelFlow: [ChannelFlowOk], - BasicConsumeOk: [], - BasicConsume: [BasicConsumeOk], - BasicRecover: [], - BasicCancelOk: [], - ConfirmSelect: [ConfirmSelectOk], - BasicGet: [BasicGetOk, BasicGetEmpty], - TxRollbackOk: [], - QueueBind: [QueueBindOk], - ExchangeDelete: [ExchangeDeleteOk], - BasicAck: [], - ConnectionClose: [ConnectionCloseOk], + ChannelFlowOk: [], + ChannelOpen: [ChannelOpenOk], ChannelOpenOk: [], - QueueDeleteOk: [], - ExchangeUnbindOk: [], - ConnectionStart: [ConnectionStartOk], - BasicQos: [BasicQosOk], - QueueUnbind: [QueueUnbindOk], - BasicQosOk: [], - BasicReject: [], + ExchangeBind: [ExchangeBindOk], ExchangeBindOk: [], - ChannelCloseOk: [], ExchangeDeclare: [ExchangeDeclareOk], - ConnectionBlocked: [], - BasicPublish: [], - ExchangeUnbind: [ExchangeUnbindOk], + ExchangeDelete: [ExchangeDeleteOk], + ExchangeDeclareOk: [], ExchangeDeleteOk: [], - BasicNack: [], - ConnectionStartOk: [], - ExchangeBind: [ExchangeBindOk], + ExchangeUnbind: [ExchangeUnbindOk], + ExchangeUnbindOk: [], + QueueBind: [QueueBindOk], + QueueBindOk: [], + QueueDeclare: [QueueDeclareOk], QueueDelete: [QueueDeleteOk], - ConfirmSelectOk: [], - ConnectionCloseOk: [], + QueueDeclareOk: [], + QueueDeleteOk: [], QueuePurge: [QueuePurgeOk], - QueueUnbindOk: [], - ChannelOpen: [ChannelOpenOk], - ChannelClose: [ChannelCloseOk], QueuePurgeOk: [], - QueueDeclareOk: [], + QueueUnbind: [QueueUnbindOk], + QueueUnbindOk: [], + BasicAck: [], + BasicConsume: [BasicConsumeOk], BasicCancel: [BasicCancelOk], - ExchangeDeclareOk: [], - TxCommitOk: [], - ConnectionTuneOk: [], - ConnectionSecureOk: [], - ConnectionUnblocked: [], - ConnectionOpenOk: [], - ChannelFlowOk: [], - BasicRecoverAsync: [], - TxSelect: [TxSelectOk], + BasicConsumeOk: [], + BasicCancelOk: [], BasicDeliver: [], + BasicGet: [BasicGetOk, BasicGetEmpty], + BasicGetOk: [], + BasicGetEmpty: [], + BasicNack: [], + BasicPublish: [], + BasicQos: [BasicQosOk], + BasicQosOk: [], + BasicReturn: [], + BasicReject: [], + BasicRecoverAsync: [], + BasicRecover: [], + BasicRecoverOk: [], TxCommit: [TxCommitOk], - ConnectionOpen: [ConnectionOpenOk], + TxCommitOk: [], + TxRollback: [TxRollbackOk], + TxRollbackOk: [], + TxSelect: [TxSelectOk], + TxSelectOk: [], + ConfirmSelect: [ConfirmSelectOk], + ConfirmSelectOk: [], } diff --git a/coolamqp/framing/field_table.py b/coolamqp/framing/field_table.py index a8188590d4558eb4b4d5d316a2edc15a1642630b..98ffe811f180a94e777582a43dba673883b4838a 100644 --- a/coolamqp/framing/field_table.py +++ b/coolamqp/framing/field_table.py @@ -1,5 +1,6 @@ # coding=UTF-8 from __future__ import print_function, division, absolute_import + """ That funny type, field-table... @@ -67,6 +68,7 @@ def enframe_longstr(buf, value): def _c2none(buf, v): return None + FIELD_TYPES = { # length, struct, (option)to_bytes (callable(buffer, value)), # (option)from_bytes (callable(buffer, offset) -> diff --git a/coolamqp/uplink/connection/connection.py b/coolamqp/uplink/connection/connection.py index 688c28ffac5825180133b6d1586770ac24972089..fbecd1e2700d82f890eff2fe532d44798986fa5d 100644 --- a/coolamqp/uplink/connection/connection.py +++ b/coolamqp/uplink/connection/connection.py @@ -2,10 +2,13 @@ from __future__ import absolute_import, division, print_function import logging import collections +import monotonic +import uuid import time import socket import six +from coolamqp.exceptions import ConnectionDead from coolamqp.uplink.connection.recv_framer import ReceivingFramer from coolamqp.uplink.connection.send_framer import SendingFramer from coolamqp.framing.frames import AMQPMethodFrame @@ -19,6 +22,42 @@ from coolamqp.objects import Callable logger = logging.getLogger(__name__) +def alert_watches(watches, trigger): + """ + Notify all watches in this collection. + + Return a list of alive watches. + :param watches: list of Watch + :return: tuple of (list of Watch, bool - was any watch fired?) + """ + watch_handled = False + alive_watches = [] + while len(watches) > 0: + watch = watches.pop() + + if watch.cancelled: + continue + + watch_triggered = watch.is_triggered_by(trigger) + watch_handled |= watch_triggered + + if watch.cancelled: + continue + + if not any((watch_triggered, watch.oneshot, watch.cancelled)): + # Watch remains alive if it was NOT triggered, or it's NOT a oneshot or it's not cancelled + alive_watches.append(watch) + elif not watch.oneshot and not watch.cancelled: + alive_watches.append(watch) + elif watch.oneshot and not watch_triggered: + alive_watches.append(watch) + + if set(alive_watches) != set(watches): + for removed_watch in set(watches)-set(alive_watches): + logger.debug('Removing watch %s', repr(removed_watch)) + return alive_watches, watch_handled + + class Connection(object): """ An object that manages a connection in a comprehensive way. @@ -50,7 +89,7 @@ class Connection(object): """ self.listener_thread = listener_thread self.node_definition = node_definition - + self.uuid = uuid.uuid4().hex[:5] self.recvf = ReceivingFramer(self.on_frame) # todo a list doesn't seem like a very strong atomicity guarantee @@ -70,6 +109,10 @@ class Connection(object): self.heartbeat = None self.extensions = [] + # To be filled in later + self.listener_socket = None + self.sendf = None + def call_on_connected(self, callable): """ Register a callable to be called when this links to the server. @@ -87,14 +130,14 @@ class Connection(object): def on_connected(self): """Called by handshaker upon reception of final connection.open-ok""" - logger.info('Connection ready.') + logger.info('[%s] Connection ready.', self.uuid) self.state = ST_ONLINE while len(self.callables_on_connected) > 0: self.callables_on_connected.pop()() - def start(self): + def start(self, timeout): """ Start processing events for this connect. Create the socket, transmit 'AMQP\x00\x00\x09\x01' and roll. @@ -103,17 +146,19 @@ class Connection(object): """ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - + start_at = monotonic.monotonic() while True: try: sock.connect( (self.node_definition.host, self.node_definition.port)) except socket.error as e: time.sleep(0.5) # Connection refused? Very bad things? + if monotonic.monotonic() - start_at > timeout: + raise ConnectionDead() else: break - logger.debug('TCP connection established, authentication in progress') + logger.debug('[%s] TCP connection established, authentication in progress', self.uuid) sock.settimeout(0) sock.send(b'AMQP\x00\x00\x09\x01') @@ -172,7 +217,8 @@ class Connection(object): if isinstance(payload, ConnectionClose): self.send([AMQPMethodFrame(0, ConnectionCloseOk())]) - logger.info(u'Broker closed our connection - code %s reason %s', + logger.info(u'[%s] Broker closed our connection - code %s reason %s', + self.uuid, payload.reply_code, payload.reply_text.tobytes().decode('utf8')) @@ -212,7 +258,7 @@ class Connection(object): watch_handled = False # True if ANY watch handled this if isinstance(frame, AMQPMethodFrame): - logger.debug('Received %s', frame.payload.NAME) + logger.debug('[%s] Received %s', self.uuid, frame.payload.NAME) # ==================== process per-channel watches # @@ -222,25 +268,8 @@ class Connection(object): watches = self.watches[frame.channel] # a list self.watches[frame.channel] = [] - alive_watches = [] - while len(watches) > 0: - watch = watches.pop() - - if watch.cancelled: - # print('watch',watch,'was cancelled') - continue - - watch_triggered = watch.is_triggered_by(frame) - watch_handled |= watch_triggered - - if watch.cancelled: - # print('watch',watch,'was cancelled') - continue - - if ((not watch_triggered) or (not watch.oneshot)) and ( - not watch.cancelled): - # Watch remains alive if it was NOT triggered, or it's NOT a oneshot - alive_watches.append(watch) + alive_watches, f = alert_watches(watches, frame) + watch_handled |= f if frame.channel in self.watches: # unwatch_all might have gotten called, check that @@ -248,33 +277,20 @@ class Connection(object): self.watches[frame.channel].append(watch) # ==================== process "any" watches - alive_watches = [] any_watches = self.any_watches self.any_watches = [] - while len(any_watches): - watch = any_watches.pop() - - if watch.cancelled: - # print('any watch', watch, 'was cancelled') - continue - - watch_triggered = watch.is_triggered_by(frame) - watch_handled |= watch_triggered + alive_watches, f = alert_watches(any_watches, frame) - if watch.cancelled: - # print('any watch', watch, 'was cancelled') - continue - - if ((not watch_triggered) or (not watch.oneshot)) and ( - not watch.cancelled): - # Watch remains alive if it was NOT triggered, or it's NOT a oneshot - alive_watches.append(watch) + watch_handled |= f for watch in alive_watches: self.any_watches.append(watch) if not watch_handled: - logger.warn('Unhandled frame %s', frame) + if isinstance(frame, AMQPMethodFrame): + logger.warning('[%s] Unhandled method frame %s', self.uuid, repr(frame.payload)) + else: + logger.warning('[%s] Unhandled frame %s', self.uuid, frame) def watchdog(self, delay, callback): """ diff --git a/coolamqp/uplink/connection/recv_framer.py b/coolamqp/uplink/connection/recv_framer.py index 946e25342eda4a2bb141e0b208b91787bab39387..afee76b57738d511a2d54e55f6a70ee76fa630f0 100644 --- a/coolamqp/uplink/connection/recv_framer.py +++ b/coolamqp/uplink/connection/recv_framer.py @@ -18,7 +18,6 @@ FRAME_TYPES = { FRAME_METHOD: AMQPMethodFrame } - ordpy2 = ord if six.PY2 else lambda x: x @@ -81,7 +80,7 @@ class ReceivingFramer(object): """ assert self.total_data_len >= up_to, \ 'Tried to extract %s but %s remaining' % ( - up_to, self.total_data_len) + up_to, self.total_data_len) if up_to >= len(self.chunks[0]): q = self.chunks.popleft() else: @@ -93,7 +92,6 @@ class ReceivingFramer(object): len(q), up_to) return q - def _statemachine(self): # state rule 1 if self.frame_type is None and self.total_data_len > 0: @@ -125,7 +123,7 @@ class ReceivingFramer(object): # state rule 3 elif (self.frame_type != FRAME_HEARTBEAT) and ( self.frame_type is not None) and ( - self.frame_size is None) and ( + self.frame_size is None) and ( self.total_data_len > 6): hdr = b'' while len(hdr) < 6: diff --git a/coolamqp/uplink/connection/watches.py b/coolamqp/uplink/connection/watches.py index 3f7893f344a44c7516f13291b808da93868945ae..33f6eae2ffce2214ff145b8c3652e7ec9bac32f4 100644 --- a/coolamqp/uplink/connection/watches.py +++ b/coolamqp/uplink/connection/watches.py @@ -1,8 +1,12 @@ # coding=UTF-8 from __future__ import absolute_import, division, print_function +import logging from coolamqp.framing.frames import AMQPMethodFrame, AMQPHeaderFrame, \ AMQPBodyFrame +from coolamqp.framing.base import AMQPMethodPayload + +logger = logging.getLogger(__name__) class Watch(object): @@ -123,10 +127,13 @@ class MethodWatch(Watch): self.callable = callable if isinstance(method_or_methods, (list, tuple)): self.methods = tuple(method_or_methods) - else: - self.methods = method_or_methods + elif issubclass(method_or_methods, AMQPMethodPayload): + self.methods = (method_or_methods, ) self.on_end = on_end + def __repr__(self): + return '<MethodWatch %s, %s, %s, on_end=%s>' % (self.channel, self.methods, self.callable, self.on_end) + def failed(self): if self.on_end is not None: self.on_end() diff --git a/setup.cfg b/setup.cfg index 727a1befa4a34eb6ac43dbd06bf8f3203154e3e3..def63c4b5c9ae3a9b6343912319aaba9589f1f6a 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,13 +1,11 @@ [metadata] description-file = README.md name = CoolAMQP -version = 0.93 +version = 0.95 license = MIT License classifiers = Programming Language :: Python Programming Language :: Python :: 2.7 - Programming Language :: Python :: 3.3 - Programming Language :: Python :: 3.4 Programming Language :: Python :: 3.5 Programming Language :: Python :: Implementation :: CPython Programming Language :: Python :: Implementation :: PyPy @@ -18,8 +16,8 @@ classifiers = Intended Audience :: Developers Topic :: Software Development :: Libraries :: Python Modules description = Very fast pure-Python AMQP client -author = DMS Serwis s.c. -author_email = piotrm@dms-serwis.pl +author = SMOK sp. z o.o. +author_email = pmaslanka@smok.co url = https://github.com/smok-serwis/coolamqp platforms = posix @@ -31,7 +29,7 @@ max-line-length=80 universal=1 [nosetests] -verbosity=1 +verbosity=3 detailed-errors=1 with-coverage=1 diff --git a/tests/test_clustering/test_double.py b/tests/test_clustering/test_double.py index c6ae6beaf8c9d5886fef2c2969b8a03365a86b1a..b5dd6d11aef06b998940be6dcb42b786746cbcee 100644 --- a/tests/test_clustering/test_double.py +++ b/tests/test_clustering/test_double.py @@ -60,7 +60,7 @@ class TestDouble(unittest.TestCase): try: con2, fut2 = self.c2.consume(q, fail_on_first_time_resource_locked=True) - fut2.result() + fut2.result(timeout=20) except AMQPError as e: self.assertEquals(e.reply_code, RESOURCE_LOCKED) self.assertFalse(e.is_hard_error())