diff --git a/coolamqp/attaches/consumer.py b/coolamqp/attaches/consumer.py index 6f9ae38607ffe229188f5409ee84a12254dadb72..96e8387311b55cd205e3ebcc8bb2fa69ed45c1d6 100644 --- a/coolamqp/attaches/consumer.py +++ b/coolamqp/attaches/consumer.py @@ -80,6 +80,7 @@ class Consumer(Channeler): :type queue: coolamqp.objects.Queue :param on_message: callable that will process incoming messages :type on_message: callable(ReceivedMessage instance) + :param span: optional span, if opentracing is installed :param no_ack: Will this consumer require acknowledges from messages? :type no_ack: bool :param qos: a tuple of (prefetch size, prefetch window) for this @@ -113,9 +114,10 @@ class Consumer(Channeler): 'future_to_notify', 'future_to_notify_on_dead', 'fail_on_first_time_resource_locked', 'cancel_on_failure', 'body_receive_mode', 'consumer_tag', 'on_cancel', 'on_broker_cancel', - 'hb_watch', 'deliver_watch') + 'hb_watch', 'deliver_watch', 'span') - def __init__(self, queue, on_message, no_ack=True, qos=None, + def __init__(self, queue, on_message, span=None, + no_ack=True, qos=None, cancel_on_failure=False, future_to_notify=None, fail_on_first_time_resource_locked=False, @@ -128,6 +130,7 @@ class Consumer(Channeler): """ super(Consumer, self).__init__() + self.span = span self.queue = queue self.no_ack = no_ack @@ -298,7 +301,16 @@ class Consumer(Channeler): should_retry = True if self.future_to_notify: - self.future_to_notify.set_exception(AMQPError(payload)) + err = AMQPError(payload) + if self.span is not None: + from opentracing import logs, tags + self.span.set_tag(tags.ERROR, True) + self.span.log_kv({logs.EVENT: tags.ERROR, + logs.ERROR_OBJECT: err, + logs.ERROR_KIND: type(err)}) + self.span = None + + self.future_to_notify.set_exception(err) self.future_to_notify = None logger.debug('Notifying connection closed with %s', payload) @@ -316,6 +328,10 @@ class Consumer(Channeler): self.future_to_notify_on_dead.set_result(None) self.future_to_notify_on_dead = None if should_retry: + if self.span is not None: + from opentracing import logs + self.span.log_kv({logs.EVENT: 'Retrying'}) + if old_con.state == ST_ONLINE: logger.info('Retrying with %s', self.queue.name) self.attach(old_con) @@ -424,8 +440,11 @@ class Consumer(Channeler): ) elif isinstance(payload, BasicConsumeOk): # AWWW RIGHT~!!! We're good. - self.on_operational(True) consumer_tag = self.consumer_tag + if self.span is not None: + self.span.set_tag('consumer.tag', consumer_tag) + + self.on_operational(True) # Register watches for receiving shit # this is multi-shot by default diff --git a/coolamqp/attaches/declarer.py b/coolamqp/attaches/declarer.py index 71891f05f64c598471c2d6f69e1b5d4d7b94e1af..1e0cbdba14cd70b13883568c2d5a2b671097c8bc 100644 --- a/coolamqp/attaches/declarer.py +++ b/coolamqp/attaches/declarer.py @@ -29,24 +29,67 @@ class Operation(object): This will register it's own callback. Please, call on_connection_dead when connection is broken to fail futures with ConnectionDead, since this object does not watch for Fails """ - __slots__ = ('done', 'fut', 'declarer', 'obj', 'on_done') + __slots__ = ('done', 'fut', 'declarer', 'obj', 'on_done', 'parent_span', 'enqueued_span', + 'processing_span') - def __init__(self, declarer, obj, fut=None): + def __init__(self, declarer, obj, fut=None, span_parent=None, span_enqueued=None): self.done = False self.fut = fut + self.parent_span = span_parent + self.enqueued_span = span_enqueued + self.processing_span = None self.declarer = declarer self.obj = obj self.on_done = Callable() # callable/0 + def span_exception(self, exception): + if self.parent_span is not None: + if self.processing_span is not None: + from opentracing import tags, logs + self.processing_span.set_tag(tags.ERROR, True) + self.processing_span.log_kv({logs.EVENT: tags.ERROR, + logs.ERROR_KIND: exception, + logs.ERROR_OBJECT: exception}) + self.processing_span.finish() + self.processing_span = None + if self.enqueued_span is not None: + self.enqueued_span.finish() + self.enqueued_span = None + self.parent_span.finish() + def on_connection_dead(self): """To be called by declarer when our link fails""" if self.fut is not None: - self.fut.set_exception(ConnectionDead()) + err = ConnectionDead() + self.span_exception(err) + self.fut.set_exception(err) self.fut = None + def span_starting(self): + if self.enqueued_span is not None: + self.enqueued_span.finish() + from opentracing import follows_from + self.processing_span = self.declarer.cluster.tracer.start_span('Declaring', + child_of=self.parent_span, + references=follows_from(self.enqueued_span)) + + def span_finished(self): + if self.parent_span is None: + self.processing_span.finish() + + def span_begin(self): + if self.enqueued_span is not None: + self.enqueued_span.finish() + from opentracing import follows_from + self.processing_span = self.declarer.cluster.tracer.start_span('Declaring', + child_of=self.parent_span, + references=follows_from(self.enqueued_span)) + self.enqueued_span = None + def perform(self): """Attempt to perform this op.""" + self.span_begin() obj = self.obj if isinstance(obj, Exchange): self.declarer.method_and_watch( @@ -66,8 +109,10 @@ class Operation(object): assert not self.done self.done = True if isinstance(payload, ChannelClose): + err = AMQPError(payload) + self.span_exception(err) if self.fut is not None: - self.fut.set_exception(AMQPError(payload)) + self.fut.set_exception(err) self.fut = None else: # something that had no Future failed. Is it in declared? @@ -76,6 +121,7 @@ class Operation(object): self.obj) # todo access not threadsafe self.declarer.on_discard(self.obj) else: + self.span_finished() if self.fut is not None: self.fut.set_result(None) self.fut = None @@ -83,8 +129,9 @@ class Operation(object): class DeleteQueue(Operation): - def __init__(self, declarer, queue, fut): - super(DeleteQueue, self).__init__(declarer, queue, fut=fut) + def __init__(self, declarer, queue, fut, span_parent=None, span_enqueued=None): + super(DeleteQueue, self).__init__(declarer, queue, fut=fut, span_parent=span_parent, + span_enqueued=span_enqueued) def perform(self): queue = self.obj @@ -98,8 +145,11 @@ class DeleteQueue(Operation): assert not self.done self.done = True if isinstance(payload, ChannelClose): - self.fut.set_exception(AMQPError(payload)) + err = AMQPError(payload) + self.span_exception(err) + self.fut.set_exception(err) else: # Queue.DeleteOk + self.span_finished() self.fut.set_result(None) self.declarer.on_operation_done() @@ -111,13 +161,13 @@ class Declarer(Channeler, Synchronized): This also maintains a list of declared queues/exchanges, and redeclares them on each reconnect. """ - def __init__(self): + def __init__(self, cluster): """ Create a new declarer. """ Channeler.__init__(self) Synchronized.__init__(self) - + self.cluster = cluster self.declared = set() # since Queues and Exchanges are hashable... # anonymous queues aren't, but we reject those # persistent @@ -172,7 +222,7 @@ class Declarer(Channeler, Synchronized): self.in_process = None self._do_operations() - def delete_queue(self, queue): + def delete_queue(self, queue, span=None): """ Delete a queue. @@ -182,6 +232,7 @@ class Declarer(Channeler, Synchronized): If the queue is in declared consumer list, it will not be removed. :param queue: Queue instance + :param span: optional span, if opentracing is installed :return: a Future """ fut = Future() @@ -192,7 +243,7 @@ class Declarer(Channeler, Synchronized): return fut - def declare(self, obj, persistent=False): + def declare(self, obj, persistent=False, span=None): """ Schedule to have an object declared. @@ -210,6 +261,7 @@ class Declarer(Channeler, Synchronized): :param obj: Exchange or Queue instance :param persistent: will be redeclared upon disconnect. To remove, use "undeclare" + :param span: span if opentracing is installed :return: a Future instance :raise ValueError: tried to declare anonymous queue """ @@ -217,6 +269,12 @@ class Declarer(Channeler, Synchronized): if obj.anonymous: raise ValueError('Cannot declare anonymous queue') + if span is not None: + enqueued_span = self.cluster.tracer.start_span('Enqueued', child_of=span) + else: + span = None + enqueued_span = None + fut = Future() fut.set_running_or_notify_cancel() @@ -224,7 +282,7 @@ class Declarer(Channeler, Synchronized): if obj not in self.declared: self.declared.add(obj) # todo access not threadsafe - self.left_to_declare.append(Operation(self, obj, fut)) + self.left_to_declare.append(Operation(self, obj, fut, span, enqueued_span)) self._do_operations() return fut diff --git a/coolamqp/attaches/publisher.py b/coolamqp/attaches/publisher.py index 62fd665835bff0a545930cf6d872b60833e7ee2b..77cc6b90b56f74f0107964046abcac294594c6e1 100644 --- a/coolamqp/attaches/publisher.py +++ b/coolamqp/attaches/publisher.py @@ -41,7 +41,8 @@ logger = logging.getLogger(__name__) # for holding messages when MODE_CNPUB and link is down CnpubMessageSendOrder = collections.namedtuple('CnpubMessageSendOrder', ('message', 'exchange_name', - 'routing_key', 'future')) + 'routing_key', 'future', + 'parent_span', 'span_enqueued')) # todo what if publisher in MODE_CNPUB fails mid message? they dont seem to be recovered @@ -80,7 +81,7 @@ class Publisher(Channeler, Synchronized): class UnusablePublisher(Exception): """This publisher will never work (eg. MODE_CNPUB on a broker not supporting publisher confirms)""" - def __init__(self, mode, cluster_to_set_connected_upon_first_connect=None): + def __init__(self, mode, cluster=None): Channeler.__init__(self) Synchronized.__init__(self) @@ -94,7 +95,8 @@ class Publisher(Channeler, Synchronized): # Future to confirm or None, flags as tuple|empty tuple self.tagger = None # None, or AtomicTagger instance id MODE_CNPUB - self.cluster_to_set_connected_upon_first_connect = cluster_to_set_connected_upon_first_connect + self.set_connected = False + self.cluster = cluster self.critically_failed = False self.content_flow = True self.blocked = False @@ -131,7 +133,8 @@ class Publisher(Channeler, Synchronized): def on_fail(self): self.state = ST_OFFLINE - def _pub(self, message, exchange_name, routing_key): + def _pub(self, message, exchange_name, routing_key, parent_span=None, span_enqueued=None, + dont_close_span=False): """ Just send the message. Sends BasicDeliver + header + body. @@ -143,6 +146,13 @@ class Publisher(Channeler, Synchronized): :type exchange_name: bytes :param routing_key: bytes """ + span = None + if parent_span is not None: + import opentracing + span_enqueued.finish() + span = self.cluster.tracer.start_span('Sending', + child_of=parent_span, + references=opentracing.follows_from(span_enqueued)) # Break down large bodies bodies = [] @@ -177,6 +187,12 @@ class Publisher(Channeler, Synchronized): for body in bodies: self.frames_to_send.append(AMQPBodyFrame(self.channel_id, body)) + if span is not None: + span.finish() + + if parent_span is not None and not dont_close_span: + parent_span.finish() + def _mode_cnpub_process_deliveries(self): """ Dispatch all frames that are waiting to be sent @@ -189,18 +205,24 @@ class Publisher(Channeler, Synchronized): while len(self.messages) > 0: try: - msg, xchg, rk, fut = self.messages.popleft() + msg, xchg, rk, fut, parent_span, span_enqueued = self.messages.popleft() except IndexError: # todo see docs/casefile-0001 break if not fut.set_running_or_notify_cancel(): + if span_enqueued is not None: + import opentracing + span_enqueued.log_kv({opentracing.logs.EVENT: 'Cancelled'}) + span_enqueued.finish() + parent_span.finish() continue # cancelled self.tagger.deposit(self.tagger.get_key(), - FutureConfirmableRejectable(fut)) + FutureConfirmableRejectable(fut), + parent_span) assert isinstance(xchg, (six.binary_type, six.text_type)) - self._pub(msg, xchg, rk) + self._pub(msg, xchg, rk, parent_span, span_enqueued, dont_close_span=True) def _on_cnpub_delivery(self, payload): """ @@ -214,7 +236,7 @@ class Publisher(Channeler, Synchronized): self.tagger.nack(payload.delivery_tag, payload.multiple) @Synchronized.synchronized - def publish(self, message, exchange=b'', routing_key=b''): + def publish(self, message, exchange=b'', routing_key=b'', span=None): """ Schedule to have a message published. @@ -232,9 +254,14 @@ class Publisher(Channeler, Synchronized): :param exchange: exchange name to use. Default direct exchange by default. Can also be an Exchange object. :type exchange: bytes, str or Exchange instance :param routing_key: routing key to use + :param span: optional span, if opentracing is installed :return: a Future instance, or None :raise Publisher.UnusablePublisher: this publisher will never work (eg. MODE_CNPUB on Non-RabbitMQ) """ + if span is not None: + span_enqueued = self.cluster.tracer.start_span('Enqueued', child_of=span) + else: + span_enqueued = None if isinstance(exchange, Exchange): exchange = exchange.name.encode('utf8') @@ -250,13 +277,13 @@ class Publisher(Channeler, Synchronized): logger.debug( u'Publish request, but not connected - dropping the message') else: - self._pub(message, exchange, routing_key) + self._pub(message, exchange, routing_key, span, span_enqueued) elif self.mode == Publisher.MODE_CNPUB: fut = Future() # todo can optimize this not to create an object if ST_ONLINE already - cnpo = CnpubMessageSendOrder(message, exchange, routing_key, fut) + cnpo = CnpubMessageSendOrder(message, exchange, routing_key, fut, span, span_enqueued) self.messages.append(cnpo) if self.state == ST_ONLINE: @@ -314,14 +341,9 @@ class Publisher(Channeler, Synchronized): self.on_operational(True) # inform the cluster that we've been connected - try: - self.cluster_to_set_connected_upon_first_connect - except AttributeError: - pass - else: - if self.cluster_to_set_connected_upon_first_connect is not None: - self.cluster_to_set_connected_upon_first_connect.connected = True - del self.cluster_to_set_connected_upon_first_connect + if not self.set_connected: + self.cluster.connected = True + self.set_connected = True # now we need to listen for BasicAck and BasicNack diff --git a/coolamqp/attaches/utils.py b/coolamqp/attaches/utils.py index 3243a5b7766f308cf0890b713886ddb9fae4a2c1..97ae562c76a80795e71a9ef2a2c8becdd2532a95 100644 --- a/coolamqp/attaches/utils.py +++ b/coolamqp/attaches/utils.py @@ -4,10 +4,25 @@ from __future__ import print_function, absolute_import, division import functools import logging import threading +from concurrent.futures import Future logger = logging.getLogger(__name__) +def close_future(fut: Future, span): # type: (Future, opentracing.Span) -> Future + """ + To be called as a Future callback, means to close the span + """ + def inner_close(fut): + exc = fut.exception() + if exc is not None: + from opentracing import Span + Span._on_error(span, type(exc), exc, '<unavailable>') + span.finish() + fut.add_done_callback(inner_close) + return fut + + class ConfirmableRejectable(object): """ Protocol for objects put into AtomicTagger. You need not subclass it, @@ -90,7 +105,7 @@ class AtomicTagger(object): # they remain to be acked/nacked # invariant: FOR EACH i, j: (i>j) => (tags[i][0] > tags[j][0]) - def deposit(self, tag, obj): + def deposit(self, tag, obj, span=None): """ Put a tag into the tag list. @@ -102,7 +117,7 @@ class AtomicTagger(object): until you call .ack() or .nack(). """ assert tag >= 0 - opt = (tag, obj) + opt = (tag, obj, span) with self.lock: if len(self.tags) == 0: @@ -163,11 +178,20 @@ class AtomicTagger(object): items = self.tags[start:stop] del self.tags[start:stop] - for tag, cr in items: + for tag, cr, span in items: + if span is not None: + from opentracing import logs + if ack: cr.confirm() + if span is not None: + span.log_kv({logs.EVENT: 'Ack'}) + span.finish() else: cr.reject() + if span is not None: + span.log_kv({logs.EVENT: 'Nack'}) + span.finish() def ack(self, tag, multiple): """ diff --git a/coolamqp/clustering/cluster.py b/coolamqp/clustering/cluster.py index dfa1d9a717d2fb30340b243ba2058d9f473139d2..8834fb0557edf61a3010e0f7900f0ebd03462668 100644 --- a/coolamqp/clustering/cluster.py +++ b/coolamqp/clustering/cluster.py @@ -14,6 +14,7 @@ import monotonic import six from coolamqp.attaches import Publisher, AttacheGroup, Consumer, Declarer +from coolamqp.attaches.utils import close_future from coolamqp.clustering.events import ConnectionLost, MessageReceived, \ NothingMuch, Event from coolamqp.clustering.single import SingleNodeReconnector @@ -26,6 +27,7 @@ logger = logging.getLogger(__name__) THE_POPE_OF_NOPE = NothingMuch() +# If any spans are spawn here, it's Cluster's job to finish them, except for publish() class Cluster(object): """ Frontend for your AMQP needs. @@ -46,6 +48,7 @@ class Cluster(object): :param name: name to appear in log items and prctl() for the listener thread :param on_blocked: callable to call when ConnectionBlocked/ConnectionUnblocked is received. It will be called with a value of True if connection becomes blocked, and False upon an unblock + :param tracer: tracer, if opentracing is installed """ # Events you can be informed about @@ -57,7 +60,8 @@ class Cluster(object): extra_properties=None, # type: tp.Optional[tp.List[tp.Tuple[bytes, tp.Tuple[bytes, str]]]] log_frames=None, # type: tp.Optional[FrameLogger] name=None, # type: tp.Optional[str] - on_blocked=None # type: tp.Callable[[bool], None] + on_blocked=None, # type: tp.Callable[[bool], None], + tracer=None # type: opentracing.Traccer ): from coolamqp.objects import NodeDefinition if isinstance(nodes, NodeDefinition): @@ -66,6 +70,13 @@ class Cluster(object): if len(nodes) > 1: raise NotImplementedError(u'Multiple nodes not supported yet') + if tracer is not None: + try: + import opentracing + except ImportError: + raise RuntimeError('tracer given, but opentracing is not installed!') + + self.tracer = tracer self.name = name or 'CoolAMQP' self.node, = nodes self.extra_properties = extra_properties @@ -83,33 +94,57 @@ class Cluster(object): self.on_fail = None def declare(self, obj, # type: tp.Union[Queue, Exchange] - persistent=False # type: bool + persistent=False, # type: bool + span=None # type: tp.Optional[opentracing.Span] ): # type: (...) -> concurrent.futures.Future """ Declare a Queue/Exchange :param obj: Queue/Exchange object :param persistent: should it be redefined upon reconnect? + :param span: optional parent span, if opentracing is installed :return: Future """ - return self.decl.declare(obj, persistent=persistent) + if span is not None: + child_span = self._make_span('declare', span) + else: + child_span = None + fut = self.decl.declare(obj, persistent=persistent, span=child_span) + return close_future(fut, child_span) - def drain(self, timeout): # type: (float) -> Event + def drain(self, timeout, span=None): # type: (float) -> Event """ Return an Event. :param timeout: time to wait for an event. 0 means return immediately. None means block forever + :para span: optional parent span, if opentracing is installed :return: an Event instance. NothingMuch is returned when there's nothing within a given timoeout """ - try: - if timeout == 0: - return self.events.get_nowait() - else: - return self.events.get(True, timeout) - except six.moves.queue.Empty: - return THE_POPE_OF_NOPE - - def consume(self, queue, on_message=None, *args, **kwargs): + def fetch(): + try: + if timeout == 0: + return self.events.get_nowait() + else: + return self.events.get(True, timeout) + except six.moves.queue.Empty: + return THE_POPE_OF_NOPE + + if span is not None: + from opentracing import tags + parent_span = self.tracer.start_active_span('AMQP call', + child_of=span, + tags={ + tags.SPAN_KIND: tags.SPAN_KIND_RPC_CLIENT, + tags.DATABASE_TYPE: 'amqp', + tags.DATABASE_STATEMENT: 'drain' + }) + + with parent_span: + return fetch() + else: + return fetch() + + def consume(self, queue, on_message=None, span=None, *args, **kwargs): # type: (Queue, tp.Callable[[MessageReceived], None] -> tp.Tuple[Consumer, Future] """ Start consuming from a queue. @@ -123,16 +158,21 @@ class Cluster(object): Note that name of anonymous queue might change at any time! :param on_message: callable that will process incoming messages if you leave it at None, messages will be .put into self.events + :param span: optional span, if opentracing is installed :return: a tuple (Consumer instance, and a Future), that tells, when consumer is ready """ + if span is not None: + child_span = self._make_span('consume', span) + else: + child_span = None fut = Future() fut.set_running_or_notify_cancel() # it's running right now on_message = on_message or ( lambda rmsg: self.events.put_nowait(MessageReceived(rmsg))) - con = Consumer(queue, on_message, future_to_notify=fut, *args, + con = Consumer(queue, on_message, future_to_notify=fut, span=span, *args, **kwargs) self.attache_group.add(con) - return con, fut + return con, close_future(fut, child_span) def delete_queue(self, queue): # type: (coolamqp.objects.Queue) -> Future """ @@ -143,11 +183,26 @@ class Cluster(object): """ return self.decl.delete_queue(queue) + def _make_span(self, call, span): + try: + from opentracing import tags + except ImportError: + pass + else: + return self.tracer.start_span('AMQP call', + child_of=span, + tags={ + tags.SPAN_KIND: tags.SPAN_KIND_RPC_CLIENT, + tags.DATABASE_TYPE: 'amqp', + tags.DATABASE_STATEMENT: call + }) + def publish(self, message, # type: Message exchange=None, # type: tp.Union[Exchange, str, bytes] routing_key=u'', # type: tp.Union[str, bytes] tx=None, # type: tp.Optional[bool] - confirm=None # type: tp.Optional[bool] + confirm=None, # type: tp.Optional[bool] + span=None # type: tp.Optional[opentracing.Span] ): # type: (...) -> tp.Optional[Future] """ Publish a message. @@ -161,8 +216,12 @@ class Cluster(object): Note that if tx if False, and message cannot be delivered to broker at once, it will be discarded :param tx: deprecated, alias for confirm + :param span: optionally, current span, if opentracing is installed :return: Future to be finished on completion or None, is confirm/tx was not chosen """ + if self.tracer is not None: + span = self._make_span('publish', span) + if isinstance(exchange, Exchange): exchange = exchange.name.encode('utf8') elif exchange is None: @@ -187,7 +246,8 @@ class Cluster(object): try: return (self.pub_tr if tx else self.pub_na).publish(message, exchange, - routing_key) + routing_key, + span) except Publisher.UnusablePublisher: raise NotImplementedError( u'Sorry, this functionality is not yet implemented!') @@ -231,9 +291,9 @@ class Cluster(object): self.snr.on_blocked.add(self.on_blocked) # Spawn a transactional publisher and a noack publisher - self.pub_tr = Publisher(Publisher.MODE_CNPUB, cluster_to_set_connected_upon_first_connect=self) - self.pub_na = Publisher(Publisher.MODE_NOACK) - self.decl = Declarer() + self.pub_tr = Publisher(Publisher.MODE_CNPUB, self) + self.pub_na = Publisher(Publisher.MODE_NOACK, self) + self.decl = Declarer(self) self.attache_group.add(self.pub_tr) self.attache_group.add(self.pub_na) diff --git a/setup.py b/setup.py index 224844ceb073aa95785dc8155c15ba542acbf0cb..92f16b786bdaa9cbacea66518b31c42fcd10f67e 100644 --- a/setup.py +++ b/setup.py @@ -13,6 +13,7 @@ setup(keywords=['amqp', 'rabbitmq', 'client', 'network', 'ha', 'high availabilit test_suite='nose2.collector.collector', extras_require={ ':python_version == "2.7"': ['futures', 'typing'], - 'prctl': ['prctl'] + 'prctl': ['prctl'], + 'opentracing': ['opentracing'], } )