diff --git a/CHANGELOG.md b/CHANGELOG.md index 58709a33f875b698a084736f3544358bd2ea6913..d5f745fff9c9f81a214fffc6a7aa62968c1f76b2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,2 +1,3 @@ # v2.14.47 +* more time-related calls will accept time strings diff --git a/satella/__init__.py b/satella/__init__.py index a459799df2333002a83929e73bd1c8694817ce40..39d0f4f3e6d0d8407a5e73f8c7c1ec3cbb97ae3b 100644 --- a/satella/__init__.py +++ b/satella/__init__.py @@ -1 +1 @@ -__version__ = '2.14.47a1' +__version__ = '2.14.47a2' diff --git a/satella/coding/concurrent/thread.py b/satella/coding/concurrent/thread.py index 3c35ce2c7240e4498c6397afc39dc06dd4cfdd33..323e63eb8f22670eba8cd88ffa3040a012be8cd8 100644 --- a/satella/coding/concurrent/thread.py +++ b/satella/coding/concurrent/thread.py @@ -9,7 +9,7 @@ from concurrent.futures import Future from threading import Condition as PythonCondition from satella.coding.decorators import wraps -from satella.time import measure +from satella.time import measure, parse_time_string from ..typing import ExceptionList from ...exceptions import ResourceLocked, WouldWaitMore @@ -73,17 +73,19 @@ class Condition(PythonCondition): warnings.warn('Use notify_all instead', DeprecationWarning) self.notify_all() - def wait(self, timeout: tp.Optional[float] = None, + def wait(self, timeout: tp.Optional[tp.Union[str, float]] = None, dont_raise: bool = False) -> None: """ Wait for condition to become true. - :param timeout: timeout to wait. None is default and means infinity + :param timeout: timeout to wait. None is default and means infinity. Can be also a + time string. :param dont_raise: if True, then WouldWaitMore won't be raised :raises ResourceLocked: unable to acquire the underlying lock within specified timeout. :raises WouldWaitMore: wait's timeout has expired """ if timeout is not None: + timeout = parse_time_string(timeout) if timeout < 0: timeout = 0 @@ -148,6 +150,7 @@ class BogusTerminableThread: """ A mock object that implements threading interface but does nothing """ + __slots__ = ('running', 'terminated', 'daemon') def __init__(self): self.running = False @@ -294,19 +297,22 @@ class TerminableThread(threading.Thread): self.terminate().join() return False - def safe_wait_condition(self, condition: Condition, timeout: float, - wake_up_each: float = 2) -> None: + def safe_wait_condition(self, condition: Condition, timeout: tp.Union[str, float], + wake_up_each: tp.Union[str, float] = 2) -> None: """ Wait for a condition, checking periodically if the thread is being terminated. To be invoked only by the thread that's represented by the object! :param condition: condition to wait on - :param timeout: maximum time to wait - :param wake_up_each: amount of seconds to wake up each to check for termination + :param timeout: maximum time to wait in seconds. Can be also a time string + :param wake_up_each: amount of seconds to wake up each to check for termination. + Can be also a time string. :raises WouldWaitMore: timeout has passed and Condition has not happened :raises SystemExit: thread is terminating """ + timeout = parse_time_string(timeout) + wake_up_each = parse_time_string(wake_up_each) t = 0 while t < timeout: if self._terminating: @@ -387,13 +393,13 @@ class IntervalTerminableThread(TerminableThread, metaclass=ABCMeta): If executing .loop() takes more than x seconds, on_overrun() will be called. - :param seconds: time that a single looping through should take. This will - include the time spent on calling .loop(), the rest of this time will - be spent safe_sleep()ing. + :param seconds: time that a single looping through should take in seconds. + Can be also a time string. This will include the time spent on calling .loop(), the rest + of this time will be spent safe_sleep()ing. """ - def __init__(self, seconds: float, *args, **kwargs): - self.seconds = seconds + def __init__(self, seconds: tp.Union[str, float], *args, **kwargs): + self.seconds = parse_time_string(seconds) super().__init__(*args, **kwargs) @abstractmethod diff --git a/satella/coding/concurrent/timer.py b/satella/coding/concurrent/timer.py index 82a064d9777a99d5ad6fdaae110750e3fcf68ebe..de0ea0f535d8d7d85cc4449a01bc0ce9921a7733 100644 --- a/satella/coding/concurrent/timer.py +++ b/satella/coding/concurrent/timer.py @@ -1,3 +1,4 @@ +import typing as tp import logging import threading import time @@ -6,6 +7,7 @@ from satella.coding.recast_exceptions import log_exceptions from .monitor import Monitor from ..structures.heaps.time import TimeBasedHeap from ..structures.singleton import Singleton +from ...time import parse_time_string logger = logging.getLogger(__name__) @@ -21,18 +23,20 @@ class Timer: If spawn_separate is False, exceptions will be logged :param interval: amount of seconds that should elapsed between calling start() and function - executing + executing. Can be also a time string. :param function: function to execute :param args: argument for function :param kwargs: kwargs for function :param spawn_separate: whether to call the function in a separate thread """ + __slots__ = ('args', 'kwargs', 'spawn_separate', 'interval', + 'function', 'execute_at', 'cancelled') - def __init__(self, interval, function, args=None, kwargs=None, spawn_separate=False): + def __init__(self, interval: tp.Union[str, float], function, args=None, kwargs=None, spawn_separate=False): self.args = args or [] self.kwargs = kwargs or {} self.spawn_separate = spawn_separate - self.interval = interval + self.interval = parse_time_string(interval) self.function = function self.execute_at = None self.cancelled = False diff --git a/satella/instrumentation/cpu_time/concurrency.py b/satella/instrumentation/cpu_time/concurrency.py index 858b5685f6dba3ef8fc6e8c09eb8816de3763a66..177871a79ac89e7507c8737704a6f67883b7fd34 100644 --- a/satella/instrumentation/cpu_time/concurrency.py +++ b/satella/instrumentation/cpu_time/concurrency.py @@ -3,7 +3,7 @@ from abc import abstractmethod, ABCMeta from satella.coding.concurrent import IntervalTerminableThread from .collector import sleep_cpu_aware, CPUProfileBuilderThread -from satella.time import measure +from satella.time import measure, parse_time_string class CPUTimeAwareIntervalTerminableThread(IntervalTerminableThread, metaclass=ABCMeta): @@ -14,16 +14,18 @@ class CPUTimeAwareIntervalTerminableThread(IntervalTerminableThread, metaclass=A :param seconds: time that a single looping through should take. This will include the time spent on calling .loop(), the rest of this time will be spent safe_sleep()ing. + Can be alternatively a time string :param max_sooner: amount of seconds that is ok to call this earlier. Default is 10% seconds. :param percentile: percentile that CPU usage has to fall below to call it earlier. - :param wakeup_interval: amount of seconds to wake up between to check for _terminating status + :param wakeup_interval: amount of seconds to wake up between to check for _terminating status. + Can be also a time string """ - def __init__(self, seconds: float, max_sooner: tp.Optional[float] = None, percentile: float = 0.3, - wakeup_interval: float = 3.0, *args, **kwargs): - self.seconds = seconds - self.wakeup_interval = wakeup_interval + def __init__(self, seconds: tp.Union[str, float], max_sooner: tp.Optional[float] = None, percentile: float = 0.3, + wakeup_interval: tp.Union[str, float] = '3s', *args, **kwargs): + self.seconds = parse_time_string(seconds) + self.wakeup_interval = parse_time_string(wakeup_interval) self.max_sooner = max_sooner or seconds * 0.1 cp_bt = CPUProfileBuilderThread() cp_bt.request_percentile(percentile) @@ -61,7 +63,7 @@ class CPUTimeAwareIntervalTerminableThread(IntervalTerminableThread, metaclass=A def run(self): self.prepare() - while not self._terminating: + while not self.terminating: measured = self._execute_measured() seconds_to_wait = self.seconds - measured if seconds_to_wait > 0: diff --git a/satella/instrumentation/metrics/structures/threadpool.py b/satella/instrumentation/metrics/structures/threadpool.py index 576c76fd2a2879fe6bab2818c102e956a97ae7b6..c0fa4225353f1a5d43cd456356a3e064600b3a40 100644 --- a/satella/instrumentation/metrics/structures/threadpool.py +++ b/satella/instrumentation/metrics/structures/threadpool.py @@ -100,8 +100,8 @@ class MetrifiedThreadPoolExecutor(ThreadPoolExecutor): self._idle_semaphore = threading.Semaphore(0) self._broken = False if not hasattr(self, '_thread_name_prefix'): - self._thread_name_prefix = (thread_name_prefix or - ("ThreadPoolExecutor-%d" % self._counter())) + self._thread_name_prefix = thread_name_prefix or ("ThreadPoolExecutor-%d" % + self._counter()) self.waiting_time_metric = time_spent_waiting or EmptyMetric('') self.executing_time_metric = time_spent_executing or EmptyMetric('') self.metric_level = metric_level diff --git a/satella/processes.py b/satella/processes.py index 93f89cbd652a956b49c33fdedc26b2d948dbc4ac..50fb13241fd98dd0ee02150a874914e1beac025b 100644 --- a/satella/processes.py +++ b/satella/processes.py @@ -7,6 +7,8 @@ from .exceptions import ProcessFailed __all__ = ['call_and_return_stdout', 'read_nowait'] +from .time import parse_time_string + @call_in_separate_thread(daemon=True) @silence_excs((IOError, OSError)) @@ -32,7 +34,7 @@ def read_nowait(process: subprocess.Popen, output_list: tp.List[str]): def call_and_return_stdout(args: tp.Union[str, tp.List[str]], - timeout: tp.Optional[int] = None, + timeout: tp.Optional[tp.Union[str, int]] = None, encoding: tp.Optional[str] = None, expected_return_code: tp.Optional[int] = None, **kwargs) -> tp.Union[bytes, str]: @@ -46,7 +48,8 @@ def call_and_return_stdout(args: tp.Union[str, tp.List[str]], :param args: arguments to run the program with. Can be either a string or a list of strings. :param timeout: amount of seconds to wait for the process result. If process does not complete - within this time, it will be sent a SIGKILL + within this time, it will be sent a SIGKILL. Can be also a time string. If left at default, + ie. None, timeout won't be considered at all. :param encoding: encoding with which to decode stdout. If none is passed, it will be returned as a bytes object :param expected_return_code: an expected return code of this process. 0 is the default. If @@ -62,6 +65,9 @@ def call_and_return_stdout(args: tp.Union[str, tp.List[str]], proc = subprocess.Popen(args, **kwargs) fut = read_nowait(proc, stdout_list) + if timeout is not None: + timeout = parse_time_string(timeout) + try: proc.wait(timeout=timeout) except subprocess.TimeoutExpired: diff --git a/satella/time.py b/satella/time.py index 7d7783639ed80eafbeaf1903f6c1126967ba8146..9808045e13928f00e42910bba690c1a3945d7cdb 100644 --- a/satella/time.py +++ b/satella/time.py @@ -14,25 +14,26 @@ from satella.exceptions import WouldWaitMore TimeSignal = tp.Callable[[], float] -def sleep(x: float, abort_on_interrupt: bool = False) -> bool: +def sleep(y: tp.Union[str, float], abort_on_interrupt: bool = False) -> bool: """ Sleep for given interval. This won't be interrupted by KeyboardInterrupted, and always will sleep for given time interval. This will return at once if x is negative - :param x: the interval to wait + :param y: the interval to wait in seconds, can be also a time string :param abort_on_interrupt: whether to abort at once when KeyboardInterrupt is seen :returns: whether the function has completed its sleep naturally. False is seen on aborts thanks to KeyboardInterrupt only if abort_on_interrupt is True """ - if x < 0: + y = parse_time_string(y) + if y < 0: return with measure() as measurement: - while measurement() < x: + while measurement() < y: try: - time.sleep(x - measurement()) + time.sleep(y - measurement()) except KeyboardInterrupt: if abort_on_interrupt: return False diff --git a/tests/test_coding/test_concurrent.py b/tests/test_coding/test_concurrent.py index b7de4f5ceef1cc6a65f8bba511f729c3fb1f5720..83728a4c80d84a306f0f1c5207d25faae19273c5 100644 --- a/tests/test_coding/test_concurrent.py +++ b/tests/test_coding/test_concurrent.py @@ -283,7 +283,7 @@ class TestConcurrent(unittest.TestCase): def set_a(): a['test'] = True - tmr = Timer(1, set_a) + tmr = Timer('1s', set_a) tmr.start() time.sleep(2) self.assertTrue(a['test']) diff --git a/tests/test_instrumentation/test_cpu_time.py b/tests/test_instrumentation/test_cpu_time.py index 1de37b7200682283d2850ec69371aa4e4bc59b5c..d1afb6583fd346f950184bb8803ebe2849081ce5 100644 --- a/tests/test_instrumentation/test_cpu_time.py +++ b/tests/test_instrumentation/test_cpu_time.py @@ -9,7 +9,7 @@ class TestCPUTime(unittest.TestCase): def test_cpu_time_aware_terminable_thread(self): class TestingThread(CPUTimeAwareIntervalTerminableThread): def __init__(self): - super().__init__(5, 3, 0.5) + super().__init__('5s', 3, 0.5) self.a = 0 def loop(self) -> None: