From 17b3d77f0ed9e8f696b7d0988043f9c3d718edd6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ma=C5=9Blanka?= <piotr.maslanka@henrietta.com.pl> Date: Sun, 10 Jul 2016 12:25:49 +0200 Subject: [PATCH] can declare queue from Cluster instance --- coolamqp/cluster.py | 19 +++++++++++++++---- coolamqp/handler.py | 6 ++++-- coolamqp/orders.py | 7 +++++++ setup.py | 5 +++-- 4 files changed, 29 insertions(+), 8 deletions(-) diff --git a/coolamqp/cluster.py b/coolamqp/cluster.py index 1e1105d..7d2b7c1 100644 --- a/coolamqp/cluster.py +++ b/coolamqp/cluster.py @@ -3,7 +3,7 @@ import itertools from six.moves import queue as Queue from coolamqp.backends import PyAMQPBackend from .orders import SendMessage, ConsumeQueue, DeclareExchange, CancelQueue, DeleteQueue, \ - DeleteExchange, SetQoS + DeleteExchange, SetQoS, DeclareQueue from .messages import Exchange @@ -95,10 +95,21 @@ class Cluster(object): :param on_failed: callable/1 to call when this fails with AMQPError instance :return: a Future with this order's status """ - self.thread.order_queue.append(DeclareExchange(exchange, - on_completed=on_completed, - on_failed=on_failed)) + a = DeclareExchange(exchange, on_completed=on_completed, on_failed=on_failed) + self.thread.order_queue.append(a) + return a + def declare_queue(self, queue, on_completed=None, on_failed=None): + """ + Declares a queue + :param queue: Queue to declare + :param on_completed: callable/0 to call when this succeeds + :param on_failed: callable/1 to call when this fails with AMQPError instance + :return: a Future with this order's status + """ + a = DeclareQueue(queue, on_completed=on_completed, on_failed=on_failed) + self.thread.order_queue.append(a) + return a def delete_exchange(self, exchange, on_completed=None, on_failed=None): """ diff --git a/coolamqp/handler.py b/coolamqp/handler.py index fb6613e..978e565 100644 --- a/coolamqp/handler.py +++ b/coolamqp/handler.py @@ -9,7 +9,7 @@ from .messages import Exchange from .events import ConnectionUp, ConnectionDown, ConsumerCancelled, MessageReceived from .orders import SendMessage, DeclareExchange, ConsumeQueue, CancelQueue, \ AcknowledgeMessage, NAcknowledgeMessage, DeleteQueue, \ - DeleteExchange, SetQoS + DeleteExchange, SetQoS, DeclareQueue logger = logging.getLogger(__name__) @@ -108,6 +108,8 @@ class ClusterHandlerThread(threading.Thread): self.backend.exchange_delete(order.exchange) if order.exchange.name in self.declared_exchanges: del self.declared_exchanges[order.exchange.name] + elif isinstance(order, DeclareQueue): + self.backend.queue_declare(order.queue) elif isinstance(order, DeleteQueue): self.backend.queue_delete(order.queue) elif isinstance(order, ConsumeQueue): @@ -138,7 +140,7 @@ class ClusterHandlerThread(threading.Thread): self.backend.basic_ack(order.delivery_tag) elif isinstance(order, NAcknowledgeMessage): if order.connect_id == self.connect_id: - self.backend.basic_nack(order.delivery_tag) + self.backend.basic_reject(order.delivery_tag) except RemoteAMQPError as e: logger.error('Remote AMQP error: %s', e) order.failed(e) # we are allowed to go on diff --git a/coolamqp/orders.py b/coolamqp/orders.py index 2752205..0934be5 100644 --- a/coolamqp/orders.py +++ b/coolamqp/orders.py @@ -62,6 +62,13 @@ class DeleteExchange(Order): self.exchange = exchange +class DeclareQueue(Order): + """Declare a a queue""" + def __init__(self, queue, on_completed=None, on_failed=None): + Order.__init__(self, on_completed=on_completed, on_failed=on_failed) + self.queue = queue + + class ConsumeQueue(Order): """Declare and consume from a queue""" def __init__(self, queue, on_completed=None, on_failed=None): diff --git a/setup.py b/setup.py index d5b969b..78f0595 100644 --- a/setup.py +++ b/setup.py @@ -3,7 +3,7 @@ from distutils.core import setup setup(name='CoolAMQP', - version='0.2', + version='0.4', description='The AMQP client library', author=u'DMS Serwis s.c.', author_email='piotrm@smok.co', @@ -14,6 +14,7 @@ setup(name='CoolAMQP', license='MIT License', long_description='''The Python AMQP client library that makes you forget about all the nasty corner cases about AMQP reconnection''', requires=[ - "amqp" + "amqp", + "six" ] ) \ No newline at end of file -- GitLab