From 745bf66e43c643780033facc72604d1d1b465514 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ma=C5=9Blanka?= <piotr.maslanka@henrietta.com.pl> Date: Fri, 3 Jan 2020 16:43:37 +0100 Subject: [PATCH] type hints for Cluster --- coolamqp/clustering/cluster.py | 17 +++++++++++------ coolamqp/clustering/single.py | 4 ++-- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/coolamqp/clustering/cluster.py b/coolamqp/clustering/cluster.py index f85e04e..29f105c 100644 --- a/coolamqp/clustering/cluster.py +++ b/coolamqp/clustering/cluster.py @@ -14,7 +14,7 @@ import six from coolamqp.attaches import Publisher, AttacheGroup, Consumer, Declarer from coolamqp.clustering.events import ConnectionLost, MessageReceived, \ - NothingMuch + NothingMuch, Event from coolamqp.clustering.single import SingleNodeReconnector from coolamqp.exceptions import ConnectionDead from coolamqp.objects import Exchange @@ -74,6 +74,8 @@ class Cluster(object): self.on_fail = None def declare(self, obj, persistent=False): + # type: (tp.Union[coolamqp.objects.Queue, coolamqp.objects.Exchange], bool) -> + # concurrent.futures.Future """ Declare a Queue/Exchange :param obj: Queue/Exchange object @@ -82,7 +84,7 @@ class Cluster(object): """ return self.decl.declare(obj, persistent=persistent) - def drain(self, timeout): + def drain(self, timeout): # type: (float) -> Event """ Return an Event. :param timeout: time to wait for an event. 0 means return immediately. None means block forever @@ -121,7 +123,7 @@ class Cluster(object): self.attache_group.add(con) return con, fut - def delete_queue(self, queue): + def delete_queue(self, queue): # type: (coolamqp.objects.Queue) -> concurrent.futures.Future """ Delete a queue. @@ -132,6 +134,8 @@ class Cluster(object): def publish(self, message, exchange=None, routing_key=u'', tx=None, confirm=None): + # type: (coolamqp.objects.Message, tp.Optional[coolamqp.objects.Exchange], + # tp.Union[str, bytes], bool, bool) -> concurrent.futures.Future """ Publish a message. @@ -176,7 +180,8 @@ class Cluster(object): raise NotImplementedError( u'Sorry, this functionality is not yet implemented!') - def start(self, wait=True, timeout=10.0, log_frames=False): + def start(self, wait=True, timeout=10.0, log_frames=None): + # type: (bool, float, bool) -> None """ Connect to broker. Initialize Cluster. @@ -205,7 +210,7 @@ class Cluster(object): self.snr = SingleNodeReconnector(self.node, self.attache_group, self.listener, self.extra_properties, - self.log_frames) + log_frames) self.snr.on_fail.add(lambda: self.events.put_nowait(ConnectionLost())) if self.on_fail is not None: self.snr.on_fail.add(self.on_fail) @@ -231,7 +236,7 @@ class Cluster(object): if not self.snr.is_connected(): raise ConnectionDead('Could not connect within %s seconds' % (timeout, )) - def shutdown(self, wait=True): + def shutdown(self, wait=True): # type: (bool) -> None """ Terminate all connections, release resources - finish the job. :param wait: block until this is done diff --git a/coolamqp/clustering/single.py b/coolamqp/clustering/single.py index 1766327..f86ae2e 100644 --- a/coolamqp/clustering/single.py +++ b/coolamqp/clustering/single.py @@ -29,10 +29,10 @@ class SingleNodeReconnector(object): self.on_fail.add(self._on_fail) - def is_connected(self): + def is_connected(self): # type: () -> bool return self.connection is not None - def connect(self, timeout): + def connect(self, timeout): # type: (float) -> None assert self.connection is None # Initiate connecting - this order is very important! -- GitLab