Skip to content
Snippets Groups Projects
Commit 80fc82f9 authored by Piotr Maślanka's avatar Piotr Maślanka
Browse files

draft: fixes #4

parent 51fd0343
No related branches found
No related tags found
No related merge requests found
Pipeline #63439 canceled with stages
in 33 minutes and 12 seconds
......@@ -17,6 +17,7 @@ v2.0.0
* declare will expect qos to be given as an integer, and will be set as prefetch_count, since RabbitMQ no longer
supports prefetch_size
* same can be said of Consumer.set_qos(prefetch_count)
* added Cluster.
v1.5.0
======
......
......@@ -192,9 +192,6 @@ class Declarer(Channeler, Synchronized):
Channeler.__init__(self)
Synchronized.__init__(self)
self.cluster = cluster
self.declared = set() # since Queues and Exchanges are hashable...
# anonymous queues aren't, but we reject those
# persistent
self.left_to_declare = collections.deque() # since last disconnect. persistent+transient
# deque of Operation objects
......@@ -219,10 +216,6 @@ class Declarer(Channeler, Synchronized):
while len(self.left_to_declare) > 0:
self.left_to_declare.pop().on_connection_dead()
# recast current declarations as new operations
for dec in self.declared:
self.left_to_declare.append(Operation(self, dec))
super(Declarer, self).on_close()
return
......@@ -267,7 +260,7 @@ class Declarer(Channeler, Synchronized):
return fut
def declare(self, obj, persistent=False, span=None):
def declare(self, obj, span=None):
"""
Schedule to have an object declared.
......@@ -280,11 +273,7 @@ class Declarer(Channeler, Synchronized):
Queue declarations CAN fail.
Note that if re-declaring these fails, they will be silently discarded.
You can subscribe an on_discard(Exchange | Queue) here.
:param obj: Exchange or Queue instance
:param persistent: will be redeclared upon disconnect. To remove, use "undeclare"
:param span: span if opentracing is installed
:return: a Future instance
:raise ValueError: tried to declare anonymous queue
......@@ -298,10 +287,6 @@ class Declarer(Channeler, Synchronized):
fut = Future()
fut.set_running_or_notify_cancel()
if persistent:
if obj not in self.declared:
self.declared.add(obj)
self.left_to_declare.append(Operation(self, obj, fut, span, enqueued_span))
self._do_operations()
......
......@@ -89,6 +89,7 @@ class Cluster(object):
self.pub_tr = None # type: Publisher
self.pub_na = None # type: Publisher
self.decl = None # type: Declarer
self.on_fail = None
if on_fail is not None:
def decorated():
......@@ -96,10 +97,8 @@ class Cluster(object):
on_fail()
self.on_fail = decorated
else:
self.on_fail = None
def bind(self, queue, exchange, routing_key, persistent=False, span=None,
def bind(self, queue, exchange, routing_key, span=None,
dont_trace=False, arguments=None):
"""
Bind a queue to an exchange
......@@ -114,12 +113,10 @@ class Cluster(object):
else:
child_span = None
fut = self.decl.declare(QueueBind(queue, exchange, routing_key, arguments),
persistent=persistent,
span=child_span)
return close_future(fut, child_span)
def declare(self, obj, # type: tp.Union[Queue, Exchange]
persistent=False, # type: bool
span=None, # type: tp.Optional[opentracing.Span]
dont_trace=False # type: bool
): # type: (...) -> concurrent.futures.Future
......@@ -132,7 +129,6 @@ class Cluster(object):
AMQP error 404: NOT_FOUND, so try to declare your exchanges before your queues.
:param obj: Queue/Exchange object
:param persistent: should it be redefined upon reconnect?
:param span: optional parent span, if opentracing is installed
:param dont_trace: if True, a span won't be output
:return: Future
......@@ -144,7 +140,7 @@ class Cluster(object):
child_span = self._make_span('declare', span)
else:
child_span = None
fut = self.decl.declare(obj, persistent=persistent, span=child_span)
fut = self.decl.declare(obj, span=child_span)
return close_future(fut, child_span)
def drain(self, timeout, span=None, dont_trace=False): # type: (float) -> Event
......@@ -329,7 +325,6 @@ class Cluster(object):
self.attache_group.add(self.pub_tr)
self.attache_group.add(self.pub_na)
self.attache_group.add(self.decl)
self.listener.init()
self.listener.start()
......@@ -371,3 +366,9 @@ class Cluster(object):
self.listener.terminate()
if wait:
self.listener.join()
def is_shutdown(self):
"""
:return: bool, if this was started and later disconnected.
"""
return self.started and not self.connected
......@@ -75,7 +75,6 @@ class SingleNodeReconnector(object):
return
self.connection = None
self.listener_thread.call_next_io_event(self.connect)
def shutdown(self):
"""Close this connection"""
......
......@@ -54,18 +54,6 @@ class ListenerThread(threading.Thread):
self._call_next_io_event = Callable(oneshots=True)
self.listener = None # type: BaseListener
def call_next_io_event(self, callable):
"""
Call callable after current I/O event is fully processed
sometimes many callables are called in response to single
I/O (eg. teardown, startup). This guarantees a call after
all these are done.
:param callable: callable/0
"""
pass
# self._call_next_io_event.add(callable) - dummy that out, causes AssertionError to appear
def terminate(self):
self.terminating = True
......
......@@ -26,7 +26,9 @@ class TestA(unittest.TestCase):
self.c.start(timeout=20)
def tearDown(self):
self.assertFalse(self.c.is_shutdown())
self.c.shutdown()
self.assertTrue(self.c.is_shutdown())
def test_properties(self):
self.assertEqual(self.c.properties.properties['product'], 'RabbitMQ')
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment