diff --git a/CHANGELOG.md b/CHANGELOG.md index b1640e750ec0c34530798b910cfb3ecf5c096f07..93d89b0fbbf240cb5cc0ad34d89287fb7d1b2510 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,2 +1,4 @@ # v2.24.2 +* added DeferredValue + diff --git a/docs/coding/concurrent.rst b/docs/coding/concurrent.rst index beca4865372236b84111045fa663b6c9d18d9d08..efe13cba8015d6d0c77ab64cd127ac7d0a3abc92 100644 --- a/docs/coding/concurrent.rst +++ b/docs/coding/concurrent.rst @@ -2,6 +2,12 @@ Concurrent data structures ========================== +DeferredValue +============= + +.. autoclass:: satella.coding.concurrent.DeferredValue + :members: + CallableGroup ============= diff --git a/satella/__init__.py b/satella/__init__.py index f0400145ad1bea4d21a221a13e4ec0e22592385d..6a8e7b4da11b9c323bbecdb40c44e4e213f4945c 100644 --- a/satella/__init__.py +++ b/satella/__init__.py @@ -1 +1 @@ -__version__ = '2.24.2a1' +__version__ = '2.24.2a2' diff --git a/satella/coding/concurrent/__init__.py b/satella/coding/concurrent/__init__.py index 0ff36b46195c4baba1f2e85728b855cc69d9aea4..c66fa442c68bd10bb61e14f06c63e077c364a72d 100644 --- a/satella/coding/concurrent/__init__.py +++ b/satella/coding/concurrent/__init__.py @@ -4,6 +4,7 @@ from .functions import parallel_execute, run_as_future from .futures import Future, WrappingFuture, InvalidStateError, FutureCollection from .id_allocator import IDAllocator, SequentialIssuer from .list_processor import parallel_construct +from .value import DeferredValue from .locked_dataset import LockedDataset from .locked_structure import LockedStructure from .monitor import MonitorList, Monitor, MonitorDict, RMonitor, MonitorSet @@ -21,4 +22,4 @@ __all__ = ['LockedDataset', 'Monitor', 'RMonitor', 'CallableGroup', 'TerminableT 'sync_threadpool', 'IntervalTerminableThread', 'Future', 'MonitorSet', 'WrappingFuture', 'InvalidStateError', 'PeekableQueue', 'parallel_construct', 'CancellableCallback', 'ThreadCollection', 'FutureCollection', - 'SequentialIssuer'] + 'SequentialIssuer', 'DeferredValue'] diff --git a/satella/coding/concurrent/sync.py b/satella/coding/concurrent/sync.py index 0025c177365821033cf68c60000b2d617fb02bd5..8be256015f06df0cf26e2e68171471922a872084 100644 --- a/satella/coding/concurrent/sync.py +++ b/satella/coding/concurrent/sync.py @@ -1,5 +1,5 @@ -import time import typing as tp +import time from concurrent.futures import wait, ThreadPoolExecutor from satella.coding.concurrent.atomic import AtomicNumber diff --git a/satella/coding/concurrent/value.py b/satella/coding/concurrent/value.py new file mode 100644 index 0000000000000000000000000000000000000000..d6f8ae2ba8fe43d4db158f2478a0784c63787ae0 --- /dev/null +++ b/satella/coding/concurrent/value.py @@ -0,0 +1,58 @@ +import time +import typing as tp +from threading import Event + +from satella.coding.typing import T +from satella.exceptions import WouldWaitMore + + +class _UNSET: + pass + + +class DeferredValue(tp.Generic[T]): + """ + A class that allows you to pass arguments that will be available later during runtime. + + Usage: + + >>> def thread1(value): + >>> print(value.val()) + + >>> val = DeferredValue() + >>> threading.Thread(target=thread1, args=(val, )).start() + >>> time.sleep(10) + >>> val.set_value(3) + """ + + __slots__ = 'val', 'lock' + + def __init__(self): + self.lock = Event() + self.val = _UNSET + + def set_value(self, va: T) -> None: + """ + Set a value and wake up all the threads waiting on it. + + :param va: value to set + :raises ValueError: value is already set + """ + if self.val is not _UNSET: + raise ValueError('Value curently set!') + self.val = va + self.lock.set() + + def value(self, timeout: tp.Optional[float] = None) -> T: + """ + Wait until value is available. + + :return: a value + :raises WouldWaitMore: timeout was given and it has expired + """ + if self.val is not _UNSET: + return self.val + tout = self.lock.wait(timeout) + if not tout: + raise WouldWaitMore() + return self.val diff --git a/tests/test_coding/test_concurrent.py b/tests/test_coding/test_concurrent.py index e1a2a0f39b4de5572d932a3411335e2778cea693..c2278655065904901b099ac0c0f359557f7c8404 100644 --- a/tests/test_coding/test_concurrent.py +++ b/tests/test_coding/test_concurrent.py @@ -14,7 +14,8 @@ from satella.coding.concurrent import TerminableThread, CallableGroup, Condition LockedStructure, AtomicNumber, Monitor, IDAllocator, call_in_separate_thread, Timer, \ parallel_execute, run_as_future, sync_threadpool, IntervalTerminableThread, Future, \ WrappingFuture, PeekableQueue, SequentialIssuer, CancellableCallback, ThreadCollection, \ - BogusTerminableThread, SingleStartThread, FutureCollection, MonitorSet, parallel_construct + BogusTerminableThread, SingleStartThread, FutureCollection, MonitorSet, parallel_construct, \ + DeferredValue from satella.coding.concurrent.futures import call_in_future, ExecutorWrapper from satella.coding.sequences import unique from satella.exceptions import WouldWaitMore, AlreadyAllocated, Empty @@ -22,6 +23,23 @@ from satella.exceptions import WouldWaitMore, AlreadyAllocated, Empty class TestConcurrent(unittest.TestCase): + def test_deferred_value(self): + val = DeferredValue() + + def thread(value): + self.assertRaises(WouldWaitMore, lambda: value.value(5)) + val2 = value.value() + self.assertEqual(val2, 4) + + thr = threading.Thread(target=thread, args=(val, )) + thr.start() + thr2 = threading.Thread(target=thread, args=(val, )) + thr2.start() + time.sleep(6) + val.set_value(4) + thr.join() + thr2.join() + def test_parallel_construct(self): mock = MockTracer() set_global_tracer(mock)