Skip to content
Snippets Groups Projects
Commit 936254c3 authored by Piotr Maślanka's avatar Piotr Maślanka
Browse files

third commit

parent 46f90f77
No related branches found
No related tags found
No related merge requests found
class AMQP
\ No newline at end of file
from .pyamqp import PyAMQPBackend, AMQPError, RemoteAMQPError, ConnectionFailedError
\ No newline at end of file
class AMQPError(Exception):
pass
class ConnectionFailedError(AMQPError):
"""Connection to broker failed"""
class RemoteAMQPError(AMQPError):
"""
Remote AMQP broker responded with an error code
"""
def __init__(self, code, text=None):
"""
:param code: AMQP error code
:param text: AMQP error text (optional)
"""
AMQPError.__init__(self, text)
self.code = code
class AMQPBackend(object):
"""
Connection to an AMQP backend
Dummy AMQP backend.
Every method may raise either ConnectionFailedError (if connection failed)
or RemoteAMQPError (if broker returned an error response)
"""
def __init__(self, host, user, password, virtual_host):
def __init__(self, cluster_node, cluster_handler_thread):
"""
Connects to an AMQP backend.
"""
self.cluster_handler_thread = cluster_handler_thread
def process(self, max_time=10):
"""
Do bookkeeping, process messages, etc.
:param max_time: maximum time in seconds this call can take
:raises ConnectionFailedError: if connection failed in the meantime
"""
def exchange_declare(self, exchange):
"""
Connects to an AMQP backend
Declare an exchange
:param exchange: Exchange object
"""
pass
def
\ No newline at end of file
def queue_bind(self, queue, exchange, routing_key=''):
"""
Bind a queue to an exchange
:param queue: Queue object
:param exchange: Exchange object
:param routing_key: routing key to use
"""
def queue_declare(self, queue):
"""
Declare a queue.
This will change queue's name if anonymous
:param queue: Queue
"""
def basic_cancel(self, consumer_tag):
"""
Cancel consuming, identified by a consumer_tag
:param consumer_tag: consumer_tag to cancel
"""
def basic_consume(self, queue):
"""
Start consuming from a queue
:param queue: Queue object
"""
def basic_publish(self, message, exchange, routing_key):
"""
Send a message
:param message: Message object to send
:param exchange: Exchange object to publish to
:param routing_key: routing key to use
"""
def shutdown(self):
"""
Close this connection.
This is not allowed to return anything.
"""
self.cluster_handler_thread = None # break GC cycles
\ No newline at end of file
"""Backend using pyamqp"""
import amqp
from .base import AMQPBackend
import functools
from .base import AMQPBackend, AMQPError, RemoteAMQPError, ConnectionFailedError
def translate_exceptions(fun):
"""Translates pyamqp's exceptions to CoolAMQP's"""
@functools.wraps(fun)
def q(*args, **kwargs):
try:
return fun(*args, **kwargs)
except amqp.ChannelError as e:
raise RemoteAMQPError(e.reply_code, e.reply_text)
except IOError as e:
raise ConnectionFailedError
return q
class PyAMQPBackend(AMQPBackend):
def __init__(self, node, cluster_handler_thread):
AMQPBackend.__init__(self, cluster_handler_thread)
self.connection = amqp.Connection(host=node.host,
userid=node.user,
password=node.password,
virtual_host=node.virtual_host)
self.connection.connect() #todo what does this raise?
self.channel = self.connection.channel()
def shutdown(self):
AMQPBackend.shutdown(self)
try:
self.channel.close()
except:
pass
try:
self.connection.close()
except:
pass
@translate_exceptions
def process(self, max_time=10):
self.connection.heartbeat_tick()
self.connection.drain_events(max_time)
@translate_exceptions
def basic_cancel(self, consumer_tag):
self.amqp_channel.basic_cancel(consumer_tag)
@translate_exceptions
def basic_publish(self, message, exchange, routing_key):
# convert this to pyamqp's Message
a = amqp.Message(message.body,
**message.properties)
self.amqp_channel.basic_publish(a, exchange=exchange.name, routing_key=routing_key)
@translate_exceptions
def exchange_declare(self, exchange):
self.channel.exchange_declare(exchange.name, exchange.type, durable=exchange.durable,
auto_delete=exchange.auto_delete)
@translate_exceptions
def queue_bind(self, queue, exchange, routing_key=''):
self.channel.queue_bind(queue.name, exchange.name, routing_key)
@translate_exceptions
def queue_declare(self, queue):
"""
Declare a queue.
This will change queue's name if anonymous
:param queue: Queue
"""
if queue.anonymous:
queue.name = ''
qname, mc, cc = self.channel.queue_declare(queue.name,
durable=queue.durable,
exclusive=queue.exclusive,
auto_delete=queue.auto_delete)
if queue.anonymous:
queue.name = qname
@translate_exceptions
def basic_consume(self, queue, on_message, on_cancel):
"""
Start consuming from a queue
:param queue: Queue object
:param on_message: callable/1
"""
self.channel.basic_consume(queue.name,
consumer_tag=queue.consumer_tag,
exclusive=queue.exclusive,
callback=self.__on_message,
on_cancel=self.__on_consumercancelled)
def __on_consumercancelled(self, consumer_tag):
self.cluster_handler_thread._on_consumercancelled(consumer_tag)
def __on_message(self, message):
self.cluster_handler_thread._on_recvmessage(message.body,
message.delivery_info['exchange'],
message.delivery_info['routing_key'],
message.delivery_info['delivery_tag'],
message.properties)
class PyAMQP(AMQPBackend):
def __init__(self, host, user, password, virtual_host):
self.amqp = amqp.Connection()
\ No newline at end of file
import itertools
import Queue
from coolamqp.backends import PyAMQPBackend
from .orders import SendMessage
class ClusterNode(object):
"""
......@@ -17,17 +21,17 @@ class ClusterNode(object):
if len(kwargs) == 0:
# Prepare arguments for amqp.connection.Connection
self._amqpargs = {
'host': kwargs['host'],
'userid': kwargs['user'],
'password': kwargs['password'],
'virtual_host': kwargs.get('vhost', '/'),
}
self.host = kwargs['host']
self.user = kwargs['user']
self.password = kwargs['password']
self.virtual_host = kwargs.get('virtual_host', '/')
else:
raise NotImplementedError #todo implement this
def __str__(self):
return '%s@%s/%s' % (self._amqpargs['userid'],
self._amqpargs['host'],
self._amqpargs['virtual_host'])
return '%s@%s/%s' % (self.host,
self.user,
self.virtual_host)
......@@ -36,22 +40,77 @@ class Cluster(object):
Represents connection to an AMQP cluster. This internally connects only to one node.
"""
def __init__(self, nodes):
def __init__(self, nodes, backend=PyAMQPBackend):
"""
Construct the cluster definition
:param nodes: iterable of nodes to try connecting, in this order.
if list if exhaused, it will be started from beginning
:param backend: backend to use
"""
self.backend = backend
self.node_to_connect_to = itertools.cycle(nodes)\
from .handler import ClusterHandlerThread
self.thread = ClusterHandlerThread(self)
def send(self, message, exchange, routing_key, on_completed=None, on_failed=None):
"""
Schedule a message to be sent.
:param message: Message object to send
:param exchange: Exchange to use
:param routing_key: routing key to use
:param on_completed: callable/0 to call when this succeeds
:param on_failed: callable/1 to call when this fails with AMQPError instance
"""
self.thread.order_queue.append(SendMessage(message, exchange, routing_key,
on_completed=on_completed,
on_failed=on_failed))
def declare_exchange(self, exchange, on_completed=None, on_failed=None):
"""
Declare an exchange
:param exchange: Exchange to declare
:param on_completed: callable/0 to call when this succeeds
:param on_failed: callable/1 to call when this fails with AMQPError instance
"""
def consume(self, queue):
"""
Start consuming from a queue
This queue will be declared to the broker. If this queue has any binds
(.exchange field is not empty), queue will be binded to exchanges.
:param queue: Queue to consume from.
:return:
"""
self.node_to_connect_to = itertools.cycle(nodes)
def drain(self, wait=0):
"""
Return a ClusterEvent on what happened, or None if nothing could be obtained
within given time
:param wait: Interval to wait for events.
Finite number to wait this much seconds before returning None
None to wait for infinity
0 to return immediately
:return: a ClusterEvent instance or None
"""
if wait == 0:
try:
return self.thread.event_queue.get(False)
except Queue.Empty:
return None
else:
return self.thread.event_queue.get(True, wait)
def start(self):
"""
Connect to the cluster.
:return: self
"""
from .handler import ClusterHandlerThread
self.thread = ClusterHandlerThread(self)
self.thread.start()
return self
......
"""
Events emitted by Cluster
"""
class ClusterEvent(object):
"""Base class for events emitted by cluster"""
class ConnectionDown(ClusterEvent):
"""Connection to broker has been broken"""
class ConnectionUp(ClusterEvent):
"""Connection to broker has been (re)established"""
class MessageReceived(ClusterEvent):
"""A message has been received from the broker"""
def __init__(self, message):
"""
:param message: ReceivedMessage instance
"""
self.message = message
class ConsumerCancelled(ClusterEvent):
"""Broker cancelled a consumer of ours.
This is also generated in response to cancelling consumption from a queue"""
def __init__(self, queue):
"""
:param queue: Queue whose consumer was cancelled
"""
self.queue = queue
\ No newline at end of file
import threading
import Queue
import logging
import collections
import time
from .backends import PyAMQPBackend, ConnectionFailedError, RemoteAMQPError, AMQPError
from .messages import Exchange
from .events import ConnectionUp, ConnectionDown, ConsumerCancelled, MessageReceived
from .orders import SendMessage, DeclareExchange, ConsumeQueue, CancelQueue
logger = logging.getLogger(__name__)
class ClusterHandlerThread(threading.Thread):
"""
Thread that does bookkeeping for a Cluster
......@@ -17,21 +22,111 @@ class ClusterHandlerThread(threading.Thread):
self.cluster = cluster
self.is_terminating = False
self.order_queue = Queue.Queue() # queue for inbound orders
self.order_queue = collections.deque() # queue for inbound orders
self.event_queue = Queue.Queue() # queue for tasks done
self.connect_id = -1 # connectID of current connection
self.declared_exchanges = {} # declared exchanges, by their names
self.subscribed_queues = [] # list of subscribed queues
self.queues_by_consumer_tags = {} # listened queues, by their consumer tags
self.backend = None
def _reconnect(self):
node = self.cluster.node_to_connect_to.next()
logger.info('Connecting to ', node)
exponential_backoff_delay = 1
while True:
if self.backend is not None:
self.backend.shutdown()
self.backend = None
self.connect_id += 1
node = self.cluster.node_to_connect_to.next()
logger.info('Connecting to ', node)
try:
self.backend = self.cluster.backend(node, self)
for exchange in self.declared_exchanges:
self.backend.exchange_declare(exchange)
for queue in self.queues_by_consumer_tags.itervalues():
self.backend.queue_declare(queue)
if queue.exchange is not None:
if isinstance(queue.exchange, Exchange):
self.backend.queue_bind(queue, queue.exchange)
else:
for exchange in queue.exchange:
self.backend.queue_bind(queue, queue.exchange)
self.backend.basic_consume(queue)
except ConnectionFailedError as e:
# a connection failure happened :(
logger.warning('Connecting to ', node, 'failed due to ', e)
if self.backend is not None:
self.backend.shutdown()
self.backend = None # good policy to release resources before you sleep
time.sleep(exponential_backoff_delay)
if self.is_terminating:
raise SystemError('Thread was requested to terminate')
if exponential_backoff_delay < 60:
exponential_backoff_delay *= 2
else:
exponential_backoff_delay = 60
else:
from .events import ConnectionUp
self.event_queue.put(ConnectionUp())
break # we connected :)
def run(self):
self._reconnect()
while not self.is_terminating:
try:
while len(self.order_queue) > 0:
order = self.order_queue.popleft()
try:
if isinstance(order, SendMessage):
self.backend.basic_publish(order.message, order.exchange, order.routing_key)
elif isinstance(order, DeclareExchange):
self.backend.exchange_declare(order.exchange)
elif isinstance(order, ConsumeQueue):
self.backend.queue_declare(order.queue)
if order.queue.exchange is not None:
if isinstance(order.queue.exchange, Exchange):
self.backend.queue_bind(order.queue, order.queue.exchange)
else:
for exchange in order.queue.exchange:
self.backend.queue_bind(order.queue, order.queue.exchange)
self.backend.basic_consume(order.queue)
elif isinstance(order, CancelQueue):
try:
q = self.queues_by_consumer_tags.pop(order.queue.consumer_tag)
except KeyError:
pass # wat?
else:
self.backend.basic_cancel(order.queue.consumer_tag)
self.event_queue.put(ConsumerCancelled(order.queue))
except RemoteAMQPError as e:
order.failed(e) # we are allowed to go on
except ConnectionFailedError:
self.order_queue.appendleft(order)
raise
else:
order.completed()
# just drain shit
self.backend.process(max_time=2)
except ConnectionFailedError as e:
logger.warning('Connection to broker lost')
self.event_queue.append(ConnectionDown())
self._reconnect()
def terminate(self):
......@@ -42,6 +137,31 @@ class ClusterHandlerThread(threading.Thread):
self.is_terminating = True
## events called
def _on_recvmessage(self, body, exchange_name, routing_key, delivery_tag, properties):
"""
Upon receiving a message
"""
from .messages import ReceivedMessage
self.event_queue.put(MessageReceived(ReceivedMessage(body, self,
self.connect_id,
exchange_name,
routing_key,
properties,
delivery_tag=delivery_tag)))
def _on_consumercancelled(self, consumer_tag):
"""
A consumer has been cancelled
"""
try:
queue = self.queues_by_consumer_tags.pop(consumer_tag)
except KeyError:
return # what?
self.event_queue.put(ConsumerCancelled(queue))
## methods to enqueue something into CHT to execute
def _do_ackmessage(self, receivedMessage, on_completed=None):
......
......@@ -3,14 +3,14 @@ import uuid
class Message(object):
"""AMQP message object"""
def __init__(self, body, headers={}):
def __init__(self, body, properties={}):
"""
Create a Message object
:param body: stream of octets
:param headers: AMQP headers to be sent along
:param properties: AMQP properties to be sent along
"""
self.body = body
self.headers = headers
self.properties = properties
class ReceivedMessage(Message):
......@@ -18,14 +18,17 @@ class ReceivedMessage(Message):
Message as received from AMQP system
"""
def __init__(self, body, cht, connect_id, headers={}, delivery_tag=None):
def __init__(self, body, cht, connect_id, exchange_name, routing_key, properties={}, delivery_tag=None):
"""
:param body: message body. A stream of octets.
:param cht: parent ClusterHandlerThread that emitted this message
:param connect_id: connection ID. ClusterHandlerThread will check this in order
not to ack messages that were received from a dead connection
:param headers: dictionary. Headers received from AMQP
:param exchange_name: name of exchange this message was submitted to
:param routing_key: routing key with which this message was sent
:param properties: dictionary. Headers received from AMQP
:param delivery_tag: delivery tag assigned by AMQP broker to confirm this message.
leave None if auto-ack
"""
......@@ -34,6 +37,8 @@ class ReceivedMessage(Message):
self.cht = cht
self.connect_id = connect_id
self.delivery_tag = delivery_tag
self.exchange_name = exchange_name
self.routing_key = routing_key
def nack(self, on_completed=None):
"""
......
"""
Orders that can be dispatched to ClusterHandlerThread
"""
class Order(object):
"""Base class for orders dispatched to ClusterHandlerThread"""
def __init__(self, on_completed=None, on_failed=None):
self.on_completed = on_completed
self.on_failed = on_failed
def completed(self):
if self.on_completed is not None:
self.on_completed()
def failed(self, e):
"""
:param e: AMQPError instance
"""
if self.on_failed is not None:
self.on_failed(e)
class SendMessage(Order):
"""Send a message"""
def __init__(self, message, exchange, routing_key, on_completed=None, on_failed=None):
Order.__init__(self, on_completed=on_completed, on_failed=on_failed)
self.message = message
self.exchange = exchange
self.routing_key = routing_key
class DeclareExchange(Order):
"""Declare an exchange"""
def __init__(self, exchange, on_completed=None, on_failed=None):
Order.__init__(self, on_completed=on_completed, on_failed=on_failed)
self.exchange = exchange
class ConsumeQueue(Order):
"""Declare and consume from a queue"""
def __init__(self, queue, on_completed=None, on_failed=None):
Order.__init__(self, on_completed=on_completed, on_failed=on_failed)
self.queue = queue
class CancelQueue(Order):
"""Cancel consuming from a queue"""
def __init__(self, queue, on_completed=None, on_failed=None):
Order.__init__(self, on_completed=on_completed, on_failed=on_failed)
self.queue = queue
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment