From 76971ed78946a60b06da73b034e16220eb30cb6c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ma=C5=9Blanka?= <piotr.maslanka@henrietta.com.pl> Date: Sat, 28 Mar 2020 01:12:45 +0100 Subject: [PATCH] Version 1.0 (#46) * do it fastah! * fix code generation * use fixed struct in other places as well * __slots__ will finally work in these classes * fix docs * fix docs * optionally Protocol * optional slots for FASTER!!! * more __slots__ * added more __slots__ * logger.warning * fix __slots__ * fix locals in content_property.py * fix __slots__ * add __slots__ to commonly created classes * fix __slots__ * add __slots__ to event objects * add more __slots__ * fix on-the-fly compilation * removed import struct from on-the-fly generated code * make compliant with python 3.5 * remove logging in on-the-fly compilation * 1.0 --- CHANGELOG.md | 5 +- compile_definitions/__main__.py | 16 +- coolamqp/__init__.py | 2 +- coolamqp/attaches/channeler.py | 2 + coolamqp/attaches/consumer.py | 10 +- coolamqp/attaches/declarer.py | 1 + coolamqp/attaches/publisher.py | 1 - coolamqp/attaches/utils.py | 3 + coolamqp/clustering/cluster.py | 50 +-- coolamqp/clustering/events.py | 7 + coolamqp/framing/base.py | 5 + .../framing/compilation/content_property.py | 38 +- .../framing/compilation/textcode_fields.py | 32 +- coolamqp/framing/definitions.py | 362 +++++++++--------- coolamqp/framing/frames.py | 22 +- coolamqp/objects.py | 120 +++--- setup.py | 1 - tests/Dockerfile | 2 +- tests/test_clustering/test_a.py | 2 +- 19 files changed, 390 insertions(+), 291 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e9c1a5a..05f7a92 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ -# v0.107: +# v1.0: -* _TBA_ +* first solid API release +* improvements for more speed # v0.106: diff --git a/compile_definitions/__main__.py b/compile_definitions/__main__.py index c4226af..3097f68 100644 --- a/compile_definitions/__main__.py +++ b/compile_definitions/__main__.py @@ -118,6 +118,8 @@ Field = collections.namedtuple('Field', ('name', 'type', 'basic_type', 'reserved line('\n%sS = [%s]', pythonify_name(constant_kind), u', '.join(constants)) + structers = {} + # get domains domain_to_basic_type = {} line('\n\n\nDOMAIN_TO_BASIC_TYPE = {\n') @@ -489,7 +491,9 @@ Field = collections.namedtuple('Field', ('name', 'type', 'basic_type', 'reserved from coolamqp.framing.compilation.textcode_fields import \ get_serializer, get_counter, get_from_buffer line('\n def write_arguments(self, buf): # type: (tp.BinaryIO) -> None\n') - line(get_serializer(method.fields, 'self.', 2)) + line_, new_structers = get_serializer(method.fields, 'self.', 2) + line(line_) + structers.update(new_structers) line(' def get_size(self): # type: () -> int\n') line(get_counter(method.fields, 'self.', 2)) @@ -499,8 +503,10 @@ Field = collections.namedtuple('Field', ('name', 'type', 'basic_type', 'reserved offset = start_offset ''', full_class_name) - line(get_from_buffer(method.fields, '', 2, - remark=(method.name == 'deliver'))) + line_, new_structers = get_from_buffer(method.fields, '', 2, + remark=(method.name == 'deliver')) + line(line_) + structers.update(new_structers) line(" return cls(%s)", u', '.join( @@ -549,6 +555,10 @@ REPLIES_FOR = {\n''') line(u' %s: [%s],\n' % (k, u', '.join(map(str, v)))) line(u'}\n') + # Output structers + for structer in structers: + line(u'STRUCT_%s = struct.Struct("!%s")\n' % (structer, structer)) + out.close() diff --git a/coolamqp/__init__.py b/coolamqp/__init__.py index 7d41060..5eb7dae 100644 --- a/coolamqp/__init__.py +++ b/coolamqp/__init__.py @@ -1,2 +1,2 @@ # coding=UTF-8 -__version__ = '0.107rc1' +__version__ = '1.0' diff --git a/coolamqp/attaches/channeler.py b/coolamqp/attaches/channeler.py index b2c9ac7..b25a371 100644 --- a/coolamqp/attaches/channeler.py +++ b/coolamqp/attaches/channeler.py @@ -24,6 +24,7 @@ class Attache(object): """ Something that can be attached to connection. """ + __slots__ = ('cancelled', 'state', 'connection') def __init__(self): self.cancelled = False #: public, if this is True, it won't be attached to next connection @@ -62,6 +63,7 @@ class Channeler(Attache): but ordering it to do anything is pointless, because it will not get done until attach() with new connection is called. """ + __slots__ = ('channel_id', ) def __init__(self): """ diff --git a/coolamqp/attaches/consumer.py b/coolamqp/attaches/consumer.py index 9a921b3..6f9ae38 100644 --- a/coolamqp/attaches/consumer.py +++ b/coolamqp/attaches/consumer.py @@ -108,6 +108,12 @@ class Consumer(Channeler): has a performance impact :type body_receive_mode: a property of BodyReceiveMode """ + __slots__ = ('queue', 'no_ack', 'on_message', 'cancelled', 'receiver', + 'attache_group', 'channel_close_sent', 'qos', 'qos_update_sent', + 'future_to_notify', 'future_to_notify_on_dead', + 'fail_on_first_time_resource_locked', 'cancel_on_failure', + 'body_receive_mode', 'consumer_tag', 'on_cancel', 'on_broker_cancel', + 'hb_watch', 'deliver_watch') def __init__(self, queue, on_message, no_ack=True, qos=None, cancel_on_failure=False, @@ -127,8 +133,6 @@ class Consumer(Channeler): self.on_message = on_message - # private - self.cancelled = False # did the client want to STOP using this # consumer? self.receiver = None # MessageReceiver instance @@ -469,6 +473,8 @@ class MessageReceiver(object): and may opt to kill the connection on bad framing with self.consumer.connection.send(None) """ + __slots__ = ('consumer', 'state', 'bdeliver', 'header', 'body', 'data_to_go', + 'message_size', 'offset', 'acks_pending', 'recv_mode') def __init__(self, consumer): # type: (Consumer) -> None self.consumer = consumer diff --git a/coolamqp/attaches/declarer.py b/coolamqp/attaches/declarer.py index 21511a0..7d6e95d 100644 --- a/coolamqp/attaches/declarer.py +++ b/coolamqp/attaches/declarer.py @@ -29,6 +29,7 @@ class Operation(object): This will register it's own callback. Please, call on_connection_dead when connection is broken to fail futures with ConnectionDead, since this object does not watch for Fails """ + __slots__ = ('done', 'fut', 'declarer', 'obj', 'on_done') def __init__(self, declarer, obj, fut=None): self.done = False diff --git a/coolamqp/attaches/publisher.py b/coolamqp/attaches/publisher.py index a3f1aa0..9f37782 100644 --- a/coolamqp/attaches/publisher.py +++ b/coolamqp/attaches/publisher.py @@ -77,7 +77,6 @@ class Publisher(Channeler, Synchronized): # todo add fallback using plain AMQP transactions - this will remove UnusablePublisher and stuff - class UnusablePublisher(Exception): """This publisher will never work (eg. MODE_CNPUB on a broker not supporting publisher confirms)""" diff --git a/coolamqp/attaches/utils.py b/coolamqp/attaches/utils.py index 2fc9f66..36ca346 100644 --- a/coolamqp/attaches/utils.py +++ b/coolamqp/attaches/utils.py @@ -13,6 +13,7 @@ class ConfirmableRejectable(object): Protocol for objects put into AtomicTagger. You need not subclass it, just support this protocol. """ + __slots__ = () def confirm(self): # type: () -> None """ @@ -32,6 +33,7 @@ class FutureConfirmableRejectable(ConfirmableRejectable): A ConfirmableRejectable that can result a future (with None), or Exception it with a message """ + __slots__ = ('future', ) def __init__(self, future): # type: (concurrent.futures.Future) -> None self.future = future @@ -77,6 +79,7 @@ class AtomicTagger(object): This has to be fast for most common cases. Corner cases will be resolved correctly, but maybe not fast. """ + __slots__ = ('lock', 'next_tag', 'tags') def __init__(self): self.lock = threading.RLock() diff --git a/coolamqp/clustering/cluster.py b/coolamqp/clustering/cluster.py index 07e6fa8..201384c 100644 --- a/coolamqp/clustering/cluster.py +++ b/coolamqp/clustering/cluster.py @@ -6,19 +6,19 @@ from __future__ import print_function, absolute_import, division import logging import time -import typing as tp import warnings -from concurrent.futures import Future import monotonic import six +import typing as tp +from concurrent.futures import Future from coolamqp.attaches import Publisher, AttacheGroup, Consumer, Declarer from coolamqp.clustering.events import ConnectionLost, MessageReceived, \ NothingMuch, Event from coolamqp.clustering.single import SingleNodeReconnector from coolamqp.exceptions import ConnectionDead -from coolamqp.objects import Exchange +from coolamqp.objects import Exchange, Message, Queue, FrameLogger from coolamqp.uplink import ListenerThread logger = logging.getLogger(__name__) @@ -37,15 +37,12 @@ class Cluster(object): It is not safe to fork() after .start() is called, but it's OK before. :param nodes: list of nodes, or a single node. For now, only one is supported. - :type nodes: NodeDefinition instance or a list of NodeDefinition instances :param on_fail: callable/0 to call when connection fails in an unclean way. This is a one-shot - :type on_fail: callable/0 :param extra_properties: refer to documentation in [/coolamqp/connection/connection.py] Connection.__init__ - :param log_frames: an object that will have it's method .on_frame(timestamp, - frame, direction) called upon receiving/sending a frame. Timestamp is UNIX timestamp, - frame is AMQPFrame, direction is one of 'to_client', 'to_server' + :param log_frames: an object that supports logging each and every frame CoolAMQP sends and + receives from the broker :param name: name to appear in log items and prctl() for the listener thread """ @@ -53,7 +50,12 @@ class Cluster(object): ST_LINK_LOST = 0 # Link has been lost ST_LINK_REGAINED = 1 # Link has been regained - def __init__(self, nodes, on_fail=None, extra_properties=None, log_frames=None, name=None): + def __init__(self, nodes, # type: tp.Union[NodeDefinition, tp.List[NodeDefinition]] + on_fail=None, # type: tp.Optional[tp.Callable[[], None]] + extra_properties=None, # type: tp.Optional[tp.List[tp.Tuple[bytes, tp.Tuple[bytes, str]]]] + log_frames=None, # type: tp.Optional[FrameLogger] + name=None # type: tp.Optional[str] + ): from coolamqp.objects import NodeDefinition if isinstance(nodes, NodeDefinition): nodes = [nodes] @@ -75,9 +77,9 @@ class Cluster(object): else: self.on_fail = None - def declare(self, obj, persistent=False): - # type: (tp.Union[coolamqp.objects.Queue, coolamqp.objects.Exchange], bool) -> - # concurrent.futures.Future + def declare(self, obj, # type: tp.Union[Queue, Exchange] + persistent=False # type: bool + ): # type: (...) -> concurrent.futures.Future """ Declare a Queue/Exchange @@ -103,6 +105,7 @@ class Cluster(object): return THE_POPE_OF_NOPE def consume(self, queue, on_message=None, *args, **kwargs): + # type: (Queue, tp.Callable[[MessageReceived], None] -> tp.Tuple[Consumer, Future] """ Start consuming from a queue. @@ -115,7 +118,6 @@ class Cluster(object): Note that name of anonymous queue might change at any time! :param on_message: callable that will process incoming messages if you leave it at None, messages will be .put into self.events - :type on_message: callable(ReceivedMessage instance) or None :return: a tuple (Consumer instance, and a Future), that tells, when consumer is ready """ fut = Future() @@ -127,7 +129,7 @@ class Cluster(object): self.attache_group.add(con) return con, fut - def delete_queue(self, queue): # type: (coolamqp.objects.Queue) -> concurrent.futures.Future + def delete_queue(self, queue): # type: (coolamqp.objects.Queue) -> Future """ Delete a queue. @@ -136,26 +138,25 @@ class Cluster(object): """ return self.decl.delete_queue(queue) - def publish(self, message, exchange=None, routing_key=u'', tx=None, - confirm=None): + def publish(self, message, # type: Message + exchange=None, # type: tp.Union[Exchange, str, bytes] + routing_key=u'', # type: tp.Union[str, bytes] + tx=None, # type: tp.Optional[bool] + confirm=None # type: tp.Optional[bool] + ): # type: (...) -> tp.Optional[Future] """ Publish a message. :param message: Message to publish - :type message: coolamqp.objects.Message :param exchange: exchange to use. Default is the "direct" empty-name exchange. - :type exchange: unicode/bytes (exchange name) or Exchange object. :param routing_key: routing key to use - :type routing_key: tp.Union[str, bytes] :param confirm: Whether to publish it using confirms/transactions. If you choose so, you will receive a Future that can be used to check it broker took responsibility for this message. Note that if tx if False, and message cannot be delivered to broker at once, it will be discarded - :type confirm: tp.Optional[bool] :param tx: deprecated, alias for confirm - :type tx: tp.Optional[bool] - :return: Future or None + :return: Future to be finished on completion or None, is confirm/tx was not chosen """ if isinstance(exchange, Exchange): exchange = exchange.name.encode('utf8') @@ -206,7 +207,7 @@ class Cluster(object): except AttributeError: pass else: - raise RuntimeError(u'[%s] This was already called!' % (self.name, )) + raise RuntimeError(u'[%s] This was already called!' % (self.name,)) self.listener = ListenerThread(name=self.name) @@ -240,7 +241,8 @@ class Cluster(object): while not self.attache_group.is_online() and monotonic.monotonic() - start_at < timeout: time.sleep(0.1) if not self.attache_group.is_online(): - raise ConnectionDead('[%s] Could not connect within %s seconds' % (self.name, timeout,)) + raise ConnectionDead( + '[%s] Could not connect within %s seconds' % (self.name, timeout,)) def shutdown(self, wait=True): # type: (bool) -> None """ diff --git a/coolamqp/clustering/events.py b/coolamqp/clustering/events.py index 872fdc4..010ec2c 100644 --- a/coolamqp/clustering/events.py +++ b/coolamqp/clustering/events.py @@ -18,10 +18,14 @@ class Event(object): An event emitted by Cluster """ + __slots__ = () + class NothingMuch(Event): """Nothing happened :D""" + __slots__ = () + class ConnectionLost(Event): """ @@ -35,11 +39,14 @@ class ConnectionLost(Event): Please examine your Consumer's .state's to check whether link was regained """ + __slots__ = () + class MessageReceived(ReceivedMessage, Event): """ Something that works as an ersatz ReceivedMessage, but is an event """ + __slots__ = () def __init__(self, msg): # type: (ReceivedMessage) -> None """:type msg: ReceivedMessage""" diff --git a/coolamqp/framing/base.py b/coolamqp/framing/base.py index eaf24ba..043edf3 100644 --- a/coolamqp/framing/base.py +++ b/coolamqp/framing/base.py @@ -65,6 +65,7 @@ class AMQPFrame(object): # base class for framing class AMQPPayload(object): """Payload is something that can write itself to bytes, or at least provide a buffer to do it.""" + __slots__ = () def write_to(self, buf): # type: (buffer) -> None """ @@ -84,6 +85,8 @@ class AMQPPayload(object): class AMQPClass(object): """An AMQP class""" + __slots__ = () + class AMQPContentPropertyList(object): """ @@ -152,6 +155,8 @@ class AMQPContentPropertyList(object): class AMQPMethodPayload(AMQPPayload): + __slots__ = () + RESPONSE_TO = None REPLY_WITH = [] FIELDS = [] diff --git a/coolamqp/framing/compilation/content_property.py b/coolamqp/framing/compilation/content_property.py index 3b94bc8..5c9dedb 100644 --- a/coolamqp/framing/compilation/content_property.py +++ b/coolamqp/framing/compilation/content_property.py @@ -3,6 +3,7 @@ from __future__ import absolute_import, division, print_function """Generate serializers/unserializers/length getters for given property_flags""" import six +import struct import logging from coolamqp.framing.compilation.textcode_fields import get_counter, \ get_from_buffer, get_serializer @@ -14,8 +15,7 @@ SLOTS_I = u'\n __slots__ = (%s)\n' FROM_BUFFER_1 = u' def from_buffer(cls, buf, start_offset):\n ' \ u'offset = start_offset + %s\n' ASSIGN_A = u' self.%s = %s\n' -STARTER = u'''import struct -from coolamqp.framing.base import AMQPContentPropertyList +STARTER = u'''from coolamqp.framing.base import AMQPContentPropertyList class ParticularContentTypeList(AMQPContentPropertyList): """ @@ -46,6 +46,8 @@ def _compile_particular_content_property_list_class(zpf, fields): """ from coolamqp.framing.compilation.utilities import format_field_name + structers = {} + if any(field.basic_type == 'bit' for field in fields): return NB @@ -115,7 +117,9 @@ def _compile_particular_content_property_list_class(zpf, fields): mod.append(repred_zpf) mod.append(u')\n') - mod.append(get_serializer(present_fields, prefix=u'self.', indent_level=2)) + line, new_structers = get_serializer(present_fields, prefix=u'self.', indent_level=2) + structers.update(new_structers) + mod.append(line) # from_buffer # note that non-bit values @@ -123,9 +127,11 @@ def _compile_particular_content_property_list_class(zpf, fields): mod.append( FROM_BUFFER_1 % ( zpf_length,)) - mod.append(get_from_buffer( + line, new_structers = get_from_buffer( present_fields - , prefix='', indent_level=2)) + , prefix='', indent_level=2) + structers.update(new_structers) + mod.append(line) mod.append(u' return cls(%s)\n' % (FFN,)) # get_size @@ -134,16 +140,26 @@ def _compile_particular_content_property_list_class(zpf, fields): :-1]) # skip eol mod.append(u' + %s\n' % (zpf_length,)) # account for pf length - return u''.join(mod) + return u''.join(mod), structers + + +STRUCTERS_FOR_NOW = {} # type: tp.Dict[str, struct.Struct] def compile_particular_content_property_list_class(zpf, fields): - import struct from coolamqp.framing.base import AMQPContentPropertyList + global STRUCTERS_FOR_NOW + + q, structers = _compile_particular_content_property_list_class(zpf, fields) + locals_ = { + 'AMQPContentPropertyList': AMQPContentPropertyList + } + for structer in structers: + if structer not in STRUCTERS_FOR_NOW: + STRUCTERS_FOR_NOW[structer] = struct.Struct('!%s' % (structer,)) + + locals_['STRUCT_%s' % (structer, )] = STRUCTERS_FOR_NOW[structer] - q = _compile_particular_content_property_list_class(zpf, fields) - loc = dict(globals(), **{ - 'struct': struct, - 'AMQPContentPropertyList': AMQPContentPropertyList}) + loc = dict(globals(), **locals_) exec (q, loc) return loc['ParticularContentTypeList'] diff --git a/coolamqp/framing/compilation/textcode_fields.py b/coolamqp/framing/compilation/textcode_fields.py index 3a27ee4..9a18a82 100644 --- a/coolamqp/framing/compilation/textcode_fields.py +++ b/coolamqp/framing/compilation/textcode_fields.py @@ -69,12 +69,14 @@ def get_counter(fields, prefix=u'', indent_level=2): u' + '.join([str(accumulator)] + parts)) + u'\n' + # type: (...) -> tp.Tuple[str, dict] def get_from_buffer(fields, prefix='', indent_level=2, remark=False): """ Emit code that collects values from buf:offset, updating offset as progressing. :param remark: BE FUCKING VERBOSE! #DEBUG """ code = [] + structers = {} def emit(fmt, *args): args = list(args) @@ -110,18 +112,20 @@ def get_from_buffer(fields, prefix='', indent_level=2, remark=False): del bits[:] - def emit_structures(dont_do_bits=False): + def emit_structures(dont_do_bits=False): # type: (bool) -> dict if not dont_do_bits: emit_bits() if len(to_struct) == 0: - return + return {} fffnames = [a for a, b in to_struct if a != u'_'] # skip reserved ffffmts = [b for a, b in to_struct] - emit("%s, = struct.unpack_from('!%s', buf, offset)", - u', '.join(fffnames), u''.join(ffffmts)) + fmts = u''.join(ffffmts) + emit("%s, = STRUCT_%s.unpack_from(buf, offset)", + u', '.join(fffnames), fmts) emit("offset += %s", ln['ln']) ln['ln'] = 0 del to_struct[:] + return {fmts: fmts} for field in fields: fieldname = prefix + format_field_name(field.name) @@ -149,7 +153,7 @@ def get_from_buffer(fields, prefix='', indent_level=2, remark=False): elif field.basic_type == u'bit': bits.append('_' if field.reserved else fieldname) elif field.basic_type == u'table': # oh my god - emit_structures() + structers.update(emit_structures()) assert len(bits) == 0 assert len(to_struct) == 0 @@ -160,7 +164,7 @@ def get_from_buffer(fields, prefix='', indent_level=2, remark=False): f_q, f_l = ('L', 4) if field.basic_type == u'longstr' else ('B', 1) to_struct.append(('s_len', f_q)) ln['ln'] += f_l - emit_structures() + structers.update(emit_structures()) if field.reserved: emit("offset += s_len # reserved field!") else: @@ -171,20 +175,21 @@ def get_from_buffer(fields, prefix='', indent_level=2, remark=False): if len(bits) == 8: emit_bits() - emit_structures() + structers.update(emit_structures()) - return u''.join(code) + return u''.join(code), structers -def get_serializer(fields, prefix='', indent_level=2): +def get_serializer(fields, prefix='', indent_level=2): # type: (list, str) -> str, dict """ Emit code that serializes the fields into buf at offset :param fields: list of Field instances :param prefix: pass "self." is inside a class - :return: block of code that does that + :return: block of code that does that, dictionary of struct-ers """ code = [] + structers = {} def emit(fmt, *args): args = list(args) @@ -213,7 +218,10 @@ def get_serializer(fields, prefix='', indent_level=2): del bits[:] def emit_single_struct_pack(): - emit("buf.write(struct.pack('!%s', %s))", u''.join(formats), + formats_str = u''.join(formats) + if formats_str not in structers: + structers[formats_str] = formats_str + emit("buf.write(STRUCT_%s.pack(%s))", formats_str, u', '.join(format_args)) del formats[:] del format_args[:] @@ -255,4 +263,4 @@ def get_serializer(fields, prefix='', indent_level=2): emit('') # eol - return u''.join(code) + return u''.join(code), structers diff --git a/coolamqp/framing/definitions.py b/coolamqp/framing/definitions.py index 6fbccd8..a82197f 100644 --- a/coolamqp/framing/definitions.py +++ b/coolamqp/framing/definitions.py @@ -241,7 +241,7 @@ class ConnectionBlocked(AMQPMethodPayload): self.reason = reason def write_arguments(self, buf): # type: (tp.BinaryIO) -> None - buf.write(struct.pack('!B', len(self.reason))) + buf.write(STRUCT_B.pack(len(self.reason))) buf.write(self.reason) def get_size(self): # type: () -> int @@ -251,7 +251,7 @@ class ConnectionBlocked(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> ConnectionBlocked offset = start_offset - s_len, = struct.unpack_from('!B', buf, offset) + s_len, = STRUCT_B.unpack_from(buf, offset) offset += 1 reason = buf[offset:offset + s_len] offset += s_len @@ -329,9 +329,9 @@ class ConnectionClose(AMQPMethodPayload): self.method_id = method_id def write_arguments(self, buf): # type: (tp.BinaryIO) -> None - buf.write(struct.pack('!HB', self.reply_code, len(self.reply_text))) + buf.write(STRUCT_HB.pack(self.reply_code, len(self.reply_text))) buf.write(self.reply_text) - buf.write(struct.pack('!HH', self.class_id, self.method_id)) + buf.write(STRUCT_HH.pack(self.class_id, self.method_id)) def get_size(self): # type: () -> int return 7 + len(self.reply_text) @@ -340,11 +340,11 @@ class ConnectionClose(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> ConnectionClose offset = start_offset - reply_code, s_len, = struct.unpack_from('!HB', buf, offset) + reply_code, s_len, = STRUCT_HB.unpack_from(buf, offset) offset += 3 reply_text = buf[offset:offset + s_len] offset += s_len - class_id, method_id, = struct.unpack_from('!HH', buf, offset) + class_id, method_id, = STRUCT_HH.unpack_from(buf, offset) offset += 4 return cls(reply_code, reply_text, class_id, method_id) @@ -439,10 +439,10 @@ class ConnectionOpen(AMQPMethodPayload): self.virtual_host = virtual_host def write_arguments(self, buf): # type: (tp.BinaryIO) -> None - buf.write(struct.pack('!B', len(self.virtual_host))) + buf.write(STRUCT_B.pack(len(self.virtual_host))) buf.write(self.virtual_host) buf.write(b'\x00') - buf.write(struct.pack('!B', 0)) + buf.write(STRUCT_B.pack(0)) def get_size(self): # type: () -> int return 3 + len(self.virtual_host) @@ -451,11 +451,11 @@ class ConnectionOpen(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> ConnectionOpen offset = start_offset - s_len, = struct.unpack_from('!B', buf, offset) + s_len, = STRUCT_B.unpack_from(buf, offset) offset += 1 virtual_host = buf[offset:offset + s_len] offset += s_len - s_len, = struct.unpack_from('!B', buf, offset) + s_len, = STRUCT_B.unpack_from(buf, offset) offset += 1 offset += s_len # reserved field! offset += 1 @@ -500,7 +500,7 @@ class ConnectionOpenOk(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> ConnectionOpenOk offset = start_offset - s_len, = struct.unpack_from('!B', buf, offset) + s_len, = STRUCT_B.unpack_from(buf, offset) offset += 1 offset += s_len # reserved field! return cls() @@ -602,11 +602,11 @@ class ConnectionStart(AMQPMethodPayload): self.locales = locales def write_arguments(self, buf): # type: (tp.BinaryIO) -> None - buf.write(struct.pack('!BB', self.version_major, self.version_minor)) + buf.write(STRUCT_BB.pack(self.version_major, self.version_minor)) enframe_table(buf, self.server_properties) - buf.write(struct.pack('!I', len(self.mechanisms))) + buf.write(STRUCT_I.pack(len(self.mechanisms))) buf.write(self.mechanisms) - buf.write(struct.pack('!I', len(self.locales))) + buf.write(STRUCT_I.pack(len(self.locales))) buf.write(self.locales) def get_size(self): # type: () -> int @@ -617,15 +617,15 @@ class ConnectionStart(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> ConnectionStart offset = start_offset - version_major, version_minor, = struct.unpack_from('!BB', buf, offset) + version_major, version_minor, = STRUCT_BB.unpack_from(buf, offset) offset += 2 server_properties, delta = deframe_table(buf, offset) offset += delta - s_len, = struct.unpack_from('!L', buf, offset) + s_len, = STRUCT_L.unpack_from(buf, offset) offset += 4 mechanisms = buf[offset:offset + s_len] offset += s_len - s_len, = struct.unpack_from('!L', buf, offset) + s_len, = STRUCT_L.unpack_from(buf, offset) offset += 4 locales = buf[offset:offset + s_len] offset += s_len @@ -682,7 +682,7 @@ class ConnectionSecure(AMQPMethodPayload): self.challenge = challenge def write_arguments(self, buf): # type: (tp.BinaryIO) -> None - buf.write(struct.pack('!I', len(self.challenge))) + buf.write(STRUCT_I.pack(len(self.challenge))) buf.write(self.challenge) def get_size(self): # type: () -> int @@ -692,7 +692,7 @@ class ConnectionSecure(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> ConnectionSecure offset = start_offset - s_len, = struct.unpack_from('!L', buf, offset) + s_len, = STRUCT_L.unpack_from(buf, offset) offset += 4 challenge = buf[offset:offset + s_len] offset += s_len @@ -781,11 +781,11 @@ class ConnectionStartOk(AMQPMethodPayload): def write_arguments(self, buf): # type: (tp.BinaryIO) -> None enframe_table(buf, self.client_properties) - buf.write(struct.pack('!B', len(self.mechanism))) + buf.write(STRUCT_B.pack(len(self.mechanism))) buf.write(self.mechanism) - buf.write(struct.pack('!I', len(self.response))) + buf.write(STRUCT_I.pack(len(self.response))) buf.write(self.response) - buf.write(struct.pack('!B', len(self.locale))) + buf.write(STRUCT_B.pack(len(self.locale))) buf.write(self.locale) def get_size(self): # type: () -> int @@ -798,15 +798,15 @@ class ConnectionStartOk(AMQPMethodPayload): offset = start_offset client_properties, delta = deframe_table(buf, offset) offset += delta - s_len, = struct.unpack_from('!B', buf, offset) + s_len, = STRUCT_B.unpack_from(buf, offset) offset += 1 mechanism = buf[offset:offset + s_len] offset += s_len - s_len, = struct.unpack_from('!L', buf, offset) + s_len, = STRUCT_L.unpack_from(buf, offset) offset += 4 response = buf[offset:offset + s_len] offset += s_len - s_len, = struct.unpack_from('!B', buf, offset) + s_len, = STRUCT_B.unpack_from(buf, offset) offset += 1 locale = buf[offset:offset + s_len] offset += s_len @@ -860,7 +860,7 @@ class ConnectionSecureOk(AMQPMethodPayload): self.response = response def write_arguments(self, buf): # type: (tp.BinaryIO) -> None - buf.write(struct.pack('!I', len(self.response))) + buf.write(STRUCT_I.pack(len(self.response))) buf.write(self.response) def get_size(self): # type: () -> int @@ -870,7 +870,7 @@ class ConnectionSecureOk(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> ConnectionSecureOk offset = start_offset - s_len, = struct.unpack_from('!L', buf, offset) + s_len, = STRUCT_L.unpack_from(buf, offset) offset += 4 response = buf[offset:offset + s_len] offset += s_len @@ -948,8 +948,7 @@ class ConnectionTune(AMQPMethodPayload): def write_arguments(self, buf): # type: (tp.BinaryIO) -> None buf.write( - struct.pack('!HIH', self.channel_max, self.frame_max, - self.heartbeat)) + STRUCT_HIH.pack(self.channel_max, self.frame_max, self.heartbeat)) def get_size(self): # type: () -> int return 8 @@ -958,8 +957,8 @@ class ConnectionTune(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> ConnectionTune offset = start_offset - channel_max, frame_max, heartbeat, = struct.unpack_from( - '!HIH', buf, offset) + channel_max, frame_max, heartbeat, = STRUCT_HIH.unpack_from( + buf, offset) offset += 8 return cls(channel_max, frame_max, heartbeat) @@ -1036,8 +1035,7 @@ class ConnectionTuneOk(AMQPMethodPayload): def write_arguments(self, buf): # type: (tp.BinaryIO) -> None buf.write( - struct.pack('!HIH', self.channel_max, self.frame_max, - self.heartbeat)) + STRUCT_HIH.pack(self.channel_max, self.frame_max, self.heartbeat)) def get_size(self): # type: () -> int return 8 @@ -1046,8 +1044,8 @@ class ConnectionTuneOk(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> ConnectionTuneOk offset = start_offset - channel_max, frame_max, heartbeat, = struct.unpack_from( - '!HIH', buf, offset) + channel_max, frame_max, heartbeat, = STRUCT_HIH.unpack_from( + buf, offset) offset += 8 return cls(channel_max, frame_max, heartbeat) @@ -1169,9 +1167,9 @@ class ChannelClose(AMQPMethodPayload): self.method_id = method_id def write_arguments(self, buf): # type: (tp.BinaryIO) -> None - buf.write(struct.pack('!HB', self.reply_code, len(self.reply_text))) + buf.write(STRUCT_HB.pack(self.reply_code, len(self.reply_text))) buf.write(self.reply_text) - buf.write(struct.pack('!HH', self.class_id, self.method_id)) + buf.write(STRUCT_HH.pack(self.class_id, self.method_id)) def get_size(self): # type: () -> int return 7 + len(self.reply_text) @@ -1180,11 +1178,11 @@ class ChannelClose(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> ChannelClose offset = start_offset - reply_code, s_len, = struct.unpack_from('!HB', buf, offset) + reply_code, s_len, = STRUCT_HB.unpack_from(buf, offset) offset += 3 reply_text = buf[offset:offset + s_len] offset += s_len - class_id, method_id, = struct.unpack_from('!HH', buf, offset) + class_id, method_id, = STRUCT_HH.unpack_from(buf, offset) offset += 4 return cls(reply_code, reply_text, class_id, method_id) @@ -1278,7 +1276,7 @@ class ChannelFlow(AMQPMethodPayload): self.active = active def write_arguments(self, buf): # type: (tp.BinaryIO) -> None - buf.write(struct.pack('!B', (self.active << 0))) + buf.write(STRUCT_B.pack((self.active << 0))) def get_size(self): # type: () -> int return 1 @@ -1287,7 +1285,7 @@ class ChannelFlow(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> ChannelFlow offset = start_offset - _bit, = struct.unpack_from('!B', buf, offset) + _bit, = STRUCT_B.unpack_from(buf, offset) offset += 0 active = bool(_bit >> 0) offset += 1 @@ -1340,7 +1338,7 @@ class ChannelFlowOk(AMQPMethodPayload): self.active = active def write_arguments(self, buf): # type: (tp.BinaryIO) -> None - buf.write(struct.pack('!B', (self.active << 0))) + buf.write(STRUCT_B.pack((self.active << 0))) def get_size(self): # type: () -> int return 1 @@ -1349,7 +1347,7 @@ class ChannelFlowOk(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> ChannelFlowOk offset = start_offset - _bit, = struct.unpack_from('!B', buf, offset) + _bit, = STRUCT_B.unpack_from(buf, offset) offset += 0 active = bool(_bit >> 0) offset += 1 @@ -1393,7 +1391,7 @@ class ChannelOpen(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> ChannelOpen offset = start_offset - s_len, = struct.unpack_from('!B', buf, offset) + s_len, = STRUCT_B.unpack_from(buf, offset) offset += 1 offset += s_len # reserved field! return cls() @@ -1437,7 +1435,7 @@ class ChannelOpenOk(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> ChannelOpenOk offset = start_offset - s_len, = struct.unpack_from('!L', buf, offset) + s_len, = STRUCT_L.unpack_from(buf, offset) offset += 4 offset += s_len # reserved field! return cls() @@ -1530,13 +1528,13 @@ class ExchangeBind(AMQPMethodPayload): def write_arguments(self, buf): # type: (tp.BinaryIO) -> None buf.write(b'\x00\x00') - buf.write(struct.pack('!B', len(self.destination))) + buf.write(STRUCT_B.pack(len(self.destination))) buf.write(self.destination) - buf.write(struct.pack('!B', len(self.source))) + buf.write(STRUCT_B.pack(len(self.source))) buf.write(self.source) - buf.write(struct.pack('!B', len(self.routing_key))) + buf.write(STRUCT_B.pack(len(self.routing_key))) buf.write(self.routing_key) - buf.write(struct.pack('!B', (self.no_wait << 0))) + buf.write(STRUCT_B.pack((self.no_wait << 0))) enframe_table(buf, self.arguments) def get_size(self): # type: () -> int @@ -1547,19 +1545,19 @@ class ExchangeBind(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> ExchangeBind offset = start_offset - s_len, = struct.unpack_from('!2xB', buf, offset) + s_len, = STRUCT_2xB.unpack_from(buf, offset) offset += 3 destination = buf[offset:offset + s_len] offset += s_len - s_len, = struct.unpack_from('!B', buf, offset) + s_len, = STRUCT_B.unpack_from(buf, offset) offset += 1 source = buf[offset:offset + s_len] offset += s_len - s_len, = struct.unpack_from('!B', buf, offset) + s_len, = STRUCT_B.unpack_from(buf, offset) offset += 1 routing_key = buf[offset:offset + s_len] offset += s_len - _bit, = struct.unpack_from('!B', buf, offset) + _bit, = STRUCT_B.unpack_from(buf, offset) offset += 0 no_wait = bool(_bit >> 0) offset += 1 @@ -1726,14 +1724,14 @@ class ExchangeDeclare(AMQPMethodPayload): def write_arguments(self, buf): # type: (tp.BinaryIO) -> None buf.write(b'\x00\x00') - buf.write(struct.pack('!B', len(self.exchange))) + buf.write(STRUCT_B.pack(len(self.exchange))) buf.write(self.exchange) - buf.write(struct.pack('!B', len(self.type_))) + buf.write(STRUCT_B.pack(len(self.type_))) buf.write(self.type_) buf.write( - struct.pack('!B', (self.passive << 0) | (self.durable << 1) | - (self.auto_delete << 2) | (self.internal << 3) | - (self.no_wait << 4))) + STRUCT_B.pack((self.passive << 0) | (self.durable << 1) + | (self.auto_delete << 2) | (self.internal << 3) + | (self.no_wait << 4))) enframe_table(buf, self.arguments) def get_size(self): # type: () -> int @@ -1744,15 +1742,15 @@ class ExchangeDeclare(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> ExchangeDeclare offset = start_offset - s_len, = struct.unpack_from('!2xB', buf, offset) + s_len, = STRUCT_2xB.unpack_from(buf, offset) offset += 3 exchange = buf[offset:offset + s_len] offset += s_len - s_len, = struct.unpack_from('!B', buf, offset) + s_len, = STRUCT_B.unpack_from(buf, offset) offset += 1 type_ = buf[offset:offset + s_len] offset += s_len - _bit, = struct.unpack_from('!B', buf, offset) + _bit, = STRUCT_B.unpack_from(buf, offset) offset += 0 passive = bool(_bit >> 0) durable = bool(_bit >> 1) @@ -1829,10 +1827,9 @@ class ExchangeDelete(AMQPMethodPayload): def write_arguments(self, buf): # type: (tp.BinaryIO) -> None buf.write(b'\x00\x00') - buf.write(struct.pack('!B', len(self.exchange))) + buf.write(STRUCT_B.pack(len(self.exchange))) buf.write(self.exchange) - buf.write( - struct.pack('!B', (self.if_unused << 0) | (self.no_wait << 1))) + buf.write(STRUCT_B.pack((self.if_unused << 0) | (self.no_wait << 1))) def get_size(self): # type: () -> int return 4 + len(self.exchange) @@ -1841,11 +1838,11 @@ class ExchangeDelete(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> ExchangeDelete offset = start_offset - s_len, = struct.unpack_from('!2xB', buf, offset) + s_len, = STRUCT_2xB.unpack_from(buf, offset) offset += 3 exchange = buf[offset:offset + s_len] offset += s_len - _bit, = struct.unpack_from('!B', buf, offset) + _bit, = STRUCT_B.unpack_from(buf, offset) offset += 0 if_unused = bool(_bit >> 0) no_wait = bool(_bit >> 1) @@ -1995,13 +1992,13 @@ class ExchangeUnbind(AMQPMethodPayload): def write_arguments(self, buf): # type: (tp.BinaryIO) -> None buf.write(b'\x00\x00') - buf.write(struct.pack('!B', len(self.destination))) + buf.write(STRUCT_B.pack(len(self.destination))) buf.write(self.destination) - buf.write(struct.pack('!B', len(self.source))) + buf.write(STRUCT_B.pack(len(self.source))) buf.write(self.source) - buf.write(struct.pack('!B', len(self.routing_key))) + buf.write(STRUCT_B.pack(len(self.routing_key))) buf.write(self.routing_key) - buf.write(struct.pack('!B', (self.no_wait << 0))) + buf.write(STRUCT_B.pack((self.no_wait << 0))) enframe_table(buf, self.arguments) def get_size(self): # type: () -> int @@ -2012,19 +2009,19 @@ class ExchangeUnbind(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> ExchangeUnbind offset = start_offset - s_len, = struct.unpack_from('!2xB', buf, offset) + s_len, = STRUCT_2xB.unpack_from(buf, offset) offset += 3 destination = buf[offset:offset + s_len] offset += s_len - s_len, = struct.unpack_from('!B', buf, offset) + s_len, = STRUCT_B.unpack_from(buf, offset) offset += 1 source = buf[offset:offset + s_len] offset += s_len - s_len, = struct.unpack_from('!B', buf, offset) + s_len, = STRUCT_B.unpack_from(buf, offset) offset += 1 routing_key = buf[offset:offset + s_len] offset += s_len - _bit, = struct.unpack_from('!B', buf, offset) + _bit, = STRUCT_B.unpack_from(buf, offset) offset += 0 no_wait = bool(_bit >> 0) offset += 1 @@ -2175,13 +2172,13 @@ class QueueBind(AMQPMethodPayload): def write_arguments(self, buf): # type: (tp.BinaryIO) -> None buf.write(b'\x00\x00') - buf.write(struct.pack('!B', len(self.queue))) + buf.write(STRUCT_B.pack(len(self.queue))) buf.write(self.queue) - buf.write(struct.pack('!B', len(self.exchange))) + buf.write(STRUCT_B.pack(len(self.exchange))) buf.write(self.exchange) - buf.write(struct.pack('!B', len(self.routing_key))) + buf.write(STRUCT_B.pack(len(self.routing_key))) buf.write(self.routing_key) - buf.write(struct.pack('!B', (self.no_wait << 0))) + buf.write(STRUCT_B.pack((self.no_wait << 0))) enframe_table(buf, self.arguments) def get_size(self): # type: () -> int @@ -2192,19 +2189,19 @@ class QueueBind(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> QueueBind offset = start_offset - s_len, = struct.unpack_from('!2xB', buf, offset) + s_len, = STRUCT_2xB.unpack_from(buf, offset) offset += 3 queue = buf[offset:offset + s_len] offset += s_len - s_len, = struct.unpack_from('!B', buf, offset) + s_len, = STRUCT_B.unpack_from(buf, offset) offset += 1 exchange = buf[offset:offset + s_len] offset += s_len - s_len, = struct.unpack_from('!B', buf, offset) + s_len, = STRUCT_B.unpack_from(buf, offset) offset += 1 routing_key = buf[offset:offset + s_len] offset += s_len - _bit, = struct.unpack_from('!B', buf, offset) + _bit, = STRUCT_B.unpack_from(buf, offset) offset += 0 no_wait = bool(_bit >> 0) offset += 1 @@ -2371,12 +2368,12 @@ class QueueDeclare(AMQPMethodPayload): def write_arguments(self, buf): # type: (tp.BinaryIO) -> None buf.write(b'\x00\x00') - buf.write(struct.pack('!B', len(self.queue))) + buf.write(STRUCT_B.pack(len(self.queue))) buf.write(self.queue) buf.write( - struct.pack('!B', (self.passive << 0) | (self.durable << 1) | - (self.exclusive << 2) | (self.auto_delete << 3) | - (self.no_wait << 4))) + STRUCT_B.pack((self.passive << 0) | (self.durable << 1) + | (self.exclusive << 2) | (self.auto_delete << 3) + | (self.no_wait << 4))) enframe_table(buf, self.arguments) def get_size(self): # type: () -> int @@ -2386,11 +2383,11 @@ class QueueDeclare(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> QueueDeclare offset = start_offset - s_len, = struct.unpack_from('!2xB', buf, offset) + s_len, = STRUCT_2xB.unpack_from(buf, offset) offset += 3 queue = buf[offset:offset + s_len] offset += s_len - _bit, = struct.unpack_from('!B', buf, offset) + _bit, = STRUCT_B.unpack_from(buf, offset) offset += 0 passive = bool(_bit >> 0) durable = bool(_bit >> 1) @@ -2476,11 +2473,11 @@ class QueueDelete(AMQPMethodPayload): def write_arguments(self, buf): # type: (tp.BinaryIO) -> None buf.write(b'\x00\x00') - buf.write(struct.pack('!B', len(self.queue))) + buf.write(STRUCT_B.pack(len(self.queue))) buf.write(self.queue) buf.write( - struct.pack('!B', (self.if_unused << 0) | (self.if_empty << 1) | - (self.no_wait << 2))) + STRUCT_B.pack((self.if_unused << 0) | (self.if_empty << 1) + | (self.no_wait << 2))) def get_size(self): # type: () -> int return 4 + len(self.queue) @@ -2489,11 +2486,11 @@ class QueueDelete(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> QueueDelete offset = start_offset - s_len, = struct.unpack_from('!2xB', buf, offset) + s_len, = STRUCT_2xB.unpack_from(buf, offset) offset += 3 queue = buf[offset:offset + s_len] offset += s_len - _bit, = struct.unpack_from('!B', buf, offset) + _bit, = STRUCT_B.unpack_from(buf, offset) offset += 0 if_unused = bool(_bit >> 0) if_empty = bool(_bit >> 1) @@ -2563,9 +2560,9 @@ class QueueDeclareOk(AMQPMethodPayload): self.consumer_count = consumer_count def write_arguments(self, buf): # type: (tp.BinaryIO) -> None - buf.write(struct.pack('!B', len(self.queue))) + buf.write(STRUCT_B.pack(len(self.queue))) buf.write(self.queue) - buf.write(struct.pack('!II', self.message_count, self.consumer_count)) + buf.write(STRUCT_II.pack(self.message_count, self.consumer_count)) def get_size(self): # type: () -> int return 9 + len(self.queue) @@ -2574,11 +2571,11 @@ class QueueDeclareOk(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> QueueDeclareOk offset = start_offset - s_len, = struct.unpack_from('!B', buf, offset) + s_len, = STRUCT_B.unpack_from(buf, offset) offset += 1 queue = buf[offset:offset + s_len] offset += s_len - message_count, consumer_count, = struct.unpack_from('!II', buf, offset) + message_count, consumer_count, = STRUCT_II.unpack_from(buf, offset) offset += 8 return cls(queue, message_count, consumer_count) @@ -2625,7 +2622,7 @@ class QueueDeleteOk(AMQPMethodPayload): self.message_count = message_count def write_arguments(self, buf): # type: (tp.BinaryIO) -> None - buf.write(struct.pack('!I', self.message_count)) + buf.write(STRUCT_I.pack(self.message_count)) def get_size(self): # type: () -> int return 4 @@ -2634,7 +2631,7 @@ class QueueDeleteOk(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> QueueDeleteOk offset = start_offset - message_count, = struct.unpack_from('!I', buf, offset) + message_count, = STRUCT_I.unpack_from(buf, offset) offset += 4 return cls(message_count) @@ -2691,9 +2688,9 @@ class QueuePurge(AMQPMethodPayload): def write_arguments(self, buf): # type: (tp.BinaryIO) -> None buf.write(b'\x00\x00') - buf.write(struct.pack('!B', len(self.queue))) + buf.write(STRUCT_B.pack(len(self.queue))) buf.write(self.queue) - buf.write(struct.pack('!B', (self.no_wait << 0))) + buf.write(STRUCT_B.pack((self.no_wait << 0))) def get_size(self): # type: () -> int return 4 + len(self.queue) @@ -2702,11 +2699,11 @@ class QueuePurge(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> QueuePurge offset = start_offset - s_len, = struct.unpack_from('!2xB', buf, offset) + s_len, = STRUCT_2xB.unpack_from(buf, offset) offset += 3 queue = buf[offset:offset + s_len] offset += s_len - _bit, = struct.unpack_from('!B', buf, offset) + _bit, = STRUCT_B.unpack_from(buf, offset) offset += 0 no_wait = bool(_bit >> 0) offset += 1 @@ -2755,7 +2752,7 @@ class QueuePurgeOk(AMQPMethodPayload): self.message_count = message_count def write_arguments(self, buf): # type: (tp.BinaryIO) -> None - buf.write(struct.pack('!I', self.message_count)) + buf.write(STRUCT_I.pack(self.message_count)) def get_size(self): # type: () -> int return 4 @@ -2764,7 +2761,7 @@ class QueuePurgeOk(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> QueuePurgeOk offset = start_offset - message_count, = struct.unpack_from('!I', buf, offset) + message_count, = STRUCT_I.unpack_from(buf, offset) offset += 4 return cls(message_count) @@ -2834,11 +2831,11 @@ class QueueUnbind(AMQPMethodPayload): def write_arguments(self, buf): # type: (tp.BinaryIO) -> None buf.write(b'\x00\x00') - buf.write(struct.pack('!B', len(self.queue))) + buf.write(STRUCT_B.pack(len(self.queue))) buf.write(self.queue) - buf.write(struct.pack('!B', len(self.exchange))) + buf.write(STRUCT_B.pack(len(self.exchange))) buf.write(self.exchange) - buf.write(struct.pack('!B', len(self.routing_key))) + buf.write(STRUCT_B.pack(len(self.routing_key))) buf.write(self.routing_key) enframe_table(buf, self.arguments) @@ -2850,15 +2847,15 @@ class QueueUnbind(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> QueueUnbind offset = start_offset - s_len, = struct.unpack_from('!2xB', buf, offset) + s_len, = STRUCT_2xB.unpack_from(buf, offset) offset += 3 queue = buf[offset:offset + s_len] offset += s_len - s_len, = struct.unpack_from('!B', buf, offset) + s_len, = STRUCT_B.unpack_from(buf, offset) offset += 1 exchange = buf[offset:offset + s_len] offset += s_len - s_len, = struct.unpack_from('!B', buf, offset) + s_len, = STRUCT_B.unpack_from(buf, offset) offset += 1 routing_key = buf[offset:offset + s_len] offset += s_len @@ -3124,7 +3121,7 @@ class BasicAck(AMQPMethodPayload): self.multiple = multiple def write_arguments(self, buf): # type: (tp.BinaryIO) -> None - buf.write(struct.pack('!QB', self.delivery_tag, (self.multiple << 0))) + buf.write(STRUCT_QB.pack(self.delivery_tag, (self.multiple << 0))) def get_size(self): # type: () -> int return 9 @@ -3132,7 +3129,7 @@ class BasicAck(AMQPMethodPayload): @classmethod def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> BasicAck offset = start_offset - delivery_tag, _bit, = struct.unpack_from('!QB', buf, offset) + delivery_tag, _bit, = STRUCT_QB.unpack_from(buf, offset) offset += 8 multiple = bool(_bit >> 0) offset += 1 @@ -3230,13 +3227,13 @@ class BasicConsume(AMQPMethodPayload): def write_arguments(self, buf): # type: (tp.BinaryIO) -> None buf.write(b'\x00\x00') - buf.write(struct.pack('!B', len(self.queue))) + buf.write(STRUCT_B.pack(len(self.queue))) buf.write(self.queue) - buf.write(struct.pack('!B', len(self.consumer_tag))) + buf.write(STRUCT_B.pack(len(self.consumer_tag))) buf.write(self.consumer_tag) buf.write( - struct.pack('!B', (self.no_local << 0) | (self.no_ack << 1) | - (self.exclusive << 2) | (self.no_wait << 3))) + STRUCT_B.pack((self.no_local << 0) | (self.no_ack << 1) + | (self.exclusive << 2) | (self.no_wait << 3))) enframe_table(buf, self.arguments) def get_size(self): # type: () -> int @@ -3247,15 +3244,15 @@ class BasicConsume(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> BasicConsume offset = start_offset - s_len, = struct.unpack_from('!2xB', buf, offset) + s_len, = STRUCT_2xB.unpack_from(buf, offset) offset += 3 queue = buf[offset:offset + s_len] offset += s_len - s_len, = struct.unpack_from('!B', buf, offset) + s_len, = STRUCT_B.unpack_from(buf, offset) offset += 1 consumer_tag = buf[offset:offset + s_len] offset += s_len - _bit, = struct.unpack_from('!B', buf, offset) + _bit, = STRUCT_B.unpack_from(buf, offset) offset += 0 no_local = bool(_bit >> 0) no_ack = bool(_bit >> 1) @@ -3332,9 +3329,9 @@ class BasicCancel(AMQPMethodPayload): self.no_wait = no_wait def write_arguments(self, buf): # type: (tp.BinaryIO) -> None - buf.write(struct.pack('!B', len(self.consumer_tag))) + buf.write(STRUCT_B.pack(len(self.consumer_tag))) buf.write(self.consumer_tag) - buf.write(struct.pack('!B', (self.no_wait << 0))) + buf.write(STRUCT_B.pack((self.no_wait << 0))) def get_size(self): # type: () -> int return 2 + len(self.consumer_tag) @@ -3343,11 +3340,11 @@ class BasicCancel(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> BasicCancel offset = start_offset - s_len, = struct.unpack_from('!B', buf, offset) + s_len, = STRUCT_B.unpack_from(buf, offset) offset += 1 consumer_tag = buf[offset:offset + s_len] offset += s_len - _bit, = struct.unpack_from('!B', buf, offset) + _bit, = STRUCT_B.unpack_from(buf, offset) offset += 0 no_wait = bool(_bit >> 0) offset += 1 @@ -3399,7 +3396,7 @@ class BasicConsumeOk(AMQPMethodPayload): self.consumer_tag = consumer_tag def write_arguments(self, buf): # type: (tp.BinaryIO) -> None - buf.write(struct.pack('!B', len(self.consumer_tag))) + buf.write(STRUCT_B.pack(len(self.consumer_tag))) buf.write(self.consumer_tag) def get_size(self): # type: () -> int @@ -3409,7 +3406,7 @@ class BasicConsumeOk(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> BasicConsumeOk offset = start_offset - s_len, = struct.unpack_from('!B', buf, offset) + s_len, = STRUCT_B.unpack_from(buf, offset) offset += 1 consumer_tag = buf[offset:offset + s_len] offset += s_len @@ -3457,7 +3454,7 @@ class BasicCancelOk(AMQPMethodPayload): self.consumer_tag = consumer_tag def write_arguments(self, buf): # type: (tp.BinaryIO) -> None - buf.write(struct.pack('!B', len(self.consumer_tag))) + buf.write(STRUCT_B.pack(len(self.consumer_tag))) buf.write(self.consumer_tag) def get_size(self): # type: () -> int @@ -3467,7 +3464,7 @@ class BasicCancelOk(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> BasicCancelOk offset = start_offset - s_len, = struct.unpack_from('!B', buf, offset) + s_len, = STRUCT_B.unpack_from(buf, offset) offset += 1 consumer_tag = buf[offset:offset + s_len] offset += s_len @@ -3549,13 +3546,13 @@ class BasicDeliver(AMQPMethodPayload): self.routing_key = routing_key def write_arguments(self, buf): # type: (tp.BinaryIO) -> None - buf.write(struct.pack('!B', len(self.consumer_tag))) + buf.write(STRUCT_B.pack(len(self.consumer_tag))) buf.write(self.consumer_tag) buf.write( - struct.pack('!QBB', self.delivery_tag, (self.redelivered << 0), - len(self.exchange))) + STRUCT_QBB.pack(self.delivery_tag, (self.redelivered << 0), + len(self.exchange))) buf.write(self.exchange) - buf.write(struct.pack('!B', len(self.routing_key))) + buf.write(STRUCT_B.pack(len(self.routing_key))) buf.write(self.routing_key) def get_size(self): # type: () -> int @@ -3566,19 +3563,19 @@ class BasicDeliver(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> BasicDeliver offset = start_offset - s_len, = struct.unpack_from('!B', buf, offset) + s_len, = STRUCT_B.unpack_from(buf, offset) offset += 1 consumer_tag = buf[offset:offset + s_len] offset += s_len - delivery_tag, _bit, = struct.unpack_from('!QB', buf, offset) + delivery_tag, _bit, = STRUCT_QB.unpack_from(buf, offset) offset += 8 redelivered = bool(_bit >> 0) offset += 1 - s_len, = struct.unpack_from('!B', buf, offset) + s_len, = STRUCT_B.unpack_from(buf, offset) offset += 1 exchange = buf[offset:offset + s_len] offset += s_len - s_len, = struct.unpack_from('!B', buf, offset) + s_len, = STRUCT_B.unpack_from(buf, offset) offset += 1 routing_key = buf[offset:offset + s_len] offset += s_len @@ -3640,9 +3637,9 @@ class BasicGet(AMQPMethodPayload): def write_arguments(self, buf): # type: (tp.BinaryIO) -> None buf.write(b'\x00\x00') - buf.write(struct.pack('!B', len(self.queue))) + buf.write(STRUCT_B.pack(len(self.queue))) buf.write(self.queue) - buf.write(struct.pack('!B', (self.no_ack << 0))) + buf.write(STRUCT_B.pack((self.no_ack << 0))) def get_size(self): # type: () -> int return 4 + len(self.queue) @@ -3650,11 +3647,11 @@ class BasicGet(AMQPMethodPayload): @classmethod def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> BasicGet offset = start_offset - s_len, = struct.unpack_from('!2xB', buf, offset) + s_len, = STRUCT_2xB.unpack_from(buf, offset) offset += 3 queue = buf[offset:offset + s_len] offset += s_len - _bit, = struct.unpack_from('!B', buf, offset) + _bit, = STRUCT_B.unpack_from(buf, offset) offset += 0 no_ack = bool(_bit >> 0) offset += 1 @@ -3735,12 +3732,12 @@ class BasicGetOk(AMQPMethodPayload): def write_arguments(self, buf): # type: (tp.BinaryIO) -> None buf.write( - struct.pack('!QBB', self.delivery_tag, (self.redelivered << 0), - len(self.exchange))) + STRUCT_QBB.pack(self.delivery_tag, (self.redelivered << 0), + len(self.exchange))) buf.write(self.exchange) - buf.write(struct.pack('!B', len(self.routing_key))) + buf.write(STRUCT_B.pack(len(self.routing_key))) buf.write(self.routing_key) - buf.write(struct.pack('!I', self.message_count)) + buf.write(STRUCT_I.pack(self.message_count)) def get_size(self): # type: () -> int return 15 + len(self.exchange) + len(self.routing_key) @@ -3749,19 +3746,19 @@ class BasicGetOk(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> BasicGetOk offset = start_offset - delivery_tag, _bit, = struct.unpack_from('!QB', buf, offset) + delivery_tag, _bit, = STRUCT_QB.unpack_from(buf, offset) offset += 8 redelivered = bool(_bit >> 0) offset += 1 - s_len, = struct.unpack_from('!B', buf, offset) + s_len, = STRUCT_B.unpack_from(buf, offset) offset += 1 exchange = buf[offset:offset + s_len] offset += s_len - s_len, = struct.unpack_from('!B', buf, offset) + s_len, = STRUCT_B.unpack_from(buf, offset) offset += 1 routing_key = buf[offset:offset + s_len] offset += s_len - message_count, = struct.unpack_from('!I', buf, offset) + message_count, = STRUCT_I.unpack_from(buf, offset) offset += 4 return cls(delivery_tag, redelivered, exchange, routing_key, message_count) @@ -3806,7 +3803,7 @@ class BasicGetEmpty(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> BasicGetEmpty offset = start_offset - s_len, = struct.unpack_from('!B', buf, offset) + s_len, = STRUCT_B.unpack_from(buf, offset) offset += 1 offset += s_len # reserved field! return cls() @@ -3886,8 +3883,8 @@ class BasicNack(AMQPMethodPayload): def write_arguments(self, buf): # type: (tp.BinaryIO) -> None buf.write( - struct.pack('!QB', self.delivery_tag, - (self.multiple << 0) | (self.requeue << 1))) + STRUCT_QB.pack(self.delivery_tag, + (self.multiple << 0) | (self.requeue << 1))) def get_size(self): # type: () -> int return 9 @@ -3896,7 +3893,7 @@ class BasicNack(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> BasicNack offset = start_offset - delivery_tag, _bit, = struct.unpack_from('!QB', buf, offset) + delivery_tag, _bit, = STRUCT_QB.unpack_from(buf, offset) offset += 8 multiple = bool(_bit >> 0) requeue = bool(_bit >> 1) @@ -3993,12 +3990,11 @@ class BasicPublish(AMQPMethodPayload): def write_arguments(self, buf): # type: (tp.BinaryIO) -> None buf.write(b'\x00\x00') - buf.write(struct.pack('!B', len(self.exchange))) + buf.write(STRUCT_B.pack(len(self.exchange))) buf.write(self.exchange) - buf.write(struct.pack('!B', len(self.routing_key))) + buf.write(STRUCT_B.pack(len(self.routing_key))) buf.write(self.routing_key) - buf.write( - struct.pack('!B', (self.mandatory << 0) | (self.immediate << 1))) + buf.write(STRUCT_B.pack((self.mandatory << 0) | (self.immediate << 1))) def get_size(self): # type: () -> int return 5 + len(self.exchange) + len(self.routing_key) @@ -4007,15 +4003,15 @@ class BasicPublish(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> BasicPublish offset = start_offset - s_len, = struct.unpack_from('!2xB', buf, offset) + s_len, = STRUCT_2xB.unpack_from(buf, offset) offset += 3 exchange = buf[offset:offset + s_len] offset += s_len - s_len, = struct.unpack_from('!B', buf, offset) + s_len, = STRUCT_B.unpack_from(buf, offset) offset += 1 routing_key = buf[offset:offset + s_len] offset += s_len - _bit, = struct.unpack_from('!B', buf, offset) + _bit, = STRUCT_B.unpack_from(buf, offset) offset += 0 mandatory = bool(_bit >> 0) immediate = bool(_bit >> 1) @@ -4119,8 +4115,8 @@ class BasicQos(AMQPMethodPayload): def write_arguments(self, buf): # type: (tp.BinaryIO) -> None buf.write( - struct.pack('!IHB', self.prefetch_size, self.prefetch_count, - (self.global_ << 0))) + STRUCT_IHB.pack(self.prefetch_size, self.prefetch_count, + (self.global_ << 0))) def get_size(self): # type: () -> int return 7 @@ -4128,8 +4124,8 @@ class BasicQos(AMQPMethodPayload): @classmethod def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> BasicQos offset = start_offset - prefetch_size, prefetch_count, _bit, = struct.unpack_from( - '!IHB', buf, offset) + prefetch_size, prefetch_count, _bit, = STRUCT_IHB.unpack_from( + buf, offset) offset += 6 global_ = bool(_bit >> 0) offset += 1 @@ -4245,11 +4241,11 @@ class BasicReturn(AMQPMethodPayload): self.routing_key = routing_key def write_arguments(self, buf): # type: (tp.BinaryIO) -> None - buf.write(struct.pack('!HB', self.reply_code, len(self.reply_text))) + buf.write(STRUCT_HB.pack(self.reply_code, len(self.reply_text))) buf.write(self.reply_text) - buf.write(struct.pack('!B', len(self.exchange))) + buf.write(STRUCT_B.pack(len(self.exchange))) buf.write(self.exchange) - buf.write(struct.pack('!B', len(self.routing_key))) + buf.write(STRUCT_B.pack(len(self.routing_key))) buf.write(self.routing_key) def get_size(self): # type: () -> int @@ -4260,15 +4256,15 @@ class BasicReturn(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> BasicReturn offset = start_offset - reply_code, s_len, = struct.unpack_from('!HB', buf, offset) + reply_code, s_len, = STRUCT_HB.unpack_from(buf, offset) offset += 3 reply_text = buf[offset:offset + s_len] offset += s_len - s_len, = struct.unpack_from('!B', buf, offset) + s_len, = STRUCT_B.unpack_from(buf, offset) offset += 1 exchange = buf[offset:offset + s_len] offset += s_len - s_len, = struct.unpack_from('!B', buf, offset) + s_len, = STRUCT_B.unpack_from(buf, offset) offset += 1 routing_key = buf[offset:offset + s_len] offset += s_len @@ -4331,7 +4327,7 @@ class BasicReject(AMQPMethodPayload): self.requeue = requeue def write_arguments(self, buf): # type: (tp.BinaryIO) -> None - buf.write(struct.pack('!QB', self.delivery_tag, (self.requeue << 0))) + buf.write(STRUCT_QB.pack(self.delivery_tag, (self.requeue << 0))) def get_size(self): # type: () -> int return 9 @@ -4340,7 +4336,7 @@ class BasicReject(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> BasicReject offset = start_offset - delivery_tag, _bit, = struct.unpack_from('!QB', buf, offset) + delivery_tag, _bit, = STRUCT_QB.unpack_from(buf, offset) offset += 8 requeue = bool(_bit >> 0) offset += 1 @@ -4397,7 +4393,7 @@ class BasicRecoverAsync(AMQPMethodPayload): self.requeue = requeue def write_arguments(self, buf): # type: (tp.BinaryIO) -> None - buf.write(struct.pack('!B', (self.requeue << 0))) + buf.write(STRUCT_B.pack((self.requeue << 0))) def get_size(self): # type: () -> int return 1 @@ -4406,7 +4402,7 @@ class BasicRecoverAsync(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> BasicRecoverAsync offset = start_offset - _bit, = struct.unpack_from('!B', buf, offset) + _bit, = STRUCT_B.unpack_from(buf, offset) offset += 0 requeue = bool(_bit >> 0) offset += 1 @@ -4463,7 +4459,7 @@ class BasicRecover(AMQPMethodPayload): self.requeue = requeue def write_arguments(self, buf): # type: (tp.BinaryIO) -> None - buf.write(struct.pack('!B', (self.requeue << 0))) + buf.write(STRUCT_B.pack((self.requeue << 0))) def get_size(self): # type: () -> int return 1 @@ -4472,7 +4468,7 @@ class BasicRecover(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> BasicRecover offset = start_offset - _bit, = struct.unpack_from('!B', buf, offset) + _bit, = STRUCT_B.unpack_from(buf, offset) offset += 0 requeue = bool(_bit >> 0) offset += 1 @@ -4835,7 +4831,7 @@ class ConfirmSelect(AMQPMethodPayload): self.nowait = nowait def write_arguments(self, buf): # type: (tp.BinaryIO) -> None - buf.write(struct.pack('!B', (self.nowait << 0))) + buf.write(STRUCT_B.pack((self.nowait << 0))) def get_size(self): # type: () -> int return 1 @@ -4844,7 +4840,7 @@ class ConfirmSelect(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> ConfirmSelect offset = start_offset - _bit, = struct.unpack_from('!B', buf, offset) + _bit, = STRUCT_B.unpack_from(buf, offset) offset += 0 nowait = bool(_bit >> 0) offset += 1 @@ -5120,3 +5116,15 @@ REPLIES_FOR = { ConfirmSelect: [ConfirmSelectOk], ConfirmSelectOk: [], } +STRUCT_B = struct.Struct("!B") +STRUCT_HB = struct.Struct("!HB") +STRUCT_HH = struct.Struct("!HH") +STRUCT_BB = struct.Struct("!BB") +STRUCT_I = struct.Struct("!I") +STRUCT_L = struct.Struct("!L") +STRUCT_HIH = struct.Struct("!HIH") +STRUCT_2xB = struct.Struct("!2xB") +STRUCT_II = struct.Struct("!II") +STRUCT_QB = struct.Struct("!QB") +STRUCT_QBB = struct.Struct("!QBB") +STRUCT_IHB = struct.Struct("!IHB") diff --git a/coolamqp/framing/frames.py b/coolamqp/framing/frames.py index 8642423..9705606 100644 --- a/coolamqp/framing/frames.py +++ b/coolamqp/framing/frames.py @@ -13,6 +13,13 @@ from coolamqp.framing.definitions import FRAME_METHOD, FRAME_HEARTBEAT, \ FRAME_BODY, FRAME_HEADER, FRAME_END, \ IDENT_TO_METHOD, CLASS_ID_TO_CONTENT_PROPERTY_LIST, FRAME_END_BYTE +STRUCT_BH = struct.Struct('!BH') +STRUCT_BHL = struct.Struct('!BHL') +STRUCT_HH = struct.Struct('!HH') +STRUCT_BHLHHQ = struct.Struct('!BHLHHQ') +STRUCT_HHQ = struct.Struct('!HHQ') +STRUCT_BHLB = struct.Struct('!BHLB') + class AMQPMethodFrame(AMQPFrame): FRAME_TYPE = FRAME_METHOD @@ -31,10 +38,10 @@ class AMQPMethodFrame(AMQPFrame): def write_to(self, buf): if self.payload.IS_CONTENT_STATIC: - buf.write(struct.pack('!BH', FRAME_METHOD, self.channel)) + buf.write(STRUCT_BH.pack(FRAME_METHOD, self.channel)) buf.write(self.payload.STATIC_CONTENT) else: - buf.write(struct.pack('!BHL', FRAME_METHOD, self.channel, + buf.write(STRUCT_BHL.pack(FRAME_METHOD, self.channel, 4 + self.payload.get_size())) buf.write(self.payload.BINARY_HEADER) self.payload.write_arguments(buf) @@ -42,7 +49,7 @@ class AMQPMethodFrame(AMQPFrame): @classmethod def unserialize(cls, channel, payload_as_buffer): - clsmet = struct.unpack_from('!HH', payload_as_buffer, 0) + clsmet = STRUCT_HH.unpack_from(payload_as_buffer, 0) try: method_payload_class = IDENT_TO_METHOD[clsmet] @@ -79,7 +86,7 @@ class AMQPHeaderFrame(AMQPFrame): self.properties = properties def write_to(self, buf): - buf.write(struct.pack('!BHLHHQ', FRAME_HEADER, self.channel, + buf.write(STRUCT_BHLHHQ.pack(FRAME_HEADER, self.channel, 12 + self.properties.get_size(), self.class_id, 0, self.body_size)) @@ -89,8 +96,7 @@ class AMQPHeaderFrame(AMQPFrame): @staticmethod def unserialize(channel, payload_as_buffer): # payload starts with class ID - class_id, weight, body_size = struct.unpack_from('!HHQ', - payload_as_buffer, 0) + class_id, weight, body_size = STRUCT_HHQ.unpack_from(payload_as_buffer, 0) properties = CLASS_ID_TO_CONTENT_PROPERTY_LIST[class_id].from_buffer( payload_as_buffer, 12) return AMQPHeaderFrame(channel, class_id, weight, body_size, @@ -126,7 +132,7 @@ class AMQPBodyFrame(AMQPFrame): def write_to(self, buf): buf.write( - struct.pack('!BHL', FRAME_BODY, self.channel, len(self.data))) + STRUCT_BHL.pack(FRAME_BODY, self.channel, len(self.data))) buf.write(self.data) buf.write(FRAME_END_BYTE) @@ -144,7 +150,7 @@ class AMQPBodyFrame(AMQPFrame): class AMQPHeartbeatFrame(AMQPFrame): FRAME_TYPE = FRAME_HEARTBEAT LENGTH = 8 - DATA = struct.pack('!BHLB', FRAME_HEARTBEAT, 0, 0, FRAME_END) + DATA = STRUCT_BHLB.pack(FRAME_HEARTBEAT, 0, 0, FRAME_END) def __init__(self): AMQPFrame.__init__(self, 0) diff --git a/coolamqp/objects.py b/coolamqp/objects.py index ee66fcf..2eeaffe 100644 --- a/coolamqp/objects.py +++ b/coolamqp/objects.py @@ -2,6 +2,7 @@ """ Core objects used in CoolAMQP """ +import typing as tp import logging import uuid @@ -9,11 +10,17 @@ import six from coolamqp.framing.definitions import \ BasicContentPropertyList as MessageProperties +from coolamqp.framing.base import AMQPFrame logger = logging.getLogger(__name__) EMPTY_PROPERTIES = MessageProperties() +try: + Protocol = tp.Protocol +except AttributeError: + Protocol = object + def toutf8(q): if isinstance(q, six.binary_type): @@ -27,12 +34,28 @@ def tobytes(q): return q +class FrameLogger(Protocol): + def on_frame(self, timestamp, # type: float + frame, # type: AMQPFrame + direction # type: str + ): + """ + Log a frame + + :param timestamp: timestamp in seconds since Unix Epoch + :param frame: AMQPFrame to parse + :param direction: either 'to_client' when this is frame received from the broker, or + 'to_server' if it's a frame that's being sent to the broker + """ + + class Callable(object): """ Add a bunch of callables to one list, and just invoke'm. INTERNAL USE ONLY #todo not thread safe """ + __slots__ = ('callables', 'oneshots') def __init__(self, oneshots=False): """:param oneshots: if True, callables will be called and discarded""" @@ -57,10 +80,12 @@ class Message(object): coolamqp.framing.definitions.BasicContentPropertyList for a list of possible properties. """ + __slots__ = ('body', 'properties') Properties = MessageProperties # an alias for easier use - def __init__(self, body, properties=None): + def __init__(self, body, # type: bytes + properties=None): """ Create a Message object. @@ -103,35 +128,32 @@ class ReceivedMessage(Message): Note that if the consumer that generated this message was no_ack, .ack() and .nack() are no-ops. """ + __slots__ = ('delivery_tag', 'exchange_name', 'routing_key', 'ack', 'nack') - def __init__(self, body, - exchange_name, - routing_key, + def __init__(self, body, # type: tp.Union[str, bytes, bytearray, tp.List[memoryview]] + exchange_name, # type: memoryview + routing_key, # type: memoryview properties=None, - delivery_tag=None, - ack=None, - nack=None): + delivery_tag=None, # type: int + ack=None, # type: tp.Callable[[], None] + nack=None # type: tp.Callable[[], None] + ): """ :param body: message body. A stream of octets. :type body: str (py2) or bytes (py3) or a list of memoryviews, if particular disabled-by-default option is turned on, or a single memoryview :param exchange_name: name of exchange this message was submitted to - :type exchange_name: memoryview :param routing_key: routing key with which this message was sent - :type routing_key: memoryview :param properties: a suitable BasicContentPropertyList subinstance. be prepared that value of properties that are strings will be memoryviews :param delivery_tag: delivery tag assigned by AMQP broker to confirm this message - :type delivery_tag: int :param ack: a callable to call when you want to ack (via basic.ack) this message. None if received by the no-ack mechanism - :type ack: tp.Callable[[], None] :param nack: a callable to call when you want to nack (via basic.reject) this message. None if received by the no-ack mechanism - :type nack: tp.Callable[[], None] """ Message.__init__(self, body, properties=properties) @@ -148,15 +170,18 @@ class Exchange(object): This represents an Exchange used in AMQP. This is hashable. """ + __slots__ = ('name', 'type', 'durable', 'auto_delete') direct = None # the direct exchange - def __init__(self, name=u'', type=b'direct', durable=True, - auto_delete=False): + def __init__(self, name=u'', # type: tp.Union[str, bytes] + type=b'direct', # type: tp.Union[str, bytes] + durable=True, # type: bool + auto_delete=False # type: bool + ): """ :type name: unicode is preferred, binary type will get decoded to unicode with utf8 - :param type: exchange type. binary/unicode """ self.name = toutf8(name) # must be unicode self.type = tobytes(type) # must be bytes @@ -185,9 +210,15 @@ class Queue(object): """ This object represents a Queue that applications consume from or publish to. """ - - def __init__(self, name=b'', durable=False, exchange=None, exclusive=False, - auto_delete=False): + __slots__ = ('name', 'durable', 'exchange', 'auto_delete', 'exclusive', + 'anonymous', 'consumer_tag') + + def __init__(self, name=b'', # type: tp.Union[str, bytes] + durable=False, # type: bool + exchange=None, # type: tp.Optional[Exchange] + exclusive=False, # type: bool + auto_delete=False # type: bool + ): """ Create a queue definition. @@ -196,7 +227,6 @@ class Queue(object): upon declaration. If a disconnect happens, and connection to other node is reestablished, this name will CHANGE AGAIN, and be reflected in this object. This change will be done before CoolAMQP signals reconnection. - :type name: byte type or text type :param durable: Is the queue durable? :param exchange: Exchange for this queue to bind to. None for no binding. :param exclusive: Is this queue exclusive? @@ -230,37 +260,33 @@ class NodeDefinition(object): Definition of a reachable AMQP node. This object is hashable. + + >>> a = NodeDefinition(host='192.168.0.1', user='admin', password='password', + >>> virtual_host='vhost') + + or + + >>> a = NodeDefinition('192.168.0.1', 'admin', 'password') + + or + + >>> a = NodeDefinition('amqp://user:password@host/virtual_host') + + or + + >>> a = NodeDefinition('amqp://user:password@host:port/virtual_host', hearbeat=20) + + AMQP connection string may be either bytes or str/unicode + + + Additional keyword parameters that can be specified: + heartbeat - heartbeat interval in seconds + port - TCP port to use. Default is 5672 + + :raise ValueError: invalid parameters """ def __init__(self, *args, **kwargs): - """ - Create a cluster node definition. - - >>> a = NodeDefinition(host='192.168.0.1', user='admin', password='password', - >>> virtual_host='vhost') - - or - - >>> a = NodeDefinition('192.168.0.1', 'admin', 'password') - - or - - >>> a = NodeDefinition('amqp://user:password@host/virtual_host') - - or - - >>> a = NodeDefinition('amqp://user:password@host:port/virtual_host', hearbeat=20) - - AMQP connection string may be either bytes or str/unicode - - - Additional keyword parameters that can be specified: - heartbeat - heartbeat interval in seconds - port - TCP port to use. Default is 5672 - - :raise ValueError: invalid parameters - """ - self.heartbeat = kwargs.pop('heartbeat', None) self.port = kwargs.pop('port', 5672) diff --git a/setup.py b/setup.py index 77ea5c3..f7554e8 100644 --- a/setup.py +++ b/setup.py @@ -1,7 +1,6 @@ #!/usr/bin/env python # coding=UTF-8 from setuptools import setup, find_packages - from coolamqp import __version__ setup(keywords=['amqp', 'rabbitmq', 'client', 'network', 'ha', 'high availability'], diff --git a/tests/Dockerfile b/tests/Dockerfile index d202d37..f61ab62 100644 --- a/tests/Dockerfile +++ b/tests/Dockerfile @@ -16,5 +16,5 @@ ENV AMQP_HOST=amqp # for those pesky builds on Windows RUN chmod -R ugo-x /coolamqp -CMD python setup.py nosetests --tests tests +CMD python setup.py test diff --git a/tests/test_clustering/test_a.py b/tests/test_clustering/test_a.py index d841838..11727c3 100644 --- a/tests/test_clustering/test_a.py +++ b/tests/test_clustering/test_a.py @@ -144,7 +144,7 @@ class TestA(unittest.TestCase): 'content_encoding': b'utf8' }), routing_key=u'hello5', tx=True).result() - time.sleep(1) + time.sleep(2) self.assertTrue(P['q']) -- GitLab