diff --git a/CHANGELOG.md b/CHANGELOG.md index d638b55df791f7f1ffbe9a1f5dd7d582bea6ebbd..39c1c895db54ca4d16b538e77ef6a734913537d5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,8 @@ -# v2.13.5 +# v2.14 * more cache classes are now generic * added `MetrifiedThreadPoolExecutor.get_queue_length` * `ExclusiveWritebackCache.sync` is no longer best effort * fixed logging exceptions in `MetrifiedThreadPoolExecutor` +* added `cpu_time` +* extracted `percentile` diff --git a/docs/coding/functions.rst b/docs/coding/functions.rst index dde15ad879ab3537d850568562d667e60fc03561..945d0d014136bf486b8c1ccdc663a393312ea4c3 100644 --- a/docs/coding/functions.rst +++ b/docs/coding/functions.rst @@ -16,10 +16,6 @@ Functions and decorators .. autofunction:: satella.coding.queue_iterator -.. autofunction:: satella.coding.transforms.intify - -.. autofunction:: satella.coding.transforms.jsonify - .. autofunction:: satella.coding.update_key_if_not_none .. autofunction:: satella.coding.update_key_if_true diff --git a/docs/coding/transforms.rst b/docs/coding/transforms.rst index cf5e90509925e2daed76567cf728d6699b9a0f2f..e81b0b574f70d9ecb88c45da4e35f4de03d97b73 100644 --- a/docs/coding/transforms.rst +++ b/docs/coding/transforms.rst @@ -1,6 +1,13 @@ Rudimentary data transforms =========================== + +.. autofunction:: satella.coding.transforms.intify + +.. autofunction:: satella.coding.transforms.jsonify + +.. autofunction:: satella.coding.transforms.percentile + pad_to_multiple_of_length ------------------------- diff --git a/docs/cpu_time.rst b/docs/cpu_time.rst new file mode 100644 index 0000000000000000000000000000000000000000..3f295a08114eaa3402eb8414e29a1ca740dbbb91 --- /dev/null +++ b/docs/cpu_time.rst @@ -0,0 +1,15 @@ +======== +CPU time +======== + +Satella's cpu_time helps your processes play nice with the overall CPU usage, ie. deferring +non-critical tasks to until CPU usage falls lower than the average. + +cpu_time does this by periodically monitoring CPU's usage and building your usage profile. +The profile is refreshed each X minutes. + +.. autofunction:: satella.cpu_time.calculate_occupancy_factor + +.. autofunction:: satella.cpu_time.sleep_except + + diff --git a/satella/__init__.py b/satella/__init__.py index 15812cb12d99467c84607dcc8dd856825ed7d827..5493cd9babf4e268f49a0aa0e8db600fa7fe5d9b 100644 --- a/satella/__init__.py +++ b/satella/__init__.py @@ -1 +1 @@ -__version__ = '2.13.5_a4' +__version__ = '2.14_b1' diff --git a/satella/coding/transforms/__init__.py b/satella/coding/transforms/__init__.py index b65c3b4887d623551e4d5b390f96de0745aa9f6f..94f235a31be2e2d3ea7dd07ec4392bc15dd32252 100644 --- a/satella/coding/transforms/__init__.py +++ b/satella/coding/transforms/__init__.py @@ -5,10 +5,11 @@ import typing as tp from satella.coding.decorators import for_argument from .jsonify import jsonify from .merger import merge_series +from .percentile import percentile __all__ = ['stringify', 'split_shuffle_and_join', 'one_tuple', 'merge_series', 'pad_to_multiple_of_length', 'clip', - 'jsonify', 'intify'] + 'jsonify', 'intify', 'percentile'] from satella.coding.typing import T, NoArgCallable, Appendable, Number, Predicate diff --git a/satella/coding/transforms/percentile.py b/satella/coding/transforms/percentile.py new file mode 100644 index 0000000000000000000000000000000000000000..0e4ebc52adc94a9a1fe80fb82412f934e6eae46e --- /dev/null +++ b/satella/coding/transforms/percentile.py @@ -0,0 +1,23 @@ +import math +import typing as tp + + +# shamelessly taken from +# http://code.activestate.com/recipes/511478-finding-the-percentile-of-the-values/) +def percentile(n: tp.List[float], percent: float) -> float: + """ + Find the percentile of a list of values. + + :param n: - is a list of values. Note this MUST BE already sorted. + :param percent: - a float value from 0.0 to 1.0. + + :return: the percentile of the values + """ + k = (len(n) - 1) * percent + f = math.floor(k) + c = math.ceil(k) + if f == c: + return n[int(k)] + d0 = n[int(f)] * (c - k) + d1 = n[int(c)] * (k - f) + return d0 + d1 diff --git a/satella/cpu_time/__init__.py b/satella/cpu_time/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..1c75cedde739ee0d215a9eb9b0b704acec086169 --- /dev/null +++ b/satella/cpu_time/__init__.py @@ -0,0 +1,5 @@ +from .collector import calculate_occupancy_factor, sleep_except +from .concurrency import CPUTimeAwareIntervalTerminableThread + +__all__ = ['calculate_occupancy_factor', 'sleep_except', + 'CPUTimeAwareIntervalTerminableThread'] diff --git a/satella/cpu_time/collector.py b/satella/cpu_time/collector.py new file mode 100644 index 0000000000000000000000000000000000000000..ba7da4c597343b7397d3226ad8a0081f1bd17146 --- /dev/null +++ b/satella/cpu_time/collector.py @@ -0,0 +1,150 @@ +import typing as tp +import threading +import multiprocessing +import time + +import psutil + +from satella.coding.structures import Singleton +from satella.coding.transforms import percentile + + +@Singleton +class CPUProfileBuilderThread(threading.Thread): + """ + A CPU profile builder thread + + :param window_seconds: the amount of seconds for which to collect data + :param refresh_each: time of seconds to sleep between rebuilding of profiles + """ + def __init__(self, window_seconds: int = 300, refresh_each: int = 1800, + percentiles_requested: tp.Sequence[float] = (0.9, )): + super().__init__(name='CPU profile builder', daemon=True) + self.window_size = window_seconds + self.refresh_each = refresh_each + self.data = [] + self.minimum_of = None + self.maximum_of = None + self.percentiles_requested = list(percentiles_requested) + self.percentile_values = [] + self.percentiles_regenerated = False + self.start() + + def request_percentile(self, percent: float) -> None: + if percent not in self.percentiles_requested: + self.percentiles_requested.append(percent) + self.percentiles_regenerated = False + + def percentile(self, percent: float) -> float: + if not self.data: + return 0 + if percent in self.percentiles_requested and self.percentiles_regenerated: + return self.percentile_values[self.percentiles_requested.index(percent)] + else: + return percentile(self.data, percent) + + def is_done(self) -> bool: + return bool(self.data) + + def recalculate(self) -> None: + data = [] + calculate_occupancy_factor() # as first values tend to be a bit wonky + for _ in range(self.window_size): + time.sleep(1) + data.append(calculate_occupancy_factor()) + percentiles = [] + for percent in self.percentiles_requested: + percentiles.append(percentile(data, percent)) + self.percentile_values = percentiles + self.percentiles_regenerated = True + self.minimum_of = min(data) + self.maximum_of = max(data) + self.data = data + + def run(self): + while True: + time.sleep(self.refresh_each) + self.recalculate() + + +def sleep_except(seconds: float, of_below: tp.Optional[float] = None, + of_above: tp.Optional[float] = None, + check_each: float = 1) -> bool: + """ + Sleep for specified number of seconds. + + Quit earlier if the occupancy factor goes below of_below or above of_above + :param seconds: + :param of_below: + :param of_above: + :param check_each: amount of seconds to sleep at once + :return: whether was awoken due to CPU time condition + """ + of = calculate_occupancy_factor() + while seconds > 0: + if of_above is not None: + if of_above < of: + return True + if of_below is not None: + if of_below > of: + return True + time_to_sleep = min(seconds, check_each) + time.sleep(time_to_sleep) + seconds -= time_to_sleep + if seconds <= 0: + return False + of = calculate_occupancy_factor() + return False + + +previous_cf: float = None +previous_timestamp: float = None + + +def _calculate_occupancy_factor() -> float: + c = psutil.cpu_times() + try: + try: + try: + used = c.user + c.nice + c.system + c.irq + c.softirq + c.steal + c.guest + c.guest_nice + except AttributeError: + # Linux? + used = c.user + c.nice + c.system + c.irq + c.softirq + except AttributeError: + # UNIX ? + used = c.user + c.nice + c.system + except AttributeError: + # windows? + used = c.user + c.system + c.interrupt + cur_time = time.monotonic() + occupation_factor = used / multiprocessing.cpu_count() + global previous_timestamp, previous_cf + if previous_timestamp is None: + previous_cf = occupation_factor + previous_timestamp = cur_time + return + delta = cur_time - previous_timestamp + if delta == 0: + return + of = (occupation_factor - previous_cf)/delta + previous_cf = occupation_factor + previous_timestamp = cur_time + return of + + +def calculate_occupancy_factor() -> float: + """ + Return a float between 0 and 1 telling you how occupied is your system. + + Note that this will be the average between now and the time it was last called. + + This in rare cases may block for up to 0.1 seconds + """ + c = _calculate_occupancy_factor() + while c is None: + time.sleep(0.1) + c = _calculate_occupancy_factor() + return c + + +calculate_occupancy_factor() diff --git a/satella/cpu_time/concurrency.py b/satella/cpu_time/concurrency.py new file mode 100644 index 0000000000000000000000000000000000000000..4776ff9b420f4462ac4857fe51c181b5b4795374 --- /dev/null +++ b/satella/cpu_time/concurrency.py @@ -0,0 +1,63 @@ +from abc import abstractmethod, ABCMeta + +from satella.coding.concurrent import IntervalTerminableThread +from satella.cpu_time import sleep_except +from satella.cpu_time.collector import CPUProfileBuilderThread +from satella.time import measure + +CHECK_INTERVAL = 2 + + +class CPUTimeAwareIntervalTerminableThread(IntervalTerminableThread, metaclass=ABCMeta): + """ + An IntervalTerminableThread that can call the loop a bit faster than usual, + based of current CPU time metrics. + + :param seconds: time that a single looping through should take. This will + include the time spent on calling .loop(), the rest of this time will + be spent safe_sleep()ing. + :param max_sooner: amount of seconds that is ok to call this earlier + :param percentile: percentile that CPU usage has to fall below to call it earlier. + """ + + def __init__(self, seconds: float, max_sooner: float, percentile: float, *args, **kwargs): + self.seconds = seconds + self.max_sooner = max_sooner + cp_bt = CPUProfileBuilderThread() + cp_bt.request_percentile(percentile) + self.percentile = percentile + super().__init__(seconds, *args, **kwargs) + + @abstractmethod + def loop(self) -> None: + """ + Override me! + + If True is returned, the thread will not sleep and .loop() will be executed + once more. + """ + + def run(self): + self.prepare() + while not self._terminating: + with measure() as measurement: + v = self.loop() + if measurement() > self.seconds: + continue + seconds_to_wait = self.seconds - measurement() + while seconds_to_wait > 0: + if seconds_to_wait > self.max_sooner: + self.safe_sleep(seconds_to_wait - self.max_sooner) + seconds_to_wait -= self.max_sooner + if seconds_to_wait <= 0: + continue + cp_bt = CPUProfileBuilderThread() + perc_val = cp_bt.percentile(self.percentile) + + while seconds_to_wait > 0: + time_to_sleep = min(CHECK_INTERVAL, seconds_to_wait) + if sleep_except(time_to_sleep, perc_val) or self.terminating: + seconds_to_wait = 0 + break + seconds_to_wait -= time_to_sleep + self.cleanup() diff --git a/satella/instrumentation/metrics/metric_types/summary.py b/satella/instrumentation/metrics/metric_types/summary.py index 8564b528c87e092f14079fb46f6fd599dd68038c..f3d13ed967389a54002acd99a11f8d9090195576 100644 --- a/satella/instrumentation/metrics/metric_types/summary.py +++ b/satella/instrumentation/metrics/metric_types/summary.py @@ -3,33 +3,13 @@ import math import typing as tp import warnings +from satella.coding.transforms.percentile import percentile from .base import EmbeddedSubmetrics, MetricLevel from .measurable_mixin import MeasurableMixin from .registry import register_metric from ..data import MetricData, MetricDataCollection -# shamelessly taken from -# http://code.activestate.com/recipes/511478-finding-the-percentile-of-the-values/) -def percentile(n: tp.List[float], percent: float) -> float: - """ - Find the percentile of a list of values. - - :param n: - is a list of values. Note this MUST BE already sorted. - :param percent: - a float value from 0.0 to 1.0. - - :return: the percentile of the values - """ - k = (len(n) - 1) * percent - f = math.floor(k) - c = math.ceil(k) - if f == c: - return n[int(k)] - d0 = n[int(f)] * (c - k) - d1 = n[int(c)] * (k - f) - return d0 + d1 - - @register_metric class SummaryMetric(EmbeddedSubmetrics, MeasurableMixin): """ diff --git a/tests/test_cpu_time.py b/tests/test_cpu_time.py new file mode 100644 index 0000000000000000000000000000000000000000..44a82807fe56d307b09e7ee60e18754d9a429b27 --- /dev/null +++ b/tests/test_cpu_time.py @@ -0,0 +1,37 @@ +import time +import unittest + +from satella.cpu_time import calculate_occupancy_factor, sleep_except, \ + CPUTimeAwareIntervalTerminableThread + + +class TestCPUTime(unittest.TestCase): + def test_cpu_time_aware_terminable_thread(self): + class TestingThread(CPUTimeAwareIntervalTerminableThread): + def __init__(self): + super().__init__(5, 3, 0.5) + self.a = 0 + + def loop(self) -> None: + self.a += 1 + + tt = TestingThread() + tt.start() + time.sleep(0.2) + self.assertEqual(tt.a, 1) + time.sleep(5) + self.assertEqual(tt.a, 2) + tt.terminate().join() + + def test_sleep_except(self): + c = time.monotonic() + sleep_except(1) + self.assertGreaterEqual(time.monotonic() - c, 1) + + def test_calculate_occupancy_factor(self): + c = calculate_occupancy_factor() + self.assertGreaterEqual(c, 0) + self.assertLessEqual(c, 1) + c = calculate_occupancy_factor() + self.assertGreaterEqual(c, 0) + self.assertLessEqual(c, 1)