From e2e882e41669fa3ddd6167d16374ea2b608f9be2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ma=C5=9Blanka?= <piotr.maslanka@henrietta.com.pl> Date: Sat, 28 May 2016 01:15:03 +0200 Subject: [PATCH] something works + example added --- coolamqp/__init__.py | 4 ++- coolamqp/backends/base.py | 12 ++++++++ coolamqp/backends/pyamqp.py | 30 +++++++++++++++----- coolamqp/cluster.py | 56 ++++++++++++++++++++++++++----------- coolamqp/handler.py | 33 ++++++++++++++++------ coolamqp/messages.py | 2 +- coolamqp/orders.py | 19 +++++++++++++ examples/send_to_myself.py | 33 ++++++++++++++++++++++ 8 files changed, 154 insertions(+), 35 deletions(-) create mode 100644 examples/send_to_myself.py diff --git a/coolamqp/__init__.py b/coolamqp/__init__.py index 1669822..669a034 100644 --- a/coolamqp/__init__.py +++ b/coolamqp/__init__.py @@ -1 +1,3 @@ -from .cluster import ClusterNode, Cluster \ No newline at end of file +from .cluster import ClusterNode, Cluster +from .events import ConnectionDown, ConnectionUp, MessageReceived, ConsumerCancelled +from .messages import Message, Exchange, Queue diff --git a/coolamqp/backends/base.py b/coolamqp/backends/base.py index bd6a71d..6d7e42f 100644 --- a/coolamqp/backends/base.py +++ b/coolamqp/backends/base.py @@ -75,6 +75,18 @@ class AMQPBackend(object): :param queue: Queue object """ + def basic_ack(self, delivery_tag): + """ + ACK a message. + :param delivery_tag: delivery tag to ack + """ + + def basic_nack(self, delivery_tag): + """ + NACK a message. + :param delivery_tag: delivery tag to nack + """ + def basic_publish(self, message, exchange, routing_key): """ Send a message diff --git a/coolamqp/backends/pyamqp.py b/coolamqp/backends/pyamqp.py index 6c195a1..1f9de7b 100644 --- a/coolamqp/backends/pyamqp.py +++ b/coolamqp/backends/pyamqp.py @@ -1,5 +1,6 @@ """Backend using pyamqp""" import amqp +import socket import functools from .base import AMQPBackend, AMQPError, RemoteAMQPError, ConnectionFailedError @@ -13,18 +14,21 @@ def translate_exceptions(fun): except amqp.ChannelError as e: raise RemoteAMQPError(e.reply_code, e.reply_text) except IOError as e: - raise ConnectionFailedError + raise ConnectionFailedError(e.message) return q class PyAMQPBackend(AMQPBackend): + @translate_exceptions def __init__(self, node, cluster_handler_thread): - AMQPBackend.__init__(self, cluster_handler_thread) + AMQPBackend.__init__(self, node, cluster_handler_thread) self.connection = amqp.Connection(host=node.host, userid=node.user, password=node.password, - virtual_host=node.virtual_host) + virtual_host=node.virtual_host, + heartbeat=node.heartbeat or 0) + self.connection.connect() #todo what does this raise? self.channel = self.connection.channel() @@ -42,7 +46,10 @@ class PyAMQPBackend(AMQPBackend): @translate_exceptions def process(self, max_time=10): self.connection.heartbeat_tick() - self.connection.drain_events(max_time) + try: + self.connection.drain_events(max_time) + except socket.timeout: + pass @translate_exceptions def basic_cancel(self, consumer_tag): @@ -54,7 +61,7 @@ class PyAMQPBackend(AMQPBackend): a = amqp.Message(message.body, **message.properties) - self.amqp_channel.basic_publish(a, exchange=exchange.name, routing_key=routing_key) + self.channel.basic_publish(a, exchange=exchange.name, routing_key=routing_key) @translate_exceptions def exchange_declare(self, exchange): @@ -65,6 +72,14 @@ class PyAMQPBackend(AMQPBackend): def queue_bind(self, queue, exchange, routing_key=''): self.channel.queue_bind(queue.name, exchange.name, routing_key) + @translate_exceptions + def basic_ack(self, delivery_tag): + self.channel.basic_ack(delivery_tag, multiple=False) + + @translate_exceptions + def basic_nack(self, delivery_tag): + self.channel.basic_nack(delivery_tag, multiple=False) + @translate_exceptions def queue_declare(self, queue): """ @@ -73,10 +88,11 @@ class PyAMQPBackend(AMQPBackend): This will change queue's name if anonymous :param queue: Queue """ - if queue.anonymous: queue.name = '' + print 'declaring queue that is %s %s %s %s' % (queue.name, queue.durable, queue.exclusive, queue.auto_delete) + qname, mc, cc = self.channel.queue_declare(queue.name, durable=queue.durable, exclusive=queue.exclusive, @@ -85,7 +101,7 @@ class PyAMQPBackend(AMQPBackend): queue.name = qname @translate_exceptions - def basic_consume(self, queue, on_message, on_cancel): + def basic_consume(self, queue): """ Start consuming from a queue :param queue: Queue object diff --git a/coolamqp/cluster.py b/coolamqp/cluster.py index 751e72e..b91cd31 100644 --- a/coolamqp/cluster.py +++ b/coolamqp/cluster.py @@ -1,7 +1,8 @@ import itertools import Queue from coolamqp.backends import PyAMQPBackend -from .orders import SendMessage +from .orders import SendMessage, ConsumeQueue, DeclareExchange +from .messages import Exchange class ClusterNode(object): @@ -17,14 +18,28 @@ class ClusterNode(object): a = ClusterNode(host='192.168.0.1', user='admin', password='password', vhost='vhost') + + or + + a = ClusterNode('192.168.0.1', 'admin', 'password') + + Additional keyword parameters that can be specified: + heartbeat - heartbeat interval in seconds """ - if len(kwargs) == 0: + self.heartbeat = kwargs.get('heartbeat', None) + + if len(kwargs) > 0: # Prepare arguments for amqp.connection.Connection self.host = kwargs['host'] self.user = kwargs['user'] self.password = kwargs['password'] self.virtual_host = kwargs.get('virtual_host', '/') + elif len(args) == 3: + self.host, self.user, self.password = args + self.virtual_host = '/' + elif len(args) == 4: + self.host, self.user, self.password, self.virtual_host = args else: raise NotImplementedError #todo implement this @@ -34,7 +49,6 @@ class ClusterNode(object): self.virtual_host) - class Cluster(object): """ Represents connection to an AMQP cluster. This internally connects only to one node. @@ -49,21 +63,23 @@ class Cluster(object): """ self.backend = backend - self.node_to_connect_to = itertools.cycle(nodes)\ + self.node_to_connect_to = itertools.cycle(nodes) from .handler import ClusterHandlerThread self.thread = ClusterHandlerThread(self) - def send(self, message, exchange, routing_key, on_completed=None, on_failed=None): + def send(self, message, exchange=None, routing_key='', on_completed=None, on_failed=None): """ Schedule a message to be sent. :param message: Message object to send - :param exchange: Exchange to use + :param exchange: Exchange to use. Leave None to use the default exchange :param routing_key: routing key to use :param on_completed: callable/0 to call when this succeeds :param on_failed: callable/1 to call when this fails with AMQPError instance """ - self.thread.order_queue.append(SendMessage(message, exchange, routing_key, + self.thread.order_queue.append(SendMessage(message, + exchange or Exchange.direct, + routing_key, on_completed=on_completed, on_failed=on_failed)) @@ -74,8 +90,11 @@ class Cluster(object): :param on_completed: callable/0 to call when this succeeds :param on_failed: callable/1 to call when this fails with AMQPError instance """ + self.thread.order_queue.append(DeclareExchange(exchange, + on_completed=on_completed, + on_failed=on_failed)) - def consume(self, queue): + def consume(self, queue, on_completed=None, on_failed=None): """ Start consuming from a queue @@ -83,8 +102,12 @@ class Cluster(object): (.exchange field is not empty), queue will be binded to exchanges. :param queue: Queue to consume from. - :return: + :param on_completed: callable/0 to call when this succeeds + :param on_failed: callable/1 to call when this fails with AMQPError instance """ + self.thread.order_queue.append(ConsumeQueue(queue, + on_completed=on_completed, + on_failed=on_failed)) def drain(self, wait=0): @@ -97,14 +120,13 @@ class Cluster(object): 0 to return immediately :return: a ClusterEvent instance or None """ - - if wait == 0: - try: + try: + if wait == 0: return self.thread.event_queue.get(False) - except Queue.Empty: - return None - else: - return self.thread.event_queue.get(True, wait) + else: + return self.thread.event_queue.get(True, wait) + except Queue.Empty: + return None def start(self): """ @@ -120,4 +142,4 @@ class Cluster(object): """ self.thread.terminate() self.thread.join() - return \ No newline at end of file + return diff --git a/coolamqp/handler.py b/coolamqp/handler.py index 040244a..f8c8266 100644 --- a/coolamqp/handler.py +++ b/coolamqp/handler.py @@ -3,10 +3,11 @@ import Queue import logging import collections import time -from .backends import PyAMQPBackend, ConnectionFailedError, RemoteAMQPError, AMQPError +from .backends import ConnectionFailedError, RemoteAMQPError from .messages import Exchange from .events import ConnectionUp, ConnectionDown, ConsumerCancelled, MessageReceived -from .orders import SendMessage, DeclareExchange, ConsumeQueue, CancelQueue +from .orders import SendMessage, DeclareExchange, ConsumeQueue, CancelQueue, \ + AcknowledgeMessage, NAcknowledgeMessage logger = logging.getLogger(__name__) @@ -19,6 +20,7 @@ class ClusterHandlerThread(threading.Thread): """ :param cluster: coolamqp.Cluster """ + threading.Thread.__init__(self) self.cluster = cluster self.is_terminating = False @@ -41,7 +43,7 @@ class ClusterHandlerThread(threading.Thread): self.connect_id += 1 node = self.cluster.node_to_connect_to.next() - logger.info('Connecting to ', node) + logger.info('Connecting to %s', node) try: self.backend = self.cluster.backend(node, self) @@ -61,7 +63,7 @@ class ClusterHandlerThread(threading.Thread): except ConnectionFailedError as e: # a connection failure happened :( - logger.warning('Connecting to ', node, 'failed due to ', e) + logger.warning('Connecting to %s failed due to %s', node, repr(e)) if self.backend is not None: self.backend.shutdown() self.backend = None # good policy to release resources before you sleep @@ -75,7 +77,6 @@ class ClusterHandlerThread(threading.Thread): else: exponential_backoff_delay = 60 else: - from .events import ConnectionUp self.event_queue.put(ConnectionUp()) break # we connected :) @@ -93,6 +94,7 @@ class ClusterHandlerThread(threading.Thread): self.backend.basic_publish(order.message, order.exchange, order.routing_key) elif isinstance(order, DeclareExchange): self.backend.exchange_declare(order.exchange) + self.declared_exchanges.append(order.exchange) elif isinstance(order, ConsumeQueue): self.backend.queue_declare(order.queue) @@ -104,6 +106,8 @@ class ClusterHandlerThread(threading.Thread): self.backend.queue_bind(order.queue, order.queue.exchange) self.backend.basic_consume(order.queue) + self.queues_by_consumer_tags[order.queue.consumer_tag] = order.queue + elif isinstance(order, CancelQueue): try: q = self.queues_by_consumer_tags.pop(order.queue.consumer_tag) @@ -112,7 +116,14 @@ class ClusterHandlerThread(threading.Thread): else: self.backend.basic_cancel(order.queue.consumer_tag) self.event_queue.put(ConsumerCancelled(order.queue)) + elif isinstance(order, AcknowledgeMessage): + if order.connect_id == self.connect_id: + self.backend.basic_ack(order.delivery_tag) + elif isinstance(order, NAcknowledgeMessage): + if order.connect_id == self.connect_id: + self.backend.basic_nack(order.delivery_tag) except RemoteAMQPError as e: + logger.error('Remote AMQP error: %s', e) order.failed(e) # we are allowed to go on except ConnectionFailedError: self.order_queue.appendleft(order) @@ -125,7 +136,7 @@ class ClusterHandlerThread(threading.Thread): except ConnectionFailedError as e: logger.warning('Connection to broker lost') - self.event_queue.append(ConnectionDown()) + self.event_queue.put(ConnectionDown()) self._reconnect() @@ -137,7 +148,7 @@ class ClusterHandlerThread(threading.Thread): self.is_terminating = True - ## events called + ## events called def _on_recvmessage(self, body, exchange_name, routing_key, delivery_tag, properties): """ Upon receiving a message @@ -170,7 +181,9 @@ class ClusterHandlerThread(threading.Thread): :param receivedMessage: a ReceivedMessage object to ack :param on_completed: callable/0 to call when acknowledgemenet succeeded """ - raise NotImplementedError + self.order_queue.append(AcknowledgeMessage(receivedMessage.connect_id, + receivedMessage.delivery_tag, + on_completed=on_completed)) def _do_nackmessage(self, receivedMessage, on_completed=None): @@ -179,4 +192,6 @@ class ClusterHandlerThread(threading.Thread): :param receivedMessage: a ReceivedMessage object to ack :param on_completed: callable/0 to call when acknowledgemenet succeeded """ - raise NotImplementedError \ No newline at end of file + self.order_queue.put(NAcknowledgeMessage(receivedMessage.connect_id, + receivedMessage.delivery_tag, + on_completed=on_completed)) diff --git a/coolamqp/messages.py b/coolamqp/messages.py index 1c59363..2bc0e26 100644 --- a/coolamqp/messages.py +++ b/coolamqp/messages.py @@ -32,7 +32,7 @@ class ReceivedMessage(Message): :param delivery_tag: delivery tag assigned by AMQP broker to confirm this message. leave None if auto-ack """ - Message.__init__(self, body, headers=headers) + Message.__init__(self, body, properties=properties) self.cht = cht self.connect_id = connect_id diff --git a/coolamqp/orders.py b/coolamqp/orders.py index 3bee67d..8d27e35 100644 --- a/coolamqp/orders.py +++ b/coolamqp/orders.py @@ -28,20 +28,39 @@ class SendMessage(Order): self.exchange = exchange self.routing_key = routing_key + class DeclareExchange(Order): """Declare an exchange""" def __init__(self, exchange, on_completed=None, on_failed=None): Order.__init__(self, on_completed=on_completed, on_failed=on_failed) self.exchange = exchange + class ConsumeQueue(Order): """Declare and consume from a queue""" def __init__(self, queue, on_completed=None, on_failed=None): Order.__init__(self, on_completed=on_completed, on_failed=on_failed) self.queue = queue + class CancelQueue(Order): """Cancel consuming from a queue""" def __init__(self, queue, on_completed=None, on_failed=None): Order.__init__(self, on_completed=on_completed, on_failed=on_failed) self.queue = queue + + +class AcknowledgeMessage(Order): + """ACK a message""" + def __init__(self, connect_id, delivery_tag, on_completed): + Order.__init__(self, on_completed=on_completed) + self.connect_id = connect_id + self.delivery_tag = delivery_tag + + +class NAcknowledgeMessage(Order): + """NACK a message""" + def __init__(self, connect_id, delivery_tag, on_completed): + Order.__init__(self, on_completed=on_completed) + self.connect_id = connect_id + self.delivery_tag = delivery_tag diff --git a/examples/send_to_myself.py b/examples/send_to_myself.py new file mode 100644 index 0000000..a74ed45 --- /dev/null +++ b/examples/send_to_myself.py @@ -0,0 +1,33 @@ +from coolamqp import Cluster, ClusterNode, Queue, Message, ConnectionUp, ConnectionDown, MessageReceived +import logging +import time + +QUEUE_NAME = 'f' + +logging.basicConfig() + +cluster = Cluster([ClusterNode('xx', 'xx', 'xx', 'xx', heartbeat=10)]).start() + +a_queue = Queue(QUEUE_NAME, auto_delete=True) +cluster.consume(a_queue) + +q = time.time() +i = 0 + +while True: + if time.time() - q > 10: + q = time.time() + cluster.send(Message('hello world '+str(i)), routing_key=QUEUE_NAME) + i += 1 + + evt = cluster.drain(2) + + if isinstance(evt, ConnectionUp): + print 'Connection is up' + elif isinstance(evt, ConnectionDown): + print 'Connection is down' + elif isinstance(evt, MessageReceived): + print 'Message is %s' % (evt.message.body, ) + evt.message.ack() + + -- GitLab