Skip to content
Snippets Groups Projects
Commit b8f4356e authored by Piotr Maślanka's avatar Piotr Maślanka
Browse files

added MemoryPressureManager

parent 76e7547b
No related branches found
No related tags found
No related merge requests found
# v2.5.0
* added `CallableMetric`
* added `MemoryPressureManager`
# v2.4.45
......
......@@ -17,6 +17,7 @@ Visit the project's page at GitHub_!
coding/concurrent
coding/sequences
instrumentation/traceback
instrumentation/memory
instrumentation/metrics
exception_handling
json
......
Memory pressure
===============
When faced with the risk of running low on memory, you know that some of your
programs' variables are cache. They can be discarded, dropped on the floor, to
be recomputed later.
Problem is, that they need a trigger to do it. Memory pressure management
from Satella solves that problem.
Defining severity levels
------------------------
To define a severity level, use the following classes:
.. autoclass:: satella.instrumentation.memory.GlobalAbsoluteValue
.. autoclass:: satella.instrumentation.memory.GlobalRelativeValue
.. autoclass:: satella.instrumentation.memory.LocalAbsoluteValue
.. autoclass:: satella.instrumentation.memory.LocalRelativeValue
Here you can either provide a callable or override the ``can_fire`` method
.. autoclass:: satella.instrumentation.memory.CustomCondition
:members:
You can combine them with following operators:
.. autoclass:: satella.instrumentation.memory.All
.. autoclass:: satella.instrumentation.memory.Any
.. autoclass:: satella.instrumentation.memory.Not
Then, you make a list out of them. This list, with indices counted from 1,
signals what condition needs to be true for the program to enter given severity level.
Handlers
--------
It is impossible to go from severity level 1 to say 3 without hitting 2. 2 will
be hit by the way, the manager will call any handlers that are in the way.
Note that severity levels are concurrent - for example,
level 1 coexists with level 2, and if level 2 is in effect, that means
that level 1 is still in effect. You can register your handlers here:
.. autoclass:: satella.instrumentation.memory.MemoryPressureManager
:members:
from .conditions import Any, All, GlobalRelativeValue, GlobalAbsoluteValue, LocalAbsoluteValue, \
LocalRelativeValue, GB, MB, KB, CustomCondition, Not
from .memthread import MemoryPressureManager
__all__ = ['Any', 'All', 'MemoryPressureManager', 'GlobalAbsoluteValue',
'GB', 'GlobalRelativeValue', 'LocalRelativeValue', 'LocalAbsoluteValue', 'MB', 'KB',
'CustomCondition', 'Not']
from abc import ABCMeta, abstractmethod
import typing as tp
import logging
import functools
import psutil
__all__ = ['GB', 'MB', 'KB', 'Any', 'All', 'GlobalAbsoluteValue', 'GlobalRelativeValue',
'LocalRelativeValue', 'LocalAbsoluteValue', 'MemoryCondition', 'ZerothSeverity',
'CustomCondition', 'Not']
logger = logging.getLogger(__name__)
GB = 1024*1024*1024
MB = 1024*1024
KB = 1024
class MemoryCondition(metaclass=ABCMeta):
__slots__ = ('value', )
def __init__(self, value: int):
self.value = value
@abstractmethod
def can_fire(self, local_memory_data, local_maximum_consume: tp.Optional[int]) -> bool:
"""Has this severity level been reached?"""
class ZerothSeverity(MemoryCondition):
def __init__(self):
pass
def can_fire(self, local_memory_data, local_maximum_consume: tp.Optional[int]) -> bool:
return True
class OperationJoin(MemoryCondition):
__slots__ = ('conditions', )
def __init__(self, *conditions: MemoryCondition):
self.conditions = conditions
def can_fire(self, local_memory_data, local_maximum_consume: tp.Optional[int]) -> bool:
return functools.reduce(self.OPERATOR, (
condition.can_fire(local_memory_data, local_maximum_consume) for condition in
self.conditions), self.STARTING_VALUE)
class Any(OperationJoin):
"""This is true if one of the arguments is True"""
@staticmethod
def OPERATOR(a, b):
return a or b
STARTING_VALUE = False
class All(OperationJoin):
"""This is true if all arguments are True"""
@staticmethod
def OPERATOR(a, b):
return a and b
STARTING_VALUE = True
class Not(MemoryCondition):
"""True only if provided condition is false"""
__slots__ = ('condition', )
def __int__(self, condition: MemoryCondition):
self.condition = condition
def can_fire(self, local_memory_data, local_maximum_consume: tp.Optional[int]) -> bool:
return not self.condition.can_fire(local_memory_data, local_maximum_consume)
class GlobalAbsoluteValue(MemoryCondition):
"""If free memory globally falls below this many bytes, given severity level starts"""
def can_fire(self, local_memory_data, local_maximum_consume: tp.Optional[int]) -> bool:
return psutil.virtual_memory().available < self.value
class GlobalRelativeValue(MemoryCondition):
"""
If percentage of global free memory falls below this many bytes, given severity level starts
"""
def can_fire(self, local_memory_data, local_maximum_consume: tp.Optional[int]) -> bool:
return psutil.virtual_memory().available / psutil.virtual_memory().total < (
self.value / 100)
class LocalAbsoluteValue(MemoryCondition):
"""
If free memory falls below this many bytes from what the program can maximally consume this
severity level starts
"""
def can_fire(self, local_memory_data, local_maximum_consume: tp.Optional[int]) -> bool:
return local_maximum_consume - local_memory_data.rss < self.value
class LocalRelativeValue(MemoryCondition):
"""
If percentage of memory available to this process in regards to what the program can
maximally consume falls below this level, given severity level starts
"""
def can_fire(self, local_memory_data, local_maximum_consume: tp.Optional[int]) -> bool:
return local_memory_data.rss / local_maximum_consume < (1-self.value / 100)
class CustomCondition(MemoryCondition):
"""
A custom condition. Condition that is true if attached callable/0 returns True.
:param callable_: callable to call upon asking whether this condition is valid. This
should be relatively cheap to compute.
"""
__slots__ = ('callable', )
def __init__(self, callable_: tp.Callable[[], bool]):
self.callable = callable_
def can_fire(self, local_memory_data, local_maximum_consume: tp.Optional[int]) -> bool:
v = self.callable()
logger.warning('Custom condition %s returning %s' % (self.callable, v))
return v
import logging
import os
import time
import typing as tp
import psutil
from satella.coding.concurrent import CallableGroup
from satella.coding.concurrent import TerminableThread
from satella.coding.structures import Singleton
from satella.time import measure
from .conditions import MemoryCondition, ZerothSeverity
logger = logging.getLogger(__name__)
__all__ = ['MemoryPressureManager', 'GB', 'MB', 'KB']
class CallNoMoreOftenThan:
__slots__ = ('interval', 'callable', 'last_called')
def __init__(self, interval: int, callable_: tp.Callable[[], None]):
self.interval = interval
self.callable = callable_
self.last_called = 0
def __call__(self, *args, **kwargs):
if time.monotonic() - self.last_called >= self.interval:
self.callable(*args, **kwargs)
self.last_called = time.monotonic()
@Singleton
class MemoryPressureManager(TerminableThread):
"""
Manager of the memory pressure.
The program is in some severity state. The baseline state is 0, meaning everything's OK.
Please note that it is sufficient to instantiate this class for the thread to run.
Eg.
>>> mt = MemoryPressureManager(maximum_available=4*GB, severity_levels=[80, 90])
>>> @mt.register_on_severity(1)
>>> def trigger_a():
>>> print('80% consumption of memory exceeded')
>>> @mt.register_on_severity(2)
>>> def trigger_b():
>>> print('90% consumption of memory exceeded')
:param maximum_available: maximum amount of memory that this program can use
:param severity_levels: this defines the levels of severity. A level is reached when program's
consumption is other this many percent of it's maximum_available amount of memory.
:param global_severity_levels:
"""
__slots__ = ('process', 'maximum_available', 'severity_levels', 'callbacks_on_entered',
'callbacks_on_remaing', 'current_severity_level', 'check_interval')
def __init__(self, maximum_available: tp.Optional[int] = None,
severity_levels: tp.List[MemoryCondition] = None,
check_interval: int = 10):
super().__init__(daemon=True)
self.process = psutil.Process(os.getpid())
self.maximum_available = maximum_available
self.severity_levels = [ZerothSeverity()] + (severity_levels or [])
self.callbacks_on_entered = [CallableGroup(gather=False) for i in
range(len(self.severity_levels))]
self.callbacks_on_remains = [CallableGroup(gather=False) for i in
range(len(self.severity_levels))]
self.callbacks_on_left = [CallableGroup(gather=False) for i in
range(len(self.severity_levels))]
self.current_severity_level = 0
self.stopped = False
self.check_interval = check_interval
self.start()
def advance_to_severity_level(self, target_level: int):
while self.current_severity_level != target_level:
delta = target_level - self.current_severity_level
if delta > 0:
# Means we are ENTERING a severity level
self.current_severity_level += delta
self.callbacks_on_entered[self.current_severity_level]()
logger.warning('Entered severity level %s' % (self.current_severity_level, ))
elif delta < 0:
# Means we are LEAVING a severity level
self.callbacks_on_left[self.current_severity_level]()
logger.warning('Left severity level %s' % (self.current_severity_level, ))
self.current_severity_level += delta
def stop(self):
"""Stop this thread from operating"""
self.stopped = True
def resume(self):
"""Resume the operation of this thread"""
self.stopped = False
def loop(self) -> None:
if self.stopped:
return time.sleep(self.check_interval)
with measure() as measurement:
severity_level = self.calculate_severity_level()
if self.current_severity_level != severity_level:
self.advance_to_severity_level(severity_level)
else:
self.callbacks_on_remains[severity_level]()
elapsed = measurement()
if elapsed < self.check_interval:
time.sleep(self.check_interval - elapsed)
def calculate_severity_level(self) -> int:
"""
This returns a severity level. 0 is the baseline severity level.
"""
memory_info = self.process.memory_info()
for level, condition in reversed(list(enumerate(self.severity_levels))):
if condition.can_fire(memory_info, self.maximum_available):
logger.warning('Condition %s was true on level %s' % (condition, level))
return level
@staticmethod
def register_on_entered_severity(severity: int):
"""
Register this handler to fire on entered a particular severity level.
This means that situation has gotten worse.
Use like this:
>>> MemoryPressureManager.register_on_entered_severity(1)
>>> def entered_severity_one():
>>> print('Entered memory severity level 1')
:param severity: severity level to react to
"""
def outer(fun):
MemoryPressureManager().callbacks_on_entered[severity].add(fun)
return fun
return outer
@staticmethod
def register_on_left_severity(severity: int):
"""
Register a handler to be called when given severity level is left. This means
that we have advanced to a lower severity level.
>>> MemoryPressureManager.register_on_left_severity(1)
>>> def entered_severity_one():
>>> print('Memory comsumption no longer 1')
:param severity: severity level to leave
"""
def outer(fun):
MemoryPressureManager().callbacks_on_left[severity].add(fun)
return fun
return outer
@staticmethod
def register_on_remaining_in_severity(severity: int, call_no_more_often_than: int = 0):
"""
Register this handler to fire on remaining in a particular severity level. Use like this:
>>> MemoryPressureManager.register_on_remaining_in_severity(0, 30)
>>> def entered_severity_one():
>>> print('Memory comsumption OK. I am called no more often than each 30 seconds')
:param severity: severity level
:param call_no_more_often_than: call no more often than this amount of seconds
"""
def outer(fun):
MemoryPressureManager().callbacks_on_remains[severity].add(
CallNoMoreOftenThan(call_no_more_often_than, fun))
return outer
......@@ -161,7 +161,7 @@ class EmbeddedSubmetrics(LeafMetric):
>>> metric.handle(3, label='value')
If you try to inherit from it, refer to :py:class:`.simple.IntegerMetric` to see how to do it.
And please pass all the arguments received from child class into this constructor, as this
All please pass all the arguments received from child class into this constructor, as this
constructor actually stores them!
Refer to :py:class:`.cps.ClicksPerTimeUnitMetric` on how to do that.
"""
......
......@@ -178,7 +178,7 @@ class StoredVariableValue:
raise ValueError('value was never pickled')
elif self.pickle_type == 'failed':
raise ValueError(
'Value has failed to be pickled, reason is %s' % (self.pickle,))
'MemoryCondition has failed to be pickled, reason is %s' % (self.pickle,))
elif self.pickle_type == 'pickle/gzip':
pickle_ = zlib.decompress(self.pickle)
elif self.pickle_type == 'pickle':
......
......@@ -17,7 +17,7 @@ class PIDFileLock:
>>> with PIDFileLock('myservice.pid'):
>>> ... rest of code ..
Or alternatively
Any alternatively
>>> pid_lock = PIDFileLock('myservice.pid')
>>> pid_lock.acquire()
......
import logging
import typing as tp
from satella.instrumentation.memory import MemoryPressureManager, CustomCondition, All, Any
import time
import unittest
logger = logging.getLogger(__name__)
class OnDemandCondition(CustomCondition):
def __init__(self):
self.value = False
super().__init__(lambda: self.value)
def can_fire(self, *args) -> bool:
return self.value
class TestMemory(unittest.TestCase):
def test_memory(self):
odc = OnDemandCondition()
a = {'memory': False,
'calls': 0,
'improved': False,
'level_2_engaged': False,
'level_2_confirmed': False}
cc = CustomCondition(lambda: a['level_2_engaged'])
MemoryPressureManager(None, [odc, All(cc, Any(cc, cc))], 2)
@MemoryPressureManager.register_on_entered_severity(2)
def call_on_level_2():
a['level_2_confirmed'] = True
@MemoryPressureManager.register_on_remaining_in_severity(1)
def call_on_memory_still():
a['calls'] += 1
@MemoryPressureManager.register_on_entered_severity(1)
def call_on_no_memory():
a['memory'] = True
@MemoryPressureManager.register_on_left_severity(1)
def call_improved():
a['improved'] = True
self.assertFalse(a['memory'])
self.assertFalse(a['improved'])
time.sleep(3)
odc.value = True
time.sleep(5)
self.assertTrue(a['memory'])
self.assertFalse(a['improved'])
self.assertGreater(a['calls'], 0)
odc.value = False
time.sleep(3)
self.assertTrue(a['improved'])
self.assertTrue(a['memory'])
a['level_2_engaged'] = True
time.sleep(3)
self.assertTrue(a['level_2_confirmed'])
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment