diff --git a/coolamqp/backends/__init__.py b/coolamqp/backends/__init__.py index 058bd39cf3b10b9a682d8412decfa0d2be37ee7b..31c5b38e3f4ecbce88e840a2bc9df611709232e3 100644 --- a/coolamqp/backends/__init__.py +++ b/coolamqp/backends/__init__.py @@ -1 +1 @@ -class AMQP \ No newline at end of file +from .pyamqp import PyAMQPBackend, AMQPError, RemoteAMQPError, ConnectionFailedError \ No newline at end of file diff --git a/coolamqp/backends/base.py b/coolamqp/backends/base.py index fc664035477613dce9fbf436221d774fa2975957..bd6a71dd4da080e2709362401c22dd36202adcce 100644 --- a/coolamqp/backends/base.py +++ b/coolamqp/backends/base.py @@ -1,12 +1,91 @@ + + +class AMQPError(Exception): + pass + + +class ConnectionFailedError(AMQPError): + """Connection to broker failed""" + +class RemoteAMQPError(AMQPError): + """ + Remote AMQP broker responded with an error code + """ + def __init__(self, code, text=None): + """ + :param code: AMQP error code + :param text: AMQP error text (optional) + """ + AMQPError.__init__(self, text) + self.code = code + + class AMQPBackend(object): """ - Connection to an AMQP backend + Dummy AMQP backend. + + Every method may raise either ConnectionFailedError (if connection failed) + or RemoteAMQPError (if broker returned an error response) """ - def __init__(self, host, user, password, virtual_host): + def __init__(self, cluster_node, cluster_handler_thread): + """ + Connects to an AMQP backend. + """ + self.cluster_handler_thread = cluster_handler_thread + + def process(self, max_time=10): + """ + Do bookkeeping, process messages, etc. + :param max_time: maximum time in seconds this call can take + :raises ConnectionFailedError: if connection failed in the meantime + """ + + def exchange_declare(self, exchange): """ - Connects to an AMQP backend + Declare an exchange + :param exchange: Exchange object """ - pass - def \ No newline at end of file + def queue_bind(self, queue, exchange, routing_key=''): + """ + Bind a queue to an exchange + :param queue: Queue object + :param exchange: Exchange object + :param routing_key: routing key to use + """ + + def queue_declare(self, queue): + """ + Declare a queue. + + This will change queue's name if anonymous + :param queue: Queue + """ + + def basic_cancel(self, consumer_tag): + """ + Cancel consuming, identified by a consumer_tag + :param consumer_tag: consumer_tag to cancel + """ + + def basic_consume(self, queue): + """ + Start consuming from a queue + :param queue: Queue object + """ + + def basic_publish(self, message, exchange, routing_key): + """ + Send a message + :param message: Message object to send + :param exchange: Exchange object to publish to + :param routing_key: routing key to use + """ + + def shutdown(self): + """ + Close this connection. + This is not allowed to return anything. + """ + self.cluster_handler_thread = None # break GC cycles \ No newline at end of file diff --git a/coolamqp/backends/pyamqp.py b/coolamqp/backends/pyamqp.py index 95aeaa43256bb92af8a67e8a07ff393a059e25e6..6c195a1c52f833e1c1cbfcc6c9ee4cb150b351ef 100644 --- a/coolamqp/backends/pyamqp.py +++ b/coolamqp/backends/pyamqp.py @@ -1,7 +1,109 @@ """Backend using pyamqp""" import amqp -from .base import AMQPBackend +import functools +from .base import AMQPBackend, AMQPError, RemoteAMQPError, ConnectionFailedError + + +def translate_exceptions(fun): + """Translates pyamqp's exceptions to CoolAMQP's""" + @functools.wraps(fun) + def q(*args, **kwargs): + try: + return fun(*args, **kwargs) + except amqp.ChannelError as e: + raise RemoteAMQPError(e.reply_code, e.reply_text) + except IOError as e: + raise ConnectionFailedError + return q + + +class PyAMQPBackend(AMQPBackend): + def __init__(self, node, cluster_handler_thread): + AMQPBackend.__init__(self, cluster_handler_thread) + + self.connection = amqp.Connection(host=node.host, + userid=node.user, + password=node.password, + virtual_host=node.virtual_host) + self.connection.connect() #todo what does this raise? + self.channel = self.connection.channel() + + def shutdown(self): + AMQPBackend.shutdown(self) + try: + self.channel.close() + except: + pass + try: + self.connection.close() + except: + pass + + @translate_exceptions + def process(self, max_time=10): + self.connection.heartbeat_tick() + self.connection.drain_events(max_time) + + @translate_exceptions + def basic_cancel(self, consumer_tag): + self.amqp_channel.basic_cancel(consumer_tag) + + @translate_exceptions + def basic_publish(self, message, exchange, routing_key): + # convert this to pyamqp's Message + a = amqp.Message(message.body, + **message.properties) + + self.amqp_channel.basic_publish(a, exchange=exchange.name, routing_key=routing_key) + + @translate_exceptions + def exchange_declare(self, exchange): + self.channel.exchange_declare(exchange.name, exchange.type, durable=exchange.durable, + auto_delete=exchange.auto_delete) + + @translate_exceptions + def queue_bind(self, queue, exchange, routing_key=''): + self.channel.queue_bind(queue.name, exchange.name, routing_key) + + @translate_exceptions + def queue_declare(self, queue): + """ + Declare a queue. + + This will change queue's name if anonymous + :param queue: Queue + """ + + if queue.anonymous: + queue.name = '' + + qname, mc, cc = self.channel.queue_declare(queue.name, + durable=queue.durable, + exclusive=queue.exclusive, + auto_delete=queue.auto_delete) + if queue.anonymous: + queue.name = qname + + @translate_exceptions + def basic_consume(self, queue, on_message, on_cancel): + """ + Start consuming from a queue + :param queue: Queue object + :param on_message: callable/1 + """ + self.channel.basic_consume(queue.name, + consumer_tag=queue.consumer_tag, + exclusive=queue.exclusive, + callback=self.__on_message, + on_cancel=self.__on_consumercancelled) + + def __on_consumercancelled(self, consumer_tag): + self.cluster_handler_thread._on_consumercancelled(consumer_tag) + + def __on_message(self, message): + self.cluster_handler_thread._on_recvmessage(message.body, + message.delivery_info['exchange'], + message.delivery_info['routing_key'], + message.delivery_info['delivery_tag'], + message.properties) -class PyAMQP(AMQPBackend): - def __init__(self, host, user, password, virtual_host): - self.amqp = amqp.Connection() \ No newline at end of file diff --git a/coolamqp/cluster.py b/coolamqp/cluster.py index c9a7bce0a098b53d387288bc2d4e7871be31ac0b..751e72e5a09cbb1b767c7a3fffff26ef0497997b 100644 --- a/coolamqp/cluster.py +++ b/coolamqp/cluster.py @@ -1,4 +1,8 @@ import itertools +import Queue +from coolamqp.backends import PyAMQPBackend +from .orders import SendMessage + class ClusterNode(object): """ @@ -17,17 +21,17 @@ class ClusterNode(object): if len(kwargs) == 0: # Prepare arguments for amqp.connection.Connection - self._amqpargs = { - 'host': kwargs['host'], - 'userid': kwargs['user'], - 'password': kwargs['password'], - 'virtual_host': kwargs.get('vhost', '/'), - } + self.host = kwargs['host'] + self.user = kwargs['user'] + self.password = kwargs['password'] + self.virtual_host = kwargs.get('virtual_host', '/') + else: + raise NotImplementedError #todo implement this def __str__(self): - return '%s@%s/%s' % (self._amqpargs['userid'], - self._amqpargs['host'], - self._amqpargs['virtual_host']) + return '%s@%s/%s' % (self.host, + self.user, + self.virtual_host) @@ -36,22 +40,77 @@ class Cluster(object): Represents connection to an AMQP cluster. This internally connects only to one node. """ - def __init__(self, nodes): + def __init__(self, nodes, backend=PyAMQPBackend): """ Construct the cluster definition :param nodes: iterable of nodes to try connecting, in this order. if list if exhaused, it will be started from beginning + :param backend: backend to use + """ + + self.backend = backend + self.node_to_connect_to = itertools.cycle(nodes)\ + + from .handler import ClusterHandlerThread + self.thread = ClusterHandlerThread(self) + + def send(self, message, exchange, routing_key, on_completed=None, on_failed=None): + """ + Schedule a message to be sent. + :param message: Message object to send + :param exchange: Exchange to use + :param routing_key: routing key to use + :param on_completed: callable/0 to call when this succeeds + :param on_failed: callable/1 to call when this fails with AMQPError instance + """ + self.thread.order_queue.append(SendMessage(message, exchange, routing_key, + on_completed=on_completed, + on_failed=on_failed)) + + def declare_exchange(self, exchange, on_completed=None, on_failed=None): + """ + Declare an exchange + :param exchange: Exchange to declare + :param on_completed: callable/0 to call when this succeeds + :param on_failed: callable/1 to call when this fails with AMQPError instance + """ + + def consume(self, queue): + """ + Start consuming from a queue + + This queue will be declared to the broker. If this queue has any binds + (.exchange field is not empty), queue will be binded to exchanges. + + :param queue: Queue to consume from. + :return: """ - self.node_to_connect_to = itertools.cycle(nodes) + + def drain(self, wait=0): + """ + Return a ClusterEvent on what happened, or None if nothing could be obtained + within given time + :param wait: Interval to wait for events. + Finite number to wait this much seconds before returning None + None to wait for infinity + 0 to return immediately + :return: a ClusterEvent instance or None + """ + + if wait == 0: + try: + return self.thread.event_queue.get(False) + except Queue.Empty: + return None + else: + return self.thread.event_queue.get(True, wait) def start(self): """ Connect to the cluster. :return: self """ - from .handler import ClusterHandlerThread - self.thread = ClusterHandlerThread(self) self.thread.start() return self diff --git a/coolamqp/events.py b/coolamqp/events.py new file mode 100644 index 0000000000000000000000000000000000000000..334dcebec5ff20ced72bebdac4a2b1cd2e13ce73 --- /dev/null +++ b/coolamqp/events.py @@ -0,0 +1,34 @@ +""" +Events emitted by Cluster +""" + + +class ClusterEvent(object): + """Base class for events emitted by cluster""" + + +class ConnectionDown(ClusterEvent): + """Connection to broker has been broken""" + + +class ConnectionUp(ClusterEvent): + """Connection to broker has been (re)established""" + + +class MessageReceived(ClusterEvent): + """A message has been received from the broker""" + def __init__(self, message): + """ + :param message: ReceivedMessage instance + """ + self.message = message + + +class ConsumerCancelled(ClusterEvent): + """Broker cancelled a consumer of ours. + This is also generated in response to cancelling consumption from a queue""" + def __init__(self, queue): + """ + :param queue: Queue whose consumer was cancelled + """ + self.queue = queue \ No newline at end of file diff --git a/coolamqp/handler.py b/coolamqp/handler.py index ad85d5de655a7a3a696da4e555be4bc6871f14db..040244a068fad549cad27854de05885f92f1842c 100644 --- a/coolamqp/handler.py +++ b/coolamqp/handler.py @@ -1,11 +1,16 @@ import threading import Queue import logging +import collections +import time +from .backends import PyAMQPBackend, ConnectionFailedError, RemoteAMQPError, AMQPError +from .messages import Exchange +from .events import ConnectionUp, ConnectionDown, ConsumerCancelled, MessageReceived +from .orders import SendMessage, DeclareExchange, ConsumeQueue, CancelQueue logger = logging.getLogger(__name__) - class ClusterHandlerThread(threading.Thread): """ Thread that does bookkeeping for a Cluster @@ -17,21 +22,111 @@ class ClusterHandlerThread(threading.Thread): self.cluster = cluster self.is_terminating = False - self.order_queue = Queue.Queue() # queue for inbound orders + self.order_queue = collections.deque() # queue for inbound orders self.event_queue = Queue.Queue() # queue for tasks done self.connect_id = -1 # connectID of current connection self.declared_exchanges = {} # declared exchanges, by their names - self.subscribed_queues = [] # list of subscribed queues + self.queues_by_consumer_tags = {} # listened queues, by their consumer tags + self.backend = None def _reconnect(self): - node = self.cluster.node_to_connect_to.next() - - logger.info('Connecting to ', node) - - - + exponential_backoff_delay = 1 + + while True: + if self.backend is not None: + self.backend.shutdown() + self.backend = None + + self.connect_id += 1 + node = self.cluster.node_to_connect_to.next() + logger.info('Connecting to ', node) + + try: + self.backend = self.cluster.backend(node, self) + + for exchange in self.declared_exchanges: + self.backend.exchange_declare(exchange) + + for queue in self.queues_by_consumer_tags.itervalues(): + self.backend.queue_declare(queue) + if queue.exchange is not None: + if isinstance(queue.exchange, Exchange): + self.backend.queue_bind(queue, queue.exchange) + else: + for exchange in queue.exchange: + self.backend.queue_bind(queue, queue.exchange) + self.backend.basic_consume(queue) + + except ConnectionFailedError as e: + # a connection failure happened :( + logger.warning('Connecting to ', node, 'failed due to ', e) + if self.backend is not None: + self.backend.shutdown() + self.backend = None # good policy to release resources before you sleep + time.sleep(exponential_backoff_delay) + + if self.is_terminating: + raise SystemError('Thread was requested to terminate') + + if exponential_backoff_delay < 60: + exponential_backoff_delay *= 2 + else: + exponential_backoff_delay = 60 + else: + from .events import ConnectionUp + self.event_queue.put(ConnectionUp()) + break # we connected :) + + + def run(self): + self._reconnect() + + while not self.is_terminating: + try: + while len(self.order_queue) > 0: + order = self.order_queue.popleft() + + try: + if isinstance(order, SendMessage): + self.backend.basic_publish(order.message, order.exchange, order.routing_key) + elif isinstance(order, DeclareExchange): + self.backend.exchange_declare(order.exchange) + elif isinstance(order, ConsumeQueue): + self.backend.queue_declare(order.queue) + + if order.queue.exchange is not None: + if isinstance(order.queue.exchange, Exchange): + self.backend.queue_bind(order.queue, order.queue.exchange) + else: + for exchange in order.queue.exchange: + self.backend.queue_bind(order.queue, order.queue.exchange) + + self.backend.basic_consume(order.queue) + elif isinstance(order, CancelQueue): + try: + q = self.queues_by_consumer_tags.pop(order.queue.consumer_tag) + except KeyError: + pass # wat? + else: + self.backend.basic_cancel(order.queue.consumer_tag) + self.event_queue.put(ConsumerCancelled(order.queue)) + except RemoteAMQPError as e: + order.failed(e) # we are allowed to go on + except ConnectionFailedError: + self.order_queue.appendleft(order) + raise + else: + order.completed() + + # just drain shit + self.backend.process(max_time=2) + + except ConnectionFailedError as e: + logger.warning('Connection to broker lost') + self.event_queue.append(ConnectionDown()) + self._reconnect() def terminate(self): @@ -42,6 +137,31 @@ class ClusterHandlerThread(threading.Thread): self.is_terminating = True + ## events called + def _on_recvmessage(self, body, exchange_name, routing_key, delivery_tag, properties): + """ + Upon receiving a message + """ + from .messages import ReceivedMessage + + self.event_queue.put(MessageReceived(ReceivedMessage(body, self, + self.connect_id, + exchange_name, + routing_key, + properties, + delivery_tag=delivery_tag))) + + def _on_consumercancelled(self, consumer_tag): + """ + A consumer has been cancelled + """ + try: + queue = self.queues_by_consumer_tags.pop(consumer_tag) + except KeyError: + return # what? + + self.event_queue.put(ConsumerCancelled(queue)) + ## methods to enqueue something into CHT to execute def _do_ackmessage(self, receivedMessage, on_completed=None): diff --git a/coolamqp/messages.py b/coolamqp/messages.py index c8742f6ff46055f737fb4df860d33d341d61b586..1c59363e878427ad79a11bbdf3420b4c706432e6 100644 --- a/coolamqp/messages.py +++ b/coolamqp/messages.py @@ -3,14 +3,14 @@ import uuid class Message(object): """AMQP message object""" - def __init__(self, body, headers={}): + def __init__(self, body, properties={}): """ Create a Message object :param body: stream of octets - :param headers: AMQP headers to be sent along + :param properties: AMQP properties to be sent along """ self.body = body - self.headers = headers + self.properties = properties class ReceivedMessage(Message): @@ -18,14 +18,17 @@ class ReceivedMessage(Message): Message as received from AMQP system """ - def __init__(self, body, cht, connect_id, headers={}, delivery_tag=None): + def __init__(self, body, cht, connect_id, exchange_name, routing_key, properties={}, delivery_tag=None): """ :param body: message body. A stream of octets. :param cht: parent ClusterHandlerThread that emitted this message :param connect_id: connection ID. ClusterHandlerThread will check this in order not to ack messages that were received from a dead connection - :param headers: dictionary. Headers received from AMQP + :param exchange_name: name of exchange this message was submitted to + :param routing_key: routing key with which this message was sent + :param properties: dictionary. Headers received from AMQP + :param delivery_tag: delivery tag assigned by AMQP broker to confirm this message. leave None if auto-ack """ @@ -34,6 +37,8 @@ class ReceivedMessage(Message): self.cht = cht self.connect_id = connect_id self.delivery_tag = delivery_tag + self.exchange_name = exchange_name + self.routing_key = routing_key def nack(self, on_completed=None): """ diff --git a/coolamqp/orders.py b/coolamqp/orders.py new file mode 100644 index 0000000000000000000000000000000000000000..3bee67d9da6fb1bf733e086a8dec8bc4a0402bb9 --- /dev/null +++ b/coolamqp/orders.py @@ -0,0 +1,47 @@ +""" +Orders that can be dispatched to ClusterHandlerThread +""" + +class Order(object): + """Base class for orders dispatched to ClusterHandlerThread""" + def __init__(self, on_completed=None, on_failed=None): + self.on_completed = on_completed + self.on_failed = on_failed + + def completed(self): + if self.on_completed is not None: + self.on_completed() + + def failed(self, e): + """ + :param e: AMQPError instance + """ + if self.on_failed is not None: + self.on_failed(e) + + +class SendMessage(Order): + """Send a message""" + def __init__(self, message, exchange, routing_key, on_completed=None, on_failed=None): + Order.__init__(self, on_completed=on_completed, on_failed=on_failed) + self.message = message + self.exchange = exchange + self.routing_key = routing_key + +class DeclareExchange(Order): + """Declare an exchange""" + def __init__(self, exchange, on_completed=None, on_failed=None): + Order.__init__(self, on_completed=on_completed, on_failed=on_failed) + self.exchange = exchange + +class ConsumeQueue(Order): + """Declare and consume from a queue""" + def __init__(self, queue, on_completed=None, on_failed=None): + Order.__init__(self, on_completed=on_completed, on_failed=on_failed) + self.queue = queue + +class CancelQueue(Order): + """Cancel consuming from a queue""" + def __init__(self, queue, on_completed=None, on_failed=None): + Order.__init__(self, on_completed=on_completed, on_failed=on_failed) + self.queue = queue