diff --git a/coolamqp/attaches/consumer.py b/coolamqp/attaches/consumer.py index 43769cf5215f9f8a2c29aaec925c7b9bc66e9500..c50d6e6a6d1af570451b4f4505f09333a0618ae1 100644 --- a/coolamqp/attaches/consumer.py +++ b/coolamqp/attaches/consumer.py @@ -275,7 +275,7 @@ class MessageReceiver(object): if ack_expected: self.acks_pending.add(self.bdeliver.delivery_tag) - from coolamqp.messages import ReceivedMessage + from coolamqp.objects import ReceivedMessage rm = ReceivedMessage( b''.join(map(six.binary_type, self.body)), #todo inefficient as FUUUCK self.bdeliver.exchange, diff --git a/coolamqp/attaches/publisher.py b/coolamqp/attaches/publisher.py index b366c341bede731432ef079d6a7e8d7bfdd8f219..453164a2bd7f75f83969c650e5c4da48538df871 100644 --- a/coolamqp/attaches/publisher.py +++ b/coolamqp/attaches/publisher.py @@ -27,8 +27,7 @@ from coolamqp.attaches.channeler import Channeler, ST_ONLINE, ST_OFFLINE from coolamqp.uplink import PUBLISHER_CONFIRMS, MethodWatch from coolamqp.attaches.utils import AtomicTagger, FutureConfirmableRejectable -from coolamqp.futures import Future - +from coolamqp.objects import Future logger = logging.getLogger(__name__) @@ -118,6 +117,11 @@ class Publisher(Channeler): while len(self.messages) > 0: msg, xchg, rk, fut = self.messages.popleft() + if fut.cancelled: + # Ok, don't do this. + fut.set_cancel() + continue + self.tagger.deposit(self.tagger.get_key(), FutureConfirmableRejectable(fut)) self._pub(msg, xchg, rk) @@ -143,6 +147,8 @@ class Publisher(Channeler): or exception (a plain Exception instance). Exception will happen when broker NACKs the message: that, according to RabbitMQ, means an internal error in Erlang process. + Returned Future can be cancelled - this will prevent from sending the message, if it hasn't commenced yet. + If mode is MODE_NOACK: this function returns None. Messages are dropped on the floor if there's no connection. diff --git a/coolamqp/backends/__init__.py b/coolamqp/backends/__init__.py deleted file mode 100644 index b7d705d70cb3bab249f88d6926223aae4420f817..0000000000000000000000000000000000000000 --- a/coolamqp/backends/__init__.py +++ /dev/null @@ -1,6 +0,0 @@ -# coding=UTF-8 -from coolamqp.backends.coolamqp import CoolAMQPBackend -from coolamqp.backends.base import AMQPError, ConnectionFailedError, RemoteAMQPError, Cancelled -""" -Backend is a legacy way to access CoolAMQP. -""" \ No newline at end of file diff --git a/coolamqp/backends/base.py b/coolamqp/backends/base.py deleted file mode 100644 index 47e4af6b103b3a9bf2275866c050f0e306958aab..0000000000000000000000000000000000000000 --- a/coolamqp/backends/base.py +++ /dev/null @@ -1,141 +0,0 @@ -# coding=UTF-8 -class AMQPError(Exception): - """Connection errors and bawking of AMQP server""" - code = None - reply_text = 'AMQP error' - - def __repr__(self): - return u'AMQPError()' - - -class ConnectionFailedError(AMQPError): - """Connection to broker failed""" - reply_text = 'failed connecting to broker' - - def __repr__(self): - return u'ConnectionFailedError("%s")' % map(repr, (self.reply_text, )) - - -class Cancelled(Exception): - """Cancel ordered by user""" - - -class RemoteAMQPError(AMQPError): - """ - Remote AMQP broker responded with an error code - """ - def __init__(self, code, text=None): - """ - :param code: AMQP error code - :param text: AMQP error text (optional) - """ - AMQPError.__init__(self, text) - self.code = code - self.text = text or 'server sent back an error' - - def __repr__(self): - return u'RemoteAMQPError(%s, %s)' % map(repr, (self.code, self.text)) - -class AMQPBackend(object): - """ - Dummy AMQP backend. - - Every method may raise either ConnectionFailedError (if connection failed) - or RemoteAMQPError (if broker returned an error response) - """ - - def __init__(self, cluster_node, cluster_handler_thread): - """ - Connects to an AMQP backend. - """ - self.cluster_handler_thread = cluster_handler_thread - - def process(self, max_time=10): - """ - Do bookkeeping, process messages, etc. - :param max_time: maximum time in seconds this call can take - :raises ConnectionFailedError: if connection failed in the meantime - """ - - def exchange_declare(self, exchange): - """ - Declare an exchange - :param exchange: Exchange object - """ - - def exchange_delete(self, exchange): - """ - Delete an exchange - :param exchange: Exchange object - """ - - def queue_bind(self, queue, exchange, routing_key=''): - """ - Bind a queue to an exchange - :param queue: Queue object - :param exchange: Exchange object - :param routing_key: routing key to use - """ - - def queue_delete(self, queue): - """ - Delete a queue. - - :param queue: Queue - """ - - - def queue_declare(self, queue): - """ - Declare a queue. - - This will change queue's name if anonymous - :param queue: Queue - """ - - def basic_cancel(self, consumer_tag): - """ - Cancel consuming, identified by a consumer_tag - :param consumer_tag: consumer_tag to cancel - """ - - def basic_consume(self, queue, no_ack=False): - """ - Start consuming from a queue - :param queue: Queue object - :param no_ack: Messages will not need to be ack()ed for this queue - """ - - def basic_ack(self, delivery_tag): - """ - ACK a message. - :param delivery_tag: delivery tag to ack - """ - - def basic_qos(self, prefetch_size, prefetch_count, global_): - """ - Issue a basic.qos(prefetch_size, prefetch_count, True) against broker - :param prefetch_size: prefetch window size in octets - :param prefetch_count: prefetch window in terms of whole messages - """ - - def basic_reject(self, delivery_tag): - """ - Reject a message - :param delivery_tag: delivery tag to reject - """ - - def basic_publish(self, message, exchange, routing_key): - """ - Send a message - :param message: Message object to send - :param exchange: Exchange object to publish to - :param routing_key: routing key to use - """ - - def shutdown(self): - """ - Close this connection. - This is not allowed to return anything or raise - """ - self.cluster_handler_thread = None # break GC cycles diff --git a/coolamqp/backends/coolamqp.py b/coolamqp/backends/coolamqp.py deleted file mode 100644 index 8ca02034f6401bbc106f2a0d3a2ae7a3dcf81dcd..0000000000000000000000000000000000000000 --- a/coolamqp/backends/coolamqp.py +++ /dev/null @@ -1,114 +0,0 @@ -# coding=UTF-8 -from __future__ import absolute_import, division, print_function - - -from coolamqp.backends.base import AMQPBackend - - -# Create a global ListenerThread -from coolamqp.uplink import ListenerThread -GLOBALS = { - 'listener': ListenerThread() -} - - -class CoolAMQPBackend(AMQPBackend): - """ - A backend utilizing CoolAMQP's coolamqp.attaches and coolamqp.connection. - Backend starts with creating a connection, and ends with blanging it. - """ - def __init__(self, cluster_node, cluster_handler_thread): - """ - Connects to an AMQP backend. - """ - self.cluster_handler_thread = cluster_handler_thread - - def process(self, max_time=10): - """ - Do bookkeeping, process messages, etc. - :param max_time: maximum time in seconds this call can take - :raises ConnectionFailedError: if connection failed in the meantime - """ - - def exchange_declare(self, exchange): - """ - Declare an exchange - :param exchange: Exchange object - """ - - def exchange_delete(self, exchange): - """ - Delete an exchange - :param exchange: Exchange object - """ - - def queue_bind(self, queue, exchange, routing_key=''): - """ - Bind a queue to an exchange - :param queue: Queue object - :param exchange: Exchange object - :param routing_key: routing key to use - """ - - def queue_delete(self, queue): - """ - Delete a queue. - - :param queue: Queue - """ - - - def queue_declare(self, queue): - """ - Declare a queue. - - This will change queue's name if anonymous - :param queue: Queue - """ - - def basic_cancel(self, consumer_tag): - """ - Cancel consuming, identified by a consumer_tag - :param consumer_tag: consumer_tag to cancel - """ - - def basic_consume(self, queue, no_ack=False): - """ - Start consuming from a queue - :param queue: Queue object - :param no_ack: Messages will not need to be ack()ed for this queue - """ - - def basic_ack(self, delivery_tag): - """ - ACK a message. - :param delivery_tag: delivery tag to ack - """ - - def basic_qos(self, prefetch_size, prefetch_count, global_): - """ - Issue a basic.qos(prefetch_size, prefetch_count, True) against broker - :param prefetch_size: prefetch window size in octets - :param prefetch_count: prefetch window in terms of whole messages - """ - - def basic_reject(self, delivery_tag): - """ - Reject a message - :param delivery_tag: delivery tag to reject - """ - - def basic_publish(self, message, exchange, routing_key): - """ - Send a message - :param message: Message object to send - :param exchange: Exchange object to publish to - :param routing_key: routing key to use - """ - - def shutdown(self): - """ - Close this connection. - This is not allowed to return anything or raise - """ - self.cluster_handler_thread = None # break GC cycles diff --git a/coolamqp/futures.py b/coolamqp/futures.py deleted file mode 100644 index 661d2e7c352f1c733af58d44ad85de8826a7f140..0000000000000000000000000000000000000000 --- a/coolamqp/futures.py +++ /dev/null @@ -1,71 +0,0 @@ -# coding=UTF-8 -from __future__ import absolute_import, division, print_function -import concurrent.futures -import threading -import logging - - -logger = logging.getLogger(__name__) - - -class Future(concurrent.futures.Future): - """ - A strange future (only one thread may wait for it) - """ - def __init__(self): - self.lock = threading.Lock() - self.lock.acquire() - - self.completed = False - self.successfully = None - self._result = None - self.cancelled = False - self.running = True - - self.callables = [] - - def add_done_callback(self, fn): - self.callables.append(fn) - - def result(self, timeout=None): - assert timeout is None, u'Non-none timeouts not supported' - self.lock.acquire() - - if self.completed: - if self.successfully: - return self._result - else: - raise self._result - else: - if self.cancelled: - raise concurrent.futures.CancelledError() - else: - # it's invalid to release the lock, not do the future if it's not cancelled - raise RuntimeError(u'Invalid state!') - - def cancel(self): - """ - When cancelled, future will attempt not to complete (completed=False). - :return: - """ - self.cancelled = True - - def __finish(self, result, successful): - self.completed = True - self.successfully = successful - self._result = result - self.lock.release() - - for callable in self.callables: - try: - callable(self) - except Exception as e: - logger.error('Exception in base order future: %s', repr(e)) - except BaseException as e: - logger.critical('WILD NASAL DEMON APPEARED: %s', repr(e)) - - def set_result(self, result=None): - self.__finish(result, True) - - def set_exception(self, exception): - self.__finish(exception, False) diff --git a/coolamqp/messages.py b/coolamqp/objects.py similarity index 59% rename from coolamqp/messages.py rename to coolamqp/objects.py index 380f0bce746e41d2b50920316abf92262266c2a9..ed06cb252f07f19e6216ea3440ff0613b25cd68e 100644 --- a/coolamqp/messages.py +++ b/coolamqp/objects.py @@ -1,18 +1,29 @@ # coding=UTF-8 +""" +Core objects used in CoolAMQP +""" +import threading import uuid import six +import logging +import concurrent.futures from coolamqp.framing.definitions import BasicContentPropertyList as MessageProperties +__all__ = ('Message', 'ReceivedMessage', 'MessageProperties', 'Queue', 'Exchange', 'Future') -__all__ = ('Message', 'ReceivedMessage', 'MessageProperties') - +logger = logging.getLogger(__name__) EMPTY_PROPERTIES = MessageProperties() class Message(object): - """AMQP message object""" + """ + An AMQP message. Has a binary body, and some properties. + + Properties is a highly regularized class - see coolamqp.framing.definitions.BasicContentPropertyList + for a list of possible properties. + """ def __init__(self, body, properties=None): """ @@ -34,7 +45,14 @@ class Message(object): LAMBDA_NONE = lambda: None class ReceivedMessage(Message): - """Message as received from AMQP system""" + """ + A message that was received from the AMQP broker. + + It additionally has an exchange name, routing key used, it's delivery tag, + and methods for ack() or nack(). + + Note that if the consumer that generated this message was no_ack, .ack() and .nack() are no-ops. + """ def __init__(self, body, exchange_name, routing_key, properties=None, @@ -125,3 +143,77 @@ class Queue(object): def __hash__(self): return hash(self.name) + + +class Future(concurrent.futures.Future): + """ + Future returned by asynchronous CoolAMQP methods. + + A strange future (only one thread may wait for it) + """ + __slots__ = ('lock', 'completed', 'successfully', '_result', 'running', 'callables', 'cancelled') + + + def __init__(self): + self.lock = threading.Lock() + self.lock.acquire() + + self.completed = False + self.successfully = None + self._result = None + self.cancelled = False + self.running = True + + self.callables = [] + + def add_done_callback(self, fn): + self.callables.append(fn) + + def result(self, timeout=None): + assert timeout is None, u'Non-none timeouts not supported' + self.lock.acquire() + + if self.completed: + if self.successfully: + return self._result + else: + raise self._result + else: + if self.cancelled: + raise concurrent.futures.CancelledError() + else: + # it's invalid to release the lock, not do the future if it's not cancelled + raise RuntimeError(u'Invalid state!') + + def cancel(self): + """ + When cancelled, future will attempt not to complete (completed=False). + :return: + """ + self.cancelled = True + + def __finish(self, result, successful): + self.completed = True + self.successfully = successful + self._result = result + self.lock.release() + + for callable in self.callables: + try: + callable(self) + except Exception as e: + logger.error('Exception in base order future: %s', repr(e)) + except BaseException as e: + logger.critical('WILD NASAL DEMON APPEARED: %s', repr(e)) + + def set_result(self, result=None): + self.__finish(result, True) + + def set_exception(self, exception): + self.__finish(exception, False) + + def set_cancel(self): + """Executor has seen that this is cancelled, and discards it from list of things to do""" + assert self.cancelled + self.completed = False + self.lock.release() diff --git a/tests/run.py b/tests/run.py index a54aab3771946ea00301d33fd6054efe318e4fed..74469dbfd7f8e511772a1c2330aae90d4f1a83ba 100644 --- a/tests/run.py +++ b/tests/run.py @@ -2,12 +2,12 @@ from __future__ import absolute_import, division, print_function from coolamqp.uplink import ListenerThread import time, logging, threading -from coolamqp.messages import Message, MessageProperties +from coolamqp.objects import Message, MessageProperties from coolamqp.connection import NodeDefinition from coolamqp.uplink import Connection from coolamqp.attaches import Consumer, Publisher, MODE_NOACK, MODE_CNPUB -from coolamqp.messages import Queue +from coolamqp.objects import Queue import time