Skip to content
Snippets Groups Projects

Issue #3

Merged Piotr Maślanka requested to merge issue-#3 into milestone-2.0.0
12 files
+ 148
96
Compare changes
  • Side-by-side
  • Inline
Files
12
@@ -103,7 +103,12 @@ class Cluster(object):
dont_trace=False, arguments=None):
"""
Bind a queue to an exchange
:raise ValueError: cannot bind to anonymous queues
"""
if queue.anonymous:
raise ValueError('Canoot bind to anonymous queue')
if span is not None and not dont_trace:
child_span = self._make_span('bind', span)
else:
@@ -121,6 +126,8 @@ class Cluster(object):
"""
Declare a Queue/Exchange.
Non-anonymous queues have to be declared. Anonymous can't.
.. note:: Note that if your queue relates to an exchange that has not yet been declared you'll be faced with
AMQP error 404: NOT_FOUND, so try to declare your exchanges before your queues.
@@ -129,7 +136,10 @@ class Cluster(object):
:param span: optional parent span, if opentracing is installed
:param dont_trace: if True, a span won't be output
:return: Future
:raises ValueError: tried to declare an anonymous queue
"""
if isinstance(obj, Queue) and obj.anonymous:
raise ValueError('You cannot declare an anonymous queue!')
if span is not None and not dont_trace:
child_span = self._make_span('declare', span)
else:
@@ -183,6 +193,9 @@ class Cluster(object):
Take care not to lose the Consumer object - it's the only way to cancel a consumer!
.. note:: You don't need to explicitly declare queues and exchanges that you will be using beforehand,
this will do this for you on the same channel.
:param queue: Queue object, being consumed from right now.
Note that name of anonymous queue might change at any time!
:param on_message: callable that will process incoming messages
@@ -232,7 +245,6 @@ class Cluster(object):
def publish(self, message, # type: Message
exchange=None, # type: tp.Union[Exchange, str, bytes]
routing_key=u'', # type: tp.Union[str, bytes]
tx=None, # type: tp.Optional[bool]
confirm=None, # type: tp.Optional[bool]
span=None, # type: tp.Optional[opentracing.Span]
dont_trace=False # type: bool
@@ -246,9 +258,8 @@ class Cluster(object):
:param confirm: Whether to publish it using confirms/transactions.
If you choose so, you will receive a Future that can be used
to check it broker took responsibility for this message.
Note that if tx if False, and message cannot be delivered to broker at once,
Note that if confirm is False, and message cannot be delivered to broker at once,
it will be discarded
:param tx: deprecated, alias for confirm
:param span: optionally, current span, if opentracing is installed
:param dont_trace: if set to True, a span won't be generated
:return: Future to be finished on completion or None, is confirm/tx was not chosen
@@ -266,19 +277,8 @@ class Cluster(object):
if isinstance(routing_key, six.text_type):
routing_key = routing_key.encode('utf8')
if tx is not None: # confirm is a drop-in replacement. tx is unfortunately named
warnings.warn(u'Use confirm kwarg instead', DeprecationWarning)
if confirm is not None:
raise RuntimeError(
u'Using both tx= and confirm= at once does not make sense')
elif confirm is not None:
tx = confirm
else:
tx = False
try:
if tx:
if confirm:
clb = self.pub_tr
else:
clb = self.pub_na
Loading