From aa5ce7b5ab6dc1d96765c4cb00168b16da9b147a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ma=C5=9Blanka?= <piotr.maslanka@henrietta.com.pl> Date: Fri, 27 Dec 2019 15:19:36 +0100 Subject: [PATCH] Feature/add capability to add sth to client properties 32 (#41) * fixed #32 * fixed #32 * fixed #32 --- coolamqp/clustering/cluster.py | 7 +++++-- coolamqp/clustering/single.py | 5 +++-- coolamqp/uplink/connection/connection.py | 8 +++++--- coolamqp/uplink/handshake.py | 9 +++++++-- tests/utils.py | 4 +++- 5 files changed, 23 insertions(+), 10 deletions(-) diff --git a/coolamqp/clustering/cluster.py b/coolamqp/clustering/cluster.py index 57f16a7..9a508df 100644 --- a/coolamqp/clustering/cluster.py +++ b/coolamqp/clustering/cluster.py @@ -38,13 +38,15 @@ class Cluster(object): ST_LINK_LOST = 0 # Link has been lost ST_LINK_REGAINED = 1 # Link has been regained - def __init__(self, nodes, on_fail=None): + def __init__(self, nodes, on_fail=None, extra_properties=None): """ :param nodes: list of nodes, or a single node. For now, only one is supported. :type nodes: NodeDefinition instance or a list of NodeDefinition instances :param on_fail: callable/0 to call when connection fails in an unclean way. This is a one-shot :type on_fail: callable/0 + :param extra_properties: refer to documentation in [/coolamqp/connection/connection.py] + Connection.__init__ """ from coolamqp.objects import NodeDefinition if isinstance(nodes, NodeDefinition): @@ -54,6 +56,7 @@ class Cluster(object): raise NotImplementedError(u'Multiple nodes not supported yet') self.node, = nodes + self.extra_properties = extra_properties if on_fail is not None: def decorated(): @@ -194,7 +197,7 @@ class Cluster(object): self.events = six.moves.queue.Queue() # for coolamqp.clustering.events.* self.snr = SingleNodeReconnector(self.node, self.attache_group, - self.listener) + self.listener, self.extra_properties) self.snr.on_fail.add(lambda: self.events.put_nowait(ConnectionLost())) if self.on_fail is not None: self.snr.on_fail.add(self.on_fail) diff --git a/coolamqp/clustering/single.py b/coolamqp/clustering/single.py index bb5b752..656b886 100644 --- a/coolamqp/clustering/single.py +++ b/coolamqp/clustering/single.py @@ -14,11 +14,12 @@ class SingleNodeReconnector(object): Connection to one node. It will do it's best to remain alive. """ - def __init__(self, node_def, attache_group, listener_thread): + def __init__(self, node_def, attache_group, listener_thread, extra_properties=None): self.listener_thread = listener_thread self.node_def = node_def self.attache_group = attache_group self.connection = None + self.extra_properties = extra_properties self.terminating = False @@ -33,7 +34,7 @@ class SingleNodeReconnector(object): assert self.connection is None # Initiate connecting - this order is very important! - self.connection = Connection(self.node_def, self.listener_thread) + self.connection = Connection(self.node_def, self.listener_thread, self.extra_properties) self.attache_group.attach(self.connection) self.connection.start(timeout) self.connection.finalize.add(self.on_fail) diff --git a/coolamqp/uplink/connection/connection.py b/coolamqp/uplink/connection/connection.py index fbecd1e..95c742b 100644 --- a/coolamqp/uplink/connection/connection.py +++ b/coolamqp/uplink/connection/connection.py @@ -78,7 +78,7 @@ class Connection(object): This logger is talkative mostly on INFO, and regarding connection state """ - def __init__(self, node_definition, listener_thread): + def __init__(self, node_definition, listener_thread, extra_properties): """ Create an object that links to an AMQP broker. @@ -86,12 +86,14 @@ class Connection(object): :param node_definition: NodeDefinition instance to use :param listener_thread: ListenerThread to use as async engine + :param extra_properties: extra properties to send to the target server + must conform to the syntax given in (/coolamqp/uplink/handshake.py)'s CLIENT_PROPERTIES """ self.listener_thread = listener_thread self.node_definition = node_definition self.uuid = uuid.uuid4().hex[:5] self.recvf = ReceivingFramer(self.on_frame) - + self.extra_properties = extra_properties # todo a list doesn't seem like a very strong atomicity guarantee self.watches = {} # channel => list of [Watch instance] self.any_watches = [] # list of Watches that should check everything @@ -170,7 +172,7 @@ class Connection(object): self.watch_for_method(0, (ConnectionClose, ConnectionCloseOk), self.on_connection_close) - Handshaker(self, self.node_definition, self.on_connected) + Handshaker(self, self.node_definition, self.on_connected, self.extra_properties) def on_fail(self): """ diff --git a/coolamqp/uplink/handshake.py b/coolamqp/uplink/handshake.py index f1829f5..ad8b582 100644 --- a/coolamqp/uplink/handshake.py +++ b/coolamqp/uplink/handshake.py @@ -5,6 +5,7 @@ from __future__ import absolute_import, division, print_function Provides reactors that can authenticate an AQMP session """ import six +import copy from coolamqp.framing.definitions import ConnectionStart, ConnectionStartOk, \ ConnectionTune, ConnectionTuneOk, ConnectionOpen, ConnectionOpenOk from coolamqp.framing.frames import AMQPMethodFrame @@ -23,7 +24,7 @@ CLIENT_DATA = [ # because RabbitMQ is some kind of a fascist and does not allow # these fields to be of type short-string (b'product', (b'CoolAMQP', 'S')), - (b'version', (b'0.91', 'S')), + (b'version', (b'0.96', 'S')), (b'copyright', (b'Copyright (C) 2016-2017 DMS Serwis', 'S')), ( b'information', ( @@ -41,7 +42,7 @@ class Handshaker(object): Object that given a connection rolls the handshake. """ - def __init__(self, connection, node_definition, on_success): + def __init__(self, connection, node_definition, on_success, extra_properties=None): """ :param connection: Connection instance to use :type node_definition: NodeDefinition @@ -57,6 +58,7 @@ class Handshaker(object): # Callbacks self.on_success = on_success + self.EXTRA_PROPERTIES = extra_properties or [] # Called by internal setup def on_watchdog(self): @@ -90,6 +92,9 @@ class Handshaker(object): self.connection.watchdog(WATCHDOG_TIMEOUT, self.on_watchdog) self.connection.watch_for_method(0, ConnectionTune, self.on_connection_tune) + global CLIENT_DATA + CLIENT_DATA = copy.copy(CLIENT_DATA) + CLIENT_DATA.extend(self.EXTRA_PROPERTIES) self.connection.send([ AMQPMethodFrame(0, ConnectionStartOk(CLIENT_DATA, b'PLAIN', diff --git a/tests/utils.py b/tests/utils.py index 17481f7..a311e14 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -12,7 +12,9 @@ from coolamqp.backends.base import AMQPBackend, ConnectionFailedError def getamqp(): - amqp = Cluster([ClusterNode('127.0.0.1', 'guest', 'guest')]) + amqp = Cluster([ClusterNode('127.0.0.1', 'guest', 'guest')], extra_properties=[ + (b'mode', (b'Testing', 'S')), + ]) amqp.start() return amqp -- GitLab