Skip to content
Snippets Groups Projects
Commit 7d9ee2ac authored by Piotr Maślanka's avatar Piotr Maślanka
Browse files

v0.105

parent 81ad0a6b
No related branches found
No related tags found
No related merge requests found
# v0.105:
* _TBA_
* listener thread will be prctl()ed if [prctl](https://pypi.org/project/python-prctl/) is installed
* extra attribute for Cluster
# v0.104:
......
# coding=UTF-8
__version__ = '0.105a1'
__version__ = '0.105'
......@@ -46,13 +46,14 @@ class Cluster(object):
:param log_frames: an object that will have it's method .on_frame(timestamp,
frame, direction) called upon receiving/sending a frame. Timestamp is UNIX timestamp,
frame is AMQPFrame, direction is one of 'to_client', 'to_server'
:param name: name to appear in log items and prctl() for the listener thread
"""
# Events you can be informed about
ST_LINK_LOST = 0 # Link has been lost
ST_LINK_REGAINED = 1 # Link has been regained
def __init__(self, nodes, on_fail=None, extra_properties=None, log_frames=None):
def __init__(self, nodes, on_fail=None, extra_properties=None, log_frames=None, name=None):
from coolamqp.objects import NodeDefinition
if isinstance(nodes, NodeDefinition):
nodes = [nodes]
......@@ -60,6 +61,7 @@ class Cluster(object):
if len(nodes) > 1:
raise NotImplementedError(u'Multiple nodes not supported yet')
self.name = name or 'CoolAMQP'
self.node, = nodes
self.extra_properties = extra_properties
self.log_frames = log_frames
......@@ -204,9 +206,9 @@ class Cluster(object):
except AttributeError:
pass
else:
raise RuntimeError(u'This was already called!')
raise RuntimeError(u'[%s] This was already called!' % (self.name, ))
self.listener = ListenerThread()
self.listener = ListenerThread(name=self.name)
self.attache_group = AttacheGroup()
......@@ -214,7 +216,7 @@ class Cluster(object):
self.snr = SingleNodeReconnector(self.node, self.attache_group,
self.listener, self.extra_properties,
log_frames)
log_frames, self.name)
self.snr.on_fail.add(lambda: self.events.put_nowait(ConnectionLost()))
if self.on_fail is not None:
self.snr.on_fail.add(self.on_fail)
......@@ -238,7 +240,7 @@ class Cluster(object):
while not self.attache_group.is_online() and monotonic.monotonic() - start_at < timeout:
time.sleep(0.1)
if not self.attache_group.is_online():
raise ConnectionDead('Could not connect within %s seconds' % (timeout,))
raise ConnectionDead('[%s] Could not connect within %s seconds' % (self.name, timeout,))
def shutdown(self, wait=True): # type: (bool) -> None
"""
......@@ -253,7 +255,7 @@ class Cluster(object):
except AttributeError:
raise RuntimeError(u'shutdown without start')
logger.info('Commencing shutdown')
logger.info('[%s] Commencing shutdown', self.name)
self.listener.terminate()
if wait:
......
......@@ -15,13 +15,14 @@ class SingleNodeReconnector(object):
"""
def __init__(self, node_def, attache_group, listener_thread, extra_properties=None,
log_frames=None):
log_frames=None, name=None):
self.listener_thread = listener_thread
self.node_def = node_def
self.attache_group = attache_group
self.connection = None
self.extra_properties = extra_properties
self.log_frames = log_frames
self.name = name or 'CoolAMQP'
self.terminating = False
......@@ -38,7 +39,8 @@ class SingleNodeReconnector(object):
# Initiate connecting - this order is very important!
self.connection = Connection(self.node_def, self.listener_thread,
extra_properties=self.extra_properties,
log_frames=self.log_frames)
log_frames=self.log_frames,
name=self.name)
self.attache_group.attach(self.connection)
self.connection.start(timeout)
self.connection.finalize.add(self.on_fail)
......
......@@ -82,7 +82,8 @@ class Connection(object):
def __init__(self, node_definition, # type: coolamqp.objects.NodeDefinition
listener_thread, extra_properties, # type: tp.Dict[bytes, tp.Tuple[tp.Any, str]]
log_frames=None
log_frames=None,
name=None
):
"""
Create an object that links to an AMQP broker.
......@@ -98,6 +99,7 @@ class Connection(object):
self.listener_thread = listener_thread
self.node_definition = node_definition
self.uuid = uuid.uuid4().hex[:5]
self.name = name or 'CoolAMQP'
self.recvf = ReceivingFramer(self.on_frame)
self.extra_properties = extra_properties
# todo a list doesn't seem like a very strong atomicity guarantee
......@@ -141,7 +143,7 @@ class Connection(object):
def on_connected(self):
"""Called by handshaker upon reception of final connection.open-ok"""
logger.info('[%s] Connection ready.', self.uuid)
logger.info('[%s] Connection ready.', self.name)
self.state = ST_ONLINE
......@@ -169,7 +171,7 @@ class Connection(object):
else:
break
logger.debug('[%s] TCP connection established, authentication in progress', self.uuid)
logger.debug('[%s] TCP connection established, authentication in progress', self.name)
sock.settimeout(0)
header = bytearray(b'AMQP\x00\x00\x09\x01')
......@@ -206,7 +208,7 @@ class Connection(object):
and second time from ListenerThread when socket is disposed of
Therefore we need to make sure callbacks are called EXACTLY once
"""
logger.info('Connection lost')
logger.info('[%s] Connection lost', self.name)
self.state = ST_OFFLINE # Update state
......@@ -238,7 +240,7 @@ class Connection(object):
if isinstance(payload, ConnectionClose):
self.send([AMQPMethodFrame(0, ConnectionCloseOk())])
logger.info(u'[%s] Broker closed our connection - code %s reason %s',
self.uuid,
self.name,
payload.reply_code,
payload.reply_text.tobytes().decode('utf8'))
......@@ -312,9 +314,9 @@ class Connection(object):
if not watch_handled:
if isinstance(frame, AMQPMethodFrame):
logger.warning('[%s] Unhandled method frame %s', self.uuid, repr(frame.payload))
logger.warning('[%s] Unhandled method frame %s', self.name, repr(frame.payload))
else:
logger.warning('[%s] Unhandled frame %s', self.uuid, frame)
logger.warning('[%s] Unhandled frame %s', self.name, frame)
def watchdog(self, delay, callback):
"""
......
......@@ -14,9 +14,10 @@ class ListenerThread(threading.Thread):
It automatically picks the best listener for given platform.
"""
def __init__(self):
def __init__(self, name=None):
threading.Thread.__init__(self, name='coolamqp/ListenerThread')
self.daemon = True
self.name = name or 'CoolAMQP'
self.terminating = False
self._call_next_io_event = Callable(oneshots=True)
......@@ -42,6 +43,13 @@ class ListenerThread(threading.Thread):
self.listener.activate(sock)
def run(self):
try:
import prctl
except ImportError:
pass
else:
prctl.set_name(self.name+' - AMQP listener thread')
while not self.terminating:
self.listener.wait(timeout=1)
......
......@@ -27,11 +27,16 @@ accepts a list of nodes:
::
from coolamqp.clustering import Cluster
cluster = Cluster([node])
cluster = Cluster([node], name='My Cluster')
cluster.start(wait=True)
*wait=True* will block until connection is completed. After this, you can use other methods.
*name* is optional. If you specify it, and have prctl_ installed, the thread will
receive a provided label, postfixed by **AMQP listener thread**.
.. _prctl: https://pypi.org/project/python-prctl/
.. autoclass:: coolamqp.clustering.Cluster
:members:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment