diff --git a/CHANGELOG.md b/CHANGELOG.md index 3512f63ad88c204a556ddd70eb637e6693996424..4ae2210d0a0296ba184a3fef402060ae06fe0cd9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,3 +3,5 @@ on GitHub. This file serves to only note what changes have been made so far, between releases. # v1.2.12 + +* added `LoggingFrameTracer` and `HoldingFrameTracer` diff --git a/coolamqp/__init__.py b/coolamqp/__init__.py index 494d006696043e029ca4730f5b9b6ea2186eed4d..5302b4a03ef69d254d79adb922f49d09c7963e80 100644 --- a/coolamqp/__init__.py +++ b/coolamqp/__init__.py @@ -1 +1 @@ -__version__ = '1.2.12_a1' +__version__ = '1.2.12' diff --git a/coolamqp/clustering/cluster.py b/coolamqp/clustering/cluster.py index 3070cd03becf4f6b97ab1a3afe280ad128f55ac2..2ef2d9655fba92ae4246463c455e9b223a2a272a 100644 --- a/coolamqp/clustering/cluster.py +++ b/coolamqp/clustering/cluster.py @@ -15,7 +15,7 @@ from coolamqp.clustering.events import ConnectionLost, MessageReceived, \ NothingMuch, Event from coolamqp.clustering.single import SingleNodeReconnector from coolamqp.exceptions import ConnectionDead -from coolamqp.objects import Exchange, Message, Queue, FrameLogger, QueueBind +from coolamqp.objects import Exchange, Message, Queue, QueueBind from coolamqp.uplink import ListenerThread from coolamqp.utils import monotonic @@ -42,6 +42,7 @@ class Cluster(object): Connection.__init__ :param log_frames: an object that supports logging each and every frame CoolAMQP sends and receives from the broker + :type log_frames: tp.Optional[:class:`coolamqp.tracing.BaseFrameTracer`] :param name: name to appear in log items and prctl() for the listener thread :param on_blocked: callable to call when ConnectionBlocked/ConnectionUnblocked is received. It will be called with a value of True if connection becomes blocked, and False upon an unblock @@ -56,7 +57,7 @@ class Cluster(object): on_fail=None, # type: tp.Optional[tp.Callable[[], None]] extra_properties=None, # type: tp.Optional[tp.List[tp.Tuple[bytes, tp.Tuple[bytes, str]]]] - log_frames=None, # type: tp.Optional[FrameLogger] + log_frames=None, name=None, # type: tp.Optional[str] on_blocked=None, # type: tp.Callable[[bool], None], tracer=None # type: opentracing.Traccer @@ -79,7 +80,7 @@ class Cluster(object): self.name = name or 'CoolAMQP' # type: str self.node, = nodes # type: NodeDefinition self.extra_properties = extra_properties - self.log_frames = log_frames # type: tp.Optional[FrameLogger] + self.log_frames = log_frames self.on_blocked = on_blocked # type: tp.Optional[tp.Callable[[bool], None]] self.connected = False # type: bool self.listener = None # type: BaseListener diff --git a/coolamqp/objects.py b/coolamqp/objects.py index 1311d050b03bb0dcc4167109bc1bb8b9b63fb451..65d9db0bc849e69d12fcd7cd89356db5a4131e15 100644 --- a/coolamqp/objects.py +++ b/coolamqp/objects.py @@ -16,11 +16,6 @@ logger = logging.getLogger(__name__) EMPTY_PROPERTIES = MessageProperties() -try: - Protocol = tp.Protocol -except AttributeError: - Protocol = object - def toutf8(q): if isinstance(q, six.binary_type): @@ -34,21 +29,6 @@ def tobytes(q): return q -class FrameLogger(Protocol): - def on_frame(self, timestamp, # type: float - frame, # type: AMQPFrame - direction # type: str - ): - """ - Log a frame - - :param timestamp: timestamp in seconds since Unix Epoch - :param frame: AMQPFrame to parse - :param direction: either 'to_client' when this is frame received from the broker, or - 'to_server' if it's a frame that's being sent to the broker - """ - - class Callable(object): """ Add a bunch of callables to one list, and just invoke'm. diff --git a/coolamqp/tracing.py b/coolamqp/tracing.py new file mode 100644 index 0000000000000000000000000000000000000000..e06a16c7c9d1cb9a0369caa7e39294de5c22dfc2 --- /dev/null +++ b/coolamqp/tracing.py @@ -0,0 +1,54 @@ +import logging + + +class BaseFrameTracer(object): + """An abstract do-nothing frame tracer""" + + def on_frame(self, timestamp, frame, direction): + """ + Called by AMQP upon receiving a frame information + + :param timestamp: timestamp + :type timestamp: float + :param frame: frame that is sent or received + :type frame: :class:`coolamqp.framing.base.AMQPFrame` + :param direction: either 'to_client' or 'to_server' + :type direction: str + """ + + +class LoggingFrameTracer(BaseFrameTracer): + """ + A frame tracer that outputs each frame to log + + :param logger: the logger to log onto + :param log_level: the level of logging to log with + """ + def __init__(self, logger, log_level=logging.WARNING): + self.logger = logger + self.log_level = log_level + + def on_frame(self, timestamp, frame, direction): + if direction == 'to_client': + self.logger.log(self.log_level, 'RECEIVED %s', frame.payload) + else: + self.logger.log(self.log_level, 'SENT %s type %s', frame, type(frame)) + + +class HoldingFrameTracer(BaseFrameTracer): + """ + A frame tracer that holds the frames in memory + + :ivar frames: a list of tuple (direction:str (either 'to_client' or 'to_server'), + timestamp::float, + frame:: :class:`~coolamqp.framing.base.AMQPFrame`) + """ + def __init__(self): + self.frames = [] + + def on_frame(self, timestamp, frame, direction): + self.frames.append((direction, timestamp, frame)) + + def clear(self): + """Clear internal frame list""" + self.frames = [] diff --git a/docs/caveats.rst b/docs/caveats.rst index 7761bdd84014dc9d01a95684962179309f2a509c..c6df2f3b25c9fecfe03e4896ec57eb68d5ac14c9 100644 --- a/docs/caveats.rst +++ b/docs/caveats.rst @@ -10,7 +10,7 @@ Since CoolAMQP tries to be fast, it uses memoryviews everywhere. **ReceivedMessa properties therefore, are memoryviews. So, it you wanted to read the routing key a message was sent with, or message's encoding, you should do: -:: +.. code-block:: python received_msg.routing_key.to_bytes() received_msg.properties.content_encoding.to_bytes() diff --git a/docs/index.rst b/docs/index.rst index 0a6c28ad192d51286a29d9987c75e2d708dadfab..464423d69f44c636ef4ecaa73b1afed0ed4fc00f 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -10,6 +10,7 @@ Welcome to CoolAMQP's documentation! caveats frames basics + tracing Indices and tables ================== diff --git a/docs/tracing.rst b/docs/tracing.rst new file mode 100644 index 0000000000000000000000000000000000000000..193f933440e412d2574ae6524b124edb14ac30b7 --- /dev/null +++ b/docs/tracing.rst @@ -0,0 +1,38 @@ +Frame tracing +============= + +CoolAMQP allows you to trace every sent or received frame. Just provide an instance of + +.. autoclass:: coolamqp.tracing.BaseFrameTracer + :members: + + +LoggingFrameTracer +~~~~~~~~~~~~~~~~~~ + +To show each frame that is sent or received to the server use the following: + +.. code-block:: python + + import logging + + logger = logging.getLogger(__name__) + + from coolamqp.tracing import LoggingFrameTracer + + frame_tracer = LoggingFrameTracer(logger, logging.WARNING) + + cluster = Cluster([NODE], log_frames=frame_logger) + cluster.start() + + +Documentation of the class: + +.. autoclass:: coolamqp.tracing.LoggingFrameTracer + :members: + +HoldingFrameTracer +~~~~~~~~~~~~~~~~~~ + +.. autoclass:: coolamqp.tracing.HoldingFrameTracer + :members: diff --git a/docs/tutorial.rst b/docs/tutorial.rst index a7091744d7ea1f21c4f0616baa3293c121b8734d..7ca13d85574dbfd1db21c85be38c069bdb6b2c60 100644 --- a/docs/tutorial.rst +++ b/docs/tutorial.rst @@ -1,4 +1,3 @@ -======== Tutorial ======== @@ -15,7 +14,7 @@ the AMQP connection string. .. autoclass:: coolamqp.objects.NodeDefinition :members: -:: +.. code-block:: python from coolamqp.objects import NodeDefinition @@ -24,7 +23,7 @@ the AMQP connection string. Cluster instances are used to interface with the cluster (or a single broker). It accepts a list of nodes: -:: +.. code-block:: python from coolamqp.clustering import Cluster cluster = Cluster([node], name='My Cluster') @@ -47,7 +46,7 @@ Publishing and consuming Connecting is boring. After we do, we want to do something! Let's try sending a message, and receiving it. To do that, you must first define a queue, and register a consumer. -:: +.. code-block:: python from coolamqp.objects import Queue @@ -57,7 +56,7 @@ you must first define a queue, and register a consumer. consume_confirm.result() # wait for consuming to start This will create an auto-delete and exclusive queue. After than, a consumer will be registered for this queue. -_no_ack=False_ will mean that we have to manually confirm messages. +_no_ack=False_ will mean that we have to manually confirm messages. You can specify a callback, that will be called with a message if one's received by this consumer. Since we did not do that, this will go to a generic queue belonging to _Cluster_. @@ -68,7 +67,7 @@ when AMQP _basic.consume-ok_ is received. To send a message we need to construct it first, and later publish: -:: +.. code-block:: python from coolamqp.objects import Message @@ -84,7 +83,7 @@ nor decode, and will always expect and return bytes. To actually get our message, we need to start a consumer first. To do that, just invoke: -:: +.. code-block:: python cons, fut = cluster.consume(Queue('name of the queue'), **kwargs) diff --git a/tests/test_clustering/test_log_frames.py b/tests/test_clustering/test_log_frames.py index 5a8c6ffbcb8a416c80fdd00f9fbd079923ced7bf..08f8d8f0153164820143cff3c499bf2fb674aeb5 100644 --- a/tests/test_clustering/test_log_frames.py +++ b/tests/test_clustering/test_log_frames.py @@ -8,7 +8,8 @@ import time import unittest from coolamqp.clustering import Cluster -from coolamqp.objects import NodeDefinition, FrameLogger +from coolamqp.objects import NodeDefinition +from coolamqp.tracing import HoldingFrameTracer NODE = NodeDefinition(os.environ.get('AMQP_HOST', '127.0.0.1'), 'guest', 'guest', heartbeat=20) logging.basicConfig(level=logging.DEBUG) @@ -16,17 +17,10 @@ logging.basicConfig(level=logging.DEBUG) class TestLogFrames(unittest.TestCase): def test_log_frames_works(self): - class LogFrames(FrameLogger): - def __init__(self): - self.received_frames = 0 - - def on_frame(self, timestamp, frame, direction): - self.received_frames += 1 - - frame_logger = LogFrames() + frame_logger = HoldingFrameTracer() self.c = Cluster([NODE], log_frames=frame_logger) self.c.start() - self.assertGreaterEqual(frame_logger.received_frames, 3) + self.assertGreaterEqual(len(frame_logger.frames), 3) def tearDown(self): self.c.shutdown()