From b0970bd91c65c1950f32a9449befa81d04f2cfc6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ma=C5=9Blanka?= <piotr.maslanka@henrietta.com.pl> Date: Wed, 4 Nov 2020 20:41:07 +0100 Subject: [PATCH] ver 2 --- docs/index.rst | 1 + docs/{ => instrumentation}/cpu_time.rst | 15 +++- satella/__init__.py | 2 +- satella/coding/concurrent/__init__.py | 2 +- .../{sync_threadpool.py => sync.py} | 0 .../dictionaries/writeback_cache.py | 7 +- satella/cpu_time/__init__.py | 5 -- satella/cpu_time/concurrency.py | 63 --------------- satella/instrumentation/cpu_time/__init__.py | 6 ++ .../cpu_time/collector.py | 35 ++++++--- .../instrumentation/cpu_time/concurrency.py | 76 +++++++++++++++++++ .../test_cpu_time.py | 5 +- .../test_metrics/test_structures.py | 1 - 13 files changed, 127 insertions(+), 91 deletions(-) rename docs/{ => instrumentation}/cpu_time.rst (51%) rename satella/coding/concurrent/{sync_threadpool.py => sync.py} (100%) delete mode 100644 satella/cpu_time/__init__.py delete mode 100644 satella/cpu_time/concurrency.py create mode 100644 satella/instrumentation/cpu_time/__init__.py rename satella/{ => instrumentation}/cpu_time/collector.py (82%) create mode 100644 satella/instrumentation/cpu_time/concurrency.py rename tests/{ => test_instrumentation}/test_cpu_time.py (87%) diff --git a/docs/index.rst b/docs/index.rst index d67537b5..7a13933c 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -22,6 +22,7 @@ Visit the project's page at GitHub_! coding/sequences coding/transforms coding/typing + instrumentation/cpu_time instrumentation/traceback instrumentation/memory instrumentation/metrics diff --git a/docs/cpu_time.rst b/docs/instrumentation/cpu_time.rst similarity index 51% rename from docs/cpu_time.rst rename to docs/instrumentation/cpu_time.rst index 3f295a08..a20a0146 100644 --- a/docs/cpu_time.rst +++ b/docs/instrumentation/cpu_time.rst @@ -1,4 +1,3 @@ -======== CPU time ======== @@ -10,6 +9,18 @@ The profile is refreshed each X minutes. .. autofunction:: satella.cpu_time.calculate_occupancy_factor -.. autofunction:: satella.cpu_time.sleep_except +.. autofunction:: satella.cpu_time.sleep_cpu_aware + + +Here's the primary thread you can use to work with things: + +.. autoclass:: satella.cpu_time.CPUTimeManager + :members: + +And here's a helpful variant of +:py:class:`satella.coding.concurrent.IntervalTerminableThread`: + +.. autoclass:: satella.cpu_time.CPUTimeAwareIntervalTerminableThread + :members: diff --git a/satella/__init__.py b/satella/__init__.py index 5493cd9b..96c63727 100644 --- a/satella/__init__.py +++ b/satella/__init__.py @@ -1 +1 @@ -__version__ = '2.14_b1' +__version__ = '2.14_b2' diff --git a/satella/coding/concurrent/__init__.py b/satella/coding/concurrent/__init__.py index 4bae20bf..64cb2e82 100644 --- a/satella/coding/concurrent/__init__.py +++ b/satella/coding/concurrent/__init__.py @@ -6,7 +6,7 @@ from .id_allocator import IDAllocator from .locked_dataset import LockedDataset from .locked_structure import LockedStructure from .monitor import MonitorList, Monitor, MonitorDict, RMonitor -from .sync_threadpool import sync_threadpool +from .sync import sync_threadpool from .thread import TerminableThread, Condition, SingleStartThread, call_in_separate_thread, \ BogusTerminableThread, IntervalTerminableThread from .timer import Timer diff --git a/satella/coding/concurrent/sync_threadpool.py b/satella/coding/concurrent/sync.py similarity index 100% rename from satella/coding/concurrent/sync_threadpool.py rename to satella/coding/concurrent/sync.py diff --git a/satella/coding/structures/dictionaries/writeback_cache.py b/satella/coding/structures/dictionaries/writeback_cache.py index aa3a09e3..0b6ea125 100644 --- a/satella/coding/structures/dictionaries/writeback_cache.py +++ b/satella/coding/structures/dictionaries/writeback_cache.py @@ -1,8 +1,8 @@ import time import typing as tp -from concurrent.futures import Executor, ThreadPoolExecutor, wait, ProcessPoolExecutor +from concurrent.futures import Executor, ThreadPoolExecutor, ProcessPoolExecutor -from satella.coding.concurrent import sync_threadpool +from satella.coding.concurrent.sync import sync_threadpool from satella.coding.concurrent.monitor import Monitor from satella.coding.recast_exceptions import silence_excs from satella.coding.typing import V, K @@ -54,11 +54,10 @@ class ExclusiveWritebackCache(tp.Generic[K, V]): """ Return current amount of entries waiting for writeback """ + # noinspection PyProtectedMember if isinstance(self.executor, ThreadPoolExecutor): - # noinspection PyProtectedMember return self.executor._work_queue.qsize() elif isinstance(self.executor, ProcessPoolExecutor): - # noinspection PyProtectedMember return self.executor._call_queue.qsize() else: return 0 diff --git a/satella/cpu_time/__init__.py b/satella/cpu_time/__init__.py deleted file mode 100644 index 1c75cedd..00000000 --- a/satella/cpu_time/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -from .collector import calculate_occupancy_factor, sleep_except -from .concurrency import CPUTimeAwareIntervalTerminableThread - -__all__ = ['calculate_occupancy_factor', 'sleep_except', - 'CPUTimeAwareIntervalTerminableThread'] diff --git a/satella/cpu_time/concurrency.py b/satella/cpu_time/concurrency.py deleted file mode 100644 index 4776ff9b..00000000 --- a/satella/cpu_time/concurrency.py +++ /dev/null @@ -1,63 +0,0 @@ -from abc import abstractmethod, ABCMeta - -from satella.coding.concurrent import IntervalTerminableThread -from satella.cpu_time import sleep_except -from satella.cpu_time.collector import CPUProfileBuilderThread -from satella.time import measure - -CHECK_INTERVAL = 2 - - -class CPUTimeAwareIntervalTerminableThread(IntervalTerminableThread, metaclass=ABCMeta): - """ - An IntervalTerminableThread that can call the loop a bit faster than usual, - based of current CPU time metrics. - - :param seconds: time that a single looping through should take. This will - include the time spent on calling .loop(), the rest of this time will - be spent safe_sleep()ing. - :param max_sooner: amount of seconds that is ok to call this earlier - :param percentile: percentile that CPU usage has to fall below to call it earlier. - """ - - def __init__(self, seconds: float, max_sooner: float, percentile: float, *args, **kwargs): - self.seconds = seconds - self.max_sooner = max_sooner - cp_bt = CPUProfileBuilderThread() - cp_bt.request_percentile(percentile) - self.percentile = percentile - super().__init__(seconds, *args, **kwargs) - - @abstractmethod - def loop(self) -> None: - """ - Override me! - - If True is returned, the thread will not sleep and .loop() will be executed - once more. - """ - - def run(self): - self.prepare() - while not self._terminating: - with measure() as measurement: - v = self.loop() - if measurement() > self.seconds: - continue - seconds_to_wait = self.seconds - measurement() - while seconds_to_wait > 0: - if seconds_to_wait > self.max_sooner: - self.safe_sleep(seconds_to_wait - self.max_sooner) - seconds_to_wait -= self.max_sooner - if seconds_to_wait <= 0: - continue - cp_bt = CPUProfileBuilderThread() - perc_val = cp_bt.percentile(self.percentile) - - while seconds_to_wait > 0: - time_to_sleep = min(CHECK_INTERVAL, seconds_to_wait) - if sleep_except(time_to_sleep, perc_val) or self.terminating: - seconds_to_wait = 0 - break - seconds_to_wait -= time_to_sleep - self.cleanup() diff --git a/satella/instrumentation/cpu_time/__init__.py b/satella/instrumentation/cpu_time/__init__.py new file mode 100644 index 00000000..94049f58 --- /dev/null +++ b/satella/instrumentation/cpu_time/__init__.py @@ -0,0 +1,6 @@ +from .collector import calculate_occupancy_factor, sleep_cpu_aware, CPUTimeManager +from .concurrency import CPUTimeAwareIntervalTerminableThread + +__all__ = ['calculate_occupancy_factor', 'sleep_cpu_aware', + 'CPUTimeAwareIntervalTerminableThread', + 'CPUTimeManager'] diff --git a/satella/cpu_time/collector.py b/satella/instrumentation/cpu_time/collector.py similarity index 82% rename from satella/cpu_time/collector.py rename to satella/instrumentation/cpu_time/collector.py index ba7da4c5..addbdcde 100644 --- a/satella/cpu_time/collector.py +++ b/satella/instrumentation/cpu_time/collector.py @@ -12,7 +12,7 @@ from satella.coding.transforms import percentile @Singleton class CPUProfileBuilderThread(threading.Thread): """ - A CPU profile builder thread + A CPU profile builder thread and a core singleton object to use. :param window_seconds: the amount of seconds for which to collect data :param refresh_each: time of seconds to sleep between rebuilding of profiles @@ -67,26 +67,41 @@ class CPUProfileBuilderThread(threading.Thread): self.recalculate() -def sleep_except(seconds: float, of_below: tp.Optional[float] = None, - of_above: tp.Optional[float] = None, - check_each: float = 1) -> bool: +class CPUTimeManager: + @staticmethod + def percentile(percent: float) -> float: + """ + Return given percentile of current CPU time's profile + :param percent: float between 0 and 1 + :return: the value of the percentile + """ + cp = CPUProfileBuilderThread() + return cp.percentile(percent) + + +def sleep_cpu_aware(seconds: float, of_below: tp.Optional[float] = None, + of_above: tp.Optional[float] = None, + check_each: float = 1) -> bool: """ Sleep for specified number of seconds. Quit earlier if the occupancy factor goes below of_below or above of_above - :param seconds: - :param of_below: - :param of_above: + :param seconds: time to sleep + :param of_below: occupancy factor below which the sleep will return + :param of_above: occupancy factor above which the sleep will return :param check_each: amount of seconds to sleep at once :return: whether was awoken due to CPU time condition """ + if of_below is None and of_above is None: + time.sleep(seconds) + return False of = calculate_occupancy_factor() while seconds > 0: if of_above is not None: - if of_above < of: + if of > of_above: return True if of_below is not None: - if of_below > of: + if of < of_below: return True time_to_sleep = min(seconds, check_each) time.sleep(time_to_sleep) @@ -146,5 +161,3 @@ def calculate_occupancy_factor() -> float: c = _calculate_occupancy_factor() return c - -calculate_occupancy_factor() diff --git a/satella/instrumentation/cpu_time/concurrency.py b/satella/instrumentation/cpu_time/concurrency.py new file mode 100644 index 00000000..457693f0 --- /dev/null +++ b/satella/instrumentation/cpu_time/concurrency.py @@ -0,0 +1,76 @@ +import typing as tp +from abc import abstractmethod, ABCMeta + +from satella.coding.concurrent import IntervalTerminableThread +from satella.instrumentation.cpu_time import sleep_cpu_aware +from satella.instrumentation.cpu_time.collector import CPUProfileBuilderThread +from satella.time import measure + + +class CPUTimeAwareIntervalTerminableThread(IntervalTerminableThread, metaclass=ABCMeta): + """ + An IntervalTerminableThread that can call the loop a bit faster than usual, + based of current CPU time metrics. + + :param seconds: time that a single looping through should take. This will + include the time spent on calling .loop(), the rest of this time will + be spent safe_sleep()ing. + :param max_sooner: amount of seconds that is ok to call this earlier. + Default is 6 times seconds. + :param percentile: percentile that CPU usage has to fall below to call it earlier. + :param wakeup_interval: amount of seconds to wake up between to check for _terminating status + """ + + def __init__(self, seconds: float, max_sooner: tp.Optional[float] = None, percentile: float = 0.3, + wakeup_interval: float = 3.0, *args, **kwargs): + self.seconds = seconds + self.wakeup_interval = wakeup_interval + self.max_sooner = max_sooner or seconds * 6 + cp_bt = CPUProfileBuilderThread() + cp_bt.request_percentile(percentile) + self.percentile = percentile + super().__init__(seconds, *args, **kwargs) + + @abstractmethod + def loop(self) -> None: + """ + Override me! + + If True is returned, the thread will not sleep and .loop() will be executed + once more. + """ + + def _execute_measured(self) -> float: + with measure() as measurement: + self.loop() + return measurement() + + def __sleep_waiting_for_cpu(self, how_long: float) -> None: + cp_bt = CPUProfileBuilderThread() + per_val = cp_bt.percentile(self.percentile) + + while how_long > 0 and not self._terminating: + time_to_sleep = min(self.wakeup_interval, how_long) + + if sleep_cpu_aware(time_to_sleep, per_val): + break + how_long -= time_to_sleep + + def __sleep(self, how_long: float) -> None: + if how_long > self.max_sooner: + if self.safe_sleep(how_long - self.max_sooner): + return + how_long = self.max_sooner + self.__sleep_waiting_for_cpu(how_long) + + def run(self): + self.prepare() + while not self._terminating: + measured = self._execute_measured() + seconds_to_wait = self.seconds - measured + if seconds_to_wait > 0: + self.__sleep(seconds_to_wait) + elif seconds_to_wait < 0: + self.on_overrun(measured) + + self.cleanup() diff --git a/tests/test_cpu_time.py b/tests/test_instrumentation/test_cpu_time.py similarity index 87% rename from tests/test_cpu_time.py rename to tests/test_instrumentation/test_cpu_time.py index 44a82807..1de37b72 100644 --- a/tests/test_cpu_time.py +++ b/tests/test_instrumentation/test_cpu_time.py @@ -1,7 +1,7 @@ import time import unittest -from satella.cpu_time import calculate_occupancy_factor, sleep_except, \ +from satella.instrumentation.cpu_time import calculate_occupancy_factor, sleep_cpu_aware, \ CPUTimeAwareIntervalTerminableThread @@ -25,13 +25,12 @@ class TestCPUTime(unittest.TestCase): def test_sleep_except(self): c = time.monotonic() - sleep_except(1) + sleep_cpu_aware(1) self.assertGreaterEqual(time.monotonic() - c, 1) def test_calculate_occupancy_factor(self): c = calculate_occupancy_factor() self.assertGreaterEqual(c, 0) - self.assertLessEqual(c, 1) c = calculate_occupancy_factor() self.assertGreaterEqual(c, 0) self.assertLessEqual(c, 1) diff --git a/tests/test_instrumentation/test_metrics/test_structures.py b/tests/test_instrumentation/test_metrics/test_structures.py index 1dafe14e..f7875037 100644 --- a/tests/test_instrumentation/test_metrics/test_structures.py +++ b/tests/test_instrumentation/test_metrics/test_structures.py @@ -42,7 +42,6 @@ class TestThreadPoolExecutor(unittest.TestCase): entries_waiting=entries_waiting) self.assertEqual(wbc[5], 3) self.assertEqual(b['no_calls'], 1) - self.assertEqual(n_th(entries_waiting.to_metric_data().values).value, 0) self.assertRaises(KeyError, lambda: wbc[-1]) self.assertEqual(b['no_calls'], 2) self.assertEqual(wbc[5], 3) -- GitLab