diff --git a/coolamqp/clustering/cluster.py b/coolamqp/clustering/cluster.py index 57f16a75153a6e76c8079d4aac14c492599ca1a8..9a508df0c2b63210733bd94af88a655e144c801f 100644 --- a/coolamqp/clustering/cluster.py +++ b/coolamqp/clustering/cluster.py @@ -38,13 +38,15 @@ class Cluster(object): ST_LINK_LOST = 0 # Link has been lost ST_LINK_REGAINED = 1 # Link has been regained - def __init__(self, nodes, on_fail=None): + def __init__(self, nodes, on_fail=None, extra_properties=None): """ :param nodes: list of nodes, or a single node. For now, only one is supported. :type nodes: NodeDefinition instance or a list of NodeDefinition instances :param on_fail: callable/0 to call when connection fails in an unclean way. This is a one-shot :type on_fail: callable/0 + :param extra_properties: refer to documentation in [/coolamqp/connection/connection.py] + Connection.__init__ """ from coolamqp.objects import NodeDefinition if isinstance(nodes, NodeDefinition): @@ -54,6 +56,7 @@ class Cluster(object): raise NotImplementedError(u'Multiple nodes not supported yet') self.node, = nodes + self.extra_properties = extra_properties if on_fail is not None: def decorated(): @@ -194,7 +197,7 @@ class Cluster(object): self.events = six.moves.queue.Queue() # for coolamqp.clustering.events.* self.snr = SingleNodeReconnector(self.node, self.attache_group, - self.listener) + self.listener, self.extra_properties) self.snr.on_fail.add(lambda: self.events.put_nowait(ConnectionLost())) if self.on_fail is not None: self.snr.on_fail.add(self.on_fail) diff --git a/coolamqp/clustering/single.py b/coolamqp/clustering/single.py index bb5b752914a5d0cda645536cc8eb318d595075df..656b8869f5290805e36466074579a1a05df43a34 100644 --- a/coolamqp/clustering/single.py +++ b/coolamqp/clustering/single.py @@ -14,11 +14,12 @@ class SingleNodeReconnector(object): Connection to one node. It will do it's best to remain alive. """ - def __init__(self, node_def, attache_group, listener_thread): + def __init__(self, node_def, attache_group, listener_thread, extra_properties=None): self.listener_thread = listener_thread self.node_def = node_def self.attache_group = attache_group self.connection = None + self.extra_properties = extra_properties self.terminating = False @@ -33,7 +34,7 @@ class SingleNodeReconnector(object): assert self.connection is None # Initiate connecting - this order is very important! - self.connection = Connection(self.node_def, self.listener_thread) + self.connection = Connection(self.node_def, self.listener_thread, self.extra_properties) self.attache_group.attach(self.connection) self.connection.start(timeout) self.connection.finalize.add(self.on_fail) diff --git a/coolamqp/uplink/connection/connection.py b/coolamqp/uplink/connection/connection.py index fbecd1e2700d82f890eff2fe532d44798986fa5d..95c742b0aba2f3a097b49a19b9f365be72d49148 100644 --- a/coolamqp/uplink/connection/connection.py +++ b/coolamqp/uplink/connection/connection.py @@ -78,7 +78,7 @@ class Connection(object): This logger is talkative mostly on INFO, and regarding connection state """ - def __init__(self, node_definition, listener_thread): + def __init__(self, node_definition, listener_thread, extra_properties): """ Create an object that links to an AMQP broker. @@ -86,12 +86,14 @@ class Connection(object): :param node_definition: NodeDefinition instance to use :param listener_thread: ListenerThread to use as async engine + :param extra_properties: extra properties to send to the target server + must conform to the syntax given in (/coolamqp/uplink/handshake.py)'s CLIENT_PROPERTIES """ self.listener_thread = listener_thread self.node_definition = node_definition self.uuid = uuid.uuid4().hex[:5] self.recvf = ReceivingFramer(self.on_frame) - + self.extra_properties = extra_properties # todo a list doesn't seem like a very strong atomicity guarantee self.watches = {} # channel => list of [Watch instance] self.any_watches = [] # list of Watches that should check everything @@ -170,7 +172,7 @@ class Connection(object): self.watch_for_method(0, (ConnectionClose, ConnectionCloseOk), self.on_connection_close) - Handshaker(self, self.node_definition, self.on_connected) + Handshaker(self, self.node_definition, self.on_connected, self.extra_properties) def on_fail(self): """ diff --git a/coolamqp/uplink/handshake.py b/coolamqp/uplink/handshake.py index c62d727b49fe8be05596fb3153f8cb3d1273ee81..ad8b582b1b9e84906491106b2db8ed2616cfb485 100644 --- a/coolamqp/uplink/handshake.py +++ b/coolamqp/uplink/handshake.py @@ -5,6 +5,7 @@ from __future__ import absolute_import, division, print_function Provides reactors that can authenticate an AQMP session """ import six +import copy from coolamqp.framing.definitions import ConnectionStart, ConnectionStartOk, \ ConnectionTune, ConnectionTuneOk, ConnectionOpen, ConnectionOpenOk from coolamqp.framing.frames import AMQPMethodFrame @@ -41,7 +42,7 @@ class Handshaker(object): Object that given a connection rolls the handshake. """ - def __init__(self, connection, node_definition, on_success): + def __init__(self, connection, node_definition, on_success, extra_properties=None): """ :param connection: Connection instance to use :type node_definition: NodeDefinition @@ -57,6 +58,7 @@ class Handshaker(object): # Callbacks self.on_success = on_success + self.EXTRA_PROPERTIES = extra_properties or [] # Called by internal setup def on_watchdog(self): @@ -90,6 +92,9 @@ class Handshaker(object): self.connection.watchdog(WATCHDOG_TIMEOUT, self.on_watchdog) self.connection.watch_for_method(0, ConnectionTune, self.on_connection_tune) + global CLIENT_DATA + CLIENT_DATA = copy.copy(CLIENT_DATA) + CLIENT_DATA.extend(self.EXTRA_PROPERTIES) self.connection.send([ AMQPMethodFrame(0, ConnectionStartOk(CLIENT_DATA, b'PLAIN', diff --git a/tests/utils.py b/tests/utils.py index 17481f70a2f76883c6ca007f735b1f8d409af23f..a311e1404488f411a1e324fcbe0991b650339c50 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -12,7 +12,9 @@ from coolamqp.backends.base import AMQPBackend, ConnectionFailedError def getamqp(): - amqp = Cluster([ClusterNode('127.0.0.1', 'guest', 'guest')]) + amqp = Cluster([ClusterNode('127.0.0.1', 'guest', 'guest')], extra_properties=[ + (b'mode', (b'Testing', 'S')), + ]) amqp.start() return amqp