Skip to content
Snippets Groups Projects
Commit 581649db authored by Piotr Maślanka's avatar Piotr Maślanka
Browse files

beta for 2.14

parent bcd0e285
No related branches found
No related tags found
No related merge requests found
# v2.13.5
# v2.14
* more cache classes are now generic
* added `MetrifiedThreadPoolExecutor.get_queue_length`
* `ExclusiveWritebackCache.sync` is no longer best effort
* fixed logging exceptions in `MetrifiedThreadPoolExecutor`
* added `cpu_time`
* extracted `percentile`
......@@ -16,10 +16,6 @@ Functions and decorators
.. autofunction:: satella.coding.queue_iterator
.. autofunction:: satella.coding.transforms.intify
.. autofunction:: satella.coding.transforms.jsonify
.. autofunction:: satella.coding.update_key_if_not_none
.. autofunction:: satella.coding.update_key_if_true
......
Rudimentary data transforms
===========================
.. autofunction:: satella.coding.transforms.intify
.. autofunction:: satella.coding.transforms.jsonify
.. autofunction:: satella.coding.transforms.percentile
pad_to_multiple_of_length
-------------------------
......
========
CPU time
========
Satella's cpu_time helps your processes play nice with the overall CPU usage, ie. deferring
non-critical tasks to until CPU usage falls lower than the average.
cpu_time does this by periodically monitoring CPU's usage and building your usage profile.
The profile is refreshed each X minutes.
.. autofunction:: satella.cpu_time.calculate_occupancy_factor
.. autofunction:: satella.cpu_time.sleep_except
__version__ = '2.13.5_a4'
__version__ = '2.14_b1'
......@@ -5,10 +5,11 @@ import typing as tp
from satella.coding.decorators import for_argument
from .jsonify import jsonify
from .merger import merge_series
from .percentile import percentile
__all__ = ['stringify', 'split_shuffle_and_join', 'one_tuple',
'merge_series', 'pad_to_multiple_of_length', 'clip',
'jsonify', 'intify']
'jsonify', 'intify', 'percentile']
from satella.coding.typing import T, NoArgCallable, Appendable, Number, Predicate
......
import math
import typing as tp
# shamelessly taken from
# http://code.activestate.com/recipes/511478-finding-the-percentile-of-the-values/)
def percentile(n: tp.List[float], percent: float) -> float:
"""
Find the percentile of a list of values.
:param n: - is a list of values. Note this MUST BE already sorted.
:param percent: - a float value from 0.0 to 1.0.
:return: the percentile of the values
"""
k = (len(n) - 1) * percent
f = math.floor(k)
c = math.ceil(k)
if f == c:
return n[int(k)]
d0 = n[int(f)] * (c - k)
d1 = n[int(c)] * (k - f)
return d0 + d1
from .collector import calculate_occupancy_factor, sleep_except
from .concurrency import CPUTimeAwareIntervalTerminableThread
__all__ = ['calculate_occupancy_factor', 'sleep_except',
'CPUTimeAwareIntervalTerminableThread']
import typing as tp
import threading
import multiprocessing
import time
import psutil
from satella.coding.structures import Singleton
from satella.coding.transforms import percentile
@Singleton
class CPUProfileBuilderThread(threading.Thread):
"""
A CPU profile builder thread
:param window_seconds: the amount of seconds for which to collect data
:param refresh_each: time of seconds to sleep between rebuilding of profiles
"""
def __init__(self, window_seconds: int = 300, refresh_each: int = 1800,
percentiles_requested: tp.Sequence[float] = (0.9, )):
super().__init__(name='CPU profile builder', daemon=True)
self.window_size = window_seconds
self.refresh_each = refresh_each
self.data = []
self.minimum_of = None
self.maximum_of = None
self.percentiles_requested = list(percentiles_requested)
self.percentile_values = []
self.percentiles_regenerated = False
self.start()
def request_percentile(self, percent: float) -> None:
if percent not in self.percentiles_requested:
self.percentiles_requested.append(percent)
self.percentiles_regenerated = False
def percentile(self, percent: float) -> float:
if not self.data:
return 0
if percent in self.percentiles_requested and self.percentiles_regenerated:
return self.percentile_values[self.percentiles_requested.index(percent)]
else:
return percentile(self.data, percent)
def is_done(self) -> bool:
return bool(self.data)
def recalculate(self) -> None:
data = []
calculate_occupancy_factor() # as first values tend to be a bit wonky
for _ in range(self.window_size):
time.sleep(1)
data.append(calculate_occupancy_factor())
percentiles = []
for percent in self.percentiles_requested:
percentiles.append(percentile(data, percent))
self.percentile_values = percentiles
self.percentiles_regenerated = True
self.minimum_of = min(data)
self.maximum_of = max(data)
self.data = data
def run(self):
while True:
time.sleep(self.refresh_each)
self.recalculate()
def sleep_except(seconds: float, of_below: tp.Optional[float] = None,
of_above: tp.Optional[float] = None,
check_each: float = 1) -> bool:
"""
Sleep for specified number of seconds.
Quit earlier if the occupancy factor goes below of_below or above of_above
:param seconds:
:param of_below:
:param of_above:
:param check_each: amount of seconds to sleep at once
:return: whether was awoken due to CPU time condition
"""
of = calculate_occupancy_factor()
while seconds > 0:
if of_above is not None:
if of_above < of:
return True
if of_below is not None:
if of_below > of:
return True
time_to_sleep = min(seconds, check_each)
time.sleep(time_to_sleep)
seconds -= time_to_sleep
if seconds <= 0:
return False
of = calculate_occupancy_factor()
return False
previous_cf: float = None
previous_timestamp: float = None
def _calculate_occupancy_factor() -> float:
c = psutil.cpu_times()
try:
try:
try:
used = c.user + c.nice + c.system + c.irq + c.softirq + c.steal + c.guest + c.guest_nice
except AttributeError:
# Linux?
used = c.user + c.nice + c.system + c.irq + c.softirq
except AttributeError:
# UNIX ?
used = c.user + c.nice + c.system
except AttributeError:
# windows?
used = c.user + c.system + c.interrupt
cur_time = time.monotonic()
occupation_factor = used / multiprocessing.cpu_count()
global previous_timestamp, previous_cf
if previous_timestamp is None:
previous_cf = occupation_factor
previous_timestamp = cur_time
return
delta = cur_time - previous_timestamp
if delta == 0:
return
of = (occupation_factor - previous_cf)/delta
previous_cf = occupation_factor
previous_timestamp = cur_time
return of
def calculate_occupancy_factor() -> float:
"""
Return a float between 0 and 1 telling you how occupied is your system.
Note that this will be the average between now and the time it was last called.
This in rare cases may block for up to 0.1 seconds
"""
c = _calculate_occupancy_factor()
while c is None:
time.sleep(0.1)
c = _calculate_occupancy_factor()
return c
calculate_occupancy_factor()
from abc import abstractmethod, ABCMeta
from satella.coding.concurrent import IntervalTerminableThread
from satella.cpu_time import sleep_except
from satella.cpu_time.collector import CPUProfileBuilderThread
from satella.time import measure
CHECK_INTERVAL = 2
class CPUTimeAwareIntervalTerminableThread(IntervalTerminableThread, metaclass=ABCMeta):
"""
An IntervalTerminableThread that can call the loop a bit faster than usual,
based of current CPU time metrics.
:param seconds: time that a single looping through should take. This will
include the time spent on calling .loop(), the rest of this time will
be spent safe_sleep()ing.
:param max_sooner: amount of seconds that is ok to call this earlier
:param percentile: percentile that CPU usage has to fall below to call it earlier.
"""
def __init__(self, seconds: float, max_sooner: float, percentile: float, *args, **kwargs):
self.seconds = seconds
self.max_sooner = max_sooner
cp_bt = CPUProfileBuilderThread()
cp_bt.request_percentile(percentile)
self.percentile = percentile
super().__init__(seconds, *args, **kwargs)
@abstractmethod
def loop(self) -> None:
"""
Override me!
If True is returned, the thread will not sleep and .loop() will be executed
once more.
"""
def run(self):
self.prepare()
while not self._terminating:
with measure() as measurement:
v = self.loop()
if measurement() > self.seconds:
continue
seconds_to_wait = self.seconds - measurement()
while seconds_to_wait > 0:
if seconds_to_wait > self.max_sooner:
self.safe_sleep(seconds_to_wait - self.max_sooner)
seconds_to_wait -= self.max_sooner
if seconds_to_wait <= 0:
continue
cp_bt = CPUProfileBuilderThread()
perc_val = cp_bt.percentile(self.percentile)
while seconds_to_wait > 0:
time_to_sleep = min(CHECK_INTERVAL, seconds_to_wait)
if sleep_except(time_to_sleep, perc_val) or self.terminating:
seconds_to_wait = 0
break
seconds_to_wait -= time_to_sleep
self.cleanup()
......@@ -3,33 +3,13 @@ import math
import typing as tp
import warnings
from satella.coding.transforms.percentile import percentile
from .base import EmbeddedSubmetrics, MetricLevel
from .measurable_mixin import MeasurableMixin
from .registry import register_metric
from ..data import MetricData, MetricDataCollection
# shamelessly taken from
# http://code.activestate.com/recipes/511478-finding-the-percentile-of-the-values/)
def percentile(n: tp.List[float], percent: float) -> float:
"""
Find the percentile of a list of values.
:param n: - is a list of values. Note this MUST BE already sorted.
:param percent: - a float value from 0.0 to 1.0.
:return: the percentile of the values
"""
k = (len(n) - 1) * percent
f = math.floor(k)
c = math.ceil(k)
if f == c:
return n[int(k)]
d0 = n[int(f)] * (c - k)
d1 = n[int(c)] * (k - f)
return d0 + d1
@register_metric
class SummaryMetric(EmbeddedSubmetrics, MeasurableMixin):
"""
......
import time
import unittest
from satella.cpu_time import calculate_occupancy_factor, sleep_except, \
CPUTimeAwareIntervalTerminableThread
class TestCPUTime(unittest.TestCase):
def test_cpu_time_aware_terminable_thread(self):
class TestingThread(CPUTimeAwareIntervalTerminableThread):
def __init__(self):
super().__init__(5, 3, 0.5)
self.a = 0
def loop(self) -> None:
self.a += 1
tt = TestingThread()
tt.start()
time.sleep(0.2)
self.assertEqual(tt.a, 1)
time.sleep(5)
self.assertEqual(tt.a, 2)
tt.terminate().join()
def test_sleep_except(self):
c = time.monotonic()
sleep_except(1)
self.assertGreaterEqual(time.monotonic() - c, 1)
def test_calculate_occupancy_factor(self):
c = calculate_occupancy_factor()
self.assertGreaterEqual(c, 0)
self.assertLessEqual(c, 1)
c = calculate_occupancy_factor()
self.assertGreaterEqual(c, 0)
self.assertLessEqual(c, 1)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment