diff --git a/CHANGELOG.md b/CHANGELOG.md index e42dba89dec2cc3c955aae4e558147ccd4528678..66c2d01c34c0d843d9f1346dd189c00fdaf883a6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ # v0.105: -* _TBA_ +* listener thread will be prctl()ed if [prctl](https://pypi.org/project/python-prctl/) is installed +* extra attribute for Cluster # v0.104: diff --git a/coolamqp/__init__.py b/coolamqp/__init__.py index 84e3f55e1422a3dfa47b0856bae9e75aed5bd457..96e99369f17679c5512310d99bdfbf3c3bf8b447 100644 --- a/coolamqp/__init__.py +++ b/coolamqp/__init__.py @@ -1,2 +1,2 @@ # coding=UTF-8 -__version__ = '0.105a1' +__version__ = '0.105' diff --git a/coolamqp/clustering/cluster.py b/coolamqp/clustering/cluster.py index be6fae14912d06ee582648531cff3eb560ec037d..07e6fa85f51f6d6d65e685ace39bb5a3be04d878 100644 --- a/coolamqp/clustering/cluster.py +++ b/coolamqp/clustering/cluster.py @@ -46,13 +46,14 @@ class Cluster(object): :param log_frames: an object that will have it's method .on_frame(timestamp, frame, direction) called upon receiving/sending a frame. Timestamp is UNIX timestamp, frame is AMQPFrame, direction is one of 'to_client', 'to_server' + :param name: name to appear in log items and prctl() for the listener thread """ # Events you can be informed about ST_LINK_LOST = 0 # Link has been lost ST_LINK_REGAINED = 1 # Link has been regained - def __init__(self, nodes, on_fail=None, extra_properties=None, log_frames=None): + def __init__(self, nodes, on_fail=None, extra_properties=None, log_frames=None, name=None): from coolamqp.objects import NodeDefinition if isinstance(nodes, NodeDefinition): nodes = [nodes] @@ -60,6 +61,7 @@ class Cluster(object): if len(nodes) > 1: raise NotImplementedError(u'Multiple nodes not supported yet') + self.name = name or 'CoolAMQP' self.node, = nodes self.extra_properties = extra_properties self.log_frames = log_frames @@ -204,9 +206,9 @@ class Cluster(object): except AttributeError: pass else: - raise RuntimeError(u'This was already called!') + raise RuntimeError(u'[%s] This was already called!' % (self.name, )) - self.listener = ListenerThread() + self.listener = ListenerThread(name=self.name) self.attache_group = AttacheGroup() @@ -214,7 +216,7 @@ class Cluster(object): self.snr = SingleNodeReconnector(self.node, self.attache_group, self.listener, self.extra_properties, - log_frames) + log_frames, self.name) self.snr.on_fail.add(lambda: self.events.put_nowait(ConnectionLost())) if self.on_fail is not None: self.snr.on_fail.add(self.on_fail) @@ -238,7 +240,7 @@ class Cluster(object): while not self.attache_group.is_online() and monotonic.monotonic() - start_at < timeout: time.sleep(0.1) if not self.attache_group.is_online(): - raise ConnectionDead('Could not connect within %s seconds' % (timeout,)) + raise ConnectionDead('[%s] Could not connect within %s seconds' % (self.name, timeout,)) def shutdown(self, wait=True): # type: (bool) -> None """ @@ -253,7 +255,7 @@ class Cluster(object): except AttributeError: raise RuntimeError(u'shutdown without start') - logger.info('Commencing shutdown') + logger.info('[%s] Commencing shutdown', self.name) self.listener.terminate() if wait: diff --git a/coolamqp/clustering/single.py b/coolamqp/clustering/single.py index f86ae2e19a0d0a5ce0fab12d4c41891626132e33..e66c35e16c68f5090694d0280960da7137ceeeaa 100644 --- a/coolamqp/clustering/single.py +++ b/coolamqp/clustering/single.py @@ -15,13 +15,14 @@ class SingleNodeReconnector(object): """ def __init__(self, node_def, attache_group, listener_thread, extra_properties=None, - log_frames=None): + log_frames=None, name=None): self.listener_thread = listener_thread self.node_def = node_def self.attache_group = attache_group self.connection = None self.extra_properties = extra_properties self.log_frames = log_frames + self.name = name or 'CoolAMQP' self.terminating = False @@ -38,7 +39,8 @@ class SingleNodeReconnector(object): # Initiate connecting - this order is very important! self.connection = Connection(self.node_def, self.listener_thread, extra_properties=self.extra_properties, - log_frames=self.log_frames) + log_frames=self.log_frames, + name=self.name) self.attache_group.attach(self.connection) self.connection.start(timeout) self.connection.finalize.add(self.on_fail) diff --git a/coolamqp/uplink/connection/connection.py b/coolamqp/uplink/connection/connection.py index c4b05d781c327da29da4536fbe9ec8d5f8dd6e3c..bb67d25d2263c93cacfcc4e68ca54df378909a9e 100644 --- a/coolamqp/uplink/connection/connection.py +++ b/coolamqp/uplink/connection/connection.py @@ -82,7 +82,8 @@ class Connection(object): def __init__(self, node_definition, # type: coolamqp.objects.NodeDefinition listener_thread, extra_properties, # type: tp.Dict[bytes, tp.Tuple[tp.Any, str]] - log_frames=None + log_frames=None, + name=None ): """ Create an object that links to an AMQP broker. @@ -98,6 +99,7 @@ class Connection(object): self.listener_thread = listener_thread self.node_definition = node_definition self.uuid = uuid.uuid4().hex[:5] + self.name = name or 'CoolAMQP' self.recvf = ReceivingFramer(self.on_frame) self.extra_properties = extra_properties # todo a list doesn't seem like a very strong atomicity guarantee @@ -141,7 +143,7 @@ class Connection(object): def on_connected(self): """Called by handshaker upon reception of final connection.open-ok""" - logger.info('[%s] Connection ready.', self.uuid) + logger.info('[%s] Connection ready.', self.name) self.state = ST_ONLINE @@ -169,7 +171,7 @@ class Connection(object): else: break - logger.debug('[%s] TCP connection established, authentication in progress', self.uuid) + logger.debug('[%s] TCP connection established, authentication in progress', self.name) sock.settimeout(0) header = bytearray(b'AMQP\x00\x00\x09\x01') @@ -206,7 +208,7 @@ class Connection(object): and second time from ListenerThread when socket is disposed of Therefore we need to make sure callbacks are called EXACTLY once """ - logger.info('Connection lost') + logger.info('[%s] Connection lost', self.name) self.state = ST_OFFLINE # Update state @@ -238,7 +240,7 @@ class Connection(object): if isinstance(payload, ConnectionClose): self.send([AMQPMethodFrame(0, ConnectionCloseOk())]) logger.info(u'[%s] Broker closed our connection - code %s reason %s', - self.uuid, + self.name, payload.reply_code, payload.reply_text.tobytes().decode('utf8')) @@ -312,9 +314,9 @@ class Connection(object): if not watch_handled: if isinstance(frame, AMQPMethodFrame): - logger.warning('[%s] Unhandled method frame %s', self.uuid, repr(frame.payload)) + logger.warning('[%s] Unhandled method frame %s', self.name, repr(frame.payload)) else: - logger.warning('[%s] Unhandled frame %s', self.uuid, frame) + logger.warning('[%s] Unhandled frame %s', self.name, frame) def watchdog(self, delay, callback): """ diff --git a/coolamqp/uplink/listener/thread.py b/coolamqp/uplink/listener/thread.py index c7cad123889304e581e72449e8c56771d958d5b0..10ec8f856275c2deb0ec51243d791c9253132a25 100644 --- a/coolamqp/uplink/listener/thread.py +++ b/coolamqp/uplink/listener/thread.py @@ -14,9 +14,10 @@ class ListenerThread(threading.Thread): It automatically picks the best listener for given platform. """ - def __init__(self): + def __init__(self, name=None): threading.Thread.__init__(self, name='coolamqp/ListenerThread') self.daemon = True + self.name = name or 'CoolAMQP' self.terminating = False self._call_next_io_event = Callable(oneshots=True) @@ -42,6 +43,13 @@ class ListenerThread(threading.Thread): self.listener.activate(sock) def run(self): + try: + import prctl + except ImportError: + pass + else: + prctl.set_name(self.name+' - AMQP listener thread') + while not self.terminating: self.listener.wait(timeout=1) diff --git a/docs/tutorial.rst b/docs/tutorial.rst index 3961727741ca2c1adcd02b8f1c7b61345b6c7573..a7091744d7ea1f21c4f0616baa3293c121b8734d 100644 --- a/docs/tutorial.rst +++ b/docs/tutorial.rst @@ -27,11 +27,16 @@ accepts a list of nodes: :: from coolamqp.clustering import Cluster - cluster = Cluster([node]) + cluster = Cluster([node], name='My Cluster') cluster.start(wait=True) *wait=True* will block until connection is completed. After this, you can use other methods. +*name* is optional. If you specify it, and have prctl_ installed, the thread will +receive a provided label, postfixed by **AMQP listener thread**. + +.. _prctl: https://pypi.org/project/python-prctl/ + .. autoclass:: coolamqp.clustering.Cluster :members: