Skip to content
Snippets Groups Projects
Commit ae986de6 authored by Piotr Maślanka's avatar Piotr Maślanka
Browse files

fix the tests

parent e24b9821
No related branches found
No related tags found
No related merge requests found
# coding=UTF-8
"""
THE object you interface with
"""
from __future__ import print_function, absolute_import, division
import logging
......@@ -10,7 +7,6 @@ import typing as tp
import warnings
from concurrent.futures import Future
from coolamqp.utils import monotonic
import six
from coolamqp.attaches import Publisher, AttacheGroup, Consumer, Declarer
......@@ -21,10 +17,11 @@ from coolamqp.clustering.single import SingleNodeReconnector
from coolamqp.exceptions import ConnectionDead
from coolamqp.objects import Exchange, Message, Queue, FrameLogger, QueueBind
from coolamqp.uplink import ListenerThread
from coolamqp.utils import monotonic
logger = logging.getLogger(__name__)
THE_POPE_OF_NOPE = NothingMuch()
nothing_much = NothingMuch()
# If any spans are spawn here, it's Cluster's job to finish them, except for publish()
......@@ -57,11 +54,12 @@ class Cluster(object):
def __init__(self, nodes, # type: tp.Union[NodeDefinition, tp.List[NodeDefinition]]
on_fail=None, # type: tp.Optional[tp.Callable[[], None]]
extra_properties=None, # type: tp.Optional[tp.List[tp.Tuple[bytes, tp.Tuple[bytes, str]]]]
extra_properties=None,
# type: tp.Optional[tp.List[tp.Tuple[bytes, tp.Tuple[bytes, str]]]]
log_frames=None, # type: tp.Optional[FrameLogger]
name=None, # type: tp.Optional[str]
on_blocked=None, # type: tp.Callable[[bool], None],
tracer=None # type: opentracing.Traccer
tracer=None # type: opentracing.Traccer
):
from coolamqp.objects import NodeDefinition
if isinstance(nodes, NodeDefinition):
......@@ -76,18 +74,18 @@ class Cluster(object):
except ImportError:
raise RuntimeError('tracer given, but opentracing is not installed!')
self.started = False
self.started = False # type: bool
self.tracer = tracer
self.name = name or 'CoolAMQP'
self.node, = nodes
self.name = name or 'CoolAMQP' # type: str
self.node, = nodes # type: NodeDefinition
self.extra_properties = extra_properties
self.log_frames = log_frames
self.on_blocked = on_blocked
self.connected = False
self.listener = None
self.attache_group = None
self.events = None
self.snr = None # type: SingleNodeReconnector
self.log_frames = log_frames # type: tp.Optional[FrameLogger]
self.on_blocked = on_blocked # type: tp.Optional[tp.Callable[[bool], None]]
self.connected = False # type: bool
self.listener = None # type: BaseListener
self.attache_group = None # type: AttacheGroup
self.events = None # type: six.moves.queue.Queue
self.snr = None # type: SingleNodeReconnector
if on_fail is not None:
def decorated():
......@@ -113,7 +111,7 @@ class Cluster(object):
def declare(self, obj, # type: tp.Union[Queue, Exchange]
persistent=False, # type: bool
span=None # type: tp.Optional[opentracing.Span]
span=None # type: tp.Optional[opentracing.Span]
): # type: (...) -> concurrent.futures.Future
"""
Declare a Queue/Exchange
......@@ -138,6 +136,7 @@ class Cluster(object):
:para span: optional parent span, if opentracing is installed
:return: an Event instance. NothingMuch is returned when there's nothing within a given timoeout
"""
def fetch():
try:
if timeout == 0:
......@@ -145,17 +144,17 @@ class Cluster(object):
else:
return self.events.get(True, timeout)
except six.moves.queue.Empty:
return THE_POPE_OF_NOPE
return nothing_much
if span is not None:
from opentracing import tags
parent_span = self.tracer.start_active_span('AMQP call',
child_of=span,
tags={
tags.SPAN_KIND: tags.SPAN_KIND_RPC_CLIENT,
tags.DATABASE_TYPE: 'amqp',
tags.DATABASE_STATEMENT: 'drain'
})
tags={
tags.SPAN_KIND: tags.SPAN_KIND_RPC_CLIENT,
tags.DATABASE_TYPE: 'amqp',
tags.DATABASE_STATEMENT: 'drain'
})
with parent_span:
return fetch()
......@@ -220,7 +219,7 @@ class Cluster(object):
routing_key=u'', # type: tp.Union[str, bytes]
tx=None, # type: tp.Optional[bool]
confirm=None, # type: tp.Optional[bool]
span=None # type: tp.Optional[opentracing.Span]
span=None # type: tp.Optional[opentracing.Span]
): # type: (...) -> tp.Optional[Future]
"""
Publish a message.
......
......@@ -16,7 +16,6 @@ coverage run --append -m nose2 -vv
COOLAMQP_FORCE_SELECT_LISTENER=1 coverage run --append -m nose2 -vv
coverage run --append -m stress_tests
coverage combine
coverage report
coverage xml
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment