From 87fa00c4a49bb29e6544f3dd5961759cfad52b1c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ma=C5=9Blanka?= <piotr.maslanka@henrietta.com.pl> Date: Fri, 24 Jun 2016 11:21:11 +0200 Subject: [PATCH] lotsa bugfixes thx PyCharm and Inspect Code <3 --- coolamqp/__init__.py | 1 + coolamqp/backends/__init__.py | 1 + coolamqp/backends/base.py | 4 ++-- coolamqp/backends/pyamqp.py | 2 +- coolamqp/cluster.py | 3 ++- coolamqp/events.py | 1 + coolamqp/handler.py | 7 ++++--- coolamqp/messages.py | 9 +++++---- coolamqp/orders.py | 4 +++- examples/send_to_myself.py | 10 ++++++---- 10 files changed, 26 insertions(+), 16 deletions(-) diff --git a/coolamqp/__init__.py b/coolamqp/__init__.py index 669a034..9cf5c22 100644 --- a/coolamqp/__init__.py +++ b/coolamqp/__init__.py @@ -1,3 +1,4 @@ +#coding=UTF-8 from .cluster import ClusterNode, Cluster from .events import ConnectionDown, ConnectionUp, MessageReceived, ConsumerCancelled from .messages import Message, Exchange, Queue diff --git a/coolamqp/backends/__init__.py b/coolamqp/backends/__init__.py index 27a3401..a1caeea 100644 --- a/coolamqp/backends/__init__.py +++ b/coolamqp/backends/__init__.py @@ -1,2 +1,3 @@ +#coding=UTF-8 from .pyamqp import PyAMQPBackend from .base import AMQPError, ConnectionFailedError, RemoteAMQPError diff --git a/coolamqp/backends/base.py b/coolamqp/backends/base.py index dfb3679..fd7b2dc 100644 --- a/coolamqp/backends/base.py +++ b/coolamqp/backends/base.py @@ -1,5 +1,4 @@ - - +#coding=UTF-8 class AMQPError(Exception): pass @@ -7,6 +6,7 @@ class AMQPError(Exception): class ConnectionFailedError(AMQPError): """Connection to broker failed""" + class RemoteAMQPError(AMQPError): """ Remote AMQP broker responded with an error code diff --git a/coolamqp/backends/pyamqp.py b/coolamqp/backends/pyamqp.py index f4fea16..c7b54d4 100644 --- a/coolamqp/backends/pyamqp.py +++ b/coolamqp/backends/pyamqp.py @@ -1,3 +1,4 @@ +#coding=UTF-8 """Backend using pyamqp""" import amqp import socket @@ -115,7 +116,6 @@ class PyAMQPBackend(AMQPBackend): """ Start consuming from a queue :param queue: Queue object - :param on_message: callable/1 """ self.channel.basic_consume(queue.name, consumer_tag=queue.consumer_tag, diff --git a/coolamqp/cluster.py b/coolamqp/cluster.py index 995d963..1e1105d 100644 --- a/coolamqp/cluster.py +++ b/coolamqp/cluster.py @@ -1,5 +1,6 @@ +#coding=UTF-8 import itertools -import Queue +from six.moves import queue as Queue from coolamqp.backends import PyAMQPBackend from .orders import SendMessage, ConsumeQueue, DeclareExchange, CancelQueue, DeleteQueue, \ DeleteExchange, SetQoS diff --git a/coolamqp/events.py b/coolamqp/events.py index 469a3ac..e32db8d 100644 --- a/coolamqp/events.py +++ b/coolamqp/events.py @@ -1,3 +1,4 @@ +#coding=UTF-8 """ Events emitted by Cluster """ diff --git a/coolamqp/handler.py b/coolamqp/handler.py index bc2412d..e92b17b 100644 --- a/coolamqp/handler.py +++ b/coolamqp/handler.py @@ -1,3 +1,4 @@ +#coding=UTF-8 import threading import six.moves.queue as Queue import logging @@ -55,17 +56,17 @@ class ClusterHandlerThread(threading.Thread): if self.qos is not None: self.backend.basic_qos(*self.qos) - for exchange in self.declared_exchanges.itervalues(): + for exchange in self.declared_exchanges.values(): self.backend.exchange_declare(exchange) - for queue in self.queues_by_consumer_tags.itervalues(): + for queue in self.queues_by_consumer_tags.values(): self.backend.queue_declare(queue) if queue.exchange is not None: if isinstance(queue.exchange, Exchange): self.backend.queue_bind(queue, queue.exchange) else: for exchange in queue.exchange: - self.backend.queue_bind(queue, queue.exchange) + self.backend.queue_bind(queue, exchange) self.backend.basic_consume(queue) except ConnectionFailedError as e: diff --git a/coolamqp/messages.py b/coolamqp/messages.py index edd8f39..7dd218f 100644 --- a/coolamqp/messages.py +++ b/coolamqp/messages.py @@ -1,23 +1,24 @@ +#coding=UTF-8 import uuid class Message(object): """AMQP message object""" - def __init__(self, body, properties={}): + def __init__(self, body, properties=None): """ Create a Message object :param body: stream of octets :param properties: AMQP properties to be sent along """ self.body = body - self.properties = properties + self.properties = {} if properties is None else {} class ReceivedMessage(Message): """Message as received from AMQP system""" - def __init__(self, body, cht, connect_id, exchange_name, routing_key, properties={}, delivery_tag=None): + def __init__(self, body, cht, connect_id, exchange_name, routing_key, properties=None, delivery_tag=None): """ :param body: message body. A stream of octets. :param cht: parent ClusterHandlerThread that emitted this message @@ -25,7 +26,7 @@ class ReceivedMessage(Message): not to ack messages that were received from a dead connection :param exchange_name: name of exchange this message was submitted to :param routing_key: routing key with which this message was sent - :param properties: dictionary. Headers received from AMQP + :param properties: dictionary. Headers received from AMQP or None for empty dict :param delivery_tag: delivery tag assigned by AMQP broker to confirm this message. leave None if auto-ack diff --git a/coolamqp/orders.py b/coolamqp/orders.py index b7a0e6e..2752205 100644 --- a/coolamqp/orders.py +++ b/coolamqp/orders.py @@ -1,3 +1,4 @@ +#coding=UTF-8 """ Orders that can be dispatched to ClusterHandlerThread """ @@ -77,7 +78,8 @@ class DeleteQueue(Order): class SetQoS(Order): """Set QoS""" - def __init__(self, prefetch_window, prefetch_count): + def __init__(self, prefetch_window, prefetch_count, on_completed=None, on_failed=None): + Order.__init__(self, on_completed=on_completed, on_failed=on_failed) self.qos = (prefetch_window, prefetch_count) diff --git a/examples/send_to_myself.py b/examples/send_to_myself.py index 66b5f6d..8604a24 100644 --- a/examples/send_to_myself.py +++ b/examples/send_to_myself.py @@ -1,3 +1,5 @@ +#coding=UTF-8 +from __future__ import print_function from coolamqp import Cluster, ClusterNode, Queue, Message, ConnectionUp, ConnectionDown, MessageReceived, ConsumerCancelled import logging import time @@ -22,13 +24,13 @@ while True: evt = cluster.drain(2) if isinstance(evt, ConnectionUp): - print 'Connection is up' + print('Connection is up') elif isinstance(evt, ConnectionDown): - print 'Connection is down' + print('Connection is down') elif isinstance(evt, MessageReceived): - print 'Message is %s' % (evt.message.body, ) + print('Message is %s' % (evt.message.body, )) evt.message.ack() elif isinstance(evt, ConsumerCancelled): - print 'Consumer %s cancelled' % (evt.queue.name, ) + print('Consumer %s cancelled' % (evt.queue.name, )) -- GitLab