diff --git a/coolamqp/__init__.py b/coolamqp/__init__.py index 669a034317a838588b3f7433c2e261f88fa2228e..9cf5c225f07524e1ba16c6c3777d1a2198551af7 100644 --- a/coolamqp/__init__.py +++ b/coolamqp/__init__.py @@ -1,3 +1,4 @@ +#coding=UTF-8 from .cluster import ClusterNode, Cluster from .events import ConnectionDown, ConnectionUp, MessageReceived, ConsumerCancelled from .messages import Message, Exchange, Queue diff --git a/coolamqp/backends/__init__.py b/coolamqp/backends/__init__.py index 27a3401e33ca65e1efe8f8359c5a260f115bb784..a1caeea735ead73be2957a6c730a31ca13e9ecbb 100644 --- a/coolamqp/backends/__init__.py +++ b/coolamqp/backends/__init__.py @@ -1,2 +1,3 @@ +#coding=UTF-8 from .pyamqp import PyAMQPBackend from .base import AMQPError, ConnectionFailedError, RemoteAMQPError diff --git a/coolamqp/backends/base.py b/coolamqp/backends/base.py index dfb3679252f5ff9dd52754dcaacccea9c30f0c9f..fd7b2dc0556b3012fe04500c2a1cc4dd0fd96bdd 100644 --- a/coolamqp/backends/base.py +++ b/coolamqp/backends/base.py @@ -1,5 +1,4 @@ - - +#coding=UTF-8 class AMQPError(Exception): pass @@ -7,6 +6,7 @@ class AMQPError(Exception): class ConnectionFailedError(AMQPError): """Connection to broker failed""" + class RemoteAMQPError(AMQPError): """ Remote AMQP broker responded with an error code diff --git a/coolamqp/backends/pyamqp.py b/coolamqp/backends/pyamqp.py index f4fea16aa61055e13894884fb82888506ce8c881..c7b54d497b0814c6635550d063215740ee08ddaf 100644 --- a/coolamqp/backends/pyamqp.py +++ b/coolamqp/backends/pyamqp.py @@ -1,3 +1,4 @@ +#coding=UTF-8 """Backend using pyamqp""" import amqp import socket @@ -115,7 +116,6 @@ class PyAMQPBackend(AMQPBackend): """ Start consuming from a queue :param queue: Queue object - :param on_message: callable/1 """ self.channel.basic_consume(queue.name, consumer_tag=queue.consumer_tag, diff --git a/coolamqp/cluster.py b/coolamqp/cluster.py index 995d963335301c51784939656982354b2737203e..1e1105d34af8d972f075d7ddf7c119d9d1020780 100644 --- a/coolamqp/cluster.py +++ b/coolamqp/cluster.py @@ -1,5 +1,6 @@ +#coding=UTF-8 import itertools -import Queue +from six.moves import queue as Queue from coolamqp.backends import PyAMQPBackend from .orders import SendMessage, ConsumeQueue, DeclareExchange, CancelQueue, DeleteQueue, \ DeleteExchange, SetQoS diff --git a/coolamqp/events.py b/coolamqp/events.py index 469a3ac8463ec1e65e5dc61a06ef5408d593d864..e32db8da7458c25216705bae265a721ef6d97b2a 100644 --- a/coolamqp/events.py +++ b/coolamqp/events.py @@ -1,3 +1,4 @@ +#coding=UTF-8 """ Events emitted by Cluster """ diff --git a/coolamqp/handler.py b/coolamqp/handler.py index bc2412d62e47a8478749564f0b63d255861b2070..e92b17b966d5f759f1a7792e1747146a0f78e52c 100644 --- a/coolamqp/handler.py +++ b/coolamqp/handler.py @@ -1,3 +1,4 @@ +#coding=UTF-8 import threading import six.moves.queue as Queue import logging @@ -55,17 +56,17 @@ class ClusterHandlerThread(threading.Thread): if self.qos is not None: self.backend.basic_qos(*self.qos) - for exchange in self.declared_exchanges.itervalues(): + for exchange in self.declared_exchanges.values(): self.backend.exchange_declare(exchange) - for queue in self.queues_by_consumer_tags.itervalues(): + for queue in self.queues_by_consumer_tags.values(): self.backend.queue_declare(queue) if queue.exchange is not None: if isinstance(queue.exchange, Exchange): self.backend.queue_bind(queue, queue.exchange) else: for exchange in queue.exchange: - self.backend.queue_bind(queue, queue.exchange) + self.backend.queue_bind(queue, exchange) self.backend.basic_consume(queue) except ConnectionFailedError as e: diff --git a/coolamqp/messages.py b/coolamqp/messages.py index edd8f393000ec50d3f22a47d49dcfa7ac1a63eae..7dd218fb1dbba3aa09862b40fc55bab3c9becb4b 100644 --- a/coolamqp/messages.py +++ b/coolamqp/messages.py @@ -1,23 +1,24 @@ +#coding=UTF-8 import uuid class Message(object): """AMQP message object""" - def __init__(self, body, properties={}): + def __init__(self, body, properties=None): """ Create a Message object :param body: stream of octets :param properties: AMQP properties to be sent along """ self.body = body - self.properties = properties + self.properties = {} if properties is None else {} class ReceivedMessage(Message): """Message as received from AMQP system""" - def __init__(self, body, cht, connect_id, exchange_name, routing_key, properties={}, delivery_tag=None): + def __init__(self, body, cht, connect_id, exchange_name, routing_key, properties=None, delivery_tag=None): """ :param body: message body. A stream of octets. :param cht: parent ClusterHandlerThread that emitted this message @@ -25,7 +26,7 @@ class ReceivedMessage(Message): not to ack messages that were received from a dead connection :param exchange_name: name of exchange this message was submitted to :param routing_key: routing key with which this message was sent - :param properties: dictionary. Headers received from AMQP + :param properties: dictionary. Headers received from AMQP or None for empty dict :param delivery_tag: delivery tag assigned by AMQP broker to confirm this message. leave None if auto-ack diff --git a/coolamqp/orders.py b/coolamqp/orders.py index b7a0e6e40a0426b555f0f62d53c915a6f7a9477c..2752205535dc43facad4144dc5e622b3d1f541c9 100644 --- a/coolamqp/orders.py +++ b/coolamqp/orders.py @@ -1,3 +1,4 @@ +#coding=UTF-8 """ Orders that can be dispatched to ClusterHandlerThread """ @@ -77,7 +78,8 @@ class DeleteQueue(Order): class SetQoS(Order): """Set QoS""" - def __init__(self, prefetch_window, prefetch_count): + def __init__(self, prefetch_window, prefetch_count, on_completed=None, on_failed=None): + Order.__init__(self, on_completed=on_completed, on_failed=on_failed) self.qos = (prefetch_window, prefetch_count) diff --git a/examples/send_to_myself.py b/examples/send_to_myself.py index 66b5f6d9004dcbe8ee8cdc4f60ca009f4bc9a501..8604a246c61a833f983c79e8d2d6f4e0686ec1bb 100644 --- a/examples/send_to_myself.py +++ b/examples/send_to_myself.py @@ -1,3 +1,5 @@ +#coding=UTF-8 +from __future__ import print_function from coolamqp import Cluster, ClusterNode, Queue, Message, ConnectionUp, ConnectionDown, MessageReceived, ConsumerCancelled import logging import time @@ -22,13 +24,13 @@ while True: evt = cluster.drain(2) if isinstance(evt, ConnectionUp): - print 'Connection is up' + print('Connection is up') elif isinstance(evt, ConnectionDown): - print 'Connection is down' + print('Connection is down') elif isinstance(evt, MessageReceived): - print 'Message is %s' % (evt.message.body, ) + print('Message is %s' % (evt.message.body, )) evt.message.ack() elif isinstance(evt, ConsumerCancelled): - print 'Consumer %s cancelled' % (evt.queue.name, ) + print('Consumer %s cancelled' % (evt.queue.name, ))