# coding=UTF-8
"""
THE object you interface with
"""
from __future__ import print_function, absolute_import, division

import logging
import time
import typing as tp
import warnings
from concurrent.futures import Future

import monotonic
import six

from coolamqp.attaches import Publisher, AttacheGroup, Consumer, Declarer
from coolamqp.clustering.events import ConnectionLost, MessageReceived, \
    NothingMuch, Event
from coolamqp.clustering.single import SingleNodeReconnector
from coolamqp.exceptions import ConnectionDead
from coolamqp.objects import Exchange
from coolamqp.uplink import ListenerThread

logger = logging.getLogger(__name__)

# Shared "nothing happened" event returned by Cluster.drain() on timeout
THE_POPE_OF_NOPE = NothingMuch()


class Cluster(object):
    """
    Frontend for your AMQP needs.

    This has ListenerThread.

    Call .start() to connect to AMQP.

    It is not safe to fork() after .start() is called, but it's OK before.

    :param nodes: list of nodes, or a single node. For now, only one is supported.
    :type nodes: NodeDefinition instance or a list of NodeDefinition instances
    :param on_fail: callable/0 to call when connection fails in an
        unclean way. This is a one-shot
    :type on_fail: callable/0
    :param extra_properties: refer to documentation in [/coolamqp/connection/connection.py]
        Connection.__init__
    :param log_frames: an object that will have its method .on_frame(timestamp,
        frame, direction) called upon receiving/sending a frame. Timestamp is UNIX timestamp,
        frame is AMQPFrame, direction is one of 'to_client', 'to_server'
    :raise NotImplementedError: more than one node was given
    :raise ValueError: an empty list of nodes was given
    """

    # Events you can be informed about
    ST_LINK_LOST = 0  # Link has been lost
    ST_LINK_REGAINED = 1  # Link has been regained

    def __init__(self, nodes, on_fail=None, extra_properties=None,
                 log_frames=None):
        # Imported here rather than at module level, as in the original code
        from coolamqp.objects import NodeDefinition

        if isinstance(nodes, NodeDefinition):
            nodes = [nodes]

        if len(nodes) > 1:
            raise NotImplementedError(u'Multiple nodes not supported yet')
        if len(nodes) == 0:
            # The unpacking below would raise ValueError anyway, but with
            # a message that says nothing about the actual mistake
            raise ValueError(u'At least one node is required')

        self.node, = nodes
        self.extra_properties = extra_properties
        self.log_frames = log_frames

        if on_fail is not None:
            def decorated():
                # Do not report a failure if we are the ones shutting down.
                # self.listener is created by start(); this callback is
                # only registered there, so it cannot fire earlier.
                if not self.listener.terminating:
                    on_fail()

            self.on_fail = decorated
        else:
            self.on_fail = None

    def declare(self, obj, persistent=False):
        # type: (tp.Union[coolamqp.objects.Queue, coolamqp.objects.Exchange], bool) -> concurrent.futures.Future
        """
        Declare a Queue/Exchange.

        Requires .start() to have been called.

        :param obj: Queue/Exchange object
        :param persistent: should it be redefined upon reconnect?
        :return: Future
        """
        return self.decl.declare(obj, persistent=persistent)

    def drain(self, timeout):  # type: (float) -> Event
        """
        Return an Event.

        Requires .start() to have been called.

        :param timeout: time to wait for an event. 0 means return immediately.
            None means block forever
        :return: an Event instance. NothingMuch is returned when there's nothing
            within a given timeout
        """
        try:
            if timeout == 0:
                return self.events.get_nowait()
            else:
                return self.events.get(True, timeout)
        except six.moves.queue.Empty:
            return THE_POPE_OF_NOPE

    def consume(self, queue, on_message=None, *args, **kwargs):
        """
        Start consuming from a queue.

        args and kwargs will be passed to Consumer constructor
        (coolamqp.attaches.consumer.Consumer).
        Don't use future_to_notify - it's done here!

        Take care not to lose the Consumer object - it's the only way to
        cancel a consumer!

        Requires .start() to have been called.

        :param queue: Queue object, being consumed from right now.
            Note that name of anonymous queue might change at any time!
        :param on_message: callable that will process incoming messages.
            If you leave it at None, messages will be .put into self.events
        :type on_message: callable(ReceivedMessage instance) or None
        :return: a tuple (Consumer instance, and a Future), that tells, when
            consumer is ready
        """
        fut = Future()
        fut.set_running_or_notify_cancel()  # it's running right now
        on_message = on_message or (
            lambda rmsg: self.events.put_nowait(MessageReceived(rmsg)))
        con = Consumer(queue, on_message, future_to_notify=fut, *args,
                       **kwargs)
        self.attache_group.add(con)
        return con, fut

    def delete_queue(self, queue):
        # type: (coolamqp.objects.Queue) -> concurrent.futures.Future
        """
        Delete a queue.

        Requires .start() to have been called.

        :param queue: Queue instance that represents what to delete
        :return: a Future (will succeed with None or fail with AMQPError)
        """
        return self.decl.delete_queue(queue)

    def publish(self, message, exchange=None, routing_key=u'', tx=None,
                confirm=None):
        """
        Publish a message.

        Requires .start() to have been called.

        :param message: Message to publish
        :type message: coolamqp.objects.Message
        :param exchange: exchange to use. Default is the "direct" empty-name exchange.
        :type exchange: unicode/bytes (exchange name) or Exchange object.
        :param routing_key: routing key to use
        :type routing_key: tp.Union[str, bytes]
        :param confirm: Whether to publish it using confirms/transactions.
            If you choose so, you will receive a Future that can be used
            to check if broker took responsibility for this message.
            Note that if confirm is False, and message cannot be delivered
            to broker at once, it will be discarded
        :type confirm: tp.Optional[bool]
        :param tx: deprecated, alias for confirm
        :type tx: tp.Optional[bool]
        :return: Future or None
        :raise RuntimeError: both tx and confirm were given
        :raise NotImplementedError: the chosen publisher reported
            Publisher.UnusablePublisher
        """
        # Normalize exchange name and routing key to bytes
        if isinstance(exchange, Exchange):
            exchange = exchange.name.encode('utf8')
        elif exchange is None:
            exchange = b''
        elif isinstance(exchange, six.text_type):
            exchange = exchange.encode('utf8')

        if isinstance(routing_key, six.text_type):
            routing_key = routing_key.encode('utf8')

        if tx is not None:
            # confirm is a drop-in replacement. tx is unfortunately named
            # stacklevel=2 makes the warning point at the caller's code
            warnings.warn(u'Use confirm kwarg instead', DeprecationWarning,
                          stacklevel=2)

            if confirm is not None:
                raise RuntimeError(
                    u'Using both tx= and confirm= at once does not make sense')
        elif confirm is not None:
            tx = confirm
        else:
            tx = False

        try:
            return (self.pub_tr if tx else self.pub_na).publish(message,
                                                                exchange,
                                                                routing_key)
        except Publisher.UnusablePublisher:
            raise NotImplementedError(
                u'Sorry, this functionality is not yet implemented!')

    def start(self, wait=True, timeout=10.0, log_frames=None):
        # type: (bool, float, tp.Optional[tp.Any]) -> None
        """
        Connect to broker. Initialize Cluster.

        Only after this call is Cluster usable.
        It is not safe to fork after this.

        :param wait: block until connection is ready
        :param timeout: timeout to wait until the connection is ready. If it is not,
            a ConnectionDead error will be raised (only when wait is True)
        :param log_frames: frame logger to hand over to the connection. If left at
            None, the log_frames object given to the constructor is used.
            The value is forwarded as-is to SingleNodeReconnector
        :raise RuntimeError: called more than once
        :raise ConnectionDead: failed to connect within timeout
        """
        try:
            self.listener
        except AttributeError:
            pass
        else:
            raise RuntimeError(u'This was already called!')

        if log_frames is None:
            # Honour the logger configured in the constructor; previously it
            # was stored but never used
            log_frames = self.log_frames

        self.listener = ListenerThread()

        self.attache_group = AttacheGroup()

        self.events = six.moves.queue.Queue()  # for coolamqp.clustering.events.*

        self.snr = SingleNodeReconnector(self.node, self.attache_group,
                                         self.listener, self.extra_properties,
                                         log_frames)
        self.snr.on_fail.add(lambda: self.events.put_nowait(ConnectionLost()))
        if self.on_fail is not None:
            self.snr.on_fail.add(self.on_fail)

        # Spawn a transactional publisher and a noack publisher
        self.pub_tr = Publisher(Publisher.MODE_CNPUB)
        self.pub_na = Publisher(Publisher.MODE_NOACK)
        self.decl = Declarer()

        self.attache_group.add(self.pub_tr)
        self.attache_group.add(self.pub_na)
        self.attache_group.add(self.decl)

        self.listener.init()
        self.listener.start()
        self.snr.connect(timeout=timeout)

        if wait:
            # this is only going to take a short amount of time, so we're fine with polling
            start_at = monotonic.monotonic()
            while not self.attache_group.is_online() and \
                    monotonic.monotonic() - start_at < timeout:
                time.sleep(0.1)
            if not self.attache_group.is_online():
                raise ConnectionDead(
                    'Could not connect within %s seconds' % (timeout,))

    def shutdown(self, wait=True):  # type: (bool) -> None
        """
        Terminate all connections, release resources - finish the job.

        :param wait: block until this is done
        :raise RuntimeError: if called without start() being called first
        """
        try:
            self.listener
        except AttributeError:
            raise RuntimeError(u'shutdown without start')

        logger.info('Commencing shutdown')

        self.listener.terminate()
        if wait:
            self.listener.join()