From eef5e2f0a53722ff2dc6056a172b0de1768884f2 Mon Sep 17 00:00:00 2001 From: Piotr Maslanka <piotr.maslanka@henrietta.com.pl> Date: Mon, 9 Jan 2017 14:33:15 +0100 Subject: [PATCH] refactor --- coolamqp/connection/__init__.py | 7 ----- coolamqp/connection/definition.py | 47 ----------------------------- coolamqp/connection/orders.py | 39 ------------------------ coolamqp/objects.py | 44 +++++++++++++++++++++++++++ tests/run.py | 3 +- tests/test_connection/test_state.py | 2 +- 6 files changed, 46 insertions(+), 96 deletions(-) delete mode 100644 coolamqp/connection/__init__.py delete mode 100644 coolamqp/connection/definition.py delete mode 100644 coolamqp/connection/orders.py diff --git a/coolamqp/connection/__init__.py b/coolamqp/connection/__init__.py deleted file mode 100644 index 569c71d..0000000 --- a/coolamqp/connection/__init__.py +++ /dev/null @@ -1,7 +0,0 @@ -# coding=UTF-8 -""" -This does things relating to a single connection -""" -from __future__ import absolute_import, division, print_function - -from coolamqp.connection.definition import NodeDefinition diff --git a/coolamqp/connection/definition.py b/coolamqp/connection/definition.py deleted file mode 100644 index f33cc9a..0000000 --- a/coolamqp/connection/definition.py +++ /dev/null @@ -1,47 +0,0 @@ -# coding=UTF-8 -from __future__ import absolute_import, division, print_function -import six - - -class NodeDefinition(object): - """ - Definition of a reachable AMQP node. - - This object is hashable. - """ - - def __init__(self, *args, **kwargs): - """ - Create a cluster node definition. - - a = ClusterNode(host='192.168.0.1', user='admin', password='password', - virtual_host='vhost') - - or - - a = ClusterNode('192.168.0.1', 'admin', 'password') - - Additional keyword parameters that can be specified: - heartbeat - heartbeat interval in seconds - port - TCP port to use. Default is 5672 - """ - - self.heartbeat = kwargs.pop('heartbeat', None) - self.port = kwargs.pop('port', 5672) - - if len(kwargs) > 0: - # Prepare arguments for amqp.connection.Connection - self.host = kwargs['host'] - self.user = kwargs['user'] - self.password = kwargs['password'] - self.virtual_host = kwargs.get('virtual_host', '/') - elif len(args) == 3: - self.host, self.user, self.password = args - self.virtual_host = '/' - elif len(args) == 4: - self.host, self.user, self.password, self.virtual_host = args - else: - raise NotImplementedError #todo implement this - - def __str__(self): - return six.text_type(b'amqp://%s:%s@%s/%s'.encode('utf8') % (self.host, self.port, self.user, self.virtual_host)) diff --git a/coolamqp/connection/orders.py b/coolamqp/connection/orders.py deleted file mode 100644 index 9c43262..0000000 --- a/coolamqp/connection/orders.py +++ /dev/null @@ -1,39 +0,0 @@ -# coding=UTF-8 -from __future__ import absolute_import, division, print_function -""" -You can feed Broker with some orders. -They work pretty much like futures. -""" -import threading - - -class BaseOrder(object): - def __init__(self, on_completed=None, on_failed=None): - self.lock = threading.Lock() - self.lock.acquire() - self.on_completed = on_completed - self.on_failed = on_failed - - def on_done(self): - if self.on_completed is not None: - self.on_completed() - self.lock.release() - - def on_fail(self): - if self.on_failed is not None: - self.on_failed() - self.lock.release() - - def wait(self): - self.lock.acquire() - - -class LinkSetup(BaseOrder): - """ - Connecting to broker - """ - -class ConsumeOnChannel(BaseOrder): - """ - - """ \ No newline at end of file diff --git a/coolamqp/objects.py b/coolamqp/objects.py index ed06cb2..3fd9c01 100644 --- a/coolamqp/objects.py +++ b/coolamqp/objects.py @@ -217,3 +217,47 @@ class Future(concurrent.futures.Future): assert self.cancelled self.completed = False self.lock.release() + + +class NodeDefinition(object): + """ + Definition of a reachable AMQP node. + + This object is hashable. + """ + + def __init__(self, *args, **kwargs): + """ + Create a cluster node definition. + + a = ClusterNode(host='192.168.0.1', user='admin', password='password', + virtual_host='vhost') + + or + + a = ClusterNode('192.168.0.1', 'admin', 'password') + + Additional keyword parameters that can be specified: + heartbeat - heartbeat interval in seconds + port - TCP port to use. Default is 5672 + """ + + self.heartbeat = kwargs.pop('heartbeat', None) + self.port = kwargs.pop('port', 5672) + + if len(kwargs) > 0: + # Prepare arguments for amqp.connection.Connection + self.host = kwargs['host'] + self.user = kwargs['user'] + self.password = kwargs['password'] + self.virtual_host = kwargs.get('virtual_host', '/') + elif len(args) == 3: + self.host, self.user, self.password = args + self.virtual_host = '/' + elif len(args) == 4: + self.host, self.user, self.password, self.virtual_host = args + else: + raise NotImplementedError #todo implement this + + def __str__(self): + return six.text_type(b'amqp://%s:%s@%s/%s'.encode('utf8') % (self.host, self.port, self.user, self.virtual_host)) \ No newline at end of file diff --git a/tests/run.py b/tests/run.py index 74469db..7fcfe57 100644 --- a/tests/run.py +++ b/tests/run.py @@ -2,8 +2,7 @@ from __future__ import absolute_import, division, print_function from coolamqp.uplink import ListenerThread import time, logging, threading -from coolamqp.objects import Message, MessageProperties -from coolamqp.connection import NodeDefinition +from coolamqp.objects import Message, MessageProperties, NodeDefinition from coolamqp.uplink import Connection from coolamqp.attaches import Consumer, Publisher, MODE_NOACK, MODE_CNPUB diff --git a/tests/test_connection/test_state.py b/tests/test_connection/test_state.py index 5b9cd51..2d36b98 100644 --- a/tests/test_connection/test_state.py +++ b/tests/test_connection/test_state.py @@ -4,7 +4,7 @@ import unittest from coolamqp.connection.state import Broker -from coolamqp.connection import NodeDefinition +from coolamqp.objects import NodeDefinition from coolamqp.uplink import ListenerThread, Connection, Handshaker import socket import time -- GitLab