From 0150dc737996ec3d1b464316bc918c88d9da23c6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ma=C5=9Blanka?= <piotr.maslanka@henrietta.com.pl> Date: Wed, 31 Mar 2021 18:52:18 +0200 Subject: [PATCH] fix frame tracing --- CHANGELOG.md | 2 + coolamqp/__init__.py | 2 +- coolamqp/clustering/cluster.py | 7 +-- coolamqp/objects.py | 20 --------- coolamqp/tracing.py | 54 ++++++++++++++++++++++++ docs/caveats.rst | 2 +- docs/index.rst | 1 + docs/tracing.rst | 38 +++++++++++++++++ docs/tutorial.rst | 13 +++--- tests/test_clustering/test_log_frames.py | 14 ++---- 10 files changed, 111 insertions(+), 42 deletions(-) create mode 100644 coolamqp/tracing.py create mode 100644 docs/tracing.rst diff --git a/CHANGELOG.md b/CHANGELOG.md index 3512f63..4ae2210 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,3 +3,5 @@ on GitHub. This file serves to only note what changes have been made so far, between releases. # v1.2.12 + +* added `LoggingFrameTracer` and `HoldingFrameTracer` diff --git a/coolamqp/__init__.py b/coolamqp/__init__.py index 494d006..5302b4a 100644 --- a/coolamqp/__init__.py +++ b/coolamqp/__init__.py @@ -1 +1 @@ -__version__ = '1.2.12_a1' +__version__ = '1.2.12' diff --git a/coolamqp/clustering/cluster.py b/coolamqp/clustering/cluster.py index 3070cd0..2ef2d96 100644 --- a/coolamqp/clustering/cluster.py +++ b/coolamqp/clustering/cluster.py @@ -15,7 +15,7 @@ from coolamqp.clustering.events import ConnectionLost, MessageReceived, \ NothingMuch, Event from coolamqp.clustering.single import SingleNodeReconnector from coolamqp.exceptions import ConnectionDead -from coolamqp.objects import Exchange, Message, Queue, FrameLogger, QueueBind +from coolamqp.objects import Exchange, Message, Queue, QueueBind from coolamqp.uplink import ListenerThread from coolamqp.utils import monotonic @@ -42,6 +42,7 @@ class Cluster(object): Connection.__init__ :param log_frames: an object that supports logging each and every frame CoolAMQP sends and receives from the broker + :type log_frames: tp.Optional[:class:`coolamqp.tracing.BaseFrameTracer`] :param name: name to appear in log items and prctl() for the listener thread :param on_blocked: callable to call when ConnectionBlocked/ConnectionUnblocked is received. It will be called with a value of True if connection becomes blocked, and False upon an unblock @@ -56,7 +57,7 @@ class Cluster(object): on_fail=None, # type: tp.Optional[tp.Callable[[], None]] extra_properties=None, # type: tp.Optional[tp.List[tp.Tuple[bytes, tp.Tuple[bytes, str]]]] - log_frames=None, # type: tp.Optional[FrameLogger] + log_frames=None, name=None, # type: tp.Optional[str] on_blocked=None, # type: tp.Callable[[bool], None], tracer=None # type: opentracing.Traccer @@ -79,7 +80,7 @@ class Cluster(object): self.name = name or 'CoolAMQP' # type: str self.node, = nodes # type: NodeDefinition self.extra_properties = extra_properties - self.log_frames = log_frames # type: tp.Optional[FrameLogger] + self.log_frames = log_frames self.on_blocked = on_blocked # type: tp.Optional[tp.Callable[[bool], None]] self.connected = False # type: bool self.listener = None # type: BaseListener diff --git a/coolamqp/objects.py b/coolamqp/objects.py index 1311d05..65d9db0 100644 --- a/coolamqp/objects.py +++ b/coolamqp/objects.py @@ -16,11 +16,6 @@ logger = logging.getLogger(__name__) EMPTY_PROPERTIES = MessageProperties() -try: - Protocol = tp.Protocol -except AttributeError: - Protocol = object - def toutf8(q): if isinstance(q, six.binary_type): @@ -34,21 +29,6 @@ def tobytes(q): return q -class FrameLogger(Protocol): - def on_frame(self, timestamp, # type: float - frame, # type: AMQPFrame - direction # type: str - ): - """ - Log a frame - - :param timestamp: timestamp in seconds since Unix Epoch - :param frame: AMQPFrame to parse - :param direction: either 'to_client' when this is frame received from the broker, or - 'to_server' if it's a frame that's being sent to the broker - """ - - class Callable(object): """ Add a bunch of callables to one list, and just invoke'm. diff --git a/coolamqp/tracing.py b/coolamqp/tracing.py new file mode 100644 index 0000000..e06a16c --- /dev/null +++ b/coolamqp/tracing.py @@ -0,0 +1,54 @@ +import logging + + +class BaseFrameTracer(object): + """An abstract do-nothing frame tracer""" + + def on_frame(self, timestamp, frame, direction): + """ + Called by AMQP upon receiving a frame information + + :param timestamp: timestamp + :type timestamp: float + :param frame: frame that is sent or received + :type frame: :class:`coolamqp.framing.base.AMQPFrame` + :param direction: either 'to_client' or 'to_server' + :type direction: str + """ + + +class LoggingFrameTracer(BaseFrameTracer): + """ + A frame tracer that outputs each frame to log + + :param logger: the logger to log onto + :param log_level: the level of logging to log with + """ + def __init__(self, logger, log_level=logging.WARNING): + self.logger = logger + self.log_level = log_level + + def on_frame(self, timestamp, frame, direction): + if direction == 'to_client': + self.logger.log(self.log_level, 'RECEIVED %s', frame.payload) + else: + self.logger.log(self.log_level, 'SENT %s type %s', frame, type(frame)) + + +class HoldingFrameTracer(BaseFrameTracer): + """ + A frame tracer that holds the frames in memory + + :ivar frames: a list of tuple (direction:str (either 'to_client' or 'to_server'), + timestamp::float, + frame:: :class:`~coolamqp.framing.base.AMQPFrame`) + """ + def __init__(self): + self.frames = [] + + def on_frame(self, timestamp, frame, direction): + self.frames.append((direction, timestamp, frame)) + + def clear(self): + """Clear internal frame list""" + self.frames = [] diff --git a/docs/caveats.rst b/docs/caveats.rst index 7761bdd..c6df2f3 100644 --- a/docs/caveats.rst +++ b/docs/caveats.rst @@ -10,7 +10,7 @@ Since CoolAMQP tries to be fast, it uses memoryviews everywhere. **ReceivedMessa properties therefore, are memoryviews. So, it you wanted to read the routing key a message was sent with, or message's encoding, you should do: -:: +.. code-block:: python received_msg.routing_key.to_bytes() received_msg.properties.content_encoding.to_bytes() diff --git a/docs/index.rst b/docs/index.rst index 0a6c28a..464423d 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -10,6 +10,7 @@ Welcome to CoolAMQP's documentation! caveats frames basics + tracing Indices and tables ================== diff --git a/docs/tracing.rst b/docs/tracing.rst new file mode 100644 index 0000000..193f933 --- /dev/null +++ b/docs/tracing.rst @@ -0,0 +1,38 @@ +Frame tracing +============= + +CoolAMQP allows you to trace every sent or received frame. Just provide an instance of + +.. autoclass:: coolamqp.tracing.BaseFrameTracer + :members: + + +LoggingFrameTracer +~~~~~~~~~~~~~~~~~~ + +To show each frame that is sent or received to the server use the following: + +.. code-block:: python + + import logging + + logger = logging.getLogger(__name__) + + from coolamqp.tracing import LoggingFrameTracer + + frame_tracer = LoggingFrameTracer(logger, logging.WARNING) + + cluster = Cluster([NODE], log_frames=frame_logger) + cluster.start() + + +Documentation of the class: + +.. autoclass:: coolamqp.tracing.LoggingFrameTracer + :members: + +HoldingFrameTracer +~~~~~~~~~~~~~~~~~~ + +.. autoclass:: coolamqp.tracing.HoldingFrameTracer + :members: diff --git a/docs/tutorial.rst b/docs/tutorial.rst index a709174..7ca13d8 100644 --- a/docs/tutorial.rst +++ b/docs/tutorial.rst @@ -1,4 +1,3 @@ -======== Tutorial ======== @@ -15,7 +14,7 @@ the AMQP connection string. .. autoclass:: coolamqp.objects.NodeDefinition :members: -:: +.. code-block:: python from coolamqp.objects import NodeDefinition @@ -24,7 +23,7 @@ the AMQP connection string. Cluster instances are used to interface with the cluster (or a single broker). It accepts a list of nodes: -:: +.. code-block:: python from coolamqp.clustering import Cluster cluster = Cluster([node], name='My Cluster') @@ -47,7 +46,7 @@ Publishing and consuming Connecting is boring. After we do, we want to do something! Let's try sending a message, and receiving it. To do that, you must first define a queue, and register a consumer. -:: +.. code-block:: python from coolamqp.objects import Queue @@ -57,7 +56,7 @@ you must first define a queue, and register a consumer. consume_confirm.result() # wait for consuming to start This will create an auto-delete and exclusive queue. After than, a consumer will be registered for this queue. -_no_ack=False_ will mean that we have to manually confirm messages. +_no_ack=False_ will mean that we have to manually confirm messages. You can specify a callback, that will be called with a message if one's received by this consumer. Since we did not do that, this will go to a generic queue belonging to _Cluster_. @@ -68,7 +67,7 @@ when AMQP _basic.consume-ok_ is received. To send a message we need to construct it first, and later publish: -:: +.. code-block:: python from coolamqp.objects import Message @@ -84,7 +83,7 @@ nor decode, and will always expect and return bytes. To actually get our message, we need to start a consumer first. To do that, just invoke: -:: +.. code-block:: python cons, fut = cluster.consume(Queue('name of the queue'), **kwargs) diff --git a/tests/test_clustering/test_log_frames.py b/tests/test_clustering/test_log_frames.py index 5a8c6ff..08f8d8f 100644 --- a/tests/test_clustering/test_log_frames.py +++ b/tests/test_clustering/test_log_frames.py @@ -8,7 +8,8 @@ import time import unittest from coolamqp.clustering import Cluster -from coolamqp.objects import NodeDefinition, FrameLogger +from coolamqp.objects import NodeDefinition +from coolamqp.tracing import HoldingFrameTracer NODE = NodeDefinition(os.environ.get('AMQP_HOST', '127.0.0.1'), 'guest', 'guest', heartbeat=20) logging.basicConfig(level=logging.DEBUG) @@ -16,17 +17,10 @@ logging.basicConfig(level=logging.DEBUG) class TestLogFrames(unittest.TestCase): def test_log_frames_works(self): - class LogFrames(FrameLogger): - def __init__(self): - self.received_frames = 0 - - def on_frame(self, timestamp, frame, direction): - self.received_frames += 1 - - frame_logger = LogFrames() + frame_logger = HoldingFrameTracer() self.c = Cluster([NODE], log_frames=frame_logger) self.c.start() - self.assertGreaterEqual(frame_logger.received_frames, 3) + self.assertGreaterEqual(len(frame_logger.frames), 3) def tearDown(self): self.c.shutdown() -- GitLab