From d6aa0eee2fe45a8c3b8c4904cc08a9411ae2a809 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ma=C5=9Blanka?= <piotr.maslanka@henrietta.com.pl> Date: Mon, 4 Mar 2024 09:42:42 +0100 Subject: [PATCH] Fix the docs and pylint, bump version to beta --- .pylintrc | 4 ++-- CHANGELOG.md | 4 ++++ docs/instrumentation/cpu_time.rst | 6 ++++++ satella/__init__.py | 2 +- satella/coding/sequences/sequences.py | 2 +- satella/instrumentation/cpu_time/collector.py | 16 +++++++++------- 6 files changed, 23 insertions(+), 11 deletions(-) diff --git a/.pylintrc b/.pylintrc index bff04040..0f3e65b7 100644 --- a/.pylintrc +++ b/.pylintrc @@ -3,5 +3,5 @@ disable= C0114, # missing-module-docstring C0116, # missing-function-docstring W0603, # global-statement - C0103 # invalid-name - + C0103, # invalid-name + R0913 # too-many-arguments diff --git a/CHANGELOG.md b/CHANGELOG.md index 516c3e9d..a069cc25 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,3 +18,7 @@ Build process * added pylint to CodeClimate +Documentation +------------- + +* Improved the documentation for CPU usage functions - noted that there will be a thread spawned in the background. diff --git a/docs/instrumentation/cpu_time.rst b/docs/instrumentation/cpu_time.rst index 69170936..cc2c3325 100644 --- a/docs/instrumentation/cpu_time.rst +++ b/docs/instrumentation/cpu_time.rst @@ -4,9 +4,15 @@ CPU time Satella's cpu_time helps your processes play nice with the overall CPU usage, ie. deferring non-critical tasks to until CPU usage falls lower than the average. +You can even check how much CPU is your process using. + cpu_time does this by periodically monitoring CPU's usage and building your usage profile. The profile is refreshed each X minutes. +.. warning:: Note that running the following procedures will launch a background daemonic thread + spawning to gather data. This thread will run only once, and it will be daemonic, but it will use + some of the CPU. If you don't call any of these, the thread won't be spawned. + .. autofunction:: satella.instrumentation.cpu_time.calculate_occupancy_factor .. autofunction:: satella.instrumentation.cpu_time.sleep_cpu_aware diff --git a/satella/__init__.py b/satella/__init__.py index d1e6ad75..d264c9bb 100644 --- a/satella/__init__.py +++ b/satella/__init__.py @@ -1 +1 @@ -__version__ = '2.24.0a2' +__version__ = '2.24.0b1' diff --git a/satella/coding/sequences/sequences.py b/satella/coding/sequences/sequences.py index 14aa42f1..22482187 100644 --- a/satella/coding/sequences/sequences.py +++ b/satella/coding/sequences/sequences.py @@ -1 +1 @@ -import copy import typing as tp from satella.coding.decorators.decorators import wraps from satella.coding.recast_exceptions import rethrow_as from .iterators import n_th from ..typing import T, Iteratable, NoArgCallable, Predicate def infinite_iterator(returns: tp.Optional[T] = None, return_factory: tp.Optional[NoArgCallable[T]] = None) -> tp.Iterator[T]: """ Return an infinite number of objects. :param returns: object to return. Note that this will be this very object, it will not be copied. :param return_factory: a callable that takes 0 args and returns an element to return. :return: an infinite iterator of provided values """ while True: if returns is None: if return_factory is None: yield None else: yield return_factory() else: yield returns def make_list(element: T, n: int, deep_copy: bool = False) -> tp.List[T]: """ Make a list consisting of n times element. Element will be copied via copy.copy before adding to list. :param element: element :param n: times to repeat the element :param deep_copy: whether to use copy.deepcopy instead of copy.copy :return: list of length n """ output = [] if deep_copy: copy_op = copy.deepcopy else: copy_op = copy.copy for _ in range(n): output.append(copy_op(element)) return output # shamelessly copied from # https://medium.com/better-programming/is-this-the-last-element-of-my-python-for-loop-784f5ff90bb5 def is_last(lst: Iteratable) -> tp.Iterator[tp.Tuple[bool, T]]: """ Return every element of the list, alongside a flag telling is this the last element. Use like: >>> for is_last, element in is_last(my_list): >>> if is_last: >>> ... :param lst: list to iterate thru :return: a p_gen returning (bool, T) Note that this returns a nice, O(1) iterator. """ iterable = iter(lst) ret_var = next(iterable) for val in iterable: yield False, ret_var ret_var = val yield True, ret_var def add_next(lst: Iteratable, wrap_over: bool = False, skip_last: bool = False) -> tp.Iterator[tp.Tuple[T, tp.Optional[T]]]: """ Yields a 2-tuple of given iterable, presenting the next element as second element of the tuple. The last element will be the last element alongside with a None, if wrap_over is False, or the first element if wrap_over was True Example: >>> list(add_next([1, 2, 3, 4, 5])) == [(1, 2), (2, 3), (3, 4), (4, 5), (5, None)] >>> list(add_next([1, 2, 3, 4, 5], True)) == [(1, 2), (2, 3), (3, 4), (4, 5), (5, 1)] :param lst: iterable to iterate over :param wrap_over: whether to attach the first element to the pair of the last element instead of None :param skip_last: if this is True, then last element, alongside with a None, won't be output """ iterator = iter(lst) try: first_val = prev_val = next(iterator) except StopIteration: return for val in iterator: yield prev_val, val prev_val = val if wrap_over: yield prev_val, first_val else: if not skip_last: yield prev_val, None def half_cartesian(seq: tp.Iterable[T], include_same_pairs: bool = True) -> tp.Iterator[tp.Tuple[T, T]]: """ Generate half of the Cartesian product of both sequences. Useful when you have a commutative operation that you'd like to execute on both elements (eg. checking for collisions). Example: >>> list(half_cartesian([1, 2, 3], [1, 2, 3])) == \ >>> [(1, 1), (1, 2), (1, 3), (2, 2), (2, 3), (3, 3)] :param seq: The sequence :param include_same_pairs: if True, then pairs returning two of the same objects will be returned. For example, if False, the following will be true: >>> list(half_cartesian([1, 2, 3], [1, 2, 3], include_same_pairs=False)) == \ >>> [(1, 2), (1, 3), (2, 3)] """ for i, elem1 in enumerate(seq): for j, elem2 in enumerate(seq): if include_same_pairs: if j >= i: yield elem1, elem2 else: if j > i: yield elem1, elem2 def group_quantity(length: int, seq: Iteratable) -> tp.Iterator[tp.List[T]]: """ Slice an iterable into lists containing at most len entries. Eg. >>> assert list(group_quantity(3, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10])) == [[1, 2, 3], [4, 5, 6], >>> [7, 8, 9], [10]] This correctly detects sequences, and uses an optimized variant via slicing if a sequence is passed. You can safely pass ranges :param length: p_len for the returning sequences :param seq: sequence to split """ if isinstance(seq, tp.Sequence) and not isinstance(seq, range): i = 0 while i < len(seq): yield seq[i:i + length] i += length else: entries = [] for elem in seq: if len(entries) == length: yield entries entries = [elem] else: entries.append(elem) if entries: yield entries def filter_out_nones(y: tp.Sequence[T]) -> tp.List[T]: """ Return all elements, as a list, that are not None :param y: a sequence of items :return: a list of all subelements, in order, that are not None """ output = [] for item in y: if item is not None: output.append(item) return output def filter_out_false(y: tp.Sequence[T]) -> tp.List[T]: """ Return all elements, as a list, that are True :param y: a sequence of items :return: a list of all subelements, in order, that are not None """ output = [] for item in y: if item: output.append(item) return output @rethrow_as(IndexError, ValueError) def index_of_max(seq: tp.Sequence[T]) -> int: """ Return the index of the maximum element :param seq: sequence to examine :return: index of the maximum element :raise ValueError: sequence was empty """ max_index = 0 max_elem = seq[0] for i, elem in enumerate(seq): if elem > max_elem: max_index = i max_elem = elem return max_index def index_of(predicate: Predicate, seq: tp.Sequence[T]) -> int: """ Return an index of first met element that calling predicate on it returns True :param predicate: predicate to apply :param seq: sequence to examine :return: index of the element :raises ValueError: if no element found """ i = 0 for elem in seq: if predicate(elem): return i i += 1 raise ValueError('Element not found') class Multirun: """ A class to launch the same operation on the entire sequence. Consider: >>> class Counter: >>> def __init__(self, value=0): >>> self.count = value >>> def add(self, v): >>> self.count += 1 >>> def __eq__(self, other): >>> return self.count == other.count >>> def __iadd__(self, other): >>> self.add(other) >>> a = [Counter(), Counter()] The following: >>> for b in a: >>> b.add(2) Can be replaced with >>> Multirun(a).add(2) And the following: >>> for b in a: >>> b += 3 With this >>> b = Mulirun(a) >>> b += 3 Furthermore note that: >>> Multirun(a).add(2) == [Counter(2), Counter(2)] :param sequence: sequence to execute these operations for :param dont_return_list: the operation won't return a list if this is True """ __slots__ = ('sequence', 'dont_return_list') def __bool__(self) -> bool: return bool(self.sequence) def __init__(self, sequence: tp.Iterable, dont_return_list: bool = False): self.sequence = sequence self.dont_return_list = dont_return_list def __iter__(self): return iter(self.sequence) def __getattr__(self, item): def inner(*args, **kwargs): if not self.dont_return_list: results = [] for element in self: getattr(element, item)(*args, **kwargs) results.append(element) return results else: for element in self: getattr(element, item)(*args, **kwargs) # Take care: the array might just be empty... try: fun = getattr(n_th(self), item) inner = wraps(fun)(inner) except IndexError: pass return inner def __iadd__(self, other): for element in self: element += other return self def __isub__(self, other): for element in self: element -= other return self def __imul__(self, other): for element in self: element *= other return self def __itruediv__(self, other): for element in self: element /= other return self def __ifloordiv__(self, other): for element in self: element //= other return self def __ilshift__(self, other): for element in self: element <<= other return self def __irshift__(self, other): for element in self: element >>= other return self def __ipow__(self, other): for element in self: element **= other return self \ No newline at end of file +import copy import typing as tp from satella.coding.decorators.decorators import wraps from satella.coding.recast_exceptions import rethrow_as from .iterators import n_th from ..typing import T, Iteratable, NoArgCallable, Predicate def infinite_iterator(returns: tp.Optional[T] = None, return_factory: tp.Optional[NoArgCallable[T]] = None) -> tp.Iterator[T]: """ Return an infinite number of objects. :param returns: object to return. Note that this will be this very object, it will not be copied. :param return_factory: a callable that takes 0 args and returns an element to return. :return: an infinite iterator of provided values """ while True: if returns is None: if return_factory is None: yield None else: yield return_factory() else: yield returns def make_list(element: T, n: int, deep_copy: bool = False) -> tp.List[T]: """ Make a list consisting of n times element. Element will be copied via copy.copy before adding to list. :param element: element :param n: times to repeat the element :param deep_copy: whether to use copy.deepcopy instead of copy.copy :return: list of length n """ output = [] if deep_copy: copy_op = copy.deepcopy else: copy_op = copy.copy for _ in range(n): output.append(copy_op(element)) return output # shamelessly copied from # https://medium.com/better-programming/is-this-the-last-element-of-my-python-for-loop-784f5ff90bb5 def is_last(lst: Iteratable) -> tp.Iterator[tp.Tuple[bool, T]]: """ Return every element of the list, alongside a flag telling is this the last element. Use like: >>> for is_last, element in is_last(my_list): >>> if is_last: >>> ... :param lst: list to iterate thru :return: a p_gen returning (bool, T) Note that this returns a nice, O(1) iterator. """ iterable = iter(lst) ret_var = next(iterable) for val in iterable: yield False, ret_var ret_var = val yield True, ret_var def add_next(lst: Iteratable, wrap_over: bool = False, skip_last: bool = False) -> tp.Iterator[tp.Tuple[T, tp.Optional[T]]]: """ Yields a 2-tuple of given iterable, presenting the next element as second element of the tuple. The last element will be the last element alongside with a None, if wrap_over is False, or the first element if wrap_over was True Example: >>> list(add_next([1, 2, 3, 4, 5])) == [(1, 2), (2, 3), (3, 4), (4, 5), (5, None)] >>> list(add_next([1, 2, 3, 4, 5], True)) == [(1, 2), (2, 3), (3, 4), (4, 5), (5, 1)] :param lst: iterable to iterate over :param wrap_over: whether to attach the first element to the pair of the last element instead of None :param skip_last: if this is True, then last element, alongside with a None, won't be output """ iterator = iter(lst) try: first_val = prev_val = next(iterator) except StopIteration: return for val in iterator: yield prev_val, val prev_val = val if wrap_over: yield prev_val, first_val else: if not skip_last: yield prev_val, None def half_cartesian(seq: tp.Iterable[T], include_same_pairs: bool = True) -> tp.Iterator[tp.Tuple[T, T]]: """ Generate half of the Cartesian product of both sequences. Useful when you have a commutative operation that you'd like to execute on both elements (eg. checking for collisions). Example: >>> list(half_cartesian([1, 2, 3], [1, 2, 3])) == \ >>> [(1, 1), (1, 2), (1, 3), (2, 2), (2, 3), (3, 3)] :param seq: The sequence :param include_same_pairs: if True, then pairs returning two of the same objects will be returned. For example, if False, the following will be true: >>> list(half_cartesian([1, 2, 3], [1, 2, 3], include_same_pairs=False)) == \ >>> [(1, 2), (1, 3), (2, 3)] """ for i, elem1 in enumerate(seq): for j, elem2 in enumerate(seq): if include_same_pairs: if j >= i: yield elem1, elem2 else: if j > i: yield elem1, elem2 def group_quantity(length: int, seq: Iteratable) -> tp.Iterator[tp.List[T]]: """ Slice an iterable into lists containing at most len entries. Eg. >>> assert list(group_quantity(3, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10])) == [[1, 2, 3], [4, 5, 6], >>> [7, 8, 9], [10]] This correctly detects sequences, and uses an optimized variant via slicing if a sequence is passed. You can safely pass ranges :param length: p_len for the returning sequences :param seq: sequence to split """ if isinstance(seq, tp.Sequence) and not isinstance(seq, range): i = 0 while i < len(seq): yield seq[i:i + length] i += length else: entries = [] for elem in seq: if len(entries) == length: yield entries entries = [elem] else: entries.append(elem) if entries: yield entries def filter_out_nones(y: tp.Sequence[T]) -> tp.List[T]: """ Return all elements, as a list, that are not None :param y: a sequence of items :return: a list of all subelements, in order, that are not None """ output = [] for item in y: if item is not None: output.append(item) return output def filter_out_false(y: tp.Sequence[T]) -> tp.List[T]: """ Return all elements, as a list, that are True :param y: a sequence of items :return: a list of all subelements, in order, that are not None """ output = [] for item in y: if item: output.append(item) return output @rethrow_as(IndexError, ValueError) def index_of_max(seq: tp.Sequence[T]) -> int: """ Return the index of the maximum element :param seq: sequence to examine :return: index of the maximum element :raise ValueError: sequence was empty """ max_index = 0 max_elem = seq[0] for i, elem in enumerate(seq): if elem > max_elem: max_index = i max_elem = elem return max_index def index_of(predicate: Predicate, seq: tp.Sequence[T]) -> int: """ Return an index of first met element that calling predicate on it returns True :param predicate: predicate to apply :param seq: sequence to examine :return: index of the element :raises ValueError: if no element found """ i = 0 for elem in seq: if predicate(elem): return i i += 1 raise ValueError('Element not found') class Multirun: """ A class to launch the same operation on the entire sequence. Consider: >>> class Counter: >>> def __init__(self, value=0): >>> self.count = value >>> def add(self, v): >>> self.count += 1 >>> def __eq__(self, other): >>> return self.count == other.count >>> def __iadd__(self, other): >>> self.add(other) >>> a = [Counter(), Counter()] The following: >>> for b in a: >>> b.add(2) Can be replaced with >>> Multirun(a).add(2) And the following: >>> for b in a: >>> b += 3 With this >>> b = Mulirun(a) >>> b += 3 Furthermore note that: >>> Multirun(a).add(2) == [Counter(2), Counter(2)] :param sequence: sequence to execute these operations for :param dont_return_list: the operation won't return a list if this is True """ __slots__ = 'sequence', 'dont_return_list' def __bool__(self) -> bool: return bool(self.sequence) def __init__(self, sequence: tp.Iterable, dont_return_list: bool = False): self.sequence = sequence self.dont_return_list = dont_return_list def __iter__(self): return iter(self.sequence) def __getattr__(self, item): def inner(*args, **kwargs): if not self.dont_return_list: results = [] for element in self: getattr(element, item)(*args, **kwargs) results.append(element) return results else: for element in self: getattr(element, item)(*args, **kwargs) # Take care: the array might just be empty... try: fun = getattr(n_th(self), item) inner = wraps(fun)(inner) except IndexError: pass return inner def __iadd__(self, other): for element in self: element += other return self def __isub__(self, other): for element in self: element -= other return self def __imul__(self, other): for element in self: element *= other return self def __itruediv__(self, other): for element in self: element /= other return self def __ifloordiv__(self, other): for element in self: element //= other return self def __ilshift__(self, other): for element in self: element <<= other return self def __irshift__(self, other): for element in self: element >>= other return self def __ipow__(self, other): for element in self: element **= other return self \ No newline at end of file diff --git a/satella/instrumentation/cpu_time/collector.py b/satella/instrumentation/cpu_time/collector.py index 06648f93..f6243f72 100644 --- a/satella/instrumentation/cpu_time/collector.py +++ b/satella/instrumentation/cpu_time/collector.py @@ -80,10 +80,12 @@ class _CPUProfileBuilderThread(threading.Thread): time_p, times_v = self.own_load_average[-2] time_c, times_c = self.own_load_average[-1] difference = time_c - time_p - tp = {} - for field in times_v._fields: - tp[field] = (getattr(times_c, field) - getattr(times_v, field)) / difference - return pCPUtimes(**tp) + if difference == 0: + return None + tuple_build = {} + for field in times_v._fields: # pylint: disable=protected-access + tuple_build[field] = (getattr(times_c, field) - getattr(times_v, field)) / difference + return pCPUtimes(**tuple_build) def request_percentile(self, percent: float) -> None: if percent not in self.percentiles_requested: @@ -246,8 +248,8 @@ def get_own_cpu_usage() -> tp.Optional[pCPUtimes]: """ Return own CPU usage (this process only) - :return: a namedtuple of (user, system, children_user, children_system, iowait) divided by number of seconds that - passed since the last measure. - or None if data not yet ready + :return: None if data not ready (just try again in some time), or a a namedtuple of (user, system, children_user, + children_system, iowait) divided by number of seconds that passed since the last measure. The result is divided + by passed time, so a 1 means 100% during the time, and 0.5 means exactly 50% of the CPU used. """ return _CPUProfileBuilderThread.get_instance().get_own_cpu_usage() -- GitLab