Skip to content
Snippets Groups Projects
Commit 6211e714 authored by Piotr Maślanka's avatar Piotr Maślanka
Browse files

added FutureCollection

parent 82277cea
No related branches found
No related tags found
No related merge requests found
# v2.17.6
* added `synchronize_on_attribute` for `Monitor`
* added `FutureCollection`
......@@ -166,6 +166,12 @@ AtomicNumber
.. autoclass:: satella.coding.concurrent.AtomicNumber
:members:
FutureCollection
================
.. autoclass:: satella.coding.concurrent.FutureCollection
:members:
Condition
=========
......
__version__ = '2.17.6a2'
__version__ = '2.17.6a3'
from .atomic import AtomicNumber
from .callablegroup import CallableGroup, CallNoOftenThan, CancellableCallback
from .functions import parallel_execute, run_as_future
from .futures import Future, WrappingFuture, InvalidStateError
from .futures import Future, WrappingFuture, InvalidStateError, FutureCollection
from .id_allocator import IDAllocator, SequentialIssuer
from .locked_dataset import LockedDataset
from .locked_structure import LockedStructure
......@@ -19,5 +19,5 @@ __all__ = ['LockedDataset', 'Monitor', 'RMonitor', 'CallableGroup', 'TerminableT
'BogusTerminableThread', 'Timer', 'parallel_execute', 'run_as_future',
'sync_threadpool', 'IntervalTerminableThread', 'Future',
'WrappingFuture', 'InvalidStateError', 'PeekableQueue',
'CancellableCallback', 'ThreadCollection',
'CancellableCallback', 'ThreadCollection', 'FutureCollection',
'SequentialIssuer']
from .call_in_future import call_in_future
from .futures import Future, WrappingFuture, InvalidStateError, wrap_if
from .wrapped_executor import ExecutorWrapper
from .collection import FutureCollection
__all__ = ['Future', 'WrappingFuture', 'InvalidStateError', 'call_in_future',
'ExecutorWrapper', 'wrap_if']
'ExecutorWrapper', 'wrap_if', 'FutureCollection']
import typing as tp
from concurrent.futures import Future
class FutureCollection:
"""
A set of futures sharing a common result, or a common exception.
This overloads the operator + for making an union of futures. It can be used
with either instances of :class:`~satella.coding.concurrent.FutureCollection`
or normal futures.
"""
__slots__ = 'futures',
def __init__(self, futures: tp.Sequence[Future]):
if not isinstance(futures, list):
futures = list(futures)
self.futures = futures
def __add__(self, other: tp.Union['FutureCollection', Future, tp.Sequence[Future]]):
if isinstance(other, FutureCollection):
return FutureCollection(self.futures + other.futures)
elif isinstance(other, tp.Sequence):
return FutureCollection(self.futures + list(other))
else:
fc = self.futures[:]
fc.append(other)
return FutureCollection(fc)
def set_running_or_notify_cancel(self) -> None:
"""
Call :code:`set_running_or_notify_cancel` on the futures
"""
for future in self.futures:
future.set_running_or_notify_cancel()
def __iadd__(self, other: tp.Union['FutureCollection', Future, tp.Sequence[Future]]):
if isinstance(other, FutureCollection):
self.futures.extend(other.futures)
return self
elif isinstance(other, tp.Sequence):
self.futures.extend(other)
return self
else:
self.futures.append(other)
return self
def add(self, future: Future) -> 'FutureCollection':
"""
Add a future
:param future: a Future to add
:return: self
"""
self.futures.append(future)
return self
def set_result(self, result) -> None:
"""
Set a result for all futures
:param result: result to set
"""
for future in self.futures:
future.set_result(result)
def set_exception(self, exc) -> None:
"""
Set an exception for all futures
:param exc: exception instance to set
"""
for future in self.futures:
future.set_exception(exc)
def result(self) -> list:
"""
Return the result of all futures, as a list.
This will block until the results are available.
:return: list containing results of all futures
"""
return [fut.result() for fut in self.futures]
......@@ -11,7 +11,7 @@ from satella.coding.concurrent import TerminableThread, CallableGroup, Condition
LockedStructure, AtomicNumber, Monitor, IDAllocator, call_in_separate_thread, Timer, \
parallel_execute, run_as_future, sync_threadpool, IntervalTerminableThread, Future, \
WrappingFuture, PeekableQueue, SequentialIssuer, CancellableCallback, ThreadCollection, \
BogusTerminableThread, SingleStartThread
BogusTerminableThread, SingleStartThread, FutureCollection
from satella.coding.concurrent.futures import call_in_future, ExecutorWrapper
from satella.coding.sequences import unique
from satella.exceptions import WouldWaitMore, AlreadyAllocated, Empty
......@@ -19,6 +19,25 @@ from satella.exceptions import WouldWaitMore, AlreadyAllocated, Empty
class TestConcurrent(unittest.TestCase):
def test_future_collection(self):
fut1 = PythonFuture()
fut2 = PythonFuture()
fs = [PythonFuture(), PythonFuture()]
fut3 = PythonFuture()
fc = FutureCollection([fut1, fut2])
fc.add(fut3)
fc += fs
fc.set_running_or_notify_cancel()
@call_in_separate_thread()
def set_future_collection():
fc.set_result(None)
set_future_collection()
fc.result()
def test_single_start_thread(self):
dct = {'a': 0}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment