Skip to content
Snippets Groups Projects
Commit 1db68f43 authored by Piotr Maślanka's avatar Piotr Maślanka
Browse files

2.14.24

parent b85dbcd2
No related branches found
Tags v2.14.24
No related merge requests found
......@@ -3,4 +3,6 @@
* `measure.time_remaining` will not return negative values
* `MemoryErrorExceptionHandler` will use less memory procesing `MemoryError`s
* `merge_series` will now throw a `ValueError` instead of `RuntimeError`
* add `MemoryPressureManager.register_on_memory_normal`
* fixed a bug where `CallableGroup` would not remove it's one-shot callbacks
* this fixed a bug with `MemoryPressureManager` entering a state too often
__version__ = '2.14.24a4'
__version__ = '2.14.24'
......@@ -134,12 +134,16 @@ class CallableGroup(tp.Generic[T]):
If callables raise, it will be passed through.
As a side-effect, removes cancelled
:class:`~satella.coding.concurrent.CancellableCallback` instances registered.
:return: list of results if gather was set, else None
"""
if self.has_cancelled_callbacks:
self.remove_cancelled()
callables = copy.copy(self.callables)
self.callables.clear()
results = []
......
......@@ -68,6 +68,7 @@ class MemoryPressureManager(TerminableThread):
self.callbacks_on_left = [CallableGroup(gather=False) for _ in
range(len(
self.severity_levels))] # type: tp.List[CallableGroup]
self.callbacks_on_memory_normal = CallableGroup(gather=False)
self.severity_level = 0 # type: int
self.stopped = False # type: bool
self.check_interval = check_interval # type: int
......@@ -90,6 +91,8 @@ class MemoryPressureManager(TerminableThread):
if self.log_transitions:
logger.warning('Left severity level %s' % (self.severity_level,))
self.severity_level += delta
if self.severity_level == 0:
self.callbacks_on_memory_normal()
def stop(self):
"""Stop this thread from operating"""
......@@ -103,6 +106,8 @@ class MemoryPressureManager(TerminableThread):
if self.stopped:
return time.sleep(self.check_interval)
self.callbacks_on_memory_normal.remove_cancelled()
for cg in self.callbacks_on_entered:
cg.remove_cancelled()
......@@ -132,6 +137,20 @@ class MemoryPressureManager(TerminableThread):
if condition.can_fire(memory_info, self.maximum_available):
return level
@staticmethod
def register_on_memory_normal(fun: tp.Callable) -> CancellableCallback:
"""
Register this handler to fire when memory state falls back to 0.
This will be fired once, once memory state falls back to normal.
:param fun: callable to register
:return: a CancellableCallback under this callback is registered
"""
cc = CancellableCallback(fun)
MemoryPressureManager().callbacks_on_memory_normal.add(cc)
return cc
@staticmethod
def register_on_entered_severity(severity: int):
"""
......
......@@ -25,15 +25,21 @@ class TestMemory(unittest.TestCase):
'times_entered_1': 0,
'level_2_engaged': False,
'level_2_confirmed': False,
'cancelled': 0}
'cancelled': 0,
'mem_normal': 0}
cc = CustomCondition(lambda: a['level_2_engaged'])
MemoryPressureManager(None, [odc, All(cc, Any(cc, cc))], 2)
def memory_normal():
nonlocal a
a['mem_normal'] += 1
def cancel():
nonlocal a
a['cancelled'] += 1
MemoryPressureManager.register_on_memory_normal(memory_normal)
cc = MemoryPressureManager.register_on_entered_severity(1)(cancel)
......@@ -56,6 +62,7 @@ class TestMemory(unittest.TestCase):
self.assertFalse(a['memory'])
self.assertFalse(a['improved'])
self.assertEqual(a['mem_normal'], 0)
time.sleep(3)
odc.value = True
time.sleep(5)
......@@ -70,9 +77,11 @@ class TestMemory(unittest.TestCase):
self.assertTrue(a['improved'])
self.assertEqual(a['times_entered_1'], 1)
self.assertTrue(a['memory'])
self.assertEqual(a['mem_normal'], 1)
a['level_2_engaged'] = True
time.sleep(3)
self.assertEqual(MemoryPressureManager().severity_level, 2)
self.assertEqual(a['cancelled'], 1)
self.assertEqual(a['times_entered_1'], 3)
self.assertEqual(a['times_entered_1'], 2)
self.assertTrue(a['level_2_confirmed'])
self.assertEqual(a['mem_normal'], 1)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment