diff --git a/.circleci/config.yml b/.circleci/config.yml index 5adace3c7be6a0a6f2713d0335f68ea372844473..584d8c7ae313ea13cef262f2b53ffc5d07345937 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -12,8 +12,12 @@ commands: - run: command: | pip install -r requirements.txt - pip install -U pytest-xdist pytest-cov pytest pytest-forked pluggy py opentracing + pip install -U pytest-xdist pytest-cov pytest pytest-forked pluggy py opentracing pylint python setup.py install + pylint: + description: Run pylint + steps: + - run: pylint -j 4 satella || true unit_test: description: Run the unit tests steps: @@ -25,6 +29,7 @@ jobs: - checkout - code-climate/install - setup_python + - pylint - unit_test - run: name: Collect results diff --git a/.pylintrc b/.pylintrc new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/CHANGELOG.md b/CHANGELOG.md index 9d4a487d2d3d0e300afcb969d48a2eb80de396ba..8f0e26e03d262a395e852ddb14df7689236edd0c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,14 @@ # v2.23.5 +The software +------------ + * added AutoflushFile.seek() * added AutoflushFile.truncate() +* added CPUTimeManager.set_refresh_each() +* added satella.instrumentation.cpu_time.get_own_cpu_time() + +Build process +------------- + +* added pylint diff --git a/docs/coding/decorators.rst b/docs/coding/decorators.rst index 52020b492423541999125e7431a576aa3276fbb9..36665e7b250897b821f8680211a837afc2b8a791 100644 --- a/docs/coding/decorators.rst +++ b/docs/coding/decorators.rst @@ -23,6 +23,8 @@ Decorators .. autofunction:: satella.coding.decorators.loop_while +.. autofunction:: satella.coding.decorators.repeat_forever + .. autofunction:: satella.coding.for_argument .. autofunction:: satella.coding.chain_functions diff --git a/docs/instrumentation/cpu_time.rst b/docs/instrumentation/cpu_time.rst index 7fcc31f185ee7ff1102110311824ef3de592cf1e..69170936c692abb41c30fdb4b9a48c4e15a114ba 100644 --- a/docs/instrumentation/cpu_time.rst +++ b/docs/instrumentation/cpu_time.rst @@ -23,4 +23,8 @@ And here's a helpful variant of .. autoclass:: satella.instrumentation.cpu_time.CPUTimeAwareIntervalTerminableThread :members: +.. autofunction:: satella.instrumentation.cpu_time.get_own_cpu_usage + +.. autoclass:: satella.instrumentation.cpu_time.pCPUtimes + :members: diff --git a/satella/__init__.py b/satella/__init__.py index 0ae5d7b737e24a25dc7fe6842f9dd13416a0fb38..e32c6254b9a894cea3d6e79e214a78954e9e0290 100644 --- a/satella/__init__.py +++ b/satella/__init__.py @@ -1 +1 @@ -__version__ = '2.23.5a2' +__version__ = '2.24.0a1' diff --git a/satella/coding/decorators/__init__.py b/satella/coding/decorators/__init__.py index f524f8fadc99916ecfe01c37b5647fe3c5507529..4589d29cd20f6e0ce2e34e2e5ae19b2d888c32a2 100644 --- a/satella/coding/decorators/__init__.py +++ b/satella/coding/decorators/__init__.py @@ -4,11 +4,11 @@ from .arguments import auto_adapt_to_methods, attach_arguments, for_argument, \ cached_property from .decorators import wraps, chain_functions, has_keys, short_none, memoize, return_as_list, \ default_return, cache_memoize, call_method_on_exception -from .flow_control import loop_while, queue_get +from .flow_control import loop_while, queue_get, repeat_forever from .preconditions import postcondition, precondition from .retry_dec import retry -__all__ = ['retry', 'transform_result', 'transform_arguments', +__all__ = ['retry', 'transform_result', 'transform_arguments', 'repeat_forever', 'execute_before', 'postcondition', 'precondition', 'wraps', 'queue_get', 'chain_functions', 'has_keys', 'short_none', 'auto_adapt_to_methods', 'attach_arguments', 'for_argument', 'loop_while', 'memoize', diff --git a/satella/coding/decorators/flow_control.py b/satella/coding/decorators/flow_control.py index f25420c5b006c38b4f73616f68ba2962e2769d5c..4f556f830cd7d88d64f2cbd53441074ad2d39a5f 100644 --- a/satella/coding/decorators/flow_control.py +++ b/satella/coding/decorators/flow_control.py @@ -7,6 +7,19 @@ from satella.coding.typing import ExceptionClassType, NoArgCallable, Predicate Queue = tp.TypeVar('Queue') +def repeat_forever(fun): + """ + A decorator that will place your function inside a while True loop. + """ + @wraps(fun) + def inner(*args, **kwargs): + while True: + fun(*args, **kwargs) + doc = '' if inner.__doc__ is None else inner.__doc__ + inner.__doc__ = doc + "\nThis will be repeated forever." + return inner + + def queue_get(queue_getter: tp.Union[str, tp.Callable[[object], Queue]], timeout: tp.Optional[float] = None, exception_empty: tp.Union[ diff --git a/satella/coding/structures/ranking.py b/satella/coding/structures/ranking.py index 6b2619fa3d89e4382e224ed60d946c840d6918be..8effd7b03ca4dd4303ce3e98d45fe388a985c3d6 100644 --- a/satella/coding/structures/ranking.py +++ b/satella/coding/structures/ranking.py @@ -41,7 +41,7 @@ class Ranking(tp.Generic[T]): def add(self, item: T) -> None: """ - Add a single element to the ranking and recalculate it + Add a single element to the ranking and _recalculate it """ index = self.ranking.add(item) for position, item in enumerate(self.ranking[index:], start=index): @@ -49,7 +49,7 @@ class Ranking(tp.Generic[T]): def remove(self, item: T) -> None: """ - Remove a single element from the ranking and recalculate it + Remove a single element from the ranking and _recalculate it """ index = self.ranking.index(item) self.ranking.remove(item) diff --git a/satella/instrumentation/cpu_time/__init__.py b/satella/instrumentation/cpu_time/__init__.py index 94049f584668077af771390b40ceb3c6a7ba5500..68eaa651c84b8c4e9a188c4ef77da786f5470297 100644 --- a/satella/instrumentation/cpu_time/__init__.py +++ b/satella/instrumentation/cpu_time/__init__.py @@ -1,6 +1,6 @@ -from .collector import calculate_occupancy_factor, sleep_cpu_aware, CPUTimeManager +from .collector import calculate_occupancy_factor, sleep_cpu_aware, CPUTimeManager, get_own_cpu_usage, pCPUtimes from .concurrency import CPUTimeAwareIntervalTerminableThread -__all__ = ['calculate_occupancy_factor', 'sleep_cpu_aware', +__all__ = ['calculate_occupancy_factor', 'sleep_cpu_aware', 'pCPUtimes', 'get_own_cpu_usage', 'CPUTimeAwareIntervalTerminableThread', 'CPUTimeManager'] diff --git a/satella/instrumentation/cpu_time/collector.py b/satella/instrumentation/cpu_time/collector.py index 590da9bfa5ae22ff867a3e7243b0f5aba236390e..47a4348d64f37975b128c5a4c08dd4e15de9cf8c 100644 --- a/satella/instrumentation/cpu_time/collector.py +++ b/satella/instrumentation/cpu_time/collector.py @@ -1,20 +1,27 @@ +from __future__ import annotations +import os import typing as tp import threading import multiprocessing import time +import collections import psutil from satella.coding import for_argument -from satella.coding.structures import Singleton +from satella.coding.decorators import repeat_forever from satella.coding.transforms import percentile from satella.time import parse_time_string -DEFAULT_REFRESH_EACH = '30m' -DEFAULT_WINDOW_SECONDS = '5m' +DEFAULT_REFRESH_EACH = '10s' +DEFAULT_WINDOW_SECONDS = '10s' + + +pCPUtimes = collections.namedtuple('pcputimes', + ['user', 'system', 'children_user', 'children_system', + 'iowait']) -@Singleton class CPUProfileBuilderThread(threading.Thread): """ A CPU profile builder thread and a core singleton object to use. @@ -25,6 +32,7 @@ class CPUProfileBuilderThread(threading.Thread): Or a time string. :param refresh_each: time of seconds to sleep between rebuilding of profiles, or a time string. """ + thread = None def __init__(self, window_seconds: tp.Union[str, int] = DEFAULT_WINDOW_SECONDS, refresh_each: tp.Union[str, int] = DEFAULT_REFRESH_EACH, @@ -33,10 +41,49 @@ class CPUProfileBuilderThread(threading.Thread): self.window_size = int(parse_time_string(window_seconds)) self.refresh_each = parse_time_string(refresh_each) self.data = [] + self.process = psutil.Process(os.getpid()) self.percentiles_requested = list(percentiles_requested) self.percentile_values = [] self.percentiles_regenerated = False - self.start() + self.started = False + self.own_load_average = collections.deque() # typing: tuple[float, pCPUtimes] + + @staticmethod + def get_instance(): + """Access instances of this thread in this way ONLY!""" + if CPUProfileBuilderThread.thread is None: + CPUProfileBuilderThread.thread = CPUProfileBuilderThread() + CPUProfileBuilderThread.thread.start() + return CPUProfileBuilderThread.thread + + def start(self) -> None: + if self.started: + return + super().start() + self.started = True + + def save_load(self, times: pCPUtimes) -> None: + while len(self.own_load_average) > 3 and \ + self.own_load_average[0][0] < self.own_load_average[-1][0] - self.window_size: + self.own_load_average.popleft() + tpl = time.monotonic(), times + self.own_load_average.append(tpl) + + def get_own_cpu_usage(self) -> tp.Optional[pCPUtimes]: + """ + Return own CPU usage. + + :return: None if data not yet ready, or a PCPUtimes namedtuple + """ + if len(self.own_load_average) < 2: + return None + time_p, times_v = self.own_load_average[-2] + time_c, times_c = self.own_load_average[-1] + difference = time_c - time_p + tp = {} + for field in times_v._fields: + tp[field] = (getattr(times_c, field) - getattr(times_v, field)) / difference + return pCPUtimes(**tp) def request_percentile(self, percent: float) -> None: if percent not in self.percentiles_requested: @@ -54,7 +101,8 @@ class CPUProfileBuilderThread(threading.Thread): def is_done(self) -> bool: return bool(self.data) - def recalculate(self) -> None: + def _recalculate(self) -> None: + """Takes as long as window size""" data = [] calculate_occupancy_factor() # as first values tend to be a bit wonky for _ in range(int(self.window_size)): @@ -68,10 +116,11 @@ class CPUProfileBuilderThread(threading.Thread): self.percentiles_regenerated = True self.data = data + @repeat_forever def run(self): - while True: - time.sleep(self.refresh_each) - self.recalculate() + self._recalculate() + self.save_load(self.process.cpu_times()) + time.sleep(self.refresh_each) class CPUTimeManager: @@ -82,7 +131,7 @@ class CPUTimeManager: :param percent: float between 0 and 1 :return: the value of the percentile """ - return CPUProfileBuilderThread().percentile(percent) + return CPUProfileBuilderThread.get_instance().percentile(percent) @staticmethod def set_window_size(window_size: float) -> None: @@ -91,7 +140,13 @@ class CPUTimeManager: :param window_size: time, in seconds """ - CPUProfileBuilderThread().window_size = window_size + CPUProfileBuilderThread.get_instance().window_size = window_size + + @staticmethod + def set_refresh_each(refresh: tp.Union[str, float, int]) -> None: + """Change the refresh interval for the CPU usage collection thread""" + + CPUProfileBuilderThread.get_instance().refresh_each = parse_time_string(refresh) @for_argument(parse_time_string) @@ -179,8 +234,20 @@ def calculate_occupancy_factor() -> float: :return: a float between 0 and 1 telling you how occupied CPU-wise is your system. """ + CPUProfileBuilderThread.get_instance() c = _calculate_occupancy_factor() while c is None: time.sleep(0.1) c = _calculate_occupancy_factor() return c + + +def get_own_cpu_usage() -> tp.Optional[pCPUtimes]: + """ + Return own CPU usage (this process only) + + :return: a namedtuple of (user, system, children_user, children_system, iowait) divided by number of seconds that + passed since the last measure. + or None if data not yet ready + """ + return CPUProfileBuilderThread.get_instance().get_own_cpu_usage() diff --git a/satella/instrumentation/cpu_time/concurrency.py b/satella/instrumentation/cpu_time/concurrency.py index 58ae63e3e543d30324ace3be267715220082a2f5..aefc676e90b9f2a10d09998399d43574b507d9a5 100644 --- a/satella/instrumentation/cpu_time/concurrency.py +++ b/satella/instrumentation/cpu_time/concurrency.py @@ -31,7 +31,7 @@ class CPUTimeAwareIntervalTerminableThread(IntervalTerminableThread, metaclass=A self.seconds = parse_time_string(seconds) self.wakeup_interval = parse_time_string(wakeup_interval) self.max_sooner = max_sooner or seconds * 0.1 - cp_bt = CPUProfileBuilderThread() + cp_bt = CPUProfileBuilderThread.get_instance() cp_bt.request_percentile(percentile) self.percentile = percentile super().__init__(seconds, *args, **kwargs) @@ -48,7 +48,7 @@ class CPUTimeAwareIntervalTerminableThread(IntervalTerminableThread, metaclass=A return measurement() def __sleep_waiting_for_cpu(self, how_long: float) -> None: - cp_bt = CPUProfileBuilderThread() + cp_bt = CPUProfileBuilderThread.get_instance() per_val = cp_bt.percentile(self.percentile) while how_long > 0 and not self._terminating: diff --git a/tests/test_instrumentation/test_cpu_time.py b/tests/test_instrumentation/test_cpu_time.py index b3a3ae8df0c13d443a787f813b84f8df7b47df2c..bedf4431f9f62faad20c2250f671928e7a4c2964 100644 --- a/tests/test_instrumentation/test_cpu_time.py +++ b/tests/test_instrumentation/test_cpu_time.py @@ -1,11 +1,21 @@ +import threading import time import unittest from satella.instrumentation.cpu_time import calculate_occupancy_factor, sleep_cpu_aware, \ - CPUTimeAwareIntervalTerminableThread + CPUTimeAwareIntervalTerminableThread, get_own_cpu_usage, CPUTimeManager + +from satella.time import measure + +TERMINATOR = True class TestCPUTime(unittest.TestCase): + + @classmethod + def setUpClass(cls): + CPUTimeManager.set_refresh_each('5s') + def test_cpu_time_aware_terminable_thread_terminates(self): class TestingThread(CPUTimeAwareIntervalTerminableThread): def __init__(self): @@ -34,6 +44,33 @@ class TestCPUTime(unittest.TestCase): self.assertEqual(tt.a, 2) tt.terminate().join() + def test_get_own_cpu_usage(self): + global TERMINATOR + def run(): + global TERMINATOR + while TERMINATOR: + pass + thr = threading.Thread(target=run) + usage = get_own_cpu_usage() # start the thread + thr.start() + time.sleep(10) + while usage is None: + time.sleep(5) + usage = get_own_cpu_usage() + + try: + with measure(timeout=30) as m: + while not m.timeouted: + time.sleep(5) + usage = get_own_cpu_usage() + if usage.user > 0.9: + break + else: + self.fail('Timeout when waiting for significant CPU usage') + finally: + TERMINATOR = False + thr.join() + def test_sleep_except(self): c = time.monotonic() sleep_cpu_aware(1)