Skip to content
Snippets Groups Projects
Commit b0970bd9 authored by Piotr Maślanka's avatar Piotr Maślanka
Browse files

ver 2

parent 581649db
No related branches found
Tags v2.14_b2
No related merge requests found
......@@ -22,6 +22,7 @@ Visit the project's page at GitHub_!
coding/sequences
coding/transforms
coding/typing
instrumentation/cpu_time
instrumentation/traceback
instrumentation/memory
instrumentation/metrics
......
========
CPU time
========
......@@ -10,6 +9,18 @@ The profile is refreshed each X minutes.
.. autofunction:: satella.cpu_time.calculate_occupancy_factor
.. autofunction:: satella.cpu_time.sleep_except
.. autofunction:: satella.cpu_time.sleep_cpu_aware
Here's the primary thread you can use to work with things:
.. autoclass:: satella.cpu_time.CPUTimeManager
:members:
And here's a helpful variant of
:py:class:`satella.coding.concurrent.IntervalTerminableThread`:
.. autoclass:: satella.cpu_time.CPUTimeAwareIntervalTerminableThread
:members:
__version__ = '2.14_b1'
__version__ = '2.14_b2'
......@@ -6,7 +6,7 @@ from .id_allocator import IDAllocator
from .locked_dataset import LockedDataset
from .locked_structure import LockedStructure
from .monitor import MonitorList, Monitor, MonitorDict, RMonitor
from .sync_threadpool import sync_threadpool
from .sync import sync_threadpool
from .thread import TerminableThread, Condition, SingleStartThread, call_in_separate_thread, \
BogusTerminableThread, IntervalTerminableThread
from .timer import Timer
......
import time
import typing as tp
from concurrent.futures import Executor, ThreadPoolExecutor, wait, ProcessPoolExecutor
from concurrent.futures import Executor, ThreadPoolExecutor, ProcessPoolExecutor
from satella.coding.concurrent import sync_threadpool
from satella.coding.concurrent.sync import sync_threadpool
from satella.coding.concurrent.monitor import Monitor
from satella.coding.recast_exceptions import silence_excs
from satella.coding.typing import V, K
......@@ -54,11 +54,10 @@ class ExclusiveWritebackCache(tp.Generic[K, V]):
"""
Return current amount of entries waiting for writeback
"""
# noinspection PyProtectedMember
if isinstance(self.executor, ThreadPoolExecutor):
# noinspection PyProtectedMember
return self.executor._work_queue.qsize()
elif isinstance(self.executor, ProcessPoolExecutor):
# noinspection PyProtectedMember
return self.executor._call_queue.qsize()
else:
return 0
......
from .collector import calculate_occupancy_factor, sleep_except
from .concurrency import CPUTimeAwareIntervalTerminableThread
__all__ = ['calculate_occupancy_factor', 'sleep_except',
'CPUTimeAwareIntervalTerminableThread']
from .collector import calculate_occupancy_factor, sleep_cpu_aware, CPUTimeManager
from .concurrency import CPUTimeAwareIntervalTerminableThread
__all__ = ['calculate_occupancy_factor', 'sleep_cpu_aware',
'CPUTimeAwareIntervalTerminableThread',
'CPUTimeManager']
......@@ -12,7 +12,7 @@ from satella.coding.transforms import percentile
@Singleton
class CPUProfileBuilderThread(threading.Thread):
"""
A CPU profile builder thread
A CPU profile builder thread and a core singleton object to use.
:param window_seconds: the amount of seconds for which to collect data
:param refresh_each: time of seconds to sleep between rebuilding of profiles
......@@ -67,26 +67,41 @@ class CPUProfileBuilderThread(threading.Thread):
self.recalculate()
def sleep_except(seconds: float, of_below: tp.Optional[float] = None,
of_above: tp.Optional[float] = None,
check_each: float = 1) -> bool:
class CPUTimeManager:
@staticmethod
def percentile(percent: float) -> float:
"""
Return given percentile of current CPU time's profile
:param percent: float between 0 and 1
:return: the value of the percentile
"""
cp = CPUProfileBuilderThread()
return cp.percentile(percent)
def sleep_cpu_aware(seconds: float, of_below: tp.Optional[float] = None,
of_above: tp.Optional[float] = None,
check_each: float = 1) -> bool:
"""
Sleep for specified number of seconds.
Quit earlier if the occupancy factor goes below of_below or above of_above
:param seconds:
:param of_below:
:param of_above:
:param seconds: time to sleep
:param of_below: occupancy factor below which the sleep will return
:param of_above: occupancy factor above which the sleep will return
:param check_each: amount of seconds to sleep at once
:return: whether was awoken due to CPU time condition
"""
if of_below is None and of_above is None:
time.sleep(seconds)
return False
of = calculate_occupancy_factor()
while seconds > 0:
if of_above is not None:
if of_above < of:
if of > of_above:
return True
if of_below is not None:
if of_below > of:
if of < of_below:
return True
time_to_sleep = min(seconds, check_each)
time.sleep(time_to_sleep)
......@@ -146,5 +161,3 @@ def calculate_occupancy_factor() -> float:
c = _calculate_occupancy_factor()
return c
calculate_occupancy_factor()
import typing as tp
from abc import abstractmethod, ABCMeta
from satella.coding.concurrent import IntervalTerminableThread
from satella.cpu_time import sleep_except
from satella.cpu_time.collector import CPUProfileBuilderThread
from satella.instrumentation.cpu_time import sleep_cpu_aware
from satella.instrumentation.cpu_time.collector import CPUProfileBuilderThread
from satella.time import measure
CHECK_INTERVAL = 2
class CPUTimeAwareIntervalTerminableThread(IntervalTerminableThread, metaclass=ABCMeta):
"""
......@@ -16,13 +15,17 @@ class CPUTimeAwareIntervalTerminableThread(IntervalTerminableThread, metaclass=A
:param seconds: time that a single looping through should take. This will
include the time spent on calling .loop(), the rest of this time will
be spent safe_sleep()ing.
:param max_sooner: amount of seconds that is ok to call this earlier
:param max_sooner: amount of seconds that is ok to call this earlier.
Default is 6 times seconds.
:param percentile: percentile that CPU usage has to fall below to call it earlier.
:param wakeup_interval: amount of seconds to wake up between to check for _terminating status
"""
def __init__(self, seconds: float, max_sooner: float, percentile: float, *args, **kwargs):
def __init__(self, seconds: float, max_sooner: tp.Optional[float] = None, percentile: float = 0.3,
wakeup_interval: float = 3.0, *args, **kwargs):
self.seconds = seconds
self.max_sooner = max_sooner
self.wakeup_interval = wakeup_interval
self.max_sooner = max_sooner or seconds * 6
cp_bt = CPUProfileBuilderThread()
cp_bt.request_percentile(percentile)
self.percentile = percentile
......@@ -37,27 +40,37 @@ class CPUTimeAwareIntervalTerminableThread(IntervalTerminableThread, metaclass=A
once more.
"""
def _execute_measured(self) -> float:
with measure() as measurement:
self.loop()
return measurement()
def __sleep_waiting_for_cpu(self, how_long: float) -> None:
cp_bt = CPUProfileBuilderThread()
per_val = cp_bt.percentile(self.percentile)
while how_long > 0 and not self._terminating:
time_to_sleep = min(self.wakeup_interval, how_long)
if sleep_cpu_aware(time_to_sleep, per_val):
break
how_long -= time_to_sleep
def __sleep(self, how_long: float) -> None:
if how_long > self.max_sooner:
if self.safe_sleep(how_long - self.max_sooner):
return
how_long = self.max_sooner
self.__sleep_waiting_for_cpu(how_long)
def run(self):
self.prepare()
while not self._terminating:
with measure() as measurement:
v = self.loop()
if measurement() > self.seconds:
continue
seconds_to_wait = self.seconds - measurement()
while seconds_to_wait > 0:
if seconds_to_wait > self.max_sooner:
self.safe_sleep(seconds_to_wait - self.max_sooner)
seconds_to_wait -= self.max_sooner
if seconds_to_wait <= 0:
continue
cp_bt = CPUProfileBuilderThread()
perc_val = cp_bt.percentile(self.percentile)
while seconds_to_wait > 0:
time_to_sleep = min(CHECK_INTERVAL, seconds_to_wait)
if sleep_except(time_to_sleep, perc_val) or self.terminating:
seconds_to_wait = 0
break
seconds_to_wait -= time_to_sleep
measured = self._execute_measured()
seconds_to_wait = self.seconds - measured
if seconds_to_wait > 0:
self.__sleep(seconds_to_wait)
elif seconds_to_wait < 0:
self.on_overrun(measured)
self.cleanup()
import time
import unittest
from satella.cpu_time import calculate_occupancy_factor, sleep_except, \
from satella.instrumentation.cpu_time import calculate_occupancy_factor, sleep_cpu_aware, \
CPUTimeAwareIntervalTerminableThread
......@@ -25,13 +25,12 @@ class TestCPUTime(unittest.TestCase):
def test_sleep_except(self):
c = time.monotonic()
sleep_except(1)
sleep_cpu_aware(1)
self.assertGreaterEqual(time.monotonic() - c, 1)
def test_calculate_occupancy_factor(self):
c = calculate_occupancy_factor()
self.assertGreaterEqual(c, 0)
self.assertLessEqual(c, 1)
c = calculate_occupancy_factor()
self.assertGreaterEqual(c, 0)
self.assertLessEqual(c, 1)
......@@ -42,7 +42,6 @@ class TestThreadPoolExecutor(unittest.TestCase):
entries_waiting=entries_waiting)
self.assertEqual(wbc[5], 3)
self.assertEqual(b['no_calls'], 1)
self.assertEqual(n_th(entries_waiting.to_metric_data().values).value, 0)
self.assertRaises(KeyError, lambda: wbc[-1])
self.assertEqual(b['no_calls'], 2)
self.assertEqual(wbc[5], 3)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment