Skip to content
Snippets Groups Projects
Commit 47c9aff2 authored by Piotr Maślanka's avatar Piotr Maślanka
Browse files

added a way to register an object to be cleaned up via MemoryPressureManager

parent 1b20d3bf
No related branches found
No related tags found
No related merge requests found
...@@ -2,4 +2,5 @@ ...@@ -2,4 +2,5 @@
* added DeferredValue * added DeferredValue
* satella.coding.Context are considered experimental * satella.coding.Context are considered experimental
* added a way to register an object to be cleaned up via MemoryPressureManager
...@@ -54,6 +54,10 @@ that level 1 is still in effect. You can register your handlers here: ...@@ -54,6 +54,10 @@ that level 1 is still in effect. You can register your handlers here:
.. autoclass:: satella.instrumentation.memory.MemoryPressureManager .. autoclass:: satella.instrumentation.memory.MemoryPressureManager
:members: :members:
Note that you can also register objects to have their methods called on entering a memory
severity level, if these objects have a way to to for example drop some data onto disk and
decrease memory usage via :meth:`~satella.instrumentation.memory.MemoryPressureManager.cleanup_on_entered`.
install_force_gc_collect install_force_gc_collect
------------------------ ------------------------
......
__version__ = '2.24.2a3' __version__ = '2.24.2a4'
from __future__ import annotations
import logging import logging
import os import os
import typing as tp import typing as tp
import weakref
import psutil import psutil
...@@ -38,7 +40,8 @@ class MemoryPressureManager(IntervalTerminableThread): ...@@ -38,7 +40,8 @@ class MemoryPressureManager(IntervalTerminableThread):
:param maximum_available: maximum amount of memory that this program can use :param maximum_available: maximum amount of memory that this program can use
:param severity_levels: this defines the levels of severity. A level is reached when program's :param severity_levels: this defines the levels of severity. A level is reached when program's
consumption is other this many percent of it's maximum_available amount of memory. consumption is other this many percent of it's maximum_available amount of memory. Note that you need to
specify only the abnormal memory levels, the default level of 0 will be added automatically.
:param check_interval: amount of seconds of pause between consecutive checks, or :param check_interval: amount of seconds of pause between consecutive checks, or
a time string a time string
:param log_transitions: whether to log to logger when a transition takes place :param log_transitions: whether to log to logger when a transition takes place
...@@ -67,11 +70,26 @@ class MemoryPressureManager(IntervalTerminableThread): ...@@ -67,11 +70,26 @@ class MemoryPressureManager(IntervalTerminableThread):
self.callbacks_on_left = [CallableGroup(gather=False) for _ in self.callbacks_on_left = [CallableGroup(gather=False) for _ in
range(len( range(len(
self.severity_levels))] # type: tp.List[CallableGroup] self.severity_levels))] # type: tp.List[CallableGroup]
self.objects_to_cleanup_on_entered = [[] for _ in range(len(self.severity_levels))]
self.callbacks_on_memory_normal = CallableGroup(gather=False) self.callbacks_on_memory_normal = CallableGroup(gather=False)
self.severity_level = 0 # type: int self.severity_level = 0 # type: int
self.stopped = False # type: bool self.stopped = False # type: bool
self.start() self.start()
@staticmethod
def cleanup_on_entered(target_level: int, obj: tp.Any,
collector: tp.Callable[[tp.Any], None] = lambda y: y.cleanup()):
"""
Attempt to recover memory by calling a particular method on an object.
A weak reference will be stored to this object
:param target_level: cleanup will be attempted on entering this severity level
:param obj: object to call this on
:param collector: a lambda to call a routine on this object
"""
MemoryPressureManager().objects_to_cleanup_on_entered[target_level].append((weakref.ref(obj), collector))
def advance_to_severity_level(self, target_level: int): def advance_to_severity_level(self, target_level: int):
while self.severity_level != target_level: while self.severity_level != target_level:
delta = target_level - self.severity_level delta = target_level - self.severity_level
...@@ -81,6 +99,13 @@ class MemoryPressureManager(IntervalTerminableThread): ...@@ -81,6 +99,13 @@ class MemoryPressureManager(IntervalTerminableThread):
# Means we are ENTERING a severity level # Means we are ENTERING a severity level
self.severity_level += delta self.severity_level += delta
self.callbacks_on_entered[self.severity_level]() self.callbacks_on_entered[self.severity_level]()
new_list = []
for ref, collector in self.objects_to_cleanup_on_entered[self.severity_level]:
obj = ref()
if obj is not None:
collector(obj)
new_list.append((ref, collector))
self.objects_to_cleanup_on_entered[self.severity_level] = new_list
if self.log_transitions: if self.log_transitions:
logger.warning('Entered severity level %s' % (self.severity_level,)) logger.warning('Entered severity level %s' % (self.severity_level,))
elif delta < 0: elif delta < 0:
......
...@@ -53,9 +53,21 @@ class TestMemory(unittest.TestCase): ...@@ -53,9 +53,21 @@ class TestMemory(unittest.TestCase):
'cancelled': 0, 'cancelled': 0,
'mem_normal': 0} 'mem_normal': 0}
class ObjectToCleanup:
def __init__(self):
self.cleaned_up = False
def cleanup(self):
self.cleaned_up = True
obj1 = ObjectToCleanup()
obj2 = ObjectToCleanup()
cc = CustomCondition(lambda: a['level_2_engaged']) cc = CustomCondition(lambda: a['level_2_engaged'])
MemoryPressureManager(None, [odc, All(cc, Any(cc, cc))], 2) MemoryPressureManager(None, [odc, All(cc, Any(cc, cc))], 2)
MemoryPressureManager.cleanup_on_entered(1, obj1)
MemoryPressureManager.cleanup_on_entered(2, obj2)
def memory_normal(): def memory_normal():
nonlocal a nonlocal a
...@@ -96,7 +108,9 @@ class TestMemory(unittest.TestCase): ...@@ -96,7 +108,9 @@ class TestMemory(unittest.TestCase):
self.assertTrue(a['memory']) self.assertTrue(a['memory'])
self.assertFalse(a['improved']) self.assertFalse(a['improved'])
self.assertGreater(a['calls'], 0) self.assertGreater(a['calls'], 0)
self.assertTrue(obj1.cleaned_up)
self.assertEqual(a['times_entered_1'], 1) self.assertEqual(a['times_entered_1'], 1)
del obj1
odc.value = False odc.value = False
time.sleep(3) time.sleep(3)
self.assertTrue(a['improved']) self.assertTrue(a['improved'])
...@@ -105,8 +119,10 @@ class TestMemory(unittest.TestCase): ...@@ -105,8 +119,10 @@ class TestMemory(unittest.TestCase):
self.assertEqual(a['mem_normal'], 1) self.assertEqual(a['mem_normal'], 1)
a['level_2_engaged'] = True a['level_2_engaged'] = True
time.sleep(3) time.sleep(3)
self.assertEqual(MemoryPressureManager().objects_to_cleanup_on_entered[1], [])
self.assertEqual(MemoryPressureManager().severity_level, 2) self.assertEqual(MemoryPressureManager().severity_level, 2)
self.assertEqual(a['cancelled'], 1) self.assertEqual(a['cancelled'], 1)
self.assertEqual(a['times_entered_1'], 2) self.assertEqual(a['times_entered_1'], 2)
self.assertTrue(a['level_2_confirmed']) self.assertTrue(a['level_2_confirmed'])
self.assertEqual(a['mem_normal'], 1) self.assertEqual(a['mem_normal'], 1)
self.assertTrue(obj2.cleaned_up)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment