diff --git a/CHANGELOG.md b/CHANGELOG.md index e9c1a5afefd6137a5cc66e2ea0843e5220bde0fd..05f7a92e3f0e80c0a524efc9d03c30e6be7b618f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ -# v0.107: +# v1.0: -* _TBA_ +* first solid API release +* improvements for more speed # v0.106: diff --git a/compile_definitions/__main__.py b/compile_definitions/__main__.py index c4226af2108886cece736194786c3e2d0cfa1e50..3097f68d989737c936a0fb5d23336fd321c79a0a 100644 --- a/compile_definitions/__main__.py +++ b/compile_definitions/__main__.py @@ -118,6 +118,8 @@ Field = collections.namedtuple('Field', ('name', 'type', 'basic_type', 'reserved line('\n%sS = [%s]', pythonify_name(constant_kind), u', '.join(constants)) + structers = {} + # get domains domain_to_basic_type = {} line('\n\n\nDOMAIN_TO_BASIC_TYPE = {\n') @@ -489,7 +491,9 @@ Field = collections.namedtuple('Field', ('name', 'type', 'basic_type', 'reserved from coolamqp.framing.compilation.textcode_fields import \ get_serializer, get_counter, get_from_buffer line('\n def write_arguments(self, buf): # type: (tp.BinaryIO) -> None\n') - line(get_serializer(method.fields, 'self.', 2)) + line_, new_structers = get_serializer(method.fields, 'self.', 2) + line(line_) + structers.update(new_structers) line(' def get_size(self): # type: () -> int\n') line(get_counter(method.fields, 'self.', 2)) @@ -499,8 +503,10 @@ Field = collections.namedtuple('Field', ('name', 'type', 'basic_type', 'reserved offset = start_offset ''', full_class_name) - line(get_from_buffer(method.fields, '', 2, - remark=(method.name == 'deliver'))) + line_, new_structers = get_from_buffer(method.fields, '', 2, + remark=(method.name == 'deliver')) + line(line_) + structers.update(new_structers) line(" return cls(%s)", u', '.join( @@ -549,6 +555,10 @@ REPLIES_FOR = {\n''') line(u' %s: [%s],\n' % (k, u', '.join(map(str, v)))) line(u'}\n') + # Output structers + for structer in structers: + line(u'STRUCT_%s = struct.Struct("!%s")\n' % (structer, structer)) + out.close() diff --git a/coolamqp/__init__.py b/coolamqp/__init__.py index 7d410608f8b6d1917ccfeb57634c0d96386867f9..5eb7dae9ceab53f8900c0ec0c73d42793ecf6e54 100644 --- a/coolamqp/__init__.py +++ b/coolamqp/__init__.py @@ -1,2 +1,2 @@ # coding=UTF-8 -__version__ = '0.107rc1' +__version__ = '1.0' diff --git a/coolamqp/attaches/channeler.py b/coolamqp/attaches/channeler.py index b2c9ac788d7b1f0b8c616f329176ad362ab34ca3..b25a371b8ee409d06e2ec3016d6d9333626493d0 100644 --- a/coolamqp/attaches/channeler.py +++ b/coolamqp/attaches/channeler.py @@ -24,6 +24,7 @@ class Attache(object): """ Something that can be attached to connection. """ + __slots__ = ('cancelled', 'state', 'connection') def __init__(self): self.cancelled = False #: public, if this is True, it won't be attached to next connection @@ -62,6 +63,7 @@ class Channeler(Attache): but ordering it to do anything is pointless, because it will not get done until attach() with new connection is called. """ + __slots__ = ('channel_id', ) def __init__(self): """ diff --git a/coolamqp/attaches/consumer.py b/coolamqp/attaches/consumer.py index 9a921b3f5f8b7b47d9f5d1a6ebd4594d4a3e63d5..6f9ae38607ffe229188f5409ee84a12254dadb72 100644 --- a/coolamqp/attaches/consumer.py +++ b/coolamqp/attaches/consumer.py @@ -108,6 +108,12 @@ class Consumer(Channeler): has a performance impact :type body_receive_mode: a property of BodyReceiveMode """ + __slots__ = ('queue', 'no_ack', 'on_message', 'cancelled', 'receiver', + 'attache_group', 'channel_close_sent', 'qos', 'qos_update_sent', + 'future_to_notify', 'future_to_notify_on_dead', + 'fail_on_first_time_resource_locked', 'cancel_on_failure', + 'body_receive_mode', 'consumer_tag', 'on_cancel', 'on_broker_cancel', + 'hb_watch', 'deliver_watch') def __init__(self, queue, on_message, no_ack=True, qos=None, cancel_on_failure=False, @@ -127,8 +133,6 @@ class Consumer(Channeler): self.on_message = on_message - # private - self.cancelled = False # did the client want to STOP using this # consumer? self.receiver = None # MessageReceiver instance @@ -469,6 +473,8 @@ class MessageReceiver(object): and may opt to kill the connection on bad framing with self.consumer.connection.send(None) """ + __slots__ = ('consumer', 'state', 'bdeliver', 'header', 'body', 'data_to_go', + 'message_size', 'offset', 'acks_pending', 'recv_mode') def __init__(self, consumer): # type: (Consumer) -> None self.consumer = consumer diff --git a/coolamqp/attaches/declarer.py b/coolamqp/attaches/declarer.py index 21511a09dd59e1c32d0ed9fb7240ce0ce8a27cce..7d6e95dec7275c7edf1e8ec3e3b0def77e144b33 100644 --- a/coolamqp/attaches/declarer.py +++ b/coolamqp/attaches/declarer.py @@ -29,6 +29,7 @@ class Operation(object): This will register it's own callback. Please, call on_connection_dead when connection is broken to fail futures with ConnectionDead, since this object does not watch for Fails """ + __slots__ = ('done', 'fut', 'declarer', 'obj', 'on_done') def __init__(self, declarer, obj, fut=None): self.done = False diff --git a/coolamqp/attaches/publisher.py b/coolamqp/attaches/publisher.py index a3f1aa084759e171b9a19e60581f69e2a2d8ec54..9f377822e3f38e9526cd4f7f617b16ed83d905e4 100644 --- a/coolamqp/attaches/publisher.py +++ b/coolamqp/attaches/publisher.py @@ -77,7 +77,6 @@ class Publisher(Channeler, Synchronized): # todo add fallback using plain AMQP transactions - this will remove UnusablePublisher and stuff - class UnusablePublisher(Exception): """This publisher will never work (eg. MODE_CNPUB on a broker not supporting publisher confirms)""" diff --git a/coolamqp/attaches/utils.py b/coolamqp/attaches/utils.py index 2fc9f66e6fe28e0540449c83df7fec1831809afa..36ca346d11615aa0433bd52069533378991260e8 100644 --- a/coolamqp/attaches/utils.py +++ b/coolamqp/attaches/utils.py @@ -13,6 +13,7 @@ class ConfirmableRejectable(object): Protocol for objects put into AtomicTagger. You need not subclass it, just support this protocol. """ + __slots__ = () def confirm(self): # type: () -> None """ @@ -32,6 +33,7 @@ class FutureConfirmableRejectable(ConfirmableRejectable): A ConfirmableRejectable that can result a future (with None), or Exception it with a message """ + __slots__ = ('future', ) def __init__(self, future): # type: (concurrent.futures.Future) -> None self.future = future @@ -77,6 +79,7 @@ class AtomicTagger(object): This has to be fast for most common cases. Corner cases will be resolved correctly, but maybe not fast. """ + __slots__ = ('lock', 'next_tag', 'tags') def __init__(self): self.lock = threading.RLock() diff --git a/coolamqp/clustering/cluster.py b/coolamqp/clustering/cluster.py index 07e6fa85f51f6d6d65e685ace39bb5a3be04d878..201384c1a8ada0bea0297ace9a1a25104c5f8516 100644 --- a/coolamqp/clustering/cluster.py +++ b/coolamqp/clustering/cluster.py @@ -6,19 +6,19 @@ from __future__ import print_function, absolute_import, division import logging import time -import typing as tp import warnings -from concurrent.futures import Future import monotonic import six +import typing as tp +from concurrent.futures import Future from coolamqp.attaches import Publisher, AttacheGroup, Consumer, Declarer from coolamqp.clustering.events import ConnectionLost, MessageReceived, \ NothingMuch, Event from coolamqp.clustering.single import SingleNodeReconnector from coolamqp.exceptions import ConnectionDead -from coolamqp.objects import Exchange +from coolamqp.objects import Exchange, Message, Queue, FrameLogger from coolamqp.uplink import ListenerThread logger = logging.getLogger(__name__) @@ -37,15 +37,12 @@ class Cluster(object): It is not safe to fork() after .start() is called, but it's OK before. :param nodes: list of nodes, or a single node. For now, only one is supported. - :type nodes: NodeDefinition instance or a list of NodeDefinition instances :param on_fail: callable/0 to call when connection fails in an unclean way. This is a one-shot - :type on_fail: callable/0 :param extra_properties: refer to documentation in [/coolamqp/connection/connection.py] Connection.__init__ - :param log_frames: an object that will have it's method .on_frame(timestamp, - frame, direction) called upon receiving/sending a frame. Timestamp is UNIX timestamp, - frame is AMQPFrame, direction is one of 'to_client', 'to_server' + :param log_frames: an object that supports logging each and every frame CoolAMQP sends and + receives from the broker :param name: name to appear in log items and prctl() for the listener thread """ @@ -53,7 +50,12 @@ class Cluster(object): ST_LINK_LOST = 0 # Link has been lost ST_LINK_REGAINED = 1 # Link has been regained - def __init__(self, nodes, on_fail=None, extra_properties=None, log_frames=None, name=None): + def __init__(self, nodes, # type: tp.Union[NodeDefinition, tp.List[NodeDefinition]] + on_fail=None, # type: tp.Optional[tp.Callable[[], None]] + extra_properties=None, # type: tp.Optional[tp.List[tp.Tuple[bytes, tp.Tuple[bytes, str]]]] + log_frames=None, # type: tp.Optional[FrameLogger] + name=None # type: tp.Optional[str] + ): from coolamqp.objects import NodeDefinition if isinstance(nodes, NodeDefinition): nodes = [nodes] @@ -75,9 +77,9 @@ class Cluster(object): else: self.on_fail = None - def declare(self, obj, persistent=False): - # type: (tp.Union[coolamqp.objects.Queue, coolamqp.objects.Exchange], bool) -> - # concurrent.futures.Future + def declare(self, obj, # type: tp.Union[Queue, Exchange] + persistent=False # type: bool + ): # type: (...) -> concurrent.futures.Future """ Declare a Queue/Exchange @@ -103,6 +105,7 @@ class Cluster(object): return THE_POPE_OF_NOPE def consume(self, queue, on_message=None, *args, **kwargs): + # type: (Queue, tp.Callable[[MessageReceived], None] -> tp.Tuple[Consumer, Future] """ Start consuming from a queue. @@ -115,7 +118,6 @@ class Cluster(object): Note that name of anonymous queue might change at any time! :param on_message: callable that will process incoming messages if you leave it at None, messages will be .put into self.events - :type on_message: callable(ReceivedMessage instance) or None :return: a tuple (Consumer instance, and a Future), that tells, when consumer is ready """ fut = Future() @@ -127,7 +129,7 @@ class Cluster(object): self.attache_group.add(con) return con, fut - def delete_queue(self, queue): # type: (coolamqp.objects.Queue) -> concurrent.futures.Future + def delete_queue(self, queue): # type: (coolamqp.objects.Queue) -> Future """ Delete a queue. @@ -136,26 +138,25 @@ class Cluster(object): """ return self.decl.delete_queue(queue) - def publish(self, message, exchange=None, routing_key=u'', tx=None, - confirm=None): + def publish(self, message, # type: Message + exchange=None, # type: tp.Union[Exchange, str, bytes] + routing_key=u'', # type: tp.Union[str, bytes] + tx=None, # type: tp.Optional[bool] + confirm=None # type: tp.Optional[bool] + ): # type: (...) -> tp.Optional[Future] """ Publish a message. :param message: Message to publish - :type message: coolamqp.objects.Message :param exchange: exchange to use. Default is the "direct" empty-name exchange. - :type exchange: unicode/bytes (exchange name) or Exchange object. :param routing_key: routing key to use - :type routing_key: tp.Union[str, bytes] :param confirm: Whether to publish it using confirms/transactions. If you choose so, you will receive a Future that can be used to check it broker took responsibility for this message. Note that if tx if False, and message cannot be delivered to broker at once, it will be discarded - :type confirm: tp.Optional[bool] :param tx: deprecated, alias for confirm - :type tx: tp.Optional[bool] - :return: Future or None + :return: Future to be finished on completion or None, is confirm/tx was not chosen """ if isinstance(exchange, Exchange): exchange = exchange.name.encode('utf8') @@ -206,7 +207,7 @@ class Cluster(object): except AttributeError: pass else: - raise RuntimeError(u'[%s] This was already called!' % (self.name, )) + raise RuntimeError(u'[%s] This was already called!' % (self.name,)) self.listener = ListenerThread(name=self.name) @@ -240,7 +241,8 @@ class Cluster(object): while not self.attache_group.is_online() and monotonic.monotonic() - start_at < timeout: time.sleep(0.1) if not self.attache_group.is_online(): - raise ConnectionDead('[%s] Could not connect within %s seconds' % (self.name, timeout,)) + raise ConnectionDead( + '[%s] Could not connect within %s seconds' % (self.name, timeout,)) def shutdown(self, wait=True): # type: (bool) -> None """ diff --git a/coolamqp/clustering/events.py b/coolamqp/clustering/events.py index 872fdc4c3584c1cd32e285491aeb5cd3a417dcb8..010ec2cc6ea098fe991573a628e147773e546bcc 100644 --- a/coolamqp/clustering/events.py +++ b/coolamqp/clustering/events.py @@ -18,10 +18,14 @@ class Event(object): An event emitted by Cluster """ + __slots__ = () + class NothingMuch(Event): """Nothing happened :D""" + __slots__ = () + class ConnectionLost(Event): """ @@ -35,11 +39,14 @@ class ConnectionLost(Event): Please examine your Consumer's .state's to check whether link was regained """ + __slots__ = () + class MessageReceived(ReceivedMessage, Event): """ Something that works as an ersatz ReceivedMessage, but is an event """ + __slots__ = () def __init__(self, msg): # type: (ReceivedMessage) -> None """:type msg: ReceivedMessage""" diff --git a/coolamqp/framing/base.py b/coolamqp/framing/base.py index eaf24bab3d1f29f47a475f19314c06d7f60803cb..043edf315a8db61f6625b95cf1477304acb4f884 100644 --- a/coolamqp/framing/base.py +++ b/coolamqp/framing/base.py @@ -65,6 +65,7 @@ class AMQPFrame(object): # base class for framing class AMQPPayload(object): """Payload is something that can write itself to bytes, or at least provide a buffer to do it.""" + __slots__ = () def write_to(self, buf): # type: (buffer) -> None """ @@ -84,6 +85,8 @@ class AMQPPayload(object): class AMQPClass(object): """An AMQP class""" + __slots__ = () + class AMQPContentPropertyList(object): """ @@ -152,6 +155,8 @@ class AMQPContentPropertyList(object): class AMQPMethodPayload(AMQPPayload): + __slots__ = () + RESPONSE_TO = None REPLY_WITH = [] FIELDS = [] diff --git a/coolamqp/framing/compilation/content_property.py b/coolamqp/framing/compilation/content_property.py index 3b94bc83cf5f50d3cd09daf8a047e7245e417cac..5c9dedbb0af3d111c99ea91d973b873336b7a9a7 100644 --- a/coolamqp/framing/compilation/content_property.py +++ b/coolamqp/framing/compilation/content_property.py @@ -3,6 +3,7 @@ from __future__ import absolute_import, division, print_function """Generate serializers/unserializers/length getters for given property_flags""" import six +import struct import logging from coolamqp.framing.compilation.textcode_fields import get_counter, \ get_from_buffer, get_serializer @@ -14,8 +15,7 @@ SLOTS_I = u'\n __slots__ = (%s)\n' FROM_BUFFER_1 = u' def from_buffer(cls, buf, start_offset):\n ' \ u'offset = start_offset + %s\n' ASSIGN_A = u' self.%s = %s\n' -STARTER = u'''import struct -from coolamqp.framing.base import AMQPContentPropertyList +STARTER = u'''from coolamqp.framing.base import AMQPContentPropertyList class ParticularContentTypeList(AMQPContentPropertyList): """ @@ -46,6 +46,8 @@ def _compile_particular_content_property_list_class(zpf, fields): """ from coolamqp.framing.compilation.utilities import format_field_name + structers = {} + if any(field.basic_type == 'bit' for field in fields): return NB @@ -115,7 +117,9 @@ def _compile_particular_content_property_list_class(zpf, fields): mod.append(repred_zpf) mod.append(u')\n') - mod.append(get_serializer(present_fields, prefix=u'self.', indent_level=2)) + line, new_structers = get_serializer(present_fields, prefix=u'self.', indent_level=2) + structers.update(new_structers) + mod.append(line) # from_buffer # note that non-bit values @@ -123,9 +127,11 @@ def _compile_particular_content_property_list_class(zpf, fields): mod.append( FROM_BUFFER_1 % ( zpf_length,)) - mod.append(get_from_buffer( + line, new_structers = get_from_buffer( present_fields - , prefix='', indent_level=2)) + , prefix='', indent_level=2) + structers.update(new_structers) + mod.append(line) mod.append(u' return cls(%s)\n' % (FFN,)) # get_size @@ -134,16 +140,26 @@ def _compile_particular_content_property_list_class(zpf, fields): :-1]) # skip eol mod.append(u' + %s\n' % (zpf_length,)) # account for pf length - return u''.join(mod) + return u''.join(mod), structers + + +STRUCTERS_FOR_NOW = {} # type: tp.Dict[str, struct.Struct] def compile_particular_content_property_list_class(zpf, fields): - import struct from coolamqp.framing.base import AMQPContentPropertyList + global STRUCTERS_FOR_NOW + + q, structers = _compile_particular_content_property_list_class(zpf, fields) + locals_ = { + 'AMQPContentPropertyList': AMQPContentPropertyList + } + for structer in structers: + if structer not in STRUCTERS_FOR_NOW: + STRUCTERS_FOR_NOW[structer] = struct.Struct('!%s' % (structer,)) + + locals_['STRUCT_%s' % (structer, )] = STRUCTERS_FOR_NOW[structer] - q = _compile_particular_content_property_list_class(zpf, fields) - loc = dict(globals(), **{ - 'struct': struct, - 'AMQPContentPropertyList': AMQPContentPropertyList}) + loc = dict(globals(), **locals_) exec (q, loc) return loc['ParticularContentTypeList'] diff --git a/coolamqp/framing/compilation/textcode_fields.py b/coolamqp/framing/compilation/textcode_fields.py index 3a27ee44e6fdb94c9f1d24b3d3b65da6c6bd9094..9a18a824a9fa9efddc3ad78d18a74e7535be7868 100644 --- a/coolamqp/framing/compilation/textcode_fields.py +++ b/coolamqp/framing/compilation/textcode_fields.py @@ -69,12 +69,14 @@ def get_counter(fields, prefix=u'', indent_level=2): u' + '.join([str(accumulator)] + parts)) + u'\n' + # type: (...) -> tp.Tuple[str, dict] def get_from_buffer(fields, prefix='', indent_level=2, remark=False): """ Emit code that collects values from buf:offset, updating offset as progressing. :param remark: BE FUCKING VERBOSE! #DEBUG """ code = [] + structers = {} def emit(fmt, *args): args = list(args) @@ -110,18 +112,20 @@ def get_from_buffer(fields, prefix='', indent_level=2, remark=False): del bits[:] - def emit_structures(dont_do_bits=False): + def emit_structures(dont_do_bits=False): # type: (bool) -> dict if not dont_do_bits: emit_bits() if len(to_struct) == 0: - return + return {} fffnames = [a for a, b in to_struct if a != u'_'] # skip reserved ffffmts = [b for a, b in to_struct] - emit("%s, = struct.unpack_from('!%s', buf, offset)", - u', '.join(fffnames), u''.join(ffffmts)) + fmts = u''.join(ffffmts) + emit("%s, = STRUCT_%s.unpack_from(buf, offset)", + u', '.join(fffnames), fmts) emit("offset += %s", ln['ln']) ln['ln'] = 0 del to_struct[:] + return {fmts: fmts} for field in fields: fieldname = prefix + format_field_name(field.name) @@ -149,7 +153,7 @@ def get_from_buffer(fields, prefix='', indent_level=2, remark=False): elif field.basic_type == u'bit': bits.append('_' if field.reserved else fieldname) elif field.basic_type == u'table': # oh my god - emit_structures() + structers.update(emit_structures()) assert len(bits) == 0 assert len(to_struct) == 0 @@ -160,7 +164,7 @@ def get_from_buffer(fields, prefix='', indent_level=2, remark=False): f_q, f_l = ('L', 4) if field.basic_type == u'longstr' else ('B', 1) to_struct.append(('s_len', f_q)) ln['ln'] += f_l - emit_structures() + structers.update(emit_structures()) if field.reserved: emit("offset += s_len # reserved field!") else: @@ -171,20 +175,21 @@ def get_from_buffer(fields, prefix='', indent_level=2, remark=False): if len(bits) == 8: emit_bits() - emit_structures() + structers.update(emit_structures()) - return u''.join(code) + return u''.join(code), structers -def get_serializer(fields, prefix='', indent_level=2): +def get_serializer(fields, prefix='', indent_level=2): # type: (list, str) -> str, dict """ Emit code that serializes the fields into buf at offset :param fields: list of Field instances :param prefix: pass "self." is inside a class - :return: block of code that does that + :return: block of code that does that, dictionary of struct-ers """ code = [] + structers = {} def emit(fmt, *args): args = list(args) @@ -213,7 +218,10 @@ def get_serializer(fields, prefix='', indent_level=2): del bits[:] def emit_single_struct_pack(): - emit("buf.write(struct.pack('!%s', %s))", u''.join(formats), + formats_str = u''.join(formats) + if formats_str not in structers: + structers[formats_str] = formats_str + emit("buf.write(STRUCT_%s.pack(%s))", formats_str, u', '.join(format_args)) del formats[:] del format_args[:] @@ -255,4 +263,4 @@ def get_serializer(fields, prefix='', indent_level=2): emit('') # eol - return u''.join(code) + return u''.join(code), structers diff --git a/coolamqp/framing/definitions.py b/coolamqp/framing/definitions.py index 6fbccd83bcd2bed1295d36896f83fff5898e0cc8..a82197f22e4387d67b23eac60467e0bc41a2554c 100644 --- a/coolamqp/framing/definitions.py +++ b/coolamqp/framing/definitions.py @@ -241,7 +241,7 @@ class ConnectionBlocked(AMQPMethodPayload): self.reason = reason def write_arguments(self, buf): # type: (tp.BinaryIO) -> None - buf.write(struct.pack('!B', len(self.reason))) + buf.write(STRUCT_B.pack(len(self.reason))) buf.write(self.reason) def get_size(self): # type: () -> int @@ -251,7 +251,7 @@ class ConnectionBlocked(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> ConnectionBlocked offset = start_offset - s_len, = struct.unpack_from('!B', buf, offset) + s_len, = STRUCT_B.unpack_from(buf, offset) offset += 1 reason = buf[offset:offset + s_len] offset += s_len @@ -329,9 +329,9 @@ class ConnectionClose(AMQPMethodPayload): self.method_id = method_id def write_arguments(self, buf): # type: (tp.BinaryIO) -> None - buf.write(struct.pack('!HB', self.reply_code, len(self.reply_text))) + buf.write(STRUCT_HB.pack(self.reply_code, len(self.reply_text))) buf.write(self.reply_text) - buf.write(struct.pack('!HH', self.class_id, self.method_id)) + buf.write(STRUCT_HH.pack(self.class_id, self.method_id)) def get_size(self): # type: () -> int return 7 + len(self.reply_text) @@ -340,11 +340,11 @@ class ConnectionClose(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> ConnectionClose offset = start_offset - reply_code, s_len, = struct.unpack_from('!HB', buf, offset) + reply_code, s_len, = STRUCT_HB.unpack_from(buf, offset) offset += 3 reply_text = buf[offset:offset + s_len] offset += s_len - class_id, method_id, = struct.unpack_from('!HH', buf, offset) + class_id, method_id, = STRUCT_HH.unpack_from(buf, offset) offset += 4 return cls(reply_code, reply_text, class_id, method_id) @@ -439,10 +439,10 @@ class ConnectionOpen(AMQPMethodPayload): self.virtual_host = virtual_host def write_arguments(self, buf): # type: (tp.BinaryIO) -> None - buf.write(struct.pack('!B', len(self.virtual_host))) + buf.write(STRUCT_B.pack(len(self.virtual_host))) buf.write(self.virtual_host) buf.write(b'\x00') - buf.write(struct.pack('!B', 0)) + buf.write(STRUCT_B.pack(0)) def get_size(self): # type: () -> int return 3 + len(self.virtual_host) @@ -451,11 +451,11 @@ class ConnectionOpen(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> ConnectionOpen offset = start_offset - s_len, = struct.unpack_from('!B', buf, offset) + s_len, = STRUCT_B.unpack_from(buf, offset) offset += 1 virtual_host = buf[offset:offset + s_len] offset += s_len - s_len, = struct.unpack_from('!B', buf, offset) + s_len, = STRUCT_B.unpack_from(buf, offset) offset += 1 offset += s_len # reserved field! offset += 1 @@ -500,7 +500,7 @@ class ConnectionOpenOk(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> ConnectionOpenOk offset = start_offset - s_len, = struct.unpack_from('!B', buf, offset) + s_len, = STRUCT_B.unpack_from(buf, offset) offset += 1 offset += s_len # reserved field! return cls() @@ -602,11 +602,11 @@ class ConnectionStart(AMQPMethodPayload): self.locales = locales def write_arguments(self, buf): # type: (tp.BinaryIO) -> None - buf.write(struct.pack('!BB', self.version_major, self.version_minor)) + buf.write(STRUCT_BB.pack(self.version_major, self.version_minor)) enframe_table(buf, self.server_properties) - buf.write(struct.pack('!I', len(self.mechanisms))) + buf.write(STRUCT_I.pack(len(self.mechanisms))) buf.write(self.mechanisms) - buf.write(struct.pack('!I', len(self.locales))) + buf.write(STRUCT_I.pack(len(self.locales))) buf.write(self.locales) def get_size(self): # type: () -> int @@ -617,15 +617,15 @@ class ConnectionStart(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> ConnectionStart offset = start_offset - version_major, version_minor, = struct.unpack_from('!BB', buf, offset) + version_major, version_minor, = STRUCT_BB.unpack_from(buf, offset) offset += 2 server_properties, delta = deframe_table(buf, offset) offset += delta - s_len, = struct.unpack_from('!L', buf, offset) + s_len, = STRUCT_L.unpack_from(buf, offset) offset += 4 mechanisms = buf[offset:offset + s_len] offset += s_len - s_len, = struct.unpack_from('!L', buf, offset) + s_len, = STRUCT_L.unpack_from(buf, offset) offset += 4 locales = buf[offset:offset + s_len] offset += s_len @@ -682,7 +682,7 @@ class ConnectionSecure(AMQPMethodPayload): self.challenge = challenge def write_arguments(self, buf): # type: (tp.BinaryIO) -> None - buf.write(struct.pack('!I', len(self.challenge))) + buf.write(STRUCT_I.pack(len(self.challenge))) buf.write(self.challenge) def get_size(self): # type: () -> int @@ -692,7 +692,7 @@ class ConnectionSecure(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> ConnectionSecure offset = start_offset - s_len, = struct.unpack_from('!L', buf, offset) + s_len, = STRUCT_L.unpack_from(buf, offset) offset += 4 challenge = buf[offset:offset + s_len] offset += s_len @@ -781,11 +781,11 @@ class ConnectionStartOk(AMQPMethodPayload): def write_arguments(self, buf): # type: (tp.BinaryIO) -> None enframe_table(buf, self.client_properties) - buf.write(struct.pack('!B', len(self.mechanism))) + buf.write(STRUCT_B.pack(len(self.mechanism))) buf.write(self.mechanism) - buf.write(struct.pack('!I', len(self.response))) + buf.write(STRUCT_I.pack(len(self.response))) buf.write(self.response) - buf.write(struct.pack('!B', len(self.locale))) + buf.write(STRUCT_B.pack(len(self.locale))) buf.write(self.locale) def get_size(self): # type: () -> int @@ -798,15 +798,15 @@ class ConnectionStartOk(AMQPMethodPayload): offset = start_offset client_properties, delta = deframe_table(buf, offset) offset += delta - s_len, = struct.unpack_from('!B', buf, offset) + s_len, = STRUCT_B.unpack_from(buf, offset) offset += 1 mechanism = buf[offset:offset + s_len] offset += s_len - s_len, = struct.unpack_from('!L', buf, offset) + s_len, = STRUCT_L.unpack_from(buf, offset) offset += 4 response = buf[offset:offset + s_len] offset += s_len - s_len, = struct.unpack_from('!B', buf, offset) + s_len, = STRUCT_B.unpack_from(buf, offset) offset += 1 locale = buf[offset:offset + s_len] offset += s_len @@ -860,7 +860,7 @@ class ConnectionSecureOk(AMQPMethodPayload): self.response = response def write_arguments(self, buf): # type: (tp.BinaryIO) -> None - buf.write(struct.pack('!I', len(self.response))) + buf.write(STRUCT_I.pack(len(self.response))) buf.write(self.response) def get_size(self): # type: () -> int @@ -870,7 +870,7 @@ class ConnectionSecureOk(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> ConnectionSecureOk offset = start_offset - s_len, = struct.unpack_from('!L', buf, offset) + s_len, = STRUCT_L.unpack_from(buf, offset) offset += 4 response = buf[offset:offset + s_len] offset += s_len @@ -948,8 +948,7 @@ class ConnectionTune(AMQPMethodPayload): def write_arguments(self, buf): # type: (tp.BinaryIO) -> None buf.write( - struct.pack('!HIH', self.channel_max, self.frame_max, - self.heartbeat)) + STRUCT_HIH.pack(self.channel_max, self.frame_max, self.heartbeat)) def get_size(self): # type: () -> int return 8 @@ -958,8 +957,8 @@ class ConnectionTune(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> ConnectionTune offset = start_offset - channel_max, frame_max, heartbeat, = struct.unpack_from( - '!HIH', buf, offset) + channel_max, frame_max, heartbeat, = STRUCT_HIH.unpack_from( + buf, offset) offset += 8 return cls(channel_max, frame_max, heartbeat) @@ -1036,8 +1035,7 @@ class ConnectionTuneOk(AMQPMethodPayload): def write_arguments(self, buf): # type: (tp.BinaryIO) -> None buf.write( - struct.pack('!HIH', self.channel_max, self.frame_max, - self.heartbeat)) + STRUCT_HIH.pack(self.channel_max, self.frame_max, self.heartbeat)) def get_size(self): # type: () -> int return 8 @@ -1046,8 +1044,8 @@ class ConnectionTuneOk(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> ConnectionTuneOk offset = start_offset - channel_max, frame_max, heartbeat, = struct.unpack_from( - '!HIH', buf, offset) + channel_max, frame_max, heartbeat, = STRUCT_HIH.unpack_from( + buf, offset) offset += 8 return cls(channel_max, frame_max, heartbeat) @@ -1169,9 +1167,9 @@ class ChannelClose(AMQPMethodPayload): self.method_id = method_id def write_arguments(self, buf): # type: (tp.BinaryIO) -> None - buf.write(struct.pack('!HB', self.reply_code, len(self.reply_text))) + buf.write(STRUCT_HB.pack(self.reply_code, len(self.reply_text))) buf.write(self.reply_text) - buf.write(struct.pack('!HH', self.class_id, self.method_id)) + buf.write(STRUCT_HH.pack(self.class_id, self.method_id)) def get_size(self): # type: () -> int return 7 + len(self.reply_text) @@ -1180,11 +1178,11 @@ class ChannelClose(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> ChannelClose offset = start_offset - reply_code, s_len, = struct.unpack_from('!HB', buf, offset) + reply_code, s_len, = STRUCT_HB.unpack_from(buf, offset) offset += 3 reply_text = buf[offset:offset + s_len] offset += s_len - class_id, method_id, = struct.unpack_from('!HH', buf, offset) + class_id, method_id, = STRUCT_HH.unpack_from(buf, offset) offset += 4 return cls(reply_code, reply_text, class_id, method_id) @@ -1278,7 +1276,7 @@ class ChannelFlow(AMQPMethodPayload): self.active = active def write_arguments(self, buf): # type: (tp.BinaryIO) -> None - buf.write(struct.pack('!B', (self.active << 0))) + buf.write(STRUCT_B.pack((self.active << 0))) def get_size(self): # type: () -> int return 1 @@ -1287,7 +1285,7 @@ class ChannelFlow(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> ChannelFlow offset = start_offset - _bit, = struct.unpack_from('!B', buf, offset) + _bit, = STRUCT_B.unpack_from(buf, offset) offset += 0 active = bool(_bit >> 0) offset += 1 @@ -1340,7 +1338,7 @@ class ChannelFlowOk(AMQPMethodPayload): self.active = active def write_arguments(self, buf): # type: (tp.BinaryIO) -> None - buf.write(struct.pack('!B', (self.active << 0))) + buf.write(STRUCT_B.pack((self.active << 0))) def get_size(self): # type: () -> int return 1 @@ -1349,7 +1347,7 @@ class ChannelFlowOk(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> ChannelFlowOk offset = start_offset - _bit, = struct.unpack_from('!B', buf, offset) + _bit, = STRUCT_B.unpack_from(buf, offset) offset += 0 active = bool(_bit >> 0) offset += 1 @@ -1393,7 +1391,7 @@ class ChannelOpen(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> ChannelOpen offset = start_offset - s_len, = struct.unpack_from('!B', buf, offset) + s_len, = STRUCT_B.unpack_from(buf, offset) offset += 1 offset += s_len # reserved field! return cls() @@ -1437,7 +1435,7 @@ class ChannelOpenOk(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> ChannelOpenOk offset = start_offset - s_len, = struct.unpack_from('!L', buf, offset) + s_len, = STRUCT_L.unpack_from(buf, offset) offset += 4 offset += s_len # reserved field! return cls() @@ -1530,13 +1528,13 @@ class ExchangeBind(AMQPMethodPayload): def write_arguments(self, buf): # type: (tp.BinaryIO) -> None buf.write(b'\x00\x00') - buf.write(struct.pack('!B', len(self.destination))) + buf.write(STRUCT_B.pack(len(self.destination))) buf.write(self.destination) - buf.write(struct.pack('!B', len(self.source))) + buf.write(STRUCT_B.pack(len(self.source))) buf.write(self.source) - buf.write(struct.pack('!B', len(self.routing_key))) + buf.write(STRUCT_B.pack(len(self.routing_key))) buf.write(self.routing_key) - buf.write(struct.pack('!B', (self.no_wait << 0))) + buf.write(STRUCT_B.pack((self.no_wait << 0))) enframe_table(buf, self.arguments) def get_size(self): # type: () -> int @@ -1547,19 +1545,19 @@ class ExchangeBind(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> ExchangeBind offset = start_offset - s_len, = struct.unpack_from('!2xB', buf, offset) + s_len, = STRUCT_2xB.unpack_from(buf, offset) offset += 3 destination = buf[offset:offset + s_len] offset += s_len - s_len, = struct.unpack_from('!B', buf, offset) + s_len, = STRUCT_B.unpack_from(buf, offset) offset += 1 source = buf[offset:offset + s_len] offset += s_len - s_len, = struct.unpack_from('!B', buf, offset) + s_len, = STRUCT_B.unpack_from(buf, offset) offset += 1 routing_key = buf[offset:offset + s_len] offset += s_len - _bit, = struct.unpack_from('!B', buf, offset) + _bit, = STRUCT_B.unpack_from(buf, offset) offset += 0 no_wait = bool(_bit >> 0) offset += 1 @@ -1726,14 +1724,14 @@ class ExchangeDeclare(AMQPMethodPayload): def write_arguments(self, buf): # type: (tp.BinaryIO) -> None buf.write(b'\x00\x00') - buf.write(struct.pack('!B', len(self.exchange))) + buf.write(STRUCT_B.pack(len(self.exchange))) buf.write(self.exchange) - buf.write(struct.pack('!B', len(self.type_))) + buf.write(STRUCT_B.pack(len(self.type_))) buf.write(self.type_) buf.write( - struct.pack('!B', (self.passive << 0) | (self.durable << 1) | - (self.auto_delete << 2) | (self.internal << 3) | - (self.no_wait << 4))) + STRUCT_B.pack((self.passive << 0) | (self.durable << 1) + | (self.auto_delete << 2) | (self.internal << 3) + | (self.no_wait << 4))) enframe_table(buf, self.arguments) def get_size(self): # type: () -> int @@ -1744,15 +1742,15 @@ class ExchangeDeclare(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> ExchangeDeclare offset = start_offset - s_len, = struct.unpack_from('!2xB', buf, offset) + s_len, = STRUCT_2xB.unpack_from(buf, offset) offset += 3 exchange = buf[offset:offset + s_len] offset += s_len - s_len, = struct.unpack_from('!B', buf, offset) + s_len, = STRUCT_B.unpack_from(buf, offset) offset += 1 type_ = buf[offset:offset + s_len] offset += s_len - _bit, = struct.unpack_from('!B', buf, offset) + _bit, = STRUCT_B.unpack_from(buf, offset) offset += 0 passive = bool(_bit >> 0) durable = bool(_bit >> 1) @@ -1829,10 +1827,9 @@ class ExchangeDelete(AMQPMethodPayload): def write_arguments(self, buf): # type: (tp.BinaryIO) -> None buf.write(b'\x00\x00') - buf.write(struct.pack('!B', len(self.exchange))) + buf.write(STRUCT_B.pack(len(self.exchange))) buf.write(self.exchange) - buf.write( - struct.pack('!B', (self.if_unused << 0) | (self.no_wait << 1))) + buf.write(STRUCT_B.pack((self.if_unused << 0) | (self.no_wait << 1))) def get_size(self): # type: () -> int return 4 + len(self.exchange) @@ -1841,11 +1838,11 @@ class ExchangeDelete(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> ExchangeDelete offset = start_offset - s_len, = struct.unpack_from('!2xB', buf, offset) + s_len, = STRUCT_2xB.unpack_from(buf, offset) offset += 3 exchange = buf[offset:offset + s_len] offset += s_len - _bit, = struct.unpack_from('!B', buf, offset) + _bit, = STRUCT_B.unpack_from(buf, offset) offset += 0 if_unused = bool(_bit >> 0) no_wait = bool(_bit >> 1) @@ -1995,13 +1992,13 @@ class ExchangeUnbind(AMQPMethodPayload): def write_arguments(self, buf): # type: (tp.BinaryIO) -> None buf.write(b'\x00\x00') - buf.write(struct.pack('!B', len(self.destination))) + buf.write(STRUCT_B.pack(len(self.destination))) buf.write(self.destination) - buf.write(struct.pack('!B', len(self.source))) + buf.write(STRUCT_B.pack(len(self.source))) buf.write(self.source) - buf.write(struct.pack('!B', len(self.routing_key))) + buf.write(STRUCT_B.pack(len(self.routing_key))) buf.write(self.routing_key) - buf.write(struct.pack('!B', (self.no_wait << 0))) + buf.write(STRUCT_B.pack((self.no_wait << 0))) enframe_table(buf, self.arguments) def get_size(self): # type: () -> int @@ -2012,19 +2009,19 @@ class ExchangeUnbind(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> ExchangeUnbind offset = start_offset - s_len, = struct.unpack_from('!2xB', buf, offset) + s_len, = STRUCT_2xB.unpack_from(buf, offset) offset += 3 destination = buf[offset:offset + s_len] offset += s_len - s_len, = struct.unpack_from('!B', buf, offset) + s_len, = STRUCT_B.unpack_from(buf, offset) offset += 1 source = buf[offset:offset + s_len] offset += s_len - s_len, = struct.unpack_from('!B', buf, offset) + s_len, = STRUCT_B.unpack_from(buf, offset) offset += 1 routing_key = buf[offset:offset + s_len] offset += s_len - _bit, = struct.unpack_from('!B', buf, offset) + _bit, = STRUCT_B.unpack_from(buf, offset) offset += 0 no_wait = bool(_bit >> 0) offset += 1 @@ -2175,13 +2172,13 @@ class QueueBind(AMQPMethodPayload): def write_arguments(self, buf): # type: (tp.BinaryIO) -> None buf.write(b'\x00\x00') - buf.write(struct.pack('!B', len(self.queue))) + buf.write(STRUCT_B.pack(len(self.queue))) buf.write(self.queue) - buf.write(struct.pack('!B', len(self.exchange))) + buf.write(STRUCT_B.pack(len(self.exchange))) buf.write(self.exchange) - buf.write(struct.pack('!B', len(self.routing_key))) + buf.write(STRUCT_B.pack(len(self.routing_key))) buf.write(self.routing_key) - buf.write(struct.pack('!B', (self.no_wait << 0))) + buf.write(STRUCT_B.pack((self.no_wait << 0))) enframe_table(buf, self.arguments) def get_size(self): # type: () -> int @@ -2192,19 +2189,19 @@ class QueueBind(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> QueueBind offset = start_offset - s_len, = struct.unpack_from('!2xB', buf, offset) + s_len, = STRUCT_2xB.unpack_from(buf, offset) offset += 3 queue = buf[offset:offset + s_len] offset += s_len - s_len, = struct.unpack_from('!B', buf, offset) + s_len, = STRUCT_B.unpack_from(buf, offset) offset += 1 exchange = buf[offset:offset + s_len] offset += s_len - s_len, = struct.unpack_from('!B', buf, offset) + s_len, = STRUCT_B.unpack_from(buf, offset) offset += 1 routing_key = buf[offset:offset + s_len] offset += s_len - _bit, = struct.unpack_from('!B', buf, offset) + _bit, = STRUCT_B.unpack_from(buf, offset) offset += 0 no_wait = bool(_bit >> 0) offset += 1 @@ -2371,12 +2368,12 @@ class QueueDeclare(AMQPMethodPayload): def write_arguments(self, buf): # type: (tp.BinaryIO) -> None buf.write(b'\x00\x00') - buf.write(struct.pack('!B', len(self.queue))) + buf.write(STRUCT_B.pack(len(self.queue))) buf.write(self.queue) buf.write( - struct.pack('!B', (self.passive << 0) | (self.durable << 1) | - (self.exclusive << 2) | (self.auto_delete << 3) | - (self.no_wait << 4))) + STRUCT_B.pack((self.passive << 0) | (self.durable << 1) + | (self.exclusive << 2) | (self.auto_delete << 3) + | (self.no_wait << 4))) enframe_table(buf, self.arguments) def get_size(self): # type: () -> int @@ -2386,11 +2383,11 @@ class QueueDeclare(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> QueueDeclare offset = start_offset - s_len, = struct.unpack_from('!2xB', buf, offset) + s_len, = STRUCT_2xB.unpack_from(buf, offset) offset += 3 queue = buf[offset:offset + s_len] offset += s_len - _bit, = struct.unpack_from('!B', buf, offset) + _bit, = STRUCT_B.unpack_from(buf, offset) offset += 0 passive = bool(_bit >> 0) durable = bool(_bit >> 1) @@ -2476,11 +2473,11 @@ class QueueDelete(AMQPMethodPayload): def write_arguments(self, buf): # type: (tp.BinaryIO) -> None buf.write(b'\x00\x00') - buf.write(struct.pack('!B', len(self.queue))) + buf.write(STRUCT_B.pack(len(self.queue))) buf.write(self.queue) buf.write( - struct.pack('!B', (self.if_unused << 0) | (self.if_empty << 1) | - (self.no_wait << 2))) + STRUCT_B.pack((self.if_unused << 0) | (self.if_empty << 1) + | (self.no_wait << 2))) def get_size(self): # type: () -> int return 4 + len(self.queue) @@ -2489,11 +2486,11 @@ class QueueDelete(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> QueueDelete offset = start_offset - s_len, = struct.unpack_from('!2xB', buf, offset) + s_len, = STRUCT_2xB.unpack_from(buf, offset) offset += 3 queue = buf[offset:offset + s_len] offset += s_len - _bit, = struct.unpack_from('!B', buf, offset) + _bit, = STRUCT_B.unpack_from(buf, offset) offset += 0 if_unused = bool(_bit >> 0) if_empty = bool(_bit >> 1) @@ -2563,9 +2560,9 @@ class QueueDeclareOk(AMQPMethodPayload): self.consumer_count = consumer_count def write_arguments(self, buf): # type: (tp.BinaryIO) -> None - buf.write(struct.pack('!B', len(self.queue))) + buf.write(STRUCT_B.pack(len(self.queue))) buf.write(self.queue) - buf.write(struct.pack('!II', self.message_count, self.consumer_count)) + buf.write(STRUCT_II.pack(self.message_count, self.consumer_count)) def get_size(self): # type: () -> int return 9 + len(self.queue) @@ -2574,11 +2571,11 @@ class QueueDeclareOk(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> QueueDeclareOk offset = start_offset - s_len, = struct.unpack_from('!B', buf, offset) + s_len, = STRUCT_B.unpack_from(buf, offset) offset += 1 queue = buf[offset:offset + s_len] offset += s_len - message_count, consumer_count, = struct.unpack_from('!II', buf, offset) + message_count, consumer_count, = STRUCT_II.unpack_from(buf, offset) offset += 8 return cls(queue, message_count, consumer_count) @@ -2625,7 +2622,7 @@ class QueueDeleteOk(AMQPMethodPayload): self.message_count = message_count def write_arguments(self, buf): # type: (tp.BinaryIO) -> None - buf.write(struct.pack('!I', self.message_count)) + buf.write(STRUCT_I.pack(self.message_count)) def get_size(self): # type: () -> int return 4 @@ -2634,7 +2631,7 @@ class QueueDeleteOk(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> QueueDeleteOk offset = start_offset - message_count, = struct.unpack_from('!I', buf, offset) + message_count, = STRUCT_I.unpack_from(buf, offset) offset += 4 return cls(message_count) @@ -2691,9 +2688,9 @@ class QueuePurge(AMQPMethodPayload): def write_arguments(self, buf): # type: (tp.BinaryIO) -> None buf.write(b'\x00\x00') - buf.write(struct.pack('!B', len(self.queue))) + buf.write(STRUCT_B.pack(len(self.queue))) buf.write(self.queue) - buf.write(struct.pack('!B', (self.no_wait << 0))) + buf.write(STRUCT_B.pack((self.no_wait << 0))) def get_size(self): # type: () -> int return 4 + len(self.queue) @@ -2702,11 +2699,11 @@ class QueuePurge(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> QueuePurge offset = start_offset - s_len, = struct.unpack_from('!2xB', buf, offset) + s_len, = STRUCT_2xB.unpack_from(buf, offset) offset += 3 queue = buf[offset:offset + s_len] offset += s_len - _bit, = struct.unpack_from('!B', buf, offset) + _bit, = STRUCT_B.unpack_from(buf, offset) offset += 0 no_wait = bool(_bit >> 0) offset += 1 @@ -2755,7 +2752,7 @@ class QueuePurgeOk(AMQPMethodPayload): self.message_count = message_count def write_arguments(self, buf): # type: (tp.BinaryIO) -> None - buf.write(struct.pack('!I', self.message_count)) + buf.write(STRUCT_I.pack(self.message_count)) def get_size(self): # type: () -> int return 4 @@ -2764,7 +2761,7 @@ class QueuePurgeOk(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> QueuePurgeOk offset = start_offset - message_count, = struct.unpack_from('!I', buf, offset) + message_count, = STRUCT_I.unpack_from(buf, offset) offset += 4 return cls(message_count) @@ -2834,11 +2831,11 @@ class QueueUnbind(AMQPMethodPayload): def write_arguments(self, buf): # type: (tp.BinaryIO) -> None buf.write(b'\x00\x00') - buf.write(struct.pack('!B', len(self.queue))) + buf.write(STRUCT_B.pack(len(self.queue))) buf.write(self.queue) - buf.write(struct.pack('!B', len(self.exchange))) + buf.write(STRUCT_B.pack(len(self.exchange))) buf.write(self.exchange) - buf.write(struct.pack('!B', len(self.routing_key))) + buf.write(STRUCT_B.pack(len(self.routing_key))) buf.write(self.routing_key) enframe_table(buf, self.arguments) @@ -2850,15 +2847,15 @@ class QueueUnbind(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> QueueUnbind offset = start_offset - s_len, = struct.unpack_from('!2xB', buf, offset) + s_len, = STRUCT_2xB.unpack_from(buf, offset) offset += 3 queue = buf[offset:offset + s_len] offset += s_len - s_len, = struct.unpack_from('!B', buf, offset) + s_len, = STRUCT_B.unpack_from(buf, offset) offset += 1 exchange = buf[offset:offset + s_len] offset += s_len - s_len, = struct.unpack_from('!B', buf, offset) + s_len, = STRUCT_B.unpack_from(buf, offset) offset += 1 routing_key = buf[offset:offset + s_len] offset += s_len @@ -3124,7 +3121,7 @@ class BasicAck(AMQPMethodPayload): self.multiple = multiple def write_arguments(self, buf): # type: (tp.BinaryIO) -> None - buf.write(struct.pack('!QB', self.delivery_tag, (self.multiple << 0))) + buf.write(STRUCT_QB.pack(self.delivery_tag, (self.multiple << 0))) def get_size(self): # type: () -> int return 9 @@ -3132,7 +3129,7 @@ class BasicAck(AMQPMethodPayload): @classmethod def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> BasicAck offset = start_offset - delivery_tag, _bit, = struct.unpack_from('!QB', buf, offset) + delivery_tag, _bit, = STRUCT_QB.unpack_from(buf, offset) offset += 8 multiple = bool(_bit >> 0) offset += 1 @@ -3230,13 +3227,13 @@ class BasicConsume(AMQPMethodPayload): def write_arguments(self, buf): # type: (tp.BinaryIO) -> None buf.write(b'\x00\x00') - buf.write(struct.pack('!B', len(self.queue))) + buf.write(STRUCT_B.pack(len(self.queue))) buf.write(self.queue) - buf.write(struct.pack('!B', len(self.consumer_tag))) + buf.write(STRUCT_B.pack(len(self.consumer_tag))) buf.write(self.consumer_tag) buf.write( - struct.pack('!B', (self.no_local << 0) | (self.no_ack << 1) | - (self.exclusive << 2) | (self.no_wait << 3))) + STRUCT_B.pack((self.no_local << 0) | (self.no_ack << 1) + | (self.exclusive << 2) | (self.no_wait << 3))) enframe_table(buf, self.arguments) def get_size(self): # type: () -> int @@ -3247,15 +3244,15 @@ class BasicConsume(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> BasicConsume offset = start_offset - s_len, = struct.unpack_from('!2xB', buf, offset) + s_len, = STRUCT_2xB.unpack_from(buf, offset) offset += 3 queue = buf[offset:offset + s_len] offset += s_len - s_len, = struct.unpack_from('!B', buf, offset) + s_len, = STRUCT_B.unpack_from(buf, offset) offset += 1 consumer_tag = buf[offset:offset + s_len] offset += s_len - _bit, = struct.unpack_from('!B', buf, offset) + _bit, = STRUCT_B.unpack_from(buf, offset) offset += 0 no_local = bool(_bit >> 0) no_ack = bool(_bit >> 1) @@ -3332,9 +3329,9 @@ class BasicCancel(AMQPMethodPayload): self.no_wait = no_wait def write_arguments(self, buf): # type: (tp.BinaryIO) -> None - buf.write(struct.pack('!B', len(self.consumer_tag))) + buf.write(STRUCT_B.pack(len(self.consumer_tag))) buf.write(self.consumer_tag) - buf.write(struct.pack('!B', (self.no_wait << 0))) + buf.write(STRUCT_B.pack((self.no_wait << 0))) def get_size(self): # type: () -> int return 2 + len(self.consumer_tag) @@ -3343,11 +3340,11 @@ class BasicCancel(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> BasicCancel offset = start_offset - s_len, = struct.unpack_from('!B', buf, offset) + s_len, = STRUCT_B.unpack_from(buf, offset) offset += 1 consumer_tag = buf[offset:offset + s_len] offset += s_len - _bit, = struct.unpack_from('!B', buf, offset) + _bit, = STRUCT_B.unpack_from(buf, offset) offset += 0 no_wait = bool(_bit >> 0) offset += 1 @@ -3399,7 +3396,7 @@ class BasicConsumeOk(AMQPMethodPayload): self.consumer_tag = consumer_tag def write_arguments(self, buf): # type: (tp.BinaryIO) -> None - buf.write(struct.pack('!B', len(self.consumer_tag))) + buf.write(STRUCT_B.pack(len(self.consumer_tag))) buf.write(self.consumer_tag) def get_size(self): # type: () -> int @@ -3409,7 +3406,7 @@ class BasicConsumeOk(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> BasicConsumeOk offset = start_offset - s_len, = struct.unpack_from('!B', buf, offset) + s_len, = STRUCT_B.unpack_from(buf, offset) offset += 1 consumer_tag = buf[offset:offset + s_len] offset += s_len @@ -3457,7 +3454,7 @@ class BasicCancelOk(AMQPMethodPayload): self.consumer_tag = consumer_tag def write_arguments(self, buf): # type: (tp.BinaryIO) -> None - buf.write(struct.pack('!B', len(self.consumer_tag))) + buf.write(STRUCT_B.pack(len(self.consumer_tag))) buf.write(self.consumer_tag) def get_size(self): # type: () -> int @@ -3467,7 +3464,7 @@ class BasicCancelOk(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> BasicCancelOk offset = start_offset - s_len, = struct.unpack_from('!B', buf, offset) + s_len, = STRUCT_B.unpack_from(buf, offset) offset += 1 consumer_tag = buf[offset:offset + s_len] offset += s_len @@ -3549,13 +3546,13 @@ class BasicDeliver(AMQPMethodPayload): self.routing_key = routing_key def write_arguments(self, buf): # type: (tp.BinaryIO) -> None - buf.write(struct.pack('!B', len(self.consumer_tag))) + buf.write(STRUCT_B.pack(len(self.consumer_tag))) buf.write(self.consumer_tag) buf.write( - struct.pack('!QBB', self.delivery_tag, (self.redelivered << 0), - len(self.exchange))) + STRUCT_QBB.pack(self.delivery_tag, (self.redelivered << 0), + len(self.exchange))) buf.write(self.exchange) - buf.write(struct.pack('!B', len(self.routing_key))) + buf.write(STRUCT_B.pack(len(self.routing_key))) buf.write(self.routing_key) def get_size(self): # type: () -> int @@ -3566,19 +3563,19 @@ class BasicDeliver(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> BasicDeliver offset = start_offset - s_len, = struct.unpack_from('!B', buf, offset) + s_len, = STRUCT_B.unpack_from(buf, offset) offset += 1 consumer_tag = buf[offset:offset + s_len] offset += s_len - delivery_tag, _bit, = struct.unpack_from('!QB', buf, offset) + delivery_tag, _bit, = STRUCT_QB.unpack_from(buf, offset) offset += 8 redelivered = bool(_bit >> 0) offset += 1 - s_len, = struct.unpack_from('!B', buf, offset) + s_len, = STRUCT_B.unpack_from(buf, offset) offset += 1 exchange = buf[offset:offset + s_len] offset += s_len - s_len, = struct.unpack_from('!B', buf, offset) + s_len, = STRUCT_B.unpack_from(buf, offset) offset += 1 routing_key = buf[offset:offset + s_len] offset += s_len @@ -3640,9 +3637,9 @@ class BasicGet(AMQPMethodPayload): def write_arguments(self, buf): # type: (tp.BinaryIO) -> None buf.write(b'\x00\x00') - buf.write(struct.pack('!B', len(self.queue))) + buf.write(STRUCT_B.pack(len(self.queue))) buf.write(self.queue) - buf.write(struct.pack('!B', (self.no_ack << 0))) + buf.write(STRUCT_B.pack((self.no_ack << 0))) def get_size(self): # type: () -> int return 4 + len(self.queue) @@ -3650,11 +3647,11 @@ class BasicGet(AMQPMethodPayload): @classmethod def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> BasicGet offset = start_offset - s_len, = struct.unpack_from('!2xB', buf, offset) + s_len, = STRUCT_2xB.unpack_from(buf, offset) offset += 3 queue = buf[offset:offset + s_len] offset += s_len - _bit, = struct.unpack_from('!B', buf, offset) + _bit, = STRUCT_B.unpack_from(buf, offset) offset += 0 no_ack = bool(_bit >> 0) offset += 1 @@ -3735,12 +3732,12 @@ class BasicGetOk(AMQPMethodPayload): def write_arguments(self, buf): # type: (tp.BinaryIO) -> None buf.write( - struct.pack('!QBB', self.delivery_tag, (self.redelivered << 0), - len(self.exchange))) + STRUCT_QBB.pack(self.delivery_tag, (self.redelivered << 0), + len(self.exchange))) buf.write(self.exchange) - buf.write(struct.pack('!B', len(self.routing_key))) + buf.write(STRUCT_B.pack(len(self.routing_key))) buf.write(self.routing_key) - buf.write(struct.pack('!I', self.message_count)) + buf.write(STRUCT_I.pack(self.message_count)) def get_size(self): # type: () -> int return 15 + len(self.exchange) + len(self.routing_key) @@ -3749,19 +3746,19 @@ class BasicGetOk(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> BasicGetOk offset = start_offset - delivery_tag, _bit, = struct.unpack_from('!QB', buf, offset) + delivery_tag, _bit, = STRUCT_QB.unpack_from(buf, offset) offset += 8 redelivered = bool(_bit >> 0) offset += 1 - s_len, = struct.unpack_from('!B', buf, offset) + s_len, = STRUCT_B.unpack_from(buf, offset) offset += 1 exchange = buf[offset:offset + s_len] offset += s_len - s_len, = struct.unpack_from('!B', buf, offset) + s_len, = STRUCT_B.unpack_from(buf, offset) offset += 1 routing_key = buf[offset:offset + s_len] offset += s_len - message_count, = struct.unpack_from('!I', buf, offset) + message_count, = STRUCT_I.unpack_from(buf, offset) offset += 4 return cls(delivery_tag, redelivered, exchange, routing_key, message_count) @@ -3806,7 +3803,7 @@ class BasicGetEmpty(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> BasicGetEmpty offset = start_offset - s_len, = struct.unpack_from('!B', buf, offset) + s_len, = STRUCT_B.unpack_from(buf, offset) offset += 1 offset += s_len # reserved field! return cls() @@ -3886,8 +3883,8 @@ class BasicNack(AMQPMethodPayload): def write_arguments(self, buf): # type: (tp.BinaryIO) -> None buf.write( - struct.pack('!QB', self.delivery_tag, - (self.multiple << 0) | (self.requeue << 1))) + STRUCT_QB.pack(self.delivery_tag, + (self.multiple << 0) | (self.requeue << 1))) def get_size(self): # type: () -> int return 9 @@ -3896,7 +3893,7 @@ class BasicNack(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> BasicNack offset = start_offset - delivery_tag, _bit, = struct.unpack_from('!QB', buf, offset) + delivery_tag, _bit, = STRUCT_QB.unpack_from(buf, offset) offset += 8 multiple = bool(_bit >> 0) requeue = bool(_bit >> 1) @@ -3993,12 +3990,11 @@ class BasicPublish(AMQPMethodPayload): def write_arguments(self, buf): # type: (tp.BinaryIO) -> None buf.write(b'\x00\x00') - buf.write(struct.pack('!B', len(self.exchange))) + buf.write(STRUCT_B.pack(len(self.exchange))) buf.write(self.exchange) - buf.write(struct.pack('!B', len(self.routing_key))) + buf.write(STRUCT_B.pack(len(self.routing_key))) buf.write(self.routing_key) - buf.write( - struct.pack('!B', (self.mandatory << 0) | (self.immediate << 1))) + buf.write(STRUCT_B.pack((self.mandatory << 0) | (self.immediate << 1))) def get_size(self): # type: () -> int return 5 + len(self.exchange) + len(self.routing_key) @@ -4007,15 +4003,15 @@ class BasicPublish(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> BasicPublish offset = start_offset - s_len, = struct.unpack_from('!2xB', buf, offset) + s_len, = STRUCT_2xB.unpack_from(buf, offset) offset += 3 exchange = buf[offset:offset + s_len] offset += s_len - s_len, = struct.unpack_from('!B', buf, offset) + s_len, = STRUCT_B.unpack_from(buf, offset) offset += 1 routing_key = buf[offset:offset + s_len] offset += s_len - _bit, = struct.unpack_from('!B', buf, offset) + _bit, = STRUCT_B.unpack_from(buf, offset) offset += 0 mandatory = bool(_bit >> 0) immediate = bool(_bit >> 1) @@ -4119,8 +4115,8 @@ class BasicQos(AMQPMethodPayload): def write_arguments(self, buf): # type: (tp.BinaryIO) -> None buf.write( - struct.pack('!IHB', self.prefetch_size, self.prefetch_count, - (self.global_ << 0))) + STRUCT_IHB.pack(self.prefetch_size, self.prefetch_count, + (self.global_ << 0))) def get_size(self): # type: () -> int return 7 @@ -4128,8 +4124,8 @@ class BasicQos(AMQPMethodPayload): @classmethod def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> BasicQos offset = start_offset - prefetch_size, prefetch_count, _bit, = struct.unpack_from( - '!IHB', buf, offset) + prefetch_size, prefetch_count, _bit, = STRUCT_IHB.unpack_from( + buf, offset) offset += 6 global_ = bool(_bit >> 0) offset += 1 @@ -4245,11 +4241,11 @@ class BasicReturn(AMQPMethodPayload): self.routing_key = routing_key def write_arguments(self, buf): # type: (tp.BinaryIO) -> None - buf.write(struct.pack('!HB', self.reply_code, len(self.reply_text))) + buf.write(STRUCT_HB.pack(self.reply_code, len(self.reply_text))) buf.write(self.reply_text) - buf.write(struct.pack('!B', len(self.exchange))) + buf.write(STRUCT_B.pack(len(self.exchange))) buf.write(self.exchange) - buf.write(struct.pack('!B', len(self.routing_key))) + buf.write(STRUCT_B.pack(len(self.routing_key))) buf.write(self.routing_key) def get_size(self): # type: () -> int @@ -4260,15 +4256,15 @@ class BasicReturn(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> BasicReturn offset = start_offset - reply_code, s_len, = struct.unpack_from('!HB', buf, offset) + reply_code, s_len, = STRUCT_HB.unpack_from(buf, offset) offset += 3 reply_text = buf[offset:offset + s_len] offset += s_len - s_len, = struct.unpack_from('!B', buf, offset) + s_len, = STRUCT_B.unpack_from(buf, offset) offset += 1 exchange = buf[offset:offset + s_len] offset += s_len - s_len, = struct.unpack_from('!B', buf, offset) + s_len, = STRUCT_B.unpack_from(buf, offset) offset += 1 routing_key = buf[offset:offset + s_len] offset += s_len @@ -4331,7 +4327,7 @@ class BasicReject(AMQPMethodPayload): self.requeue = requeue def write_arguments(self, buf): # type: (tp.BinaryIO) -> None - buf.write(struct.pack('!QB', self.delivery_tag, (self.requeue << 0))) + buf.write(STRUCT_QB.pack(self.delivery_tag, (self.requeue << 0))) def get_size(self): # type: () -> int return 9 @@ -4340,7 +4336,7 @@ class BasicReject(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> BasicReject offset = start_offset - delivery_tag, _bit, = struct.unpack_from('!QB', buf, offset) + delivery_tag, _bit, = STRUCT_QB.unpack_from(buf, offset) offset += 8 requeue = bool(_bit >> 0) offset += 1 @@ -4397,7 +4393,7 @@ class BasicRecoverAsync(AMQPMethodPayload): self.requeue = requeue def write_arguments(self, buf): # type: (tp.BinaryIO) -> None - buf.write(struct.pack('!B', (self.requeue << 0))) + buf.write(STRUCT_B.pack((self.requeue << 0))) def get_size(self): # type: () -> int return 1 @@ -4406,7 +4402,7 @@ class BasicRecoverAsync(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> BasicRecoverAsync offset = start_offset - _bit, = struct.unpack_from('!B', buf, offset) + _bit, = STRUCT_B.unpack_from(buf, offset) offset += 0 requeue = bool(_bit >> 0) offset += 1 @@ -4463,7 +4459,7 @@ class BasicRecover(AMQPMethodPayload): self.requeue = requeue def write_arguments(self, buf): # type: (tp.BinaryIO) -> None - buf.write(struct.pack('!B', (self.requeue << 0))) + buf.write(STRUCT_B.pack((self.requeue << 0))) def get_size(self): # type: () -> int return 1 @@ -4472,7 +4468,7 @@ class BasicRecover(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> BasicRecover offset = start_offset - _bit, = struct.unpack_from('!B', buf, offset) + _bit, = STRUCT_B.unpack_from(buf, offset) offset += 0 requeue = bool(_bit >> 0) offset += 1 @@ -4835,7 +4831,7 @@ class ConfirmSelect(AMQPMethodPayload): self.nowait = nowait def write_arguments(self, buf): # type: (tp.BinaryIO) -> None - buf.write(struct.pack('!B', (self.nowait << 0))) + buf.write(STRUCT_B.pack((self.nowait << 0))) def get_size(self): # type: () -> int return 1 @@ -4844,7 +4840,7 @@ class ConfirmSelect(AMQPMethodPayload): def from_buffer(cls, buf, start_offset): # type: (buffer, int) -> ConfirmSelect offset = start_offset - _bit, = struct.unpack_from('!B', buf, offset) + _bit, = STRUCT_B.unpack_from(buf, offset) offset += 0 nowait = bool(_bit >> 0) offset += 1 @@ -5120,3 +5116,15 @@ REPLIES_FOR = { ConfirmSelect: [ConfirmSelectOk], ConfirmSelectOk: [], } +STRUCT_B = struct.Struct("!B") +STRUCT_HB = struct.Struct("!HB") +STRUCT_HH = struct.Struct("!HH") +STRUCT_BB = struct.Struct("!BB") +STRUCT_I = struct.Struct("!I") +STRUCT_L = struct.Struct("!L") +STRUCT_HIH = struct.Struct("!HIH") +STRUCT_2xB = struct.Struct("!2xB") +STRUCT_II = struct.Struct("!II") +STRUCT_QB = struct.Struct("!QB") +STRUCT_QBB = struct.Struct("!QBB") +STRUCT_IHB = struct.Struct("!IHB") diff --git a/coolamqp/framing/frames.py b/coolamqp/framing/frames.py index 86424233852bb74d6a0e69155a2cb43c24b12362..9705606bdf9f1ba67bf67694a181db2f6eabce10 100644 --- a/coolamqp/framing/frames.py +++ b/coolamqp/framing/frames.py @@ -13,6 +13,13 @@ from coolamqp.framing.definitions import FRAME_METHOD, FRAME_HEARTBEAT, \ FRAME_BODY, FRAME_HEADER, FRAME_END, \ IDENT_TO_METHOD, CLASS_ID_TO_CONTENT_PROPERTY_LIST, FRAME_END_BYTE +STRUCT_BH = struct.Struct('!BH') +STRUCT_BHL = struct.Struct('!BHL') +STRUCT_HH = struct.Struct('!HH') +STRUCT_BHLHHQ = struct.Struct('!BHLHHQ') +STRUCT_HHQ = struct.Struct('!HHQ') +STRUCT_BHLB = struct.Struct('!BHLB') + class AMQPMethodFrame(AMQPFrame): FRAME_TYPE = FRAME_METHOD @@ -31,10 +38,10 @@ class AMQPMethodFrame(AMQPFrame): def write_to(self, buf): if self.payload.IS_CONTENT_STATIC: - buf.write(struct.pack('!BH', FRAME_METHOD, self.channel)) + buf.write(STRUCT_BH.pack(FRAME_METHOD, self.channel)) buf.write(self.payload.STATIC_CONTENT) else: - buf.write(struct.pack('!BHL', FRAME_METHOD, self.channel, + buf.write(STRUCT_BHL.pack(FRAME_METHOD, self.channel, 4 + self.payload.get_size())) buf.write(self.payload.BINARY_HEADER) self.payload.write_arguments(buf) @@ -42,7 +49,7 @@ class AMQPMethodFrame(AMQPFrame): @classmethod def unserialize(cls, channel, payload_as_buffer): - clsmet = struct.unpack_from('!HH', payload_as_buffer, 0) + clsmet = STRUCT_HH.unpack_from(payload_as_buffer, 0) try: method_payload_class = IDENT_TO_METHOD[clsmet] @@ -79,7 +86,7 @@ class AMQPHeaderFrame(AMQPFrame): self.properties = properties def write_to(self, buf): - buf.write(struct.pack('!BHLHHQ', FRAME_HEADER, self.channel, + buf.write(STRUCT_BHLHHQ.pack(FRAME_HEADER, self.channel, 12 + self.properties.get_size(), self.class_id, 0, self.body_size)) @@ -89,8 +96,7 @@ class AMQPHeaderFrame(AMQPFrame): @staticmethod def unserialize(channel, payload_as_buffer): # payload starts with class ID - class_id, weight, body_size = struct.unpack_from('!HHQ', - payload_as_buffer, 0) + class_id, weight, body_size = STRUCT_HHQ.unpack_from(payload_as_buffer, 0) properties = CLASS_ID_TO_CONTENT_PROPERTY_LIST[class_id].from_buffer( payload_as_buffer, 12) return AMQPHeaderFrame(channel, class_id, weight, body_size, @@ -126,7 +132,7 @@ class AMQPBodyFrame(AMQPFrame): def write_to(self, buf): buf.write( - struct.pack('!BHL', FRAME_BODY, self.channel, len(self.data))) + STRUCT_BHL.pack(FRAME_BODY, self.channel, len(self.data))) buf.write(self.data) buf.write(FRAME_END_BYTE) @@ -144,7 +150,7 @@ class AMQPBodyFrame(AMQPFrame): class AMQPHeartbeatFrame(AMQPFrame): FRAME_TYPE = FRAME_HEARTBEAT LENGTH = 8 - DATA = struct.pack('!BHLB', FRAME_HEARTBEAT, 0, 0, FRAME_END) + DATA = STRUCT_BHLB.pack(FRAME_HEARTBEAT, 0, 0, FRAME_END) def __init__(self): AMQPFrame.__init__(self, 0) diff --git a/coolamqp/objects.py b/coolamqp/objects.py index ee66fcf002b07fe1b994a28b7cbeafd0a925a252..2eeaffe11f099476e75f266baf2397860047116c 100644 --- a/coolamqp/objects.py +++ b/coolamqp/objects.py @@ -2,6 +2,7 @@ """ Core objects used in CoolAMQP """ +import typing as tp import logging import uuid @@ -9,11 +10,17 @@ import six from coolamqp.framing.definitions import \ BasicContentPropertyList as MessageProperties +from coolamqp.framing.base import AMQPFrame logger = logging.getLogger(__name__) EMPTY_PROPERTIES = MessageProperties() +try: + Protocol = tp.Protocol +except AttributeError: + Protocol = object + def toutf8(q): if isinstance(q, six.binary_type): @@ -27,12 +34,28 @@ def tobytes(q): return q +class FrameLogger(Protocol): + def on_frame(self, timestamp, # type: float + frame, # type: AMQPFrame + direction # type: str + ): + """ + Log a frame + + :param timestamp: timestamp in seconds since Unix Epoch + :param frame: AMQPFrame to parse + :param direction: either 'to_client' when this is frame received from the broker, or + 'to_server' if it's a frame that's being sent to the broker + """ + + class Callable(object): """ Add a bunch of callables to one list, and just invoke'm. INTERNAL USE ONLY #todo not thread safe """ + __slots__ = ('callables', 'oneshots') def __init__(self, oneshots=False): """:param oneshots: if True, callables will be called and discarded""" @@ -57,10 +80,12 @@ class Message(object): coolamqp.framing.definitions.BasicContentPropertyList for a list of possible properties. """ + __slots__ = ('body', 'properties') Properties = MessageProperties # an alias for easier use - def __init__(self, body, properties=None): + def __init__(self, body, # type: bytes + properties=None): """ Create a Message object. @@ -103,35 +128,32 @@ class ReceivedMessage(Message): Note that if the consumer that generated this message was no_ack, .ack() and .nack() are no-ops. """ + __slots__ = ('delivery_tag', 'exchange_name', 'routing_key', 'ack', 'nack') - def __init__(self, body, - exchange_name, - routing_key, + def __init__(self, body, # type: tp.Union[str, bytes, bytearray, tp.List[memoryview]] + exchange_name, # type: memoryview + routing_key, # type: memoryview properties=None, - delivery_tag=None, - ack=None, - nack=None): + delivery_tag=None, # type: int + ack=None, # type: tp.Callable[[], None] + nack=None # type: tp.Callable[[], None] + ): """ :param body: message body. A stream of octets. :type body: str (py2) or bytes (py3) or a list of memoryviews, if particular disabled-by-default option is turned on, or a single memoryview :param exchange_name: name of exchange this message was submitted to - :type exchange_name: memoryview :param routing_key: routing key with which this message was sent - :type routing_key: memoryview :param properties: a suitable BasicContentPropertyList subinstance. be prepared that value of properties that are strings will be memoryviews :param delivery_tag: delivery tag assigned by AMQP broker to confirm this message - :type delivery_tag: int :param ack: a callable to call when you want to ack (via basic.ack) this message. None if received by the no-ack mechanism - :type ack: tp.Callable[[], None] :param nack: a callable to call when you want to nack (via basic.reject) this message. None if received by the no-ack mechanism - :type nack: tp.Callable[[], None] """ Message.__init__(self, body, properties=properties) @@ -148,15 +170,18 @@ class Exchange(object): This represents an Exchange used in AMQP. This is hashable. """ + __slots__ = ('name', 'type', 'durable', 'auto_delete') direct = None # the direct exchange - def __init__(self, name=u'', type=b'direct', durable=True, - auto_delete=False): + def __init__(self, name=u'', # type: tp.Union[str, bytes] + type=b'direct', # type: tp.Union[str, bytes] + durable=True, # type: bool + auto_delete=False # type: bool + ): """ :type name: unicode is preferred, binary type will get decoded to unicode with utf8 - :param type: exchange type. binary/unicode """ self.name = toutf8(name) # must be unicode self.type = tobytes(type) # must be bytes @@ -185,9 +210,15 @@ class Queue(object): """ This object represents a Queue that applications consume from or publish to. """ - - def __init__(self, name=b'', durable=False, exchange=None, exclusive=False, - auto_delete=False): + __slots__ = ('name', 'durable', 'exchange', 'auto_delete', 'exclusive', + 'anonymous', 'consumer_tag') + + def __init__(self, name=b'', # type: tp.Union[str, bytes] + durable=False, # type: bool + exchange=None, # type: tp.Optional[Exchange] + exclusive=False, # type: bool + auto_delete=False # type: bool + ): """ Create a queue definition. @@ -196,7 +227,6 @@ class Queue(object): upon declaration. If a disconnect happens, and connection to other node is reestablished, this name will CHANGE AGAIN, and be reflected in this object. This change will be done before CoolAMQP signals reconnection. - :type name: byte type or text type :param durable: Is the queue durable? :param exchange: Exchange for this queue to bind to. None for no binding. :param exclusive: Is this queue exclusive? @@ -230,37 +260,33 @@ class NodeDefinition(object): Definition of a reachable AMQP node. This object is hashable. + + >>> a = NodeDefinition(host='192.168.0.1', user='admin', password='password', + >>> virtual_host='vhost') + + or + + >>> a = NodeDefinition('192.168.0.1', 'admin', 'password') + + or + + >>> a = NodeDefinition('amqp://user:password@host/virtual_host') + + or + + >>> a = NodeDefinition('amqp://user:password@host:port/virtual_host', hearbeat=20) + + AMQP connection string may be either bytes or str/unicode + + + Additional keyword parameters that can be specified: + heartbeat - heartbeat interval in seconds + port - TCP port to use. Default is 5672 + + :raise ValueError: invalid parameters """ def __init__(self, *args, **kwargs): - """ - Create a cluster node definition. - - >>> a = NodeDefinition(host='192.168.0.1', user='admin', password='password', - >>> virtual_host='vhost') - - or - - >>> a = NodeDefinition('192.168.0.1', 'admin', 'password') - - or - - >>> a = NodeDefinition('amqp://user:password@host/virtual_host') - - or - - >>> a = NodeDefinition('amqp://user:password@host:port/virtual_host', hearbeat=20) - - AMQP connection string may be either bytes or str/unicode - - - Additional keyword parameters that can be specified: - heartbeat - heartbeat interval in seconds - port - TCP port to use. Default is 5672 - - :raise ValueError: invalid parameters - """ - self.heartbeat = kwargs.pop('heartbeat', None) self.port = kwargs.pop('port', 5672) diff --git a/setup.py b/setup.py index 77ea5c381d4b128875a059ec67427fb5f4494fb0..f7554e817d0355e5e83372ba2d726a97bb14222c 100644 --- a/setup.py +++ b/setup.py @@ -1,7 +1,6 @@ #!/usr/bin/env python # coding=UTF-8 from setuptools import setup, find_packages - from coolamqp import __version__ setup(keywords=['amqp', 'rabbitmq', 'client', 'network', 'ha', 'high availability'], diff --git a/tests/Dockerfile b/tests/Dockerfile index d202d3786f705f29d4dae19a1dd919d178ccf3f4..f61ab622b61833268cba8c850e55affe056e7db3 100644 --- a/tests/Dockerfile +++ b/tests/Dockerfile @@ -16,5 +16,5 @@ ENV AMQP_HOST=amqp # for those pesky builds on Windows RUN chmod -R ugo-x /coolamqp -CMD python setup.py nosetests --tests tests +CMD python setup.py test diff --git a/tests/test_clustering/test_a.py b/tests/test_clustering/test_a.py index d841838d593e6763781b3080c3cc155b8866fbdc..11727c3efe798cb688602b5b41444ffa115d1b78 100644 --- a/tests/test_clustering/test_a.py +++ b/tests/test_clustering/test_a.py @@ -144,7 +144,7 @@ class TestA(unittest.TestCase): 'content_encoding': b'utf8' }), routing_key=u'hello5', tx=True).result() - time.sleep(1) + time.sleep(2) self.assertTrue(P['q'])