From 9b4a54c2432e02eb6ab6cd6993a37cfda1c3798f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ma=C5=9Blanka?= <piotr.maslanka@henrietta.com.pl> Date: Fri, 24 Jun 2016 11:12:32 +0200 Subject: [PATCH] added support for QoS --- MANIFEST | 12 ++++++++++++ coolamqp/backends/base.py | 7 +++++++ coolamqp/backends/pyamqp.py | 4 ++++ coolamqp/cluster.py | 7 ++++++- coolamqp/handler.py | 10 +++++++++- coolamqp/orders.py | 6 ++++++ examples/send_to_myself.py | 2 ++ setup.py | 2 +- 8 files changed, 47 insertions(+), 3 deletions(-) create mode 100644 MANIFEST diff --git a/MANIFEST b/MANIFEST new file mode 100644 index 0000000..fab2abd --- /dev/null +++ b/MANIFEST @@ -0,0 +1,12 @@ +# file GENERATED by distutils, do NOT edit +setup.cfg +setup.py +coolamqp\__init__.py +coolamqp\cluster.py +coolamqp\events.py +coolamqp\handler.py +coolamqp\messages.py +coolamqp\orders.py +coolamqp\backends\__init__.py +coolamqp\backends\base.py +coolamqp\backends\pyamqp.py diff --git a/coolamqp/backends/base.py b/coolamqp/backends/base.py index 090b39e..dfb3679 100644 --- a/coolamqp/backends/base.py +++ b/coolamqp/backends/base.py @@ -95,6 +95,13 @@ class AMQPBackend(object): :param delivery_tag: delivery tag to ack """ + def basic_qos(self, prefetch_size, prefetch_count): + """ + Issue a basic.qos(prefetch_size, prefetch_count, True) against broker + :param prefetch_size: prefetch window size in octets + :param prefetch_count: prefetch window in terms of whole messages + """ + def basic_nack(self, delivery_tag): """ NACK a message. diff --git a/coolamqp/backends/pyamqp.py b/coolamqp/backends/pyamqp.py index 0e7636a..f4fea16 100644 --- a/coolamqp/backends/pyamqp.py +++ b/coolamqp/backends/pyamqp.py @@ -80,6 +80,10 @@ class PyAMQPBackend(AMQPBackend): def exchange_delete(self, exchange): self.channel.exchange_delete(exchange.name) + @translate_exceptions + def basic_qos(self, prefetch_size, prefetch_count): + self.channel.basic_qos(prefetch_size, prefetch_count, True) + @translate_exceptions def queue_delete(self, queue): self.channel.queue_delete(queue.name) diff --git a/coolamqp/cluster.py b/coolamqp/cluster.py index 7889803..995d963 100644 --- a/coolamqp/cluster.py +++ b/coolamqp/cluster.py @@ -2,7 +2,7 @@ import itertools import Queue from coolamqp.backends import PyAMQPBackend from .orders import SendMessage, ConsumeQueue, DeclareExchange, CancelQueue, DeleteQueue, \ - DeleteExchange + DeleteExchange, SetQoS from .messages import Exchange @@ -137,6 +137,11 @@ class Cluster(object): self.thread.order_queue.append(a) return a + def qos(self, prefetch_window, prefetch_count): + a = SetQoS(prefetch_window, prefetch_count) + self.thread.order_queue.append(a) + return a + def consume(self, queue, on_completed=None, on_failed=None): """ Start consuming from a queue diff --git a/coolamqp/handler.py b/coolamqp/handler.py index 0fd75ac..d757a08 100644 --- a/coolamqp/handler.py +++ b/coolamqp/handler.py @@ -8,7 +8,7 @@ from .messages import Exchange from .events import ConnectionUp, ConnectionDown, ConsumerCancelled, MessageReceived from .orders import SendMessage, DeclareExchange, ConsumeQueue, CancelQueue, \ AcknowledgeMessage, NAcknowledgeMessage, DeleteQueue, \ - DeleteExchange + DeleteExchange, SetQoS logger = logging.getLogger(__name__) @@ -35,6 +35,8 @@ class ClusterHandlerThread(threading.Thread): self.backend = None self.first_connect = True + self.qos = None # or tuple (prefetch_size, prefetch_count) if QoS set + def _reconnect(self): exponential_backoff_delay = 1 @@ -50,6 +52,9 @@ class ClusterHandlerThread(threading.Thread): try: self.backend = self.cluster.backend(node, self) + if self.qos is not None: + self.backend.basic_qos(*self.qos) + for exchange in self.declared_exchanges.itervalues(): self.backend.exchange_declare(exchange) @@ -92,6 +97,9 @@ class ClusterHandlerThread(threading.Thread): try: if isinstance(order, SendMessage): self.backend.basic_publish(order.message, order.exchange, order.routing_key) + elif isinstance(order, SetQoS): + self.qos = order.qos + self.backend.basic_qos(*self.qos) elif isinstance(order, DeclareExchange): self.backend.exchange_declare(order.exchange) self.declared_exchanges[order.exchange.name] = order.exchange diff --git a/coolamqp/orders.py b/coolamqp/orders.py index fc18d76..b7a0e6e 100644 --- a/coolamqp/orders.py +++ b/coolamqp/orders.py @@ -75,6 +75,12 @@ class DeleteQueue(Order): self.queue = queue +class SetQoS(Order): + """Set QoS""" + def __init__(self, prefetch_window, prefetch_count): + self.qos = (prefetch_window, prefetch_count) + + class CancelQueue(Order): """Cancel consuming from a queue""" def __init__(self, queue, on_completed=None, on_failed=None): diff --git a/examples/send_to_myself.py b/examples/send_to_myself.py index c43e0ec..66b5f6d 100644 --- a/examples/send_to_myself.py +++ b/examples/send_to_myself.py @@ -10,6 +10,8 @@ cluster = Cluster([ClusterNode('192.168.224.31:5672', 'smok', 'smok', 'smok', he a_queue = Queue(QUEUE_NAME, auto_delete=True) cluster.consume(a_queue) +cluster.qos(0, 1) + q = time.time() while True: diff --git a/setup.py b/setup.py index 9936372..d5b969b 100644 --- a/setup.py +++ b/setup.py @@ -3,7 +3,7 @@ from distutils.core import setup setup(name='CoolAMQP', - version='0.1', + version='0.2', description='The AMQP client library', author=u'DMS Serwis s.c.', author_email='piotrm@smok.co', -- GitLab