diff --git a/CHANGELOG.md b/CHANGELOG.md index e9010bb30f3d738f9585d88917a4de777cb96c91..d8451643e60f6bd1ca0fab7b741947828f0f7cf7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ # v2.5.0 * added `CallableMetric` +* added `MemoryPressureManager` # v2.4.45 diff --git a/docs/index.rst b/docs/index.rst index 24bc9dae55667c140f2b05ec9dc706d837776b92..28879607644db10e12d55301a2add72de02a05bc 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -17,6 +17,7 @@ Visit the project's page at GitHub_! coding/concurrent coding/sequences instrumentation/traceback + instrumentation/memory instrumentation/metrics exception_handling json diff --git a/docs/instrumentation/memory.rst b/docs/instrumentation/memory.rst new file mode 100644 index 0000000000000000000000000000000000000000..3adbf0828c1527facbe399868093c8dfc889221d --- /dev/null +++ b/docs/instrumentation/memory.rst @@ -0,0 +1,50 @@ +Memory pressure +=============== + +When faced with the risk of running low on memory, you know that some of your +programs' variables are cache. They can be discarded, dropped on the floor, to +be recomputed later. + +Problem is, that they need a trigger to do it. Memory pressure management +from Satella solves that problem. + +Defining severity levels +------------------------ + +To define a severity level, use the following classes: + +.. autoclass:: satella.instrumentation.memory.GlobalAbsoluteValue + +.. autoclass:: satella.instrumentation.memory.GlobalRelativeValue + +.. autoclass:: satella.instrumentation.memory.LocalAbsoluteValue + +.. autoclass:: satella.instrumentation.memory.LocalRelativeValue + +Here you can either provide a callable or override the ``can_fire`` method + +.. autoclass:: satella.instrumentation.memory.CustomCondition + :members: + +You can combine them with following operators: + +.. autoclass:: satella.instrumentation.memory.All + +.. autoclass:: satella.instrumentation.memory.Any + +.. autoclass:: satella.instrumentation.memory.Not + +Then, you make a list out of them. This list, with indices counted from 1, +signals what condition needs to be true for the program to enter given severity level. + +Handlers +-------- + +It is impossible to go from severity level 1 to say 3 without hitting 2. 2 will +be hit by the way, the manager will call any handlers that are in the way. +Note that severity levels are concurrent - for example, +level 1 coexists with level 2, and if level 2 is in effect, that means +that level 1 is still in effect. You can register your handlers here: + +.. autoclass:: satella.instrumentation.memory.MemoryPressureManager + :members: diff --git a/satella/instrumentation/memory/__init__.py b/satella/instrumentation/memory/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..b8d6eceecaab36f1fb18e8ace812614cb9221e61 --- /dev/null +++ b/satella/instrumentation/memory/__init__.py @@ -0,0 +1,7 @@ +from .conditions import Any, All, GlobalRelativeValue, GlobalAbsoluteValue, LocalAbsoluteValue, \ + LocalRelativeValue, GB, MB, KB, CustomCondition, Not +from .memthread import MemoryPressureManager + +__all__ = ['Any', 'All', 'MemoryPressureManager', 'GlobalAbsoluteValue', + 'GB', 'GlobalRelativeValue', 'LocalRelativeValue', 'LocalAbsoluteValue', 'MB', 'KB', + 'CustomCondition', 'Not'] diff --git a/satella/instrumentation/memory/conditions.py b/satella/instrumentation/memory/conditions.py new file mode 100644 index 0000000000000000000000000000000000000000..77f5f36cf01d91c1406ee3d96e0ea32e50dd8697 --- /dev/null +++ b/satella/instrumentation/memory/conditions.py @@ -0,0 +1,133 @@ +from abc import ABCMeta, abstractmethod +import typing as tp +import logging +import functools +import psutil + +__all__ = ['GB', 'MB', 'KB', 'Any', 'All', 'GlobalAbsoluteValue', 'GlobalRelativeValue', + 'LocalRelativeValue', 'LocalAbsoluteValue', 'MemoryCondition', 'ZerothSeverity', + 'CustomCondition', 'Not'] + +logger = logging.getLogger(__name__) + +GB = 1024*1024*1024 +MB = 1024*1024 +KB = 1024 + + +class MemoryCondition(metaclass=ABCMeta): + __slots__ = ('value', ) + + def __init__(self, value: int): + self.value = value + + @abstractmethod + def can_fire(self, local_memory_data, local_maximum_consume: tp.Optional[int]) -> bool: + """Has this severity level been reached?""" + + +class ZerothSeverity(MemoryCondition): + def __init__(self): + pass + + def can_fire(self, local_memory_data, local_maximum_consume: tp.Optional[int]) -> bool: + return True + + +class OperationJoin(MemoryCondition): + __slots__ = ('conditions', ) + + def __init__(self, *conditions: MemoryCondition): + self.conditions = conditions + + def can_fire(self, local_memory_data, local_maximum_consume: tp.Optional[int]) -> bool: + return functools.reduce(self.OPERATOR, ( + condition.can_fire(local_memory_data, local_maximum_consume) for condition in + self.conditions), self.STARTING_VALUE) + + +class Any(OperationJoin): + """This is true if one of the arguments is True""" + + @staticmethod + def OPERATOR(a, b): + return a or b + + STARTING_VALUE = False + + +class All(OperationJoin): + """This is true if all arguments are True""" + + @staticmethod + def OPERATOR(a, b): + return a and b + + STARTING_VALUE = True + + +class Not(MemoryCondition): + """True only if provided condition is false""" + __slots__ = ('condition', ) + + def __int__(self, condition: MemoryCondition): + self.condition = condition + + def can_fire(self, local_memory_data, local_maximum_consume: tp.Optional[int]) -> bool: + return not self.condition.can_fire(local_memory_data, local_maximum_consume) + + +class GlobalAbsoluteValue(MemoryCondition): + """If free memory globally falls below this many bytes, given severity level starts""" + + def can_fire(self, local_memory_data, local_maximum_consume: tp.Optional[int]) -> bool: + return psutil.virtual_memory().available < self.value + + +class GlobalRelativeValue(MemoryCondition): + """ + If percentage of global free memory falls below this many bytes, given severity level starts + """ + + def can_fire(self, local_memory_data, local_maximum_consume: tp.Optional[int]) -> bool: + return psutil.virtual_memory().available / psutil.virtual_memory().total < ( + self.value / 100) + + +class LocalAbsoluteValue(MemoryCondition): + """ + If free memory falls below this many bytes from what the program can maximally consume this + severity level starts + """ + + def can_fire(self, local_memory_data, local_maximum_consume: tp.Optional[int]) -> bool: + return local_maximum_consume - local_memory_data.rss < self.value + + +class LocalRelativeValue(MemoryCondition): + """ + If percentage of memory available to this process in regards to what the program can + maximally consume falls below this level, given severity level starts + """ + + def can_fire(self, local_memory_data, local_maximum_consume: tp.Optional[int]) -> bool: + return local_memory_data.rss / local_maximum_consume < (1-self.value / 100) + + +class CustomCondition(MemoryCondition): + """ + A custom condition. Condition that is true if attached callable/0 returns True. + + :param callable_: callable to call upon asking whether this condition is valid. This + should be relatively cheap to compute. + """ + __slots__ = ('callable', ) + + def __init__(self, callable_: tp.Callable[[], bool]): + self.callable = callable_ + + def can_fire(self, local_memory_data, local_maximum_consume: tp.Optional[int]) -> bool: + v = self.callable() + logger.warning('Custom condition %s returning %s' % (self.callable, v)) + return v + diff --git a/satella/instrumentation/memory/memthread.py b/satella/instrumentation/memory/memthread.py new file mode 100644 index 0000000000000000000000000000000000000000..10b58e0e96942b752a36c2f85aa4e535134209e9 --- /dev/null +++ b/satella/instrumentation/memory/memthread.py @@ -0,0 +1,183 @@ +import logging +import os +import time +import typing as tp + +import psutil + +from satella.coding.concurrent import CallableGroup +from satella.coding.concurrent import TerminableThread +from satella.coding.structures import Singleton +from satella.time import measure +from .conditions import MemoryCondition, ZerothSeverity + +logger = logging.getLogger(__name__) + +__all__ = ['MemoryPressureManager', 'GB', 'MB', 'KB'] + + +class CallNoMoreOftenThan: + __slots__ = ('interval', 'callable', 'last_called') + + def __init__(self, interval: int, callable_: tp.Callable[[], None]): + self.interval = interval + self.callable = callable_ + self.last_called = 0 + + def __call__(self, *args, **kwargs): + if time.monotonic() - self.last_called >= self.interval: + self.callable(*args, **kwargs) + self.last_called = time.monotonic() + + +@Singleton +class MemoryPressureManager(TerminableThread): + """ + Manager of the memory pressure. + + The program is in some severity state. The baseline state is 0, meaning everything's OK. + + Please note that it is sufficient to instantiate this class for the thread to run. + + Eg. + + >>> mt = MemoryPressureManager(maximum_available=4*GB, severity_levels=[80, 90]) + >>> @mt.register_on_severity(1) + >>> def trigger_a(): + >>> print('80% consumption of memory exceeded') + >>> @mt.register_on_severity(2) + >>> def trigger_b(): + >>> print('90% consumption of memory exceeded') + + :param maximum_available: maximum amount of memory that this program can use + :param severity_levels: this defines the levels of severity. A level is reached when program's + consumption is other this many percent of it's maximum_available amount of memory. + :param global_severity_levels: + """ + __slots__ = ('process', 'maximum_available', 'severity_levels', 'callbacks_on_entered', + 'callbacks_on_remaing', 'current_severity_level', 'check_interval') + + def __init__(self, maximum_available: tp.Optional[int] = None, + severity_levels: tp.List[MemoryCondition] = None, + check_interval: int = 10): + super().__init__(daemon=True) + self.process = psutil.Process(os.getpid()) + self.maximum_available = maximum_available + self.severity_levels = [ZerothSeverity()] + (severity_levels or []) + self.callbacks_on_entered = [CallableGroup(gather=False) for i in + range(len(self.severity_levels))] + self.callbacks_on_remains = [CallableGroup(gather=False) for i in + range(len(self.severity_levels))] + self.callbacks_on_left = [CallableGroup(gather=False) for i in + range(len(self.severity_levels))] + self.current_severity_level = 0 + self.stopped = False + self.check_interval = check_interval + self.start() + + def advance_to_severity_level(self, target_level: int): + while self.current_severity_level != target_level: + delta = target_level - self.current_severity_level + + if delta > 0: + # Means we are ENTERING a severity level + self.current_severity_level += delta + self.callbacks_on_entered[self.current_severity_level]() + logger.warning('Entered severity level %s' % (self.current_severity_level, )) + elif delta < 0: + # Means we are LEAVING a severity level + self.callbacks_on_left[self.current_severity_level]() + logger.warning('Left severity level %s' % (self.current_severity_level, )) + self.current_severity_level += delta + + def stop(self): + """Stop this thread from operating""" + self.stopped = True + + def resume(self): + """Resume the operation of this thread""" + self.stopped = False + + def loop(self) -> None: + if self.stopped: + return time.sleep(self.check_interval) + + with measure() as measurement: + severity_level = self.calculate_severity_level() + if self.current_severity_level != severity_level: + self.advance_to_severity_level(severity_level) + else: + self.callbacks_on_remains[severity_level]() + + elapsed = measurement() + if elapsed < self.check_interval: + time.sleep(self.check_interval - elapsed) + + def calculate_severity_level(self) -> int: + """ + This returns a severity level. 0 is the baseline severity level. + """ + memory_info = self.process.memory_info() + for level, condition in reversed(list(enumerate(self.severity_levels))): + if condition.can_fire(memory_info, self.maximum_available): + logger.warning('Condition %s was true on level %s' % (condition, level)) + return level + + @staticmethod + def register_on_entered_severity(severity: int): + """ + Register this handler to fire on entered a particular severity level. + + This means that situation has gotten worse. + + Use like this: + + >>> MemoryPressureManager.register_on_entered_severity(1) + >>> def entered_severity_one(): + >>> print('Entered memory severity level 1') + + :param severity: severity level to react to + """ + + def outer(fun): + MemoryPressureManager().callbacks_on_entered[severity].add(fun) + return fun + + return outer + + @staticmethod + def register_on_left_severity(severity: int): + """ + Register a handler to be called when given severity level is left. This means + that we have advanced to a lower severity level. + + >>> MemoryPressureManager.register_on_left_severity(1) + >>> def entered_severity_one(): + >>> print('Memory comsumption no longer 1') + + + :param severity: severity level to leave + """ + def outer(fun): + MemoryPressureManager().callbacks_on_left[severity].add(fun) + return fun + return outer + + @staticmethod + def register_on_remaining_in_severity(severity: int, call_no_more_often_than: int = 0): + """ + Register this handler to fire on remaining in a particular severity level. Use like this: + + >>> MemoryPressureManager.register_on_remaining_in_severity(0, 30) + >>> def entered_severity_one(): + >>> print('Memory comsumption OK. I am called no more often than each 30 seconds') + + :param severity: severity level + :param call_no_more_often_than: call no more often than this amount of seconds + """ + + def outer(fun): + MemoryPressureManager().callbacks_on_remains[severity].add( + CallNoMoreOftenThan(call_no_more_often_than, fun)) + + return outer diff --git a/satella/instrumentation/metrics/metric_types/base.py b/satella/instrumentation/metrics/metric_types/base.py index 1188edcedd248f06a2c6ee12be551b1d6f4c1610..cb15543645ed487f33c2dc26cdbae2b8f0a72fea 100644 --- a/satella/instrumentation/metrics/metric_types/base.py +++ b/satella/instrumentation/metrics/metric_types/base.py @@ -161,7 +161,7 @@ class EmbeddedSubmetrics(LeafMetric): >>> metric.handle(3, label='value') If you try to inherit from it, refer to :py:class:`.simple.IntegerMetric` to see how to do it. - And please pass all the arguments received from child class into this constructor, as this + All please pass all the arguments received from child class into this constructor, as this constructor actually stores them! Refer to :py:class:`.cps.ClicksPerTimeUnitMetric` on how to do that. """ diff --git a/satella/instrumentation/trace_back.py b/satella/instrumentation/trace_back.py index 6898f937495a41eb91f8d98384cb6d072c989619..9daf8eb8518693e05c609476a88f0f72adf08e8c 100644 --- a/satella/instrumentation/trace_back.py +++ b/satella/instrumentation/trace_back.py @@ -178,7 +178,7 @@ class StoredVariableValue: raise ValueError('value was never pickled') elif self.pickle_type == 'failed': raise ValueError( - 'Value has failed to be pickled, reason is %s' % (self.pickle,)) + 'MemoryCondition has failed to be pickled, reason is %s' % (self.pickle,)) elif self.pickle_type == 'pickle/gzip': pickle_ = zlib.decompress(self.pickle) elif self.pickle_type == 'pickle': diff --git a/satella/posix/pidlock.py b/satella/posix/pidlock.py index 27023aff153dfd5243e0afb8604c7a333d219442..c57830274323581a8d2a71d063401a9325f8bd60 100644 --- a/satella/posix/pidlock.py +++ b/satella/posix/pidlock.py @@ -17,7 +17,7 @@ class PIDFileLock: >>> with PIDFileLock('myservice.pid'): >>> ... rest of code .. - Or alternatively + Any alternatively >>> pid_lock = PIDFileLock('myservice.pid') >>> pid_lock.acquire() diff --git a/tests/test_instrumentation/test_memory.py b/tests/test_instrumentation/test_memory.py new file mode 100644 index 0000000000000000000000000000000000000000..ccc39f9d43d706d68f9ee9dba15ddfcf2927fc3c --- /dev/null +++ b/tests/test_instrumentation/test_memory.py @@ -0,0 +1,62 @@ +import logging +import typing as tp +from satella.instrumentation.memory import MemoryPressureManager, CustomCondition, All, Any +import time +import unittest +logger = logging.getLogger(__name__) + + +class OnDemandCondition(CustomCondition): + def __init__(self): + self.value = False + super().__init__(lambda: self.value) + + def can_fire(self, *args) -> bool: + return self.value + + +class TestMemory(unittest.TestCase): + def test_memory(self): + odc = OnDemandCondition() + + a = {'memory': False, + 'calls': 0, + 'improved': False, + 'level_2_engaged': False, + 'level_2_confirmed': False} + + cc = CustomCondition(lambda: a['level_2_engaged']) + + MemoryPressureManager(None, [odc, All(cc, Any(cc, cc))], 2) + + @MemoryPressureManager.register_on_entered_severity(2) + def call_on_level_2(): + a['level_2_confirmed'] = True + + @MemoryPressureManager.register_on_remaining_in_severity(1) + def call_on_memory_still(): + a['calls'] += 1 + + @MemoryPressureManager.register_on_entered_severity(1) + def call_on_no_memory(): + a['memory'] = True + + @MemoryPressureManager.register_on_left_severity(1) + def call_improved(): + a['improved'] = True + + self.assertFalse(a['memory']) + self.assertFalse(a['improved']) + time.sleep(3) + odc.value = True + time.sleep(5) + self.assertTrue(a['memory']) + self.assertFalse(a['improved']) + self.assertGreater(a['calls'], 0) + odc.value = False + time.sleep(3) + self.assertTrue(a['improved']) + self.assertTrue(a['memory']) + a['level_2_engaged'] = True + time.sleep(3) + self.assertTrue(a['level_2_confirmed'])