diff --git a/coolamqp/attaches/consumer.py b/coolamqp/attaches/consumer.py index fc905134028860fce984a997fd343a7190b30e9a..beccd72c3e991b0def16373bf79344fb929011a0 100644 --- a/coolamqp/attaches/consumer.py +++ b/coolamqp/attaches/consumer.py @@ -11,7 +11,7 @@ from coolamqp.framing.definitions import ChannelOpenOk, BasicConsume, \ BasicAck, BasicReject, RESOURCE_LOCKED, BasicCancelOk, BasicQos, HARD_ERRORS, \ BasicCancel from coolamqp.uplink import HeaderOrBodyWatch, MethodWatch -from coolamqp.objects import Future +from concurrent.futures import Future from coolamqp.attaches.channeler import Channeler, ST_ONLINE, ST_OFFLINE from coolamqp.exceptions import AMQPError @@ -131,6 +131,7 @@ class Consumer(Channeler): return self.future_to_notify_on_dead else: self.future_to_notify_on_dead = Future() + self.future_to_notify_on_dead.set_running_or_notify_cancel() self.cancelled = True # you'll blow up big next time you try to use this consumer if you can't cancel, but just close @@ -154,7 +155,7 @@ class Consumer(Channeler): # notify the future if self.future_to_notify is not None: - self.future_to_notify.set_result() + self.future_to_notify.set_result(None) self.future_to_notify = None else: @@ -233,7 +234,7 @@ class Consumer(Channeler): if self.future_to_notify_on_dead: # notify it was cancelled logger.info('Consumer successfully cancelled') - self.future_to_notify_on_dead.set_result() + self.future_to_notify_on_dead.set_result(None) if should_retry: diff --git a/coolamqp/attaches/declarer.py b/coolamqp/attaches/declarer.py index 097726953d647d7aef036198a905913217605c2e..c3e59cb555720e71f3650239c2db17b82b39e2df 100644 --- a/coolamqp/attaches/declarer.py +++ b/coolamqp/attaches/declarer.py @@ -10,8 +10,8 @@ from coolamqp.framing.definitions import ChannelOpenOk, ExchangeDeclare, Exchang QueueDeclareOk, ChannelClose from coolamqp.attaches.channeler import Channeler, ST_ONLINE, ST_OFFLINE from coolamqp.attaches.utils import AtomicTagger, FutureConfirmableRejectable, Synchronized - -from coolamqp.objects import Future, Exchange, Queue, Callable +from concurrent.futures import Future +from coolamqp.objects import Exchange, Queue, Callable from coolamqp.exceptions import AMQPError, ConnectionDead logger = logging.getLogger(__name__) @@ -69,7 +69,7 @@ class Operation(object): self.declarer.on_discard(self.obj) else: if self.fut is not None: - self.fut.set_result() + self.fut.set_result(None) self.fut = None self.declarer.on_operation_done() @@ -166,6 +166,7 @@ class Declarer(Channeler, Synchronized): raise ValueError('Cannot declare anonymous queue') fut = Future() + fut.set_running_or_notify_cancel() if persistent: if obj not in self.declared: diff --git a/coolamqp/attaches/publisher.py b/coolamqp/attaches/publisher.py index bfd3bf2e2d4a2f0328f119c82fa5fa53d007871b..e4239ae4061b75b8cd02a30fdf1672c4c37e3678 100644 --- a/coolamqp/attaches/publisher.py +++ b/coolamqp/attaches/publisher.py @@ -29,7 +29,8 @@ from coolamqp.attaches.channeler import Channeler, ST_ONLINE, ST_OFFLINE from coolamqp.uplink import PUBLISHER_CONFIRMS, MethodWatch, FailWatch from coolamqp.attaches.utils import AtomicTagger, FutureConfirmableRejectable, Synchronized -from coolamqp.objects import Future, Exchange +from concurrent.futures import Future +from coolamqp.objects import Exchange logger = logging.getLogger(__name__) @@ -146,15 +147,11 @@ class Publisher(Channeler, Synchronized): while len(self.messages) > 0: msg, xchg, rk, fut = self.messages.popleft() - if fut.cancelled: - # Ok, don't do this. - fut.set_cancel() - continue + if not fut.set_running_or_notify_cancel(): + continue # cancelled self.tagger.deposit(self.tagger.get_key(), FutureConfirmableRejectable(fut)) - assert isinstance(xchg, (six.binary_type, six.text_type)) - self._pub(msg, xchg, rk) def _on_cnpub_delivery(self, payload): diff --git a/coolamqp/attaches/utils.py b/coolamqp/attaches/utils.py index 1940f92bd80e05a066944d9bf900f438a3125b13..bfb90acd5cee04286c19ce03fa23e7adbdb8322a 100644 --- a/coolamqp/attaches/utils.py +++ b/coolamqp/attaches/utils.py @@ -35,7 +35,7 @@ class FutureConfirmableRejectable(ConfirmableRejectable): self.future = future def confirm(self): - self.future.set_result() + self.future.set_result(None) def reject(self): self.future.set_exception(Exception()) diff --git a/coolamqp/clustering/cluster.py b/coolamqp/clustering/cluster.py index e44608a639e6e9a0f5c25dbf2bde33aa2eead585..b45ec09df8478d331c003bc1816953276a757825 100644 --- a/coolamqp/clustering/cluster.py +++ b/coolamqp/clustering/cluster.py @@ -10,8 +10,8 @@ import time from coolamqp.uplink import ListenerThread from coolamqp.clustering.single import SingleNodeReconnector from coolamqp.attaches import Publisher, AttacheGroup, Consumer, Declarer -from coolamqp.objects import Future, Exchange - +from coolamqp.objects import Exchange +from concurrent.futures import Future from coolamqp.clustering.events import ConnectionLost, MessageReceived, NothingMuch @@ -91,6 +91,7 @@ class Cluster(object): :return: a tuple (Consumer instance, and a Future), that tells, when consumer is ready """ fut = Future() + fut.set_running_or_notify_cancel() # it's running right now on_message = on_message or (lambda rmsg: self.events.put_nowait(MessageReceived(rmsg))) con = Consumer(queue, on_message, future_to_notify=fut, *args, **kwargs) self.attache_group.add(con) diff --git a/coolamqp/objects.py b/coolamqp/objects.py index 4979f8f2df37f9c4e7f00197ba6d2d50e73b30f8..1539d4b7a954a42718c23610aa16f510b1666b13 100644 --- a/coolamqp/objects.py +++ b/coolamqp/objects.py @@ -188,91 +188,6 @@ class Queue(object): return hash(self.name) -class Future(concurrent.futures.Future): - """ - INTERNAL USE ONLY - Future returned by asynchronous CoolAMQP methods. - - A strange future (only one thread may wait for it) - """ - __slots__ = ('lock', 'completed', 'successfully', '_result', 'running', 'callables', 'cancelled') - - def __init__(self): - self.lock = threading.Lock() - self.lock.acquire() - - self.completed = False - self.successfully = None - self._result = None - self.cancelled = False - self.running = True - - self.callables = [] - - def __repr__(self): - a = [] - if self.completed: - a.append(u'completed') - if self.successfully: - a.append(u'successfully') - if self.cancelled: - a.append(u'cancelled') - return u'<CoolAMQP Future %s>' % (u' '.join(a), ) - - def add_done_callback(self, fn): - self.callables.append(fn) - - def result(self, timeout=None): - assert timeout is None, u'Non-none timeouts not supported' - self.lock.acquire() - self.lock.release() - - if self.completed: - if self.successfully: - return self._result - else: - raise self._result - else: - if self.cancelled: - raise concurrent.futures.CancelledError() - else: - # it's invalid to release the lock, not do the future if it's not cancelled - raise RuntimeError(u'Invalid state!') - - def cancel(self): - """ - When cancelled, future will attempt not to complete (completed=False). - :return: - """ - self.cancelled = True - - def __finish(self, result, successful): - self.completed = True - self.successfully = successful - self._result = result - self.lock.release() - - for callable in self.callables: - try: - callable(self) - except Exception as e: - logger.error('Exception in base order future: %s', repr(e)) - except BaseException as e: - logger.critical('WILD NASAL DEMON APPEARED: %s', repr(e)) - - def set_result(self, result=None): - self.__finish(result, True) - - def set_exception(self, exception): - self.__finish(exception, False) - - def set_cancel(self): - """Executor has seen that this is cancelled, and discards it from list of things to do""" - assert self.cancelled - self.completed = False - self.lock.release() - - class NodeDefinition(object): """ Definition of a reachable AMQP node. diff --git a/coolamqp/uplink/handshake.py b/coolamqp/uplink/handshake.py index 00f55745d18c139cfca7be8ba9c1b0928f314367..b1b4f2c8b15d25f3e4bfffe011cdd4890d2931b3 100644 --- a/coolamqp/uplink/handshake.py +++ b/coolamqp/uplink/handshake.py @@ -22,7 +22,7 @@ CLIENT_DATA = [ # because RabbitMQ is some kind of a fascist and does not allow # these fields to be of type short-string (b'product', (b'CoolAMQP', 'S')), - (b'version', (b'0.86', 'S')), + (b'version', (b'0.87', 'S')), (b'copyright', (b'Copyright (C) 2016-2017 DMS Serwis', 'S')), (b'information', (b'Licensed under the MIT License.\nSee https://github.com/smok-serwis/coolamqp for details', 'S')), (b'capabilities', ([(capa, (True, 't')) for capa in SUPPORTED_EXTENSIONS], 'F')), diff --git a/setup.py b/setup.py index cc670f21f893c5853c4e1b20096942666d15f8d5..912aee5ad77b77cca04540d377f1d9f9f38040c1 100644 --- a/setup.py +++ b/setup.py @@ -4,12 +4,12 @@ from setuptools import setup setup(name=u'CoolAMQP', - version='0.86', + version='0.87', description=u'The fastest AMQP client', author=u'DMS Serwis s.c.', author_email=u'piotrm@smok.co', url=u'https://github.com/smok-serwis/coolamqp', - download_url='https://github.com/smok-serwis/coolamqp/archive/v0.86.zip', + download_url='https://github.com/smok-serwis/coolamqp/archive/v0.87.zip', keywords=['amqp', 'rabbitmq', 'client', 'network', 'ha', 'high availability'], packages=[ 'coolamqp',