diff --git a/coolamqp/connection/connecting.py b/coolamqp/connection/connecting.py new file mode 100644 index 0000000000000000000000000000000000000000..6d1a0e967cf9d24b1846cea702197187a00284a4 --- /dev/null +++ b/coolamqp/connection/connecting.py @@ -0,0 +1,71 @@ +# coding=UTF-8 +from __future__ import absolute_import, division, print_function +import logging +import six +from coolamqp.state.orders import BaseOrder +from coolamqp.framing.definitions import ChannelOpenOk, ChannelOpen +from coolamqp.framing.frames import AMQPMethodFrame +from coolamqp.uplink import Handshaker +from coolamqp.framing.extensions import PUBLISHER_CONFIRMS +""" +All the routines required to go from connecting to synced +""" + +logger = logging.getLogger(__name__) + + +class ConnectingPeople(BaseOrder): + def configure(self, broker, handshaker, connection, nodedef): + self.handshaker = handshaker + self.broker = broker + self.connection = connection + self.node_definition = nodedef + + def on_fail(self, reason): + """Called at any time, by anything going wrong with basic init and sync""" + self.set_exception(Exception(reason or 'Initialization failed')) + + def handshake_complete(self): + """Called by handshaker, upon completing the initial frame exchange""" + logger.info('%s:%s entered RANGING', self.node_definition.host, self.node_definition.port) + self.broker.extensions = set(self.handshaker.extensions) + if self.handshaker.channel_max < 2: + self.connection.send(None, 'channel_max < 2 !!!') + self.on_fail('channel_max < 2 !!') + return + + for free_chan in six.moves.xrange(3, self.handshaker.channel_max + 1): + self.broker.free_channels.append(free_chan) + + # It's OK, open channel 1 for sending messages + self.connection.watch_for_method(1, ChannelOpenOk, self.send_nonconfirm_channel_opened) + self.connection.send([AMQPMethodFrame(1, ChannelOpen())]) + + def send_nonconfirm_channel_opened(self, payload): + """Called upon opening channel #1, which is used for publishing messages without confirmation""" + + if PUBLISHER_CONFIRMS in self.handshaker.extensions: + # We need to set up channel 2 with publisher confirms + self.connection.watch_for_method(2, ChannelOpenOk, self.on_channel_2) + self.connection.send([AMQPMethodFrame(2, ChannelOpen())]) + else: + # Else we don't set up this channel + self.on_syncing() + + def on_channel_2(self, payload): + """Do things with Channel 2 - only if PUBLISHER_CONFIRMATION extension is enabled!""" + from coolamqp.framing.definitions import ConfirmSelect, ConfirmSelectOk + if isinstance(payload, ChannelOpenOk): + # Ok, just opened the channel + self.connection.watch_for_method(2, ConfirmSelectOk, self.on_channel_2) + self.connection.send([AMQPMethodFrame(2, ConfirmSelect(False))]) + elif isinstance(payload, ConfirmSelectOk): + # A-OK! + logger.info('%s:%s entered SYNCING', self.node_definition.host, self.node_definition.port) + + def on_syncing(self): + """We are entering SYNCING""" + logger.info('%s:%s entered SYNCING', self.node_definition.host, self.node_definition.port) + self.connection.on_connected() + + diff --git a/coolamqp/connection/consumer.py b/coolamqp/connection/consumer.py new file mode 100644 index 0000000000000000000000000000000000000000..4979a64fce94d7b523a85c80919e20641050b283 --- /dev/null +++ b/coolamqp/connection/consumer.py @@ -0,0 +1,291 @@ +# coding=UTF-8 +from __future__ import absolute_import, division, print_function +import uuid +from coolamqp.framing.frames import AMQPMethodFrame, AMQPBodyFrame, AMQPHeaderFrame +from coolamqp.framing.definitions import ChannelOpen, ChannelOpenOk, BasicConsume, \ + BasicConsumeOk, QueueDeclare, QueueDeclareOk, ExchangeDeclare, ExchangeDeclareOk, \ + QueueBind, QueueBindOk, ChannelClose, ChannelCloseOk, BasicCancel, BasicDeliver, \ + BasicAck, BasicReject, ACCESS_REFUSED, RESOURCE_LOCKED +from coolamqp.uplink import HeaderOrBodyWatch, MethodWatch + + +ST_OFFLINE = 0 # Consumer is *not* consuming, no setup attempts are being made +ST_SYNCING = 1 # A process targeted at consuming has been started +ST_ONLINE = 2 # Consumer is declared all right + + +class Consumer(object): + """ + This object represents a consumer in the system. + + Consumer may reside on any AMQP broker, this is to be decided by CoolAMQP. + Consumer, when created, has the state of ST_SYNCING. CoolAMQP will + try to declare the consumer where it makes most sense for it to be. + + If it succeeds, the consumer will enter state ST_ONLINE, and callables + on_start will be called. This means that broker has confirmed that this + consumer is operational and receiving messages. + + If the consumer gets a message, it will relay it to a specified callable. + The message may need acking or rejecting. + + THIS OBJECT CAN OUTLIVE IT'S BROKER, AND THEREFORE .broker FIELD MUST BE SET + ON A NEW BROKER. HOWEVER, ALL WATCHES MUST BE CALLED BEFOREHAND. + + Note that does not attempt to cancel consumers, or any of such nonsense. Having + a channel per consumer gives you the unique possibility of simply closing the channel. + Since this implies cancelling the consumer, here you go. + """ + + def __init__(self, queue, no_ack=True, qos=None, dont_pause=False): + """ + To be instantiated only by Cluster + + :param state: state of the consumer + :param queue: Queue object, being consumed from right now. + Note that name of anonymous queue might change at any time! + :param no_ack: Will this consumer require acknowledges from messages? + :param dont_pause: Consumer will fail on the spot instead of pausing + """ + self.state = ST_OFFLINE + self.queue = queue + self.no_ack = no_ack + + # private + self.broker = None # broker on which was last seen + self.channel_id = None + + self.cancelled = False # did the client want to STOP using this consumer? + + # state machine for receiving messages + self.recv_state = 0 # 0 - waiting for basic.deliver + # 1 - waiting for header + # 2 - waiting for body + + self.delivery = None # tuple of (delivery tag, exchange, routing_key) + self.properties = None + self.content_parts = [] + self.length_remaining = 0 + + self.remaining_for_ack = set() # unacknowledged delivery tags + + def on_header_or_body_or_delivery(self, frame): + + if isinstance(frame, BasicDeliver) and self.state == 0: + self.delivery = frame.delivery_tag, frame.exchange, frame.routing_key + self.recv_state = 1 + + elif isinstance(frame, AMQPHeaderFrame) and self.state == 1: + self.properties = frame.properties + self.length_remaining = frame.body_size + self.recv_state = 2 + + elif isinstance(frame, AMQPBodyFrame) and self.state == 2: + + self.content_parts.append(frame.payload) + self.length_remaining -= len(frame.payload) + + if self.length_remaining == 0: + self.broker.on_new_message(self, self.delivery[0], + self.delivery[1], + self.delivery[2], + self.properties, + self.content_parts, + not self.no_ack + ) + if not self.no_ack: + self.remaining_for_ack.add(self.delivery[0]) + self.recv_state = 0 + else: + self.broker.connection.send(None, 'state assertion failed') + + def reject(self, consumer, delivery_tag): + + if self.cancelled: + return + + if self != consumer: + return # it was not me + + if delivery_tag not in self.remaining_for_ack: + return # not remaining + + self.broker.connection.send(AMQPMethodFrame( + self.channel_id, + BasicReject(delivery_tag, True) + )) + + self.remaining_for_ack.remove(delivery_tag) + + def acknowledge(self, consumer, delivery_tag): + + if self.cancelled: + return + + if self != consumer: + return # it was not me + + if delivery_tag not in self.remaining_for_ack: + return # not remaining + + self.broker.connection.send(AMQPMethodFrame( + self.channel_id, + BasicAck(delivery_tag, False) + )) + + self.remaining_for_ack.remove(delivery_tag) + + def cancel(self): + """Stop using this consumer""" + self.cancelled = True + + if self.state == ST_ONLINE: + # Consuming, close the channel please + self.broker.connection.send(AMQPMethodFrame(self.channel_id, + ChannelClose( + 0, 'Consumer cancelled', 0, 0 + ))) + + def on_close(self, payload=None): + """Handle closing the channel. It sounds like an exception...""" + + if self.channel_id is None: + return + + should_retry = False + + if isinstance(payload, ChannelClose): + # it sounds like an exception + self.broker.connection.send(AMQPMethodFrame(self.channel_id, + ChannelCloseOk())) + + should_retry = payload.reply_code in (ACCESS_REFUSED, RESOURCE_LOCKED) + + self.broker.connection.unwatch_all(self.channel_id) + self.broker.free_channels.append(self.channel_id) + self.channel_id = None + self.state = ST_OFFLINE + self.remaining_for_ack = set() + self.recv_state = 0 + + if should_retry: + # retry + self.on_uplink_established(self.broker) + + + def on_setup(self, payload): + """Called with different kinds of frames - during setup""" + + if self.cancelled: + # We were declaring this, but during this situation this + # consumer was cancelled. Close the channel and things. + self.broker.connection.send(self.channel_id, ChannelClose(0, 'Consumer cancelled', 0, 0)) + return + + if isinstance(payload, ChannelOpenOk): + # Do we need to declare the exchange? + + if self.queue.exchange is not None: + self.broker.connection.method_and_watch( + self.channel_id, + ExchangeDeclare(self.queue.exchange.name.encode('utf8'), + self.queue.exchange.type.encode('utf8'), + False, + self.queue.exchange.durable, + self.queue.exchange.auto_delete, + False, + False, + []), + ExchangeDeclareOk, + self.on_setup + ) + else: + self.on_setup(ExchangeDeclareOk()) + + elif isinstance(payload, ExchangeDeclareOk): + # Declare the queue + + name = b'' if self.queue.anonymous else self.queue.name.encode('utf8') + + self.broker.connection.method_and_watch( + self.channel_id, + QueueDeclare( + name, + False, + self.queue.durable, + self.queue.exclusive, + self.queue.auto_delete, + False, + [] + ), + QueueDeclareOk, + self.on_setup + ) + + elif isinstance(payload, QueueDeclareOk): + # did we need an anonymous name? + if self.queue.anonymous: + self.queue.name = payload.queue_name.decode('utf8') + + # We need any form of binding. + xchg_name = b'' if self.queue.exchange is None else self.queue.exchange.name.encode('utf8') + + self.broker.connection.method_and_watch( + self.channel_id, + QueueBind( + self.queue.name.encode('utf8'), + xchg_name, + b'', + False, + [] + ), + QueueBindOk, + self.on_setup + ) + elif isinstance(payload, QueueBindOk): + # itadakimasu + self.broker.connection.method_and_watch( + self.channel_id, + BasicConsume( + self.queue.name.encode('utf8'), + self.queue.name.encode('utf8'), + False, + self.no_ack, + self.queue.exclusive, + False, + [] + ), + BasicConsumeOk, + self.on_setup + ) + + elif isinstance(payload, BasicConsumeOk): + # AWWW RIGHT~!!! + self.state = ST_ONLINE + + self.broker.connection.watch(HeaderOrBodyWatch(self.channel_id, self.on_header_or_body_or_delivery)) + mw = MethodWatch(self.channel_id, BasicDeliver, self.on_header_or_body_or_delivery) + mw.oneshot = False + self.broker.connection.watch(mw) + + def on_uplink_established(self, broker): + """Consumer was created or uplink was regained. Try to declare it""" + if self.cancelled: + return # it's OK. + + self.broker = broker + + self.state = ST_SYNCING + self.channel_id = self.broker.free_channels.pop() + + self.broker.connection.watch_for_method(self.channel_id, + (ChannelClose, ChannelCloseOk, BasicCancel), + self.on_close, + on_fail=self.on_close) + + self.broker.connection.method_and_watch( + self.channel_id, + ChannelOpen(), + ChannelOpenOk, + self.on_setup + ) diff --git a/coolamqp/exceptions.py b/coolamqp/exceptions.py new file mode 100644 index 0000000000000000000000000000000000000000..3d10193114dc336f4f3b93b15ba99fe25256ca32 --- /dev/null +++ b/coolamqp/exceptions.py @@ -0,0 +1,36 @@ +# coding=UTF-8 +from __future__ import absolute_import, division, print_function + + +class CoolAMQPError(Exception): + """Base class for CoolAMQP errors""" + + + +class ConsumerError(CoolAMQPError): + """ + Exceptions passed to consumer callables. + """ + + +class UplinkLost(ConsumerError): + """ + Uplink to the network has been lost, I am working on regaining connectivity + right now. + """ + +class ConsumerCancelled(CoolAMQPError): + """ + The consumer has been cancelled + """ + + +class AMQPError(CoolAMQPError): + """ + Base class for errors received from AMQP server + """ + def __init__(self, reply_code, reply_text, class_id, method_id): + self.reply_code = reply_code + self.reply_text = reply_text + self.class_id = class_id + self.method_id = method_id diff --git a/coolamqp/framing/extensions.py b/coolamqp/framing/extensions.py new file mode 100644 index 0000000000000000000000000000000000000000..355ed0fc9ce824327970ff7786abef24d4133fc5 --- /dev/null +++ b/coolamqp/framing/extensions.py @@ -0,0 +1,5 @@ +# coding=UTF-8 +from __future__ import absolute_import, division, print_function +"""Extension definitions""" + +PUBLISHER_CONFIRMS = b'publisher_confirms' \ No newline at end of file diff --git a/coolamqp/state/__init__.py b/coolamqp/state/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..2e65b9091f536371e7b6fa2a03590c2c3e286a63 --- /dev/null +++ b/coolamqp/state/__init__.py @@ -0,0 +1,12 @@ +# coding=UTF-8 +from __future__ import absolute_import, division, print_function +""" +Any operations the user does, are against the BROKER STATE. + +The connections are not important - what is important, is the broker state. This is a virtual +aggregate of all operations that are running against the cluster - ie. queues subscribed, +messages pending to be sent. + +The client doesn't matter how this is handled. CoolAMQP (in the future) may decide to subscribe some +queues against other node, if it decides that the master queue is there, or do something entirely else. +""" \ No newline at end of file diff --git a/coolamqp/state/orders.py b/coolamqp/state/orders.py new file mode 100644 index 0000000000000000000000000000000000000000..906cca522656e384e8069f976c277709ea2819f6 --- /dev/null +++ b/coolamqp/state/orders.py @@ -0,0 +1,71 @@ +# coding=UTF-8 +from __future__ import absolute_import, division, print_function +import concurrent.futures +import threading +import logging + + +logger = logging.getLogger(__name__) + + +class BaseOrder(concurrent.futures.Future): + """ + A strange future - only one thread may .wait() for it. + And it's for the best. + """ + def __init__(self): + self.lock = threading.Lock() + self.lock.acquire() + + self.completed = False + self.successfully = None + self.result = None + self.cancelled = False + self.running = True + + self.callables = [] + + def add_done_callback(self, fn): + self.callables.append(fn) + + def cancel(self): + self.cancelled = True + + def __finish(self, result, successful): + self.completed = True + self.successfully = successful + self.result = result + self.lock.release() + + for callable in self.callables: + try: + callable(self) + except Exception as e: + logger.error('Exception in base order future: %s', repr(e)) + except BaseException as e: + logger.critical('WILD NASAL DEMON APPEARED: %s', repr(e)) + + def set_result(self, result=None): + self.__finish(result, True) + + def set_exception(self, exception): + self.__finish(exception, False) + + +class SendMessage(BaseOrder): + """ + An order to send a message somewhere, such that message will survive disconnects + from broker and so on. + """ + def __init__(self, message, exchange_name, routing_key): + """ + :param message: Message object to send + :param exchange_name: bytes, name of exchange + :param routing_key: bytes, routing key + """ + BaseOrder.__init__(self) + + self.message = message + self.exchange_name = exchange_name + self.routing_key = routing_key + diff --git a/coolamqp/state/state.py b/coolamqp/state/state.py new file mode 100644 index 0000000000000000000000000000000000000000..cc223ca446a78dfc620f3218ecf59ff9385e0a3d --- /dev/null +++ b/coolamqp/state/state.py @@ -0,0 +1,22 @@ +# coding=UTF-8 +from __future__ import absolute_import, division, print_function +import concurrent.futures + + +class BrokerState(object): + """ + A state of the broker. List of messages to send (including their dispositions) + and so on. + """ + + def __init__(self): + + # Transient messages - THESE WILL BE DROPPED ON THE FLOOR UPON A DISCONNECT + self.messages_to_push = [] # list of (Message object, exchange_name, routing_key) + + # Messages to publish - THESE WILL NOT BE DROPPED ON THE FLOOR UPON A DC + self.messages_to_tx = [] # list of (SendMessage object) + + # a list of (Queue instance, delivery_tag, auto_ack) + self.queues_subscribed = [] +