From cb8637c42a3e5e896009c19b99409db21e072bb9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ma=C5=9Blanka?= <piotr.maslanka@henrietta.com.pl> Date: Tue, 2 Mar 2021 15:21:38 +0100 Subject: [PATCH] more time-related calls will accept time strings --- CHANGELOG.md | 1 + satella/__init__.py | 2 +- satella/coding/concurrent/thread.py | 30 +++++++++++-------- satella/coding/concurrent/timer.py | 10 +++++-- .../instrumentation/cpu_time/concurrency.py | 16 +++++----- .../metrics/structures/threadpool.py | 4 +-- satella/processes.py | 10 +++++-- satella/time.py | 11 +++---- tests/test_coding/test_concurrent.py | 2 +- tests/test_instrumentation/test_cpu_time.py | 2 +- 10 files changed, 54 insertions(+), 34 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 58709a33..d5f745ff 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,2 +1,3 @@ # v2.14.47 +* more time-related calls will accept time strings diff --git a/satella/__init__.py b/satella/__init__.py index a459799d..39d0f4f3 100644 --- a/satella/__init__.py +++ b/satella/__init__.py @@ -1 +1 @@ -__version__ = '2.14.47a1' +__version__ = '2.14.47a2' diff --git a/satella/coding/concurrent/thread.py b/satella/coding/concurrent/thread.py index 3c35ce2c..323e63eb 100644 --- a/satella/coding/concurrent/thread.py +++ b/satella/coding/concurrent/thread.py @@ -9,7 +9,7 @@ from concurrent.futures import Future from threading import Condition as PythonCondition from satella.coding.decorators import wraps -from satella.time import measure +from satella.time import measure, parse_time_string from ..typing import ExceptionList from ...exceptions import ResourceLocked, WouldWaitMore @@ -73,17 +73,19 @@ class Condition(PythonCondition): warnings.warn('Use notify_all instead', DeprecationWarning) self.notify_all() - def wait(self, timeout: tp.Optional[float] = None, + def wait(self, timeout: tp.Optional[tp.Union[str, float]] = None, dont_raise: bool = False) -> None: """ Wait for condition to become true. - :param timeout: timeout to wait. None is default and means infinity + :param timeout: timeout to wait. None is default and means infinity. Can be also a + time string. :param dont_raise: if True, then WouldWaitMore won't be raised :raises ResourceLocked: unable to acquire the underlying lock within specified timeout. :raises WouldWaitMore: wait's timeout has expired """ if timeout is not None: + timeout = parse_time_string(timeout) if timeout < 0: timeout = 0 @@ -148,6 +150,7 @@ class BogusTerminableThread: """ A mock object that implements threading interface but does nothing """ + __slots__ = ('running', 'terminated', 'daemon') def __init__(self): self.running = False @@ -294,19 +297,22 @@ class TerminableThread(threading.Thread): self.terminate().join() return False - def safe_wait_condition(self, condition: Condition, timeout: float, - wake_up_each: float = 2) -> None: + def safe_wait_condition(self, condition: Condition, timeout: tp.Union[str, float], + wake_up_each: tp.Union[str, float] = 2) -> None: """ Wait for a condition, checking periodically if the thread is being terminated. To be invoked only by the thread that's represented by the object! :param condition: condition to wait on - :param timeout: maximum time to wait - :param wake_up_each: amount of seconds to wake up each to check for termination + :param timeout: maximum time to wait in seconds. Can be also a time string + :param wake_up_each: amount of seconds to wake up each to check for termination. + Can be also a time string. :raises WouldWaitMore: timeout has passed and Condition has not happened :raises SystemExit: thread is terminating """ + timeout = parse_time_string(timeout) + wake_up_each = parse_time_string(wake_up_each) t = 0 while t < timeout: if self._terminating: @@ -387,13 +393,13 @@ class IntervalTerminableThread(TerminableThread, metaclass=ABCMeta): If executing .loop() takes more than x seconds, on_overrun() will be called. - :param seconds: time that a single looping through should take. This will - include the time spent on calling .loop(), the rest of this time will - be spent safe_sleep()ing. + :param seconds: time that a single looping through should take in seconds. + Can be also a time string. This will include the time spent on calling .loop(), the rest + of this time will be spent safe_sleep()ing. """ - def __init__(self, seconds: float, *args, **kwargs): - self.seconds = seconds + def __init__(self, seconds: tp.Union[str, float], *args, **kwargs): + self.seconds = parse_time_string(seconds) super().__init__(*args, **kwargs) @abstractmethod diff --git a/satella/coding/concurrent/timer.py b/satella/coding/concurrent/timer.py index 82a064d9..de0ea0f5 100644 --- a/satella/coding/concurrent/timer.py +++ b/satella/coding/concurrent/timer.py @@ -1,3 +1,4 @@ +import typing as tp import logging import threading import time @@ -6,6 +7,7 @@ from satella.coding.recast_exceptions import log_exceptions from .monitor import Monitor from ..structures.heaps.time import TimeBasedHeap from ..structures.singleton import Singleton +from ...time import parse_time_string logger = logging.getLogger(__name__) @@ -21,18 +23,20 @@ class Timer: If spawn_separate is False, exceptions will be logged :param interval: amount of seconds that should elapsed between calling start() and function - executing + executing. Can be also a time string. :param function: function to execute :param args: argument for function :param kwargs: kwargs for function :param spawn_separate: whether to call the function in a separate thread """ + __slots__ = ('args', 'kwargs', 'spawn_separate', 'interval', + 'function', 'execute_at', 'cancelled') - def __init__(self, interval, function, args=None, kwargs=None, spawn_separate=False): + def __init__(self, interval: tp.Union[str, float], function, args=None, kwargs=None, spawn_separate=False): self.args = args or [] self.kwargs = kwargs or {} self.spawn_separate = spawn_separate - self.interval = interval + self.interval = parse_time_string(interval) self.function = function self.execute_at = None self.cancelled = False diff --git a/satella/instrumentation/cpu_time/concurrency.py b/satella/instrumentation/cpu_time/concurrency.py index 858b5685..177871a7 100644 --- a/satella/instrumentation/cpu_time/concurrency.py +++ b/satella/instrumentation/cpu_time/concurrency.py @@ -3,7 +3,7 @@ from abc import abstractmethod, ABCMeta from satella.coding.concurrent import IntervalTerminableThread from .collector import sleep_cpu_aware, CPUProfileBuilderThread -from satella.time import measure +from satella.time import measure, parse_time_string class CPUTimeAwareIntervalTerminableThread(IntervalTerminableThread, metaclass=ABCMeta): @@ -14,16 +14,18 @@ class CPUTimeAwareIntervalTerminableThread(IntervalTerminableThread, metaclass=A :param seconds: time that a single looping through should take. This will include the time spent on calling .loop(), the rest of this time will be spent safe_sleep()ing. + Can be alternatively a time string :param max_sooner: amount of seconds that is ok to call this earlier. Default is 10% seconds. :param percentile: percentile that CPU usage has to fall below to call it earlier. - :param wakeup_interval: amount of seconds to wake up between to check for _terminating status + :param wakeup_interval: amount of seconds to wake up between to check for _terminating status. + Can be also a time string """ - def __init__(self, seconds: float, max_sooner: tp.Optional[float] = None, percentile: float = 0.3, - wakeup_interval: float = 3.0, *args, **kwargs): - self.seconds = seconds - self.wakeup_interval = wakeup_interval + def __init__(self, seconds: tp.Union[str, float], max_sooner: tp.Optional[float] = None, percentile: float = 0.3, + wakeup_interval: tp.Union[str, float] = '3s', *args, **kwargs): + self.seconds = parse_time_string(seconds) + self.wakeup_interval = parse_time_string(wakeup_interval) self.max_sooner = max_sooner or seconds * 0.1 cp_bt = CPUProfileBuilderThread() cp_bt.request_percentile(percentile) @@ -61,7 +63,7 @@ class CPUTimeAwareIntervalTerminableThread(IntervalTerminableThread, metaclass=A def run(self): self.prepare() - while not self._terminating: + while not self.terminating: measured = self._execute_measured() seconds_to_wait = self.seconds - measured if seconds_to_wait > 0: diff --git a/satella/instrumentation/metrics/structures/threadpool.py b/satella/instrumentation/metrics/structures/threadpool.py index 576c76fd..c0fa4225 100644 --- a/satella/instrumentation/metrics/structures/threadpool.py +++ b/satella/instrumentation/metrics/structures/threadpool.py @@ -100,8 +100,8 @@ class MetrifiedThreadPoolExecutor(ThreadPoolExecutor): self._idle_semaphore = threading.Semaphore(0) self._broken = False if not hasattr(self, '_thread_name_prefix'): - self._thread_name_prefix = (thread_name_prefix or - ("ThreadPoolExecutor-%d" % self._counter())) + self._thread_name_prefix = thread_name_prefix or ("ThreadPoolExecutor-%d" % + self._counter()) self.waiting_time_metric = time_spent_waiting or EmptyMetric('') self.executing_time_metric = time_spent_executing or EmptyMetric('') self.metric_level = metric_level diff --git a/satella/processes.py b/satella/processes.py index 93f89cbd..50fb1324 100644 --- a/satella/processes.py +++ b/satella/processes.py @@ -7,6 +7,8 @@ from .exceptions import ProcessFailed __all__ = ['call_and_return_stdout', 'read_nowait'] +from .time import parse_time_string + @call_in_separate_thread(daemon=True) @silence_excs((IOError, OSError)) @@ -32,7 +34,7 @@ def read_nowait(process: subprocess.Popen, output_list: tp.List[str]): def call_and_return_stdout(args: tp.Union[str, tp.List[str]], - timeout: tp.Optional[int] = None, + timeout: tp.Optional[tp.Union[str, int]] = None, encoding: tp.Optional[str] = None, expected_return_code: tp.Optional[int] = None, **kwargs) -> tp.Union[bytes, str]: @@ -46,7 +48,8 @@ def call_and_return_stdout(args: tp.Union[str, tp.List[str]], :param args: arguments to run the program with. Can be either a string or a list of strings. :param timeout: amount of seconds to wait for the process result. If process does not complete - within this time, it will be sent a SIGKILL + within this time, it will be sent a SIGKILL. Can be also a time string. If left at default, + ie. None, timeout won't be considered at all. :param encoding: encoding with which to decode stdout. If none is passed, it will be returned as a bytes object :param expected_return_code: an expected return code of this process. 0 is the default. If @@ -62,6 +65,9 @@ def call_and_return_stdout(args: tp.Union[str, tp.List[str]], proc = subprocess.Popen(args, **kwargs) fut = read_nowait(proc, stdout_list) + if timeout is not None: + timeout = parse_time_string(timeout) + try: proc.wait(timeout=timeout) except subprocess.TimeoutExpired: diff --git a/satella/time.py b/satella/time.py index 7d778363..9808045e 100644 --- a/satella/time.py +++ b/satella/time.py @@ -14,25 +14,26 @@ from satella.exceptions import WouldWaitMore TimeSignal = tp.Callable[[], float] -def sleep(x: float, abort_on_interrupt: bool = False) -> bool: +def sleep(y: tp.Union[str, float], abort_on_interrupt: bool = False) -> bool: """ Sleep for given interval. This won't be interrupted by KeyboardInterrupted, and always will sleep for given time interval. This will return at once if x is negative - :param x: the interval to wait + :param y: the interval to wait in seconds, can be also a time string :param abort_on_interrupt: whether to abort at once when KeyboardInterrupt is seen :returns: whether the function has completed its sleep naturally. False is seen on aborts thanks to KeyboardInterrupt only if abort_on_interrupt is True """ - if x < 0: + y = parse_time_string(y) + if y < 0: return with measure() as measurement: - while measurement() < x: + while measurement() < y: try: - time.sleep(x - measurement()) + time.sleep(y - measurement()) except KeyboardInterrupt: if abort_on_interrupt: return False diff --git a/tests/test_coding/test_concurrent.py b/tests/test_coding/test_concurrent.py index b7de4f5c..83728a4c 100644 --- a/tests/test_coding/test_concurrent.py +++ b/tests/test_coding/test_concurrent.py @@ -283,7 +283,7 @@ class TestConcurrent(unittest.TestCase): def set_a(): a['test'] = True - tmr = Timer(1, set_a) + tmr = Timer('1s', set_a) tmr.start() time.sleep(2) self.assertTrue(a['test']) diff --git a/tests/test_instrumentation/test_cpu_time.py b/tests/test_instrumentation/test_cpu_time.py index 1de37b72..d1afb658 100644 --- a/tests/test_instrumentation/test_cpu_time.py +++ b/tests/test_instrumentation/test_cpu_time.py @@ -9,7 +9,7 @@ class TestCPUTime(unittest.TestCase): def test_cpu_time_aware_terminable_thread(self): class TestingThread(CPUTimeAwareIntervalTerminableThread): def __init__(self): - super().__init__(5, 3, 0.5) + super().__init__('5s', 3, 0.5) self.a = 0 def loop(self) -> None: -- GitLab