diff --git a/coolamqp/clustering/cluster.py b/coolamqp/clustering/cluster.py index 9d2a39dcbac95d229a5ded78a8aeb61f45cf2aea..43318acb0dc9eac81eabd54353bac494978b228f 100644 --- a/coolamqp/clustering/cluster.py +++ b/coolamqp/clustering/cluster.py @@ -1,7 +1,4 @@ # coding=UTF-8 -""" -THE object you interface with -""" from __future__ import print_function, absolute_import, division import logging @@ -10,7 +7,6 @@ import typing as tp import warnings from concurrent.futures import Future -from coolamqp.utils import monotonic import six from coolamqp.attaches import Publisher, AttacheGroup, Consumer, Declarer @@ -21,10 +17,11 @@ from coolamqp.clustering.single import SingleNodeReconnector from coolamqp.exceptions import ConnectionDead from coolamqp.objects import Exchange, Message, Queue, FrameLogger, QueueBind from coolamqp.uplink import ListenerThread +from coolamqp.utils import monotonic logger = logging.getLogger(__name__) -THE_POPE_OF_NOPE = NothingMuch() +nothing_much = NothingMuch() # If any spans are spawn here, it's Cluster's job to finish them, except for publish() @@ -57,11 +54,12 @@ class Cluster(object): def __init__(self, nodes, # type: tp.Union[NodeDefinition, tp.List[NodeDefinition]] on_fail=None, # type: tp.Optional[tp.Callable[[], None]] - extra_properties=None, # type: tp.Optional[tp.List[tp.Tuple[bytes, tp.Tuple[bytes, str]]]] + extra_properties=None, + # type: tp.Optional[tp.List[tp.Tuple[bytes, tp.Tuple[bytes, str]]]] log_frames=None, # type: tp.Optional[FrameLogger] name=None, # type: tp.Optional[str] on_blocked=None, # type: tp.Callable[[bool], None], - tracer=None # type: opentracing.Traccer + tracer=None # type: opentracing.Traccer ): from coolamqp.objects import NodeDefinition if isinstance(nodes, NodeDefinition): @@ -76,18 +74,18 @@ class Cluster(object): except ImportError: raise RuntimeError('tracer given, but opentracing is not installed!') - self.started = False + self.started = False # type: bool self.tracer = tracer - self.name = name or 'CoolAMQP' - self.node, = nodes + self.name = name or 'CoolAMQP' # type: str + self.node, = nodes # type: NodeDefinition self.extra_properties = extra_properties - self.log_frames = log_frames - self.on_blocked = on_blocked - self.connected = False - self.listener = None - self.attache_group = None - self.events = None - self.snr = None # type: SingleNodeReconnector + self.log_frames = log_frames # type: tp.Optional[FrameLogger] + self.on_blocked = on_blocked # type: tp.Optional[tp.Callable[[bool], None]] + self.connected = False # type: bool + self.listener = None # type: BaseListener + self.attache_group = None # type: AttacheGroup + self.events = None # type: six.moves.queue.Queue + self.snr = None # type: SingleNodeReconnector if on_fail is not None: def decorated(): @@ -113,7 +111,7 @@ class Cluster(object): def declare(self, obj, # type: tp.Union[Queue, Exchange] persistent=False, # type: bool - span=None # type: tp.Optional[opentracing.Span] + span=None # type: tp.Optional[opentracing.Span] ): # type: (...) -> concurrent.futures.Future """ Declare a Queue/Exchange @@ -138,6 +136,7 @@ class Cluster(object): :para span: optional parent span, if opentracing is installed :return: an Event instance. NothingMuch is returned when there's nothing within a given timoeout """ + def fetch(): try: if timeout == 0: @@ -145,17 +144,17 @@ class Cluster(object): else: return self.events.get(True, timeout) except six.moves.queue.Empty: - return THE_POPE_OF_NOPE + return nothing_much if span is not None: from opentracing import tags parent_span = self.tracer.start_active_span('AMQP call', child_of=span, - tags={ - tags.SPAN_KIND: tags.SPAN_KIND_RPC_CLIENT, - tags.DATABASE_TYPE: 'amqp', - tags.DATABASE_STATEMENT: 'drain' - }) + tags={ + tags.SPAN_KIND: tags.SPAN_KIND_RPC_CLIENT, + tags.DATABASE_TYPE: 'amqp', + tags.DATABASE_STATEMENT: 'drain' + }) with parent_span: return fetch() @@ -220,7 +219,7 @@ class Cluster(object): routing_key=u'', # type: tp.Union[str, bytes] tx=None, # type: tp.Optional[bool] confirm=None, # type: tp.Optional[bool] - span=None # type: tp.Optional[opentracing.Span] + span=None # type: tp.Optional[opentracing.Span] ): # type: (...) -> tp.Optional[Future] """ Publish a message. diff --git a/tests/travis_test.sh b/tests/travis_test.sh index 496ce540ed95135e566640862ace29f67dabe178..d97b04c3f23e6298ab944381d7ee13359be0b172 100644 --- a/tests/travis_test.sh +++ b/tests/travis_test.sh @@ -16,7 +16,6 @@ coverage run --append -m nose2 -vv COOLAMQP_FORCE_SELECT_LISTENER=1 coverage run --append -m nose2 -vv coverage run --append -m stress_tests -coverage combine coverage report coverage xml