From 581649dbc9e550d51acb3558ce322015a35a5fd2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ma=C5=9Blanka?= <piotr.maslanka@henrietta.com.pl> Date: Wed, 4 Nov 2020 18:48:50 +0100 Subject: [PATCH] beta for 2.14 --- CHANGELOG.md | 4 +- docs/coding/functions.rst | 4 - docs/coding/transforms.rst | 7 + docs/cpu_time.rst | 15 ++ satella/__init__.py | 2 +- satella/coding/transforms/__init__.py | 3 +- satella/coding/transforms/percentile.py | 23 +++ satella/cpu_time/__init__.py | 5 + satella/cpu_time/collector.py | 150 ++++++++++++++++++ satella/cpu_time/concurrency.py | 63 ++++++++ .../metrics/metric_types/summary.py | 22 +-- tests/test_cpu_time.py | 37 +++++ 12 files changed, 307 insertions(+), 28 deletions(-) create mode 100644 docs/cpu_time.rst create mode 100644 satella/coding/transforms/percentile.py create mode 100644 satella/cpu_time/__init__.py create mode 100644 satella/cpu_time/collector.py create mode 100644 satella/cpu_time/concurrency.py create mode 100644 tests/test_cpu_time.py diff --git a/CHANGELOG.md b/CHANGELOG.md index d638b55d..39c1c895 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,8 @@ -# v2.13.5 +# v2.14 * more cache classes are now generic * added `MetrifiedThreadPoolExecutor.get_queue_length` * `ExclusiveWritebackCache.sync` is no longer best effort * fixed logging exceptions in `MetrifiedThreadPoolExecutor` +* added `cpu_time` +* extracted `percentile` diff --git a/docs/coding/functions.rst b/docs/coding/functions.rst index dde15ad8..945d0d01 100644 --- a/docs/coding/functions.rst +++ b/docs/coding/functions.rst @@ -16,10 +16,6 @@ Functions and decorators .. autofunction:: satella.coding.queue_iterator -.. autofunction:: satella.coding.transforms.intify - -.. autofunction:: satella.coding.transforms.jsonify - .. autofunction:: satella.coding.update_key_if_not_none .. autofunction:: satella.coding.update_key_if_true diff --git a/docs/coding/transforms.rst b/docs/coding/transforms.rst index cf5e9050..e81b0b57 100644 --- a/docs/coding/transforms.rst +++ b/docs/coding/transforms.rst @@ -1,6 +1,13 @@ Rudimentary data transforms =========================== + +.. autofunction:: satella.coding.transforms.intify + +.. autofunction:: satella.coding.transforms.jsonify + +.. autofunction:: satella.coding.transforms.percentile + pad_to_multiple_of_length ------------------------- diff --git a/docs/cpu_time.rst b/docs/cpu_time.rst new file mode 100644 index 00000000..3f295a08 --- /dev/null +++ b/docs/cpu_time.rst @@ -0,0 +1,15 @@ +======== +CPU time +======== + +Satella's cpu_time helps your processes play nice with the overall CPU usage, ie. deferring +non-critical tasks to until CPU usage falls lower than the average. + +cpu_time does this by periodically monitoring CPU's usage and building your usage profile. +The profile is refreshed each X minutes. + +.. autofunction:: satella.cpu_time.calculate_occupancy_factor + +.. autofunction:: satella.cpu_time.sleep_except + + diff --git a/satella/__init__.py b/satella/__init__.py index 15812cb1..5493cd9b 100644 --- a/satella/__init__.py +++ b/satella/__init__.py @@ -1 +1 @@ -__version__ = '2.13.5_a4' +__version__ = '2.14_b1' diff --git a/satella/coding/transforms/__init__.py b/satella/coding/transforms/__init__.py index b65c3b48..94f235a3 100644 --- a/satella/coding/transforms/__init__.py +++ b/satella/coding/transforms/__init__.py @@ -5,10 +5,11 @@ import typing as tp from satella.coding.decorators import for_argument from .jsonify import jsonify from .merger import merge_series +from .percentile import percentile __all__ = ['stringify', 'split_shuffle_and_join', 'one_tuple', 'merge_series', 'pad_to_multiple_of_length', 'clip', - 'jsonify', 'intify'] + 'jsonify', 'intify', 'percentile'] from satella.coding.typing import T, NoArgCallable, Appendable, Number, Predicate diff --git a/satella/coding/transforms/percentile.py b/satella/coding/transforms/percentile.py new file mode 100644 index 00000000..0e4ebc52 --- /dev/null +++ b/satella/coding/transforms/percentile.py @@ -0,0 +1,23 @@ +import math +import typing as tp + + +# shamelessly taken from +# http://code.activestate.com/recipes/511478-finding-the-percentile-of-the-values/) +def percentile(n: tp.List[float], percent: float) -> float: + """ + Find the percentile of a list of values. + + :param n: - is a list of values. Note this MUST BE already sorted. + :param percent: - a float value from 0.0 to 1.0. + + :return: the percentile of the values + """ + k = (len(n) - 1) * percent + f = math.floor(k) + c = math.ceil(k) + if f == c: + return n[int(k)] + d0 = n[int(f)] * (c - k) + d1 = n[int(c)] * (k - f) + return d0 + d1 diff --git a/satella/cpu_time/__init__.py b/satella/cpu_time/__init__.py new file mode 100644 index 00000000..1c75cedd --- /dev/null +++ b/satella/cpu_time/__init__.py @@ -0,0 +1,5 @@ +from .collector import calculate_occupancy_factor, sleep_except +from .concurrency import CPUTimeAwareIntervalTerminableThread + +__all__ = ['calculate_occupancy_factor', 'sleep_except', + 'CPUTimeAwareIntervalTerminableThread'] diff --git a/satella/cpu_time/collector.py b/satella/cpu_time/collector.py new file mode 100644 index 00000000..ba7da4c5 --- /dev/null +++ b/satella/cpu_time/collector.py @@ -0,0 +1,150 @@ +import typing as tp +import threading +import multiprocessing +import time + +import psutil + +from satella.coding.structures import Singleton +from satella.coding.transforms import percentile + + +@Singleton +class CPUProfileBuilderThread(threading.Thread): + """ + A CPU profile builder thread + + :param window_seconds: the amount of seconds for which to collect data + :param refresh_each: time of seconds to sleep between rebuilding of profiles + """ + def __init__(self, window_seconds: int = 300, refresh_each: int = 1800, + percentiles_requested: tp.Sequence[float] = (0.9, )): + super().__init__(name='CPU profile builder', daemon=True) + self.window_size = window_seconds + self.refresh_each = refresh_each + self.data = [] + self.minimum_of = None + self.maximum_of = None + self.percentiles_requested = list(percentiles_requested) + self.percentile_values = [] + self.percentiles_regenerated = False + self.start() + + def request_percentile(self, percent: float) -> None: + if percent not in self.percentiles_requested: + self.percentiles_requested.append(percent) + self.percentiles_regenerated = False + + def percentile(self, percent: float) -> float: + if not self.data: + return 0 + if percent in self.percentiles_requested and self.percentiles_regenerated: + return self.percentile_values[self.percentiles_requested.index(percent)] + else: + return percentile(self.data, percent) + + def is_done(self) -> bool: + return bool(self.data) + + def recalculate(self) -> None: + data = [] + calculate_occupancy_factor() # as first values tend to be a bit wonky + for _ in range(self.window_size): + time.sleep(1) + data.append(calculate_occupancy_factor()) + percentiles = [] + for percent in self.percentiles_requested: + percentiles.append(percentile(data, percent)) + self.percentile_values = percentiles + self.percentiles_regenerated = True + self.minimum_of = min(data) + self.maximum_of = max(data) + self.data = data + + def run(self): + while True: + time.sleep(self.refresh_each) + self.recalculate() + + +def sleep_except(seconds: float, of_below: tp.Optional[float] = None, + of_above: tp.Optional[float] = None, + check_each: float = 1) -> bool: + """ + Sleep for specified number of seconds. + + Quit earlier if the occupancy factor goes below of_below or above of_above + :param seconds: + :param of_below: + :param of_above: + :param check_each: amount of seconds to sleep at once + :return: whether was awoken due to CPU time condition + """ + of = calculate_occupancy_factor() + while seconds > 0: + if of_above is not None: + if of_above < of: + return True + if of_below is not None: + if of_below > of: + return True + time_to_sleep = min(seconds, check_each) + time.sleep(time_to_sleep) + seconds -= time_to_sleep + if seconds <= 0: + return False + of = calculate_occupancy_factor() + return False + + +previous_cf: float = None +previous_timestamp: float = None + + +def _calculate_occupancy_factor() -> float: + c = psutil.cpu_times() + try: + try: + try: + used = c.user + c.nice + c.system + c.irq + c.softirq + c.steal + c.guest + c.guest_nice + except AttributeError: + # Linux? + used = c.user + c.nice + c.system + c.irq + c.softirq + except AttributeError: + # UNIX ? + used = c.user + c.nice + c.system + except AttributeError: + # windows? + used = c.user + c.system + c.interrupt + cur_time = time.monotonic() + occupation_factor = used / multiprocessing.cpu_count() + global previous_timestamp, previous_cf + if previous_timestamp is None: + previous_cf = occupation_factor + previous_timestamp = cur_time + return + delta = cur_time - previous_timestamp + if delta == 0: + return + of = (occupation_factor - previous_cf)/delta + previous_cf = occupation_factor + previous_timestamp = cur_time + return of + + +def calculate_occupancy_factor() -> float: + """ + Return a float between 0 and 1 telling you how occupied is your system. + + Note that this will be the average between now and the time it was last called. + + This in rare cases may block for up to 0.1 seconds + """ + c = _calculate_occupancy_factor() + while c is None: + time.sleep(0.1) + c = _calculate_occupancy_factor() + return c + + +calculate_occupancy_factor() diff --git a/satella/cpu_time/concurrency.py b/satella/cpu_time/concurrency.py new file mode 100644 index 00000000..4776ff9b --- /dev/null +++ b/satella/cpu_time/concurrency.py @@ -0,0 +1,63 @@ +from abc import abstractmethod, ABCMeta + +from satella.coding.concurrent import IntervalTerminableThread +from satella.cpu_time import sleep_except +from satella.cpu_time.collector import CPUProfileBuilderThread +from satella.time import measure + +CHECK_INTERVAL = 2 + + +class CPUTimeAwareIntervalTerminableThread(IntervalTerminableThread, metaclass=ABCMeta): + """ + An IntervalTerminableThread that can call the loop a bit faster than usual, + based of current CPU time metrics. + + :param seconds: time that a single looping through should take. This will + include the time spent on calling .loop(), the rest of this time will + be spent safe_sleep()ing. + :param max_sooner: amount of seconds that is ok to call this earlier + :param percentile: percentile that CPU usage has to fall below to call it earlier. + """ + + def __init__(self, seconds: float, max_sooner: float, percentile: float, *args, **kwargs): + self.seconds = seconds + self.max_sooner = max_sooner + cp_bt = CPUProfileBuilderThread() + cp_bt.request_percentile(percentile) + self.percentile = percentile + super().__init__(seconds, *args, **kwargs) + + @abstractmethod + def loop(self) -> None: + """ + Override me! + + If True is returned, the thread will not sleep and .loop() will be executed + once more. + """ + + def run(self): + self.prepare() + while not self._terminating: + with measure() as measurement: + v = self.loop() + if measurement() > self.seconds: + continue + seconds_to_wait = self.seconds - measurement() + while seconds_to_wait > 0: + if seconds_to_wait > self.max_sooner: + self.safe_sleep(seconds_to_wait - self.max_sooner) + seconds_to_wait -= self.max_sooner + if seconds_to_wait <= 0: + continue + cp_bt = CPUProfileBuilderThread() + perc_val = cp_bt.percentile(self.percentile) + + while seconds_to_wait > 0: + time_to_sleep = min(CHECK_INTERVAL, seconds_to_wait) + if sleep_except(time_to_sleep, perc_val) or self.terminating: + seconds_to_wait = 0 + break + seconds_to_wait -= time_to_sleep + self.cleanup() diff --git a/satella/instrumentation/metrics/metric_types/summary.py b/satella/instrumentation/metrics/metric_types/summary.py index 8564b528..f3d13ed9 100644 --- a/satella/instrumentation/metrics/metric_types/summary.py +++ b/satella/instrumentation/metrics/metric_types/summary.py @@ -3,33 +3,13 @@ import math import typing as tp import warnings +from satella.coding.transforms.percentile import percentile from .base import EmbeddedSubmetrics, MetricLevel from .measurable_mixin import MeasurableMixin from .registry import register_metric from ..data import MetricData, MetricDataCollection -# shamelessly taken from -# http://code.activestate.com/recipes/511478-finding-the-percentile-of-the-values/) -def percentile(n: tp.List[float], percent: float) -> float: - """ - Find the percentile of a list of values. - - :param n: - is a list of values. Note this MUST BE already sorted. - :param percent: - a float value from 0.0 to 1.0. - - :return: the percentile of the values - """ - k = (len(n) - 1) * percent - f = math.floor(k) - c = math.ceil(k) - if f == c: - return n[int(k)] - d0 = n[int(f)] * (c - k) - d1 = n[int(c)] * (k - f) - return d0 + d1 - - @register_metric class SummaryMetric(EmbeddedSubmetrics, MeasurableMixin): """ diff --git a/tests/test_cpu_time.py b/tests/test_cpu_time.py new file mode 100644 index 00000000..44a82807 --- /dev/null +++ b/tests/test_cpu_time.py @@ -0,0 +1,37 @@ +import time +import unittest + +from satella.cpu_time import calculate_occupancy_factor, sleep_except, \ + CPUTimeAwareIntervalTerminableThread + + +class TestCPUTime(unittest.TestCase): + def test_cpu_time_aware_terminable_thread(self): + class TestingThread(CPUTimeAwareIntervalTerminableThread): + def __init__(self): + super().__init__(5, 3, 0.5) + self.a = 0 + + def loop(self) -> None: + self.a += 1 + + tt = TestingThread() + tt.start() + time.sleep(0.2) + self.assertEqual(tt.a, 1) + time.sleep(5) + self.assertEqual(tt.a, 2) + tt.terminate().join() + + def test_sleep_except(self): + c = time.monotonic() + sleep_except(1) + self.assertGreaterEqual(time.monotonic() - c, 1) + + def test_calculate_occupancy_factor(self): + c = calculate_occupancy_factor() + self.assertGreaterEqual(c, 0) + self.assertLessEqual(c, 1) + c = calculate_occupancy_factor() + self.assertGreaterEqual(c, 0) + self.assertLessEqual(c, 1) -- GitLab