diff --git a/coolamqp/attaches/consumer.py b/coolamqp/attaches/consumer.py index 78ac2167c0d5a8eba67bddbb04db1f1ed6fa0b5c..8c75efcc9e238758e21a72418a4a44c4965e1cc8 100644 --- a/coolamqp/attaches/consumer.py +++ b/coolamqp/attaches/consumer.py @@ -28,10 +28,10 @@ class Consumer(Channeler): Since this implies cancelling the consumer, here you go. """ - def __init__(self, queue, no_ack=True, qos=None, dont_pause=False): + def __init__(self, queue, no_ack=True, qos=None, dont_pause=False, + future_to_notify=None + ): """ - To be instantiated only by Cluster - :param state: state of the consumer :param queue: Queue object, being consumed from right now. Note that name of anonymous queue might change at any time! diff --git a/coolamqp/framing/compilation/compile_definitions.py b/coolamqp/framing/compilation/compile_definitions.py index 33ac881f7203505031238a3b8a5f515e277e2fa4..3ffcbe5f8e0676e37fc23d22c217962c5ea04077 100644 --- a/coolamqp/framing/compilation/compile_definitions.py +++ b/coolamqp/framing/compilation/compile_definitions.py @@ -302,6 +302,8 @@ Field = collections.namedtuple('Field', ('name', 'type', 'basic_type', 'reserved # annotate types method.fields = [field._replace(basic_type=domain_to_basic_type[field.type]) for field in method.fields] + non_reserved_fields = [field for field in method.fields if not field.reserved] + is_static = method.is_static() if is_static: static_size = get_size(method.fields) @@ -312,6 +314,8 @@ Field = collections.namedtuple('Field', ('name', 'type', 'basic_type', 'reserved """ %s """ + __slots__ = (%s) + NAME = %s INDEX = (%s, %s) # (Class ID, Method ID) @@ -322,8 +326,10 @@ Field = collections.namedtuple('Field', ('name', 'type', 'basic_type', 'reserved IS_SIZE_STATIC = %s # this means that argument part has always the same length IS_CONTENT_STATIC = %s # this means that argument part has always the same content ''', + full_class_name, to_docstring(method.label, method.docs), + u', '.join(map(lambda f: frepr(format_field_name(f.name)), non_reserved_fields)), frepr(cls.name + '.' + method.name), frepr(cls.index), frepr(method.index), to_code_binary(struct.pack("!HH", cls.index, method.index)), @@ -358,7 +364,7 @@ Field = collections.namedtuple('Field', ('name', 'type', 'basic_type', 'reserved line(' ]\n') - non_reserved_fields = [field for field in method.fields if not field.reserved] + # constructor line('''\n def __init__(%s): diff --git a/coolamqp/framing/definitions.py b/coolamqp/framing/definitions.py index 3f1505ec1cf1cb07ef9feb1d4d305f2d1069b711..f2d50a5b2c1a9c399118bdb284c9c87ceea33075 100644 --- a/coolamqp/framing/definitions.py +++ b/coolamqp/framing/definitions.py @@ -115,6 +115,8 @@ class ConnectionBlocked(AMQPMethodPayload): and does not accept new publishes. """ + __slots__ = (u'reason') + NAME = u'connection.blocked' INDEX = (10, 60) # (Class ID, Method ID) @@ -164,6 +166,8 @@ class ConnectionClose(AMQPMethodPayload): a specific method, i.e. an exception. When a close is due to an exception, the sender provides the class and method id of the method which caused the exception. """ + __slots__ = (u'reply_code', u'reply_text', u'class_id', u'method_id') + NAME = u'connection.close' INDEX = (10, 50) # (Class ID, Method ID) @@ -228,6 +232,8 @@ class ConnectionCloseOk(AMQPMethodPayload): This method confirms a Connection.Close method and tells the recipient that it is safe to release resources for the connection and close the socket. """ + __slots__ = () + NAME = u'connection.close-ok' INDEX = (10, 51) # (Class ID, Method ID) @@ -260,6 +266,8 @@ class ConnectionOpen(AMQPMethodPayload): The server may apply arbitrary limits per virtual host, such as the number of each type of entity that may be used, per connection and/or in total. """ + __slots__ = (u'virtual_host') + NAME = u'connection.open' INDEX = (10, 40) # (Class ID, Method ID) @@ -316,6 +324,8 @@ class ConnectionOpenOk(AMQPMethodPayload): This method signals to the client that the connection is ready for use. """ + __slots__ = () + NAME = u'connection.open-ok' INDEX = (10, 41) # (Class ID, Method ID) @@ -355,6 +365,8 @@ class ConnectionStart(AMQPMethodPayload): protocol version that the server proposes, along with a list of security mechanisms which the client can use for authentication. """ + __slots__ = (u'version_major', u'version_minor', u'server_properties', u'mechanisms', u'locales') + NAME = u'connection.start' INDEX = (10, 10) # (Class ID, Method ID) @@ -444,6 +456,8 @@ class ConnectionSecure(AMQPMethodPayload): received sufficient information to authenticate each other. This method challenges the client to provide more information. """ + __slots__ = (u'challenge') + NAME = u'connection.secure' INDEX = (10, 20) # (Class ID, Method ID) @@ -493,6 +507,8 @@ class ConnectionStartOk(AMQPMethodPayload): This method selects a SASL security mechanism. """ + __slots__ = (u'client_properties', u'mechanism', u'response', u'locale') + NAME = u'connection.start-ok' INDEX = (10, 11) # (Class ID, Method ID) @@ -578,6 +594,8 @@ class ConnectionSecureOk(AMQPMethodPayload): This method attempts to authenticate, passing a block of SASL data for the security mechanism at the server side. """ + __slots__ = (u'response') + NAME = u'connection.secure-ok' INDEX = (10, 21) # (Class ID, Method ID) @@ -628,6 +646,8 @@ class ConnectionTune(AMQPMethodPayload): This method proposes a set of connection configuration values to the client. The client can accept and/or adjust these. """ + __slots__ = (u'channel_max', u'frame_max', u'heartbeat') + NAME = u'connection.tune' INDEX = (10, 30) # (Class ID, Method ID) @@ -689,6 +709,8 @@ class ConnectionTuneOk(AMQPMethodPayload): This method sends the client's connection tuning parameters to the server. Certain fields are negotiated, others provide capability information. """ + __slots__ = (u'channel_max', u'frame_max', u'heartbeat') + NAME = u'connection.tune-ok' INDEX = (10, 31) # (Class ID, Method ID) @@ -749,6 +771,8 @@ class ConnectionUnblocked(AMQPMethodPayload): and now accepts publishes. """ + __slots__ = () + NAME = u'connection.unblocked' INDEX = (10, 61) # (Class ID, Method ID) @@ -791,6 +815,8 @@ class ChannelClose(AMQPMethodPayload): method, i.e. an exception. When a close is due to an exception, the sender provides the class and method id of the method which caused the exception. """ + __slots__ = (u'reply_code', u'reply_text', u'class_id', u'method_id') + NAME = u'channel.close' INDEX = (20, 40) # (Class ID, Method ID) @@ -855,6 +881,8 @@ class ChannelCloseOk(AMQPMethodPayload): This method confirms a Channel.Close method and tells the recipient that it is safe to release resources for the channel. """ + __slots__ = () + NAME = u'channel.close-ok' INDEX = (20, 41) # (Class ID, Method ID) @@ -888,6 +916,8 @@ class ChannelFlow(AMQPMethodPayload): it can process. Note that this method is not intended for window control. It does not affect contents returned by Basic.Get-Ok methods. """ + __slots__ = (u'active') + NAME = u'channel.flow' INDEX = (20, 20) # (Class ID, Method ID) @@ -936,6 +966,8 @@ class ChannelFlowOk(AMQPMethodPayload): Confirms to the peer that a flow command was received and processed. """ + __slots__ = (u'active') + NAME = u'channel.flow-ok' INDEX = (20, 21) # (Class ID, Method ID) @@ -984,6 +1016,8 @@ class ChannelOpen(AMQPMethodPayload): This method opens a channel to the server. """ + __slots__ = () + NAME = u'channel.open' INDEX = (20, 10) # (Class ID, Method ID) @@ -1021,6 +1055,8 @@ class ChannelOpenOk(AMQPMethodPayload): This method signals to the client that the channel is ready for use. """ + __slots__ = () + NAME = u'channel.open-ok' INDEX = (20, 11) # (Class ID, Method ID) @@ -1068,6 +1104,8 @@ class ExchangeBind(AMQPMethodPayload): This method binds an exchange to an exchange. """ + __slots__ = (u'destination', u'source', u'routing_key', u'no_wait', u'arguments') + NAME = u'exchange.bind' INDEX = (40, 30) # (Class ID, Method ID) @@ -1160,6 +1198,8 @@ class ExchangeBindOk(AMQPMethodPayload): This method confirms that the bind was successful. """ + __slots__ = () + NAME = u'exchange.bind-ok' INDEX = (40, 31) # (Class ID, Method ID) @@ -1190,6 +1230,8 @@ class ExchangeDeclare(AMQPMethodPayload): This method creates an exchange if it does not already exist, and if the exchange exists, verifies that it is of the correct and expected class. """ + __slots__ = (u'exchange', u'type_', u'passive', u'durable', u'auto_delete', u'internal', u'no_wait', u'arguments') + NAME = u'exchange.declare' INDEX = (40, 10) # (Class ID, Method ID) @@ -1308,6 +1350,8 @@ class ExchangeDelete(AMQPMethodPayload): This method deletes an exchange. When an exchange is deleted all queue bindings on the exchange are cancelled. """ + __slots__ = (u'exchange', u'if_unused', u'no_wait') + NAME = u'exchange.delete' INDEX = (40, 20) # (Class ID, Method ID) @@ -1374,6 +1418,8 @@ class ExchangeDeclareOk(AMQPMethodPayload): This method confirms a Declare method and confirms the name of the exchange, essential for automatically-named exchanges. """ + __slots__ = () + NAME = u'exchange.declare-ok' INDEX = (40, 11) # (Class ID, Method ID) @@ -1403,6 +1449,8 @@ class ExchangeDeleteOk(AMQPMethodPayload): This method confirms the deletion of an exchange. """ + __slots__ = () + NAME = u'exchange.delete-ok' INDEX = (40, 21) # (Class ID, Method ID) @@ -1432,6 +1480,8 @@ class ExchangeUnbind(AMQPMethodPayload): This method unbinds an exchange from an exchange. """ + __slots__ = (u'destination', u'source', u'routing_key', u'no_wait', u'arguments') + NAME = u'exchange.unbind' INDEX = (40, 40) # (Class ID, Method ID) @@ -1518,6 +1568,8 @@ class ExchangeUnbindOk(AMQPMethodPayload): This method confirms that the unbind was successful. """ + __slots__ = () + NAME = u'exchange.unbind-ok' INDEX = (40, 51) # (Class ID, Method ID) @@ -1561,6 +1613,8 @@ class QueueBind(AMQPMethodPayload): are bound to a direct exchange and subscription queues are bound to a topic exchange. """ + __slots__ = (u'queue', u'exchange', u'routing_key', u'no_wait', u'arguments') + NAME = u'queue.bind' INDEX = (50, 20) # (Class ID, Method ID) @@ -1656,6 +1710,8 @@ class QueueBindOk(AMQPMethodPayload): This method confirms that the bind was successful. """ + __slots__ = () + NAME = u'queue.bind-ok' INDEX = (50, 21) # (Class ID, Method ID) @@ -1687,6 +1743,8 @@ class QueueDeclare(AMQPMethodPayload): specify various properties that control the durability of the queue and its contents, and the level of sharing for the queue. """ + __slots__ = (u'queue', u'passive', u'durable', u'exclusive', u'auto_delete', u'no_wait', u'arguments') + NAME = u'queue.declare' INDEX = (50, 10) # (Class ID, Method ID) @@ -1795,6 +1853,8 @@ class QueueDelete(AMQPMethodPayload): to a dead-letter queue if this is defined in the server configuration, and all consumers on the queue are cancelled. """ + __slots__ = (u'queue', u'if_unused', u'if_empty', u'no_wait') + NAME = u'queue.delete' INDEX = (50, 40) # (Class ID, Method ID) @@ -1867,6 +1927,8 @@ class QueueDeclareOk(AMQPMethodPayload): This method confirms a Declare method and confirms the name of the queue, essential for automatically-named queues. """ + __slots__ = (u'queue', u'message_count', u'consumer_count') + NAME = u'queue.declare-ok' INDEX = (50, 11) # (Class ID, Method ID) @@ -1927,6 +1989,8 @@ class QueueDeleteOk(AMQPMethodPayload): This method confirms the deletion of a queue. """ + __slots__ = (u'message_count') + NAME = u'queue.delete-ok' INDEX = (50, 41) # (Class ID, Method ID) @@ -1972,6 +2036,8 @@ class QueuePurge(AMQPMethodPayload): This method removes all messages from a queue which are not awaiting acknowledgment. """ + __slots__ = (u'queue', u'no_wait') + NAME = u'queue.purge' INDEX = (50, 30) # (Class ID, Method ID) @@ -2029,6 +2095,8 @@ class QueuePurgeOk(AMQPMethodPayload): This method confirms the purge of a queue. """ + __slots__ = (u'message_count') + NAME = u'queue.purge-ok' INDEX = (50, 31) # (Class ID, Method ID) @@ -2073,6 +2141,8 @@ class QueueUnbind(AMQPMethodPayload): This method unbinds a queue from an exchange. """ + __slots__ = (u'queue', u'exchange', u'routing_key', u'arguments') + NAME = u'queue.unbind' INDEX = (50, 50) # (Class ID, Method ID) @@ -2151,6 +2221,8 @@ class QueueUnbindOk(AMQPMethodPayload): This method confirms that the unbind was successful. """ + __slots__ = () + NAME = u'queue.unbind-ok' INDEX = (50, 51) # (Class ID, Method ID) @@ -2319,6 +2391,8 @@ class BasicAck(AMQPMethodPayload): The acknowledgement can be for a single message or a set of messages up to and including a specific message. """ + __slots__ = (u'delivery_tag', u'multiple') + NAME = u'basic.ack' INDEX = (60, 80) # (Class ID, Method ID) @@ -2376,6 +2450,8 @@ class BasicConsume(AMQPMethodPayload): messages from a specific queue. Consumers last as long as the channel they were declared on, or until the client cancels them. """ + __slots__ = (u'queue', u'consumer_tag', u'no_local', u'no_ack', u'exclusive', u'no_wait', u'arguments') + NAME = u'basic.consume' INDEX = (60, 20) # (Class ID, Method ID) @@ -2482,6 +2558,8 @@ class BasicCancel(AMQPMethodPayload): capable of accepting the method, through some means of capability negotiation. """ + __slots__ = (u'consumer_tag', u'no_wait') + NAME = u'basic.cancel' INDEX = (60, 30) # (Class ID, Method ID) @@ -2537,6 +2615,8 @@ class BasicConsumeOk(AMQPMethodPayload): The server provides the client with a consumer tag, which is used by the client for methods called on the consumer at a later stage. """ + __slots__ = (u'consumer_tag') + NAME = u'basic.consume-ok' INDEX = (60, 21) # (Class ID, Method ID) @@ -2584,6 +2664,8 @@ class BasicCancelOk(AMQPMethodPayload): This method confirms that the cancellation was completed. """ + __slots__ = (u'consumer_tag') + NAME = u'basic.cancel-ok' INDEX = (60, 31) # (Class ID, Method ID) @@ -2633,6 +2715,8 @@ class BasicDeliver(AMQPMethodPayload): the server responds with Deliver methods as and when messages arrive for that consumer. """ + __slots__ = (u'consumer_tag', u'delivery_tag', u'redelivered', u'exchange', u'routing_key') + NAME = u'basic.deliver' INDEX = (60, 60) # (Class ID, Method ID) @@ -2713,6 +2797,8 @@ class BasicGet(AMQPMethodPayload): dialogue that is designed for specific types of application where synchronous functionality is more important than performance. """ + __slots__ = (u'queue', u'no_ack') + NAME = u'basic.get' INDEX = (60, 70) # (Class ID, Method ID) @@ -2772,6 +2858,8 @@ class BasicGetOk(AMQPMethodPayload): delivered by 'get-ok' must be acknowledged unless the no-ack option was set in the get method. """ + __slots__ = (u'delivery_tag', u'redelivered', u'exchange', u'routing_key', u'message_count') + NAME = u'basic.get-ok' INDEX = (60, 71) # (Class ID, Method ID) @@ -2848,6 +2936,8 @@ class BasicGetEmpty(AMQPMethodPayload): This method tells the client that the queue has no messages available for the client. """ + __slots__ = () + NAME = u'basic.get-empty' INDEX = (60, 72) # (Class ID, Method ID) @@ -2890,6 +2980,8 @@ class BasicNack(AMQPMethodPayload): confirm mode of unhandled messages. If a publisher receives this method, it probably needs to republish the offending messages. """ + __slots__ = (u'delivery_tag', u'multiple', u'requeue') + NAME = u'basic.nack' INDEX = (60, 120) # (Class ID, Method ID) @@ -2955,6 +3047,8 @@ class BasicPublish(AMQPMethodPayload): to queues as defined by the exchange configuration and distributed to any active consumers when the transaction, if any, is committed. """ + __slots__ = (u'exchange', u'routing_key', u'mandatory', u'immediate') + NAME = u'basic.publish' INDEX = (60, 40) # (Class ID, Method ID) @@ -3043,6 +3137,8 @@ class BasicQos(AMQPMethodPayload): qos method could in principle apply to both peers, it is currently meaningful only for the server. """ + __slots__ = (u'prefetch_size', u'prefetch_count', u'global_') + NAME = u'basic.qos' INDEX = (60, 10) # (Class ID, Method ID) @@ -3119,6 +3215,8 @@ class BasicQosOk(AMQPMethodPayload): server. The requested QoS applies to all active consumers until a new QoS is defined. """ + __slots__ = () + NAME = u'basic.qos-ok' INDEX = (60, 11) # (Class ID, Method ID) @@ -3151,6 +3249,8 @@ class BasicReturn(AMQPMethodPayload): reply code and text provide information about the reason that the message was undeliverable. """ + __slots__ = (u'reply_code', u'reply_text', u'exchange', u'routing_key') + NAME = u'basic.return' INDEX = (60, 50) # (Class ID, Method ID) @@ -3224,6 +3324,8 @@ class BasicReject(AMQPMethodPayload): cancel large incoming messages, or return untreatable messages to their original queue. """ + __slots__ = (u'delivery_tag', u'requeue') + NAME = u'basic.reject' INDEX = (60, 90) # (Class ID, Method ID) @@ -3277,6 +3379,8 @@ class BasicRecoverAsync(AMQPMethodPayload): specified channel. Zero or more messages may be redelivered. This method is deprecated in favour of the synchronous Recover/Recover-Ok. """ + __slots__ = (u'requeue') + NAME = u'basic.recover-async' INDEX = (60, 100) # (Class ID, Method ID) @@ -3328,6 +3432,8 @@ class BasicRecover(AMQPMethodPayload): specified channel. Zero or more messages may be redelivered. This method replaces the asynchronous Recover. """ + __slots__ = (u'requeue') + NAME = u'basic.recover' INDEX = (60, 110) # (Class ID, Method ID) @@ -3377,6 +3483,8 @@ class BasicRecoverOk(AMQPMethodPayload): This method acknowledges a Basic.Recover method. """ + __slots__ = () + NAME = u'basic.recover-ok' INDEX = (60, 111) # (Class ID, Method ID) @@ -3424,6 +3532,8 @@ class TxCommit(AMQPMethodPayload): This method commits all message publications and acknowledgments performed in the current transaction. A new transaction starts immediately after a commit. """ + __slots__ = () + NAME = u'tx.commit' INDEX = (90, 20) # (Class ID, Method ID) @@ -3454,6 +3564,8 @@ class TxCommitOk(AMQPMethodPayload): This method confirms to the client that the commit succeeded. Note that if a commit fails, the server raises a channel exception. """ + __slots__ = () + NAME = u'tx.commit-ok' INDEX = (90, 21) # (Class ID, Method ID) @@ -3486,6 +3598,8 @@ class TxRollback(AMQPMethodPayload): Note that unacked messages will not be automatically redelivered by rollback; if that is required an explicit recover call should be issued. """ + __slots__ = () + NAME = u'tx.rollback' INDEX = (90, 30) # (Class ID, Method ID) @@ -3516,6 +3630,8 @@ class TxRollbackOk(AMQPMethodPayload): This method confirms to the client that the rollback succeeded. Note that if an rollback fails, the server raises a channel exception. """ + __slots__ = () + NAME = u'tx.rollback-ok' INDEX = (90, 31) # (Class ID, Method ID) @@ -3546,6 +3662,8 @@ class TxSelect(AMQPMethodPayload): This method sets the channel to use standard transactions. The client must use this method at least once on a channel before using the Commit or Rollback methods. """ + __slots__ = () + NAME = u'tx.select' INDEX = (90, 10) # (Class ID, Method ID) @@ -3576,6 +3694,8 @@ class TxSelectOk(AMQPMethodPayload): This method confirms to the client that the channel was successfully set to use standard transactions. """ + __slots__ = () + NAME = u'tx.select-ok' INDEX = (90, 11) # (Class ID, Method ID) @@ -3631,6 +3751,8 @@ class ConfirmSelect(AMQPMethodPayload): The client can only use this method on a non-transactional channel. """ + __slots__ = (u'nowait') + NAME = u'confirm.select' INDEX = (85, 10) # (Class ID, Method ID) @@ -3679,6 +3801,8 @@ class ConfirmSelectOk(AMQPMethodPayload): set to use publisher acknowledgements. """ + __slots__ = () + NAME = u'confirm.select-ok' INDEX = (85, 11) # (Class ID, Method ID)