Skip to content
Snippets Groups Projects
Commit f64bde46 authored by Piotr Maślanka's avatar Piotr Maślanka
Browse files

fixes to MemoryPressureManager

added `CancellableCallback`
`CallbackGroup` will now support `CancellableCallback`s as well
`MemoryPressureManager` will now use `CancellableCallbacks`
parent f4b9d6ad
No related branches found
Tags v2.14.21
No related merge requests found
......@@ -3,3 +3,7 @@
* fixed docs for `GlobalRelativeValue`
* `MemoryPressureManager` will name it's thread
* enabled _read_in_file to return None as default value
* added `CancellableCallback`
* `CallbackGroup` will now support `CancellableCallback`s as well
* `MemoryPressureManager` will now use `CancellableCallbacks`
......@@ -14,6 +14,12 @@ CallNoOftenThan
.. autoclass:: satella.coding.concurrent.CallNoOftenThan
:members:
CancellableCallback
-------------------
.. autoclass:: satella.coding.concurrent.CancellableCallback
:members:
LockedDataset
=============
......
__version__ = '2.14.21_a3'
__version__ = '2.14.21'
from .atomic import AtomicNumber
from .callablegroup import CallableGroup, CallNoOftenThan
from .callablegroup import CallableGroup, CallNoOftenThan, CancellableCallback
from .functions import parallel_execute, run_as_future
from .futures import Future, WrappingFuture, InvalidStateError
from .id_allocator import IDAllocator, SequentialIssuer
......@@ -18,4 +18,5 @@ __all__ = ['LockedDataset', 'Monitor', 'RMonitor', 'CallableGroup', 'TerminableT
'BogusTerminableThread', 'Timer', 'parallel_execute', 'run_as_future',
'sync_threadpool', 'IntervalTerminableThread', 'Future',
'WrappingFuture', 'InvalidStateError', 'PeekableQueue',
'CancellableCallback',
'SequentialIssuer']
import collections
import copy
import weakref
import time
import typing as tp
from satella.coding.deleters import DictDeleter
from satella.coding.typing import T, NoArgCallable
class CancellableCallback:
"""
A callback that you can cancel.
Useful for event-driven software that looks through lists of callbacks and determines whether
to delete them or further invalidate in some other way.
If called, the function itself won't be called as well if this was cancelled. In this case
a None will be returned instead of the result of callback_fun()
This short circuits __bool__ to return not .cancelled.
Hashable and __eq__-able by identity.
:param callback_fun: function to call
:ivar cancelled: whether this callback was cancelled (bool)
"""
__slots__ = ('cancelled', 'callback_fun')
def __bool__(self) -> bool:
return not self.cancelled
def __hash__(self) -> int:
return hash(id(self))
def __init__(self, callback_fun: tp.Callable):
self.callback_fun = callback_fun
self.cancelled = False
def __call__(self, *args, **kwargs):
if not self.cancelled:
return self.callback_fun(*args, **kwargs)
def cancel(self) -> None:
"""
Cancel this callback.
"""
self.cancelled = True
class CallableGroup(tp.Generic[T]):
"""
This behaves like a function, but allows to add other functions to call
......@@ -22,7 +65,8 @@ class CallableGroup(tp.Generic[T]):
will be propagated.
"""
__slots__ = ('callables', 'gather', 'swallow_exceptions')
__slots__ = ('callables', 'gather', 'swallow_exceptions',
'_has_cancellable_callbacks')
def __init__(self, gather: bool = True, swallow_exceptions: bool = False):
"""
......@@ -35,12 +79,48 @@ class CallableGroup(tp.Generic[T]):
self.callables = collections.OrderedDict() # type: tp.Dict[tp.Callable, bool]
self.gather = gather # type: bool
self.swallow_exceptions = swallow_exceptions # type: bool
self._has_cancellable_callbacks = False
@property
def has_cancelled_callbacks(self) -> bool:
"""
Check whether this has any
:class:`~satella.coding.concurrent.CancellableCallback` instances and whether any
of them was cancelled
"""
if not self._has_cancellable_callbacks:
return False
for clb in self.callables:
if isinstance(clb, CancellableCallback) and not clb:
return True
return False
def remove_cancelled(self) -> None:
"""
Remove it's entries that are CancelledCallbacks and that were cancelled
"""
if not self.has_cancelled_callbacks:
return
with DictDeleter(self.callables) as dd:
for callable_ in dd:
if isinstance(callable_, CancellableCallback) and not callable_:
dd.delete()
def add(self, callable_: NoArgCallable[T], one_shot: bool = False):
def add(self, callable_: tp.Union[CancellableCallback, NoArgCallable[T]],
one_shot: bool = False):
"""
Add a callable.
Can be a :class:`~satella.coding.concurrent.CancellableCallback`, in that case
method :meth:`~satella.coding.concurrent.CallableGroup.remove_cancelled` might
be useful.
:param callable_: callable
:param one_shot: if True, callable will be unregistered after single call
"""
if isinstance(callable_, CancellableCallback):
self._has_cancellable_callbacks = True
from ..structures.hashable_objects import HashableWrapper
callable_ = HashableWrapper(callable_)
if callable_ in self.callables:
......@@ -56,6 +136,9 @@ class CallableGroup(tp.Generic[T]):
:return: list of results if gather was set, else None
"""
if self.has_cancelled_callbacks:
self.remove_cancelled()
callables = copy.copy(self.callables)
results = []
......
......@@ -5,7 +5,7 @@ import typing as tp
import psutil
from satella.coding.concurrent import CallableGroup, CallNoOftenThan
from satella.coding.concurrent import CallableGroup, CallNoOftenThan, CancellableCallback
from satella.coding.concurrent import TerminableThread
from satella.coding.structures import Singleton
from satella.time import measure
......@@ -98,6 +98,15 @@ class MemoryPressureManager(TerminableThread):
if self.stopped:
return time.sleep(self.check_interval)
for cg in self.callbacks_on_entered:
cg.remove_cancelled()
for cg in self.callbacks_on_left:
cg.remove_cancelled()
for cg in self.callbacks_on_remains:
cg.remove_cancelled()
with measure() as measurement:
severity_level = self.calculate_severity_level()
if self.current_severity_level != severity_level:
......@@ -135,8 +144,9 @@ class MemoryPressureManager(TerminableThread):
"""
def outer(fun):
MemoryPressureManager().callbacks_on_entered[severity].add(fun)
return fun
cc = CancellableCallback(fun)
MemoryPressureManager().callbacks_on_entered[severity].add(cc)
return cc
return outer
......@@ -155,8 +165,9 @@ class MemoryPressureManager(TerminableThread):
"""
def outer(fun):
MemoryPressureManager().callbacks_on_left[severity].add(fun)
return fun
cc = CancellableCallback(fun)
MemoryPressureManager().callbacks_on_left[severity].add(cc)
return cc
return outer
......@@ -174,8 +185,9 @@ class MemoryPressureManager(TerminableThread):
"""
def outer(fun):
MemoryPressureManager().callbacks_on_remains[severity].add(
CallNoOftenThan(call_no_more_often_than, fun))
return fun
cno = CallNoOftenThan(call_no_more_often_than, fun)
cc = CancellableCallback(cno)
MemoryPressureManager().callbacks_on_remains[severity].add(cc)
return cc
return outer
......@@ -10,7 +10,7 @@ from concurrent.futures import ThreadPoolExecutor, Future as PythonFuture
from satella.coding.concurrent import TerminableThread, CallableGroup, Condition, MonitorList, \
LockedStructure, AtomicNumber, Monitor, IDAllocator, call_in_separate_thread, Timer, \
parallel_execute, run_as_future, sync_threadpool, IntervalTerminableThread, Future, \
WrappingFuture, PeekableQueue, SequentialIssuer
WrappingFuture, PeekableQueue, SequentialIssuer, CancellableCallback
from satella.coding.concurrent.futures import call_in_future, ExecutorWrapper
from satella.coding.sequences import unique
from satella.exceptions import WouldWaitMore, AlreadyAllocated, Empty
......@@ -18,6 +18,20 @@ from satella.exceptions import WouldWaitMore, AlreadyAllocated, Empty
class TestConcurrent(unittest.TestCase):
def test_cancellable_callback(self):
a = {'test': True}
def y():
nonlocal a
a['test'] = False
cg = CallableGroup()
cc = CancellableCallback(y)
cg.add(cc)
cc.cancel()
cg()
self.assertTrue(a['test'])
def test_sequential_issuer(self):
si = SequentialIssuer()
a = si.issue()
......
......@@ -24,12 +24,19 @@ class TestMemory(unittest.TestCase):
'improved': False,
'times_entered_1': 0,
'level_2_engaged': False,
'level_2_confirmed': False}
'level_2_confirmed': False,
'cancelled': 0}
cc = CustomCondition(lambda: a['level_2_engaged'])
MemoryPressureManager(None, [odc, All(cc, Any(cc, cc))], 2)
def cancel():
nonlocal a
a['cancelled'] += 1
cc = MemoryPressureManager.register_on_entered_severity(1)(cancel)
@MemoryPressureManager.register_on_entered_severity(2)
def call_on_level_2():
a['level_2_confirmed'] = True
......@@ -52,6 +59,8 @@ class TestMemory(unittest.TestCase):
time.sleep(3)
odc.value = True
time.sleep(5)
self.assertEqual(a['cancelled'], 1)
cc.cancel()
self.assertTrue(a['memory'])
self.assertFalse(a['improved'])
self.assertGreater(a['calls'], 0)
......@@ -63,5 +72,6 @@ class TestMemory(unittest.TestCase):
self.assertTrue(a['memory'])
a['level_2_engaged'] = True
time.sleep(3)
self.assertEqual(a['cancelled'], 1)
self.assertEqual(a['times_entered_1'], 3)
self.assertTrue(a['level_2_confirmed'])
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment