Skip to content
Snippets Groups Projects
Commit 0150dc73 authored by Piotr Maślanka's avatar Piotr Maślanka
Browse files

fix frame tracing

parent 750b0fe6
No related branches found
Tags v1.2.12
No related merge requests found
...@@ -3,3 +3,5 @@ on GitHub. This file serves to only note what changes ...@@ -3,3 +3,5 @@ on GitHub. This file serves to only note what changes
have been made so far, between releases. have been made so far, between releases.
# v1.2.12 # v1.2.12
* added `LoggingFrameTracer` and `HoldingFrameTracer`
__version__ = '1.2.12_a1' __version__ = '1.2.12'
...@@ -15,7 +15,7 @@ from coolamqp.clustering.events import ConnectionLost, MessageReceived, \ ...@@ -15,7 +15,7 @@ from coolamqp.clustering.events import ConnectionLost, MessageReceived, \
NothingMuch, Event NothingMuch, Event
from coolamqp.clustering.single import SingleNodeReconnector from coolamqp.clustering.single import SingleNodeReconnector
from coolamqp.exceptions import ConnectionDead from coolamqp.exceptions import ConnectionDead
from coolamqp.objects import Exchange, Message, Queue, FrameLogger, QueueBind from coolamqp.objects import Exchange, Message, Queue, QueueBind
from coolamqp.uplink import ListenerThread from coolamqp.uplink import ListenerThread
from coolamqp.utils import monotonic from coolamqp.utils import monotonic
...@@ -42,6 +42,7 @@ class Cluster(object): ...@@ -42,6 +42,7 @@ class Cluster(object):
Connection.__init__ Connection.__init__
:param log_frames: an object that supports logging each and every frame CoolAMQP sends and :param log_frames: an object that supports logging each and every frame CoolAMQP sends and
receives from the broker receives from the broker
:type log_frames: tp.Optional[:class:`coolamqp.tracing.BaseFrameTracer`]
:param name: name to appear in log items and prctl() for the listener thread :param name: name to appear in log items and prctl() for the listener thread
:param on_blocked: callable to call when ConnectionBlocked/ConnectionUnblocked is received. It will be :param on_blocked: callable to call when ConnectionBlocked/ConnectionUnblocked is received. It will be
called with a value of True if connection becomes blocked, and False upon an unblock called with a value of True if connection becomes blocked, and False upon an unblock
...@@ -56,7 +57,7 @@ class Cluster(object): ...@@ -56,7 +57,7 @@ class Cluster(object):
on_fail=None, # type: tp.Optional[tp.Callable[[], None]] on_fail=None, # type: tp.Optional[tp.Callable[[], None]]
extra_properties=None, extra_properties=None,
# type: tp.Optional[tp.List[tp.Tuple[bytes, tp.Tuple[bytes, str]]]] # type: tp.Optional[tp.List[tp.Tuple[bytes, tp.Tuple[bytes, str]]]]
log_frames=None, # type: tp.Optional[FrameLogger] log_frames=None,
name=None, # type: tp.Optional[str] name=None, # type: tp.Optional[str]
on_blocked=None, # type: tp.Callable[[bool], None], on_blocked=None, # type: tp.Callable[[bool], None],
tracer=None # type: opentracing.Traccer tracer=None # type: opentracing.Traccer
...@@ -79,7 +80,7 @@ class Cluster(object): ...@@ -79,7 +80,7 @@ class Cluster(object):
self.name = name or 'CoolAMQP' # type: str self.name = name or 'CoolAMQP' # type: str
self.node, = nodes # type: NodeDefinition self.node, = nodes # type: NodeDefinition
self.extra_properties = extra_properties self.extra_properties = extra_properties
self.log_frames = log_frames # type: tp.Optional[FrameLogger] self.log_frames = log_frames
self.on_blocked = on_blocked # type: tp.Optional[tp.Callable[[bool], None]] self.on_blocked = on_blocked # type: tp.Optional[tp.Callable[[bool], None]]
self.connected = False # type: bool self.connected = False # type: bool
self.listener = None # type: BaseListener self.listener = None # type: BaseListener
......
...@@ -16,11 +16,6 @@ logger = logging.getLogger(__name__) ...@@ -16,11 +16,6 @@ logger = logging.getLogger(__name__)
EMPTY_PROPERTIES = MessageProperties() EMPTY_PROPERTIES = MessageProperties()
try:
Protocol = tp.Protocol
except AttributeError:
Protocol = object
def toutf8(q): def toutf8(q):
if isinstance(q, six.binary_type): if isinstance(q, six.binary_type):
...@@ -34,21 +29,6 @@ def tobytes(q): ...@@ -34,21 +29,6 @@ def tobytes(q):
return q return q
class FrameLogger(Protocol):
def on_frame(self, timestamp, # type: float
frame, # type: AMQPFrame
direction # type: str
):
"""
Log a frame
:param timestamp: timestamp in seconds since Unix Epoch
:param frame: AMQPFrame to parse
:param direction: either 'to_client' when this is frame received from the broker, or
'to_server' if it's a frame that's being sent to the broker
"""
class Callable(object): class Callable(object):
""" """
Add a bunch of callables to one list, and just invoke'm. Add a bunch of callables to one list, and just invoke'm.
......
import logging
class BaseFrameTracer(object):
"""An abstract do-nothing frame tracer"""
def on_frame(self, timestamp, frame, direction):
"""
Called by AMQP upon receiving a frame information
:param timestamp: timestamp
:type timestamp: float
:param frame: frame that is sent or received
:type frame: :class:`coolamqp.framing.base.AMQPFrame`
:param direction: either 'to_client' or 'to_server'
:type direction: str
"""
class LoggingFrameTracer(BaseFrameTracer):
"""
A frame tracer that outputs each frame to log
:param logger: the logger to log onto
:param log_level: the level of logging to log with
"""
def __init__(self, logger, log_level=logging.WARNING):
self.logger = logger
self.log_level = log_level
def on_frame(self, timestamp, frame, direction):
if direction == 'to_client':
self.logger.log(self.log_level, 'RECEIVED %s', frame.payload)
else:
self.logger.log(self.log_level, 'SENT %s type %s', frame, type(frame))
class HoldingFrameTracer(BaseFrameTracer):
"""
A frame tracer that holds the frames in memory
:ivar frames: a list of tuple (direction:str (either 'to_client' or 'to_server'),
timestamp::float,
frame:: :class:`~coolamqp.framing.base.AMQPFrame`)
"""
def __init__(self):
self.frames = []
def on_frame(self, timestamp, frame, direction):
self.frames.append((direction, timestamp, frame))
def clear(self):
"""Clear internal frame list"""
self.frames = []
...@@ -10,7 +10,7 @@ Since CoolAMQP tries to be fast, it uses memoryviews everywhere. **ReceivedMessa ...@@ -10,7 +10,7 @@ Since CoolAMQP tries to be fast, it uses memoryviews everywhere. **ReceivedMessa
properties therefore, are memoryviews. So, it you wanted to read the routing key a message was sent with, properties therefore, are memoryviews. So, it you wanted to read the routing key a message was sent with,
or message's encoding, you should do: or message's encoding, you should do:
:: .. code-block:: python
received_msg.routing_key.to_bytes() received_msg.routing_key.to_bytes()
received_msg.properties.content_encoding.to_bytes() received_msg.properties.content_encoding.to_bytes()
......
...@@ -10,6 +10,7 @@ Welcome to CoolAMQP's documentation! ...@@ -10,6 +10,7 @@ Welcome to CoolAMQP's documentation!
caveats caveats
frames frames
basics basics
tracing
Indices and tables Indices and tables
================== ==================
......
Frame tracing
=============
CoolAMQP allows you to trace every sent or received frame. Just provide an instance of
.. autoclass:: coolamqp.tracing.BaseFrameTracer
:members:
LoggingFrameTracer
~~~~~~~~~~~~~~~~~~
To show each frame that is sent or received to the server use the following:
.. code-block:: python
import logging
logger = logging.getLogger(__name__)
from coolamqp.tracing import LoggingFrameTracer
frame_tracer = LoggingFrameTracer(logger, logging.WARNING)
cluster = Cluster([NODE], log_frames=frame_logger)
cluster.start()
Documentation of the class:
.. autoclass:: coolamqp.tracing.LoggingFrameTracer
:members:
HoldingFrameTracer
~~~~~~~~~~~~~~~~~~
.. autoclass:: coolamqp.tracing.HoldingFrameTracer
:members:
========
Tutorial Tutorial
======== ========
...@@ -15,7 +14,7 @@ the AMQP connection string. ...@@ -15,7 +14,7 @@ the AMQP connection string.
.. autoclass:: coolamqp.objects.NodeDefinition .. autoclass:: coolamqp.objects.NodeDefinition
:members: :members:
:: .. code-block:: python
from coolamqp.objects import NodeDefinition from coolamqp.objects import NodeDefinition
...@@ -24,7 +23,7 @@ the AMQP connection string. ...@@ -24,7 +23,7 @@ the AMQP connection string.
Cluster instances are used to interface with the cluster (or a single broker). It Cluster instances are used to interface with the cluster (or a single broker). It
accepts a list of nodes: accepts a list of nodes:
:: .. code-block:: python
from coolamqp.clustering import Cluster from coolamqp.clustering import Cluster
cluster = Cluster([node], name='My Cluster') cluster = Cluster([node], name='My Cluster')
...@@ -47,7 +46,7 @@ Publishing and consuming ...@@ -47,7 +46,7 @@ Publishing and consuming
Connecting is boring. After we do, we want to do something! Let's try sending a message, and receiving it. To do that, Connecting is boring. After we do, we want to do something! Let's try sending a message, and receiving it. To do that,
you must first define a queue, and register a consumer. you must first define a queue, and register a consumer.
:: .. code-block:: python
from coolamqp.objects import Queue from coolamqp.objects import Queue
...@@ -57,7 +56,7 @@ you must first define a queue, and register a consumer. ...@@ -57,7 +56,7 @@ you must first define a queue, and register a consumer.
consume_confirm.result() # wait for consuming to start consume_confirm.result() # wait for consuming to start
This will create an auto-delete and exclusive queue. After than, a consumer will be registered for this queue. This will create an auto-delete and exclusive queue. After than, a consumer will be registered for this queue.
_no_ack=False_ will mean that we have to manually confirm messages. _no_ack=False_ will mean that we have to manually confirm messages.
You can specify a callback, that will be called with a message if one's received by this consumer. Since You can specify a callback, that will be called with a message if one's received by this consumer. Since
we did not do that, this will go to a generic queue belonging to _Cluster_. we did not do that, this will go to a generic queue belonging to _Cluster_.
...@@ -68,7 +67,7 @@ when AMQP _basic.consume-ok_ is received. ...@@ -68,7 +67,7 @@ when AMQP _basic.consume-ok_ is received.
To send a message we need to construct it first, and later publish: To send a message we need to construct it first, and later publish:
:: .. code-block:: python
from coolamqp.objects import Message from coolamqp.objects import Message
...@@ -84,7 +83,7 @@ nor decode, and will always expect and return bytes. ...@@ -84,7 +83,7 @@ nor decode, and will always expect and return bytes.
To actually get our message, we need to start a consumer first. To do that, just invoke: To actually get our message, we need to start a consumer first. To do that, just invoke:
:: .. code-block:: python
cons, fut = cluster.consume(Queue('name of the queue'), **kwargs) cons, fut = cluster.consume(Queue('name of the queue'), **kwargs)
......
...@@ -8,7 +8,8 @@ import time ...@@ -8,7 +8,8 @@ import time
import unittest import unittest
from coolamqp.clustering import Cluster from coolamqp.clustering import Cluster
from coolamqp.objects import NodeDefinition, FrameLogger from coolamqp.objects import NodeDefinition
from coolamqp.tracing import HoldingFrameTracer
NODE = NodeDefinition(os.environ.get('AMQP_HOST', '127.0.0.1'), 'guest', 'guest', heartbeat=20) NODE = NodeDefinition(os.environ.get('AMQP_HOST', '127.0.0.1'), 'guest', 'guest', heartbeat=20)
logging.basicConfig(level=logging.DEBUG) logging.basicConfig(level=logging.DEBUG)
...@@ -16,17 +17,10 @@ logging.basicConfig(level=logging.DEBUG) ...@@ -16,17 +17,10 @@ logging.basicConfig(level=logging.DEBUG)
class TestLogFrames(unittest.TestCase): class TestLogFrames(unittest.TestCase):
def test_log_frames_works(self): def test_log_frames_works(self):
class LogFrames(FrameLogger): frame_logger = HoldingFrameTracer()
def __init__(self):
self.received_frames = 0
def on_frame(self, timestamp, frame, direction):
self.received_frames += 1
frame_logger = LogFrames()
self.c = Cluster([NODE], log_frames=frame_logger) self.c = Cluster([NODE], log_frames=frame_logger)
self.c.start() self.c.start()
self.assertGreaterEqual(frame_logger.received_frames, 3) self.assertGreaterEqual(len(frame_logger.frames), 3)
def tearDown(self): def tearDown(self):
self.c.shutdown() self.c.shutdown()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment