Skip to content
Snippets Groups Projects
Commit dfebb7ff authored by Piotr Maślanka's avatar Piotr Maślanka
Browse files

refactored MemoryPressureManager

parent 598470ee
No related branches found
No related tags found
No related merge requests found
__version__ = '2.17.11a3' __version__ = '2.17.11a4'
...@@ -5,8 +5,8 @@ import typing as tp ...@@ -5,8 +5,8 @@ import typing as tp
import psutil import psutil
from satella.coding.concurrent import CallableGroup, CallNoOftenThan, CancellableCallback from satella.coding.concurrent import CallableGroup, CallNoOftenThan, CancellableCallback, \
from satella.coding.concurrent import TerminableThread IntervalTerminableThread
from satella.coding.structures import Singleton from satella.coding.structures import Singleton
from satella.time import measure from satella.time import measure
from .conditions import BaseCondition, ZerothSeverity from .conditions import BaseCondition, ZerothSeverity
...@@ -17,7 +17,7 @@ __all__ = ['MemoryPressureManager'] ...@@ -17,7 +17,7 @@ __all__ = ['MemoryPressureManager']
@Singleton @Singleton
class MemoryPressureManager(TerminableThread): class MemoryPressureManager(IntervalTerminableThread):
""" """
Manager of the memory pressure. Manager of the memory pressure.
...@@ -41,7 +41,8 @@ class MemoryPressureManager(TerminableThread): ...@@ -41,7 +41,8 @@ class MemoryPressureManager(TerminableThread):
:param maximum_available: maximum amount of memory that this program can use :param maximum_available: maximum amount of memory that this program can use
:param severity_levels: this defines the levels of severity. A level is reached when program's :param severity_levels: this defines the levels of severity. A level is reached when program's
consumption is other this many percent of it's maximum_available amount of memory. consumption is other this many percent of it's maximum_available amount of memory.
:param check_interval: amount of seconds of pause between consecutive checks :param check_interval: amount of seconds of pause between consecutive checks, or
a time string
:param log_transitions: whether to log to logger when a transition takes place :param log_transitions: whether to log to logger when a transition takes place
:ivar severity_level: current severity level (int) :ivar severity_level: current severity level (int)
...@@ -50,9 +51,9 @@ class MemoryPressureManager(TerminableThread): ...@@ -50,9 +51,9 @@ class MemoryPressureManager(TerminableThread):
def __init__(self, maximum_available: tp.Optional[int] = None, def __init__(self, maximum_available: tp.Optional[int] = None,
severity_levels: tp.List[BaseCondition] = None, severity_levels: tp.List[BaseCondition] = None,
check_interval: int = 10, check_interval: tp.Union[str, int] = 10,
log_transitions: bool = True): log_transitions: bool = True):
super().__init__(name='memory pressure manager', daemon=True) super().__init__(check_interval, name='memory pressure manager', daemon=True)
self.log_transitions = log_transitions # type: bool self.log_transitions = log_transitions # type: bool
self.process = psutil.Process(os.getpid()) # type: psutil.Process self.process = psutil.Process(os.getpid()) # type: psutil.Process
self.maximum_available = maximum_available # type: int self.maximum_available = maximum_available # type: int
...@@ -71,7 +72,6 @@ class MemoryPressureManager(TerminableThread): ...@@ -71,7 +72,6 @@ class MemoryPressureManager(TerminableThread):
self.callbacks_on_memory_normal = CallableGroup(gather=False) self.callbacks_on_memory_normal = CallableGroup(gather=False)
self.severity_level = 0 # type: int self.severity_level = 0 # type: int
self.stopped = False # type: bool self.stopped = False # type: bool
self.check_interval = check_interval # type: int
self.start() self.start()
def advance_to_severity_level(self, target_level: int): def advance_to_severity_level(self, target_level: int):
...@@ -104,7 +104,7 @@ class MemoryPressureManager(TerminableThread): ...@@ -104,7 +104,7 @@ class MemoryPressureManager(TerminableThread):
def loop(self) -> None: def loop(self) -> None:
if self.stopped: if self.stopped:
return time.sleep(self.check_interval) return
self.callbacks_on_memory_normal.remove_cancelled() self.callbacks_on_memory_normal.remove_cancelled()
...@@ -117,16 +117,11 @@ class MemoryPressureManager(TerminableThread): ...@@ -117,16 +117,11 @@ class MemoryPressureManager(TerminableThread):
for cg in self.callbacks_on_remains: for cg in self.callbacks_on_remains:
cg.remove_cancelled() cg.remove_cancelled()
with measure() as measurement: severity_level = self.calculate_severity_level()
severity_level = self.calculate_severity_level() if self.severity_level != severity_level:
if self.severity_level != severity_level: self.advance_to_severity_level(severity_level)
self.advance_to_severity_level(severity_level) else:
else: self.callbacks_on_remains[severity_level]()
self.callbacks_on_remains[severity_level]()
elapsed = measurement()
if elapsed < self.check_interval:
time.sleep(self.check_interval - elapsed)
def calculate_severity_level(self) -> int: def calculate_severity_level(self) -> int:
""" """
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment