From e1e8ff0fbf4faa2c446a5a614776c89394109597 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ma=C5=9Blanka?= <piotr.maslanka@henrietta.com.pl> Date: Fri, 27 May 2016 20:23:41 +0200 Subject: [PATCH] second commit --- README.md | 16 +++++ coolamqp/README.md | 8 --- coolamqp/__init__.py | 1 + coolamqp/backends/__init__.py | 1 + coolamqp/backends/base.py | 12 ++++ coolamqp/backends/pyamqp.py | 7 +++ coolamqp/cluster.py | 64 ++++++++++++++++++++ coolamqp/handler.py | 62 ++++++++++++++++++++ coolamqp/messages.py | 107 ++++++++++++++++++++++++++++++++++ 9 files changed, 270 insertions(+), 8 deletions(-) create mode 100644 README.md delete mode 100644 coolamqp/README.md create mode 100644 coolamqp/backends/__init__.py create mode 100644 coolamqp/backends/base.py create mode 100644 coolamqp/backends/pyamqp.py create mode 100644 coolamqp/cluster.py create mode 100644 coolamqp/handler.py create mode 100644 coolamqp/messages.py diff --git a/README.md b/README.md new file mode 100644 index 0000000..90e4c89 --- /dev/null +++ b/README.md @@ -0,0 +1,16 @@ +coolamqp +======== + +The one library to rule them all. + +This is a fault-tolerant-able AMQP library. + +What other libraries don't, is reconnect to another node +in the cluster if one goes down, restore all queues that were +listened to, and inform the user explicitly when connection goes down +or up to orchestrate itself properly and prepare for eg. missing messages. + +tl;dr +----- +This spawns a thread in the background for every AMQP cluster you connect to. That +thread performs all actions associated with it. \ No newline at end of file diff --git a/coolamqp/README.md b/coolamqp/README.md deleted file mode 100644 index a11e1e8..0000000 --- a/coolamqp/README.md +++ /dev/null @@ -1,8 +0,0 @@ -coolamqp -======== - -The one library to rule them all. - -This is a fault-tolerant-able AMQP library. - -Under the hood it uses pyamqp \ No newline at end of file diff --git a/coolamqp/__init__.py b/coolamqp/__init__.py index e69de29..1669822 100644 --- a/coolamqp/__init__.py +++ b/coolamqp/__init__.py @@ -0,0 +1 @@ +from .cluster import ClusterNode, Cluster \ No newline at end of file diff --git a/coolamqp/backends/__init__.py b/coolamqp/backends/__init__.py new file mode 100644 index 0000000..058bd39 --- /dev/null +++ b/coolamqp/backends/__init__.py @@ -0,0 +1 @@ +class AMQP \ No newline at end of file diff --git a/coolamqp/backends/base.py b/coolamqp/backends/base.py new file mode 100644 index 0000000..fc66403 --- /dev/null +++ b/coolamqp/backends/base.py @@ -0,0 +1,12 @@ +class AMQPBackend(object): + """ + Connection to an AMQP backend + """ + + def __init__(self, host, user, password, virtual_host): + """ + Connects to an AMQP backend + """ + pass + + def \ No newline at end of file diff --git a/coolamqp/backends/pyamqp.py b/coolamqp/backends/pyamqp.py new file mode 100644 index 0000000..95aeaa4 --- /dev/null +++ b/coolamqp/backends/pyamqp.py @@ -0,0 +1,7 @@ +"""Backend using pyamqp""" +import amqp +from .base import AMQPBackend + +class PyAMQP(AMQPBackend): + def __init__(self, host, user, password, virtual_host): + self.amqp = amqp.Connection() \ No newline at end of file diff --git a/coolamqp/cluster.py b/coolamqp/cluster.py new file mode 100644 index 0000000..c9a7bce --- /dev/null +++ b/coolamqp/cluster.py @@ -0,0 +1,64 @@ +import itertools + +class ClusterNode(object): + """ + Definition of a reachable AMQP node. + + This object is hashable. + """ + + def __init__(self, *args, **kwargs): + """ + Create a cluster node definition. + + a = ClusterNode(host='192.168.0.1', user='admin', password='password', + vhost='vhost') + """ + + if len(kwargs) == 0: + # Prepare arguments for amqp.connection.Connection + self._amqpargs = { + 'host': kwargs['host'], + 'userid': kwargs['user'], + 'password': kwargs['password'], + 'virtual_host': kwargs.get('vhost', '/'), + } + + def __str__(self): + return '%s@%s/%s' % (self._amqpargs['userid'], + self._amqpargs['host'], + self._amqpargs['virtual_host']) + + + +class Cluster(object): + """ + Represents connection to an AMQP cluster. This internally connects only to one node. + """ + + def __init__(self, nodes): + """ + Construct the cluster definition + :param nodes: iterable of nodes to try connecting, in this order. + if list if exhaused, it will be started from beginning + """ + + self.node_to_connect_to = itertools.cycle(nodes) + + def start(self): + """ + Connect to the cluster. + :return: self + """ + from .handler import ClusterHandlerThread + self.thread = ClusterHandlerThread(self) + self.thread.start() + return self + + def shutdown(self): + """ + Cleans everything and returns + """ + self.thread.terminate() + self.thread.join() + return \ No newline at end of file diff --git a/coolamqp/handler.py b/coolamqp/handler.py new file mode 100644 index 0000000..ad85d5d --- /dev/null +++ b/coolamqp/handler.py @@ -0,0 +1,62 @@ +import threading +import Queue +import logging + +logger = logging.getLogger(__name__) + + + +class ClusterHandlerThread(threading.Thread): + """ + Thread that does bookkeeping for a Cluster + """ + def __init__(self, cluster): + """ + :param cluster: coolamqp.Cluster + """ + + self.cluster = cluster + self.is_terminating = False + self.order_queue = Queue.Queue() # queue for inbound orders + self.event_queue = Queue.Queue() # queue for tasks done + self.connect_id = -1 # connectID of current connection + + self.declared_exchanges = {} # declared exchanges, by their names + self.subscribed_queues = [] # list of subscribed queues + + + def _reconnect(self): + node = self.cluster.node_to_connect_to.next() + + logger.info('Connecting to ', node) + + + + + + def terminate(self): + """ + Called by Cluster. Tells to finish all jobs and quit. + Unacked messages will not be acked. If this is called, connection may die at any time. + """ + self.is_terminating = True + + + ## methods to enqueue something into CHT to execute + + def _do_ackmessage(self, receivedMessage, on_completed=None): + """ + Order acknowledging a message. + :param receivedMessage: a ReceivedMessage object to ack + :param on_completed: callable/0 to call when acknowledgemenet succeeded + """ + raise NotImplementedError + + + def _do_nackmessage(self, receivedMessage, on_completed=None): + """ + Order acknowledging a message. + :param receivedMessage: a ReceivedMessage object to ack + :param on_completed: callable/0 to call when acknowledgemenet succeeded + """ + raise NotImplementedError \ No newline at end of file diff --git a/coolamqp/messages.py b/coolamqp/messages.py new file mode 100644 index 0000000..c8742f6 --- /dev/null +++ b/coolamqp/messages.py @@ -0,0 +1,107 @@ +import uuid + +class Message(object): + """AMQP message object""" + + def __init__(self, body, headers={}): + """ + Create a Message object + :param body: stream of octets + :param headers: AMQP headers to be sent along + """ + self.body = body + self.headers = headers + + +class ReceivedMessage(Message): + """ + Message as received from AMQP system + """ + + def __init__(self, body, cht, connect_id, headers={}, delivery_tag=None): + """ + + :param body: message body. A stream of octets. + :param cht: parent ClusterHandlerThread that emitted this message + :param connect_id: connection ID. ClusterHandlerThread will check this in order + not to ack messages that were received from a dead connection + :param headers: dictionary. Headers received from AMQP + :param delivery_tag: delivery tag assigned by AMQP broker to confirm this message. + leave None if auto-ack + """ + Message.__init__(self, body, headers=headers) + + self.cht = cht + self.connect_id = connect_id + self.delivery_tag = delivery_tag + + def nack(self, on_completed=None): + """ + Negative-acknowledge this message to the broker. + :param on_completed: callable/0 to call on acknowledged. Callable will be executed in + ClusterHandlerThread's context. + """ + self.cht._do_nackmessage(self, on_completed=on_completed) + + def ack(self, on_completed=None): + """ + Acknowledge this message to the broker. + :param on_completed: callable/0 to call on acknowledged. Callable will be executed in + ClusterHandlerThread's context. + """ + self.cht._do_ackmessage(self, on_completed=on_completed) + + +class Exchange(object): + """ + This represents an Exchange used in AMQP. + This is hashable. + """ + + direct = None # the direct exchange + + def __init__(self, name='', type='direct', durable=True, auto_delete=False): + self.name = name + self.type = type + self.durable = durable + self.auto_delete = auto_delete + + def __hash__(self): + return self.name.__hash__() + + def __eq__(self, other): + return self.name == other.name + +Exchange.direct = Exchange() + + +class Queue(object): + """ + This object represents a Queue that applications consume from or publish to. + """ + + def __init__(self, name='', durable=False, exchange=None, exclusive=False, auto_delete=False): + """ + Create a queue definition + :param name: name of the queue. + Take special care if this is empty. If empty, this will be filled-in by the broker + upon declaration. If a disconnect happens, and connection to other node is + reestablished, this name will CHANGE AGAIN, and be reflected in this object. + This change will be done before CoolAMQP signals reconnection. + :param durable: Is the queue durable? + :param exchange: Exchange(s) this queue is bound to. None for no binding. + This might be a single Exchange object, or an iterable of exchanges. + :param exclusive: Is this queue exclusive? + :param auto_delete: Is this queue auto_delete ? + """ + self.name = name + # if name is '', this will be filled in with broker-generated name upon declaration + self.durable = durable + self.exchange = exchange + self.auto_delete = auto_delete + self.exclusive = exclusive + + self.anonymous = name == '' # if this queue is anonymous, it must be regenerated upon reconnect + + self.consumer_tag = name if name != '' else uuid.uuid4().hex # consumer tag to use in AMQP comms + -- GitLab