diff --git a/coolamqp/cluster.py b/coolamqp/cluster.py index 1e1105d34af8d972f075d7ddf7c119d9d1020780..7d2b7c1eabe858c9235b29277a148ae867f81cae 100644 --- a/coolamqp/cluster.py +++ b/coolamqp/cluster.py @@ -3,7 +3,7 @@ import itertools from six.moves import queue as Queue from coolamqp.backends import PyAMQPBackend from .orders import SendMessage, ConsumeQueue, DeclareExchange, CancelQueue, DeleteQueue, \ - DeleteExchange, SetQoS + DeleteExchange, SetQoS, DeclareQueue from .messages import Exchange @@ -95,10 +95,21 @@ class Cluster(object): :param on_failed: callable/1 to call when this fails with AMQPError instance :return: a Future with this order's status """ - self.thread.order_queue.append(DeclareExchange(exchange, - on_completed=on_completed, - on_failed=on_failed)) + a = DeclareExchange(exchange, on_completed=on_completed, on_failed=on_failed) + self.thread.order_queue.append(a) + return a + def declare_queue(self, queue, on_completed=None, on_failed=None): + """ + Declares a queue + :param queue: Queue to declare + :param on_completed: callable/0 to call when this succeeds + :param on_failed: callable/1 to call when this fails with AMQPError instance + :return: a Future with this order's status + """ + a = DeclareQueue(queue, on_completed=on_completed, on_failed=on_failed) + self.thread.order_queue.append(a) + return a def delete_exchange(self, exchange, on_completed=None, on_failed=None): """ diff --git a/coolamqp/handler.py b/coolamqp/handler.py index fb6613e0aca4f0ba21acc56c9f6d0e557a98179b..978e5657647fef2e2027720058c2584438cd4654 100644 --- a/coolamqp/handler.py +++ b/coolamqp/handler.py @@ -9,7 +9,7 @@ from .messages import Exchange from .events import ConnectionUp, ConnectionDown, ConsumerCancelled, MessageReceived from .orders import SendMessage, DeclareExchange, ConsumeQueue, CancelQueue, \ AcknowledgeMessage, NAcknowledgeMessage, DeleteQueue, \ - DeleteExchange, SetQoS + DeleteExchange, SetQoS, DeclareQueue logger = logging.getLogger(__name__) @@ -108,6 +108,8 @@ class ClusterHandlerThread(threading.Thread): self.backend.exchange_delete(order.exchange) if order.exchange.name in self.declared_exchanges: del self.declared_exchanges[order.exchange.name] + elif isinstance(order, DeclareQueue): + self.backend.queue_declare(order.queue) elif isinstance(order, DeleteQueue): self.backend.queue_delete(order.queue) elif isinstance(order, ConsumeQueue): @@ -138,7 +140,7 @@ class ClusterHandlerThread(threading.Thread): self.backend.basic_ack(order.delivery_tag) elif isinstance(order, NAcknowledgeMessage): if order.connect_id == self.connect_id: - self.backend.basic_nack(order.delivery_tag) + self.backend.basic_reject(order.delivery_tag) except RemoteAMQPError as e: logger.error('Remote AMQP error: %s', e) order.failed(e) # we are allowed to go on diff --git a/coolamqp/orders.py b/coolamqp/orders.py index 2752205535dc43facad4144dc5e622b3d1f541c9..0934be504c8d3feab402d531a6c7b3bf6470e54a 100644 --- a/coolamqp/orders.py +++ b/coolamqp/orders.py @@ -62,6 +62,13 @@ class DeleteExchange(Order): self.exchange = exchange +class DeclareQueue(Order): + """Declare a a queue""" + def __init__(self, queue, on_completed=None, on_failed=None): + Order.__init__(self, on_completed=on_completed, on_failed=on_failed) + self.queue = queue + + class ConsumeQueue(Order): """Declare and consume from a queue""" def __init__(self, queue, on_completed=None, on_failed=None): diff --git a/setup.py b/setup.py index d5b969b8a58ad7d88382569332f730a7e8e7cd80..78f0595271e849e7fbb3be2e3537d7a28140da37 100644 --- a/setup.py +++ b/setup.py @@ -3,7 +3,7 @@ from distutils.core import setup setup(name='CoolAMQP', - version='0.2', + version='0.4', description='The AMQP client library', author=u'DMS Serwis s.c.', author_email='piotrm@smok.co', @@ -14,6 +14,7 @@ setup(name='CoolAMQP', license='MIT License', long_description='''The Python AMQP client library that makes you forget about all the nasty corner cases about AMQP reconnection''', requires=[ - "amqp" + "amqp", + "six" ] ) \ No newline at end of file